Project
Loading...
Searching...
No Matches
DebugStreamer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <thread>
14#include <fmt/format.h>
15#include "TROOT.h"
16#include "TKey.h"
17#include <random>
18#include "Framework/Logger.h"
19
21
22#if defined(DEBUG_STREAMER)
23
24o2::utils::DebugStreamer::DebugStreamer()
25{
26 ROOT::EnableThreadSafety();
27}
28
29void o2::utils::DebugStreamer::setStreamer(const char* outFile, const char* option, const size_t id)
30{
31 if (!isStreamerSet(id)) {
32 mTreeStreamer[id] = std::make_unique<o2::utils::TreeStreamRedirector>(fmt::format("{}_{}.root", outFile, id).data(), option);
33 }
34}
35
36o2::utils::TreeStreamRedirector& o2::utils::DebugStreamer::getStreamer(const char* outFile, const char* option, const size_t id)
37{
38 setStreamer(outFile, option, id);
39 return getStreamer(id);
40}
41
42void o2::utils::DebugStreamer::flush(const size_t id)
43{
44 if (isStreamerSet(id)) {
45 mTreeStreamer[id].reset();
46 }
47}
48
49void o2::utils::DebugStreamer::flush()
50{
51 for (const auto& pair : mTreeStreamer) {
52 flush(pair.first);
53 }
54}
55
56bool o2::utils::DebugStreamer::checkStream(const StreamFlags streamFlag, const size_t samplingID, const float weight)
57{
58 const bool isStreamerSet = ((getStreamFlags() & streamFlag) == streamFlag);
59 if (!isStreamerSet) {
60 return false;
61 }
62
63 // check sampling frequency
64 const auto sampling = getSamplingTypeFrequency(streamFlag);
65 if (sampling.first != SamplingTypes::sampleAll) {
66 auto sampleTrack = [&]() {
67 if (samplingID == -1) {
68 LOGP(fatal, "Sampling type sampleID not supported for stream flag {}", (int)streamFlag);
69 }
70 // sample on samplingID (e.g. track level)
71 static thread_local std::unordered_map<StreamFlags, std::pair<size_t, bool>> idMap;
72 // in case of first call samplingID in idMap is 0 and always false and first ID rejected
73 if (idMap[streamFlag].first != samplingID) {
74 idMap[streamFlag] = std::pair<size_t, bool>{samplingID, (getRandom() < sampling.second)};
75 }
76 return idMap[streamFlag].second;
77 };
78
79 if (sampling.first == SamplingTypes::sampleRandom) {
80 // just sample randomly
81 return (getRandom() < sampling.second);
82 } else if (sampling.first == SamplingTypes::sampleID) {
83 return sampleTrack();
84 } else if (sampling.first == SamplingTypes::sampleIDGlobal) {
85 // this contains for each flag the processed track IDs and stores if it was processed or not
86 static tbb::concurrent_unordered_map<int, tbb::concurrent_unordered_map<size_t, bool>> refIDs;
88
89 // check if refIDs contains track ID
90 auto it = refIDs[index].find(samplingID);
91 if (it != refIDs[index].end()) {
92 // in case it is present get stored decission
93 return it->second;
94 } else {
95 // in case it is not present sample random decission
96 const bool storeTrk = sampleTrack();
97 refIDs[index][samplingID] = storeTrk;
98 return storeTrk;
99 }
100 } else if (sampling.first == SamplingTypes::sampleWeights) {
101 // sample with weight
102 return (weight * getRandom() < sampling.second);
103 }
104 }
105 return true;
106}
107
108float o2::utils::DebugStreamer::getRandom(float min, float max)
109{
110 // init random number generator for each thread
111 static thread_local std::mt19937 generator(std::random_device{}());
112 std::uniform_real_distribution<> distr(min, max);
113 const float rnd = distr(generator);
114 return rnd;
115}
116
117int o2::utils::DebugStreamer::getIndex(const StreamFlags streamFlag)
118{
119 // see: https://stackoverflow.com/a/71539401
120 uint32_t v = streamFlag;
121 v -= 1;
122 v = v - ((v >> 1) & 0x55555555);
123 v = (v & 0x33333333) + ((v >> 2) & 0x33333333);
124 const uint32_t ind = (((v + (v >> 4) & 0xF0F0F0F) * 0x1010101) >> 24);
125 return ind;
126}
127
128std::pair<o2::utils::SamplingTypes, float> o2::utils::DebugStreamer::getSamplingTypeFrequency(const StreamFlags streamFlag)
129{
130 const int ind = getIndex(streamFlag);
131 return std::pair<o2::utils::SamplingTypes, float>{ParameterDebugStreamer::Instance().samplingType[ind], ParameterDebugStreamer::Instance().samplingFrequency[ind]};
132}
133
134std::string o2::utils::DebugStreamer::getUniqueTreeName(const char* tree, const size_t id) const { return fmt::format("{}_{}", tree, getNTrees(id)); }
135
136size_t o2::utils::DebugStreamer::getCPUID() { return std::hash<std::thread::id>{}(std::this_thread::get_id()); }
137
138o2::utils::TreeStreamRedirector* o2::utils::DebugStreamer::getStreamerPtr(const size_t id) const
139{
140 auto it = mTreeStreamer.find(id);
141 if (it != mTreeStreamer.end()) {
142 return (it->second).get();
143 } else {
144 return nullptr;
145 }
146}
147
148int o2::utils::DebugStreamer::getNTrees(const size_t id) const { return isStreamerSet(id) ? getStreamerPtr(id)->GetFile()->GetListOfKeys()->GetEntries() : -1; }
149
150void o2::utils::DebugStreamer::mergeTrees(const char* inpFile, const char* outFile, const char* option)
151{
152 TFile fInp(inpFile, "READ");
153 std::unordered_map<std::string, TList> lists;
154 for (TObject* keyAsObj : *fInp.GetListOfKeys()) {
155 const auto key = dynamic_cast<TKey*>(keyAsObj);
156 TTree* tree = (TTree*)fInp.Get(key->GetName());
157 // perform simple check on the number of entries to merge only TTree with same content (ToDo: Do check on name of branches)
158 const int entries = tree->GetListOfBranches()->GetEntries();
159 const std::string brName = key->GetName();
160 const std::string nameBr = brName.substr(0, brName.find_last_of("_"));
161 lists[nameBr].Add(tree);
162 }
163
164 TFile fOut(outFile, "RECREATE");
165 for (auto& list : lists) {
166 auto tree = TTree::MergeTrees(&list.second, option);
167 fOut.WriteObject(tree, list.first.data());
168 }
169}
170
171void o2::utils::DebugStreamer::enableStream(const StreamFlags streamFlag)
172{
173 StreamFlags streamlevel = getStreamFlags();
174 streamlevel = streamFlag | streamlevel;
175 setStreamFlags(streamlevel);
176}
177
178void o2::utils::DebugStreamer::disableStream(const StreamFlags streamFlag)
179{
180 StreamFlags streamlevel = getStreamFlags();
181 streamlevel = (~streamFlag) & streamlevel;
182 setStreamFlags(streamlevel);
183}
184
185#endif
#define O2ParamImpl(classname)
Definition of class for writing debug informations.
StringRef key
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint index
Definition glcorearb.h:781
GLuint GLuint GLfloat weight
Definition glcorearb.h:5477
GLboolean * data
Definition glcorearb.h:298
GLuint id
Definition glcorearb.h:650
GLuint * lists
Definition glcorearb.h:4942
bool list(IEventListener &reporter, Config const &config)
constexpr auto getIndex(const container_T &container, typename container_T::const_iterator iter) -> typename container_T::source_type
Definition algorithm.h:65
@ sampleIDGlobal
in case different streamers have access to the same IDs use this gloabl ID
@ sampleAll
use all data (default)
@ sampleID
sample every n IDs (per example track)
@ sampleWeights
perform sampling on weights, defined where the streamer is called
@ sampleRandom
sample randomly every n points
StreamFlags
struct defining the flags which can be used to check if a certain debug streamer is used
struct for setting and storing the streamer level
int sampleIDGlobal[StreamFlags::streamFlagsCount]
storage of reference streamer used for sampleIDFromOtherStreamer
float samplingFrequency[StreamFlags::streamFlagsCount]
frequency which is used for the sampling (0.1 -> 10% is written if sampling is used)
SamplingTypes samplingType[StreamFlags::streamFlagsCount]
flag to store what will be streamed
constexpr size_t min
constexpr size_t max
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::uniform_int_distribution< unsigned long long > distr