Project
Loading...
Searching...
No Matches
CommonDataProcessors.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12
24#include "Framework/InputSpec.h"
27#include "Framework/Variant.h"
33#include <Monitoring/Monitoring.h>
34
35#include <fairmq/Device.h>
36#include <fstream>
37#include <functional>
38#include <memory>
39#include <string>
40
41using namespace o2::framework::data_matcher;
42
43namespace o2::framework
44{
45
47 CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec> const& danglingOutputInputs,
48 std::vector<InputSpec>& unmatched)
49{
50 auto writerFunction = [danglingOutputInputs](InitContext& ic) -> std::function<void(ProcessingContext&)> {
51 auto filename = ic.options().get<std::string>("outfile");
52 auto keepString = ic.options().get<std::string>("keep");
53
54 if (filename.empty()) {
55 throw runtime_error("output file missing");
56 }
57
58 bool hasOutputsToWrite = false;
59 auto [variables, outputMatcher] = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString);
60 VariableContext context;
61 for (auto& spec : danglingOutputInputs) {
62 auto concrete = DataSpecUtils::asConcreteDataTypeMatcher(spec);
63 if (outputMatcher->match(concrete, context)) {
64 hasOutputsToWrite = true;
65 }
66 }
67 if (hasOutputsToWrite == false) {
68 return [](ProcessingContext&) mutable -> void {
69 static bool once = false;
70 if (!once) {
71 LOG(debug) << "No dangling output to be dumped.";
72 once = true;
73 }
74 };
75 }
76 auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);
77 return [output, matcher = outputMatcher](ProcessingContext& pc) mutable -> void {
78 VariableContext matchingContext;
79 LOG(debug) << "processing data set with " << pc.inputs().size() << " entries";
80 for (const auto& entry : pc.inputs()) {
81 LOG(debug) << " " << *(entry.spec);
82 auto header = DataRefUtils::getHeader<header::DataHeader*>(entry);
83 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(entry);
84 if (matcher->match(*header, matchingContext) == false) {
85 continue;
86 }
87 output->write(reinterpret_cast<char const*>(header), sizeof(header::DataHeader));
88 output->write(reinterpret_cast<char const*>(dataProcessingHeader), sizeof(DataProcessingHeader));
91 }
92 };
93 };
94
95 std::vector<InputSpec> validBinaryInputs;
96 auto onlyTimeframe = [](InputSpec const& input) {
97 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == false) &&
98 input.lifetime == Lifetime::Timeframe;
99 };
100
101 auto noTimeframe = [](InputSpec const& input) {
102 return (DataSpecUtils::partialMatch(input, o2::header::DataOrigin("TFN")) == true) ||
103 input.lifetime != Lifetime::Timeframe;
104 };
105
106 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
107 std::back_inserter(validBinaryInputs), onlyTimeframe);
108 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
109 std::back_inserter(unmatched), noTimeframe);
110
112 "internal-dpl-injected-global-binary-file-sink",
113 validBinaryInputs,
114 Outputs{},
115 AlgorithmSpec(writerFunction),
116 {{"outfile", VariantType::String, "dpl-out.bin", {"Name of the output file"}},
117 {"keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
118
119 return spec;
120}
121
122DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpec> const& danglingOutputInputs)
123{
124
125 // we build the default channel configuration from the binding of the first input
126 // in order to have more than one we would need to possibility to have support for
127 // vectored options
128 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
129 OutputChannelSpec externalChannelSpec;
130 externalChannelSpec.name = "downstream";
131 externalChannelSpec.type = ChannelType::Push;
132 externalChannelSpec.method = ChannelMethod::Bind;
133 externalChannelSpec.hostname = "localhost";
134 externalChannelSpec.port = 0;
135 externalChannelSpec.listeners = 0;
136 // in principle, protocol and transport are two different things but fur simplicity
137 // we use ipc when shared memory is selected and the normal tcp url whith zeromq,
138 // this is for building the default configuration which can be simply changed from the
139 // command line
140 externalChannelSpec.protocol = ChannelProtocol::IPC;
141 std::string defaultChannelConfig = formatExternalChannelConfiguration(externalChannelSpec);
142 // at some point the formatting tool might add the transport as well so we have to check
143 return specifyFairMQDeviceOutputProxy("internal-dpl-injected-output-proxy", danglingOutputInputs, defaultChannelConfig.c_str());
144}
145
147{
148 static size_t lastTimeslice = -1;
149 auto* services = (ServiceRegistryRef*)async->data;
150 auto& timesliceIndex = services->get<TimesliceIndex>();
151 auto* device = services->get<RawDeviceService>().device();
152 auto channel = device->GetChannels().find("metric-feedback");
153 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
154 if (channel == device->GetChannels().end()) {
155 return;
156 }
157 fair::mq::MessagePtr payload(device->NewMessage());
158 payload->Rebuild(&oldestPossingTimeslice, sizeof(int64_t), nullptr, nullptr);
159 auto consumed = oldestPossingTimeslice;
160
161 int64_t result = channel->second[0].Send(payload, 100);
162 // If the sending worked, we do not retry.
163 if (result != 0) {
164 // If the sending did not work, we keep trying until it actually works.
165 // This will schedule other tasks in the queue, so the processing of the
166 // data will still happen.
167 uv_async_send(async);
168 } else {
169 lastTimeslice = consumed;
170 }
171}
172
173DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs, std::string rateLimitingChannelConfig)
174{
175 return DataProcessorSpec{
176 .name = "internal-dpl-injected-dummy-sink",
177 .inputs = danglingOutputInputs,
178 .algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks, DeviceState& deviceState, InitContext& ic) {
179 static uv_async_t async;
180 // The callback will only have access to the
181 async.data = new ServiceRegistryRef{ic.services()};
182 uv_async_init(deviceState.loop, &async, retryMetricCallback);
183 auto domainInfoUpdated = [](ServiceRegistryRef services, size_t timeslice, ChannelIndex channelIndex) {
184 LOGP(debug, "Domain info updated with timeslice {}", timeslice);
185 retryMetricCallback(&async);
186 auto& timesliceIndex = services.get<TimesliceIndex>();
187 auto oldestPossingTimeslice = timesliceIndex.getOldestPossibleOutput().timeslice.value;
188 auto& stats = services.get<DataProcessingStats>();
189 stats.updateStats({(int)ProcessingStatsId::CONSUMED_TIMEFRAMES, DataProcessingStats::Op::Set, (int64_t)oldestPossingTimeslice});
190 };
191 callbacks.set<CallbackService::Id::DomainInfoUpdated>(domainInfoUpdated);
192
193 return adaptStateless([]() {
194 });
195 })},
196 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{"channel-config", VariantType::String, // raw input channel
197 rateLimitingChannelConfig,
198 {"Out-of-band channel config"}}}
199 : std::vector<ConfigParamSpec>(),
200 .labels = {{"resilient"}}};
201}
202
204{
205 return PluginManager::wrapAlgorithm(spec, [](AlgorithmSpec::ProcessCallback& original, ProcessingContext& pcx) -> void {
206 auto& raw = pcx.services().get<RawDeviceService>();
207 static RateLimiter limiter;
208 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
209 LOG(detail) << "Rate limiting to " << limit << " timeframes in flight";
210 limiter.check(pcx, limit, 2000);
211 LOG(detail) << "Rate limiting passed. Invoking old callback";
212 original(pcx);
213 LOG(detail) << "Rate limited callback done";
214 });
215}
216
217} // namespace o2::framework
struct uv_async_s uv_async_t
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
Definition InitContext.h:34
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::string filename()
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
Definition DeviceState.h:34
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"