Project
Loading...
Searching...
No Matches
CommonDataProcessors.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12
24#include "Framework/InputSpec.h"
27#include "Framework/Variant.h"
33#include "Framework/Signpost.h"
34#include <Monitoring/Monitoring.h>
35

36#include <fairmq/Device.h>
37#include <uv.h>
#include <cstring>
38#include <fstream>
39#include <functional>
40#include <memory>
41#include <string>
42
42
43using namespace o2::framework::data_matcher;
44
45// Special log to track callbacks we know about
47
48namespace o2::framework
49{
50
52 CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec> const& danglingOutputInputs,
53 std::vector<InputSpec>& unmatched)
54{
// Builds the injected global binary file sink.
// Dangling outputs with Lifetime::Timeframe whose origin is not "TFN" become the
// inputs of this sink; every other dangling output is appended to `unmatched`
// (the two predicates below are exact complements) so the caller can handle it.
// Options: --outfile (default "dpl-out.bin") and --keep, an expression parsed by
// DataDescriptorQueryBuilder::buildFromKeepConfig selecting what gets written.
55 auto writerFunction = [danglingOutputInputs](InitContext& ic) -> std::function<void(ProcessingContext&)> {
// Init callback: resolves the options once and returns the per-timeslice writer.
56 auto filename = ic.options().get<std::string>("outfile");
57 auto keepString = ic.options().get<std::string>("keep");
58
// Only reachable when --outfile is explicitly set to an empty string.
59 if (filename.empty()) {
60 throw runtime_error("output file missing");
61 }
62
// Decide up-front whether any dangling output can match --keep at all.
// NOTE(review): this scans all danglingOutputInputs, including the ones routed
// to `unmatched` (non-Timeframe or origin TFN), which never reach this sink;
// a match there still enables the writer — confirm this is intended.
63 bool hasOutputsToWrite = false;
64 auto [variables, outputMatcher] = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString);
65 VariableContext context;
66 for (auto& spec : danglingOutputInputs) {
67 auto concrete = DataSpecUtils::asConcreteDataTypeMatcher(spec);
68 if (outputMatcher->match(concrete, context)) {
69 hasOutputsToWrite = true;
70 }
71 }
// Nothing to write: return a no-op callback. `once` is a function-local static,
// so the debug message is printed at most once per process, not once per sink.
72 if (hasOutputsToWrite == false) {
73 return [](ProcessingContext&) mutable -> void {
74 static bool once = false;
75 if (!once) {
76 LOG(debug) << "No dangling output to be dumped.";
77 once = true;
78 }
79 };
80 }
// The stream is shared (shared_ptr) so the returned callback keeps it alive.
// NOTE(review): the open state is never checked, so an unwritable --outfile
// fails silently; consider testing output->is_open() / good().
81 auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);
82 return [output, matcher = outputMatcher](ProcessingContext& pc) mutable -> void {
// For each input that matches the keep expression, write its DataHeader and
// then its DataProcessingHeader, raw, to the output stream.
83 VariableContext matchingContext;
84 LOG(debug) << "processing data set with " << pc.inputs().size() << " entries";
85 for (const auto& entry : pc.inputs()) {
86 LOG(debug) << " " << *(entry.spec);
87 auto header = DataRefUtils::getHeader<header::DataHeader*>(entry);
88 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(entry);
89 if (matcher->match(*header, matchingContext) == false) {
90 continue;
91 }
92 output->write(reinterpret_cast<char const*>(header), sizeof(header::DataHeader));
93 output->write(reinterpret_cast<char const*>(dataProcessingHeader), sizeof(DataProcessingHeader));
96 }
97 };
98 };
99
// Split the dangling outputs: Timeframe-lifetime, non-TFN inputs go to this
// sink, everything else is handed back through `unmatched`.
100 std::vector<InputSpec> validBinaryInputs;
101 auto onlyTimeframe = [](InputSpec const& input) {
102 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == false) &&
103 input.lifetime == Lifetime::Timeframe;
104 };
105
106 auto noTimeframe = [](InputSpec const& input) {
107 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == true) ||
108 input.lifetime != Lifetime::Timeframe;
109 };
110
111 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
112 std::back_inserter(validBinaryInputs), onlyTimeframe);
113 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
114 std::back_inserter(unmatched), noTimeframe);
115
117 "internal-dpl-injected-global-binary-file-sink",
118 validBinaryInputs,
119 Outputs{},
120 AlgorithmSpec(writerFunction),
121 {{"outfile", VariantType::String, "dpl-out.bin", {"Name of the output file"}},
122 {"keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
123
124 return spec;
125}
126
127DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpec> const& danglingOutputInputs)
128{
129
130 // we build the default channel configuration from the binding of the first input
131 // in order to have more than one we would need to possibility to have support for
132 // vectored options
133 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
134 OutputChannelSpec externalChannelSpec;
135 externalChannelSpec.name = "downstream";
136 externalChannelSpec.type = ChannelType::Push;
137 externalChannelSpec.method = ChannelMethod::Bind;
138 externalChannelSpec.hostname = "localhost";
139 externalChannelSpec.port = 0;
140 externalChannelSpec.listeners = 0;
141 // in principle, protocol and transport are two different things but fur simplicity
142 // we use ipc when shared memory is selected and the normal tcp url whith zeromq,
143 // this is for building the default configuration which can be simply changed from the
144 // command line
145 externalChannelSpec.protocol = ChannelProtocol::IPC;
146 std::string defaultChannelConfig = formatExternalChannelConfiguration(externalChannelSpec);
147 // at some point the formatting tool might add the transport as well so we have to check
148 return specifyFairMQDeviceOutputProxy("internal-dpl-injected-output-proxy", danglingOutputInputs, defaultChannelConfig.c_str());
149}
150
152{
153 O2_SIGNPOST_ID_FROM_POINTER(cid, callbacks, async);
154 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Attempting again propagating rate-limiting information.");
155
156 // Check if this is a source device
157 static size_t lastTimeslice = -1;
158 auto* services = (ServiceRegistryRef*)async->data;
159 auto& timesliceIndex = services->get<TimesliceIndex>();
160 auto* device = services->get<RawDeviceService>().device();
161 auto channel = device->GetChannels().find("metric-feedback");
162 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
163 if (channel == device->GetChannels().end()) {
164 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Could not find metric-feedback channel.");
165 return;
166 }
167 fair::mq::MessagePtr payload(device->NewMessage());
168 payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr);
169 auto consumed = oldestPossingTimeslice;
170
171 size_t start = uv_hrtime();
172 int64_t result = channel->second[0].Send(payload, 100);
173 size_t stop = uv_hrtime();
174 // If the sending worked, we do not retry.
175 if (result <= 0) {
176 // Forcefully slow down in case FairMQ returns earlier than expected...
177 int64_t ellapsed = (stop - start) / 1000000;
178 if (ellapsed < 100) {
179 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
180 "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.",
181 result, ellapsed);
182 uv_sleep(100 - ellapsed);
183 } else {
184 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting",
185 "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.", result, ellapsed);
186 }
187 // If the sending did not work, we keep trying until it actually works.
188 // This will schedule other tasks in the queue, so the processing of the
189 // data will still happen.
190 uv_async_send(async);
191 } else {
192 O2_SIGNPOST_EVENT_EMIT(callbacks, cid, "rate-limiting", "Send %llu bytes, Last timeslice now set to %zu.", result, consumed);
193 lastTimeslice = consumed;
194 }
195}
196
197DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs, std::string rateLimitingChannelConfig)
198{
199 return DataProcessorSpec{
200 .name = "internal-dpl-injected-dummy-sink",
201 .inputs = danglingOutputInputs,
202 .algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) {
203 static uv_async_t async;
204 // The callback will only have access to the
205 async.data = new ServiceRegistryRef{ic.services()};
206 uv_async_init(deviceState.loop, &async, retryMetricCallback);
207 auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) {
208 LOGP(debug, "Domain info updated with timeslice {}", timeslice);
209 retryMetricCallback(&async);
210 auto& timesliceIndex = services.get<TimesliceIndex>();
211 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
212 auto& stats = services.get<DataProcessingStats>();
213 stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
214 };
215 callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
216
217 return adaptStateless([]() {
218 });
219 })},
220 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{"channel-config", VariantType::String, // raw input channel
221 rateLimitingChannelConfig,
222 {"Out-of-band channel config"}}}
223 : std::vector<ConfigParamSpec>(),
224 .labels = {{"resilient"}}};
225}
226
228{
229 return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {
230 auto& raw = pcx.services().get<RawDeviceService>();
231 static RateLimiter limiter;
232 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
233 LOG(detail) << "Rate limiting to " << limit << " timeframes in flight";
234 limiter.check(pcx, limit, 2000);
235 LOG(detail) << "Rate limiting passed. Invoking old callback";
236 original(pcx);
237 LOG(detail) << "Rate limited callback done";
238 });
239}
240
241} // namespace o2::framework
struct uv_async_s uv_async_t
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
std::ostringstream debug
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
Definition InitContext.h:34
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
GLuint start
Definition glcorearb.h:469
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::string filename()
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
Definition DeviceState.h:34
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"