// Wrapper type deriving publicly from TMessage. Only the class head and the
// destructor are visible in this excerpt; any other members are not shown.
94 class TMessageWrapper :
public TMessage
// Destructor is explicitly defaulted and marked `override`.
98 ~TMessageWrapper()
override =
default;
// Remember the process working directory at this point as the initial output
// directory; the current output directory starts out identical to it.
// (The enclosing function header is not visible in this excerpt.)
106 mInitialOutputDir = std::filesystem::current_path().string();
107 mCurrentOutputDir = mInitialOutputDir;
// Log a timer reading (mTimer.RealTime()) and two memory figures obtained from
// a FairSystemInfo instance. The enclosing function header is not visible here.
113 FairSystemInfo sysinfo;
114 LOG(info) <<
"TIME-STAMP " << mTimer.RealTime() <<
"\t";
// NOTE(review): GetCurrentMemory() is divided by 1024*1024 while GetMaxMemory()
// is printed unscaled, yet the line ends with a single "MB" label — confirm
// the units returned by both calls.
116 LOG(info) <<
"MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
117 << sysinfo.GetMaxMemory() <<
" MB\n";
// Device initialisation hook (declared `final`).
// Visible steps: enable ROOT thread safety, open the merged-hits output file in
// RECREATE mode, create the "o2sim" output trees and attach them to their
// files, read the driver pipe handle from the environment, and react to the
// case where no events are expected.
// Several original lines of this function are not visible in this excerpt.
122 void InitTask() final
124 LOG(info) <<
"INIT HIT MERGER";
125 ROOT::EnableThreadSafety();
// Starting name of the output file; lines that may modify it afterwards are
// not visible here.
127 std::string outfilename(
"o2sim_merged_hits.root");
137 mOutFileName = outfilename.c_str();
// The TFile and TTree objects are created with raw `new` and kept in raw
// pointer members.
139 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
140 mOutTree =
new TTree(
"o2sim",
"o2sim");
141 mOutTree->SetDirectory(mOutFile);
// Header tree uses the same tree name "o2sim" but is attached to
// mMCHeaderOnlyOutFile (the creation of that file is not visible here).
144 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
145 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
// Body of this branch is not visible; presumably detector instances are
// created when none exist yet — TODO confirm.
148 if (mDetectorInstances.size() == 0) {
// The pipe handle towards the driver is passed as an integer in the
// environment variable ALICE_O2SIMMERGERTODRIVER_PIPE.
156 auto pipeenv = getenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE");
// NOTE(review): the guard between getenv() and atoi() is not visible in this
// excerpt; atoi on a null pointer is undefined, so confirm a null check
// precedes this line (the warning below suggests there is an else branch).
158 mPipeToDriver = atoi(pipeenv);
159 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
161 LOG(warning) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
// With zero expected events the device either waits for control input or logs
// that it shuts down; the condition selecting between the two is not visible.
165 if (mNExpectedEvents == 0) {
167 waitForControlInput();
169 LOG(info) <<
"NOT EXPECTING ANY DATA; SHUTTING DOWN";
// Change the process working directory to `dir`, creating the directory when
// it does not exist, and record the resulting path in mCurrentOutputDir.
// Returns bool; the return statements are not visible in this excerpt.
175 bool setWorkingDirectory(std::string
const& dir)
177 namespace fs = std::filesystem;
// Go back to the initial directory first, so that a relative `dir` is resolved
// against mInitialOutputDir (fs::absolute resolves against the current path).
187 fs::current_path(fs::path(mInitialOutputDir));
189 auto absolutePath = fs::absolute(fs::path(dir));
190 if (!fs::exists(absolutePath)) {
// NOTE(review): fs::create_directory creates a single directory level only; a
// nested `dir` with missing parents would fail here — confirm whether
// create_directories was intended.
191 if (!fs::create_directory(absolutePath)) {
192 LOG(error) <<
"Could not create directory " << absolutePath.string();
197 fs::current_path(absolutePath.string().c_str());
198 mCurrentOutputDir = fs::current_path().string();
200 LOG(info) <<
"FINAL PATH " << mCurrentOutputDir;
201 }
// NOTE(review): the exception is caught by value, which slices derived
// exception types; catching by const reference is the usual form.
catch (std::exception e) {
202 LOG(error) <<
" could not change path to " << dir;
// Fragment of a reconfiguration routine driven by a `reconfig` object
// (presumably ReInit, which is called with `reconfig` elsewhere in this file;
// the function header is not visible in this excerpt).
// Visible steps: switch to the requested output directory, take over the
// expected event count, re-create the output file and trees, and clear the
// per-event bookkeeping containers.
213 if (!setWorkingDirectory(reconfig.
outputDir)) {
217 std::string outfilename(
"o2sim_merged_hits.root");
219 mNExpectedEvents = reconfig.
nEvents;
220 mOutFileName = outfilename.c_str();
// NOTE(review): mOutFile / mOutTree / mMCHeaderTree are overwritten with fresh
// `new` objects; whether the previous objects are closed and deleted first is
// not visible here — confirm there is no leak.
222 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
223 mOutTree =
new TTree(
"o2sim",
"o2sim");
224 mOutTree->SetDirectory(mOutFile);
227 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
228 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
// Reset the bookkeeping containers.
234 mPartsCheckSum.clear();
238 mMCTrackBuffer.clear();
239 mTrackRefBuffer.clear();
240 mSubEventInfoBuffer.clear();
241 mFlushableEvents.clear();
// Accumulate `value` into the map entry for `key`.
// Visible branch: when `key` is already present, `value` is added to the stored
// value and the new sum is copied into `accum`. The handling of a missing key,
// the declaration of `accum` and the return statement are not visible in this
// excerpt.
247 template <
typename T,
typename V>
248 V insertAdd(std::map<T, V>&
m, T
const&
key, V
value)
250 const auto iter =
m.find(
key);
252 if (iter !=
m.end()) {
253 iter->second +=
value;
254 accum = iter->second;
// Completeness test based on a checksum: returns true when `checksum` equals
// nparts * (nparts + 1) / 2, i.e. the sum 1 + 2 + ... + nparts. A caller that
// accumulates the numbers 1..nparts can therefore detect that all have arrived.
262 template <
typename T>
263 bool isDataComplete(T checksum, T nparts)
265 return checksum == nparts * (nparts + 1) / 2;
// Consume the hit payload of one detector from a multi-part message.
// `index` is passed by reference: it is advanced past the detector-ID part
// here and is also handed on to the detector's collectHits().
268 void consumeHits(
int eventID, fair::mq::Parts&
data,
int&
index)
// Take ownership of the part at `index` (moved out of `data`) and advance.
270 auto detIDmessage = std::move(
data.At(
index++));
// Only a 4-byte payload is accepted; it is read as an int below
// (presumably sizeof(int) — TODO confirm).
272 if (detIDmessage->GetSize() == 4) {
273 auto ptr = (
int*)detIDmessage->GetData();
// `id` is defined on a line that is not visible in this excerpt.
275 LOG(debug2) <<
"I1 " <<
ptr[0] <<
" NAME " <<
id.getName() <<
" MB "
276 <<
data.At(
index)->GetSize() / 1024. / 1024.;
279 auto detector = mDetectorInstances[
id].get();
281 detector->collectHits(eventID,
data,
index);
// Decode one message part into a heap object of type T and append the pointer
// to the per-event container `buffer[eventID]`.
286 template <
typename T,
typename BT>
287 void consumeData(
int eventID, fair::mq::Parts&
data,
int&
index, BT&
buffer)
289 auto decodeddata = o2::base::decodeTMessage<T*>(
data,
index);
// Assigns an empty container for this event. The condition guarding this
// assignment is not visible here; presumably it runs only when no entry
// exists yet — TODO confirm, otherwise earlier parts would be discarded.
291 buffer[eventID] =
typename BT::mapped_type();
293 buffer[eventID].push_back(decodeddata);
// Fragment that registers `info` in mSubEventInfoBuffer under its event ID
// (presumably the body of fillSubEventInfoEntry; the header is not visible).
// An empty list is created on first use of an event ID.
303 if (mSubEventInfoBuffer.find(info.
eventID) == mSubEventInfoBuffer.end()) {
304 mSubEventInfoBuffer[info.
eventID] = std::list<o2::data::SubEventInfo*>();
// NOTE(review): the address of `info` is stored, so the referenced object must
// outlive the buffer entry — confirm how `info` is passed and owned.
306 mSubEventInfoBuffer[info.
eventID].push_back(&info);
// Fragment that waits for a control message and reconfigures the device
// (presumably the body of waitForControlInput; the header is not visible).
// A "sub" channel named "o2sim-control" is created on a zeromq transport and
// connected to the address given by the ALICE_O2SIMCONTROL environment variable.
313 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
314 auto channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
// NOTE(review): the getenv() result is streamed to the log and converted to
// std::string on the directly following lines without a null check; if the
// variable is unset both uses are undefined behaviour.
315 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
316 LOG(info) <<
"SOCKETNAME " << controlsocketname;
317 channel.Connect(std::string(controlsocketname));
319 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
321 LOG(info) <<
"WAITING FOR INPUT";
// A positive return value is treated as a successfully received message.
322 if (channel.Receive(reply) > 0) {
323 auto data = reply->GetData();
324 auto size = reply->GetSize();
// The payload is copied into a string with an explicit length, so it does not
// need to be null-terminated.
326 std::string command(
reinterpret_cast<char const*
>(
data),
size);
327 LOG(info) <<
"message: " << command;
// How `reconfig` is derived from `command` is not visible in this excerpt.
331 return ReInit(reconfig);
333 LOG(info) <<
"NOTHING RECEIVED";
// Run-loop hook (override): receive one multi-part message from the first
// "simdata" channel and hand it to handleSimData().
// The error condition, the timer declaration and the return statements are not
// visible in this excerpt.
338 bool ConditionalRun()
override
340 auto& channel = GetChannels().at(
"simdata").at(0);
341 fair::mq::Parts request;
342 auto bytes = channel.Receive(request);
344 LOG(error) <<
"Some error occurred on socket during receive on sim data";
// `more` is the value returned by handleSimData().
349 auto more = handleSimData(request, 0);
350 LOG(info) <<
"HitMerger processing took " << timer.RealTime();
// When no more data is expected and the device runs as a service, control
// handling follows (the statements after the log line are not visible).
351 if (!more && mAsService) {
352 LOG(info) <<
" CONTROL ";
// Process one multi-part simulation message.
// Visible steps: decode the SubEventInfo, add its part number to the per-event
// checksum, buffer the info, MC tracks and track references, mark the event as
// flushable once all parts have arrived, start a merge thread, update the
// event checksum, run a final merge when all events are present, and write the
// event ID to the driver pipe.
// The second (unnamed int) parameter is unused in the visible code.
// Declarations of `index` and `info` and the return statement are not visible.
360 bool handleSimData(fair::mq::Parts&
data,
int )
362 bool expectmore =
true;
364 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(
data,
index++);
// Running sum of the part numbers received so far for this event.
366 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.
eventID, (uint32_t)info.
part);
368 LOG(info) <<
"SIMDATA channel got " <<
data.Size() <<
" parts for event " << info.
eventID <<
" part " << info.
part <<
" out of " << info.
nparts;
370 fillSubEventInfoEntry(info);
371 consumeData<std::vector<o2::MCTrack>>(info.
eventID,
data,
index, mMCTrackBuffer);
372 consumeData<std::vector<o2::TrackReference>>(info.
eventID,
data,
index, mTrackRefBuffer);
// All parts of this event are present when the accumulated part numbers equal
// 1 + 2 + ... + nparts.
377 if (isDataComplete<uint32_t>(accum, info.
nparts)) {
378 LOG(info) <<
"Event " << info.
eventID <<
" complete. Marking as flushable";
379 mFlushableEvents[info.
eventID] =
true;
// NOTE(review): mergingInProgress is declared as a plain bool in this class; it
// is written inside the worker thread's lambda and read here, which is a data
// race unless other synchronisation exists outside this excerpt.
385 if (!mergingInProgress) {
// A finished previous thread must be joined before the std::thread member can
// be reassigned.
386 if (mMergerIOThread.joinable()) {
387 mMergerIOThread.join();
// NOTE(review): `info` is captured by copy but is not used in the visible
// lambda body.
390 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
393 mEventChecksum += info.
eventID;
// Same checksum scheme on event level: all events are present when the summed
// event IDs equal 1 + 2 + ... + maxEvents.
395 if (isDataComplete<uint32_t>(mEventChecksum, info.
maxEvents)) {
396 LOG(info) <<
"ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
// Final flush: wait for a running merge, start one more merge and wait for it.
399 if (mMergerIOThread.joinable()) {
400 mMergerIOThread.join();
402 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
403 if (mMergerIOThread.joinable()) {
404 mMergerIOThread.join();
// Write the event ID to the driver pipe; -1 means no pipe was configured.
410 if (mPipeToDriver != -1) {
411 if (write(mPipeToDriver, &info.
eventID,
sizeof(info.
eventID)) == -1) {
412 LOG(error) <<
"FAILED WRITING TO PIPE";
// Per-event cleanup hook, called from mergeAndFlushData() after an event was
// flushed or dropped. Only the signature is visible in this excerpt; the body
// is not shown.
423 void cleanEvent(
int eventID)
// Append copies of all elements of `from` to the end of `to`.
// T must provide begin()/end() and push_back() (required by back_inserter).
428 template <
typename T>
429 void backInsert(T
const& from, T& to)
431 std::copy(from.begin(), from.end(), std::back_inserter(to));
// Merge the per-sub-event MC track vectors of one event into a single vector.
// Visible steps: copy the primaries of every sub-event first, then append the
// secondaries while re-linking mother/daughter indices, call the analysis
// hook on the result, fill it into `target` when writing is enabled, and
// publish header + tracks on the "kineforward" channel.
// Parameters:
//   eventID              key into mMCTrackBuffer
//   target               tree to fill; the code checks it for null before use
//   nprimaries           number of primaries per sub-event (indexed like the buffer)
//   nsubevents           sub-event ordering (use not visible in this excerpt)
//   tracks_analysis_hook callback invoked with the final track vector
//   mceventheader        header object serialised alongside the tracks
// Loop headers and several statements are not visible in this excerpt.
434 void reorderAndMergeMCTracks(
int eventID, TTree*
target,
const std::vector<int>& nprimaries,
const std::vector<int>& nsubevents, std::function<
void(std::vector<MCTrack>
const&)> tracks_analysis_hook,
o2::dataformats::MCEventHeader const* mceventheader)
437 std::vector<MCTrack>* mcTracksPerSubEvent =
nullptr;
438 auto targetdata = std::make_unique<std::vector<MCTrack>>();
440 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
441 const auto entries = vectorOfSubEventMCTracks.size();
// First pass: count and copy primaries (the first nprimaries[index] tracks of
// each sub-event).
450 nprimTot += nprimaries[
index];
452 for (
int i = 0;
i < nprimaries[
index];
i++) {
453 auto& track = (*vectorOfSubEventMCTracks[
index])[
i];
// Daughter links of transported primaries are reset; they are rebuilt in the
// secondaries pass below.
454 if (track.isTransported()) {
455 track.SetFirstDaughterTrackId(-1);
456 track.SetLastDaughterTrackId(-1);
458 targetdata->push_back(track);
// Offset applied to secondary track IDs starts at the total number of
// primaries (how it is advanced is not visible here).
464 Int_t idelta1 = nprimTot;
// Second pass: secondaries, i.e. tracks from index nprim up to the end.
469 auto& subEventTracks = *(vectorOfSubEventMCTracks[
index]);
471 Int_t npart = (
int)(subEventTracks.size());
472 Int_t nprim = nprimaries[
index];
475 for (Int_t
i = nprim;
i < npart;
i++) {
476 auto& track = subEventTracks[
i];
// The remapping of cId between these lines is not visible in this excerpt.
477 Int_t cId = track.getMotherTrackId();
483 track.SetMotherTrackId(cId);
484 track.SetFirstDaughterTrackId(-1);
// hwm is the index this track will get in the merged vector; it is recorded
// as the mother's last daughter, and as first daughter if none was set yet.
486 Int_t hwm = (
int)(targetdata->size());
487 auto& mother = (*targetdata)[cId];
488 if (mother.getFirstDaughterTrackId() == -1) {
489 mother.SetFirstDaughterTrackId(hwm);
491 mother.SetLastDaughterTrackId(hwm);
493 targetdata->push_back(track);
// With more than one sub-event the merged vector is used, otherwise
// vectorOfSubEventMCTracks[0] is used directly.
// NOTE(review): with entries == 0 this indexes element 0 of an empty vector —
// confirm callers never pass an event without buffered tracks.
501 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
505 tracks_analysis_hook(*filladdr);
507 if (mWriteToDisc &&
target) {
// Branch address is pointed at the data and reset again (the fill call
// between the two lines is not visible).
509 targetbr->SetAddress(&filladdr);
511 targetbr->ResetAddress();
// Deleter passed to NewMessage(): deletes the TMessage given as `hint`, which
// owns the serialisation buffer handed to the message.
515 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
516 auto& channel = GetChannels().at(
"kineforward").at(0);
// `tmsg` is created on lines not visible here; presumably a fresh message
// object per payload since each is handed to free_tmessage — TODO confirm.
518 tmsg->WriteObjectAny((
void*)filladdr, TClass::GetClass(
"std::vector<o2::MCTrack>"));
519 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
521 tmsg->WriteObjectAny((
void*)mceventheader, TClass::GetClass(
"o2::dataformats::MCEventHeader"));
522 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
// Part order on the wire: header first, then tracks.
523 fair::mq::Parts reply;
524 reply.AddPart(std::move(headermessage));
525 reply.AddPart(std::move(trackmessage));
527 LOG(info) <<
"Forward publish MC tracks on channel";
// Loop over the buffered sub-event vectors; the loop body is not visible
// (presumably it releases them — TODO confirm).
531 for (
auto ptr : vectorOfSubEventMCTracks) {
// Merge the per-sub-event containers of type T stored in
// mapOfVectorOfTs[eventID] into one container and fill it into `target`.
// When several sub-events exist, every element gets its track ID shifted via
// updateTrackIdWithOffset() before being appended to the merged container.
// Parameters:
//   brname          branch name (its use is not visible in this excerpt)
//   eventID         key into mapOfVectorOfTs
//   target          tree receiving the data
//   trackoffsets    per-sub-event increment applied to the secondary offset
//   nprimaries      per-sub-event number of primaries
//   subevOrdered    sub-event ordering (its use is not visible in this excerpt)
//   mapOfVectorOfTs buffer mapping event ID to a vector of T pointers
// Loop headers and several statements are not visible in this excerpt.
536 template <
typename T,
typename M>
537 void remapTrackIdsAndMerge(std::string brname,
int eventID, TTree&
target,
538 const std::vector<int>& trackoffsets,
const std::vector<int>& nprimaries,
const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
545 T* incomingdata =
nullptr;
546 std::unique_ptr<T> targetdata(
nullptr);
547 auto& vectorOfT = mapOfVectorOfTs[eventID];
548 const auto entries = vectorOfT.size();
// Single-entry case: use the only incoming container as is (the condition
// selecting between this and the merge path is not visible).
552 incomingdata = vectorOfT[0];
// Merge case: allocate the combined container.
554 targetdata = std::make_unique<T>();
558 nprimTot += nprimaries[
entry];
// Offset for non-primary IDs starts at the total number of primaries.
561 Int_t idelta1 = nprimTot;
564 Int_t nprim = nprimaries[
index];
565 incomingdata = vectorOfT[
index];
// Elements are modified in place in the incoming container and then copied.
567 for (
auto&
data : *incomingdata) {
568 updateTrackIdWithOffset(
data, nprim, idelta0, idelta1);
569 targetdata->push_back(
data);
572 idelta1 += trackoffsets[
index];
// Fill from the original container when there was exactly one entry,
// otherwise from the merged one.
575 auto dataaddr = (entries == 1) ? incomingdata : targetdata.
get();
577 targetbr->SetAddress(&dataaddr);
579 targetbr->ResetAddress();
// Loop over the buffered pointers; the loop body is not visible
// (presumably it releases them — TODO confirm).
582 for (
auto ptr : vectorOfT) {
// MCTrack overload of the ID remapping helper. Visible logic: IDs below
// `nprim` use offset `idelta0`, all others use `idelta1`. Where `cId` comes
// from and how the offset is applied to `track` is not visible in this excerpt.
587 void updateTrackIdWithOffset(
MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
590 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
// TrackReference overload of the ID remapping helper: shifts the track ID
// stored in `ref` by `idelta0` when the ID is below `nprim`, and by `idelta1`
// otherwise.
596 void updateTrackIdWithOffset(TrackReference&
ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
598 Int_t cId =
ref.getTrackID();
599 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
600 ref.setTrackID(cId + ioffset);
// (Re)create the hit output file and its "o2sim" tree for detector `detID`.
// How `prefix` contributes to the file name (`name`) is not visible in this
// excerpt.
603 void initHitTreeAndOutFile(std::string prefix,
int detID)
// An already opened file for this detector is closed and deleted first.
606 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
607 LOG(warn) <<
"Hit outfile for detID " <<
DetID::getName(detID) <<
" already initialized --> Reopening";
608 mDetectorOutFiles[detID]->Close();
609 delete mDetectorOutFiles[detID];
// Open the new file in RECREATE mode and attach a fresh tree to it.
613 mDetectorOutFiles[detID] =
new TFile(
name.c_str(),
"RECREATE");
614 mDetectorToTTreeMap[detID] =
new TTree(
"o2sim",
"o2sim");
615 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
// Alternative path storing null entries; the condition selecting it is not
// visible (presumably an else branch — TODO confirm).
617 mDetectorOutFiles[detID] =
nullptr;
618 mDetectorToTTreeMap[detID] =
nullptr;
// Merge and write out every event that is currently flushable, starting at
// mNextFlushID. It is launched from handleSimData() inside mMergerIOThread.
// Visible steps per event: collect sub-event bookkeeping, optionally drop
// events without hits, merge MC tracks and track references, fill the header
// branches, let each detector merge its hits, bump tree entry counts, and
// clean up. After the loop the output files are written.
// Return statements and several lines are not visible in this excerpt.
625 bool mergeAndFlushData()
// Helper telling whether the event with ID mNextFlushID is marked flushable.
// A statement before the `return` is not visible here; it may advance
// mNextFlushID — TODO confirm.
627 auto checkIfNextFlushable = [
this]() ->
bool {
629 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
632 LOG(info) <<
"Launching merge kernel ";
// NOTE(review): same expression as in the lambda above, written out a second time.
633 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
637 while (canflush ==
true) {
638 auto flusheventID = mNextFlushID;
639 LOG(info) <<
"Merge and flush event " << flusheventID;
// An event without buffered sub-event info is reported and skipped (the action
// taken when the next event is not flushable is not visible).
640 auto iter = mSubEventInfoBuffer.find(flusheventID);
641 if (iter == mSubEventInfoBuffer.end()) {
642 LOG(error) <<
"No info/data found for event " << flusheventID;
643 if (!checkIfNextFlushable()) {
648 auto& subEventInfoList = (*iter).second;
// Same treatment for an empty list or when no events are expected at all.
649 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
650 LOG(error) <<
"No data entries found for event " << flusheventID;
651 if (!checkIfNextFlushable()) {
// Per-sub-event bookkeeping, filled in the loop below.
663 std::vector<int> trackoffsets;
665 std::vector<int> nprimaries;
667 std::vector<int> nsubevents;
// NOTE(review): `info` is a pointer here (it is dereferenced with `->`), so
// `auto` copies only the pointer.
672 for (
auto info : subEventInfoList) {
676 nsubevents.emplace_back(info->
part);
// The event header is taken once, while it is still null (the assignment is
// not visible here).
677 if (eventheader ==
nullptr) {
// Optional filter: drop the event when the configuration asks to filter out
// events without hits (the no-hit test itself is not visible).
685 if (confref.isFilterOutNoHitEvents()) {
687 LOG(info) <<
" Taking out event " << flusheventID <<
" due to no hits ";
688 cleanEvent(flusheventID);
689 if (!checkIfNextFlushable()) {
701 const auto entries = subEventInfoList.size();
// Sized like nsubevents; how it is filled is not visible in this excerpt.
702 std::vector<int> subevOrdered((
int)(nsubevents.size()));
705 printf(
"HitMerger entry: %d nprimry: %5d trackoffset: %5d \n",
entry, nprimaries[
entry], trackoffsets[
entry]);
// Hook run on the merged track vector: counts primaries (all, and those with
// |PDG code| == 211) and stores the counters in the event header. The eta
// conditions are not visible; the keys suggest windows of 1.2, 1.0 and 0.8 —
// TODO confirm. `eventheader` is captured by value.
714 auto mcheaderhook = [eventheader](std::vector<MCTrack>
const&
tracks) {
715 int eta1Point2Counter = 0;
716 int eta1Point0Counter = 0;
717 int eta0Point8Counter = 0;
718 int eta1Point2CounterPi = 0;
719 int eta1Point0CounterPi = 0;
720 int eta0Point8CounterPi = 0;
723 if (tr.isPrimary()) {
725 const auto eta = tr.GetEta();
728 if (std::abs(tr.GetPdgCode()) == 211) {
729 eta1Point2CounterPi++;
734 if (std::abs(tr.GetPdgCode()) == 211) {
735 eta1Point0CounterPi++;
740 if (std::abs(tr.GetPdgCode()) == 211) {
741 eta0Point8CounterPi++;
// Store all counters under fixed string keys in the header.
750 eventheader->
putInfo(
"prims_eta_1.2", eta1Point2Counter);
751 eventheader->
putInfo(
"prims_eta_1.0", eta1Point0Counter);
752 eventheader->
putInfo(
"prims_eta_0.8", eta0Point8Counter);
753 eventheader->
putInfo(
"prims_eta_1.2_pi", eta1Point2CounterPi);
754 eventheader->
putInfo(
"prims_eta_1.0_pi", eta1Point0CounterPi);
755 eventheader->
putInfo(
"prims_eta_0.8_pi", eta0Point8CounterPi);
756 eventheader->
putInfo(
"prims_total", prims);
// Merge tracks (the hook above is invoked inside), then track references.
758 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
762 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>(
"TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
// The header branch address is set and reset twice; presumably once per output
// tree (main tree and header-only tree) — the surrounding lines are not visible.
767 headerbr->SetAddress(&eventheader);
769 headerbr->ResetAddress();
774 headerbr->SetAddress(&eventheader);
776 headerbr->ResetAddress();
// Let every detector merge its hits into its own tree.
// NOTE(review): `int id` is compared against the unsigned size().
783 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
784 auto& det = mDetectorInstances[
id];
786 auto hittree = mDetectorToTTreeMap[
id];
788 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
// The entry count is bumped by hand after the merge call.
789 hittree->SetEntries(hittree->GetEntries() + 1);
790 LOG(info) <<
"flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
797 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
798 LOG(info) <<
"outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
801 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
802 LOG(info) <<
"mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
// Event is done: clean up and test whether the next event can be flushed too.
805 cleanEvent(flusheventID);
806 LOG(info) <<
"Merge/flush for event " << flusheventID <<
" took " << timer.RealTime();
807 if (!checkIfNextFlushable()) {
// Write the trees of all open output files, overwriting existing keys.
811 if (mWriteToDisc && mOutFile) {
812 LOG(info) <<
"Writing TTrees";
813 mOutFile->Write(
"", TObject::kOverwrite);
814 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
815 auto& det = mDetectorInstances[
id];
816 if (det && mDetectorOutFiles[
id]) {
817 mDetectorOutFiles[
id]->Write(
"", TObject::kOverwrite);
820 if (mMCHeaderOnlyOutFile) {
821 mMCHeaderOnlyOutFile->Write(
"", TObject::kOverwrite);
// Data members and trailing declarations of the merger class (only part of the
// member list is visible in this excerpt).

// Per event ID: accumulated sum of the part numbers received so far.
827 std::map<uint32_t, uint32_t> mPartsCheckSum;
// Name of the merged-hits output file.
828 std::string mOutFileName;
// File and tree for the header-only output (raw pointers).
833 TFile* mMCHeaderOnlyOutFile;
834 TTree* mMCHeaderTree;
// Map type used for the containers below.
836 template <
class K,
class V>
837 using Hashtable = tbb::concurrent_unordered_map<K, V>;
// Per detector ID: hit output file and the tree attached to it.
838 Hashtable<int, TFile*> mDetectorOutFiles;
839 Hashtable<int, TTree*> mDetectorToTTreeMap;
// Thread running mergeAndFlushData().
842 std::thread mMergerIOThread;
// NOTE(review): plain bool that is written by the merge thread and read by the
// receiving code — confirm whether std::atomic<bool> is needed.
843 bool mergingInProgress =
false;
// Per event ID: buffered sub-event payloads, stored as raw pointers.
845 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
846 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
847 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
// Per event ID: true once all parts of the event have arrived.
848 Hashtable<int, bool> mFlushableEvents;
// Sum of received event IDs.
// NOTE(review): declared int but passed to isDataComplete<uint32_t>.
850 int mEventChecksum = 0;
851 int mNExpectedEvents = 0;
// ID of the next event to merge and flush; starts at 1.
852 int mNextFlushID = 1;
// Checked in ConditionalRun() once no more data is expected.
855 bool mAsService =
false;
// Not referenced in the visible code; presumably toggles forwarding of
// kinematics — TODO confirm.
856 bool mForwardKine =
true;
// Gates filling and writing of the output trees.
857 bool mWriteToDisc =
true;
// Pipe handle towards the driver; -1 means no pipe is configured.
859 int mPipeToDriver = -1;
// Detector objects, indexed by detector ID.
861 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
// Working directory at start-up and the directory currently written to.
864 std::string mInitialOutputDir;
865 std::string mCurrentOutputDir;
// Not referenced in the visible code.
868 fair::mq::Channel mPubChannel;
// Declarations only; the definitions are not visible in this excerpt.
871 void initDetInstances();
872 void initHitFiles(std::string prefix);