Project
Loading...
Searching...
No Matches
O2HitMerger.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_HITMERGER_H_
15#define ALICEO2_DEVICES_HITMERGER_H_
16
17#include <memory>
18#include <string>
19#include <type_traits>
20#include <fairmq/Message.h>
21#include <fairmq/Device.h>
22#include <fairlogger/Logger.h>
24#include <DetectorsBase/Stack.h>
28#include <gsl/gsl>
29#include <TFile.h>
30#include <TMemFile.h>
31#include <TTree.h>
32#include <TString.h>
33#include <TSystem.h>
34#include <TROOT.h>
35#include <memory>
36#include <TMessage.h>
37#include <fairmq/Parts.h>
38#include <ctime>
39#include <TStopwatch.h>
40#include <sstream>
41#include <cassert>
42#include "FairSystemInfo.h"
43
44#include "O2HitMerger.h"
45#include "O2SimDevice.h"
50#include <TOFSimulation/Detector.h>
62
64#include <map>
65#include <vector>
66#include <list>
67#include <csignal>
68#include <mutex>
69#include <filesystem>
70#include <functional>
71
73
74#ifdef ENABLE_UPGRADES
84#endif
85
86#include <tbb/concurrent_unordered_map.h>
87
88namespace o2
89{
90namespace devices
91{
92
93// Function communicating to primary particle server that it is now safe to shutdown.
94// From the perspective of o2-sim, this is the case when all configs have been propagated and the system
95// is running ok: For instance after the HitMerger is initialized and got it's first data from Geant workers.
96bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel)
97{
98 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)o2::O2PrimaryServerInfoRequest::AllowShutdown));
99 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
100
101 int timeoutinMS = 100;
102 if (channel.Send(request, timeoutinMS) > 0) {
103 LOG(info) << "Sending Shutdown permission to particle server";
104 if (channel.Receive(reply, timeoutinMS) > 0) {
105 // the answer is a simple ack with a status code
106 LOG(info) << "Shutdown permission was acknowledged";
107 } else {
108 LOG(error) << "No answer received within " << timeoutinMS << "ms\n";
109 return false;
110 }
111 return true;
112 }
113 return false;
114}
115
117{
118
119 class TMessageWrapper : public TMessage
120 {
121 public:
122 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
123 ~TMessageWrapper() override = default;
124 };
125
126 public:
129 {
130 mTimer.Start();
131 mInitialOutputDir = std::filesystem::current_path().string();
132 mCurrentOutputDir = mInitialOutputDir;
133 }
134
136 ~O2HitMerger() override
137 {
138 FairSystemInfo sysinfo;
139 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
140 mTimer.Continue();
141 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
142 << sysinfo.GetMaxMemory() << " MB\n";
143 }
144
145 private:
147 void InitTask() final
148 {
149 LOG(info) << "INIT HIT MERGER";
150 ROOT::EnableThreadSafety();
151
152 std::string outfilename("o2sim_merged_hits.root"); // default name
153 // query the sim config ... which is used to extract the filenames
154 if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
156 mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
157 } else {
158 // we didn't manage to get a configuration --> better to fail
159 LOG(fatal) << "No configuration received. Aborting";
160 }
164
165 mOutFileName = outfilename.c_str();
166 if (mWriteToDisc) {
167 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
168 mOutTree = new TTree("o2sim", "o2sim");
169 mOutTree->SetDirectory(mOutFile);
170
171 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()).c_str(), "RECREATE");
172 mMCHeaderTree = new TTree("o2sim", "o2sim");
173 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
174 }
175 // detectors init only once
176 if (mDetectorInstances.size() == 0) {
177 initDetInstances();
178 // has to be after init of Detectors
180 initHitFiles(o2::conf::SimConfig::Instance().getOutPrefix());
181 }
182
183 // init pipe
184 auto pipeenv = getenv("ALICE_O2SIMMERGERTODRIVER_PIPE");
185 if (pipeenv) {
186 mPipeToDriver = atoi(pipeenv);
187 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
188 } else {
189 LOG(warning) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
190 }
191
192 // if no data to expect we shut down the device NOW since it would otherwise hang
193 if (mNExpectedEvents == 0) {
194 if (mAsService) {
195 waitForControlInput();
196 } else {
197 LOG(info) << "NOT EXPECTING ANY DATA; SHUTTING DOWN";
198 raise(SIGINT);
199 }
200 }
201 }
202
203 bool setWorkingDirectory(std::string const& dir)
204 {
205 namespace fs = std::filesystem;
206
207 // sets the output directory where simulation files are produced
208 // and creates it when it doesn't exist already
209
210 // 2 possibilities:
211 // a) dir is relative dir. Then we interpret it as relative to the initial
212 // base directory
213 // b) or dir is itself absolut.
214 try {
215 fs::current_path(fs::path(mInitialOutputDir)); // <--- to make sure relative start is always the same
216 if (!dir.empty()) {
217 auto absolutePath = fs::absolute(fs::path(dir));
218 if (!fs::exists(absolutePath)) {
219 if (!fs::create_directory(absolutePath)) {
220 LOG(error) << "Could not create directory " << absolutePath.string();
221 return false;
222 }
223 }
224 // set the current path
225 fs::current_path(absolutePath.string().c_str());
226 mCurrentOutputDir = fs::current_path().string();
227 }
228 LOG(info) << "FINAL PATH " << mCurrentOutputDir;
229 } catch (std::exception e) {
230 LOG(error) << " could not change path to " << dir;
231 }
232 return true;
233 }
234
235 // function for intermediate/on-the-fly reinitializations
236 bool ReInit(o2::conf::SimReconfigData const& reconfig)
237 {
238 if (reconfig.stop) {
239 return false;
240 }
241 if (!setWorkingDirectory(reconfig.outputDir)) {
242 return false;
243 }
244
245 std::string outfilename("o2sim_merged_hits.root"); // default name
247 mNExpectedEvents = reconfig.nEvents;
248 mOutFileName = outfilename.c_str();
249 if (mWriteToDisc) {
250 mOutFile = new TFile(outfilename.c_str(), "RECREATE");
251 mOutTree = new TTree("o2sim", "o2sim");
252 mOutTree->SetDirectory(mOutFile);
253
254 mMCHeaderOnlyOutFile = new TFile(o2::base::NameConf::getMCHeadersFileName(reconfig.outputPrefix).c_str(), "RECREATE");
255 mMCHeaderTree = new TTree("o2sim", "o2sim");
256 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
257 }
258 // reinit detectorInstance files (also make sure they are closed before continuing)
259 initHitFiles(reconfig.outputPrefix);
260
261 // clear "counter" datastructures
262 mPartsCheckSum.clear();
263 mEventChecksum = 0;
264
265 // clear collector datastructures
266 mMCTrackBuffer.clear();
267 mTrackRefBuffer.clear();
268 mSubEventInfoBuffer.clear();
269 mFlushableEvents.clear();
270 mNextFlushID = 1;
271
272 return true;
273 }
274
// Accumulates `value` into the map entry stored under `key`.
// If the key is not present yet, a new entry holding `value` is created.
// Returns the value stored under `key` after the update.
template <typename T, typename V>
V insertAdd(std::map<T, V>& m, T const& key, V value)
{
  auto [slot, created] = m.try_emplace(key, value);
  if (!created) {
    // key was already known: add on top of what is stored
    slot->second += value;
  }
  return slot->second;
}
289
// Checks whether `checksum` equals 1 + 2 + ... + nparts, i.e. whether every
// part (or event) with IDs 1..nparts has contributed its ID to the checksum.
//
// The even factor of nparts * (nparts + 1) is halved *before* multiplying.
// The naive form nparts * (nparts + 1) / 2 wraps in the product already for
// nparts >= 65536 with 32-bit unsigned T and then rejects complete data.
// For unsigned T the result is the exact sum modulo 2^bits, which matches an
// accumulator of the same type.
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  static_assert(std::is_integral<T>::value, "isDataComplete requires an integral checksum type");
  const T expected = (nparts % 2 == 0) ? static_cast<T>((nparts / 2) * (nparts + 1))
                                       : static_cast<T>(nparts * ((nparts + 1) / 2));
  return checksum == expected;
}
295
296 void consumeHits(int eventID, fair::mq::Parts& data, int& index)
297 {
298 auto detIDmessage = std::move(data.At(index++));
299 // this should be a detector ID
300 if (detIDmessage->GetSize() == 4) {
301 auto ptr = (int*)detIDmessage->GetData();
303 LOG(debug2) << "I1 " << ptr[0] << " NAME " << id.getName() << " MB "
304 << data.At(index)->GetSize() / 1024. / 1024.;
305
306 // get the detector that can interpret it
307 auto detector = mDetectorInstances[id].get();
308 if (detector) {
309 detector->collectHits(eventID, data, index);
310 }
311 }
312 }
313
314 template <typename T, typename BT>
315 void consumeData(int eventID, fair::mq::Parts& data, int& index, BT& buffer)
316 {
317 auto decodeddata = o2::base::decodeTMessage<T*>(data, index);
318 if (buffer.find(eventID) == buffer.end()) {
319 buffer[eventID] = typename BT::mapped_type();
320 }
321 buffer[eventID].push_back(decodeddata);
322 // delete decodeddata; --> we store the pointers
323 index++;
324 }
325
326 // fills a special branch of SubEventInfos in order to keep
327 // track of which entry corresponds to which event etc.
328 // also creates the MCEventHeader branch expected for physics analysis
329 void fillSubEventInfoEntry(o2::data::SubEventInfo& info)
330 {
331 if (mSubEventInfoBuffer.find(info.eventID) == mSubEventInfoBuffer.end()) {
332 mSubEventInfoBuffer[info.eventID] = std::list<o2::data::SubEventInfo*>();
333 }
334 mSubEventInfoBuffer[info.eventID].push_back(&info);
335 }
336
338 {
339 o2::simpubsub::publishMessage(GetChannels()["merger-notifications"].at(0), o2::simpubsub::simStatusString("MERGER", "STATUS", "AWAITING INPUT"));
340
341 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
342 auto channel = fair::mq::Channel{"o2sim-control", "sub", factory};
343 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
344 LOG(info) << "SOCKETNAME " << controlsocketname;
345 channel.Connect(std::string(controlsocketname));
346 channel.Validate();
347 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
348
349 LOG(info) << "WAITING FOR INPUT";
350 if (channel.Receive(reply) > 0) {
351 auto data = reply->GetData();
352 auto size = reply->GetSize();
353
354 std::string command(reinterpret_cast<char const*>(data), size);
355 LOG(info) << "message: " << command;
356
358 o2::conf::parseSimReconfigFromString(command, reconfig);
359 return ReInit(reconfig);
360 } else {
361 LOG(info) << "NOTHING RECEIVED";
362 }
363 return true;
364 }
365
366 bool ConditionalRun() override
367 {
368 auto& channel = GetChannels().at("simdata").at(0);
369 fair::mq::Parts request;
370 auto bytes = channel.Receive(request);
371 if (bytes < 0) {
372 LOG(error) << "Some error occurred on socket during receive on sim data";
373 return true; // keep going
374 }
375 TStopwatch timer;
376 timer.Start();
377 auto more = handleSimData(request, 0);
378 LOG(info) << "HitMerger processing took " << timer.RealTime();
379 if (!more && mAsService) {
380 LOG(info) << " CONTROL ";
381 // if we are done treating data we may go back to init phase
382 // for the next batch
383 return waitForControlInput();
384 }
385
386 static bool initAcknowledged = false;
387 if (!initAcknowledged) {
388 primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0));
389 initAcknowledged = true;
390 }
391
392 return more;
393 }
394
395 bool handleSimData(fair::mq::Parts& data, int /*index*/)
396 {
397 bool expectmore = true;
398 int index = 0;
399 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(data, index++);
400 o2::data::SubEventInfo& info = *infoptr;
401 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.eventID, (uint32_t)info.part);
402
403 LOG(info) << "SIMDATA channel got " << data.Size() << " parts for event " << info.eventID << " part " << info.part << " out of " << info.nparts;
404
405 fillSubEventInfoEntry(info);
406 consumeData<std::vector<o2::MCTrack>>(info.eventID, data, index, mMCTrackBuffer);
407 consumeData<std::vector<o2::TrackReference>>(info.eventID, data, index, mTrackRefBuffer);
408 while (index < data.Size()) {
409 consumeHits(info.eventID, data, index);
410 }
411
412 if (isDataComplete<uint32_t>(accum, info.nparts)) {
413 LOG(info) << "Event " << info.eventID << " complete. Marking as flushable";
414 mFlushableEvents[info.eventID] = true;
415
416 // check if previous flush finished
417 // start merging only when no merging currently happening
418 // Like this we don't have to join/wait on the thread here and do not block the outer ConditionalRun handling
419 // TODO: Let this run fully asynchronously (not even triggered by ConditionalRun)
420 if (!mergingInProgress) {
421 if (mMergerIOThread.joinable()) {
422 mMergerIOThread.join();
423 }
424 // start hit merging and flushing in a separate thread in order not to block
425 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
426 }
427
428 mEventChecksum += info.eventID;
429 // we also need to check if we have all events
430 if (isDataComplete<uint32_t>(mEventChecksum, info.maxEvents)) {
431 LOG(info) << "ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
432
433 // flush remaining data and close file
434 if (mMergerIOThread.joinable()) {
435 mMergerIOThread.join();
436 }
437 mMergerIOThread = std::thread([info, this]() { mergingInProgress = true; mergeAndFlushData(); mergingInProgress = false; });
438 if (mMergerIOThread.joinable()) {
439 mMergerIOThread.join();
440 }
441
442 expectmore = false;
443 }
444
445 if (mPipeToDriver != -1) {
446 if (write(mPipeToDriver, &info.eventID, sizeof(info.eventID)) == -1) {
447 LOG(error) << "FAILED WRITING TO PIPE";
448 };
449 }
450 }
451 return expectmore;
452 }
453
// Hook for cleaning up intermediate per-event buffers.
// Currently a no-op: nothing is released for `eventID` here.
void cleanEvent(int eventID)
{
  static_cast<void>(eventID); // intentionally unused
}
458
// Appends a copy of every element of `from` to the end of `to` (via push_back),
// keeping the element order.
template <typename T>
void backInsert(T const& from, T& to)
{
  for (auto const& element : from) {
    to.push_back(element);
  }
}
464
465 void reorderAndMergeMCTracks(int eventID, TTree* target, const std::vector<int>& nprimaries, const std::vector<int>& nsubevents, std::function<void(std::vector<MCTrack> const&)> tracks_analysis_hook, o2::dataformats::MCEventHeader const* mceventheader)
466 {
467 // avoid doing this for trivial cases
468 std::vector<MCTrack>* mcTracksPerSubEvent = nullptr;
469 auto targetdata = std::make_unique<std::vector<MCTrack>>();
470
471 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
472 const auto entries = vectorOfSubEventMCTracks.size();
473
474 if (entries > 1) {
475 //
476 // loop over subevents to store the primary events
477 //
478 int nprimTot = 0;
479 for (int entry = entries - 1; entry >= 0; --entry) {
480 int index = nsubevents[entry];
481 nprimTot += nprimaries[index];
482 printf("merge %d %5d %5d %5d \n", entry, index, nsubevents[entry], nsubevents[index]);
483 for (int i = 0; i < nprimaries[index]; i++) {
484 auto& track = (*vectorOfSubEventMCTracks[index])[i];
485 if (track.isTransported()) { // reset daughters only if track was transported, it will be fixed below
486 track.SetFirstDaughterTrackId(-1);
487 track.SetLastDaughterTrackId(-1);
488 }
489 targetdata->push_back(track);
490 }
491 }
492 //
493 // loop a second time to store the secondaries and fix the mother track IDs
494 //
495 Int_t idelta1 = nprimTot;
496 Int_t idelta0 = 0;
497 for (int entry = entries - 1; entry >= 0; --entry) {
498 int index = nsubevents[entry];
499
500 auto& subEventTracks = *(vectorOfSubEventMCTracks[index]);
501 // we need to fetch the right mctracks here!!
502 Int_t npart = (int)(subEventTracks.size());
503 Int_t nprim = nprimaries[index];
504 idelta1 -= nprim;
505
506 for (Int_t i = nprim; i < npart; i++) {
507 auto& track = subEventTracks[i];
508 Int_t cId = track.getMotherTrackId();
509 if (cId >= nprim) {
510 cId += idelta1;
511 } else {
512 cId += idelta0;
513 }
514 track.SetMotherTrackId(cId);
515 track.SetFirstDaughterTrackId(-1);
516
517 Int_t hwm = (int)(targetdata->size());
518 auto& mother = (*targetdata)[cId];
519 if (mother.getFirstDaughterTrackId() == -1) {
520 mother.SetFirstDaughterTrackId(hwm);
521 }
522 mother.SetLastDaughterTrackId(hwm);
523
524 targetdata->push_back(track);
525 }
526 idelta0 += nprim;
527 idelta1 += npart;
528 }
529 }
530 //
531 // write to output
532 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
533
534 // we give the possibility to produce some MC track statistics
535 // to be saved as part of the MCHeader structure
536 tracks_analysis_hook(*filladdr);
537
538 if (mWriteToDisc && target) {
539 auto targetbr = o2::base::getOrMakeBranch(*target, "MCTrack", &filladdr);
540 targetbr->SetAddress(&filladdr);
541 targetbr->Fill();
542 targetbr->ResetAddress();
543 }
544 // forwarding the track data to other consumers (pub/sub)
545 if (mForwardKine) {
546 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
547 auto& channel = GetChannels().at("kineforward").at(0);
548 TMessage* tmsg = new TMessage(kMESS_OBJECT);
549 tmsg->WriteObjectAny((void*)filladdr, TClass::GetClass("std::vector<o2::MCTrack>"));
550 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
551 tmsg = new TMessage(kMESS_OBJECT);
552 tmsg->WriteObjectAny((void*)mceventheader, TClass::GetClass("o2::dataformats::MCEventHeader"));
553 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
554 fair::mq::Parts reply;
555 reply.AddPart(std::move(headermessage));
556 reply.AddPart(std::move(trackmessage));
557 channel.Send(reply);
558 LOG(info) << "Forward publish MC tracks on channel";
559 }
560
561 // cleanup buffered data
562 for (auto ptr : vectorOfSubEventMCTracks) {
563 delete ptr; // avoid this by using unique ptr
564 }
565 }
566
567 template <typename T, typename M>
568 void remapTrackIdsAndMerge(std::string brname, int eventID, TTree& target,
569 const std::vector<int>& trackoffsets, const std::vector<int>& nprimaries, const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
570 {
571 //
572 // Remap the mother track IDs by adding an offset.
573 // The offset calculated as the sum of the number of entries in the particle list of the previous subevents.
574 // This method is called by O2HitMerger::mergeAndFlushData(int)
575 //
576 T* incomingdata = nullptr;
577 std::unique_ptr<T> targetdata(nullptr);
578 auto& vectorOfT = mapOfVectorOfTs[eventID];
579 const auto entries = vectorOfT.size();
580
581 if (entries == 1) {
582 // nothing to do in case there is only one entry
583 incomingdata = vectorOfT[0];
584 } else {
585 targetdata = std::make_unique<T>();
586 // loop over subevents
587 Int_t nprimTot = 0;
588 for (int entry = 0; entry < entries; entry++) {
589 nprimTot += nprimaries[entry];
590 }
591 Int_t idelta0 = 0;
592 Int_t idelta1 = nprimTot;
593 for (int entry = entries - 1; entry >= 0; --entry) {
594 Int_t index = subevOrdered[entry];
595 Int_t nprim = nprimaries[index];
596 incomingdata = vectorOfT[index];
597 idelta1 -= nprim;
598 for (auto& data : *incomingdata) {
599 updateTrackIdWithOffset(data, nprim, idelta0, idelta1);
600 targetdata->push_back(data);
601 }
602 idelta0 += nprim;
603 idelta1 += trackoffsets[index];
604 }
605 }
606 auto dataaddr = (entries == 1) ? incomingdata : targetdata.get();
607 auto targetbr = o2::base::getOrMakeBranch(target, brname.c_str(), &dataaddr);
608 targetbr->SetAddress(&dataaddr);
609 targetbr->Fill();
610 targetbr->ResetAddress();
611
612 // cleanup mem
613 for (auto ptr : vectorOfT) {
614 delete ptr; // avoid this by using unique ptr
615 }
616 }
617
618 void updateTrackIdWithOffset(MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
619 {
620 Int_t cId = track.getMotherTrackId();
621 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
622 if (cId != -1) {
623 track.SetMotherTrackId(cId + ioffset);
624 }
625 }
626
627 void updateTrackIdWithOffset(TrackReference& ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
628 {
629 Int_t cId = ref.getTrackID();
630 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
631 ref.setTrackID(cId + ioffset);
632 }
633
634 void initHitTreeAndOutFile(std::string prefix, int detID)
635 {
637 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
638 LOG(warn) << "Hit outfile for detID " << DetID::getName(detID) << " already initialized --> Reopening";
639 mDetectorOutFiles[detID]->Close();
640 delete mDetectorOutFiles[detID];
641 }
642 std::string name(o2::base::DetectorNameConf::getHitsFileName(detID, prefix));
643 if (mWriteToDisc) {
644 mDetectorOutFiles[detID] = new TFile(name.c_str(), "RECREATE");
645 mDetectorToTTreeMap[detID] = new TTree("o2sim", "o2sim");
646 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
647 } else {
648 mDetectorOutFiles[detID] = nullptr;
649 mDetectorToTTreeMap[detID] = nullptr;
650 }
651 }
652
653 // This method goes over the buffers containing data for a given event; potentially merges
654 // them and flushes into the actual output file.
655 // The method can be called asynchronously to data collection
656 bool mergeAndFlushData()
657 {
658 auto checkIfNextFlushable = [this]() -> bool {
659 mNextFlushID++;
660 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
661 };
662
663 LOG(info) << "Launching merge kernel ";
664 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] == true;
665 if (!canflush) {
666 return false;
667 }
668 while (canflush == true) {
669 auto flusheventID = mNextFlushID;
670 LOG(info) << "Merge and flush event " << flusheventID;
671 auto iter = mSubEventInfoBuffer.find(flusheventID);
672 if (iter == mSubEventInfoBuffer.end()) {
673 LOG(error) << "No info/data found for event " << flusheventID;
674 if (!checkIfNextFlushable()) {
675 return false;
676 }
677 }
678
679 auto& subEventInfoList = (*iter).second;
680 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
681 LOG(error) << "No data entries found for event " << flusheventID;
682 if (!checkIfNextFlushable()) {
683 return false;
684 }
685 }
686
687 TStopwatch timer;
688 timer.Start();
689
690 // calculate trackoffsets
691 auto& confref = o2::conf::SimConfig::Instance();
692
693 // collecting trackoffsets (per data arrival id) to be used for global track-ID correction pass
694 std::vector<int> trackoffsets;
695 // collecting primary particles in each subevent (data arrival id)
696 std::vector<int> nprimaries;
697 // mapping of id to actual sub-event id (or part)
698 std::vector<int> nsubevents;
699
700 o2::dataformats::MCEventHeader* eventheader = nullptr; // The event header
701
702 // the MC labels (trackID) for hits
703 for (auto info : subEventInfoList) {
704 assert(info->npersistenttracks >= 0);
705 trackoffsets.emplace_back(info->npersistenttracks);
706 nprimaries.emplace_back(info->nprimarytracks);
707 nsubevents.emplace_back(info->part);
708 if (eventheader == nullptr) {
709 eventheader = &info->mMCEventHeader;
710 } else {
711 eventheader->getMCEventStats().add(info->mMCEventHeader.getMCEventStats());
712 }
713 }
714
715 // now see which events can be discarded in any case due to no hits
716 if (confref.isFilterOutNoHitEvents()) {
717 if (eventheader && eventheader->getMCEventStats().getNHits() == 0) {
718 LOG(info) << " Taking out event " << flusheventID << " due to no hits ";
719 cleanEvent(flusheventID);
720 if (!checkIfNextFlushable()) {
721 return true;
722 }
723 }
724 }
725
726 // attention: We need to make sure that we write everything in the same event order
727 // but iteration over keys of a standard map in C++ is ordered
728
729 // b) merge the general data
730 //
731 // for MCTrack remap the motherIds and merge at the same go
732 const auto entries = subEventInfoList.size();
733 std::vector<int> subevOrdered((int)(nsubevents.size()));
734 for (int entry = entries - 1; entry >= 0; --entry) {
735 subevOrdered[nsubevents[entry] - 1] = entry;
736 printf("HitMerger entry: %d nprimry: %5d trackoffset: %5d \n", entry, nprimaries[entry], trackoffsets[entry]);
737 }
738
739 // This is a hook that collects some useful statistics/properties on the event
740 // for use by other components;
741 // Properties are attached making use of the extensible "Info" feature which is already
742 // part of MCEventHeader. In such a way, one can also do this pass outside and attach arbitrary
743 // metadata to MCEventHeader without needing to change the data layout or API of the class itself.
744 // NOTE: This function might also be called directly in the primary server!?
745 auto mcheaderhook = [eventheader](std::vector<MCTrack> const& tracks) {
746 int eta1Point2Counter = 0;
747 int eta1Point0Counter = 0;
748 int eta0Point8Counter = 0;
749 int eta1Point2CounterPi = 0;
750 int eta1Point0CounterPi = 0;
751 int eta0Point8CounterPi = 0;
752 int prims = 0;
753 for (auto& tr : tracks) {
754 if (tr.isPrimary()) {
755 prims++;
756 const auto eta = tr.GetEta();
757 if (eta < 1.2) {
758 eta1Point2Counter++;
759 if (std::abs(tr.GetPdgCode()) == 211) {
760 eta1Point2CounterPi++;
761 }
762 }
763 if (eta < 1.0) {
764 eta1Point0Counter++;
765 if (std::abs(tr.GetPdgCode()) == 211) {
766 eta1Point0CounterPi++;
767 }
768 }
769 if (eta < 0.8) {
770 eta0Point8Counter++;
771 if (std::abs(tr.GetPdgCode()) == 211) {
772 eta0Point8CounterPi++;
773 }
774 }
775 } else {
776 break; // track layout is such that all prims are first anyway
777 }
778 }
779 // attach these properties to eventheader
780 // we only need to make the names standard
781 eventheader->putInfo("prims_eta_1.2", eta1Point2Counter);
782 eventheader->putInfo("prims_eta_1.0", eta1Point0Counter);
783 eventheader->putInfo("prims_eta_0.8", eta0Point8Counter);
784 eventheader->putInfo("prims_eta_1.2_pi", eta1Point2CounterPi);
785 eventheader->putInfo("prims_eta_1.0_pi", eta1Point0CounterPi);
786 eventheader->putInfo("prims_eta_0.8_pi", eta0Point8CounterPi);
787 eventheader->putInfo("prims_total", prims);
788 };
789 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
790
791 if (mOutTree) {
792 // adjusting and merging track references
793 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>("TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
794
795 // write MC event headers
796 {
797 auto headerbr = o2::base::getOrMakeBranch(*mOutTree, "MCEventHeader.", &eventheader);
798 headerbr->SetAddress(&eventheader);
799 headerbr->Fill();
800 headerbr->ResetAddress();
801 }
802
803 {
804 auto headerbr = o2::base::getOrMakeBranch(*mMCHeaderTree, "MCEventHeader.", &eventheader);
805 headerbr->SetAddress(&eventheader);
806 headerbr->Fill();
807 headerbr->ResetAddress();
808 }
809 }
810
811 // c) do the merge procedure for all hits ... delegate this to detector specific functions
812 // since they know about types; number of branches; etc.
813 // this will also fix the trackIDs inside the hits
814 for (int id = 0; id < mDetectorInstances.size(); ++id) {
815 auto& det = mDetectorInstances[id];
816 if (det) {
817 auto hittree = mDetectorToTTreeMap[id];
818 if (hittree) {
819 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
820 hittree->SetEntries(hittree->GetEntries() + 1);
821 LOG(info) << "flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
822 }
823 }
824 }
825
826 // increase the entry count in the tree
827 if (mOutTree) {
828 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
829 LOG(info) << "outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
830 }
831 if (mMCHeaderTree) {
832 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
833 LOG(info) << "mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
834 }
835
836 cleanEvent(flusheventID);
837 LOG(info) << "Merge/flush for event " << flusheventID << " took " << timer.RealTime();
838 if (!checkIfNextFlushable()) {
839 break;
840 }
841 } // end while
842 if (mWriteToDisc && mOutFile) {
843 LOG(info) << "Writing TTrees";
844 mOutFile->Write("", TObject::kOverwrite);
845 for (int id = 0; id < mDetectorInstances.size(); ++id) {
846 auto& det = mDetectorInstances[id];
847 if (det && mDetectorOutFiles[id]) {
848 mDetectorOutFiles[id]->Write("", TObject::kOverwrite);
849 }
850 }
851 if (mMCHeaderOnlyOutFile) {
852 mMCHeaderOnlyOutFile->Write("", TObject::kOverwrite);
853 }
854 }
855 return true;
856 }
857
858 std::map<uint32_t, uint32_t> mPartsCheckSum;
859 std::string mOutFileName;
860
861 // structures for the final flush
862 TFile* mOutFile;
863 TTree* mOutTree;
864 TFile* mMCHeaderOnlyOutFile;
865 TTree* mMCHeaderTree;
866
867 template <class K, class V>
868 using Hashtable = tbb::concurrent_unordered_map<K, V>;
869 Hashtable<int, TFile*> mDetectorOutFiles;
870 Hashtable<int, TTree*> mDetectorToTTreeMap;
871
872 // intermediate structures to collect data per event
873 std::thread mMergerIOThread;
874 bool mergingInProgress = false;
875
876 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
877 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
878 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
879 Hashtable<int, bool> mFlushableEvents;
880
881 int mEventChecksum = 0;
882 int mNExpectedEvents = 0;
883 int mNextFlushID = 1;
884 TStopwatch mTimer;
885
886 bool mAsService = false;
887 bool mForwardKine = true;
888 bool mWriteToDisc = true;
889
890 int mPipeToDriver = -1;
891
892 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
893
894 // output folder configuration
895 std::string mInitialOutputDir; // initial output folder of the process (initialized during construction)
896 std::string mCurrentOutputDir; // current output folder asked
897
898 // channel to PUB status messages to outside subscribers
899 fair::mq::Channel mPubChannel;
900
901 // init detector instances
902 void initDetInstances();
903 void initHitFiles(std::string prefix);
904};
905
906void O2HitMerger::initHitFiles(std::string prefix)
907{
909
910 // a little helper lambda
911 auto isActivated = [](std::string s) -> bool {
912 // access user configuration for list of wanted modules
914 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
915 return active; };
916
917 for (int i = DetID::First; i <= DetID::Last; ++i) {
918 if (!isActivated(DetID::getName(i))) {
919 continue;
920 }
921 // init the detector specific output files
922 initHitTreeAndOutFile(prefix, i);
923 }
924}
925
926// init detector instances used to write hit data to a TTree
927void O2HitMerger::initDetInstances()
928{
930
931 // a little helper lambda
932 auto isActivated = [](std::string s) -> bool {
933 // access user configuration for list of wanted modules
935 auto active = std::find(modulelist.begin(), modulelist.end(), s) != modulelist.end();
936 return active; };
937
938 mDetectorInstances.resize(DetID::nDetectors);
939 // like a factory of detector objects
940
941 int counter = 0;
942 for (int i = DetID::First; i <= DetID::Last; ++i) {
943 if (!isActivated(DetID::getName(i))) {
944 continue;
945 }
946
947 if (i == DetID::TPC) {
948 mDetectorInstances[i] = std::move(std::make_unique<o2::tpc::Detector>(true));
949 counter++;
950 }
951 if (i == DetID::ITS) {
952 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true));
953 counter++;
954 }
955 if (i == DetID::MFT) {
956 mDetectorInstances[i] = std::move(std::make_unique<o2::mft::Detector>(true));
957 counter++;
958 }
959 if (i == DetID::TRD) {
960 mDetectorInstances[i] = std::move(std::make_unique<o2::trd::Detector>(true));
961 counter++;
962 }
963 if (i == DetID::PHS) {
964 mDetectorInstances[i] = std::move(std::make_unique<o2::phos::Detector>(true));
965 counter++;
966 }
967 if (i == DetID::CPV) {
968 mDetectorInstances[i] = std::move(std::make_unique<o2::cpv::Detector>(true));
969 counter++;
970 }
971 if (i == DetID::EMC) {
972 mDetectorInstances[i] = std::move(std::make_unique<o2::emcal::Detector>(true));
973 counter++;
974 }
975 if (i == DetID::HMP) {
976 mDetectorInstances[i] = std::move(std::make_unique<o2::hmpid::Detector>(true));
977 counter++;
978 }
979 if (i == DetID::TOF) {
980 mDetectorInstances[i] = std::move(std::make_unique<o2::tof::Detector>(true));
981 counter++;
982 }
983 if (i == DetID::FT0) {
984 mDetectorInstances[i] = std::move(std::make_unique<o2::ft0::Detector>(true));
985 counter++;
986 }
987 if (i == DetID::FV0) {
988 mDetectorInstances[i] = std::move(std::make_unique<o2::fv0::Detector>(true));
989 counter++;
990 }
991 if (i == DetID::FDD) {
992 mDetectorInstances[i] = std::move(std::make_unique<o2::fdd::Detector>(true));
993 counter++;
994 }
995 if (i == DetID::MCH) {
996 mDetectorInstances[i] = std::move(std::make_unique<o2::mch::Detector>(true));
997 counter++;
998 }
999 if (i == DetID::MID) {
1000 mDetectorInstances[i] = std::move(std::make_unique<o2::mid::Detector>(true));
1001 counter++;
1002 }
1003 if (i == DetID::ZDC) {
1004 mDetectorInstances[i] = std::move(std::make_unique<o2::zdc::Detector>(true));
1005 counter++;
1006 }
1007 if (i == DetID::FOC) {
1008 TString sName = "$O2_ROOT/share/Detectors/Geometry/FOC/geometryFiles/geometry_Sheets.txt";
1009 gSystem->ExpandPathName(sName);
1010 mDetectorInstances[i] = std::move(std::make_unique<o2::focal::Detector>(true, sName.Data()));
1011 counter++;
1012 }
1013#ifdef ENABLE_UPGRADES
1014 if (i == DetID::IT3) {
1015 mDetectorInstances[i] = std::move(std::make_unique<o2::its::Detector>(true, "IT3"));
1016 counter++;
1017 }
1018 if (i == DetID::TRK) {
1019 mDetectorInstances[i] = std::move(std::make_unique<o2::trk::Detector>(true));
1020 counter++;
1021 }
1022 if (i == DetID::FT3) {
1023 mDetectorInstances[i] = std::move(std::make_unique<o2::ft3::Detector>(true));
1024 counter++;
1025 }
1026 if (i == DetID::FCT) {
1027 mDetectorInstances[i] = std::move(std::make_unique<o2::fct::Detector>(true));
1028 counter++;
1029 }
1030 if (i == DetID::TF3) {
1031 mDetectorInstances[i] = std::move(std::make_unique<o2::iotof::Detector>(true));
1032 counter++;
1033 }
1034 if (i == DetID::RCH) {
1035 mDetectorInstances[i] = std::move(std::make_unique<o2::rich::Detector>(true));
1036 counter++;
1037 }
1038 if (i == DetID::MI3) {
1039 mDetectorInstances[i] = std::move(std::make_unique<o2::mi3::Detector>(true));
1040 counter++;
1041 }
1042 if (i == DetID::ECL) {
1043 mDetectorInstances[i] = std::move(std::make_unique<o2::ecal::Detector>(true));
1044 counter++;
1045 }
1046 if (i == DetID::FD3) {
1047 mDetectorInstances[i] = std::move(std::make_unique<o2::fd3::Detector>(true));
1048 counter++;
1049 }
1050#endif
1051 }
1052 if (counter != DetID::nDetectors) {
1053 LOG(warning) << " O2HitMerger: Some Detectors are potentially missing in this initialization ";
1054 }
1055}
1056
1057} // namespace devices
1058} // namespace o2
1059
1060#endif
Definition of the DescriptorInnerBarrelITS3 class.
Definition of the Names Generator class.
Definition of the Stack class.
Definition of the Detector class.
Definition of the Detector class.
Definition of the FV0 detector class.
int32_t i
Definition of the Detector class.
Definition of the Detector class.
bool waitForControlInput(int workerID)
TBranch * ptr
ECal geometry creation and hit processing.
Definition of the Detector class.
Definition of the Detector class.
Definition of the Detector class.
StringRef key
static std::string getHitsFileName(DId d, const std::string_view prefix=STANDARDSIMPREFIX)
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static std::string getMCHeadersFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:52
bool writeToDisc() const
Definition SimConfig.h:179
bool asService() const
Definition SimConfig.h:173
unsigned int getNEvents() const
Definition SimConfig.h:156
bool forwardKine() const
Definition SimConfig.h:178
static SimConfig & Instance()
Definition SimConfig.h:111
std::vector< std::string > const & getReadoutDetectors() const
Definition SimConfig.h:140
void putInfo(std::string const &key, T const &value)
void add(MCEventStats const &other)
merge from another object
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:146
static constexpr ID FV0
Definition DetID.h:76
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID First
Definition DetID.h:95
static constexpr ID MFT
Definition DetID.h:71
static constexpr int nDetectors
number of defined detectors
Definition DetID.h:97
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID Last
If extra detectors are added, update this!
Definition DetID.h:93
static constexpr ID FOC
Definition DetID.h:80
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
O2HitMerger()
Default constructor.
~O2HitMerger() override
Default destructor.
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
static ShmManager & Instance()
Definition ShmManager.h:61
const GLfloat * m
Definition glcorearb.h:4066
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
TBranch * getOrMakeBranch(TTree &tree, const char *brname, T *ptr)
Definition Detector.h:281
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
bool primaryServer_sendShutdownPermission(fair::mq::Channel &channel)
Definition O2HitMerger.h:96
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
const bool const int TrackITSInternal< NLayers > & track
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
struct o2::upgrades_utils::@469 tracks
structure to keep trigger-related info
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"