Project
Loading...
Searching...
No Matches
O2HitMerger.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_HITMERGER_H_
15#define ALICEO2_DEVICES_HITMERGER_H_
16
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include "TFile.h"
30#include "TMemFile.h"
31#include "TTree.h"
32#include "TROOT.h"
33#include <memory>
34#include <TMessage.h>
35#include <fairmq/Parts.h>
36#include <ctime>
37#include <TStopwatch.h>
38#include <sstream>
39#include <cassert>
40#include "FairSystemInfo.h"
41
42#include "O2HitMerger.h"
43#include "O2SimDevice.h"
48#include <TOFSimulation/Detector.h>
60
62#include <map>
63#include <vector>
64#include <list>
65#include <csignal>
66#include <mutex>
67#include <filesystem>
68#include <functional>
69
71
72#ifdef ENABLE_UPGRADES
81#endif
82
83#include <tbb/concurrent_unordered_map.h>
84
85namespace o2
86{
87namespace devices
88{
89
91{
92
93 class TMessageWrapper : public TMessage
94 {
95 public:
96 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
97 ~TMessageWrapper() override = default;
98 };
99
100 public:
103 {
104 mTimer.Start();
105 mInitialOutputDir = std::filesystem::current_path().string();
106 mCurrentOutputDir = mInitialOutputDir;
107 }
108
110 ~O2HitMerger() override
111 {
112 FairSystemInfo sysinfo;
113 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
114 mTimer.Continue();
115 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
116 << sysinfo.GetMaxMemory() << " MB\n";
117 }
118
119 private:
// Initialisation hook of the device (declared `final`).
// Visible steps: enable ROOT thread safety, query the simulation configuration through the
// "o2sim-primserv-info" channel, open the output files/trees when mWriteToDisc is set, create
// the detector instances and their hit files once, read the driver pipe handle from the
// environment and finally deal with the "no events expected" case.
// NOTE(review): the embedded listing numbers jump (128->130, 131->135, 149->151), so some
// statements of this function are not visible here - check against the complete header
// before changing any code in this block.
121 void InitTask() final
122 {
123 LOG(info) << "INIT HIT MERGER";
124 ROOT::EnableThreadSafety();
125
126 std::string outfilename("o2sim_merged_hits.root"); // default name
127 // query the sim config ... which is used to extract the filenames
128 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
130 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
131 }
135
136 mOutFileName = outfilename.c_str();
137 if (mWriteToDisc) {
// main kinematics output: tree "o2sim" attached to the file named by outfilename
138 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
139 mOutTree = new TTree("o2sim", "o2sim");
140 mOutTree->SetDirectory(mOutFile);
141
// separate file that receives the MC event headers only
142 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
143 mMCHeaderTree = new TTree("o2sim", "o2sim");
144 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
145 }
146 // detectors init only once
147 if (mDetectorInstances.size() == 0) {
148 initDetInstances();
149 // has to be after init of Detectors
151 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
152 }
153
154 // init pipe
// the environment variable is expected to hold the numeric handle, converted with atoi()
155 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
156 if (pipeenv) {
157 mPipeToDriver = atoi(pipeenv);
158 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
159 } else {
160 LOG(warning) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
161 }
162
163 // if no data to expect we shut down the device NOW since it would otherwise hang
164 if (mNExpectedEvents == 0) {
165 if (mAsService) {
// in service mode wait for a control message instead; its return value is not used here
166 waitForControlInput();
167 } else {
168 LOG(info) << "NOT EXPECTING ANY DATA; SHUTTING DOWN";
169 raise(SIGINT);
170 }
171 }
172 }
173
174 bool setWorkingDirectory(std::string const& dir)
175 {
176 namespace fs = std::filesystem;
177
178 // sets the output directory where simulation files are produced
179 // and creates it when it doesn't exist already
180
181 // 2 possibilities:
182 // a) dir is relative dir. Then we interpret it as relative to the initial
183 // base directory
184 // b) or dir is itself absolut.
185 try {
186 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
187 if (!dir.empty()) {
188 auto absolutePath = fs::absolute(fs::path(dir));
189 if (!fs::exists(absolutePath)) {
190 if (!fs::create_directory(absolutePath)) {
191 LOG(error) << "Could not create directory " << absolutePath.string();
192 return false;
193 }
194 }
195 // set the current path
196 fs::current_path(absolutePath.string().c_str());
197 mCurrentOutputDir = fs::current_path().string();
198 }
199 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
200 } catch (std::exception e) {
201 LOG(error) << " could not change path to " << dir;
202 }
203 return true;
204 }
205
206 // function for intermediate/on-the-fly reinitializations
// Returns false when the reconfiguration asks to stop or when the working directory could not
// be set; true after output files, hit files and all per-event bookkeeping have been reset.
// NOTE(review): the embedded listing numbering jumps from 216 to 218, so one statement
// (presumably deriving outfilename from the reconfiguration) is not visible here - confirm.
// NOTE(review): mOutFile/mOutTree/mMCHeaderOnlyOutFile/mMCHeaderTree are overwritten below
// without a Close()/delete in this function - confirm whether the previous objects are
// released elsewhere.
207 bool ReInit(o2::conf::SimReconfigData const& reconfig)
208 {
209 if (reconfig.stop) {
210 return false;
211 }
212 if (!setWorkingDirectory(reconfig.outputDir)) {
213 return false;
214 }
215
216 std::string outfilename("o2sim_merged_hits.root"); // default name
218 mNExpectedEvents = reconfig.nEvents;
219 mOutFileName = outfilename.c_str();
220 if (mWriteToDisc) {
221 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
222 mOutTree = new TTree("o2sim", "o2sim");
223 mOutTree->SetDirectory(mOutFile);
224
225 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
226 mMCHeaderTree = new TTree("o2sim", "o2sim");
227 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
228 }
229 // reinit detectorInstance files (also make sure they are closed before continuing)
230 initHitFiles(reconfig.outputPrefix);
231
232 // clear "counter" datastructures
233 mPartsCheckSum.clear();
234 mEventChecksum = 0;
235
236 // clear collector datastructures
// only the containers are cleared here; pointers stored inside them are not deleted
237 mMCTrackBuffer.clear();
238 mTrackRefBuffer.clear();
239 mSubEventInfoBuffer.clear();
240 mFlushableEvents.clear();
// flushing restarts at event ID 1
241 mNextFlushID = 1;
242
243 return true;
244 }
245
// Adds `value` to the entry stored under `key` in `m`. When the key is not present yet, a new
// entry initialised with `value` is created instead.
// Returns the accumulated value stored under `key` after the update.
template <typename T, typename V>
V insertAdd(std::map<T, V>& m, T const& key, V value)
{
  auto [slot, created] = m.try_emplace(key, value);
  if (!created) {
    slot->second += value;
  }
  return slot->second;
}
260
// Completeness test on a running checksum: returns true when `checksum` equals
// nparts * (nparts + 1) / 2, i.e. the sum 1 + 2 + ... + nparts
// (presumably the part/event numbers start at 1 - verify against the sender).
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  const auto expectedSum = nparts * (nparts + 1) / 2;
  return checksum == expectedSum;
}
266
// Consumes one detector-hit section of a sim data message for event `eventID`.
// The part at `index` is taken out of `data` and `index` is advanced past it; the part is
// only acted upon when it is exactly 4 bytes long (a detector ID). The matching detector
// instance, if one exists, then collects its hits starting at the updated `index`.
// NOTE(review): the embedded listing numbering jumps from 271 to 274 and `id` is used below
// without a visible declaration - the statement building it from ptr[0] is presumably on the
// missing line; confirm against the complete header.
267 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
268 {
269 auto detIDmessage = std::move(data.At(index++));
270 // this should be a detector ID
271 if (detIDmessage->GetSize() == 4) {
272 auto ptr = (int*)detIDmessage->GetData();
// size of the part following the ID, printed in MB
274 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
275 << data.At(index)->GetSize() / 1024. / 1024.;
276
277 // get the detector that can interpret it
278 auto detector = mDetectorInstances[id].get();
// when no instance exists for this ID nothing is consumed and `index` is left unchanged here
279 if (detector) {
280 detector->collectHits(eventID, data, index);
281 }
282 }
283 }
284
285 template <typename T, typename BT>
286 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
287 {
288 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
289 if (buffer.find(eventID) == buffer.end()) {
290 buffer[eventID] = typename BT::mapped_type();
291 }
292 buffer[eventID].push_back(decodeddata);
293 // delete decodeddata; --> we store the pointers
294 index++;
295 }
296
297 // fills a special branch of SubEventInfos in order to keep
298 // track of which entry corresponds to which event etc.
299 // also creates the MCEventHeader branch expected for physics analysis
300 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
301 {
302 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
303 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
304 }
305 mSubEventInfoBuffer[info.eventID].push_back(&info);
306 }
307
309 {
310 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
311
312 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
313 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
314 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
315 LOG(info) << "SOCKETNAME " << controlsocketname;
316 channel.Connect(std::string(controlsocketname));
317 channel.Validate();
318 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
319
320 LOG(info) << "WAITING FOR INPUT";
321 if (channel.Receive(reply) > 0) {
322 auto data = reply->GetData();
323 auto size = reply->GetSize();
324
325 std::string command(reinterpret_cast<char const*>(data), size);
326 LOG(info) << "message: " << command;
327
329 o2::conf::parseSimReconfigFromString(command, reconfig);
330 return ReInit(reconfig);
331 } else {
332 LOG(info) << "NOTHING RECEIVED";
333 }
334 return true;
335 }
336
337 bool ConditionalRun() override
338 {
339 auto& channel = GetChannels().at("simdata").at(0);
340 fair::mq::Parts request;
341 auto bytes = channel.Receive(request);
342 if (bytes < 0) {
343 LOG(error) << "Some error occurred on socket during receive on sim data";
344 return true; // keep going
345 }
346 TStopwatch timer;
347 timer.Start();
348 auto more = handleSimData(request, 0);
349 LOG(info) << "HitMerger processing took " << timer.RealTime();
350 if (!more && mAsService) {
351 LOG(info) << " CONTROL ";
352 // if we are done treating data we may go back to init phase
353 // for the next batch
354 return waitForControlInput();
355 }
356 return more;
357 }
358
359 bool handleSimData(fair::mq::Parts& data, int /*index*/)
360 {
361 bool expectmore = true;
362 int index = 0;
363 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
364 o2::data::SubEventInfo& info = *infoptr;
365 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
366
367 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
368
369 fillSubEventInfoEntry(info);
370 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
371 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
372 while (index < data.Size()) {
373 consumeHits(info.eventID, data, index);
374 }
375
376 if (isDataComplete<uint32_t>(accum, info.nparts)) {
377 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
378 mFlushableEvents[info.eventID] = true;
379
380 // check if previous flush finished
381 // start merging only when no merging currently happening
382 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
383 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
384 if (!mergingInProgress) {
385 if (mMergerIOThread.joinable()) {
386 mMergerIOThread.join();
387 }
388 // start hit merging and flushing in a separate thread in order not to block
389 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
390 }
391
392 mEventChecksum += info.eventID;
393 // we also need to check if we have all events
394 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
395 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
396
397 // flush remaining data and close file
398 if (mMergerIOThread.joinable()) {
399 mMergerIOThread.join();
400 }
401 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
402 if (mMergerIOThread.joinable()) {
403 mMergerIOThread.join();
404 }
405
406 expectmore = false;
407 }
408
409 if (mPipeToDriver != -1) {
410 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
411 LOG(error) << "FAILED WRITING TO PIPE";
412 };
413 }
414 }
415 if (!expectmore) {
416 // somehow FairMQ has difficulties shutting down; helping manually
417 // raise(SIGINT);
418 }
419 return expectmore;
420 }
421
// Hook meant for cleaning up intermediate per-event buffers; currently it performs no work.
void cleanEvent(int eventID)
{
  static_cast<void>(eventID); // nothing to release yet
}
426
// Appends a copy of every element of `from` to the end of `to`, preserving order.
template <typename T>
void backInsert(T const& from, T& to)
{
  for (auto const& element : from) {
    to.push_back(element);
  }
}
432
433 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
434 {
435 // avoid doing this for trivial cases
436 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
437 auto targetdata = std::make_unique<std::vector<MCTrack>>();
438
439 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
440 const auto entries = vectorOfSubEventMCTracks.size();
441
442 if (entries > 1) {
443 //
444 // loop over subevents to store the primary events
445 //
446 int nprimTot = 0;
447 for (int entry = entries - 1; entry >= 0; --entry) {
448 int index = nsubevents[entry];
449 nprimTot += nprimaries[index];
450 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
451 for (int i = 0; i < nprimaries[index]; i++) {
452 auto& track = (*vectorOfSubEventMCTracks[index])[i];
453 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
454 track.SetFirstDaughterTrackId(-1);
455 track.SetLastDaughterTrackId(-1);
456 }
457 targetdata->push_back(track);
458 }
459 }
460 //
461 // loop a second time to store the secondaries and fix the mother track IDs
462 //
463 Int_t idelta1 = nprimTot;
464 Int_t idelta0 = 0;
465 for (int entry = entries - 1; entry >= 0; --entry) {
466 int index = nsubevents[entry];
467
468 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
469 // we need to fetch the right mctracks here!!
470 Int_t npart = (int)(subEventTracks.size());
471 Int_t nprim = nprimaries[index];
472 idelta1 -= nprim;
473
474 for (Int_t i = nprim; i < npart; i++) {
475 auto& track = subEventTracks[i];
476 Int_t cId = track.getMotherTrackId();
477 if (cId >= nprim) {
478 cId += idelta1;
479 } else {
480 cId += idelta0;
481 }
482 track.SetMotherTrackId(cId);
483 track.SetFirstDaughterTrackId(-1);
484
485 Int_t hwm = (int)(targetdata->size());
486 auto& mother = (*targetdata)[cId];
487 if (mother.getFirstDaughterTrackId() == -1) {
488 mother.SetFirstDaughterTrackId(hwm);
489 }
490 mother.SetLastDaughterTrackId(hwm);
491
492 targetdata->push_back(track);
493 }
494 idelta0 += nprim;
495 idelta1 += npart;
496 }
497 }
498 //
499 // write to output
500 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
501
502 // we give the possibility to produce some MC track statistics
503 // to be saved as part of the MCHeader structure
504 tracks_analysis_hook(*filladdr);
505
506 if (mWriteToDisc && target) {
507 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
508 targetbr->SetAddress(&filladdr);
509 targetbr->Fill();
510 targetbr->ResetAddress();
511 }
512 // forwarding the track data to other consumers (pub/sub)
513 if (mForwardKine) {
514 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
515 auto& channel = GetChannels().at("kineforward").at(0);
516 TMessage* tmsg = new TMessage(kMESS_OBJECT);
517 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
518 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
519 tmsg = new TMessage(kMESS_OBJECT);
520 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
521 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
522 fair::mq::Parts reply;
523 reply.AddPart(std::move(headermessage));
524 reply.AddPart(std::move(trackmessage));
525 channel.Send(reply);
526 LOG(info) << "Forward publish MC tracks on channel";
527 }
528
529 // cleanup buffered data
530 for (auto ptr : vectorOfSubEventMCTracks) {
531 delete ptr; // avoid this by using unique ptr
532 }
533 }
534
535 template <typename T, typename M>
536 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
537 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
538 {
539 //
540 // Remap the mother track IDs by adding an offset.
541 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
542 // This method is called by O2HitMerger::mergeAndFlushData(int)
543 //
544 T* incomingdata = nullptr;
545 std::unique_ptr<T> targetdata(nullptr);
546 auto& vectorOfT = mapOfVectorOfTs[eventID];
547 const auto entries = vectorOfT.size();
548
549 if (entries == 1) {
550 // nothing to do in case there is only one entry
551 incomingdata = vectorOfT[0];
552 } else {
553 targetdata = std::make_unique<T>();
554 // loop over subevents
555 Int_t nprimTot = 0;
556 for (int entry = 0; entry < entries; entry++) {
557 nprimTot += nprimaries[entry];
558 }
559 Int_t idelta0 = 0;
560 Int_t idelta1 = nprimTot;
561 for (int entry = entries - 1; entry >= 0; --entry) {
562 Int_t index = subevOrdered[entry];
563 Int_t nprim = nprimaries[index];
564 incomingdata = vectorOfT[index];
565 idelta1 -= nprim;
566 for (auto& data : *incomingdata) {
567 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
568 targetdata->push_back(data);
569 }
570 idelta0 += nprim;
571 idelta1 += trackoffsets[index];
572 }
573 }
574 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
575 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
576 targetbr->SetAddress(&dataaddr);
577 targetbr->Fill();
578 targetbr->ResetAddress();
579
580 // cleanup mem
581 for (auto ptr : vectorOfT) {
582 delete ptr; // avoid this by using unique ptr
583 }
584 }
585
586 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
587 {
588 Int_t cId = track.getMotherTrackId();
589 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
590 if (cId != -1) {
591 track.SetMotherTrackId(cId + ioffset);
592 }
593 }
594
595 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
596 {
597 Int_t cId = ref.getTrackID();
598 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
599 ref.setTrackID(cId + ioffset);
600 }
601
// (Re)creates the hit output file and its "o2sim" tree for detector `detID`, using the file
// name obtained from DetectorNameConf::getHitsFileName(detID, prefix).
// An already registered file for this detector is closed and deleted first. When mWriteToDisc
// is not set, both map entries are set to nullptr instead.
// NOTE(review): the embedded listing numbering jumps from 603 to 605, so one line of this
// function is not visible here - confirm against the complete header.
// NOTE(review): the previously registered TTree pointer is overwritten without an explicit
// delete here - presumably it is owned by the closed file; confirm.
602 void initHitTreeAndOutFile(std::string prefix, int detID)
603 {
605 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
606 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
607 mDetectorOutFiles[detID]->Close();
608 delete mDetectorOutFiles[detID];
609 }
610 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
611 if (mWriteToDisc) {
612 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
613 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
614 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
615 } else {
616 mDetectorOutFiles[detID] = nullptr;
617 mDetectorToTTreeMap[detID] = nullptr;
618 }
619 }
620
621 // This method goes over the buffers containing data for a given event; potentially merges
622 // them and flushes into the actual output file.
623 // The method can be called asynchronously to data collection
624 bool mergeAndFlushData()
625 {
626 auto checkIfNextFlushable = [this]() -> bool {
627 mNextFlushID++;
628 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
629 };
630
631 LOG(info) << "Launching merge kernel ";
632 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
633 if (!canflush) {
634 return false;
635 }
636 while (canflush == true) {
637 auto flusheventID = mNextFlushID;
638 LOG(info) << "Merge and flush event " << flusheventID;
639 auto iter = mSubEventInfoBuffer.find(flusheventID);
640 if (iter == mSubEventInfoBuffer.end()) {
641 LOG(error) << "No info/data found for event " << flusheventID;
642 if (!checkIfNextFlushable()) {
643 return false;
644 }
645 }
646
647 auto& subEventInfoList = (*iter).second;
648 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
649 LOG(error) << "No data entries found for event " << flusheventID;
650 if (!checkIfNextFlushable()) {
651 return false;
652 }
653 }
654
655 TStopwatch timer;
656 timer.Start();
657
658 // calculate trackoffsets
659 auto& confref = o2::conf::SimConfig::Instance();
660
661 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
662 std::vector<int> trackoffsets;
663 // collecting primary particles in each subevent (data arrival id)
664 std::vector<int> nprimaries;
665 // mapping of id to actual sub-event id (or part)
666 std::vector<int> nsubevents;
667
668 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
669
670 // the MC labels (trackID) for hits
671 for (auto info : subEventInfoList) {
672 assert(info->npersistenttracks >= 0);
673 trackoffsets.emplace_back(info->npersistenttracks);
674 nprimaries.emplace_back(info->nprimarytracks);
675 nsubevents.emplace_back(info->part);
676 if (eventheader == nullptr) {
677 eventheader = &info->mMCEventHeader;
678 } else {
679 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
680 }
681 }
682
683 // now see which events can be discarded in any case due to no hits
684 if (confref.isFilterOutNoHitEvents()) {
685 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
686 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
687 cleanEvent(flusheventID);
688 if (!checkIfNextFlushable()) {
689 return true;
690 }
691 }
692 }
693
694 // attention: We need to make sure that we write everything in the same event order
695 // but iteration over keys of a standard map in C++ is ordered
696
697 // b) merge the general data
698 //
699 // for MCTrack remap the motherIds and merge at the same go
700 const auto entries = subEventInfoList.size();
701 std::vector<int> subevOrdered((int)(nsubevents.size()));
702 for (int entry = entries - 1; entry >= 0; --entry) {
703 subevOrdered[nsubevents[entry] - 1] = entry;
704 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
705 }
706
707 // This is a hook that collects some useful statistics/properties on the event
708 // for use by other components;
709 // Properties are attached making use of the extensible "Info" feature which is already
710 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
711 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
712 // NOTE: This function might also be called directly in the primary server!?
713 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
714 int eta1Point2Counter = 0;
715 int eta1Point0Counter = 0;
716 int eta0Point8Counter = 0;
717 int eta1Point2CounterPi = 0;
718 int eta1Point0CounterPi = 0;
719 int eta0Point8CounterPi = 0;
720 int prims = 0;
721 for (auto& tr : tracks) {
722 if (tr.isPrimary()) {
723 prims++;
724 const auto eta = tr.GetEta();
725 if (eta < 1.2) {
726 eta1Point2Counter++;
727 if (std::abs(tr.GetPdgCode()) == 211) {
728 eta1Point2CounterPi++;
729 }
730 }
731 if (eta < 1.0) {
732 eta1Point0Counter++;
733 if (std::abs(tr.GetPdgCode()) == 211) {
734 eta1Point0CounterPi++;
735 }
736 }
737 if (eta < 0.8) {
738 eta0Point8Counter++;
739 if (std::abs(tr.GetPdgCode()) == 211) {
740 eta0Point8CounterPi++;
741 }
742 }
743 } else {
744 break; // track layout is such that all prims are first anyway
745 }
746 }
747 // attach these properties to eventheader
748 // we only need to make the names standard
749 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
750 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
751 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
752 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
753 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
754 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
755 eventheader->putInfo("prims_total", prims);
756 };
757 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
758
759 if (mOutTree) {
760 // adjusting and merging track references
761 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
762
763 // write MC event headers
764 {
765 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
766 headerbr->SetAddress(&eventheader);
767 headerbr->Fill();
768 headerbr->ResetAddress();
769 }
770
771 {
772 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
773 headerbr->SetAddress(&eventheader);
774 headerbr->Fill();
775 headerbr->ResetAddress();
776 }
777 }
778
779 // c) do the merge procedure for all hits ... delegate this to detector specific functions
780 // since they know about types; number of branches; etc.
781 // this will also fix the trackIDs inside the hits
782 for (int id = 0; id < mDetectorInstances.size(); ++id) {
783 auto& det = mDetectorInstances[id];
784 if (det) {
785 auto hittree = mDetectorToTTreeMap[id];
786 if (hittree) {
787 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
788 hittree->SetEntries(hittree->GetEntries() + 1);
789 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
790 }
791 }
792 }
793
794 // increase the entry count in the tree
795 if (mOutTree) {
796 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
797 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
798 }
799 if (mMCHeaderTree) {
800 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
801 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
802 }
803
804 cleanEvent(flusheventID);
805 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
806 if (!checkIfNextFlushable()) {
807 break;
808 }
809 } // end while
810 if (mWriteToDisc && mOutFile) {
811 LOG(info) << "Writing TTrees";
812 mOutFile->Write("", TObject::kOverwrite);
813 for (int id = 0; id < mDetectorInstances.size(); ++id) {
814 auto& det = mDetectorInstances[id];
815 if (det && mDetectorOutFiles[id]) {
816 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
817 }
818 }
819 if (mMCHeaderOnlyOutFile) {
820 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
821 }
822 }
823 return true;
824 }
825
826 std::map<uint32_t, uint32_t> mPartsCheckSum;
827 std::string mOutFileName;
828
829 // structures for the final flush
830 TFile* mOutFile;
831 TTree* mOutTree;
832 TFile* mMCHeaderOnlyOutFile;
833 TTree* mMCHeaderTree;
834
835 template <class K, class V>
836 using Hashtable = tbb::concurrent_unordered_map<K, V>;
837 Hashtable<int, TFile*> mDetectorOutFiles;
838 Hashtable<int, TTree*> mDetectorToTTreeMap;
839
840 // intermediate structures to collect data per event
841 std::thread mMergerIOThread;
842 bool mergingInProgress = false;
843
844 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
845 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
846 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
847 Hashtable<int, bool> mFlushableEvents;
848
849 int mEventChecksum = 0;
850 int mNExpectedEvents = 0;
851 int mNextFlushID = 1;
852 TStopwatch mTimer;
853
854 bool mAsService = false;
855 bool mForwardKine = true;
856 bool mWriteToDisc = true;
857
858 int mPipeToDriver = -1;
859
860 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
861
862 // output folder configuration
863 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
864 std::string mCurrentOutputDir; // current output folder asked
865
866 // channel to PUB status messages to outside subscribers
867 fair::mq::Channel mPubChannel;
868
869 // init detector instances
870 void initDetInstances();
871 void initHitFiles(std::string prefix);
872};
873
// Initialises the hit output file/tree (via initHitTreeAndOutFile) for every detector ID in
// [DetID::First, DetID::Last] whose name is found in `modulelist`; `prefix` is passed through
// to the file name construction.
// NOTE(review): the embedded listing numbering jumps (875->877, 880->882); `modulelist` is
// used without a visible declaration - presumably it is set up on a missing line from the
// user configuration. Confirm against the complete header.
// NOTE(review): this member function is defined in a header without `inline` - confirm the
// header is only included from a single translation unit.
874void O2HitMerger::initHitFiles(std::string prefix)
875{
877
878 // a little helper lambda
879 auto isActivated = [](std::string s) -> bool {
880 // access user configuration for list of wanted modules
882 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
883 return active; };
884
885 for (int i = DetID::First; i <= DetID::Last; ++i) {
886 if (!isActivated(DetID::getName(i))) {
887 continue;
888 }
889 // init the detector specific output files
890 initHitTreeAndOutFile(prefix, i);
891 }
892}
893
894// init detector instances used to write hit data to a TTree
// mDetectorInstances is resized to DetID::nDetectors and, for every detector ID whose name is
// found in `modulelist`, the slot with that ID receives a freshly created detector object.
// `counter` counts the created instances; a warning is logged when it differs from
// DetID::nDetectors.
// NOTE(review): the embedded listing numbering jumps (896->898, 901->903); `modulelist` is
// used without a visible declaration - presumably it is set up on a missing line from the
// user configuration. Confirm against the complete header.
// NOTE(review): this member function is defined in a header without `inline` - confirm the
// header is only included from a single translation unit.
895void O2HitMerger::initDetInstances()
896{
898
899 // a little helper lambda
900 auto isActivated = [](std::string s) -> bool {
901 // access user configuration for list of wanted modules
903 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
904 return active; };
905
906 mDetectorInstances.resize(DetID::nDetectors);
907 // like a factory of detector objects
908
909 int counter = 0;
910 for (int i = DetID::First; i <= DetID::Last; ++i) {
911 if (!isActivated(DetID::getName(i))) {
912 continue;
913 }
914
// one independent `if` per detector type; the std::move around the make_unique temporaries
// has no effect on the result
915 if (i == DetID::TPC) {
916 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
917 counter++;
918 }
919 if (i == DetID::ITS) {
920 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
921 counter++;
922 }
923 if (i == DetID::MFT) {
924 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
925 counter++;
926 }
927 if (i == DetID::TRD) {
928 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
929 counter++;
930 }
931 if (i == DetID::PHS) {
932 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
933 counter++;
934 }
935 if (i == DetID::CPV) {
936 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
937 counter++;
938 }
939 if (i == DetID::EMC) {
940 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
941 counter++;
942 }
943 if (i == DetID::HMP) {
944 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
945 counter++;
946 }
947 if (i == DetID::TOF) {
948 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
949 counter++;
950 }
951 if (i == DetID::FT0) {
952 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
953 counter++;
954 }
955 if (i == DetID::FV0) {
956 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
957 counter++;
958 }
959 if (i == DetID::FDD) {
960 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
961 counter++;
962 }
963 if (i == DetID::MCH) {
964 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
965 counter++;
966 }
967 if (i == DetID::MID) {
968 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
969 counter++;
970 }
971 if (i == DetID::ZDC) {
972 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
973 counter++;
974 }
// the FOCAL detector additionally receives the expanded path of its geometry description file
975 if (i == DetID::FOC) {
976 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, gSystem->ExpandPathName("$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Sheets.txt")));
977 counter++;
978 }
// the following detector types are only compiled in when ENABLE_UPGRADES is defined
979#ifdef ENABLE_UPGRADES
980 if (i == DetID::IT3) {
981 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
982 counter++;
983 }
984 if (i == DetID::TRK) {
985 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
986 counter++;
987 }
988 if (i == DetID::FT3) {
989 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
990 counter++;
991 }
992 if (i == DetID::FCT) {
993 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
994 counter++;
995 }
996 if (i == DetID::TF3) {
997 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
998 counter++;
999 }
1000 if (i == DetID::RCH) {
1001 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1002 counter++;
1003 }
1004 if (i == DetID::MI3) {
1005 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1006 counter++;
1007 }
1008 if (i == DetID::ECL) {
1009 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1010 counter++;
1011 }
1012#endif
1013 }
// detectors that are not activated are skipped above, so this also fires for partial setups
1014 if (counter != DetID::nDetectors) {
1015 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1016 }
1017}
1018
1019} // namespace devices
1020} // namespace o2
1021
1022#endif
Definition of the DescriptorInnerBarrelITS3 class.
Definition of the Names Generator class.
Definition of the Stack class.
Definition of the Detector class.
Definition of the Detector class.
Definition of the FV0 detector class.
int32_t i
Definition of the Detector class.
Definition of the Detector class.
bool waitForControlInput(int workerID)
TBranch * ptr
Definition of the Detector class.
Definition of the Detector class.
StringRef key
void SetMotherTrackId(Int_t id)
Modifiers.
Definition MCTrack.h:166
Int_t getMotherTrackId() const
Definition MCTrack.h:73
static std::string getHitsFileName(DId d, const std::string_view prefix=STANDARDSIMPREFIX)
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static std::string getMCHeadersFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:52
bool writeToDisc() const
Definition SimConfig.h:179
bool asService() const
Definition SimConfig.h:173
unsigned int getNEvents() const
Definition SimConfig.h:156
bool forwardKine() const
Definition SimConfig.h:178
static SimConfig & Instance()
Definition SimConfig.h:111
std::vector< std::string > const & getReadoutDetectors() const
Definition SimConfig.h:140
void putInfo(std::string const &key, T const &value)
void add(MCEventStats const &other)
merge from another object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:94
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:96
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors are added, update this!
Definition DetID.h:92
static constexpr ID FOC
Definition DetID.h:80
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
O2HitMerger()
Default constructor.
~O2HitMerger() override
Default destructor.
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
static ShmManager & Instance()
Definition ShmManager.h:61
const GLfloat * m
Definition glcorearb.h:4066
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
TBranch * getOrMakeBranch(TTree &tree, const char *brname, T *ptr)
Definition Detector.h:281
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
struct o2::upgrades_utils::@454 tracks
structure to keep trigger-related info
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
o2::dataformats::MCEventHeader mMCEventHeader
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"