Project
Loading...
Searching...
No Matches
DebugStreamer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#if !defined(GPUCA_GPUCODE) && !defined(GPUCA_STANDALONE)
14#include <thread>
15#include <fmt/format.h>
16#include "TROOT.h"
17#include "TKey.h"
18#include <random>
19#include "Framework/Logger.h"
20#endif
21
23
24#if !defined(GPUCA_GPUCODE) && !defined(GPUCA_STANDALONE) && defined(DEBUG_STREAMER)
25
26o2::utils::DebugStreamer::DebugStreamer()
27{
28 ROOT::EnableThreadSafety();
29}
30
31void o2::utils::DebugStreamer::setStreamer(const char* outFile, const char* option, const size_t id)
32{
33 if (!isStreamerSet(id)) {
34 mTreeStreamer[id] = std::make_unique<o2::utils::TreeStreamRedirector>(fmt::format("{}_{}.root", outFile, id).data(), option);
35 }
36}
37
38o2::utils::TreeStreamRedirector& o2::utils::DebugStreamer::getStreamer(const char* outFile, const char* option, const size_t id)
39{
40 setStreamer(outFile, option, id);
41 return getStreamer(id);
42}
43
44void o2::utils::DebugStreamer::flush(const size_t id)
45{
46 if (isStreamerSet(id)) {
47 mTreeStreamer[id].reset();
48 }
49}
50
51void o2::utils::DebugStreamer::flush()
52{
53 for (const auto& pair : mTreeStreamer) {
54 flush(pair.first);
55 }
56}
57
58bool o2::utils::DebugStreamer::checkStream(const StreamFlags streamFlag, const size_t samplingID, const float weight)
59{
60 const bool isStreamerSet = ((getStreamFlags() & streamFlag) == streamFlag);
61 if (!isStreamerSet) {
62 return false;
63 }
64
65 // check sampling frequency
66 const auto sampling = getSamplingTypeFrequency(streamFlag);
67 if (sampling.first != SamplingTypes::sampleAll) {
68 auto sampleTrack = [&]() {
69 if (samplingID == -1) {
70 LOGP(fatal, "Sampling type sampleID not supported for stream flag {}", (int)streamFlag);
71 }
72 // sample on samplingID (e.g. track level)
73 static thread_local std::unordered_map<StreamFlags, std::pair<size_t, bool>> idMap;
74 // in case of first call samplingID in idMap is 0 and always false and first ID rejected
75 if (idMap[streamFlag].first != samplingID) {
76 idMap[streamFlag] = std::pair<size_t, bool>{samplingID, (getRandom() < sampling.second)};
77 }
78 return idMap[streamFlag].second;
79 };
80
81 if (sampling.first == SamplingTypes::sampleRandom) {
82 // just sample randomly
83 return (getRandom() < sampling.second);
84 } else if (sampling.first == SamplingTypes::sampleID) {
85 return sampleTrack();
86 } else if (sampling.first == SamplingTypes::sampleIDGlobal) {
87 // this contains for each flag the processed track IDs and stores if it was processed or not
88 static tbb::concurrent_unordered_map<int, tbb::concurrent_unordered_map<size_t, bool>> refIDs;
90
91 // check if refIDs contains track ID
92 auto it = refIDs[index].find(samplingID);
93 if (it != refIDs[index].end()) {
94 // in case it is present get stored decission
95 return it->second;
96 } else {
97 // in case it is not present sample random decission
98 const bool storeTrk = sampleTrack();
99 refIDs[index][samplingID] = storeTrk;
100 return storeTrk;
101 }
102 } else if (sampling.first == SamplingTypes::sampleWeights) {
103 // sample with weight
104 return (weight * getRandom() < sampling.second);
105 }
106 }
107 return true;
108}
109
110float o2::utils::DebugStreamer::getRandom(float min, float max)
111{
112 // init random number generator for each thread
113 static thread_local std::mt19937 generator(std::random_device{}());
114 std::uniform_real_distribution<> distr(min, max);
115 const float rnd = distr(generator);
116 return rnd;
117}
118
119int o2::utils::DebugStreamer::getIndex(const StreamFlags streamFlag)
120{
121 // see: https://stackoverflow.com/a/71539401
122 uint32_t v = streamFlag;
123 v -= 1;
124 v = v - ((v >> 1) & 0x55555555);
125 v = (v & 0x33333333) + ((v >> 2) & 0x33333333);
126 const uint32_t ind = (((v + (v >> 4) & 0xF0F0F0F) * 0x1010101) >> 24);
127 return ind;
128}
129
130std::pair<o2::utils::SamplingTypes, float> o2::utils::DebugStreamer::getSamplingTypeFrequency(const StreamFlags streamFlag)
131{
132 const int ind = getIndex(streamFlag);
133 return std::pair<o2::utils::SamplingTypes, float>{ParameterDebugStreamer::Instance().samplingType[ind], ParameterDebugStreamer::Instance().samplingFrequency[ind]};
134}
135
136std::string o2::utils::DebugStreamer::getUniqueTreeName(const char* tree, const size_t id) const { return fmt::format("{}_{}", tree, getNTrees(id)); }
137
138size_t o2::utils::DebugStreamer::getCPUID() { return std::hash<std::thread::id>{}(std::this_thread::get_id()); }
139
140o2::utils::TreeStreamRedirector* o2::utils::DebugStreamer::getStreamerPtr(const size_t id) const
141{
142 auto it = mTreeStreamer.find(id);
143 if (it != mTreeStreamer.end()) {
144 return (it->second).get();
145 } else {
146 return nullptr;
147 }
148}
149
150int o2::utils::DebugStreamer::getNTrees(const size_t id) const { return isStreamerSet(id) ? getStreamerPtr(id)->GetFile()->GetListOfKeys()->GetEntries() : -1; }
151
152void o2::utils::DebugStreamer::mergeTrees(const char* inpFile, const char* outFile, const char* option)
153{
154 TFile fInp(inpFile, "READ");
155 std::unordered_map<std::string, TList> lists;
156 for (TObject* keyAsObj : *fInp.GetListOfKeys()) {
157 const auto key = dynamic_cast<TKey*>(keyAsObj);
158 TTree* tree = (TTree*)fInp.Get(key->GetName());
159 // perform simple check on the number of entries to merge only TTree with same content (ToDo: Do check on name of branches)
160 const int entries = tree->GetListOfBranches()->GetEntries();
161 const std::string brName = key->GetName();
162 const std::string nameBr = brName.substr(0, brName.find_last_of("_"));
163 lists[nameBr].Add(tree);
164 }
165
166 TFile fOut(outFile, "RECREATE");
167 for (auto& list : lists) {
168 auto tree = TTree::MergeTrees(&list.second, option);
169 fOut.WriteObject(tree, list.first.data());
170 }
171}
172
173void o2::utils::DebugStreamer::enableStream(const StreamFlags streamFlag)
174{
175 StreamFlags streamlevel = getStreamFlags();
176 streamlevel = streamFlag | streamlevel;
177 setStreamFlags(streamlevel);
178}
179
180void o2::utils::DebugStreamer::disableStream(const StreamFlags streamFlag)
181{
182 StreamFlags streamlevel = getStreamFlags();
183 streamlevel = (~streamFlag) & streamlevel;
184 setStreamFlags(streamlevel);
185}
186
187#endif
#define O2ParamImpl(classname)
Definition of class for writing debug information.
StringRef key
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint index
Definition glcorearb.h:781
GLuint GLuint GLfloat weight
Definition glcorearb.h:5477
GLboolean * data
Definition glcorearb.h:298
GLuint id
Definition glcorearb.h:650
GLuint * lists
Definition glcorearb.h:4942
constexpr auto getIndex(const container_T &container, typename container_T::const_iterator iter) -> typename container_T::source_type
Definition algorithm.h:65
@ sampleIDGlobal
in case different streamers have access to the same IDs use this global ID
@ sampleAll
use all data (default)
@ sampleID
sample every n IDs (per example track)
@ sampleWeights
perform sampling on weights, defined where the streamer is called
@ sampleRandom
sample randomly every n points
StreamFlags
struct defining the flags which can be used to check if a certain debug streamer is used
void * data
Definition list.h:33
Definition list.h:40
struct list_node * first
Definition list.h:42
struct for setting and storing the streamer level
int sampleIDGlobal[StreamFlags::streamFlagsCount]
storage of reference streamer used for sampleIDFromOtherStreamer
float samplingFrequency[StreamFlags::streamFlagsCount]
frequency which is used for the sampling (0.1 -> 10% is written if sampling is used)
SamplingTypes samplingType[StreamFlags::streamFlagsCount]
flag to store what will be streamed
constexpr size_t min
constexpr size_t max
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::uniform_int_distribution< unsigned long long > distr