Project
Loading...
Searching...
No Matches
O2HitMerger.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_HITMERGER_H_
15#define ALICEO2_DEVICES_HITMERGER_H_
16
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include "TFile.h"
30#include "TMemFile.h"
31#include "TTree.h"
32#include "TROOT.h"
33#include <memory>
34#include <TMessage.h>
35#include <fairmq/Parts.h>
36#include <ctime>
37#include <TStopwatch.h>
38#include <sstream>
39#include <cassert>
40#include "FairSystemInfo.h"
41
42#include "O2HitMerger.h"
43#include "O2SimDevice.h"
48#include <TOFSimulation/Detector.h>
60
62#include <map>
63#include <vector>
64#include <list>
65#include <csignal>
66#include <mutex>
67#include <filesystem>
68#include <functional>
69
71
72#ifdef ENABLE_UPGRADES
82#endif
83
84#include <tbb/concurrent_unordered_map.h>
85
86namespace o2
87{
88namespace devices
89{
90
92{
93
94 class TMessageWrapper : public TMessage
95 {
96 public:
97 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
98 ~TMessageWrapper() override = default;
99 };
100
101 public:
104 {
105 mTimer.Start();
106 mInitialOutputDir = std::filesystem::current_path().string();
107 mCurrentOutputDir = mInitialOutputDir;
108 }
109
111 ~O2HitMerger() override
112 {
113 FairSystemInfo sysinfo;
114 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
115 mTimer.Continue();
116 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
117 << sysinfo.GetMaxMemory() << " MB\n";
118 }
119
120 private:
122 void InitTask() final
123 {
124 LOG(info) << "INIT HIT MERGER";
125 ROOT::EnableThreadSafety();
126
127 std::string outfilename("o2sim_merged_hits.root"); // default name
128 // query the sim config ... which is used to extract the filenames
129 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
131 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
132 }
136
137 mOutFileName = outfilename.c_str();
138 if (mWriteToDisc) {
139 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
140 mOutTree = new TTree("o2sim", "o2sim");
141 mOutTree->SetDirectory(mOutFile);
142
143 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
144 mMCHeaderTree = new TTree("o2sim", "o2sim");
145 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
146 }
147 // detectors init only once
148 if (mDetectorInstances.size() == 0) {
149 initDetInstances();
150 // has to be after init of Detectors
152 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
153 }
154
155 // init pipe
156 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
157 if (pipeenv) {
158 mPipeToDriver = atoi(pipeenv);
159 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
160 } else {
161 LOG(warning) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
162 }
163
164 // if no data to expect we shut down the device NOW since it would otherwise hang
165 if (mNExpectedEvents == 0) {
166 if (mAsService) {
167 waitForControlInput();
168 } else {
169 LOG(info) << "NOT EXPECTING ANY DATA; SHUTTING DOWN";
170 raise(SIGINT);
171 }
172 }
173 }
174
175 bool setWorkingDirectory(std::string const& dir)
176 {
177 namespace fs = std::filesystem;
178
179 // sets the output directory where simulation files are produced
180 // and creates it when it doesn't exist already
181
182 // 2 possibilities:
183 // a) dir is relative dir. Then we interpret it as relative to the initial
184 // base directory
185 // b) or dir is itself absolut.
186 try {
187 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
188 if (!dir.empty()) {
189 auto absolutePath = fs::absolute(fs::path(dir));
190 if (!fs::exists(absolutePath)) {
191 if (!fs::create_directory(absolutePath)) {
192 LOG(error) << "Could not create directory " << absolutePath.string();
193 return false;
194 }
195 }
196 // set the current path
197 fs::current_path(absolutePath.string().c_str());
198 mCurrentOutputDir = fs::current_path().string();
199 }
200 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
201 } catch (std::exception e) {
202 LOG(error) << " could not change path to " << dir;
203 }
204 return true;
205 }
206
207 // function for intermediate/on-the-fly reinitializations
208 bool ReInit(o2::conf::SimReconfigData const& reconfig)
209 {
210 if (reconfig.stop) {
211 return false;
212 }
213 if (!setWorkingDirectory(reconfig.outputDir)) {
214 return false;
215 }
216
217 std::string outfilename("o2sim_merged_hits.root"); // default name
219 mNExpectedEvents = reconfig.nEvents;
220 mOutFileName = outfilename.c_str();
221 if (mWriteToDisc) {
222 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
223 mOutTree = new TTree("o2sim", "o2sim");
224 mOutTree->SetDirectory(mOutFile);
225
226 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
227 mMCHeaderTree = new TTree("o2sim", "o2sim");
228 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
229 }
230 // reinit detectorInstance files (also make sure they are closed before continuing)
231 initHitFiles(reconfig.outputPrefix);
232
233 // clear "counter" datastructures
234 mPartsCheckSum.clear();
235 mEventChecksum = 0;
236
237 // clear collector datastructures
238 mMCTrackBuffer.clear();
239 mTrackRefBuffer.clear();
240 mSubEventInfoBuffer.clear();
241 mFlushableEvents.clear();
242 mNextFlushID = 1;
243
244 return true;
245 }
246
247 template <typename T, typename V>
248 V insertAdd(std::map<T, V>& m, T const& key, V value)
249 {
250 const auto iter = m.find(key);
251 V accum{0};
252 if (iter != m.end()) {
253 iter->second += value;
254 accum = iter->second;
255 } else {
256 m.insert(std::make_pair(key, value));
257 accum = value;
258 }
259 return accum;
260 }
261
262 template <typename T>
263 bool isDataComplete(T checksum, T nparts)
264 {
265 return checksum == nparts * (nparts + 1) / 2;
266 }
267
268 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
269 {
270 auto detIDmessage = std::move(data.At(index++));
271 // this should be a detector ID
272 if (detIDmessage->GetSize() == 4) {
273 auto ptr = (int*)detIDmessage->GetData();
275 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
276 << data.At(index)->GetSize() / 1024. / 1024.;
277
278 // get the detector that can interpret it
279 auto detector = mDetectorInstances[id].get();
280 if (detector) {
281 detector->collectHits(eventID, data, index);
282 }
283 }
284 }
285
286 template <typename T, typename BT>
287 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
288 {
289 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
290 if (buffer.find(eventID) == buffer.end()) {
291 buffer[eventID] = typename BT::mapped_type();
292 }
293 buffer[eventID].push_back(decodeddata);
294 // delete decodeddata; --> we store the pointers
295 index++;
296 }
297
298 // fills a special branch of SubEventInfos in order to keep
299 // track of which entry corresponds to which event etc.
300 // also creates the MCEventHeader branch expected for physics analysis
301 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
302 {
303 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
304 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
305 }
306 mSubEventInfoBuffer[info.eventID].push_back(&info);
307 }
308
310 {
311 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
312
313 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
314 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
315 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
316 LOG(info) << "SOCKETNAME " << controlsocketname;
317 channel.Connect(std::string(controlsocketname));
318 channel.Validate();
319 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
320
321 LOG(info) << "WAITING FOR INPUT";
322 if (channel.Receive(reply) > 0) {
323 auto data = reply->GetData();
324 auto size = reply->GetSize();
325
326 std::string command(reinterpret_cast<char const*>(data), size);
327 LOG(info) << "message: " << command;
328
330 o2::conf::parseSimReconfigFromString(command, reconfig);
331 return ReInit(reconfig);
332 } else {
333 LOG(info) << "NOTHING RECEIVED";
334 }
335 return true;
336 }
337
338 bool ConditionalRun() override
339 {
340 auto& channel = GetChannels().at("simdata").at(0);
341 fair::mq::Parts request;
342 auto bytes = channel.Receive(request);
343 if (bytes < 0) {
344 LOG(error) << "Some error occurred on socket during receive on sim data";
345 return true; // keep going
346 }
347 TStopwatch timer;
348 timer.Start();
349 auto more = handleSimData(request, 0);
350 LOG(info) << "HitMerger processing took " << timer.RealTime();
351 if (!more && mAsService) {
352 LOG(info) << " CONTROL ";
353 // if we are done treating data we may go back to init phase
354 // for the next batch
355 return waitForControlInput();
356 }
357 return more;
358 }
359
360 bool handleSimData(fair::mq::Parts& data, int /*index*/)
361 {
362 bool expectmore = true;
363 int index = 0;
364 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
365 o2::data::SubEventInfo& info = *infoptr;
366 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
367
368 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
369
370 fillSubEventInfoEntry(info);
371 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
372 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
373 while (index < data.Size()) {
374 consumeHits(info.eventID, data, index);
375 }
376
377 if (isDataComplete<uint32_t>(accum, info.nparts)) {
378 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
379 mFlushableEvents[info.eventID] = true;
380
381 // check if previous flush finished
382 // start merging only when no merging currently happening
383 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
384 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
385 if (!mergingInProgress) {
386 if (mMergerIOThread.joinable()) {
387 mMergerIOThread.join();
388 }
389 // start hit merging and flushing in a separate thread in order not to block
390 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
391 }
392
393 mEventChecksum += info.eventID;
394 // we also need to check if we have all events
395 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
396 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
397
398 // flush remaining data and close file
399 if (mMergerIOThread.joinable()) {
400 mMergerIOThread.join();
401 }
402 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
403 if (mMergerIOThread.joinable()) {
404 mMergerIOThread.join();
405 }
406
407 expectmore = false;
408 }
409
410 if (mPipeToDriver != -1) {
411 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
412 LOG(error) << "FAILED WRITING TO PIPE";
413 };
414 }
415 }
416 if (!expectmore) {
417 // somehow FairMQ has difficulties shutting down; helping manually
418 // raise(SIGINT);
419 }
420 return expectmore;
421 }
422
423 void cleanEvent(int eventID)
424 {
425 // cleanup intermediate per-Event buffers
426 }
427
428 template <typename T>
429 void backInsert(T const& from, T& to)
430 {
431 std::copy(from.begin(), from.end(), std::back_inserter(to));
432 }
433
434 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
435 {
436 // avoid doing this for trivial cases
437 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
438 auto targetdata = std::make_unique<std::vector<MCTrack>>();
439
440 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
441 const auto entries = vectorOfSubEventMCTracks.size();
442
443 if (entries > 1) {
444 //
445 // loop over subevents to store the primary events
446 //
447 int nprimTot = 0;
448 for (int entry = entries - 1; entry >= 0; --entry) {
449 int index = nsubevents[entry];
450 nprimTot += nprimaries[index];
451 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
452 for (int i = 0; i < nprimaries[index]; i++) {
453 auto& track = (*vectorOfSubEventMCTracks[index])[i];
454 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
455 track.SetFirstDaughterTrackId(-1);
456 track.SetLastDaughterTrackId(-1);
457 }
458 targetdata->push_back(track);
459 }
460 }
461 //
462 // loop a second time to store the secondaries and fix the mother track IDs
463 //
464 Int_t idelta1 = nprimTot;
465 Int_t idelta0 = 0;
466 for (int entry = entries - 1; entry >= 0; --entry) {
467 int index = nsubevents[entry];
468
469 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
470 // we need to fetch the right mctracks here!!
471 Int_t npart = (int)(subEventTracks.size());
472 Int_t nprim = nprimaries[index];
473 idelta1 -= nprim;
474
475 for (Int_t i = nprim; i < npart; i++) {
476 auto& track = subEventTracks[i];
477 Int_t cId = track.getMotherTrackId();
478 if (cId >= nprim) {
479 cId += idelta1;
480 } else {
481 cId += idelta0;
482 }
483 track.SetMotherTrackId(cId);
484 track.SetFirstDaughterTrackId(-1);
485
486 Int_t hwm = (int)(targetdata->size());
487 auto& mother = (*targetdata)[cId];
488 if (mother.getFirstDaughterTrackId() == -1) {
489 mother.SetFirstDaughterTrackId(hwm);
490 }
491 mother.SetLastDaughterTrackId(hwm);
492
493 targetdata->push_back(track);
494 }
495 idelta0 += nprim;
496 idelta1 += npart;
497 }
498 }
499 //
500 // write to output
501 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
502
503 // we give the possibility to produce some MC track statistics
504 // to be saved as part of the MCHeader structure
505 tracks_analysis_hook(*filladdr);
506
507 if (mWriteToDisc && target) {
508 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
509 targetbr->SetAddress(&filladdr);
510 targetbr->Fill();
511 targetbr->ResetAddress();
512 }
513 // forwarding the track data to other consumers (pub/sub)
514 if (mForwardKine) {
515 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
516 auto& channel = GetChannels().at("kineforward").at(0);
517 TMessage* tmsg = new TMessage(kMESS_OBJECT);
518 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
519 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
520 tmsg = new TMessage(kMESS_OBJECT);
521 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
522 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
523 fair::mq::Parts reply;
524 reply.AddPart(std::move(headermessage));
525 reply.AddPart(std::move(trackmessage));
526 channel.Send(reply);
527 LOG(info) << "Forward publish MC tracks on channel";
528 }
529
530 // cleanup buffered data
531 for (auto ptr : vectorOfSubEventMCTracks) {
532 delete ptr; // avoid this by using unique ptr
533 }
534 }
535
536 template <typename T, typename M>
537 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
538 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
539 {
540 //
541 // Remap the mother track IDs by adding an offset.
542 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
543 // This method is called by O2HitMerger::mergeAndFlushData(int)
544 //
545 T* incomingdata = nullptr;
546 std::unique_ptr<T> targetdata(nullptr);
547 auto& vectorOfT = mapOfVectorOfTs[eventID];
548 const auto entries = vectorOfT.size();
549
550 if (entries == 1) {
551 // nothing to do in case there is only one entry
552 incomingdata = vectorOfT[0];
553 } else {
554 targetdata = std::make_unique<T>();
555 // loop over subevents
556 Int_t nprimTot = 0;
557 for (int entry = 0; entry < entries; entry++) {
558 nprimTot += nprimaries[entry];
559 }
560 Int_t idelta0 = 0;
561 Int_t idelta1 = nprimTot;
562 for (int entry = entries - 1; entry >= 0; --entry) {
563 Int_t index = subevOrdered[entry];
564 Int_t nprim = nprimaries[index];
565 incomingdata = vectorOfT[index];
566 idelta1 -= nprim;
567 for (auto& data : *incomingdata) {
568 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
569 targetdata->push_back(data);
570 }
571 idelta0 += nprim;
572 idelta1 += trackoffsets[index];
573 }
574 }
575 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
576 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
577 targetbr->SetAddress(&dataaddr);
578 targetbr->Fill();
579 targetbr->ResetAddress();
580
581 // cleanup mem
582 for (auto ptr : vectorOfT) {
583 delete ptr; // avoid this by using unique ptr
584 }
585 }
586
587 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
588 {
589 Int_t cId = track.getMotherTrackId();
590 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
591 if (cId != -1) {
592 track.SetMotherTrackId(cId + ioffset);
593 }
594 }
595
596 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
597 {
598 Int_t cId = ref.getTrackID();
599 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
600 ref.setTrackID(cId + ioffset);
601 }
602
603 void initHitTreeAndOutFile(std::string prefix, int detID)
604 {
606 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
607 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
608 mDetectorOutFiles[detID]->Close();
609 delete mDetectorOutFiles[detID];
610 }
611 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
612 if (mWriteToDisc) {
613 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
614 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
615 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
616 } else {
617 mDetectorOutFiles[detID] = nullptr;
618 mDetectorToTTreeMap[detID] = nullptr;
619 }
620 }
621
622 // This method goes over the buffers containing data for a given event; potentially merges
623 // them and flushes into the actual output file.
624 // The method can be called asynchronously to data collection
625 bool mergeAndFlushData()
626 {
627 auto checkIfNextFlushable = [this]() -> bool {
628 mNextFlushID++;
629 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
630 };
631
632 LOG(info) << "Launching merge kernel ";
633 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
634 if (!canflush) {
635 return false;
636 }
637 while (canflush == true) {
638 auto flusheventID = mNextFlushID;
639 LOG(info) << "Merge and flush event " << flusheventID;
640 auto iter = mSubEventInfoBuffer.find(flusheventID);
641 if (iter == mSubEventInfoBuffer.end()) {
642 LOG(error) << "No info/data found for event " << flusheventID;
643 if (!checkIfNextFlushable()) {
644 return false;
645 }
646 }
647
648 auto& subEventInfoList = (*iter).second;
649 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
650 LOG(error) << "No data entries found for event " << flusheventID;
651 if (!checkIfNextFlushable()) {
652 return false;
653 }
654 }
655
656 TStopwatch timer;
657 timer.Start();
658
659 // calculate trackoffsets
660 auto& confref = o2::conf::SimConfig::Instance();
661
662 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
663 std::vector<int> trackoffsets;
664 // collecting primary particles in each subevent (data arrival id)
665 std::vector<int> nprimaries;
666 // mapping of id to actual sub-event id (or part)
667 std::vector<int> nsubevents;
668
669 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
670
671 // the MC labels (trackID) for hits
672 for (auto info : subEventInfoList) {
673 assert(info->npersistenttracks >= 0);
674 trackoffsets.emplace_back(info->npersistenttracks);
675 nprimaries.emplace_back(info->nprimarytracks);
676 nsubevents.emplace_back(info->part);
677 if (eventheader == nullptr) {
678 eventheader = &info->mMCEventHeader;
679 } else {
680 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
681 }
682 }
683
684 // now see which events can be discarded in any case due to no hits
685 if (confref.isFilterOutNoHitEvents()) {
686 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
687 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
688 cleanEvent(flusheventID);
689 if (!checkIfNextFlushable()) {
690 return true;
691 }
692 }
693 }
694
695 // attention: We need to make sure that we write everything in the same event order
696 // but iteration over keys of a standard map in C++ is ordered
697
698 // b) merge the general data
699 //
700 // for MCTrack remap the motherIds and merge at the same go
701 const auto entries = subEventInfoList.size();
702 std::vector<int> subevOrdered((int)(nsubevents.size()));
703 for (int entry = entries - 1; entry >= 0; --entry) {
704 subevOrdered[nsubevents[entry] - 1] = entry;
705 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
706 }
707
708 // This is a hook that collects some useful statistics/properties on the event
709 // for use by other components;
710 // Properties are attached making use of the extensible "Info" feature which is already
711 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
712 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
713 // NOTE: This function might also be called directly in the primary server!?
714 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
715 int eta1Point2Counter = 0;
716 int eta1Point0Counter = 0;
717 int eta0Point8Counter = 0;
718 int eta1Point2CounterPi = 0;
719 int eta1Point0CounterPi = 0;
720 int eta0Point8CounterPi = 0;
721 int prims = 0;
722 for (auto& tr : tracks) {
723 if (tr.isPrimary()) {
724 prims++;
725 const auto eta = tr.GetEta();
726 if (eta < 1.2) {
727 eta1Point2Counter++;
728 if (std::abs(tr.GetPdgCode()) == 211) {
729 eta1Point2CounterPi++;
730 }
731 }
732 if (eta < 1.0) {
733 eta1Point0Counter++;
734 if (std::abs(tr.GetPdgCode()) == 211) {
735 eta1Point0CounterPi++;
736 }
737 }
738 if (eta < 0.8) {
739 eta0Point8Counter++;
740 if (std::abs(tr.GetPdgCode()) == 211) {
741 eta0Point8CounterPi++;
742 }
743 }
744 } else {
745 break; // track layout is such that all prims are first anyway
746 }
747 }
748 // attach these properties to eventheader
749 // we only need to make the names standard
750 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
751 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
752 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
753 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
754 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
755 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
756 eventheader->putInfo("prims_total", prims);
757 };
758 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
759
760 if (mOutTree) {
761 // adjusting and merging track references
762 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
763
764 // write MC event headers
765 {
766 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
767 headerbr->SetAddress(&eventheader);
768 headerbr->Fill();
769 headerbr->ResetAddress();
770 }
771
772 {
773 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
774 headerbr->SetAddress(&eventheader);
775 headerbr->Fill();
776 headerbr->ResetAddress();
777 }
778 }
779
780 // c) do the merge procedure for all hits ... delegate this to detector specific functions
781 // since they know about types; number of branches; etc.
782 // this will also fix the trackIDs inside the hits
783 for (int id = 0; id < mDetectorInstances.size(); ++id) {
784 auto& det = mDetectorInstances[id];
785 if (det) {
786 auto hittree = mDetectorToTTreeMap[id];
787 if (hittree) {
788 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
789 hittree->SetEntries(hittree->GetEntries() + 1);
790 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
791 }
792 }
793 }
794
795 // increase the entry count in the tree
796 if (mOutTree) {
797 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
798 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
799 }
800 if (mMCHeaderTree) {
801 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
802 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
803 }
804
805 cleanEvent(flusheventID);
806 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
807 if (!checkIfNextFlushable()) {
808 break;
809 }
810 } // end while
811 if (mWriteToDisc && mOutFile) {
812 LOG(info) << "Writing TTrees";
813 mOutFile->Write("", TObject::kOverwrite);
814 for (int id = 0; id < mDetectorInstances.size(); ++id) {
815 auto& det = mDetectorInstances[id];
816 if (det && mDetectorOutFiles[id]) {
817 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
818 }
819 }
820 if (mMCHeaderOnlyOutFile) {
821 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
822 }
823 }
824 return true;
825 }
826
827 std::map<uint32_t, uint32_t> mPartsCheckSum;
828 std::string mOutFileName;
829
830 // structures for the final flush
831 TFile* mOutFile;
832 TTree* mOutTree;
833 TFile* mMCHeaderOnlyOutFile;
834 TTree* mMCHeaderTree;
835
836 template <class K, class V>
837 using Hashtable = tbb::concurrent_unordered_map<K, V>;
838 Hashtable<int, TFile*> mDetectorOutFiles;
839 Hashtable<int, TTree*> mDetectorToTTreeMap;
840
841 // intermediate structures to collect data per event
842 std::thread mMergerIOThread;
843 bool mergingInProgress = false;
844
845 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
846 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
847 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
848 Hashtable<int, bool> mFlushableEvents;
849
850 int mEventChecksum = 0;
851 int mNExpectedEvents = 0;
852 int mNextFlushID = 1;
853 TStopwatch mTimer;
854
855 bool mAsService = false;
856 bool mForwardKine = true;
857 bool mWriteToDisc = true;
858
859 int mPipeToDriver = -1;
860
861 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
862
863 // output folder configuration
864 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
865 std::string mCurrentOutputDir; // current output folder asked
866
867 // channel to PUB status messages to outside subscribers
868 fair::mq::Channel mPubChannel;
869
870 // init detector instances
871 void initDetInstances();
872 void initHitFiles(std::string prefix);
873};
874
875void O2HitMerger::initHitFiles(std::string prefix)
876{
878
879 // a little helper lambda
880 auto isActivated = [](std::string s) -> bool {
881 // access user configuration for list of wanted modules
883 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
884 return active; };
885
886 for (int i = DetID::First; i <= DetID::Last; ++i) {
887 if (!isActivated(DetID::getName(i))) {
888 continue;
889 }
890 // init the detector specific output files
891 initHitTreeAndOutFile(prefix, i);
892 }
893}
894
895// init detector instances used to write hit data to a TTree
896void O2HitMerger::initDetInstances()
897{
899
900 // a little helper lambda
901 auto isActivated = [](std::string s) -> bool {
902 // access user configuration for list of wanted modules
904 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
905 return active; };
906
907 mDetectorInstances.resize(DetID::nDetectors);
908 // like a factory of detector objects
909
910 int counter = 0;
911 for (int i = DetID::First; i <= DetID::Last; ++i) {
912 if (!isActivated(DetID::getName(i))) {
913 continue;
914 }
915
916 if (i == DetID::TPC) {
917 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
918 counter++;
919 }
920 if (i == DetID::ITS) {
921 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
922 counter++;
923 }
924 if (i == DetID::MFT) {
925 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
926 counter++;
927 }
928 if (i == DetID::TRD) {
929 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
930 counter++;
931 }
932 if (i == DetID::PHS) {
933 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
934 counter++;
935 }
936 if (i == DetID::CPV) {
937 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
938 counter++;
939 }
940 if (i == DetID::EMC) {
941 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
942 counter++;
943 }
944 if (i == DetID::HMP) {
945 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
946 counter++;
947 }
948 if (i == DetID::TOF) {
949 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
950 counter++;
951 }
952 if (i == DetID::FT0) {
953 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
954 counter++;
955 }
956 if (i == DetID::FV0) {
957 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
958 counter++;
959 }
960 if (i == DetID::FDD) {
961 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
962 counter++;
963 }
964 if (i == DetID::MCH) {
965 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
966 counter++;
967 }
968 if (i == DetID::MID) {
969 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
970 counter++;
971 }
972 if (i == DetID::ZDC) {
973 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
974 counter++;
975 }
976 if (i == DetID::FOC) {
977 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, gSystem->ExpandPathName("$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Sheets.txt")));
978 counter++;
979 }
980#ifdef ENABLE_UPGRADES
981 if (i == DetID::IT3) {
982 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
983 counter++;
984 }
985 if (i == DetID::TRK) {
986 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
987 counter++;
988 }
989 if (i == DetID::FT3) {
990 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
991 counter++;
992 }
993 if (i == DetID::FCT) {
994 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
995 counter++;
996 }
997 if (i == DetID::TF3) {
998 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
999 counter++;
1000 }
1001 if (i == DetID::RCH) {
1002 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1003 counter++;
1004 }
1005 if (i == DetID::MI3) {
1006 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1007 counter++;
1008 }
1009 if (i == DetID::ECL) {
1010 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1011 counter++;
1012 }
1013 if (i == DetID::FD3) {
1014 mDetectorInstances[i] = std::move(std::make_unique<o2::fd3::Detector>(true));
1015 counter++;
1016 }
1017#endif
1018 }
1019 if (counter != DetID::nDetectors) {
1020 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1021 }
1022}
1023
1024} // namespace devices
1025} // namespace o2
1026
1027#endif
Definition of the DescriptorInnerBarrelITS3 class.
Definition of the Names Generator class.
Definition of the Stack class.
Definition of the Detector class.
Definition of the Detector class.
Definition of the FV0 detector class.
int32_t i
Definition of the Detector class.
Definition of the Detector class.
bool waitForControlInput(int workerID)
TBranch * ptr
ECal geometry creation and hit processing.
Definition of the Detector class.
Definition of the Detector class.
Definition of the Detector class.
StringRef key
void SetMotherTrackId(Int_t id)
Modifiers.
Definition MCTrack.h:166
Int_t getMotherTrackId() const
Definition MCTrack.h:73
static std::string getHitsFileName(DId d, const std::string_view prefix=STANDARDSIMPREFIX)
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static std::string getMCHeadersFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:52
bool writeToDisc() const
Definition SimConfig.h:179
bool asService() const
Definition SimConfig.h:173
unsigned int getNEvents() const
Definition SimConfig.h:156
bool forwardKine() const
Definition SimConfig.h:178
static SimConfig & Instance()
Definition SimConfig.h:111
std::vector< std::string > const & getReadoutDetectors() const
Definition SimConfig.h:140
void putInfo(std::string const &key, T const &value)
void add(MCEventStats const &other)
merge from another object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:146
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:95
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:97
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr ID FOC
Definition DetID.h:80
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
O2HitMerger()
Default constructor.
~O2HitMerger() override
Default destructor.
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
static ShmManager & Instance()
Definition ShmManager.h:61
const GLfloat * m
Definition glcorearb.h:4066
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
TBranch * getOrMakeBranch(TTree &tree, const char *brname, T *ptr)
Definition Detector.h:281
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
struct o2::upgrades_utils::@454 tracks
structure to keep trigger-related info
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
o2::dataformats::MCEventHeader mMCEventHeader
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"