119 class TMessageWrapper :
public TMessage
123 ~TMessageWrapper()
override =
default;
131 mInitialOutputDir = std::filesystem::current_path().string();
132 mCurrentOutputDir = mInitialOutputDir;
138 FairSystemInfo sysinfo;
139 LOG(info) <<
"TIME-STAMP " << mTimer.RealTime() <<
"\t";
141 LOG(info) <<
"MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
142 << sysinfo.GetMaxMemory() <<
" MB\n";
147 void InitTask() final
149 LOG(info) <<
"INIT HIT MERGER";
150 ROOT::EnableThreadSafety();
152 std::string outfilename(
"o2sim_merged_hits.root");
159 LOG(fatal) <<
"No configuration received. Aborting";
165 mOutFileName = outfilename.c_str();
167 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
168 mOutTree =
new TTree(
"o2sim",
"o2sim");
169 mOutTree->SetDirectory(mOutFile);
172 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
173 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
176 if (mDetectorInstances.size() == 0) {
184 auto pipeenv = getenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE");
186 mPipeToDriver = atoi(pipeenv);
187 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
189 LOG(warning) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
193 if (mNExpectedEvents == 0) {
195 waitForControlInput();
197 LOG(info) <<
"NOT EXPECTING ANY DATA; SHUTTING DOWN";
203 bool setWorkingDirectory(std::string
const& dir)
205 namespace fs = std::filesystem;
215 fs::current_path(fs::path(mInitialOutputDir));
217 auto absolutePath = fs::absolute(fs::path(dir));
218 if (!fs::exists(absolutePath)) {
219 if (!fs::create_directory(absolutePath)) {
220 LOG(error) <<
"Could not create directory " << absolutePath.string();
225 fs::current_path(absolutePath.string().c_str());
226 mCurrentOutputDir = fs::current_path().string();
228 LOG(info) <<
"FINAL PATH " << mCurrentOutputDir;
229 }
catch (std::exception e) {
230 LOG(error) <<
" could not change path to " << dir;
241 if (!setWorkingDirectory(reconfig.
outputDir)) {
245 std::string outfilename(
"o2sim_merged_hits.root");
247 mNExpectedEvents = reconfig.
nEvents;
248 mOutFileName = outfilename.c_str();
250 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
251 mOutTree =
new TTree(
"o2sim",
"o2sim");
252 mOutTree->SetDirectory(mOutFile);
255 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
256 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
262 mPartsCheckSum.clear();
266 mMCTrackBuffer.clear();
267 mTrackRefBuffer.clear();
268 mSubEventInfoBuffer.clear();
269 mFlushableEvents.clear();
275 template <
typename T,
typename V>
276 V insertAdd(std::map<T, V>&
m, T
const&
key, V
value)
278 const auto iter =
m.find(
key);
280 if (iter !=
m.end()) {
281 iter->second +=
value;
282 accum = iter->second;
/// Tests whether a running checksum has reached the value it takes once the
/// numbers 1, 2, ..., nparts have all been added to it, i.e. the triangular
/// number nparts * (nparts + 1) / 2.
///
/// The even one of the two factors is halved *before* the multiplication.
/// Forming nparts * (nparts + 1) first overflows T as soon as that product no
/// longer fits, even though the halved result still would (for a 32-bit T this
/// already happens above 65535 parts), which made the comparison fail for
/// perfectly complete data. With an unsigned T the value computed here is the
/// exact sum modulo 2^bits, matching an accumulator of the same type.
///
/// \param checksum  sum accumulated so far
/// \param nparts    number of contributions expected in total
/// \return true if checksum equals 1 + 2 + ... + nparts
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  const T half = nparts / 2;
  // nparts even: (nparts / 2) * (nparts + 1); nparts odd: nparts * ((nparts + 1) / 2),
  // where (nparts + 1) / 2 == nparts / 2 + 1 avoids forming nparts + 1 at all.
  const T expected = (nparts % 2 == 0) ? half * (nparts + 1) : nparts * (half + 1);
  return checksum == expected;
}
296 void consumeHits(
int eventID, fair::mq::Parts&
data,
int&
index)
298 auto detIDmessage = std::move(
data.At(
index++));
300 if (detIDmessage->GetSize() == 4) {
301 auto ptr = (
int*)detIDmessage->GetData();
303 LOG(debug2) <<
"I1 " <<
ptr[0] <<
" NAME " <<
id.getName() <<
" MB "
304 <<
data.At(
index)->GetSize() / 1024. / 1024.;
307 auto detector = mDetectorInstances[
id].get();
309 detector->collectHits(eventID,
data,
index);
314 template <
typename T,
typename BT>
315 void consumeData(
int eventID, fair::mq::Parts&
data,
int&
index, BT&
buffer)
317 auto decodeddata = o2::base::decodeTMessage<T*>(
data,
index);
319 buffer[eventID] =
typename BT::mapped_type();
321 buffer[eventID].push_back(decodeddata);
331 if (mSubEventInfoBuffer.find(
info.eventID) == mSubEventInfoBuffer.end()) {
332 mSubEventInfoBuffer[
info.eventID] = std::list<o2::data::SubEventInfo*>();
334 mSubEventInfoBuffer[
info.eventID].push_back(&info);
341 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
342 auto channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
343 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
344 LOG(info) <<
"SOCKETNAME " << controlsocketname;
345 channel.Connect(std::string(controlsocketname));
347 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
349 LOG(info) <<
"WAITING FOR INPUT";
350 if (channel.Receive(reply) > 0) {
351 auto data = reply->GetData();
352 auto size = reply->GetSize();
354 std::string command(
reinterpret_cast<char const*
>(
data),
size);
355 LOG(info) <<
"message: " << command;
359 return ReInit(reconfig);
361 LOG(info) <<
"NOTHING RECEIVED";
366 bool ConditionalRun()
override
368 auto& channel = GetChannels().at(
"simdata").at(0);
369 fair::mq::Parts request;
370 auto bytes = channel.Receive(request);
372 LOG(error) <<
"Some error occurred on socket during receive on sim data";
377 auto more = handleSimData(request, 0);
378 LOG(info) <<
"HitMerger processing took " << timer.RealTime();
379 if (!more && mAsService) {
380 LOG(info) <<
" CONTROL ";
386 static bool initAcknowledged =
false;
387 if (!initAcknowledged) {
389 initAcknowledged =
true;
395 bool handleSimData(fair::mq::Parts&
data,
int )
397 bool expectmore =
true;
399 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(
data,
index++);
401 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum,
info.eventID, (uint32_t)
info.part);
403 LOG(info) <<
"SIMDATA channel got " <<
data.Size() <<
" parts for event " <<
info.eventID <<
" part " <<
info.part <<
" out of " <<
info.nparts;
405 fillSubEventInfoEntry(info);
406 consumeData<std::vector<o2::MCTrack>>(
info.eventID,
data,
index, mMCTrackBuffer);
407 consumeData<std::vector<o2::TrackReference>>(
info.eventID,
data,
index, mTrackRefBuffer);
412 if (isDataComplete<uint32_t>(accum,
info.nparts)) {
413 LOG(info) <<
"Event " <<
info.eventID <<
" complete. Marking as flushable";
414 mFlushableEvents[
info.eventID] =
true;
420 if (!mergingInProgress) {
421 if (mMergerIOThread.joinable()) {
422 mMergerIOThread.join();
425 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
428 mEventChecksum +=
info.eventID;
430 if (isDataComplete<uint32_t>(mEventChecksum,
info.maxEvents)) {
431 LOG(info) <<
"ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
434 if (mMergerIOThread.joinable()) {
435 mMergerIOThread.join();
437 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
438 if (mMergerIOThread.joinable()) {
439 mMergerIOThread.join();
445 if (mPipeToDriver != -1) {
446 if (write(mPipeToDriver, &
info.eventID,
sizeof(
info.eventID)) == -1) {
447 LOG(error) <<
"FAILED WRITING TO PIPE";
454 void cleanEvent(
int eventID)
/// Appends every element of `from` to the end of `to`, preserving order.
///
/// A single range insert is used so that the destination can grow once
/// instead of once per element. Appending a container to itself is handled via
/// a temporary copy, because inserting a range that refers into the
/// destination would invalidate the iterators being read.
///
/// \param from  container whose elements are copied
/// \param to    container that receives the copies at its end
template <typename T>
void backInsert(T const& from, T& to)
{
  if (&from == &to) {
    // self-append: read from a snapshot, not from the container being modified
    const T snapshot(from);
    to.insert(to.end(), snapshot.begin(), snapshot.end());
    return;
  }
  to.insert(to.end(), from.begin(), from.end());
}
465 void reorderAndMergeMCTracks(
int eventID, TTree*
target,
const std::vector<int>& nprimaries,
const std::vector<int>& nsubevents, std::function<
void(std::vector<MCTrack>
const&)> tracks_analysis_hook,
o2::dataformats::MCEventHeader const* mceventheader)
468 std::vector<MCTrack>* mcTracksPerSubEvent =
nullptr;
469 auto targetdata = std::make_unique<std::vector<MCTrack>>();
471 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
472 const auto entries = vectorOfSubEventMCTracks.size();
481 nprimTot += nprimaries[
index];
483 for (
int i = 0;
i < nprimaries[
index];
i++) {
484 auto&
track = (*vectorOfSubEventMCTracks[
index])[
i];
485 if (
track.isTransported()) {
486 track.SetFirstDaughterTrackId(-1);
487 track.SetLastDaughterTrackId(-1);
489 targetdata->push_back(track);
495 Int_t idelta1 = nprimTot;
500 auto& subEventTracks = *(vectorOfSubEventMCTracks[
index]);
502 Int_t npart = (
int)(subEventTracks.size());
503 Int_t nprim = nprimaries[
index];
506 for (Int_t
i = nprim;
i < npart;
i++) {
507 auto&
track = subEventTracks[
i];
508 Int_t cId =
track.getMotherTrackId();
514 track.SetMotherTrackId(cId);
515 track.SetFirstDaughterTrackId(-1);
517 Int_t hwm = (
int)(targetdata->size());
518 auto& mother = (*targetdata)[cId];
519 if (mother.getFirstDaughterTrackId() == -1) {
520 mother.SetFirstDaughterTrackId(hwm);
522 mother.SetLastDaughterTrackId(hwm);
524 targetdata->push_back(track);
532 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
536 tracks_analysis_hook(*filladdr);
538 if (mWriteToDisc &&
target) {
540 targetbr->SetAddress(&filladdr);
542 targetbr->ResetAddress();
546 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
547 auto& channel = GetChannels().at(
"kineforward").at(0);
549 tmsg->WriteObjectAny((
void*)filladdr, TClass::GetClass(
"std::vector<o2::MCTrack>"));
550 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
552 tmsg->WriteObjectAny((
void*)mceventheader, TClass::GetClass(
"o2::dataformats::MCEventHeader"));
553 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
554 fair::mq::Parts reply;
555 reply.AddPart(std::move(headermessage));
556 reply.AddPart(std::move(trackmessage));
558 LOG(info) <<
"Forward publish MC tracks on channel";
562 for (
auto ptr : vectorOfSubEventMCTracks) {
567 template <
typename T,
typename M>
568 void remapTrackIdsAndMerge(std::string brname,
int eventID, TTree&
target,
569 const std::vector<int>& trackoffsets,
const std::vector<int>& nprimaries,
const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
576 T* incomingdata =
nullptr;
577 std::unique_ptr<T> targetdata(
nullptr);
578 auto& vectorOfT = mapOfVectorOfTs[eventID];
579 const auto entries = vectorOfT.size();
583 incomingdata = vectorOfT[0];
585 targetdata = std::make_unique<T>();
589 nprimTot += nprimaries[
entry];
592 Int_t idelta1 = nprimTot;
595 Int_t nprim = nprimaries[
index];
596 incomingdata = vectorOfT[
index];
598 for (
auto&
data : *incomingdata) {
599 updateTrackIdWithOffset(
data, nprim, idelta0, idelta1);
600 targetdata->push_back(
data);
603 idelta1 += trackoffsets[
index];
606 auto dataaddr = (entries == 1) ? incomingdata : targetdata.
get();
608 targetbr->SetAddress(&dataaddr);
610 targetbr->ResetAddress();
613 for (
auto ptr : vectorOfT) {
618 void updateTrackIdWithOffset(
MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
620 Int_t cId =
track.getMotherTrackId();
621 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
623 track.SetMotherTrackId(cId + ioffset);
627 void updateTrackIdWithOffset(TrackReference&
ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
629 Int_t cId =
ref.getTrackID();
630 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
631 ref.setTrackID(cId + ioffset);
634 void initHitTreeAndOutFile(std::string prefix,
int detID)
637 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
638 LOG(warn) <<
"Hit outfile for detID " <<
DetID::getName(detID) <<
" already initialized --> Reopening";
639 mDetectorOutFiles[detID]->Close();
640 delete mDetectorOutFiles[detID];
644 mDetectorOutFiles[detID] =
new TFile(
name.c_str(),
"RECREATE");
645 mDetectorToTTreeMap[detID] =
new TTree(
"o2sim",
"o2sim");
646 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
648 mDetectorOutFiles[detID] =
nullptr;
649 mDetectorToTTreeMap[detID] =
nullptr;
656 bool mergeAndFlushData()
658 auto checkIfNextFlushable = [
this]() ->
bool {
660 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
663 LOG(info) <<
"Launching merge kernel ";
664 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
668 while (canflush ==
true) {
669 auto flusheventID = mNextFlushID;
670 LOG(info) <<
"Merge and flush event " << flusheventID;
671 auto iter = mSubEventInfoBuffer.find(flusheventID);
672 if (iter == mSubEventInfoBuffer.end()) {
673 LOG(error) <<
"No info/data found for event " << flusheventID;
674 if (!checkIfNextFlushable()) {
679 auto& subEventInfoList = (*iter).second;
680 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
681 LOG(error) <<
"No data entries found for event " << flusheventID;
682 if (!checkIfNextFlushable()) {
694 std::vector<int> trackoffsets;
696 std::vector<int> nprimaries;
698 std::vector<int> nsubevents;
703 for (
auto info : subEventInfoList) {
704 assert(
info->npersistenttracks >= 0);
705 trackoffsets.emplace_back(
info->npersistenttracks);
706 nprimaries.emplace_back(
info->nprimarytracks);
707 nsubevents.emplace_back(
info->part);
708 if (eventheader ==
nullptr) {
709 eventheader = &
info->mMCEventHeader;
716 if (confref.isFilterOutNoHitEvents()) {
718 LOG(info) <<
" Taking out event " << flusheventID <<
" due to no hits ";
719 cleanEvent(flusheventID);
720 if (!checkIfNextFlushable()) {
732 const auto entries = subEventInfoList.size();
733 std::vector<int> subevOrdered((
int)(nsubevents.size()));
736 printf(
"HitMerger entry: %d nprimry: %5d trackoffset: %5d \n",
entry, nprimaries[
entry], trackoffsets[
entry]);
745 auto mcheaderhook = [eventheader](std::vector<MCTrack>
const&
tracks) {
746 int eta1Point2Counter = 0;
747 int eta1Point0Counter = 0;
748 int eta0Point8Counter = 0;
749 int eta1Point2CounterPi = 0;
750 int eta1Point0CounterPi = 0;
751 int eta0Point8CounterPi = 0;
754 if (tr.isPrimary()) {
756 const auto eta = tr.GetEta();
759 if (std::abs(tr.GetPdgCode()) == 211) {
760 eta1Point2CounterPi++;
765 if (std::abs(tr.GetPdgCode()) == 211) {
766 eta1Point0CounterPi++;
771 if (std::abs(tr.GetPdgCode()) == 211) {
772 eta0Point8CounterPi++;
781 eventheader->
putInfo(
"prims_eta_1.2", eta1Point2Counter);
782 eventheader->
putInfo(
"prims_eta_1.0", eta1Point0Counter);
783 eventheader->
putInfo(
"prims_eta_0.8", eta0Point8Counter);
784 eventheader->
putInfo(
"prims_eta_1.2_pi", eta1Point2CounterPi);
785 eventheader->
putInfo(
"prims_eta_1.0_pi", eta1Point0CounterPi);
786 eventheader->
putInfo(
"prims_eta_0.8_pi", eta0Point8CounterPi);
787 eventheader->
putInfo(
"prims_total", prims);
789 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
793 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>(
"TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
798 headerbr->SetAddress(&eventheader);
800 headerbr->ResetAddress();
805 headerbr->SetAddress(&eventheader);
807 headerbr->ResetAddress();
814 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
815 auto& det = mDetectorInstances[
id];
817 auto hittree = mDetectorToTTreeMap[
id];
819 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
820 hittree->SetEntries(hittree->GetEntries() + 1);
821 LOG(info) <<
"flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
828 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
829 LOG(info) <<
"outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
832 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
833 LOG(info) <<
"mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
836 cleanEvent(flusheventID);
837 LOG(info) <<
"Merge/flush for event " << flusheventID <<
" took " << timer.RealTime();
838 if (!checkIfNextFlushable()) {
842 if (mWriteToDisc && mOutFile) {
843 LOG(info) <<
"Writing TTrees";
844 mOutFile->Write(
"", TObject::kOverwrite);
845 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
846 auto& det = mDetectorInstances[
id];
847 if (det && mDetectorOutFiles[
id]) {
848 mDetectorOutFiles[
id]->Write(
"", TObject::kOverwrite);
851 if (mMCHeaderOnlyOutFile) {
852 mMCHeaderOnlyOutFile->Write(
"", TObject::kOverwrite);
858 std::map<uint32_t, uint32_t> mPartsCheckSum;
859 std::string mOutFileName;
864 TFile* mMCHeaderOnlyOutFile;
865 TTree* mMCHeaderTree;
867 template <
class K,
class V>
868 using Hashtable = tbb::concurrent_unordered_map<K, V>;
869 Hashtable<int, TFile*> mDetectorOutFiles;
870 Hashtable<int, TTree*> mDetectorToTTreeMap;
873 std::thread mMergerIOThread;
874 bool mergingInProgress =
false;
876 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
877 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
878 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
879 Hashtable<int, bool> mFlushableEvents;
881 int mEventChecksum = 0;
882 int mNExpectedEvents = 0;
883 int mNextFlushID = 1;
886 bool mAsService =
false;
887 bool mForwardKine =
true;
888 bool mWriteToDisc =
true;
890 int mPipeToDriver = -1;
892 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
895 std::string mInitialOutputDir;
896 std::string mCurrentOutputDir;
899 fair::mq::Channel mPubChannel;
902 void initDetInstances();
903 void initHitFiles(std::string prefix);