Project
Loading...
Searching...
No Matches
O2HitMerger.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_HITMERGER_H_
15#define ALICEO2_DEVICES_HITMERGER_H_
16
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include "TFile.h"
30#include "TMemFile.h"
31#include "TTree.h"
32#include "TROOT.h"
33#include <memory>
34#include <TMessage.h>
35#include <fairmq/Parts.h>
36#include <ctime>
37#include <TStopwatch.h>
38#include <sstream>
39#include <cassert>
40#include "FairSystemInfo.h"
41
42#include "O2HitMerger.h"
43#include "O2SimDevice.h"
48#include <TOFSimulation/Detector.h>
60
62#include <map>
63#include <vector>
64#include <list>
65#include <csignal>
66#include <mutex>
67#include <filesystem>
68#include <functional>
69
71
72#ifdef ENABLE_UPGRADES
81#endif
82
83#include <tbb/concurrent_unordered_map.h>
84
85namespace o2
86{
87namespace devices
88{
89
90// signal handler
91void sighandler(int signal)
92{
93 if (signal == SIGSEGV) {
94 LOG(warn) << "segmentation violation ... just exit without coredump in order not to hang";
95 raise(SIGKILL);
96 }
97}
98
100{
101
102 class TMessageWrapper : public TMessage
103 {
104 public:
105 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
106 ~TMessageWrapper() override = default;
107 };
108
109 public:
112 {
113 mTimer.Start();
114 mInitialOutputDir = std::filesystem::current_path().string();
115 mCurrentOutputDir = mInitialOutputDir;
116 }
117
119 ~O2HitMerger() override
120 {
121 FairSystemInfo sysinfo;
122 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
123 mTimer.Continue();
124 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
125 << sysinfo.GetMaxMemory() << " MB\n";
126 }
127
128 private:
130 void InitTask() final
131 {
132 LOG(info) << "INIT HIT MERGER";
133 // signal(SIGSEGV, sighandler);
134 ROOT::EnableThreadSafety();
135
136 std::string outfilename("o2sim_merged_hits.root"); // default name
137 // query the sim config ... which is used to extract the filenames
138 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
140 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
141 }
145
146 mOutFileName = outfilename.c_str();
147 if (mWriteToDisc) {
148 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
149 mOutTree = new TTree("o2sim", "o2sim");
150 mOutTree->SetDirectory(mOutFile);
151
152 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
153 mMCHeaderTree = new TTree("o2sim", "o2sim");
154 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
155 }
156 // detectors init only once
157 if (mDetectorInstances.size() == 0) {
158 initDetInstances();
159 // has to be after init of Detectors
161 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
162 }
163
164 // init pipe
165 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
166 if (pipeenv) {
167 mPipeToDriver = atoi(pipeenv);
168 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
169 } else {
170 LOG(warning) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
171 }
172
173 // if no data to expect we shut down the device NOW since it would otherwise hang
174 if (mNExpectedEvents == 0) {
175 if (mAsService) {
176 waitForControlInput();
177 } else {
178 LOG(info) << "NOT EXPECTING ANY DATA; SHUTTING DOWN";
179 raise(SIGINT);
180 }
181 }
182 }
183
184 bool setWorkingDirectory(std::string const& dir)
185 {
186 namespace fs = std::filesystem;
187
188 // sets the output directory where simulation files are produced
189 // and creates it when it doesn't exist already
190
191 // 2 possibilities:
192 // a) dir is relative dir. Then we interpret it as relative to the initial
193 // base directory
194 // b) or dir is itself absolut.
195 try {
196 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
197 if (!dir.empty()) {
198 auto absolutePath = fs::absolute(fs::path(dir));
199 if (!fs::exists(absolutePath)) {
200 if (!fs::create_directory(absolutePath)) {
201 LOG(error) << "Could not create directory " << absolutePath.string();
202 return false;
203 }
204 }
205 // set the current path
206 fs::current_path(absolutePath.string().c_str());
207 mCurrentOutputDir = fs::current_path().string();
208 }
209 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
210 } catch (std::exception e) {
211 LOG(error) << " could not change path to " << dir;
212 }
213 return true;
214 }
215
216 // function for intermediate/on-the-fly reinitializations
217 bool ReInit(o2::conf::SimReconfigData const& reconfig)
218 {
219 if (reconfig.stop) {
220 return false;
221 }
222 if (!setWorkingDirectory(reconfig.outputDir)) {
223 return false;
224 }
225
226 std::string outfilename("o2sim_merged_hits.root"); // default name
228 mNExpectedEvents = reconfig.nEvents;
229 mOutFileName = outfilename.c_str();
230 if (mWriteToDisc) {
231 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
232 mOutTree = new TTree("o2sim", "o2sim");
233 mOutTree->SetDirectory(mOutFile);
234
235 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
236 mMCHeaderTree = new TTree("o2sim", "o2sim");
237 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
238 }
239 // reinit detectorInstance files (also make sure they are closed before continuing)
240 initHitFiles(reconfig.outputPrefix);
241
242 // clear "counter" datastructures
243 mPartsCheckSum.clear();
244 mEventChecksum = 0;
245
246 // clear collector datastructures
247 mMCTrackBuffer.clear();
248 mTrackRefBuffer.clear();
249 mSubEventInfoBuffer.clear();
250 mFlushableEvents.clear();
251 mNextFlushID = 1;
252
253 return true;
254 }
255
256 template <typename T, typename V>
257 V insertAdd(std::map<T, V>& m, T const& key, V value)
258 {
259 const auto iter = m.find(key);
260 V accum{0};
261 if (iter != m.end()) {
262 iter->second += value;
263 accum = iter->second;
264 } else {
265 m.insert(std::make_pair(key, value));
266 accum = value;
267 }
268 return accum;
269 }
270
271 template <typename T>
272 bool isDataComplete(T checksum, T nparts)
273 {
274 return checksum == nparts * (nparts + 1) / 2;
275 }
276
277 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
278 {
279 auto detIDmessage = std::move(data.At(index++));
280 // this should be a detector ID
281 if (detIDmessage->GetSize() == 4) {
282 auto ptr = (int*)detIDmessage->GetData();
284 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
285 << data.At(index)->GetSize() / 1024. / 1024.;
286
287 // get the detector that can interpret it
288 auto detector = mDetectorInstances[id].get();
289 if (detector) {
290 detector->collectHits(eventID, data, index);
291 }
292 }
293 }
294
295 template <typename T, typename BT>
296 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
297 {
298 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
299 if (buffer.find(eventID) == buffer.end()) {
300 buffer[eventID] = typename BT::mapped_type();
301 }
302 buffer[eventID].push_back(decodeddata);
303 // delete decodeddata; --> we store the pointers
304 index++;
305 }
306
307 // fills a special branch of SubEventInfos in order to keep
308 // track of which entry corresponds to which event etc.
309 // also creates the MCEventHeader branch expected for physics analysis
310 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
311 {
312 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
313 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
314 }
315 mSubEventInfoBuffer[info.eventID].push_back(&info);
316 }
317
319 {
320 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
321
322 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
323 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
324 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
325 LOG(info) << "SOCKETNAME " << controlsocketname;
326 channel.Connect(std::string(controlsocketname));
327 channel.Validate();
328 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
329
330 LOG(info) << "WAITING FOR INPUT";
331 if (channel.Receive(reply) > 0) {
332 auto data = reply->GetData();
333 auto size = reply->GetSize();
334
335 std::string command(reinterpret_cast<char const*>(data), size);
336 LOG(info) << "message: " << command;
337
339 o2::conf::parseSimReconfigFromString(command, reconfig);
340 return ReInit(reconfig);
341 } else {
342 LOG(info) << "NOTHING RECEIVED";
343 }
344 return true;
345 }
346
347 bool ConditionalRun() override
348 {
349 auto& channel = GetChannels().at("simdata").at(0);
350 fair::mq::Parts request;
351 auto bytes = channel.Receive(request);
352 if (bytes < 0) {
353 LOG(error) << "Some error occurred on socket during receive on sim data";
354 return true; // keep going
355 }
356 TStopwatch timer;
357 timer.Start();
358 auto more = handleSimData(request, 0);
359 LOG(info) << "HitMerger processing took " << timer.RealTime();
360 if (!more && mAsService) {
361 LOG(info) << " CONTROL ";
362 // if we are done treating data we may go back to init phase
363 // for the next batch
364 return waitForControlInput();
365 }
366 return more;
367 }
368
369 bool handleSimData(fair::mq::Parts& data, int /*index*/)
370 {
371 bool expectmore = true;
372 int index = 0;
373 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
374 o2::data::SubEventInfo& info = *infoptr;
375 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
376
377 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
378
379 fillSubEventInfoEntry(info);
380 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
381 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
382 while (index < data.Size()) {
383 consumeHits(info.eventID, data, index);
384 }
385
386 if (isDataComplete<uint32_t>(accum, info.nparts)) {
387 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
388 mFlushableEvents[info.eventID] = true;
389
390 // check if previous flush finished
391 // start merging only when no merging currently happening
392 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
393 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
394 if (!mergingInProgress) {
395 if (mMergerIOThread.joinable()) {
396 mMergerIOThread.join();
397 }
398 // start hit merging and flushing in a separate thread in order not to block
399 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
400 }
401
402 mEventChecksum += info.eventID;
403 // we also need to check if we have all events
404 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
405 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
406
407 // flush remaining data and close file
408 if (mMergerIOThread.joinable()) {
409 mMergerIOThread.join();
410 }
411 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
412 if (mMergerIOThread.joinable()) {
413 mMergerIOThread.join();
414 }
415
416 expectmore = false;
417 }
418
419 if (mPipeToDriver != -1) {
420 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
421 LOG(error) << "FAILED WRITING TO PIPE";
422 };
423 }
424 }
425 if (!expectmore) {
426 // somehow FairMQ has difficulties shutting down; helping manually
427 // raise(SIGINT);
428 }
429 return expectmore;
430 }
431
432 void cleanEvent(int eventID)
433 {
434 // cleanup intermediate per-Event buffers
435 }
436
437 template <typename T>
438 void backInsert(T const& from, T& to)
439 {
440 std::copy(from.begin(), from.end(), std::back_inserter(to));
441 }
442
443 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
444 {
445 // avoid doing this for trivial cases
446 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
447 auto targetdata = std::make_unique<std::vector<MCTrack>>();
448
449 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
450 const auto entries = vectorOfSubEventMCTracks.size();
451
452 if (entries > 1) {
453 //
454 // loop over subevents to store the primary events
455 //
456 int nprimTot = 0;
457 for (int entry = entries - 1; entry >= 0; --entry) {
458 int index = nsubevents[entry];
459 nprimTot += nprimaries[index];
460 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
461 for (int i = 0; i < nprimaries[index]; i++) {
462 auto& track = (*vectorOfSubEventMCTracks[index])[i];
463 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
464 track.SetFirstDaughterTrackId(-1);
465 track.SetLastDaughterTrackId(-1);
466 }
467 targetdata->push_back(track);
468 }
469 }
470 //
471 // loop a second time to store the secondaries and fix the mother track IDs
472 //
473 Int_t idelta1 = nprimTot;
474 Int_t idelta0 = 0;
475 for (int entry = entries - 1; entry >= 0; --entry) {
476 int index = nsubevents[entry];
477
478 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
479 // we need to fetch the right mctracks here!!
480 Int_t npart = (int)(subEventTracks.size());
481 Int_t nprim = nprimaries[index];
482 idelta1 -= nprim;
483
484 for (Int_t i = nprim; i < npart; i++) {
485 auto& track = subEventTracks[i];
486 Int_t cId = track.getMotherTrackId();
487 if (cId >= nprim) {
488 cId += idelta1;
489 } else {
490 cId += idelta0;
491 }
492 track.SetMotherTrackId(cId);
493 track.SetFirstDaughterTrackId(-1);
494
495 Int_t hwm = (int)(targetdata->size());
496 auto& mother = (*targetdata)[cId];
497 if (mother.getFirstDaughterTrackId() == -1) {
498 mother.SetFirstDaughterTrackId(hwm);
499 }
500 mother.SetLastDaughterTrackId(hwm);
501
502 targetdata->push_back(track);
503 }
504 idelta0 += nprim;
505 idelta1 += npart;
506 }
507 }
508 //
509 // write to output
510 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
511
512 // we give the possibility to produce some MC track statistics
513 // to be saved as part of the MCHeader structure
514 tracks_analysis_hook(*filladdr);
515
516 if (mWriteToDisc && target) {
517 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
518 targetbr->SetAddress(&filladdr);
519 targetbr->Fill();
520 targetbr->ResetAddress();
521 }
522 // forwarding the track data to other consumers (pub/sub)
523 if (mForwardKine) {
524 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
525 auto& channel = GetChannels().at("kineforward").at(0);
526 TMessage* tmsg = new TMessage(kMESS_OBJECT);
527 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
528 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
529 tmsg = new TMessage(kMESS_OBJECT);
530 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
531 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
532 fair::mq::Parts reply;
533 reply.AddPart(std::move(headermessage));
534 reply.AddPart(std::move(trackmessage));
535 channel.Send(reply);
536 LOG(info) << "Forward publish MC tracks on channel";
537 }
538
539 // cleanup buffered data
540 for (auto ptr : vectorOfSubEventMCTracks) {
541 delete ptr; // avoid this by using unique ptr
542 }
543 }
544
545 template <typename T, typename M>
546 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
547 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
548 {
549 //
550 // Remap the mother track IDs by adding an offset.
551 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
552 // This method is called by O2HitMerger::mergeAndFlushData(int)
553 //
554 T* incomingdata = nullptr;
555 std::unique_ptr<T> targetdata(nullptr);
556 auto& vectorOfT = mapOfVectorOfTs[eventID];
557 const auto entries = vectorOfT.size();
558
559 if (entries == 1) {
560 // nothing to do in case there is only one entry
561 incomingdata = vectorOfT[0];
562 } else {
563 targetdata = std::make_unique<T>();
564 // loop over subevents
565 Int_t nprimTot = 0;
566 for (int entry = 0; entry < entries; entry++) {
567 nprimTot += nprimaries[entry];
568 }
569 Int_t idelta0 = 0;
570 Int_t idelta1 = nprimTot;
571 for (int entry = entries - 1; entry >= 0; --entry) {
572 Int_t index = subevOrdered[entry];
573 Int_t nprim = nprimaries[index];
574 incomingdata = vectorOfT[index];
575 idelta1 -= nprim;
576 for (auto& data : *incomingdata) {
577 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
578 targetdata->push_back(data);
579 }
580 idelta0 += nprim;
581 idelta1 += trackoffsets[index];
582 }
583 }
584 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
585 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
586 targetbr->SetAddress(&dataaddr);
587 targetbr->Fill();
588 targetbr->ResetAddress();
589
590 // cleanup mem
591 for (auto ptr : vectorOfT) {
592 delete ptr; // avoid this by using unique ptr
593 }
594 }
595
596 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
597 {
598 Int_t cId = track.getMotherTrackId();
599 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
600 if (cId != -1) {
601 track.SetMotherTrackId(cId + ioffset);
602 }
603 }
604
605 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
606 {
607 Int_t cId = ref.getTrackID();
608 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
609 ref.setTrackID(cId + ioffset);
610 }
611
612 void initHitTreeAndOutFile(std::string prefix, int detID)
613 {
615 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
616 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
617 mDetectorOutFiles[detID]->Close();
618 delete mDetectorOutFiles[detID];
619 }
620 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
621 if (mWriteToDisc) {
622 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
623 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
624 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
625 } else {
626 mDetectorOutFiles[detID] = nullptr;
627 mDetectorToTTreeMap[detID] = nullptr;
628 }
629 }
630
631 // This method goes over the buffers containing data for a given event; potentially merges
632 // them and flushes into the actual output file.
633 // The method can be called asynchronously to data collection
634 bool mergeAndFlushData()
635 {
636 auto checkIfNextFlushable = [this]() -> bool {
637 mNextFlushID++;
638 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
639 };
640
641 LOG(info) << "Launching merge kernel ";
642 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
643 if (!canflush) {
644 return false;
645 }
646 while (canflush == true) {
647 auto flusheventID = mNextFlushID;
648 LOG(info) << "Merge and flush event " << flusheventID;
649 auto iter = mSubEventInfoBuffer.find(flusheventID);
650 if (iter == mSubEventInfoBuffer.end()) {
651 LOG(error) << "No info/data found for event " << flusheventID;
652 if (!checkIfNextFlushable()) {
653 return false;
654 }
655 }
656
657 auto& subEventInfoList = (*iter).second;
658 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
659 LOG(error) << "No data entries found for event " << flusheventID;
660 if (!checkIfNextFlushable()) {
661 return false;
662 }
663 }
664
665 TStopwatch timer;
666 timer.Start();
667
668 // calculate trackoffsets
669 auto& confref = o2::conf::SimConfig::Instance();
670
671 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
672 std::vector<int> trackoffsets;
673 // collecting primary particles in each subevent (data arrival id)
674 std::vector<int> nprimaries;
675 // mapping of id to actual sub-event id (or part)
676 std::vector<int> nsubevents;
677
678 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
679
680 // the MC labels (trackID) for hits
681 for (auto info : subEventInfoList) {
682 assert(info->npersistenttracks >= 0);
683 trackoffsets.emplace_back(info->npersistenttracks);
684 nprimaries.emplace_back(info->nprimarytracks);
685 nsubevents.emplace_back(info->part);
686 if (eventheader == nullptr) {
687 eventheader = &info->mMCEventHeader;
688 } else {
689 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
690 }
691 }
692
693 // now see which events can be discarded in any case due to no hits
694 if (confref.isFilterOutNoHitEvents()) {
695 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
696 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
697 cleanEvent(flusheventID);
698 if (!checkIfNextFlushable()) {
699 return true;
700 }
701 }
702 }
703
704 // attention: We need to make sure that we write everything in the same event order
705 // but iteration over keys of a standard map in C++ is ordered
706
707 // b) merge the general data
708 //
709 // for MCTrack remap the motherIds and merge at the same go
710 const auto entries = subEventInfoList.size();
711 std::vector<int> subevOrdered((int)(nsubevents.size()));
712 for (int entry = entries - 1; entry >= 0; --entry) {
713 subevOrdered[nsubevents[entry] - 1] = entry;
714 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
715 }
716
717 // This is a hook that collects some useful statistics/properties on the event
718 // for use by other components;
719 // Properties are attached making use of the extensible "Info" feature which is already
720 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
721 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
722 // NOTE: This function might also be called directly in the primary server!?
723 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
724 int eta1Point2Counter = 0;
725 int eta1Point0Counter = 0;
726 int eta0Point8Counter = 0;
727 int eta1Point2CounterPi = 0;
728 int eta1Point0CounterPi = 0;
729 int eta0Point8CounterPi = 0;
730 int prims = 0;
731 for (auto& tr : tracks) {
732 if (tr.isPrimary()) {
733 prims++;
734 const auto eta = tr.GetEta();
735 if (eta < 1.2) {
736 eta1Point2Counter++;
737 if (std::abs(tr.GetPdgCode()) == 211) {
738 eta1Point2CounterPi++;
739 }
740 }
741 if (eta < 1.0) {
742 eta1Point0Counter++;
743 if (std::abs(tr.GetPdgCode()) == 211) {
744 eta1Point0CounterPi++;
745 }
746 }
747 if (eta < 0.8) {
748 eta0Point8Counter++;
749 if (std::abs(tr.GetPdgCode()) == 211) {
750 eta0Point8CounterPi++;
751 }
752 }
753 } else {
754 break; // track layout is such that all prims are first anyway
755 }
756 }
757 // attach these properties to eventheader
758 // we only need to make the names standard
759 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
760 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
761 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
762 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
763 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
764 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
765 eventheader->putInfo("prims_total", prims);
766 };
767
768 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
769
770 if (mOutTree) {
771 // adjusting and merging track references
772 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
773
774 // write MC event headers
775 {
776 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
777 headerbr->SetAddress(&eventheader);
778 headerbr->Fill();
779 headerbr->ResetAddress();
780 }
781
782 {
783 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
784 headerbr->SetAddress(&eventheader);
785 headerbr->Fill();
786 headerbr->ResetAddress();
787 }
788 }
789
790 // c) do the merge procedure for all hits ... delegate this to detector specific functions
791 // since they know about types; number of branches; etc.
792 // this will also fix the trackIDs inside the hits
793 for (int id = 0; id < mDetectorInstances.size(); ++id) {
794 auto& det = mDetectorInstances[id];
795 if (det) {
796 auto hittree = mDetectorToTTreeMap[id];
797 if (hittree) {
798 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
799 hittree->SetEntries(hittree->GetEntries() + 1);
800 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
801 }
802 }
803 }
804
805 // increase the entry count in the tree
806 if (mOutTree) {
807 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
808 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
809 }
810 if (mMCHeaderTree) {
811 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
812 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
813 }
814
815 cleanEvent(flusheventID);
816 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
817 if (!checkIfNextFlushable()) {
818 break;
819 }
820 } // end while
821 if (mWriteToDisc && mOutFile) {
822 LOG(info) << "Writing TTrees";
823 mOutFile->Write("", TObject::kOverwrite);
824 for (int id = 0; id < mDetectorInstances.size(); ++id) {
825 auto& det = mDetectorInstances[id];
826 if (det && mDetectorOutFiles[id]) {
827 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
828 }
829 }
830 if (mMCHeaderOnlyOutFile) {
831 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
832 }
833 }
834 return true;
835 }
836
837 std::map<uint32_t, uint32_t> mPartsCheckSum;
838 std::string mOutFileName;
839
840 // structures for the final flush
841 TFile* mOutFile;
842 TTree* mOutTree;
843 TFile* mMCHeaderOnlyOutFile;
844 TTree* mMCHeaderTree;
845
846 template <class K, class V>
847 using Hashtable = tbb::concurrent_unordered_map<K, V>;
848 Hashtable<int, TFile*> mDetectorOutFiles;
849 Hashtable<int, TTree*> mDetectorToTTreeMap;
850
851 // intermediate structures to collect data per event
852 std::thread mMergerIOThread;
853 bool mergingInProgress = false;
854
855 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
856 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
857 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
858 Hashtable<int, bool> mFlushableEvents;
859
860 int mEventChecksum = 0;
861 int mNExpectedEvents = 0;
862 int mNextFlushID = 1;
863 TStopwatch mTimer;
864
865 bool mAsService = false;
866 bool mForwardKine = true;
867 bool mWriteToDisc = true;
868
869 int mPipeToDriver = -1;
870
871 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
872
873 // output folder configuration
874 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
875 std::string mCurrentOutputDir; // current output folder asked
876
877 // channel to PUB status messages to outside subscribers
878 fair::mq::Channel mPubChannel;
879
880 // init detector instances
881 void initDetInstances();
882 void initHitFiles(std::string prefix);
883};
884
885void O2HitMerger::initHitFiles(std::string prefix)
886{
888
889 // a little helper lambda
890 auto isActivated = [](std::string s) -> bool {
891 // access user configuration for list of wanted modules
893 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
894 return active; };
895
896 for (int i = DetID::First; i <= DetID::Last; ++i) {
897 if (!isActivated(DetID::getName(i))) {
898 continue;
899 }
900 // init the detector specific output files
901 initHitTreeAndOutFile(prefix, i);
902 }
903}
904
905// init detector instances used to write hit data to a TTree
906void O2HitMerger::initDetInstances()
907{
909
910 // a little helper lambda
911 auto isActivated = [](std::string s) -> bool {
912 // access user configuration for list of wanted modules
914 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
915 return active; };
916
917 mDetectorInstances.resize(DetID::nDetectors);
918 // like a factory of detector objects
919
920 int counter = 0;
921 for (int i = DetID::First; i <= DetID::Last; ++i) {
922 if (!isActivated(DetID::getName(i))) {
923 continue;
924 }
925
926 if (i == DetID::TPC) {
927 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
928 counter++;
929 }
930 if (i == DetID::ITS) {
931 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
932 counter++;
933 }
934 if (i == DetID::MFT) {
935 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
936 counter++;
937 }
938 if (i == DetID::TRD) {
939 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
940 counter++;
941 }
942 if (i == DetID::PHS) {
943 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
944 counter++;
945 }
946 if (i == DetID::CPV) {
947 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
948 counter++;
949 }
950 if (i == DetID::EMC) {
951 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
952 counter++;
953 }
954 if (i == DetID::HMP) {
955 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
956 counter++;
957 }
958 if (i == DetID::TOF) {
959 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
960 counter++;
961 }
962 if (i == DetID::FT0) {
963 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
964 counter++;
965 }
966 if (i == DetID::FV0) {
967 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
968 counter++;
969 }
970 if (i == DetID::FDD) {
971 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
972 counter++;
973 }
974 if (i == DetID::MCH) {
975 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
976 counter++;
977 }
978 if (i == DetID::MID) {
979 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
980 counter++;
981 }
982 if (i == DetID::ZDC) {
983 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
984 counter++;
985 }
986 if (i == DetID::FOC) {
987 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, gSystem->ExpandPathName("$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Spaghetti.txt")));
988 counter++;
989 }
990#ifdef ENABLE_UPGRADES
991 if (i == DetID::IT3) {
992 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
993 counter++;
994 }
995 if (i == DetID::TRK) {
996 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
997 counter++;
998 }
999 if (i == DetID::FT3) {
1000 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
1001 counter++;
1002 }
1003 if (i == DetID::FCT) {
1004 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
1005 counter++;
1006 }
1007 if (i == DetID::TF3) {
1008 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
1009 counter++;
1010 }
1011 if (i == DetID::RCH) {
1012 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1013 counter++;
1014 }
1015 if (i == DetID::MI3) {
1016 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1017 counter++;
1018 }
1019 if (i == DetID::ECL) {
1020 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1021 counter++;
1022 }
1023#endif
1024 }
1025 if (counter != DetID::nDetectors) {
1026 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1027 }
1028}
1029
1030} // namespace devices
1031} // namespace o2
1032
1033#endif
Definition of the DescriptorInnerBarrelITS3 class.
Definition of the Names Generator class.
Definition of the Stack class.
Definition of the Detector class.
Definition of the Detector class.
Definition of the FV0 detector class.
int32_t i
Definition of the Detector class.
Definition of the Detector class.
bool waitForControlInput(int workerID)
TBranch * ptr
Definition of the Detector class.
Definition of the Detector class.
StringRef key
void SetMotherTrackId(Int_t id)
Modifiers.
Definition MCTrack.h:166
Int_t getMotherTrackId() const
Definition MCTrack.h:73
static std::string getHitsFileName(DId d, const std::string_view prefix=STANDARDSIMPREFIX)
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static std::string getMCHeadersFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:52
bool writeToDisc() const
Definition SimConfig.h:179
bool asService() const
Definition SimConfig.h:173
unsigned int getNEvents() const
Definition SimConfig.h:156
bool forwardKine() const
Definition SimConfig.h:178
static SimConfig & Instance()
Definition SimConfig.h:111
std::vector< std::string > const & getReadoutDetectors() const
Definition SimConfig.h:140
void putInfo(std::string const &key, T const &value)
void add(MCEventStats const &other)
merge from another object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:94
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:96
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr ID FOC
Definition DetID.h:80
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
O2HitMerger()
Default constructor.
~O2HitMerger() override
Default destructor.
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
static ShmManager & Instance()
Definition ShmManager.h:61
const GLfloat * m
Definition glcorearb.h:4066
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
TBranch * getOrMakeBranch(TTree &tree, const char *brname, T *ptr)
Definition Detector.h:281
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
void sighandler(int signal)
Definition O2HitMerger.h:91
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
struct o2::upgrades_utils::@454 tracks
structure to keep trigger-related info
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
o2::dataformats::MCEventHeader mMCEventHeader
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"