// Wrapper type that derives publicly from TMessage. Only the class head and
// the destructor are visible in this excerpt.
93 class TMessageWrapper :
public TMessage
// Defaulted destructor, declared as an override of the base-class destructor.
97 ~TMessageWrapper()
override =
default;
105 mInitialOutputDir = std::filesystem::current_path().string();
106 mCurrentOutputDir = mInitialOutputDir;
112 FairSystemInfo sysinfo;
113 LOG(info) <<
"TIME-STAMP " << mTimer.RealTime() <<
"\t";
115 LOG(info) <<
"MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
116 << sysinfo.GetMaxMemory() <<
" MB\n";
// Initialisation hook (declared `final`). Visible steps: enable ROOT thread
// safety, create the merged-hits output file and its trees, read the driver
// pipe handle from the environment, and react to mNExpectedEvents == 0.
// Several lines of this function are not shown in this excerpt.
121 void InitTask() final
123 LOG(info) <<
"INIT HIT MERGER";
124 ROOT::EnableThreadSafety();
// Default output file name; it is also stored in mOutFileName below.
126 std::string outfilename(
"o2sim_merged_hits.root");
136 mOutFileName = outfilename.c_str();
// File and trees are allocated with `new` and kept as raw pointers; the tree
// is attached to the file through SetDirectory.
138 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
139 mOutTree =
new TTree(
"o2sim",
"o2sim");
140 mOutTree->SetDirectory(mOutFile);
// Second tree with the same name/title, attached to mMCHeaderOnlyOutFile
// (the creation of that file is not visible here).
143 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
144 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
147 if (mDetectorInstances.size() == 0) {
// The pipe handle is taken from this environment variable and parsed with atoi.
// NOTE(review): atoi(pipeenv) requires pipeenv to be non-null; the guard is
// presumably on the line not shown between these statements -- confirm.
155 auto pipeenv = getenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE");
157 mPipeToDriver = atoi(pipeenv);
158 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
160 LOG(warning) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
// With no expected events the device either waits for control input or logs
// that it shuts down; the condition selecting between the two is not visible.
164 if (mNExpectedEvents == 0) {
166 waitForControlInput();
168 LOG(info) <<
"NOT EXPECTING ANY DATA; SHUTTING DOWN";
// Makes `dir` the process working directory, creating it first when it does
// not exist, and records the resulting path in mCurrentOutputDir.
// The current path is reset to mInitialOutputDir before `dir` is made
// absolute, so a relative `dir` is resolved against the initial directory.
// Returns bool; the return statements are on lines not shown in this excerpt.
174 bool setWorkingDirectory(std::string
const& dir)
176 namespace fs = std::filesystem;
// Always start from the initial output directory.
186 fs::current_path(fs::path(mInitialOutputDir));
188 auto absolutePath = fs::absolute(fs::path(dir));
// create_directory creates a single directory level only; missing parent
// directories are not created by this call.
189 if (!fs::exists(absolutePath)) {
190 if (!fs::create_directory(absolutePath)) {
191 LOG(error) <<
"Could not create directory " << absolutePath.string();
// Switch to the target directory and remember the path actually in effect.
196 fs::current_path(absolutePath.string().c_str());
197 mCurrentOutputDir = fs::current_path().string();
199 LOG(info) <<
"FINAL PATH " << mCurrentOutputDir;
// NOTE(review): the exception is caught by value, which copies it and slices
// derived exception types; `e` is not used on the visible handler line.
// Catching by const reference is the usual form.
200 }
catch (std::exception e) {
201 LOG(error) <<
" could not change path to " << dir;
212 if (!setWorkingDirectory(reconfig.
outputDir)) {
216 std::string outfilename(
"o2sim_merged_hits.root");
218 mNExpectedEvents = reconfig.
nEvents;
219 mOutFileName = outfilename.c_str();
221 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
222 mOutTree =
new TTree(
"o2sim",
"o2sim");
223 mOutTree->SetDirectory(mOutFile);
226 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
227 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
233 mPartsCheckSum.clear();
237 mMCTrackBuffer.clear();
238 mTrackRefBuffer.clear();
239 mSubEventInfoBuffer.clear();
240 mFlushableEvents.clear();
// Accumulates `value` into the entry of `m` stored under `key`.
// Only the branch for an already-present key is visible in this excerpt; the
// handling of a missing key, the declaration of `accum` and the return
// statement are on lines not shown (presumably `accum` is what is returned --
// confirm).
246 template <
typename T,
typename V>
247 V insertAdd(std::map<T, V>&
m, T
const&
key, V
value)
// Single lookup; the iterator is reused for the in-place update below.
249 const auto iter =
m.find(
key);
// Key already present: add to the stored value and remember the new total.
251 if (iter !=
m.end()) {
252 iter->second +=
value;
253 accum = iter->second;
// Returns true when `checksum` equals the triangular number
// nparts * (nparts + 1) / 2, i.e. the sum 1 + 2 + ... + nparts.
//
// The factor that is even is halved *before* the multiplication. Computing
// nparts * (nparts + 1) first overflows a 32-bit T as soon as nparts reaches
// 65536, although the final sum still fits, and the comparison would then
// never succeed. For every input where the original product did not overflow
// the result is unchanged.
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  const auto half = nparts / 2;
  // half * 2 == nparts holds exactly when nparts is even (integer division).
  const auto expected = (half * 2 == nparts) ? half * (nparts + 1)
                                             : nparts * ((nparts + 1) / 2);
  return checksum == expected;
}
// Reads the message part at `index` as a detector-id header (advancing
// `index`) and hands the multipart message on to the matching detector
// instance via collectHits. `index` is taken by reference, so the caller sees
// the advanced position.
267 void consumeHits(
int eventID, fair::mq::Parts&
data,
int&
index)
269 auto detIDmessage = std::move(
data.At(
index++));
// The header is only interpreted when it is exactly 4 bytes long; its payload
// is then read as an int. What happens for other sizes is not visible here.
271 if (detIDmessage->GetSize() == 4) {
272 auto ptr = (
int*)detIDmessage->GetData();
// `id` is defined on a line not shown (presumably built from ptr[0] -- confirm).
// The size printed is that of the part now at `index`, converted to MB.
274 LOG(debug2) <<
"I1 " <<
ptr[0] <<
" NAME " <<
id.getName() <<
" MB "
275 <<
data.At(
index)->GetSize() / 1024. / 1024.;
// Non-owning pointer to the detector instance stored under `id`.
278 auto detector = mDetectorInstances[
id].get();
280 detector->collectHits(eventID,
data,
index);
// Decodes one message part into a T* and appends that pointer to
// buffer[eventID]. T is the payload type, BT the map-like buffer type whose
// mapped_type supports push_back.
285 template <
typename T,
typename BT>
286 void consumeData(
int eventID, fair::mq::Parts&
data,
int&
index, BT&
buffer)
288 auto decodeddata = o2::base::decodeTMessage<T*>(
data,
index);
// Assigns a default-constructed container to the event's slot; the condition
// guarding this assignment (if any) is on a line not shown -- presumably it
// only runs for the first part of an event; confirm.
290 buffer[eventID] =
typename BT::mapped_type();
// The buffer stores the raw pointer returned by decodeTMessage; where it is
// released is not visible in this block.
292 buffer[eventID].push_back(decodeddata);
302 if (mSubEventInfoBuffer.find(info.
eventID) == mSubEventInfoBuffer.end()) {
303 mSubEventInfoBuffer[info.
eventID] = std::list<o2::data::SubEventInfo*>();
305 mSubEventInfoBuffer[info.
eventID].push_back(&info);
// Control-input fragment (the enclosing function head is not shown in this
// excerpt): connects a zeromq "sub" channel to the address found in the
// ALICE_O2SIMCONTROL environment variable, receives one message, logs it and
// returns the result of ReInit(reconfig).
312 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
313 auto channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
// NOTE(review): getenv returns a null pointer when the variable is not set.
// The result is streamed and passed to std::string on the next two statements
// without a check, which is undefined behaviour in that case.
314 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
315 LOG(info) <<
"SOCKETNAME " << controlsocketname;
316 channel.Connect(std::string(controlsocketname));
318 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
320 LOG(info) <<
"WAITING FOR INPUT";
// A positive Receive result is treated as "message received".
321 if (channel.Receive(reply) > 0) {
322 auto data = reply->GetData();
323 auto size = reply->GetSize();
// The payload is copied into a std::string using the explicit size, so it does
// not need to be NUL-terminated.
325 std::string command(
reinterpret_cast<char const*
>(
data),
size);
326 LOG(info) <<
"message: " << command;
// `reconfig` is produced on lines not shown (presumably parsed from `command`).
330 return ReInit(reconfig);
332 LOG(info) <<
"NOTHING RECEIVED";
// Run hook (override): receives one multipart message from the first
// sub-channel of "simdata" and passes it to handleSimData.
// The check on `bytes`, the declaration of `timer` and the return statements
// are on lines not shown in this excerpt.
337 bool ConditionalRun()
override
339 auto& channel = GetChannels().at(
"simdata").at(0);
340 fair::mq::Parts request;
341 auto bytes = channel.Receive(request);
343 LOG(error) <<
"Some error occurred on socket during receive on sim data";
// handleSimData's bool result is stored in `more` and tested below.
348 auto more = handleSimData(request, 0);
349 LOG(info) <<
"HitMerger processing took " << timer.RealTime();
// Reached when no more data is expected and the device runs in service mode.
350 if (!more && mAsService) {
351 LOG(info) <<
" CONTROL ";
// Processes one multipart sim-data message: decodes the SubEventInfo header,
// updates the per-event part checksum, buffers MC tracks and track references,
// marks the event flushable once all its parts arrived, launches the
// merge/flush thread and reports the event id to the driver pipe.
// The second parameter is unnamed and unused. `index` and `info` are declared
// on lines not shown in this excerpt.
359 bool handleSimData(fair::mq::Parts&
data,
int )
361 bool expectmore =
true;
363 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(
data,
index++);
// Per-event accumulation of the part numbers; compared below against the
// value expected for info.nparts parts.
365 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.
eventID, (uint32_t)info.
part);
367 LOG(info) <<
"SIMDATA channel got " <<
data.Size() <<
" parts for event " << info.
eventID <<
" part " << info.
part <<
" out of " << info.
nparts;
369 fillSubEventInfoEntry(info);
// Both calls take `index` by reference and share it.
370 consumeData<std::vector<o2::MCTrack>>(info.
eventID,
data,
index, mMCTrackBuffer);
371 consumeData<std::vector<o2::TrackReference>>(info.
eventID,
data,
index, mTrackRefBuffer);
376 if (isDataComplete<uint32_t>(accum, info.
nparts)) {
377 LOG(info) <<
"Event " << info.
eventID <<
" complete. Marking as flushable";
378 mFlushableEvents[info.
eventID] =
true;
// Start a background merge only when none is running; a finished previous
// thread is joined before the std::thread member is reassigned.
// NOTE(review): mergingInProgress is declared as a plain bool, is written by
// the worker lambda and read here without synchronisation -- a data race.
// NOTE(review): the lambda captures `info` by copy but its visible body does
// not use it.
384 if (!mergingInProgress) {
385 if (mMergerIOThread.joinable()) {
386 mMergerIOThread.join();
389 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
// Event-level checksum: event ids are summed and compared against the value
// expected for info.maxEvents events.
392 mEventChecksum += info.
eventID;
394 if (isDataComplete<uint32_t>(mEventChecksum, info.
maxEvents)) {
395 LOG(info) <<
"ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
// Final flush: wait for any running merge, start one more and join it right
// away, so this last merge completes before the function continues.
398 if (mMergerIOThread.joinable()) {
399 mMergerIOThread.join();
401 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
402 if (mMergerIOThread.joinable()) {
403 mMergerIOThread.join();
// -1 is the default value of mPipeToDriver, i.e. "no pipe assigned". The
// event id is written to the pipe as the raw bytes of info.eventID.
409 if (mPipeToDriver != -1) {
410 if (write(mPipeToDriver, &info.
eventID,
sizeof(info.
eventID)) == -1) {
411 LOG(error) <<
"FAILED WRITING TO PIPE";
422 void cleanEvent(
int eventID)
// Appends a copy of every element of `from` to the end of `to`, keeping the
// element order. T needs begin()/end() iteration and push_back.
template <typename T>
void backInsert(T const& from, T& to)
{
  for (auto it = from.begin(); it != from.end(); ++it) {
    to.push_back(*it);
  }
}
// Merges the per-sub-event MCTrack vectors buffered for `eventID` into one
// vector: a first pass appends the primaries of each sub-event, a second pass
// appends the remaining tracks and updates mother/daughter indices. The result
// is passed to `tracks_analysis_hook`, written through a branch of `target`
// when mWriteToDisc is set and `target` is non-null, and serialised together
// with `mceventheader` for the "kineforward" channel.
// Loop heads and several statements are on lines not shown in this excerpt.
433 void reorderAndMergeMCTracks(
int eventID, TTree*
target,
const std::vector<int>& nprimaries,
const std::vector<int>& nsubevents, std::function<
void(std::vector<MCTrack>
const&)> tracks_analysis_hook,
o2::dataformats::MCEventHeader const* mceventheader)
436 std::vector<MCTrack>* mcTracksPerSubEvent =
nullptr;
437 auto targetdata = std::make_unique<std::vector<MCTrack>>();
439 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
440 const auto entries = vectorOfSubEventMCTracks.size();
// First pass: count and copy the first nprimaries[index] tracks of each
// sub-event; daughter ids of transported tracks are reset to -1 beforehand.
449 nprimTot += nprimaries[
index];
451 for (
int i = 0;
i < nprimaries[
index];
i++) {
452 auto& track = (*vectorOfSubEventMCTracks[
index])[
i];
453 if (track.isTransported()) {
454 track.SetFirstDaughterTrackId(-1);
455 track.SetLastDaughterTrackId(-1);
457 targetdata->push_back(track);
// Second pass: tracks from position nprim onwards in each sub-event.
463 Int_t idelta1 = nprimTot;
468 auto& subEventTracks = *(vectorOfSubEventMCTracks[
index]);
470 Int_t npart = (
int)(subEventTracks.size());
471 Int_t nprim = nprimaries[
index];
474 for (Int_t
i = nprim;
i < npart;
i++) {
475 auto& track = subEventTracks[
i];
// cId is adjusted on lines not shown before being written back.
476 Int_t cId = track.getMotherTrackId();
482 track.SetMotherTrackId(cId);
483 track.SetFirstDaughterTrackId(-1);
// hwm is the index this track gets in targetdata (current size before the
// push_back). The mother's first-daughter id is set only while still -1;
// the last-daughter id is set on the line after that check.
485 Int_t hwm = (
int)(targetdata->size());
486 auto& mother = (*targetdata)[cId];
487 if (mother.getFirstDaughterTrackId() == -1) {
488 mother.SetFirstDaughterTrackId(hwm);
490 mother.SetLastDaughterTrackId(hwm);
492 targetdata->push_back(track);
// With more than one sub-event the merged vector is used, otherwise the first
// buffered vector is used directly.
// NOTE(review): for entries == 0 this indexes element 0 of an empty vector
// unless an earlier guard on a line not shown prevents it -- confirm.
500 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
504 tracks_analysis_hook(*filladdr);
506 if (mWriteToDisc &&
target) {
508 targetbr->SetAddress(&filladdr);
510 targetbr->ResetAddress();
// Deleter handed to NewMessage: deletes the TMessage passed as `hint`.
514 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
515 auto& channel = GetChannels().at(
"kineforward").at(0);
// NOTE(review): both messages below are built from `tmsg` and pass `tmsg` as
// the deleter hint. Presumably `tmsg` is re-created on the lines not shown
// between the two uses; otherwise the same object would be deleted twice.
517 tmsg->WriteObjectAny((
void*)filladdr, TClass::GetClass(
"std::vector<o2::MCTrack>"));
518 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
520 tmsg->WriteObjectAny((
void*)mceventheader, TClass::GetClass(
"o2::dataformats::MCEventHeader"));
521 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
// Part order in the reply: header first, tracks second.
522 fair::mq::Parts reply;
523 reply.AddPart(std::move(headermessage));
524 reply.AddPart(std::move(trackmessage));
526 LOG(info) <<
"Forward publish MC tracks on channel";
// Loop over the buffered per-sub-event vectors; its body is not shown.
530 for (
auto ptr : vectorOfSubEventMCTracks) {
// Merges the per-sub-event containers of type T buffered for `eventID` in
// `mapOfVectorOfTs`, shifting the track ids of the copied elements through
// updateTrackIdWithOffset, and exposes the result to a branch of `target`.
// Loop heads, the declarations of idelta0/nprimTot/targetbr and the branch
// lookup are on lines not shown in this excerpt.
// NOTE(review): `brname` is taken by value, so every call copies the string.
535 template <
typename T,
typename M>
536 void remapTrackIdsAndMerge(std::string brname,
int eventID, TTree&
target,
537 const std::vector<int>& trackoffsets,
const std::vector<int>& nprimaries,
const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
544 T* incomingdata =
nullptr;
545 std::unique_ptr<T> targetdata(
nullptr);
546 auto& vectorOfT = mapOfVectorOfTs[eventID];
547 const auto entries = vectorOfT.size();
// The conditions selecting between the single-entry path (use the buffered
// container directly) and the merge path (fresh target container) are not shown.
551 incomingdata = vectorOfT[0];
553 targetdata = std::make_unique<T>();
// Total number of primaries over the sub-events; the initial value of idelta1.
557 nprimTot += nprimaries[
entry];
560 Int_t idelta1 = nprimTot;
563 Int_t nprim = nprimaries[
index];
564 incomingdata = vectorOfT[
index];
// Elements are modified in place (non-const reference) and then copied into
// the merged container.
566 for (
auto&
data : *incomingdata) {
567 updateTrackIdWithOffset(
data, nprim, idelta0, idelta1);
568 targetdata->push_back(
data);
571 idelta1 += trackoffsets[
index];
// With exactly one entry the incoming container is used as-is, otherwise the
// merged one.
574 auto dataaddr = (entries == 1) ? incomingdata : targetdata.
get();
576 targetbr->SetAddress(&dataaddr);
578 targetbr->ResetAddress();
// Loop over the buffered containers; its body is not shown.
581 for (
auto ptr : vectorOfT) {
// MCTrack overload: selects idelta0 when cId is below nprim and idelta1
// otherwise. The lines that obtain cId and apply the offset are not shown in
// this excerpt.
586 void updateTrackIdWithOffset(
MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
589 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
595 void updateTrackIdWithOffset(TrackReference&
ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
597 Int_t cId =
ref.getTrackID();
598 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
599 ref.setTrackID(cId + ioffset);
// Sets up (or re-creates) the output file and hit tree for detector `detID`.
// An already opened file is closed and deleted first. The construction of
// `name` and the condition selecting between creating the file/tree and
// storing null pointers are on lines not shown in this excerpt.
602 void initHitTreeAndOutFile(std::string prefix,
int detID)
// Existing, non-null entry: warn, close and free the old file object.
605 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
606 LOG(warn) <<
"Hit outfile for detID " <<
DetID::getName(detID) <<
" already initialized --> Reopening";
607 mDetectorOutFiles[detID]->Close();
608 delete mDetectorOutFiles[detID];
// New file (overwriting any existing one) and a tree attached to it, both
// held as raw pointers in the per-detector maps.
612 mDetectorOutFiles[detID] =
new TFile(
name.c_str(),
"RECREATE");
613 mDetectorToTTreeMap[detID] =
new TTree(
"o2sim",
"o2sim");
614 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
// Alternative path: both map entries are set to null.
616 mDetectorOutFiles[detID] =
nullptr;
617 mDetectorToTTreeMap[detID] =
nullptr;
// Merge/flush loop: starting at mNextFlushID, processes events for as long as
// the next event id is marked flushable. For each event it gathers per-part
// bookkeeping, merges MC tracks and track references into mOutTree, lets each
// detector merge its hits into its own tree, bumps the tree entry counts and
// releases the event's buffers. After the loop the output files are written
// when mWriteToDisc is set.
// Many lines (loop bodies, early exits, the update of mNextFlushID/canflush,
// return statements) are not shown in this excerpt.
624 bool mergeAndFlushData()
// Helper: true when mNextFlushID has an entry in mFlushableEvents that is set.
626 auto checkIfNextFlushable = [
this]() ->
bool {
628 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
631 LOG(info) <<
"Launching merge kernel ";
// Same expression as in checkIfNextFlushable above, evaluated inline.
632 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
636 while (canflush ==
true) {
637 auto flusheventID = mNextFlushID;
638 LOG(info) <<
"Merge and flush event " << flusheventID;
// An event without sub-event info, or with an empty info list, is reported as
// an error; what follows the checkIfNextFlushable() test is not shown.
639 auto iter = mSubEventInfoBuffer.find(flusheventID);
640 if (iter == mSubEventInfoBuffer.end()) {
641 LOG(error) <<
"No info/data found for event " << flusheventID;
642 if (!checkIfNextFlushable()) {
647 auto& subEventInfoList = (*iter).second;
648 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
649 LOG(error) <<
"No data entries found for event " << flusheventID;
650 if (!checkIfNextFlushable()) {
// Per-part bookkeeping vectors, filled in the loop over the sub-event infos.
662 std::vector<int> trackoffsets;
664 std::vector<int> nprimaries;
666 std::vector<int> nsubevents;
671 for (
auto info : subEventInfoList) {
675 nsubevents.emplace_back(info->
part);
676 if (eventheader ==
nullptr) {
// Optional filtering: with isFilterOutNoHitEvents() the event can be dropped
// and its buffers cleaned; the hit test itself is on a line not shown.
684 if (confref.isFilterOutNoHitEvents()) {
686 LOG(info) <<
" Taking out event " << flusheventID <<
" due to no hits ";
687 cleanEvent(flusheventID);
688 if (!checkIfNextFlushable()) {
700 const auto entries = subEventInfoList.size();
// Sized like nsubevents; how it is filled is not shown.
701 std::vector<int> subevOrdered((
int)(nsubevents.size()));
704 printf(
"HitMerger entry: %d nprimry: %5d trackoffset: %5d \n",
entry, nprimaries[
entry], trackoffsets[
entry]);
// Hook run on the merged track list: keeps counters for primary tracks and,
// separately, for primaries with |pdg code| == 211, and stores them in the
// event header via putInfo. The eta cuts are on lines not shown; the info
// keys below suggest windows of 1.2, 1.0 and 0.8 -- confirm.
713 auto mcheaderhook = [eventheader](std::vector<MCTrack>
const&
tracks) {
714 int eta1Point2Counter = 0;
715 int eta1Point0Counter = 0;
716 int eta0Point8Counter = 0;
717 int eta1Point2CounterPi = 0;
718 int eta1Point0CounterPi = 0;
719 int eta0Point8CounterPi = 0;
722 if (tr.isPrimary()) {
724 const auto eta = tr.GetEta();
727 if (std::abs(tr.GetPdgCode()) == 211) {
728 eta1Point2CounterPi++;
733 if (std::abs(tr.GetPdgCode()) == 211) {
734 eta1Point0CounterPi++;
739 if (std::abs(tr.GetPdgCode()) == 211) {
740 eta0Point8CounterPi++;
749 eventheader->
putInfo(
"prims_eta_1.2", eta1Point2Counter);
750 eventheader->
putInfo(
"prims_eta_1.0", eta1Point0Counter);
751 eventheader->
putInfo(
"prims_eta_0.8", eta0Point8Counter);
752 eventheader->
putInfo(
"prims_eta_1.2_pi", eta1Point2CounterPi);
753 eventheader->
putInfo(
"prims_eta_1.0_pi", eta1Point0CounterPi);
754 eventheader->
putInfo(
"prims_eta_0.8_pi", eta0Point8CounterPi);
755 eventheader->
putInfo(
"prims_total", prims);
// Tracks first (the hook above runs inside this call), then track references.
757 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
761 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>(
"TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
// The header is exposed to two branches in turn (the branch lookups are not
// shown); each address is reset after use.
766 headerbr->SetAddress(&eventheader);
768 headerbr->ResetAddress();
773 headerbr->SetAddress(&eventheader);
775 headerbr->ResetAddress();
// Per-detector hit merging; the detector's index in mDetectorInstances is used
// as key into mDetectorToTTreeMap.
// NOTE(review): `int id` is compared against the unsigned size().
782 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
783 auto& det = mDetectorInstances[
id];
785 auto hittree = mDetectorToTTreeMap[
id];
787 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
// The entry count of each tree is advanced by one explicitly.
788 hittree->SetEntries(hittree->GetEntries() + 1);
789 LOG(info) <<
"flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
796 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
797 LOG(info) <<
"outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
800 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
801 LOG(info) <<
"mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
// Release the buffers of the flushed event; `timer` is declared on a line not
// shown.
804 cleanEvent(flusheventID);
805 LOG(info) <<
"Merge/flush for event " << flusheventID <<
" took " << timer.RealTime();
806 if (!checkIfNextFlushable()) {
// After the loop: write the main file, the per-detector files and the
// header-only file, each with TObject::kOverwrite.
810 if (mWriteToDisc && mOutFile) {
811 LOG(info) <<
"Writing TTrees";
812 mOutFile->Write(
"", TObject::kOverwrite);
813 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
814 auto& det = mDetectorInstances[
id];
815 if (det && mDetectorOutFiles[
id]) {
816 mDetectorOutFiles[
id]->Write(
"", TObject::kOverwrite);
819 if (mMCHeaderOnlyOutFile) {
820 mMCHeaderOnlyOutFile->Write(
"", TObject::kOverwrite);
// Data members and trailing declarations (some member lines are not shown in
// this excerpt).

// Per-event accumulation of received part numbers, keyed by event id.
826 std::map<uint32_t, uint32_t> mPartsCheckSum;
827 std::string mOutFileName;
// Raw pointers to the header-only output file and its tree.
// NOTE(review): no in-class initializer is visible for these two pointers;
// confirm they are assigned before first use.
832 TFile* mMCHeaderOnlyOutFile;
833 TTree* mMCHeaderTree;
// Alias for the concurrent hash map used by the per-detector and per-event
// tables below.
835 template <
class K,
class V>
836 using Hashtable = tbb::concurrent_unordered_map<K, V>;
// Per-detector output file and hit tree, keyed by an int detector id/index.
837 Hashtable<int, TFile*> mDetectorOutFiles;
838 Hashtable<int, TTree*> mDetectorToTTreeMap;
// Worker thread running mergeAndFlushData, and the flag it sets while running.
// NOTE(review): the flag is a plain bool shared between threads.
841 std::thread mMergerIOThread;
842 bool mergingInProgress =
false;
// Per-event buffers (keyed by event id) holding raw pointers to the decoded
// per-sub-event data.
844 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
845 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
846 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
// Event id -> "all parts received, may be merged".
847 Hashtable<int, bool> mFlushableEvents;
// Running sum of received event ids.
849 int mEventChecksum = 0;
850 int mNExpectedEvents = 0;
// Id of the next event to merge; starts at 1.
851 int mNextFlushID = 1;
854 bool mAsService =
false;
855 bool mForwardKine =
true;
856 bool mWriteToDisc =
true;
// File descriptor used with write(); -1 means no pipe assigned.
858 int mPipeToDriver = -1;
860 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
// Working directory at construction time and the one currently in effect.
863 std::string mInitialOutputDir;
864 std::string mCurrentOutputDir;
867 fair::mq::Channel mPubChannel;
// Declared here; definitions are not part of this excerpt.
870 void initDetInstances();
871 void initHitFiles(std::string prefix);