1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See for details of the copyright holders.
3// All rights not expressly granted are reserved.
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include "TFile.h"
30#include "TMemFile.h"
31#include "TTree.h"
32#include "TROOT.h"
33#include <memory>
34#include <TMessage.h>
35#include <fairmq/Parts.h>
36#include <ctime>
37#include <TStopwatch.h>
38#include <sstream>
39#include <cassert>
40#include "FairSystemInfo.h"
42#include "O2HitMerger.h"
43#include "O2SimDevice.h"
48#include <TOFSimulation/Detector.h>
62#include <map>
63#include <vector>
64#include <list>
65#include <csignal>
66#include <mutex>
67#include <filesystem>
68#include <functional>
83#include <tbb/concurrent_unordered_map.h>
85namespace o2
87namespace devices
90// signal handler
91void sighandler(int signal)
93 if (signal == SIGSEGV) {
94 LOG(warn) << "segmentation violation ... just exit without coredump in order not to hang";
95 raise(SIGKILL);
96 }
102 class TMessageWrapper : public TMessage
103 {
104 public:
105 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
106 ~TMessageWrapper() override = default;
107 };
109 public:
112 {
113 mTimer.Start();
114 mInitialOutputDir = std::filesystem::current_path().string();
115 mCurrentOutputDir = mInitialOutputDir;
116 }
119 ~O2HitMerger() override
120 {
121 FairSystemInfo sysinfo;
122 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
123 mTimer.Continue();
124 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
125 << sysinfo.GetMaxMemory() << " MB\n";
126 }
128 private:
130 void InitTask() final
131 {
132 LOG(info) << "INIT HIT MERGER";
133 // signal(SIGSEGV, sighandler);
134 ROOT::EnableThreadSafety();
136 std::string outfilename("o2sim_merged_hits.root"); // default name
137 // query the sim config ... which is used to extract the filenames
138 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
140 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
141 }
146 mOutFileName = outfilename.c_str();
147 if (mWriteToDisc) {
148 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
149 mOutTree = new TTree("o2sim", "o2sim");
150 mOutTree->SetDirectory(mOutFile);
152 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
153 mMCHeaderTree = new TTree("o2sim", "o2sim");
154 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
155 }
156 // detectors init only once
157 if (mDetectorInstances.size() == 0) {
158 initDetInstances();
159 // has to be after init of Detectors
161 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
162 }
164 // init pipe
165 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
166 if (pipeenv) {
167 mPipeToDriver = atoi(pipeenv);
168 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
169 } else {
171 }
173 // if no data to expect we shut down the device NOW since it would otherwise hang
174 if (mNExpectedEvents == 0) {
175 if (mAsService) {
176 waitForControlInput();
177 } else {
179 raise(SIGINT);
180 }
181 }
182 }
184 bool setWorkingDirectory(std::string const& dir)
185 {
186 namespace fs = std::filesystem;
188 // sets the output directory where simulation files are produced
189 // and creates it when it doesn't exist already
191 // 2 possibilities:
192 // a) dir is relative dir. Then we interpret it as relative to the initial
193 // base directory
194 // b) or dir is itself absolut.
195 try {
196 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
197 if (!dir.empty()) {
198 auto absolutePath = fs::absolute(fs::path(dir));
199 if (!fs::exists(absolutePath)) {
200 if (!fs::create_directory(absolutePath)) {
201 LOG(error) << "Could not create directory " << absolutePath.string();
202 return false;
203 }
204 }
205 // set the current path
206 fs::current_path(absolutePath.string().c_str());
207 mCurrentOutputDir = fs::current_path().string();
208 }
209 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
210 } catch (std::exception e) {
211 LOG(error) << " could not change path to " << dir;
212 }
213 return true;
214 }
216 // function for intermediate/on-the-fly reinitializations
217 bool ReInit(o2::conf::SimReconfigData const& reconfig)
218 {
219 if (reconfig.stop) {
220 return false;
221 }
222 if (!setWorkingDirectory(reconfig.outputDir)) {
223 return false;
224 }
226 std::string outfilename("o2sim_merged_hits.root"); // default name
228 mNExpectedEvents = reconfig.nEvents;
229 mOutFileName = outfilename.c_str();
230 if (mWriteToDisc) {
231 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
232 mOutTree = new TTree("o2sim", "o2sim");
233 mOutTree->SetDirectory(mOutFile);
235 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
236 mMCHeaderTree = new TTree("o2sim", "o2sim");
237 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
238 }
239 // reinit detectorInstance files (also make sure they are closed before continuing)
240 initHitFiles(reconfig.outputPrefix);
242 // clear "counter" datastructures
243 mPartsCheckSum.clear();
244 mEventChecksum = 0;
246 // clear collector datastructures
247 mMCTrackBuffer.clear();
248 mTrackRefBuffer.clear();
249 mSubEventInfoBuffer.clear();
250 mFlushableEvents.clear();
251 mNextFlushID = 1;
253 return true;
254 }
256 template <typename T, typename V>
257 V insertAdd(std::map<T, V>& m, T const& key, V value)
258 {
259 const auto iter = m.find(key);
260 V accum{0};
261 if (iter != m.end()) {
262 iter->second += value;
263 accum = iter->second;
264 } else {
265 m.insert(std::make_pair(key, value));
266 accum = value;
267 }
268 return accum;
269 }
271 template <typename T>
272 bool isDataComplete(T checksum, T nparts)
273 {
274 return checksum == nparts * (nparts + 1) / 2;
275 }
277 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
278 {
279 auto detIDmessage = std::move(data.At(index++));
280 // this should be a detector ID
281 if (detIDmessage->GetSize() == 4) {
282 auto ptr = (int*)detIDmessage->GetData();
284 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
285 << data.At(index)->GetSize() / 1024. / 1024.;
287 // get the detector that can interpret it
288 auto detector = mDetectorInstances[id].get();
289 if (detector) {
290 detector->collectHits(eventID, data, index);
291 }
292 }
293 }
295 template <typename T, typename BT>
296 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
297 {
298 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
299 if (buffer.find(eventID) == buffer.end()) {
300 buffer[eventID] = typename BT::mapped_type();
301 }
302 buffer[eventID].push_back(decodeddata);
303 // delete decodeddata; --> we store the pointers
304 index++;
305 }
307 // fills a special branch of SubEventInfos in order to keep
308 // track of which entry corresponds to which event etc.
309 // also creates the MCEventHeader branch expected for physics analysis
310 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
311 {
312 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
313 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
314 }
315 mSubEventInfoBuffer[info.eventID].push_back(&info);
316 }
319 {
320 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
322 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
323 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
324 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
325 LOG(info) << "SOCKETNAME " << controlsocketname;
326 channel.Connect(std::string(controlsocketname));
327 channel.Validate();
328 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
330 LOG(info) << "WAITING FOR INPUT";
331 if (channel.Receive(reply) > 0) {
332 auto data = reply->GetData();
333 auto size = reply->GetSize();
335 std::string command(reinterpret_cast<char const*>(data), size);
336 LOG(info) << "message: " << command;
339 o2::conf::parseSimReconfigFromString(command, reconfig);
340 return ReInit(reconfig);
341 } else {
342 LOG(info) << "NOTHING RECEIVED";
343 }
344 return true;
345 }
347 bool ConditionalRun() override
348 {
349 auto& channel = GetChannels().at("simdata").at(0);
350 fair::mq::Parts request;
351 auto bytes = channel.Receive(request);
352 if (bytes < 0) {
353 LOG(error) << "Some error occurred on socket during receive on sim data";
354 return true; // keep going
355 }
356 TStopwatch timer;
357 timer.Start();
358 auto more = handleSimData(request, 0);
359 LOG(info) << "HitMerger processing took " << timer.RealTime();
360 if (!more && mAsService) {
361 LOG(info) << " CONTROL ";
362 // if we are done treating data we may go back to init phase
363 // for the next batch
364 return waitForControlInput();
365 }
366 return more;
367 }
369 bool handleSimData(fair::mq::Parts& data, int /*index*/)
370 {
371 bool expectmore = true;
372 int index = 0;
373 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
374 o2::data::SubEventInfo& info = *infoptr;
375 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
377 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
379 fillSubEventInfoEntry(info);
380 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
381 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
382 while (index < data.Size()) {
383 consumeHits(info.eventID, data, index);
384 }
386 if (isDataComplete<uint32_t>(accum, info.nparts)) {
387 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
388 mFlushableEvents[info.eventID] = true;
390 // check if previous flush finished
391 // start merging only when no merging currently happening
392 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
393 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
394 if (!mergingInProgress) {
395 if (mMergerIOThread.joinable()) {
396 mMergerIOThread.join();
397 }
398 // start hit merging and flushing in a separate thread in order not to block
399 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
400 }
402 mEventChecksum += info.eventID;
403 // we also need to check if we have all events
404 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
405 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
407 // flush remaining data and close file
408 if (mMergerIOThread.joinable()) {
409 mMergerIOThread.join();
410 }
411 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
412 if (mMergerIOThread.joinable()) {
413 mMergerIOThread.join();
414 }
416 expectmore = false;
417 }
419 if (mPipeToDriver != -1) {
420 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
422 };
423 }
424 }
425 if (!expectmore) {
426 // somehow FairMQ has difficulties shutting down; helping manually
427 // raise(SIGINT);
428 }
429 return expectmore;
430 }
432 void cleanEvent(int eventID)
433 {
434 // cleanup intermediate per-Event buffers
435 }
437 template <typename T>
438 void backInsert(T const& from, T& to)
439 {
440 std::copy(from.begin(), from.end(), std::back_inserter(to));
441 }
443 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
444 {
445 // avoid doing this for trivial cases
446 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
447 auto targetdata = std::make_unique<std::vector<MCTrack>>();
449 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
450 const auto entries = vectorOfSubEventMCTracks.size();
452 if (entries > 1) {
453 //
454 // loop over subevents to store the primary events
455 //
456 int nprimTot = 0;
457 for (int entry = entries - 1; entry >= 0; --entry) {
458 int index = nsubevents[entry];
459 nprimTot += nprimaries[index];
460 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
461 for (int i = 0; i < nprimaries[index]; i++) {
462 auto& track = (*vectorOfSubEventMCTracks[index])[i];
463 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
464 track.SetFirstDaughterTrackId(-1);
465 track.SetLastDaughterTrackId(-1);
466 }
467 targetdata->push_back(track);
468 }
469 }
470 //
471 // loop a second time to store the secondaries and fix the mother track IDs
472 //
473 Int_t idelta1 = nprimTot;
474 Int_t idelta0 = 0;
475 for (int entry = entries - 1; entry >= 0; --entry) {
476 int index = nsubevents[entry];
478 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
479 // we need to fetch the right mctracks here!!
480 Int_t npart = (int)(subEventTracks.size());
481 Int_t nprim = nprimaries[index];
482 idelta1 -= nprim;
484 for (Int_t i = nprim; i < npart; i++) {
485 auto& track = subEventTracks[i];
486 Int_t cId = track.getMotherTrackId();
487 if (cId >= nprim) {
488 cId += idelta1;
489 } else {
490 cId += idelta0;
491 }
492 track.SetMotherTrackId(cId);
493 track.SetFirstDaughterTrackId(-1);
495 Int_t hwm = (int)(targetdata->size());
496 auto& mother = (*targetdata)[cId];
497 if (mother.getFirstDaughterTrackId() == -1) {
498 mother.SetFirstDaughterTrackId(hwm);
499 }
500 mother.SetLastDaughterTrackId(hwm);
502 targetdata->push_back(track);
503 }
504 idelta0 += nprim;
505 idelta1 += npart;
506 }
507 }
508 //
509 // write to output
510 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
512 // we give the possibility to produce some MC track statistics
513 // to be saved as part of the MCHeader structure
514 tracks_analysis_hook(*filladdr);
516 if (mWriteToDisc && target) {
517 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
518 targetbr->SetAddress(&filladdr);
519 targetbr->Fill();
520 targetbr->ResetAddress();
521 }
522 // forwarding the track data to other consumers (pub/sub)
523 if (mForwardKine) {
524 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
525 auto& channel = GetChannels().at("kineforward").at(0);
526 TMessage* tmsg = new TMessage(kMESS_OBJECT);
527 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
528 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
529 tmsg = new TMessage(kMESS_OBJECT);
530 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
531 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
532 fair::mq::Parts reply;
533 reply.AddPart(std::move(headermessage));
534 reply.AddPart(std::move(trackmessage));
535 channel.Send(reply);
536 LOG(info) << "Forward publish MC tracks on channel";
537 }
539 // cleanup buffered data
540 for (auto ptr : vectorOfSubEventMCTracks) {
541 delete ptr; // avoid this by using unique ptr
542 }
543 }
545 template <typename T, typename M>
546 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
547 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
548 {
549 //
550 // Remap the mother track IDs by adding an offset.
551 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
552 // This method is called by O2HitMerger::mergeAndFlushData(int)
553 //
554 T* incomingdata = nullptr;
555 std::unique_ptr<T> targetdata(nullptr);
556 auto& vectorOfT = mapOfVectorOfTs[eventID];
557 const auto entries = vectorOfT.size();
559 if (entries == 1) {
560 // nothing to do in case there is only one entry
561 incomingdata = vectorOfT[0];
562 } else {
563 targetdata = std::make_unique<T>();
564 // loop over subevents
565 Int_t nprimTot = 0;
566 for (int entry = 0; entry < entries; entry++) {
567 nprimTot += nprimaries[entry];
568 }
569 Int_t idelta0 = 0;
570 Int_t idelta1 = nprimTot;
571 for (int entry = entries - 1; entry >= 0; --entry) {
572 Int_t index = subevOrdered[entry];
573 Int_t nprim = nprimaries[index];
574 incomingdata = vectorOfT[index];
575 idelta1 -= nprim;
576 for (auto& data : *incomingdata) {
577 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
578 targetdata->push_back(data);
579 }
580 idelta0 += nprim;
581 idelta1 += trackoffsets[index];
582 }
583 }
584 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
585 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
586 targetbr->SetAddress(&dataaddr);
587 targetbr->Fill();
588 targetbr->ResetAddress();
590 // cleanup mem
591 for (auto ptr : vectorOfT) {
592 delete ptr; // avoid this by using unique ptr
593 }
594 }
596 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
597 {
598 Int_t cId = track.getMotherTrackId();
599 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
600 if (cId != -1) {
601 track.SetMotherTrackId(cId + ioffset);
602 }
603 }
605 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
606 {
607 Int_t cId = ref.getTrackID();
608 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
609 ref.setTrackID(cId + ioffset);
610 }
612 void initHitTreeAndOutFile(std::string prefix, int detID)
613 {
615 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
616 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
617 mDetectorOutFiles[detID]->Close();
618 delete mDetectorOutFiles[detID];
619 }
620 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
621 if (mWriteToDisc) {
622 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
623 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
624 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
625 } else {
626 mDetectorOutFiles[detID] = nullptr;
627 mDetectorToTTreeMap[detID] = nullptr;
628 }
629 }
631 // This method goes over the buffers containing data for a given event; potentially merges
632 // them and flushes into the actual output file.
633 // The method can be called asynchronously to data collection
634 bool mergeAndFlushData()
635 {
636 auto checkIfNextFlushable = [this]() -> bool {
637 mNextFlushID++;
638 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
639 };
641 LOG(info) << "Launching merge kernel ";
642 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
643 if (!canflush) {
644 return false;
645 }
646 while (canflush == true) {
647 auto flusheventID = mNextFlushID;
648 LOG(info) << "Merge and flush event " << flusheventID;
649 auto iter = mSubEventInfoBuffer.find(flusheventID);
650 if (iter == mSubEventInfoBuffer.end()) {
651 LOG(error) << "No info/data found for event " << flusheventID;
652 if (!checkIfNextFlushable()) {
653 return false;
654 }
655 }
657 auto& subEventInfoList = (*iter).second;
658 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
659 LOG(error) << "No data entries found for event " << flusheventID;
660 if (!checkIfNextFlushable()) {
661 return false;
662 }
663 }
665 TStopwatch timer;
666 timer.Start();
668 // calculate trackoffsets
669 auto& confref = o2::conf::SimConfig::Instance();
671 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
672 std::vector<int> trackoffsets;
673 // collecting primary particles in each subevent (data arrival id)
674 std::vector<int> nprimaries;
675 // mapping of id to actual sub-event id (or part)
676 std::vector<int> nsubevents;
678 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
680 // the MC labels (trackID) for hits
681 for (auto info : subEventInfoList) {
682 assert(info->npersistenttracks >= 0);
683 trackoffsets.emplace_back(info->npersistenttracks);
684 nprimaries.emplace_back(info->nprimarytracks);
685 nsubevents.emplace_back(info->part);
686 if (eventheader == nullptr) {
687 eventheader = &info->mMCEventHeader;
688 } else {
689 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
690 }
691 }
693 // now see which events can be discarded in any case due to no hits
694 if (confref.isFilterOutNoHitEvents()) {
695 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
696 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
697 cleanEvent(flusheventID);
698 if (!checkIfNextFlushable()) {
699 return true;
700 }
701 }
702 }
704 // attention: We need to make sure that we write everything in the same event order
705 // but iteration over keys of a standard map in C++ is ordered
707 // b) merge the general data
708 //
709 // for MCTrack remap the motherIds and merge at the same go
710 const auto entries = subEventInfoList.size();
711 std::vector<int> subevOrdered((int)(nsubevents.size()));
712 for (int entry = entries - 1; entry >= 0; --entry) {
713 subevOrdered[nsubevents[entry] - 1] = entry;
714 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
715 }
717 // This is a hook that collects some useful statistics/properties on the event
718 // for use by other components;
719 // Properties are attached making use of the extensible "Info" feature which is already
720 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
721 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
722 // NOTE: This function might also be called directly in the primary server!?
723 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
724 int eta1Point2Counter = 0;
725 int eta1Point0Counter = 0;
726 int eta0Point8Counter = 0;
727 int eta1Point2CounterPi = 0;
728 int eta1Point0CounterPi = 0;
729 int eta0Point8CounterPi = 0;
730 int prims = 0;
731 for (auto& tr : tracks) {
732 if (tr.isPrimary()) {
733 prims++;
734 const auto eta = tr.GetEta();
735 if (eta < 1.2) {
736 eta1Point2Counter++;
737 if (std::abs(tr.GetPdgCode()) == 211) {
738 eta1Point2CounterPi++;
739 }
740 }
741 if (eta < 1.0) {
742 eta1Point0Counter++;
743 if (std::abs(tr.GetPdgCode()) == 211) {
744 eta1Point0CounterPi++;
745 }
746 }
747 if (eta < 0.8) {
748 eta0Point8Counter++;
749 if (std::abs(tr.GetPdgCode()) == 211) {
750 eta0Point8CounterPi++;
751 }
752 }
753 } else {
754 break; // track layout is such that all prims are first anyway
755 }
756 }
757 // attach these properties to eventheader
758 // we only need to make the names standard
759 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
760 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
761 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
762 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
763 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
764 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
765 eventheader->putInfo("prims_total", prims);
766 };
768 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
770 if (mOutTree) {
771 // adjusting and merging track references
772 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
774 // write MC event headers
775 {
776 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
777 headerbr->SetAddress(&eventheader);
778 headerbr->Fill();
779 headerbr->ResetAddress();
780 }
782 {
783 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
784 headerbr->SetAddress(&eventheader);
785 headerbr->Fill();
786 headerbr->ResetAddress();
787 }
788 }
790 // c) do the merge procedure for all hits ... delegate this to detector specific functions
791 // since they know about types; number of branches; etc.
792 // this will also fix the trackIDs inside the hits
793 for (int id = 0; id < mDetectorInstances.size(); ++id) {
794 auto& det = mDetectorInstances[id];
795 if (det) {
796 auto hittree = mDetectorToTTreeMap[id];
797 if (hittree) {
798 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
799 hittree->SetEntries(hittree->GetEntries() + 1);
800 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
801 }
802 }
803 }
805 // increase the entry count in the tree
806 if (mOutTree) {
807 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
808 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
809 }
810 if (mMCHeaderTree) {
811 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
812 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
813 }
815 cleanEvent(flusheventID);
816 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
817 if (!checkIfNextFlushable()) {
818 break;
819 }
820 } // end while
821 if (mWriteToDisc && mOutFile) {
822 LOG(info) << "Writing TTrees";
823 mOutFile->Write("", TObject::kOverwrite);
824 for (int id = 0; id < mDetectorInstances.size(); ++id) {
825 auto& det = mDetectorInstances[id];
826 if (det && mDetectorOutFiles[id]) {
827 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
828 }
829 }
830 if (mMCHeaderOnlyOutFile) {
831 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
832 }
833 }
834 return true;
835 }
837 std::map<uint32_t, uint32_t> mPartsCheckSum;
838 std::string mOutFileName;
840 // structures for the final flush
841 TFile* mOutFile;
842 TTree* mOutTree;
843 TFile* mMCHeaderOnlyOutFile;
844 TTree* mMCHeaderTree;
846 template <class K, class V>
847 using Hashtable = tbb::concurrent_unordered_map<K, V>;
848 Hashtable<int, TFile*> mDetectorOutFiles;
849 Hashtable<int, TTree*> mDetectorToTTreeMap;
851 // intermediate structures to collect data per event
852 std::thread mMergerIOThread;
853 bool mergingInProgress = false;
855 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
856 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
857 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
858 Hashtable<int, bool> mFlushableEvents;
860 int mEventChecksum = 0;
861 int mNExpectedEvents = 0;
862 int mNextFlushID = 1;
863 TStopwatch mTimer;
865 bool mAsService = false;
866 bool mForwardKine = true;
867 bool mWriteToDisc = true;
869 int mPipeToDriver = -1;
871 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
873 // output folder configuration
874 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
875 std::string mCurrentOutputDir; // current output folder asked
877 // channel to PUB status messages to outside subscribers
878 fair::mq::Channel mPubChannel;
880 // init detector instances
881 void initDetInstances();
882 void initHitFiles(std::string prefix);
885void O2HitMerger::initHitFiles(std::string prefix)
889 // a little helper lambda
890 auto isActivated = [](std::string s) -> bool {
891 // access user configuration for list of wanted modules
893 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
894 return active; };
896 for (int i = DetID::First; i <= DetID::Last; ++i) {
897 if (!isActivated(DetID::getName(i))) {
898 continue;
899 }
900 // init the detector specific output files
901 initHitTreeAndOutFile(prefix, i);
902 }
905// init detector instances used to write hit data to a TTree
906void O2HitMerger::initDetInstances()
910 // a little helper lambda
911 auto isActivated = [](std::string s) -> bool {
912 // access user configuration for list of wanted modules
914 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
915 return active; };
917 mDetectorInstances.resize(DetID::nDetectors);
918 // like a factory of detector objects
920 int counter = 0;
921 for (int i = DetID::First; i <= DetID::Last; ++i) {
922 if (!isActivated(DetID::getName(i))) {
923 continue;
924 }
926 if (i == DetID::TPC) {
927 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
928 counter++;
929 }
930 if (i == DetID::ITS) {
931 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
932 counter++;
933 }
934 if (i == DetID::MFT) {
935 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
936 counter++;
937 }
938 if (i == DetID::TRD) {
939 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
940 counter++;
941 }
942 if (i == DetID::PHS) {
943 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
944 counter++;
945 }
946 if (i == DetID::CPV) {
947 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
948 counter++;
949 }
950 if (i == DetID::EMC) {
951 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
952 counter++;
953 }
954 if (i == DetID::HMP) {
955 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
956 counter++;
957 }
958 if (i == DetID::TOF) {
959 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
960 counter++;
961 }
962 if (i == DetID::FT0) {
963 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
964 counter++;
965 }
966 if (i == DetID::FV0) {
967 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
968 counter++;
969 }
970 if (i == DetID::FDD) {
971 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
972 counter++;
973 }
974 if (i == DetID::MCH) {
975 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
976 counter++;
977 }
978 if (i == DetID::MID) {
979 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
980 counter++;
981 }
982 if (i == DetID::ZDC) {
983 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
984 counter++;
985 }
986 if (i == DetID::FOC) {
987 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, gSystem->ExpandPathName("$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Spaghetti.txt")));
988 counter++;
989 }
991 if (i == DetID::IT3) {
992 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
993 counter++;
994 }
995 if (i == DetID::TRK) {
996 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
997 counter++;
998 }
999 if (i == DetID::FT3) {
1000 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
1001 counter++;
1002 }
1003 if (i == DetID::FCT) {
1004 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
1005 counter++;
1006 }
1007 if (i == DetID::TF3) {
1008 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
1009 counter++;
1010 }
1011 if (i == DetID::RCH) {
1012 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1013 counter++;
1014 }
1015 if (i == DetID::MI3) {
1016 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1017 counter++;
1018 }
1019 if (i == DetID::ECL) {
1020 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1021 counter++;
1022 }
1024 }
1025 if (counter != DetID::nDetectors) {
1026 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1027 }
1030} // namespace devices
1031} // namespace o2
