Project
Loading...
Searching...
No Matches
TPCSectorCompletionPolicy.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_TPC_TPCSECTORCOMPLETIONPOLICY_H
13#define O2_TPC_TPCSECTORCOMPLETIONPOLICY_H
18
20#include "Framework/InputSpec.h"
21#include "Framework/InputSpan.h"
26#include "TPCBase/Sector.h"
27#include <fmt/ostream.h>
28#include <vector>
29#include <string>
30#include <stdexcept>
31#include <sstream>
32#include <regex>
33#include <functional>
34
35namespace o2
36{
37namespace tpc
38{
39
70{
71 public:
72 using CompletionPolicyData = std::vector<framework::InputSpec>;
73
74 enum struct Config {
75 // require data on all other inputs in addition to the ones checked for the sector completion
77 };
79 template <typename... Args>
80 TPCSectorCompletionPolicy(const char* processorName, Args&&... args)
81 : mProcessorName(processorName), mInputMatchers()
82 {
83 init(std::forward<Args>(args)...);
84 }
85
87 {
88 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
89
90 auto matcher = [expression = mProcessorName](framework::DeviceSpec const& device) -> bool {
91 return std::regex_match(device.name.begin(), device.name.end(), std::regex(expression.c_str()));
92 };
93
94 auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs, auto const&, auto&) -> framework::CompletionPolicy::CompletionOp {
95 unsigned long tpcSectorMask = pTpcSectorMask ? *pTpcSectorMask : 0xFFFFFFFFF;
96 std::bitset<NSectors> validSectors = 0;
97 bool haveMatchedInput = false;
98 uint64_t activeSectors = 0;
99 std::vector<uint64_t> validSectorsExternal(externalInputMatchers ? externalInputMatchers->size() : 0);
100 size_t nActiveInputRoutes = 0;
101 size_t nMaxPartsPerRoute = 0;
102 int inputType = -1;
103 for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) {
104 nMaxPartsPerRoute = it.size() > nMaxPartsPerRoute ? it.size() : nMaxPartsPerRoute;
105 bool haveActivePart = false;
106 for (auto const& ref : it) {
108 continue;
109 }
110 haveActivePart = true;
111 auto const* dh = framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
112 // check if the O2 message matches on of the input specs to be matched and
113 // if it matches, check for the sector header, retrieve the active sector information
114 // and mark the sector as valid, we require to match exactly one of the inputs in the list
115 for (size_t idx = 0, end = inputMatchers.size(); idx < end; idx++) {
116 auto const& spec = inputMatchers[idx];
118 haveMatchedInput = true;
119 if (inputType == -1) {
120 // we bind to the index of the first match and require all other inputs to match the same spec
121 inputType = idx;
122 } else if (inputType != (int)idx) {
123 std::stringstream error;
124 error << fmt::format("routing error, input messages must all be of the same type previously bound to {} {}/{}/{}",
125 o2::framework::DataSpecUtils::describe(inputMatchers[inputType]),
126 dh->dataOrigin,
127 dh->dataDescription, dh->subSpecification);
128 throw std::runtime_error(error.str());
129 }
130 auto const* sectorHeader = framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
131 if (sectorHeader == nullptr) {
132 throw std::runtime_error("TPC sector header missing on header stack");
133 }
134 activeSectors |= (sectorHeader->activeSectors & tpcSectorMask);
135 validSectors |= (sectorHeader->sectorBits & tpcSectorMask);
136 break;
137 }
138 }
139
140 // We require to match all inputs in the external list
141 if (externalInputMatchers) {
142 for (size_t idx = 0, end = externalInputMatchers->size(); idx < end; idx++) {
143 auto const& spec = (*externalInputMatchers)[idx];
145 auto const* sectorHeader = framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
146 if (sectorHeader == nullptr) {
147 throw std::runtime_error("TPC sector header missing on header stack");
148 }
149 activeSectors |= (sectorHeader->activeSectors & tpcSectorMask);
150 validSectorsExternal[idx] |= (sectorHeader->sectorBits & tpcSectorMask);
151 break;
152 }
153 }
154 }
155 }
156 if (haveActivePart) {
157 ++nActiveInputRoutes;
158 }
159 }
160
161 if (externalInputMatchers) {
162 // We require all external matchers to have all sectors present, if not we wait
163 for (size_t idx = 0, end = externalInputMatchers->size(); idx < end; idx++) {
164 if (validSectorsExternal[idx] == 0 || validSectorsExternal[idx] != activeSectors) {
166 }
167 }
168 }
169
171 // If the flag Config::RequireAll is set in the constructor arguments we require
172 // data from all inputs in addition to the sector matching condition
173 // To be fully correct we would need to require data from all inputs not going
174 // into the TPC policy, but that is not possible for the moment. That's why there is a possibly
175 // unhandled case if multiple TPC input routes are defined but a complete data set is coming over
176 // one of them. Not likely to be a use case, though.
177 if ((inputMatchers.size() == 0 || (haveMatchedInput && activeSectors == validSectors.to_ulong())) &&
178 (!bRequireAll || nActiveInputRoutes == inputs.size())) {
179 // we can process if there is input for all sectors, the required sectors are
180 // transported as part of the sector header
182 } else if (activeSectors == 0 && nActiveInputRoutes == inputs.size()) {
183 // no sector header is transmitted, this is the case for e.g. the ZS raw data
184 // we simply require input on all routes, this is also the default of DPL DataRelayer
185 // Because DPL can not do more without knowing how many parts are required for a complete
186 // data set, the workflow should be such that exactly one O2 message arrives per input route.
187 // Currently, the workflow has multiple O2 messages per input route, but they all come in
188 // a single multipart message. So it works fine, and we disable the warning below, but there
189 // is a potential problem. Need to fix this on the level of the workflow.
190 // if (nMaxPartsPerRoute ) {
191 // LOG(warning) << "No sector information is provided with the data, data set is complete with data on all input routes. But there are multiple parts on at least one route and this policy might not be complete, no check possible if other parts on some routes are still missing. It is adviced to add a custom policy.";
192 //}
194 }
195
196 if (retVal != framework::CompletionPolicy::CompletionOp::Wait && orderCheck && *orderCheck && **orderCheck) {
197 for (auto& input : inputs) {
198 auto* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
199 if (!dph) {
200 continue;
201 }
202 if (!(**orderCheck)(dph->startTime)) {
204 }
205 break;
206 }
207 }
208 return retVal;
209 };
210 return framework::CompletionPolicy{"TPCSectorCompletionPolicy", matcher, callback};
211 }
212
213 private:
215 template <typename Arg, typename... Args>
216 void init(Arg&& arg, Args&&... args)
217 {
218 using Type = std::decay_t<Arg>;
219 if constexpr (std::is_same_v<Type, framework::InputSpec>) {
220 mInputMatchers.emplace_back(std::move(arg));
221 } else if constexpr (std::is_same_v<Type, TPCSectorCompletionPolicy::Config>) {
222 switch (arg) {
224 mRequireAll = true;
225 break;
226 }
227 } else if constexpr (std::is_same_v<Type, std::vector<o2::framework::InputSpec>*>) {
228 mExternalInputMatchers = arg;
229 } else if constexpr (std::is_same_v<Type, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>**>) {
230 mOrderCheck = arg;
231 } else if constexpr (std::is_same_v<Type, uint64_t*> || std::is_same_v<Type, const uint64_t*>) {
232 mTpcSectorMask = arg;
233 } else {
234 static_assert(framework::always_static_assert_v<Type>);
235 }
236 if constexpr (sizeof...(args) > 0) {
237 init(std::forward<Args>(args)...);
238 }
239 }
240
241 std::string mProcessorName;
242 std::vector<framework::InputSpec> mInputMatchers;
243 std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** mOrderCheck = nullptr;
244 // The external input matchers behave as the internal ones with the following differences:
245 // - They are controlled externally and the external entity can modify them, e.g. after parsing command line arguments.
246 // - They are all matched independently, it is not sufficient that one of them is present for all sectors
247 const std::vector<framework::InputSpec>* mExternalInputMatchers = nullptr;
248 const uint64_t* mTpcSectorMask = nullptr;
249 bool mRequireAll = false;
250};
251} // namespace tpc
252} // namespace o2
253#endif // O2_TPC_TPCSECTORCOMPLETIONPOLICY_H
int32_t retVal
static constexpr int MAXSECTOR
Definition Sector.h:44
TPCSectorCompletionPolicy(const char *processorName, Args &&... args)
std::vector< framework::InputSpec > CompletionPolicyData
o2::framework::CompletionPolicy operator()()
GLuint GLuint end
Definition glcorearb.h:469
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
static bool match(DataRef const &ref, const char *binding)
static bool isValid(DataRef const &ref)
static std::string describe(InputSpec const &spec)