Project
Loading...
Searching...
No Matches
O2HitMerger.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_HITMERGER_H_
15#define ALICEO2_DEVICES_HITMERGER_H_
16
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include "TFile.h"
30#include "TMemFile.h"
31#include "TTree.h"
32#include "TROOT.h"
33#include <memory>
34#include <TMessage.h>
35#include <fairmq/Parts.h>
36#include <ctime>
37#include <TStopwatch.h>
38#include <sstream>
39#include <cassert>
40#include "FairSystemInfo.h"
41
42#include "O2HitMerger.h"
43#include "O2SimDevice.h"
48#include <TOFSimulation/Detector.h>
60
62#include <map>
63#include <vector>
64#include <list>
65#include <csignal>
66#include <mutex>
67#include <filesystem>
68#include <functional>
69
71
72#ifdef ENABLE_UPGRADES
82#endif
83
84#include <tbb/concurrent_unordered_map.h>
85
86namespace o2
87{
88namespace devices
89{
90
91// Function communicating to primary particle server that it is now safe to shutdown.
92// From the perspective of o2-sim, this is the case when all configs have been propagated and the system
93// is running ok: For instance after the HitMerger is initialized and got it's first data from Geant workers.
94bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel)
95{
96 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)o2::O2PrimaryServerInfoRequest::AllowShutdown));
97 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
98
99 int timeoutinMS = 100;
100 if (channel.Send(request, timeoutinMS) > 0) {
101 LOG(info) << "Sending Shutdown permission to particle server";
102 if (channel.Receive(reply, timeoutinMS) > 0) {
103 // the answer is a simple ack with a status code
104 LOG(info) << "Shutdown permission was acknowledged";
105 } else {
106 LOG(error) << "No answer received within " << timeoutinMS << "ms\n";
107 return false;
108 }
109 return true;
110 }
111 return false;
112}
113
115{
116
117 class TMessageWrapper : public TMessage
118 {
119 public:
120 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
121 ~TMessageWrapper() override = default;
122 };
123
124 public:
127 {
128 mTimer.Start();
129 mInitialOutputDir = std::filesystem::current_path().string();
130 mCurrentOutputDir = mInitialOutputDir;
131 }
132
134 ~O2HitMerger() override
135 {
136 FairSystemInfo sysinfo;
137 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
138 mTimer.Continue();
139 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
140 << sysinfo.GetMaxMemory() << " MB\n";
141 }
142
143 private:
145 void InitTask() final
146 {
147 LOG(info) << "INIT HIT MERGER";
148 ROOT::EnableThreadSafety();
149
150 std::string outfilename("o2sim_merged_hits.root"); // default name
151 // query the sim config ... which is used to extract the filenames
152 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
154 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
155 } else {
156 // we didn't manage to get a configuration --> better to fail
157 LOG(fatal) << "No configuration received. Aborting";
158 }
162
163 mOutFileName = outfilename.c_str();
164 if (mWriteToDisc) {
165 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
166 mOutTree = new TTree("o2sim", "o2sim");
167 mOutTree->SetDirectory(mOutFile);
168
169 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
170 mMCHeaderTree = new TTree("o2sim", "o2sim");
171 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
172 }
173 // detectors init only once
174 if (mDetectorInstances.size() == 0) {
175 initDetInstances();
176 // has to be after init of Detectors
178 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
179 }
180
181 // init pipe
182 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
183 if (pipeenv) {
184 mPipeToDriver = atoi(pipeenv);
185 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
186 } else {
187 LOG(warning) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
188 }
189
190 // if no data to expect we shut down the device NOW since it would otherwise hang
191 if (mNExpectedEvents == 0) {
192 if (mAsService) {
193 waitForControlInput();
194 } else {
195 LOG(info) << "NOT EXPECTING ANY DATA; SHUTTING DOWN";
196 raise(SIGINT);
197 }
198 }
199 }
200
201 bool setWorkingDirectory(std::string const& dir)
202 {
203 namespace fs = std::filesystem;
204
205 // sets the output directory where simulation files are produced
206 // and creates it when it doesn't exist already
207
208 // 2 possibilities:
209 // a) dir is relative dir. Then we interpret it as relative to the initial
210 // base directory
211 // b) or dir is itself absolut.
212 try {
213 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
214 if (!dir.empty()) {
215 auto absolutePath = fs::absolute(fs::path(dir));
216 if (!fs::exists(absolutePath)) {
217 if (!fs::create_directory(absolutePath)) {
218 LOG(error) << "Could not create directory " << absolutePath.string();
219 return false;
220 }
221 }
222 // set the current path
223 fs::current_path(absolutePath.string().c_str());
224 mCurrentOutputDir = fs::current_path().string();
225 }
226 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
227 } catch (std::exception e) {
228 LOG(error) << " could not change path to " << dir;
229 }
230 return true;
231 }
232
233 // function for intermediate/on-the-fly reinitializations
234 bool ReInit(o2::conf::SimReconfigData const& reconfig)
235 {
236 if (reconfig.stop) {
237 return false;
238 }
239 if (!setWorkingDirectory(reconfig.outputDir)) {
240 return false;
241 }
242
243 std::string outfilename("o2sim_merged_hits.root"); // default name
245 mNExpectedEvents = reconfig.nEvents;
246 mOutFileName = outfilename.c_str();
247 if (mWriteToDisc) {
248 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
249 mOutTree = new TTree("o2sim", "o2sim");
250 mOutTree->SetDirectory(mOutFile);
251
252 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
253 mMCHeaderTree = new TTree("o2sim", "o2sim");
254 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
255 }
256 // reinit detectorInstance files (also make sure they are closed before continuing)
257 initHitFiles(reconfig.outputPrefix);
258
259 // clear "counter" datastructures
260 mPartsCheckSum.clear();
261 mEventChecksum = 0;
262
263 // clear collector datastructures
264 mMCTrackBuffer.clear();
265 mTrackRefBuffer.clear();
266 mSubEventInfoBuffer.clear();
267 mFlushableEvents.clear();
268 mNextFlushID = 1;
269
270 return true;
271 }
272
273 template <typename T, typename V>
274 V insertAdd(std::map<T, V>& m, T const& key, V value)
275 {
276 const auto iter = m.find(key);
277 V accum{0};
278 if (iter != m.end()) {
279 iter->second += value;
280 accum = iter->second;
281 } else {
282 m.insert(std::make_pair(key, value));
283 accum = value;
284 }
285 return accum;
286 }
287
288 template <typename T>
289 bool isDataComplete(T checksum, T nparts)
290 {
291 return checksum == nparts * (nparts + 1) / 2;
292 }
293
294 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
295 {
296 auto detIDmessage = std::move(data.At(index++));
297 // this should be a detector ID
298 if (detIDmessage->GetSize() == 4) {
299 auto ptr = (int*)detIDmessage->GetData();
301 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
302 << data.At(index)->GetSize() / 1024. / 1024.;
303
304 // get the detector that can interpret it
305 auto detector = mDetectorInstances[id].get();
306 if (detector) {
307 detector->collectHits(eventID, data, index);
308 }
309 }
310 }
311
312 template <typename T, typename BT>
313 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
314 {
315 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
316 if (buffer.find(eventID) == buffer.end()) {
317 buffer[eventID] = typename BT::mapped_type();
318 }
319 buffer[eventID].push_back(decodeddata);
320 // delete decodeddata; --> we store the pointers
321 index++;
322 }
323
324 // fills a special branch of SubEventInfos in order to keep
325 // track of which entry corresponds to which event etc.
326 // also creates the MCEventHeader branch expected for physics analysis
327 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
328 {
329 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
330 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
331 }
332 mSubEventInfoBuffer[info.eventID].push_back(&info);
333 }
334
336 {
337 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
338
339 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
340 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
341 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
342 LOG(info) << "SOCKETNAME " << controlsocketname;
343 channel.Connect(std::string(controlsocketname));
344 channel.Validate();
345 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
346
347 LOG(info) << "WAITING FOR INPUT";
348 if (channel.Receive(reply) > 0) {
349 auto data = reply->GetData();
350 auto size = reply->GetSize();
351
352 std::string command(reinterpret_cast<char const*>(data), size);
353 LOG(info) << "message: " << command;
354
356 o2::conf::parseSimReconfigFromString(command, reconfig);
357 return ReInit(reconfig);
358 } else {
359 LOG(info) << "NOTHING RECEIVED";
360 }
361 return true;
362 }
363
364 bool ConditionalRun() override
365 {
366 auto& channel = GetChannels().at("simdata").at(0);
367 fair::mq::Parts request;
368 auto bytes = channel.Receive(request);
369 if (bytes < 0) {
370 LOG(error) << "Some error occurred on socket during receive on sim data";
371 return true; // keep going
372 }
373 TStopwatch timer;
374 timer.Start();
375 auto more = handleSimData(request, 0);
376 LOG(info) << "HitMerger processing took " << timer.RealTime();
377 if (!more && mAsService) {
378 LOG(info) << " CONTROL ";
379 // if we are done treating data we may go back to init phase
380 // for the next batch
381 return waitForControlInput();
382 }
383
384 static bool initAcknowledged = false;
385 if (!initAcknowledged) {
386 primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0));
387 initAcknowledged = true;
388 }
389
390 return more;
391 }
392
393 bool handleSimData(fair::mq::Parts& data, int /*index*/)
394 {
395 bool expectmore = true;
396 int index = 0;
397 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
398 o2::data::SubEventInfo& info = *infoptr;
399 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
400
401 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
402
403 fillSubEventInfoEntry(info);
404 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
405 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
406 while (index < data.Size()) {
407 consumeHits(info.eventID, data, index);
408 }
409
410 if (isDataComplete<uint32_t>(accum, info.nparts)) {
411 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
412 mFlushableEvents[info.eventID] = true;
413
414 // check if previous flush finished
415 // start merging only when no merging currently happening
416 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
417 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
418 if (!mergingInProgress) {
419 if (mMergerIOThread.joinable()) {
420 mMergerIOThread.join();
421 }
422 // start hit merging and flushing in a separate thread in order not to block
423 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
424 }
425
426 mEventChecksum += info.eventID;
427 // we also need to check if we have all events
428 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
429 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
430
431 // flush remaining data and close file
432 if (mMergerIOThread.joinable()) {
433 mMergerIOThread.join();
434 }
435 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
436 if (mMergerIOThread.joinable()) {
437 mMergerIOThread.join();
438 }
439
440 expectmore = false;
441 }
442
443 if (mPipeToDriver != -1) {
444 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
445 LOG(error) << "FAILED WRITING TO PIPE";
446 };
447 }
448 }
449 return expectmore;
450 }
451
452 void cleanEvent(int eventID)
453 {
454 // cleanup intermediate per-Event buffers
455 }
456
457 template <typename T>
458 void backInsert(T const& from, T& to)
459 {
460 std::copy(from.begin(), from.end(), std::back_inserter(to));
461 }
462
463 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
464 {
465 // avoid doing this for trivial cases
466 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
467 auto targetdata = std::make_unique<std::vector<MCTrack>>();
468
469 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
470 const auto entries = vectorOfSubEventMCTracks.size();
471
472 if (entries > 1) {
473 //
474 // loop over subevents to store the primary events
475 //
476 int nprimTot = 0;
477 for (int entry = entries - 1; entry >= 0; --entry) {
478 int index = nsubevents[entry];
479 nprimTot += nprimaries[index];
480 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
481 for (int i = 0; i < nprimaries[index]; i++) {
482 auto& track = (*vectorOfSubEventMCTracks[index])[i];
483 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
484 track.SetFirstDaughterTrackId(-1);
485 track.SetLastDaughterTrackId(-1);
486 }
487 targetdata->push_back(track);
488 }
489 }
490 //
491 // loop a second time to store the secondaries and fix the mother track IDs
492 //
493 Int_t idelta1 = nprimTot;
494 Int_t idelta0 = 0;
495 for (int entry = entries - 1; entry >= 0; --entry) {
496 int index = nsubevents[entry];
497
498 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
499 // we need to fetch the right mctracks here!!
500 Int_t npart = (int)(subEventTracks.size());
501 Int_t nprim = nprimaries[index];
502 idelta1 -= nprim;
503
504 for (Int_t i = nprim; i < npart; i++) {
505 auto& track = subEventTracks[i];
506 Int_t cId = track.getMotherTrackId();
507 if (cId >= nprim) {
508 cId += idelta1;
509 } else {
510 cId += idelta0;
511 }
512 track.SetMotherTrackId(cId);
513 track.SetFirstDaughterTrackId(-1);
514
515 Int_t hwm = (int)(targetdata->size());
516 auto& mother = (*targetdata)[cId];
517 if (mother.getFirstDaughterTrackId() == -1) {
518 mother.SetFirstDaughterTrackId(hwm);
519 }
520 mother.SetLastDaughterTrackId(hwm);
521
522 targetdata->push_back(track);
523 }
524 idelta0 += nprim;
525 idelta1 += npart;
526 }
527 }
528 //
529 // write to output
530 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
531
532 // we give the possibility to produce some MC track statistics
533 // to be saved as part of the MCHeader structure
534 tracks_analysis_hook(*filladdr);
535
536 if (mWriteToDisc && target) {
537 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
538 targetbr->SetAddress(&filladdr);
539 targetbr->Fill();
540 targetbr->ResetAddress();
541 }
542 // forwarding the track data to other consumers (pub/sub)
543 if (mForwardKine) {
544 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
545 auto& channel = GetChannels().at("kineforward").at(0);
546 TMessage* tmsg = new TMessage(kMESS_OBJECT);
547 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
548 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
549 tmsg = new TMessage(kMESS_OBJECT);
550 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
551 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
552 fair::mq::Parts reply;
553 reply.AddPart(std::move(headermessage));
554 reply.AddPart(std::move(trackmessage));
555 channel.Send(reply);
556 LOG(info) << "Forward publish MC tracks on channel";
557 }
558
559 // cleanup buffered data
560 for (auto ptr : vectorOfSubEventMCTracks) {
561 delete ptr; // avoid this by using unique ptr
562 }
563 }
564
565 template <typename T, typename M>
566 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
567 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
568 {
569 //
570 // Remap the mother track IDs by adding an offset.
571 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
572 // This method is called by O2HitMerger::mergeAndFlushData(int)
573 //
574 T* incomingdata = nullptr;
575 std::unique_ptr<T> targetdata(nullptr);
576 auto& vectorOfT = mapOfVectorOfTs[eventID];
577 const auto entries = vectorOfT.size();
578
579 if (entries == 1) {
580 // nothing to do in case there is only one entry
581 incomingdata = vectorOfT[0];
582 } else {
583 targetdata = std::make_unique<T>();
584 // loop over subevents
585 Int_t nprimTot = 0;
586 for (int entry = 0; entry < entries; entry++) {
587 nprimTot += nprimaries[entry];
588 }
589 Int_t idelta0 = 0;
590 Int_t idelta1 = nprimTot;
591 for (int entry = entries - 1; entry >= 0; --entry) {
592 Int_t index = subevOrdered[entry];
593 Int_t nprim = nprimaries[index];
594 incomingdata = vectorOfT[index];
595 idelta1 -= nprim;
596 for (auto& data : *incomingdata) {
597 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
598 targetdata->push_back(data);
599 }
600 idelta0 += nprim;
601 idelta1 += trackoffsets[index];
602 }
603 }
604 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
605 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
606 targetbr->SetAddress(&dataaddr);
607 targetbr->Fill();
608 targetbr->ResetAddress();
609
610 // cleanup mem
611 for (auto ptr : vectorOfT) {
612 delete ptr; // avoid this by using unique ptr
613 }
614 }
615
616 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
617 {
618 Int_t cId = track.getMotherTrackId();
619 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
620 if (cId != -1) {
621 track.SetMotherTrackId(cId + ioffset);
622 }
623 }
624
625 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
626 {
627 Int_t cId = ref.getTrackID();
628 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
629 ref.setTrackID(cId + ioffset);
630 }
631
632 void initHitTreeAndOutFile(std::string prefix, int detID)
633 {
635 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
636 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
637 mDetectorOutFiles[detID]->Close();
638 delete mDetectorOutFiles[detID];
639 }
640 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
641 if (mWriteToDisc) {
642 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
643 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
644 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
645 } else {
646 mDetectorOutFiles[detID] = nullptr;
647 mDetectorToTTreeMap[detID] = nullptr;
648 }
649 }
650
651 // This method goes over the buffers containing data for a given event; potentially merges
652 // them and flushes into the actual output file.
653 // The method can be called asynchronously to data collection
654 bool mergeAndFlushData()
655 {
656 auto checkIfNextFlushable = [this]() -> bool {
657 mNextFlushID++;
658 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
659 };
660
661 LOG(info) << "Launching merge kernel ";
662 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
663 if (!canflush) {
664 return false;
665 }
666 while (canflush == true) {
667 auto flusheventID = mNextFlushID;
668 LOG(info) << "Merge and flush event " << flusheventID;
669 auto iter = mSubEventInfoBuffer.find(flusheventID);
670 if (iter == mSubEventInfoBuffer.end()) {
671 LOG(error) << "No info/data found for event " << flusheventID;
672 if (!checkIfNextFlushable()) {
673 return false;
674 }
675 }
676
677 auto& subEventInfoList = (*iter).second;
678 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
679 LOG(error) << "No data entries found for event " << flusheventID;
680 if (!checkIfNextFlushable()) {
681 return false;
682 }
683 }
684
685 TStopwatch timer;
686 timer.Start();
687
688 // calculate trackoffsets
689 auto& confref = o2::conf::SimConfig::Instance();
690
691 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
692 std::vector<int> trackoffsets;
693 // collecting primary particles in each subevent (data arrival id)
694 std::vector<int> nprimaries;
695 // mapping of id to actual sub-event id (or part)
696 std::vector<int> nsubevents;
697
698 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
699
700 // the MC labels (trackID) for hits
701 for (auto info : subEventInfoList) {
702 assert(info->npersistenttracks >= 0);
703 trackoffsets.emplace_back(info->npersistenttracks);
704 nprimaries.emplace_back(info->nprimarytracks);
705 nsubevents.emplace_back(info->part);
706 if (eventheader == nullptr) {
707 eventheader = &info->mMCEventHeader;
708 } else {
709 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
710 }
711 }
712
713 // now see which events can be discarded in any case due to no hits
714 if (confref.isFilterOutNoHitEvents()) {
715 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
716 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
717 cleanEvent(flusheventID);
718 if (!checkIfNextFlushable()) {
719 return true;
720 }
721 }
722 }
723
724 // attention: We need to make sure that we write everything in the same event order
725 // but iteration over keys of a standard map in C++ is ordered
726
727 // b) merge the general data
728 //
729 // for MCTrack remap the motherIds and merge at the same go
730 const auto entries = subEventInfoList.size();
731 std::vector<int> subevOrdered((int)(nsubevents.size()));
732 for (int entry = entries - 1; entry >= 0; --entry) {
733 subevOrdered[nsubevents[entry] - 1] = entry;
734 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
735 }
736
737 // This is a hook that collects some useful statistics/properties on the event
738 // for use by other components;
739 // Properties are attached making use of the extensible "Info" feature which is already
740 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
741 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
742 // NOTE: This function might also be called directly in the primary server!?
743 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
744 int eta1Point2Counter = 0;
745 int eta1Point0Counter = 0;
746 int eta0Point8Counter = 0;
747 int eta1Point2CounterPi = 0;
748 int eta1Point0CounterPi = 0;
749 int eta0Point8CounterPi = 0;
750 int prims = 0;
751 for (auto& tr : tracks) {
752 if (tr.isPrimary()) {
753 prims++;
754 const auto eta = tr.GetEta();
755 if (eta < 1.2) {
756 eta1Point2Counter++;
757 if (std::abs(tr.GetPdgCode()) == 211) {
758 eta1Point2CounterPi++;
759 }
760 }
761 if (eta < 1.0) {
762 eta1Point0Counter++;
763 if (std::abs(tr.GetPdgCode()) == 211) {
764 eta1Point0CounterPi++;
765 }
766 }
767 if (eta < 0.8) {
768 eta0Point8Counter++;
769 if (std::abs(tr.GetPdgCode()) == 211) {
770 eta0Point8CounterPi++;
771 }
772 }
773 } else {
774 break; // track layout is such that all prims are first anyway
775 }
776 }
777 // attach these properties to eventheader
778 // we only need to make the names standard
779 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
780 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
781 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
782 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
783 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
784 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
785 eventheader->putInfo("prims_total", prims);
786 };
787 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
788
789 if (mOutTree) {
790 // adjusting and merging track references
791 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
792
793 // write MC event headers
794 {
795 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
796 headerbr->SetAddress(&eventheader);
797 headerbr->Fill();
798 headerbr->ResetAddress();
799 }
800
801 {
802 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
803 headerbr->SetAddress(&eventheader);
804 headerbr->Fill();
805 headerbr->ResetAddress();
806 }
807 }
808
809 // c) do the merge procedure for all hits ... delegate this to detector specific functions
810 // since they know about types; number of branches; etc.
811 // this will also fix the trackIDs inside the hits
812 for (int id = 0; id < mDetectorInstances.size(); ++id) {
813 auto& det = mDetectorInstances[id];
814 if (det) {
815 auto hittree = mDetectorToTTreeMap[id];
816 if (hittree) {
817 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
818 hittree->SetEntries(hittree->GetEntries() + 1);
819 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
820 }
821 }
822 }
823
824 // increase the entry count in the tree
825 if (mOutTree) {
826 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
827 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
828 }
829 if (mMCHeaderTree) {
830 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
831 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
832 }
833
834 cleanEvent(flusheventID);
835 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
836 if (!checkIfNextFlushable()) {
837 break;
838 }
839 } // end while
840 if (mWriteToDisc && mOutFile) {
841 LOG(info) << "Writing TTrees";
842 mOutFile->Write("", TObject::kOverwrite);
843 for (int id = 0; id < mDetectorInstances.size(); ++id) {
844 auto& det = mDetectorInstances[id];
845 if (det && mDetectorOutFiles[id]) {
846 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
847 }
848 }
849 if (mMCHeaderOnlyOutFile) {
850 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
851 }
852 }
853 return true;
854 }
855
856 std::map<uint32_t, uint32_t> mPartsCheckSum;
857 std::string mOutFileName;
858
859 // structures for the final flush
860 TFile* mOutFile;
861 TTree* mOutTree;
862 TFile* mMCHeaderOnlyOutFile;
863 TTree* mMCHeaderTree;
864
865 template <class K, class V>
866 using Hashtable = tbb::concurrent_unordered_map<K, V>;
867 Hashtable<int, TFile*> mDetectorOutFiles;
868 Hashtable<int, TTree*> mDetectorToTTreeMap;
869
870 // intermediate structures to collect data per event
871 std::thread mMergerIOThread;
872 bool mergingInProgress = false;
873
874 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
875 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
876 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
877 Hashtable<int, bool> mFlushableEvents;
878
879 int mEventChecksum = 0;
880 int mNExpectedEvents = 0;
881 int mNextFlushID = 1;
882 TStopwatch mTimer;
883
884 bool mAsService = false;
885 bool mForwardKine = true;
886 bool mWriteToDisc = true;
887
888 int mPipeToDriver = -1;
889
890 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
891
892 // output folder configuration
893 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
894 std::string mCurrentOutputDir; // current output folder asked
895
896 // channel to PUB status messages to outside subscribers
897 fair::mq::Channel mPubChannel;
898
899 // init detector instances
900 void initDetInstances();
901 void initHitFiles(std::string prefix);
902};
903
904void O2HitMerger::initHitFiles(std::string prefix)
905{
907
908 // a little helper lambda
909 auto isActivated = [](std::string s) -> bool {
910 // access user configuration for list of wanted modules
912 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
913 return active; };
914
915 for (int i = DetID::First; i <= DetID::Last; ++i) {
916 if (!isActivated(DetID::getName(i))) {
917 continue;
918 }
919 // init the detector specific output files
920 initHitTreeAndOutFile(prefix, i);
921 }
922}
923
924// init detector instances used to write hit data to a TTree
925void O2HitMerger::initDetInstances()
926{
928
929 // a little helper lambda
930 auto isActivated = [](std::string s) -> bool {
931 // access user configuration for list of wanted modules
933 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
934 return active; };
935
936 mDetectorInstances.resize(DetID::nDetectors);
937 // like a factory of detector objects
938
939 int counter = 0;
940 for (int i = DetID::First; i <= DetID::Last; ++i) {
941 if (!isActivated(DetID::getName(i))) {
942 continue;
943 }
944
945 if (i == DetID::TPC) {
946 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
947 counter++;
948 }
949 if (i == DetID::ITS) {
950 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
951 counter++;
952 }
953 if (i == DetID::MFT) {
954 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
955 counter++;
956 }
957 if (i == DetID::TRD) {
958 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
959 counter++;
960 }
961 if (i == DetID::PHS) {
962 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
963 counter++;
964 }
965 if (i == DetID::CPV) {
966 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
967 counter++;
968 }
969 if (i == DetID::EMC) {
970 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
971 counter++;
972 }
973 if (i == DetID::HMP) {
974 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
975 counter++;
976 }
977 if (i == DetID::TOF) {
978 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
979 counter++;
980 }
981 if (i == DetID::FT0) {
982 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
983 counter++;
984 }
985 if (i == DetID::FV0) {
986 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
987 counter++;
988 }
989 if (i == DetID::FDD) {
990 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
991 counter++;
992 }
993 if (i == DetID::MCH) {
994 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
995 counter++;
996 }
997 if (i == DetID::MID) {
998 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
999 counter++;
1000 }
1001 if (i == DetID::ZDC) {
1002 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
1003 counter++;
1004 }
1005 if (i == DetID::FOC) {
1006 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, gSystem->ExpandPathName("$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Sheets.txt")));
1007 counter++;
1008 }
1009#ifdef ENABLE_UPGRADES
1010 if (i == DetID::IT3) {
1011 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
1012 counter++;
1013 }
1014 if (i == DetID::TRK) {
1015 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
1016 counter++;
1017 }
1018 if (i == DetID::FT3) {
1019 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
1020 counter++;
1021 }
1022 if (i == DetID::FCT) {
1023 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
1024 counter++;
1025 }
1026 if (i == DetID::TF3) {
1027 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
1028 counter++;
1029 }
1030 if (i == DetID::RCH) {
1031 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1032 counter++;
1033 }
1034 if (i == DetID::MI3) {
1035 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1036 counter++;
1037 }
1038 if (i == DetID::ECL) {
1039 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1040 counter++;
1041 }
1042 if (i == DetID::FD3) {
1043 mDetectorInstances[i] = std::move(std::make_unique<o2::fd3::Detector>(true));
1044 counter++;
1045 }
1046#endif
1047 }
1048 if (counter != DetID::nDetectors) {
1049 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1050 }
1051}
1052
1053} // namespace devices
1054} // namespace o2
1055
1056#endif
Definition of the DescriptorInnerBarrelITS3 class.
Definition of the Names Generator class.
Definition of the Stack class.
Definition of the Detector class.
Definition of the Detector class.
Definition of the FV0 detector class.
int32_t i
Definition of the Detector class.
Definition of the Detector class.
bool waitForControlInput(int workerID)
TBranch * ptr
ECal geometry creation and hit processing.
Definition of the Detector class.
Definition of the Detector class.
Definition of the Detector class.
StringRef key
void SetMotherTrackId(Int_t id)
Modifiers.
Definition MCTrack.h:166
Int_t getMotherTrackId() const
Definition MCTrack.h:73
static std::string getHitsFileName(DId d, const std::string_view prefix=STANDARDSIMPREFIX)
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static std::string getMCHeadersFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:52
bool writeToDisc() const
Definition SimConfig.h:179
bool asService() const
Definition SimConfig.h:173
unsigned int getNEvents() const
Definition SimConfig.h:156
bool forwardKine() const
Definition SimConfig.h:178
static SimConfig & Instance()
Definition SimConfig.h:111
std::vector< std::string > const & getReadoutDetectors() const
Definition SimConfig.h:140
void putInfo(std::string const &key, T const &value)
void add(MCEventStats const &other)
merge from another object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:146
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:95
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:97
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr ID FOC
Definition DetID.h:80
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
O2HitMerger()
Default constructor.
~O2HitMerger() override
Default destructor.
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
static ShmManager & Instance()
Definition ShmManager.h:61
const GLfloat * m
Definition glcorearb.h:4066
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
TBranch * getOrMakeBranch(TTree &tree, const char *brname, T *ptr)
Definition Detector.h:281
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
bool primaryServer_sendShutdownPermission(fair::mq::Channel &channel)
Definition O2HitMerger.h:94
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
struct o2::upgrades_utils::@454 tracks
structure to keep trigger-related info
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
o2::dataformats::MCEventHeader mMCEventHeader
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"