// Wrapper class deriving publicly from TMessage. Only the class head and the
// destructor are visible in this excerpt.
117 class TMessageWrapper :
public TMessage
// The destructor is explicitly defaulted and marked as an override.
121 ~TMessageWrapper()
override =
default;
// Remember the process working directory at this point. setWorkingDirectory()
// switches back to this directory before it resolves a new output directory.
129 mInitialOutputDir = std::filesystem::current_path().string();
// The current output directory starts out identical to the initial one.
130 mCurrentOutputDir = mInitialOutputDir;
// Emits two log lines: the real time reported by mTimer and memory figures
// obtained from a FairSystemInfo object.
136 FairSystemInfo sysinfo;
137 LOG(info) <<
"TIME-STAMP " << mTimer.RealTime() <<
"\t";
// The current-memory value is divided by 1024 * 1024 before printing, while
// the max-memory value is printed exactly as returned.
// NOTE(review): presumably GetMaxMemory() already reports MB — confirm.
139 LOG(info) <<
"MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
140 << sysinfo.GetMaxMemory() <<
" MB\n";
// Initialisation hook of the device (declared final). Visible steps: enable
// ROOT thread safety, create the output file and trees, read the driver pipe
// handle from the environment and handle the case of zero expected events.
145 void InitTask() final
147 LOG(info) <<
"INIT HIT MERGER";
148 ROOT::EnableThreadSafety();
// Name of the merged-hits output file.
150 std::string outfilename(
"o2sim_merged_hits.root");
// Logged at fatal severity; the condition guarding this line is not visible
// in this excerpt.
157 LOG(fatal) <<
"No configuration received. Aborting";
163 mOutFileName = outfilename.c_str();
// Open the output file in "RECREATE" mode and attach the main tree to it.
165 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
166 mOutTree =
new TTree(
"o2sim",
"o2sim");
167 mOutTree->SetDirectory(mOutFile);
// The header tree uses the same name/title "o2sim" but is attached to
// mMCHeaderOnlyOutFile.
// NOTE(review): creation of mMCHeaderOnlyOutFile is not visible here —
// confirm it is valid at this point.
170 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
171 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
// Branch taken when no detector instances exist yet; its body is not visible
// in this excerpt.
174 if (mDetectorInstances.size() == 0) {
// The handle of the pipe to the driver is passed as text in this environment
// variable and converted with atoi().
182 auto pipeenv = getenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE");
// NOTE(review): atoi() on a null pointer is undefined; presumably guarded by
// a null check on a line not visible here — confirm.
184 mPipeToDriver = atoi(pipeenv);
185 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
187 LOG(warning) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
// With zero expected events the device waits for control input or reports
// that it shuts down; the exact branching is not visible in this excerpt.
191 if (mNExpectedEvents == 0) {
193 waitForControlInput();
195 LOG(info) <<
"NOT EXPECTING ANY DATA; SHUTTING DOWN";
// Makes `dir` the process working directory, creating the directory when it
// does not exist yet.
//
// dir: target directory. It is resolved with fs::absolute() after the working
//      directory has been reset to mInitialOutputDir, so a relative `dir` is
//      interpreted relative to the initial directory.
// Returns a bool; the return statements themselves are not visible in this
// excerpt.
201 bool setWorkingDirectory(std::string
const& dir)
203 namespace fs = std::filesystem;
// Always start from the initial directory so that repeated calls with a
// relative path resolve against the same base.
213 fs::current_path(fs::path(mInitialOutputDir));
215 auto absolutePath = fs::absolute(fs::path(dir));
216 if (!fs::exists(absolutePath)) {
// create_directory() creates only the last path component.
217 if (!fs::create_directory(absolutePath)) {
218 LOG(error) <<
"Could not create directory " << absolutePath.string();
// Switch to the new directory and record the resulting path.
223 fs::current_path(absolutePath.string().c_str());
224 mCurrentOutputDir = fs::current_path().string();
226 LOG(info) <<
"FINAL PATH " << mCurrentOutputDir;
227 }
// Catch by const reference: catching std::exception by value copies the
// exception object and slices derived types such as
// std::filesystem::filesystem_error.
catch (const std::exception& e) {
228 LOG(error) <<
" could not change path to " << dir;
// Fragment that re-targets the outputs for a new configuration `reconfig`
// (presumably the body of ReInit(reconfig); the function header is not visible
// in this excerpt).
// Branch for a failed directory change; its body is not visible here.
239 if (!setWorkingDirectory(reconfig.
outputDir)) {
243 std::string outfilename(
"o2sim_merged_hits.root");
// Take over the number of events to expect from the reconfiguration data.
245 mNExpectedEvents = reconfig.
nEvents;
246 mOutFileName = outfilename.c_str();
// A fresh output file and fresh trees are allocated with new.
// NOTE(review): no release of a previously held mOutFile/mOutTree/
// mMCHeaderTree is visible here — confirm they are closed and deleted
// elsewhere.
248 mOutFile =
new TFile(outfilename.c_str(),
"RECREATE");
249 mOutTree =
new TTree(
"o2sim",
"o2sim");
250 mOutTree->SetDirectory(mOutFile);
253 mMCHeaderTree =
new TTree(
"o2sim",
"o2sim");
254 mMCHeaderTree->SetDirectory(mMCHeaderOnlyOutFile);
// Drop all per-event bookkeeping: part checksums, buffered tracks, track
// references, sub-event infos and the flushable flags.
260 mPartsCheckSum.clear();
264 mMCTrackBuffer.clear();
265 mTrackRefBuffer.clear();
266 mSubEventInfoBuffer.clear();
267 mFlushableEvents.clear();
// Adds `value` to the entry stored under `key` in the map `m`. Only the branch
// for an already existing key is visible in this excerpt; there the updated
// total is copied into `accum`.
// NOTE(review): the declaration of `accum`, the branch for a missing key and
// the return statement are on lines not visible here.
273 template <
typename T,
typename V>
274 V insertAdd(std::map<T, V>&
m, T
const&
key, V
value)
276 const auto iter =
m.find(
key);
// Key already present: accumulate in place.
278 if (iter !=
m.end()) {
279 iter->second +=
value;
280 accum = iter->second;
/// Checks whether an accumulated checksum equals 1 + 2 + ... + nparts.
///
/// @param checksum sum of the part (or event) numbers received so far
/// @param nparts   number of parts (or events) that are expected in total
/// @return true when `checksum` equals nparts * (nparts + 1) / 2
///
/// The expected sum is computed by halving whichever of the two consecutive
/// factors is even *before* multiplying. Multiplying first and dividing
/// afterwards wraps around for 32-bit unsigned types once nparts exceeds
/// 65535 and then yields a wrong expected value (e.g. for nparts = 70000).
template <typename T>
bool isDataComplete(T checksum, T nparts)
{
  // Exactly one of nparts and nparts + 1 is even, so the division is exact.
  const auto expected = (nparts % 2 == 0) ? (nparts / 2) * (nparts + 1)
                                          : nparts * ((nparts + 1) / 2);
  return checksum == expected;
}
// Reads one detector-ID message from `data` at position `index` and, for a
// 4-byte payload, hands the following parts to the matching detector
// instance via collectHits().
//
// eventID: event the hits belong to (forwarded to the detector)
// data:    multi-part message being decoded
// index:   read cursor into `data`; advanced by one here and passed on by
//          reference
294 void consumeHits(
int eventID, fair::mq::Parts&
data,
int&
index)
// Move the message part out of `data` and advance the cursor.
296 auto detIDmessage = std::move(
data.At(
index++));
// A payload of exactly 4 bytes is read as a single int.
298 if (detIDmessage->GetSize() == 4) {
299 auto ptr = (
int*)detIDmessage->GetData();
// `id` is defined on a line not visible in this excerpt (presumably built
// from ptr[0] — confirm). The size of the next part is logged in MB.
301 LOG(debug2) <<
"I1 " <<
ptr[0] <<
" NAME " <<
id.getName() <<
" MB "
302 <<
data.At(
index)->GetSize() / 1024. / 1024.;
// Look up the detector instance registered under this ID.
305 auto detector = mDetectorInstances[
id].get();
// NOTE(review): no null check on `detector` is visible here — confirm one
// exists on the missing lines.
307 detector->collectHits(eventID,
data,
index);
// Decodes one object of type T from `data` at `index` and appends the
// resulting pointer to the list stored for `eventID` in `buffer`.
//
// T:  type of the decoded payload (decoded as T*)
// BT: map-like buffer type whose mapped_type supports push_back
312 template <
typename T,
typename BT>
313 void consumeData(
int eventID, fair::mq::Parts&
data,
int&
index, BT&
buffer)
315 auto decodeddata = o2::base::decodeTMessage<T*>(
data,
index);
// Start with an empty container for this event; the condition guarding this
// assignment is not visible in this excerpt.
317 buffer[eventID] =
typename BT::mapped_type();
// The buffer stores the raw pointer returned by the decoder.
319 buffer[eventID].push_back(decodeddata);
// Registers the address of `info` under its event ID in mSubEventInfoBuffer
// (presumably the body of fillSubEventInfoEntry(info); the function header is
// not visible in this excerpt).
// First sub-event seen for this event: create an empty list.
329 if (mSubEventInfoBuffer.find(info.
eventID) == mSubEventInfoBuffer.end()) {
330 mSubEventInfoBuffer[info.
eventID] = std::list<o2::data::SubEventInfo*>();
// Only the pointer is stored; `info` has to stay alive for as long as the
// buffer entry is used.
332 mSubEventInfoBuffer[info.
eventID].push_back(&info);
// Fragment that waits for a control message on a dedicated ZeroMQ "sub"
// channel and re-initialises the device from it (presumably the body of
// waitForControlInput(); the function header is not visible in this excerpt).
339 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
340 auto channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
// The address to connect to is taken from the environment.
// NOTE(review): getenv() may return a null pointer; it is streamed into the
// log and converted to std::string below without a visible check — confirm
// the variable is always set or add a guard.
341 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
342 LOG(info) <<
"SOCKETNAME " << controlsocketname;
343 channel.Connect(std::string(controlsocketname));
345 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
347 LOG(info) <<
"WAITING FOR INPUT";
// A positive return value from Receive() is treated as "message received".
348 if (channel.Receive(reply) > 0) {
349 auto data = reply->GetData();
350 auto size = reply->GetSize();
// The payload is interpreted as text of exactly `size` bytes.
352 std::string command(
reinterpret_cast<char const*
>(
data),
size);
353 LOG(info) <<
"message: " << command;
// `reconfig` is produced on a line not visible in this excerpt.
357 return ReInit(reconfig);
359 LOG(info) <<
"NOTHING RECEIVED";
// One iteration of the device loop: receive a multi-part message on the
// "simdata" channel, pass it to handleSimData() and, in service mode, go on
// to the control handling once no more data is expected.
364 bool ConditionalRun()
override
// Only the first sub-channel of "simdata" is used.
366 auto& channel = GetChannels().at(
"simdata").at(0);
367 fair::mq::Parts request;
368 auto bytes = channel.Receive(request);
// The check on `bytes` that leads to this message is not visible here.
370 LOG(error) <<
"Some error occurred on socket during receive on sim data";
// `timer` is defined on a line not visible in this excerpt.
375 auto more = handleSimData(request, 0);
376 LOG(info) <<
"HitMerger processing took " << timer.RealTime();
377 if (!more && mAsService) {
378 LOG(info) <<
" CONTROL ";
// Function-local static: the guarded action runs once per process, not once
// per object.
384 static bool initAcknowledged =
false;
385 if (!initAcknowledged) {
387 initAcknowledged =
true;
// Processes one multi-part message carrying the data of a sub-event: decodes
// the SubEventInfo header, buffers tracks and track references, marks the
// event flushable once all parts arrived and launches the merge thread.
//
// data: the received message parts
// the second (unnamed int) parameter is unused
393 bool handleSimData(fair::mq::Parts&
data,
int )
395 bool expectmore =
true;
// `index` and `info` are defined on lines not visible in this excerpt.
397 auto infoptr = o2::base::decodeTMessage<o2::data::SubEventInfo*>(
data,
index++);
// Accumulate the part numbers received for this event.
399 auto accum = insertAdd<uint32_t, uint32_t>(mPartsCheckSum, info.
eventID, (uint32_t)info.
part);
401 LOG(info) <<
"SIMDATA channel got " <<
data.Size() <<
" parts for event " << info.
eventID <<
" part " << info.
part <<
" out of " << info.
nparts;
// Buffer the header pointer, then the tracks and track references.
403 fillSubEventInfoEntry(info);
404 consumeData<std::vector<o2::MCTrack>>(info.
eventID,
data,
index, mMCTrackBuffer);
405 consumeData<std::vector<o2::TrackReference>>(info.
eventID,
data,
index, mTrackRefBuffer);
// The event is complete when the accumulated part numbers equal
// 1 + 2 + ... + nparts.
410 if (isDataComplete<uint32_t>(accum, info.
nparts)) {
411 LOG(info) <<
"Event " << info.
eventID <<
" complete. Marking as flushable";
412 mFlushableEvents[info.
eventID] =
true;
// Start a merge thread unless one is flagged as running; a finished thread
// is joined before its handle is reused.
// NOTE(review): mergingInProgress is a plain bool that is written inside the
// thread lambda and read here — confirm this is synchronised or make it
// atomic.
418 if (!mergingInProgress) {
419 if (mMergerIOThread.joinable()) {
420 mMergerIOThread.join();
// `info` is captured by copy but not used in the visible lambda body.
423 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
// Accumulate event IDs to detect when all events have been seen.
426 mEventChecksum += info.
eventID;
428 if (isDataComplete<uint32_t>(mEventChecksum, info.
maxEvents)) {
429 LOG(info) <<
"ALL EVENTS HERE; CHECKSUM " << mEventChecksum;
// Final flush: wait for a running merge, run one more merge and wait for it
// to finish.
432 if (mMergerIOThread.joinable()) {
433 mMergerIOThread.join();
435 mMergerIOThread = std::thread([info,
this]() { mergingInProgress =
true; mergeAndFlushData(); mergingInProgress =
false; });
436 if (mMergerIOThread.joinable()) {
437 mMergerIOThread.join();
// Report the event ID to the driver through the pipe, if one was assigned
// (-1 means no pipe).
443 if (mPipeToDriver != -1) {
444 if (write(mPipeToDriver, &info.
eventID,
sizeof(info.
eventID)) == -1) {
445 LOG(error) <<
"FAILED WRITING TO PIPE";
// Per-event cleanup for `eventID`; called by mergeAndFlushData() after an
// event was flushed or dropped. Only the signature is visible in this excerpt.
452 void cleanEvent(
int eventID)
/// Appends a copy of every element of `from` to the end of `to`.
///
/// @param from source container (left unchanged)
/// @param to   destination container; must provide push_back
template <typename T>
void backInsert(T const& from, T& to)
{
  // Element-wise push_back, in source order.
  for (auto const& element : from) {
    to.push_back(element);
  }
}
// Merges the MC tracks of all sub-events of `eventID` into one vector with
// the primaries first, fixes up mother/daughter indices of the secondaries,
// runs an analysis hook on the result, optionally writes it to `target` and
// optionally forwards tracks and header on the "kineforward" channel.
//
// eventID:              event whose buffered tracks are merged
// target:               tree to fill (may be null)
// nprimaries:           number of primaries per sub-event
// nsubevents:           list used to derive the sub-event processing order
// tracks_analysis_hook: callback invoked with the final track vector
// mceventheader:        header forwarded together with the tracks
463 void reorderAndMergeMCTracks(
int eventID, TTree*
target,
const std::vector<int>& nprimaries,
const std::vector<int>& nsubevents, std::function<
void(std::vector<MCTrack>
const&)> tracks_analysis_hook,
o2::dataformats::MCEventHeader const* mceventheader)
466 std::vector<MCTrack>* mcTracksPerSubEvent =
nullptr;
// Destination for the merged tracks.
467 auto targetdata = std::make_unique<std::vector<MCTrack>>();
469 auto& vectorOfSubEventMCTracks = mMCTrackBuffer[eventID];
470 const auto entries = vectorOfSubEventMCTracks.size();
// First pass (loop header not visible): copy the primaries of each sub-event
// and count them in nprimTot.
479 nprimTot += nprimaries[
index];
481 for (
int i = 0;
i < nprimaries[
index];
i++) {
482 auto& track = (*vectorOfSubEventMCTracks[
index])[
i];
// Daughter links of transported primaries are reset; they are filled again
// in the second pass.
483 if (track.isTransported()) {
484 track.SetFirstDaughterTrackId(-1);
485 track.SetLastDaughterTrackId(-1);
487 targetdata->push_back(track);
// Second pass (loop header not visible): append the secondaries, i.e. the
// tracks at positions >= nprim of each sub-event.
493 Int_t idelta1 = nprimTot;
498 auto& subEventTracks = *(vectorOfSubEventMCTracks[
index]);
500 Int_t npart = (
int)(subEventTracks.size());
501 Int_t nprim = nprimaries[
index];
504 for (Int_t
i = nprim;
i < npart;
i++) {
505 auto& track = subEventTracks[
i];
// The adjustment applied to cId between these two lines is not visible in
// this excerpt.
506 Int_t cId = track.getMotherTrackId();
512 track.SetMotherTrackId(cId);
513 track.SetFirstDaughterTrackId(-1);
// hwm is the index the track gets in the merged vector; the mother records
// it as first (if still unset) and last daughter.
515 Int_t hwm = (
int)(targetdata->size());
516 auto& mother = (*targetdata)[cId];
517 if (mother.getFirstDaughterTrackId() == -1) {
518 mother.SetFirstDaughterTrackId(hwm);
520 mother.SetLastDaughterTrackId(hwm);
522 targetdata->push_back(track);
// With more than one sub-event the merged vector is used, otherwise the
// vector of the first sub-event.
530 auto filladdr = (entries > 1) ? targetdata.get() : vectorOfSubEventMCTracks[0];
534 tracks_analysis_hook(*filladdr);
// Writing to the tree: `targetbr` is obtained on a line not visible here.
536 if (mWriteToDisc &&
target) {
538 targetbr->SetAddress(&filladdr);
540 targetbr->ResetAddress();
// Deleter handed to the transport: frees the TMessage passed as hint.
544 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
545 auto& channel = GetChannels().at(
"kineforward").at(0);
// `tmsg` is created on lines not visible in this excerpt.
// NOTE(review): both messages below pass `tmsg` as deleter hint — confirm a
// separate TMessage is allocated for each of them.
547 tmsg->WriteObjectAny((
void*)filladdr, TClass::GetClass(
"std::vector<o2::MCTrack>"));
548 std::unique_ptr<fair::mq::Message> trackmessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
550 tmsg->WriteObjectAny((
void*)mceventheader, TClass::GetClass(
"o2::dataformats::MCEventHeader"));
551 std::unique_ptr<fair::mq::Message> headermessage(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
// The header part is added before the track part.
552 fair::mq::Parts reply;
553 reply.AddPart(std::move(headermessage));
554 reply.AddPart(std::move(trackmessage));
556 LOG(info) <<
"Forward publish MC tracks on channel";
// Loop over the per-sub-event vectors; the loop body is not visible here.
560 for (
auto ptr : vectorOfSubEventMCTracks) {
// Merges the per-sub-event containers of type T buffered for `eventID` into
// one container, shifting the track IDs of every element with
// updateTrackIdWithOffset(), and fills the result into a branch of `target`.
//
// brname:          name of the branch (its use is not visible in this excerpt)
// trackoffsets:    per-sub-event value added to the secondary-ID offset
// nprimaries:      number of primaries per sub-event
// subevOrdered:    sub-event order (its use is not visible in this excerpt)
// mapOfVectorOfTs: buffer mapping an event ID to a vector of T*
565 template <
typename T,
typename M>
566 void remapTrackIdsAndMerge(std::string brname,
int eventID, TTree&
target,
567 const std::vector<int>& trackoffsets,
const std::vector<int>& nprimaries,
const std::vector<int>& subevOrdered, M& mapOfVectorOfTs)
574 T* incomingdata =
nullptr;
575 std::unique_ptr<T> targetdata(
nullptr);
576 auto& vectorOfT = mapOfVectorOfTs[eventID];
577 const auto entries = vectorOfT.size();
// The conditions selecting between the next two assignments are not visible
// in this excerpt.
581 incomingdata = vectorOfT[0];
583 targetdata = std::make_unique<T>();
// Total number of primaries over all sub-events (loop header not visible).
587 nprimTot += nprimaries[
entry];
// Offset applied to IDs >= nprim; it starts at the total number of
// primaries.
590 Int_t idelta1 = nprimTot;
593 Int_t nprim = nprimaries[
index];
594 incomingdata = vectorOfT[
index];
// Shift the IDs in place, then copy the element into the merged container.
// `idelta0` is defined on a line not visible in this excerpt.
596 for (
auto&
data : *incomingdata) {
597 updateTrackIdWithOffset(
data, nprim, idelta0, idelta1);
598 targetdata->push_back(
data);
601 idelta1 += trackoffsets[
index];
// With a single sub-event its container is written directly.
604 auto dataaddr = (entries == 1) ? incomingdata : targetdata.
get();
// `targetbr` is obtained on a line not visible in this excerpt.
606 targetbr->SetAddress(&dataaddr);
608 targetbr->ResetAddress();
// Loop over the buffered containers; the loop body is not visible here.
611 for (
auto ptr : vectorOfT) {
// MCTrack overload: selects idelta0 for an ID below nprim and idelta1
// otherwise. The definition of `cId` and the use of `ioffset` are on lines
// not visible in this excerpt.
616 void updateTrackIdWithOffset(
MCTrack& track, Int_t nprim, Int_t idelta0, Int_t idelta1)
619 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
625 void updateTrackIdWithOffset(TrackReference&
ref, Int_t nprim, Int_t idelta0, Int_t idelta1)
627 Int_t cId =
ref.getTrackID();
628 Int_t ioffset = (cId < nprim) ? idelta0 : idelta1;
629 ref.setTrackID(cId + ioffset);
// Sets up the hit output file and tree for detector `detID`.
//
// prefix: used to build the file name (the construction of `name` is not
//         visible in this excerpt)
// detID:  key into mDetectorOutFiles / mDetectorToTTreeMap
632 void initHitTreeAndOutFile(std::string prefix,
int detID)
// A file that is already open for this detector is closed and deleted first.
635 if (mDetectorOutFiles.find(detID) != mDetectorOutFiles.end() && mDetectorOutFiles[detID]) {
636 LOG(warn) <<
"Hit outfile for detID " <<
DetID::getName(detID) <<
" already initialized --> Reopening";
637 mDetectorOutFiles[detID]->Close();
638 delete mDetectorOutFiles[detID];
// Either a new file and tree are created ...
642 mDetectorOutFiles[detID] =
new TFile(
name.c_str(),
"RECREATE");
643 mDetectorToTTreeMap[detID] =
new TTree(
"o2sim",
"o2sim");
644 mDetectorToTTreeMap[detID]->SetDirectory(mDetectorOutFiles[detID]);
// ... or both entries are set to null; the condition choosing between the
// two cases is not visible in this excerpt.
646 mDetectorOutFiles[detID] =
nullptr;
647 mDetectorToTTreeMap[detID] =
nullptr;
// Merge kernel: as long as the event with ID mNextFlushID is marked
// flushable, merges its buffered sub-event data (tracks, track references,
// header, detector hits) into the output trees, cleans the event up and
// finally writes the output files.
654 bool mergeAndFlushData()
// True when mNextFlushID has an entry in mFlushableEvents that is set.
656 auto checkIfNextFlushable = [
this]() ->
bool {
658 return mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
661 LOG(info) <<
"Launching merge kernel ";
// Same expression as in checkIfNextFlushable above.
662 bool canflush = mFlushableEvents.find(mNextFlushID) != mFlushableEvents.end() && mFlushableEvents[mNextFlushID] ==
true;
666 while (canflush ==
true) {
667 auto flusheventID = mNextFlushID;
668 LOG(info) <<
"Merge and flush event " << flusheventID;
// Nothing buffered for this event at all.
669 auto iter = mSubEventInfoBuffer.find(flusheventID);
670 if (iter == mSubEventInfoBuffer.end()) {
671 LOG(error) <<
"No info/data found for event " << flusheventID;
672 if (!checkIfNextFlushable()) {
// An empty info list, or zero expected events, is reported as an error too.
677 auto& subEventInfoList = (*iter).second;
678 if (subEventInfoList.size() == 0 || mNExpectedEvents == 0) {
679 LOG(error) <<
"No data entries found for event " << flusheventID;
680 if (!checkIfNextFlushable()) {
// Per-sub-event bookkeeping filled from the info list.
692 std::vector<int> trackoffsets;
694 std::vector<int> nprimaries;
696 std::vector<int> nsubevents;
701 for (
auto info : subEventInfoList) {
705 nsubevents.emplace_back(info->
part);
// `eventheader` is defined on a line not visible in this excerpt.
706 if (eventheader ==
nullptr) {
// Optional filtering of events without hits: the event is cleaned up
// instead of being written.
714 if (confref.isFilterOutNoHitEvents()) {
716 LOG(info) <<
" Taking out event " << flusheventID <<
" due to no hits ";
717 cleanEvent(flusheventID);
718 if (!checkIfNextFlushable()) {
730 const auto entries = subEventInfoList.size();
731 std::vector<int> subevOrdered((
int)(nsubevents.size()));
// Diagnostic print per entry; this goes through printf rather than LOG.
734 printf(
"HitMerger entry: %d nprimry: %5d trackoffset: %5d \n",
entry, nprimaries[
entry], trackoffsets[
entry]);
// Hook run on the merged tracks: counts primaries per pseudorapidity window
// and stores the counts in the event header. The *Pi counters are
// incremented for tracks with |PDG code| == 211.
743 auto mcheaderhook = [eventheader](std::vector<MCTrack>
const&
tracks) {
744 int eta1Point2Counter = 0;
745 int eta1Point0Counter = 0;
746 int eta0Point8Counter = 0;
747 int eta1Point2CounterPi = 0;
748 int eta1Point0CounterPi = 0;
749 int eta0Point8CounterPi = 0;
// The loop over tracks and the eta window tests are not visible in this
// excerpt.
752 if (tr.isPrimary()) {
754 const auto eta = tr.GetEta();
757 if (std::abs(tr.GetPdgCode()) == 211) {
758 eta1Point2CounterPi++;
763 if (std::abs(tr.GetPdgCode()) == 211) {
764 eta1Point0CounterPi++;
769 if (std::abs(tr.GetPdgCode()) == 211) {
770 eta0Point8CounterPi++;
// Attach the counters to the header under fixed keys; `prims` is defined on
// a line not visible here.
779 eventheader->
putInfo(
"prims_eta_1.2", eta1Point2Counter);
780 eventheader->
putInfo(
"prims_eta_1.0", eta1Point0Counter);
781 eventheader->
putInfo(
"prims_eta_0.8", eta0Point8Counter);
782 eventheader->
putInfo(
"prims_eta_1.2_pi", eta1Point2CounterPi);
783 eventheader->
putInfo(
"prims_eta_1.0_pi", eta1Point0CounterPi);
784 eventheader->
putInfo(
"prims_eta_0.8_pi", eta0Point8CounterPi);
785 eventheader->
putInfo(
"prims_total", prims);
// Tracks are merged first (this also runs the hook), then the track
// references.
787 reorderAndMergeMCTracks(flusheventID, mOutTree, nprimaries, subevOrdered, mcheaderhook, eventheader);
791 remapTrackIdsAndMerge<std::vector<o2::TrackReference>>(
"TrackRefs", flusheventID, *mOutTree, trackoffsets, nprimaries, subevOrdered, mTrackRefBuffer);
// The header is filled into two branches; `headerbr` is obtained on lines
// not visible in this excerpt.
796 headerbr->SetAddress(&eventheader);
798 headerbr->ResetAddress();
803 headerbr->SetAddress(&eventheader);
805 headerbr->ResetAddress();
// Merge and flush the hits of every detector instance.
// NOTE(review): `id` is a signed int compared against an unsigned size().
812 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
813 auto& det = mDetectorInstances[
id];
815 auto hittree = mDetectorToTTreeMap[
id];
// NOTE(review): checks on `det` and `hittree` are not visible here — confirm
// they are non-null before the calls below.
817 det->mergeHitEntriesAndFlush(flusheventID, *hittree, trackoffsets, nprimaries, subevOrdered);
// The entry count of each tree is advanced by hand.
818 hittree->SetEntries(hittree->GetEntries() + 1);
819 LOG(info) <<
"flushing tree to file " << hittree->GetDirectory()->GetFile()->GetName();
826 mOutTree->SetEntries(mOutTree->GetEntries() + 1);
827 LOG(info) <<
"outtree has file " << mOutTree->GetDirectory()->GetFile()->GetName();
830 mMCHeaderTree->SetEntries(mMCHeaderTree->GetEntries() + 1);
831 LOG(info) <<
"mc header outtree has file " << mMCHeaderTree->GetDirectory()->GetFile()->GetName();
// Release the buffers of the event; `timer` is defined on a line not visible
// in this excerpt.
834 cleanEvent(flusheventID);
835 LOG(info) <<
"Merge/flush for event " << flusheventID <<
" took " << timer.RealTime();
836 if (!checkIfNextFlushable()) {
// Write the main file, the per-detector files and the header-only file,
// each with the kOverwrite option.
840 if (mWriteToDisc && mOutFile) {
841 LOG(info) <<
"Writing TTrees";
842 mOutFile->Write(
"", TObject::kOverwrite);
843 for (
int id = 0;
id < mDetectorInstances.size(); ++
id) {
844 auto& det = mDetectorInstances[
id];
845 if (det && mDetectorOutFiles[
id]) {
846 mDetectorOutFiles[
id]->Write(
"", TObject::kOverwrite);
849 if (mMCHeaderOnlyOutFile) {
850 mMCHeaderOnlyOutFile->Write(
"", TObject::kOverwrite);
// Per event ID: running sum of the part numbers received (see insertAdd /
// isDataComplete usage in handleSimData).
856 std::map<uint32_t, uint32_t> mPartsCheckSum;
857 std::string mOutFileName;
// File and tree for the header-only output.
// NOTE(review): no in-class initializer is visible for these two raw
// pointers — confirm they are set before first use.
862 TFile* mMCHeaderOnlyOutFile;
863 TTree* mMCHeaderTree;
// All event/detector keyed buffers use a TBB concurrent unordered map.
865 template <
class K,
class V>
866 using Hashtable = tbb::concurrent_unordered_map<K, V>;
// Per detector ID: hit output file and hit tree.
867 Hashtable<int, TFile*> mDetectorOutFiles;
868 Hashtable<int, TTree*> mDetectorToTTreeMap;
// Thread running mergeAndFlushData() and the flag set while it runs.
// NOTE(review): the flag is a plain bool that is written in the thread
// lambda and read in handleSimData() — consider std::atomic<bool>.
871 std::thread mMergerIOThread;
872 bool mergingInProgress =
false;
// Per event ID: pointers to the decoded sub-event payloads.
874 Hashtable<int, std::vector<std::vector<o2::MCTrack>*>> mMCTrackBuffer;
875 Hashtable<int, std::vector<std::vector<o2::TrackReference>*>> mTrackRefBuffer;
876 Hashtable<int, std::list<o2::data::SubEventInfo*>> mSubEventInfoBuffer;
// Per event ID: true once all parts of the event have arrived.
877 Hashtable<int, bool> mFlushableEvents;
// Running sum of the event IDs seen; it is passed to
// isDataComplete<uint32_t>() in handleSimData() although declared as int.
879 int mEventChecksum = 0;
880 int mNExpectedEvents = 0;
// ID of the next event to merge; IDs start at 1.
881 int mNextFlushID = 1;
// Behaviour switches read in ConditionalRun(), reorderAndMergeMCTracks() and
// mergeAndFlushData().
884 bool mAsService =
false;
885 bool mForwardKine =
true;
886 bool mWriteToDisc =
true;
// Handle of the pipe to the driver; -1 means no pipe was assigned.
888 int mPipeToDriver = -1;
// Detector instances, indexed by detector ID.
890 std::vector<std::unique_ptr<o2::base::Detector>> mDetectorInstances;
// Working directory captured at construction and the one currently in use.
893 std::string mInitialOutputDir;
894 std::string mCurrentOutputDir;
897 fair::mq::Channel mPubChannel;
// Declared here; the definitions are not visible in this excerpt.
900 void initDetInstances();
901 void initHitFiles(std::string prefix);