102 class TMessageWrapper :
public TMessage
106 ~TMessageWrapper()
override =
default;
114 mInitialOutputDir = std::filesystem::current_path().string();
115 mCurrentOutputDir = mInitialOutputDir;
121 FairSystemInfo sysinfo;
122 LOG(info) <<
"TIME-STAMP " << mTimer.RealTime() <<
"\t";
124 LOG(info) <<
"MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
125 << sysinfo.GetMaxMemory() <<
" MB\n";
130 void InitTask() final
132 LOG(info) <<
"INIT HIT MERGER";
134 ROOT::EnableThreadSafety();
136 std::string outfilename(
"o2sim_merged_hits.root");
146 mOutFileName = outfilename.c_str();
148 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
149 mOutTree =
new TTree(
"o2sim",
"o2sim");
150 mOutTree->SetDirectory(mOutFile);
153 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
154 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
157 if (mDetectorInstances.size() == 0) {
165 auto pipeenv = getenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE");
167 mPipeToDriver = atoi(pipeenv);
168 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
170 LOG(warning) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
174 if (mNExpectedEvents == 0) {
176 waitForControlInput();
178 LOG(info) <<
"NOT EXPECTING ANY DATA; SHUTTING DOWN";
184 bool setWorkingDirectory(std::string
const& dir)
186 namespace fs = std::filesystem;
196 fs::current_path(fs::path(mInitialOutputDir));
198 auto absolutePath = fs::absolute(fs::path(dir));
199 if (!fs::exists(absolutePath)) {
200 if (!fs::create_directory(absolutePath)) {
201 LOG(error) <<
"Could not create directory " << absolutePath.string();
206 fs::current_path(absolutePath.string().c_str());
207 mCurrentOutputDir = fs::current_path().string();
209 LOG(info) <<
"FINAL PATH " << mCurrentOutputDir;
210 }
catch (std::exception e) {
211 LOG(error) <<
" could not change path to " << dir;
222 if (!setWorkingDirectory(reconfig.
outputDir)) {
226 std::string outfilename(
"o2sim_merged_hits.root");
228 mNExpectedEvents = reconfig.
nEvents;
229 mOutFileName = outfilename.c_str();
231 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
232 mOutTree =
new TTree(
"o2sim",
"o2sim");
233 mOutTree->SetDirectory(mOutFile);
236 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
237 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
243 mPartsCheckSum.clear();
247 mMCTrackBuffer.clear();
248 mTrackRefBuffer.clear();
249 mSubEventInfoBuffer.clear();
250 mFlushableEvents.clear();
/// Accumulates value into the entry of m stored under key.
///
/// When key is already present, value is added to the stored entry; otherwise
/// a new entry holding value is created.
///
/// @param m      map holding the running sums
/// @param key    entry to update
/// @param value  amount to add
/// @return the value stored under key after the update
template <typename T, typename V>
V insertAdd(std::map<T, V>& m, T const& key, V value)
{
  // try_emplace leaves an existing entry untouched and reports whether it inserted
  const auto [slot, created] = m.try_emplace(key, value);
  if (!created) {
    slot->second += value;
  }
  return slot->second;
}
/// Tests whether an accumulated checksum equals the sum 1 + 2 + ... + nparts.
///
/// @tparam T        integral type of the checksum and of the expected count
/// @param checksum  accumulated sum to test
/// @param nparts    number of consecutive ids (starting at 1) the sum should cover
/// @return true when checksum equals nparts * (nparts + 1) / 2
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  // Exactly one of nparts and nparts + 1 is even. Halving that factor before
  // multiplying avoids the intermediate product nparts * (nparts + 1), which
  // can wrap around in a 32-bit type although the sum itself still fits
  // (e.g. nparts == 65536 for uint32_t).
  if (nparts % 2 == 0) {
    return checksum == (nparts / 2) * (nparts + 1);
  }
  return checksum == nparts * ((nparts + 1) / 2);
}
277 void consumeHits(
int eventID, fair::mq::Parts&
data,
int&
index)
279 auto detIDmessage = std::move(
data.At(
index++));
281 if (detIDmessage->GetSize() == 4) {
282 auto ptr = (
int*)detIDmessage->GetData();
284 LOG(debug2) <<
"I1 " <<
ptr[0] <<
" NAME " <<
id.getName() <<
" MB "
285 <<
data.At(
index)->GetSize() / 1024. / 1024.;
288 auto detector = mDetectorInstances[
id].get();
290 detector->collectHits(eventID,
data,
index);
295 template <
typename T,
typename BT>
296 void consumeData(
int eventID, fair::mq::Parts&
data,
int&
index, BT&
buffer)
298 auto decodeddata = o2::base::decodeTMessage<T*>(
data,
index);
300 buffer[eventID] =
typename BT::mapped_type();
302 buffer[eventID].push_back(decodeddata);
312 if (mSubEventInfoBuffer.find(info.
eventID) == mSubEventInfoBuffer.end()) {
313 mSubEventInfoBuffer[info.
eventID] = std::list<o2::data::SubEventInfo*>();
315 mSubEventInfoBuffer[info.
eventID].push_back(&info);
322 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
323 auto channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
324 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
325 LOG(info) <<
"SOCKETNAME " << controlsocketname;
326 channel.Connect(std::string(controlsocketname));
328 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
330 LOG(info) <<
"WAITING FOR INPUT";
331 if (channel.Receive(reply) > 0) {
332 auto data = reply->GetData();
333 auto size = reply->GetSize();
335 std::string command(
reinterpret_cast<char const*
>(
data),
size);
336 LOG(info) <<
"message: " << command;
340 return ReInit(reconfig);
342 LOG(info) <<
"NOTHING RECEIVED";
347 bool ConditionalRun()
override
349 auto& channel = GetChannels().at(
"simdata").at(0);
350 fair::mq::Parts request;
351 auto bytes = channel.Receive(request);
353 LOG(error) <<
"Some error occurred on socket during receive on sim data";
358 auto more = handleSimData(request, 0);
359 LOG(info) <<
"HitMerger processing took " << timer.RealTime();
360 if (!more && mAsService) {
361 LOG(info) <<
" CONTROL ";
369 bool handleSimData(fair::mq::Parts&
data,
int )
371 bool expectmore =
true;
373 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(
data,
index++);
375 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.
eventID, (uint32_t)info.
part);
377 LOG(info) <<
"SIMDATA channel got " <<
data.Size() <<
" parts for event " << info.
eventID <<
" part " << info.
part <<
" out of " << info.
nparts;
379 fillSubEventInfoEntry(info);
380 consumeData<std::vector<o2::MCTrack>>(info.
eventID,
data,
index, mMCTrackBuffer);
381 consumeData<std::vector<o2::TrackReference>>(info.
eventID,
data,
index, mTrackRefBuffer);
386 if (isDataComplete<uint32_t>(accum, info.
nparts)) {
387 LOG(info) <<
"Event " << info.
eventID <<
" complete. Marking as flushable";
388 mFlushableEvents[info.
eventID] =
true;
394 if (!mergingInProgress) {
395 if (mMergerIOThread.joinable()) {
396 mMergerIOThread.join();
399 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
402 mEventChecksum += info.
eventID;
404 if (isDataComplete<uint32_t>(mEventChecksum, info.
maxEvents)) {
405 LOG(info) <<
"ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
408 if (mMergerIOThread.joinable()) {
409 mMergerIOThread.join();
411 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
412 if (mMergerIOThread.joinable()) {
413 mMergerIOThread.join();
419 if (mPipeToDriver != -1) {
420 if (write(mPipeToDriver, &info.
eventID,
sizeof(info.
eventID)) == -1) {
421 LOG(error) <<
"FAILED WRITING TO PIPE";
432 void cleanEvent(
int eventID)
/// Appends a copy of every element of from to the end of to, keeping the order.
///
/// @param from  container whose elements are copied
/// @param to    container receiving the copies via push_back
template <typename T>
void backInsert(T const& from, T& to)
{
  for (auto const& element : from) {
    to.push_back(element);
  }
}
443 void reorderAndMergeMCTracks(
int eventID, TTree*
target,
const std::vector<int>& nprimaries,
const std::vector<int>& nsubevents, std::function<
void(std::vector<MCTrack>
const&)> tracks_analysis_hook,
o2::dataformats::MCEventHeader const* mceventheader)
446 std::vector<MCTrack>* mcTracksPerSubEvent =
nullptr;
447 auto targetdata = std::make_unique<std::vector<MCTrack>>();
449 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
450 const auto entries = vectorOfSubEventMCTracks.size();
459 nprimTot += nprimaries[
index];
461 for (
int i = 0;
i < nprimaries[
index];
i++) {
462 auto& track = (*vectorOfSubEventMCTracks[
index])[
i];
463 if (track.isTransported()) {
464 track.SetFirstDaughterTrackId(-1);
465 track.SetLastDaughterTrackId(-1);
467 targetdata->push_back(track);
473 Int_t idelta1 = nprimTot;
478 auto& subEventTracks = *(vectorOfSubEventMCTracks[
index]);
480 Int_t npart = (
int)(subEventTracks.size());
481 Int_t nprim = nprimaries[
index];
484 for (Int_t
i = nprim;
i < npart;
i++) {
485 auto& track = subEventTracks[
i];
486 Int_t cId = track.getMotherTrackId();
492 track.SetMotherTrackId(cId);
493 track.SetFirstDaughterTrackId(-1);
495 Int_t hwm = (
int)(targetdata->size());
496 auto& mother = (*targetdata)[cId];
497 if (mother.getFirstDaughterTrackId() == -1) {
498 mother.SetFirstDaughterTrackId(hwm);
500 mother.SetLastDaughterTrackId(hwm);
502 targetdata->push_back(track);
510 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
514 tracks_analysis_hook(*filladdr);
516 if (mWriteToDisc &&
target) {
518 targetbr->SetAddress(&filladdr);
520 targetbr->ResetAddress();
524 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
525 auto& channel = GetChannels().at(
"kineforward").at(0);
527 tmsg->WriteObjectAny((
void*)filladdr, TClass::GetClass(
"std::vector<o2::MCTrack>"));
528 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
530 tmsg->WriteObjectAny((
void*)mceventheader, TClass::GetClass(
"o2::dataformats::MCEventHeader"));
531 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
532 fair::mq::Parts reply;
533 reply.AddPart(std::move(headermessage));
534 reply.AddPart(std::move(trackmessage));
536 LOG(info) <<
"Forward publish MC tracks on channel";
540 for (
auto ptr : vectorOfSubEventMCTracks) {
545 template <
typename T,
typename M>
546 void remapTrackIdsAndMerge(std::string brname,
int eventID, TTree&
target,
547 const std::vector<int>& trackoffsets,
const std::vector<int>& nprimaries,
const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
554 T* incomingdata =
nullptr;
555 std::unique_ptr<T> targetdata(
nullptr);
556 auto& vectorOfT = mapOfVectorOfTs[eventID];
557 const auto entries = vectorOfT.size();
561 incomingdata = vectorOfT[0];
563 targetdata = std::make_unique<T>();
567 nprimTot += nprimaries[
entry];
570 Int_t idelta1 = nprimTot;
573 Int_t nprim = nprimaries[
index];
574 incomingdata = vectorOfT[
index];
576 for (
auto&
data : *incomingdata) {
577 updateTrackIdWithOffset(
data, nprim, idelta0, idelta1);
578 targetdata->push_back(
data);
581 idelta1 += trackoffsets[
index];
584 auto dataaddr = (entries == 1) ? incomingdata : targetdata.
get();
586 targetbr->SetAddress(&dataaddr);
588 targetbr->ResetAddress();
591 for (
auto ptr : vectorOfT) {
596 void updateTrackIdWithOffset(
MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
599 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
605 void updateTrackIdWithOffset(TrackReference&
ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
607 Int_t cId =
ref.getTrackID();
608 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
609 ref.setTrackID(cId + ioffset);
612 void initHitTreeAndOutFile(std::string prefix,
int detID)
615 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
616 LOG(warn) <<
"Hit outfile for detID " <<
DetID::getName(detID) <<
" already initialized --> Reopening";
617 mDetectorOutFiles[detID]->Close();
618 delete mDetectorOutFiles[detID];
622 mDetectorOutFiles[detID] =
new TFile(
name.c_str(),
"RECREATE");
623 mDetectorToTTreeMap[detID] =
new TTree(
"o2sim",
"o2sim");
624 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
626 mDetectorOutFiles[detID] =
nullptr;
627 mDetectorToTTreeMap[detID] =
nullptr;
634 bool mergeAndFlushData()
636 auto checkIfNextFlushable = [
this]() ->
bool {
638 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
641 LOG(info) <<
"Launching merge kernel ";
642 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
646 while (canflush ==
true) {
647 auto flusheventID = mNextFlushID;
648 LOG(info) <<
"Merge and flush event " << flusheventID;
649 auto iter = mSubEventInfoBuffer.find(flusheventID);
650 if (iter == mSubEventInfoBuffer.end()) {
651 LOG(error) <<
"No info/data found for event " << flusheventID;
652 if (!checkIfNextFlushable()) {
657 auto& subEventInfoList = (*iter).second;
658 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
659 LOG(error) <<
"No data entries found for event " << flusheventID;
660 if (!checkIfNextFlushable()) {
672 std::vector<int> trackoffsets;
674 std::vector<int> nprimaries;
676 std::vector<int> nsubevents;
681 for (
auto info : subEventInfoList) {
685 nsubevents.emplace_back(info->
part);
686 if (eventheader ==
nullptr) {
694 if (confref.isFilterOutNoHitEvents()) {
696 LOG(info) <<
" Taking out event " << flusheventID <<
" due to no hits ";
697 cleanEvent(flusheventID);
698 if (!checkIfNextFlushable()) {
710 const auto entries = subEventInfoList.size();
711 std::vector<int> subevOrdered((
int)(nsubevents.size()));
714 printf(
"HitMerger entry: %d nprimry: %5d trackoffset: %5d \n",
entry, nprimaries[
entry], trackoffsets[
entry]);
723 auto mcheaderhook = [eventheader](std::vector<MCTrack>
const&
tracks) {
724 int eta1Point2Counter = 0;
725 int eta1Point0Counter = 0;
726 int eta0Point8Counter = 0;
727 int eta1Point2CounterPi = 0;
728 int eta1Point0CounterPi = 0;
729 int eta0Point8CounterPi = 0;
732 if (tr.isPrimary()) {
734 const auto eta = tr.GetEta();
737 if (std::abs(tr.GetPdgCode()) == 211) {
738 eta1Point2CounterPi++;
743 if (std::abs(tr.GetPdgCode()) == 211) {
744 eta1Point0CounterPi++;
749 if (std::abs(tr.GetPdgCode()) == 211) {
750 eta0Point8CounterPi++;
759 eventheader->
putInfo(
"prims_eta_1.2", eta1Point2Counter);
760 eventheader->
putInfo(
"prims_eta_1.0", eta1Point0Counter);
761 eventheader->
putInfo(
"prims_eta_0.8", eta0Point8Counter);
762 eventheader->
putInfo(
"prims_eta_1.2_pi", eta1Point2CounterPi);
763 eventheader->
putInfo(
"prims_eta_1.0_pi", eta1Point0CounterPi);
764 eventheader->
putInfo(
"prims_eta_0.8_pi", eta0Point8CounterPi);
765 eventheader->
putInfo(
"prims_total", prims);
768 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
772 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>(
"TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
777 headerbr->SetAddress(&eventheader);
779 headerbr->ResetAddress();
784 headerbr->SetAddress(&eventheader);
786 headerbr->ResetAddress();
793 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
794 auto& det = mDetectorInstances[
id];
796 auto hittree = mDetectorToTTreeMap[
id];
798 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
799 hittree->SetEntries(hittree->GetEntries() + 1);
800 LOG(info) <<
"flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
807 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
808 LOG(info) <<
"outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
811 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
812 LOG(info) <<
"mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
815 cleanEvent(flusheventID);
816 LOG(info) <<
"Merge/flush for event " << flusheventID <<
" took " << timer.RealTime();
817 if (!checkIfNextFlushable()) {
821 if (mWriteToDisc && mOutFile) {
822 LOG(info) <<
"Writing TTrees";
823 mOutFile->Write(
"", TObject::kOverwrite);
824 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
825 auto& det = mDetectorInstances[
id];
826 if (det && mDetectorOutFiles[
id]) {
827 mDetectorOutFiles[
id]->Write(
"", TObject::kOverwrite);
830 if (mMCHeaderOnlyOutFile) {
831 mMCHeaderOnlyOutFile->Write(
"", TObject::kOverwrite);
837 std::map<uint32_t, uint32_t> mPartsCheckSum;
838 std::string mOutFileName;
843 TFile* mMCHeaderOnlyOutFile;
844 TTree* mMCHeaderTree;
846 template <
class K,
class V>
847 using Hashtable = tbb::concurrent_unordered_map<K, V>;
848 Hashtable<int, TFile*> mDetectorOutFiles;
849 Hashtable<int, TTree*> mDetectorToTTreeMap;
852 std::thread mMergerIOThread;
853 bool mergingInProgress =
false;
855 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
856 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
857 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
858 Hashtable<int, bool> mFlushableEvents;
860 int mEventChecksum = 0;
861 int mNExpectedEvents = 0;
862 int mNextFlushID = 1;
865 bool mAsService =
false;
866 bool mForwardKine =
true;
867 bool mWriteToDisc =
true;
869 int mPipeToDriver = -1;
871 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
874 std::string mInitialOutputDir;
875 std::string mCurrentOutputDir;
878 fair::mq::Channel mPubChannel;
881 void initDetInstances();
882 void initHitFiles(std::string prefix);