Project
Loading...
Searching...
No Matches
ExternalFairMQDeviceProxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
36#include "Headers/DataHeader.h"
37#include "Headers/Stack.h"
38#include "DecongestionService.h"
40
41#include "./DeviceSpecHelpers.h"
42#include "Monitoring/Monitoring.h"
43
44#include <fairmq/Parts.h>
45#include <fairmq/Device.h>
46#include <uv.h>
47#include <cstring>
48#include <cassert>
49#include <memory>
50#include <optional>
51#include <unordered_map>
52#include <numeric> // std::accumulate
53#include <sstream>
54#include <stdexcept>
55#include <regex>
56
57namespace o2::framework
58{
60
65
70
72
73void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, std::string const& channel, size_t timeSlice)
74{
75 // Note: DPL is only setting up one instance of a channel while FairMQ allows to have an
76 // array of channels, the index is 0 in the call
77 constexpr auto index = 0;
78 LOG(debug) << "sending " << messages.Size() << " messages on " << channel;
79 // TODO: we can make this configurable
80 const int maxTimeout = 10000;
81 int timeout = 0;
82 // try dispatch with increasing timeout in order to also drop a warning if the dispatching
83 // has been tried multiple times within max timeout
84 // since we do not want any messages to be dropped at this stage, we stay in the loop until
85 // the downstream congestion is resolved
86 // TODO: we might want to treat this error condition some levels higher up, but for
87 // the moment its an appropriate solution. The important thing is not to drop
88 // messages and to be informed about the congestion.
89 while (device.Send(messages, channel, index, timeout) < 0) {
90 if (timeout == 0) {
91 timeout = 1;
92 } else if (timeout < maxTimeout) {
93 timeout *= 10;
94 } else {
95 LOG(alarm) << "Cannot dispatch to channel " << channel << " due to DOWNSTREAM BACKPRESSURE. NO DATA IS DROPPED,"
96 << " will keep retrying. This is only a problem if downstream congestion does not resolve by itself.";
97 if (timeout == maxTimeout) {
98 // we add 1ms to disable the warning below
99 timeout += 1;
100 }
101 }
102 if (device.NewStatePending()) {
103 LOG(alarm) << "Device state change is requested, dropping " << messages.Size() << " pending message(s) "
104 << "on channel " << channel << ". "
105 << "ATTENTION: DATA IS LOST! Could not dispatch data to downstream consumer(s), check if "
106 << "consumers have been terminated too early";
107 // make sure we disable the warning below
108 timeout = maxTimeout + 1;
109 break;
110 }
111 }
112
113 // FIXME: we need a better logic for avoiding message spam
114 if (timeout > 100 && timeout <= maxTimeout) {
115 LOG(warning) << "dispatching on channel " << channel << " was delayed by " << timeout / 1000.f << " s";
116 }
117 // TODO: feeling this is a bit awkward, but the interface of fair::mq::Parts does not provide a
118 // method to clear the content.
119 // Maybe the FairMQ API can be improved at some point. Actually the ownership of all messages should be passed
120 // on to the transport and the messages should be empty after sending and the parts content can be cleared.
121 // assert(std::accumulate(messages.begin(), messages.end(), true, [](bool a, auto const& msg) {return a && (msg.get() == nullptr);}));
122 messages.fParts.clear();
123}
124
125void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, OutputSpec const& spec, DataProcessingHeader::StartTime tslice, ChannelRetriever& channelRetriever)
126{
127 // Note: DPL is only setting up one instance of a channel while FairMQ allows to have an
128 // array of channels, the index is 0 in the call
129 auto channel = channelRetriever(spec, tslice);
130 if (channel.empty()) {
131 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec) << " timeslice " << tslice;
132 return;
133 }
134 sendOnChannel(device, messages, channel, tslice);
135}
136
137void sendOnChannel(fair::mq::Device& device, o2::header::Stack&& headerStack, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, ChannelRetriever& channelRetriever)
138{
139 const auto* dph = o2::header::get<DataProcessingHeader*>(headerStack.data());
140 if (!dph) {
141 LOG(error) << "Header Stack does not follow the O2 data model, DataProcessingHeader missing";
142 return;
143 }
144 auto channelName = channelRetriever(spec, dph->startTime);
145 constexpr auto index = 0;
146 if (channelName.empty()) {
147 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec);
148 return;
149 }
150 for (auto& channelInfo : device.GetChannels()) {
151 if (channelInfo.first != channelName) {
152 continue;
153 }
154 assert(channelInfo.second.size() == 1);
155 // allocate the header message using the underlying transport of the channel
156 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[index].Transport());
157 fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
158
159 fair::mq::Parts out;
160 out.AddPart(std::move(headerMessage));
161 out.AddPart(std::move(payloadMessage));
162 sendOnChannel(device, out, channelName, dph->startTime);
163 return;
164 }
165 LOG(error) << "internal mismatch, can not find channel " << channelName << " in the list of channel infos of the device";
166}
167
168void sendOnChannel(fair::mq::Device& device, fair::mq::MessagePtr&& headerMessage, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, ChannelRetriever& channelRetriever)
169{
170 // const auto* dph = o2::header::get<DataProcessingHeader*>( *reinterpret_cast<o2::header::Stack*>(headerMessage->GetData()) );
171 const auto* dph = o2::header::get<DataProcessingHeader*>(headerMessage->GetData());
172 if (!dph) {
173 LOG(error) << "Header does not follow the O2 data model, DataProcessingHeader missing";
174 return;
175 }
176 auto tslice = dph->startTime;
177 fair::mq::Parts out;
178 out.AddPart(std::move(headerMessage));
179 out.AddPart(std::move(payloadMessage));
180 sendOnChannel(device, out, spec, tslice, channelRetriever);
181}
182
183void appendForSending(fair::mq::Device& device, o2::header::Stack&& headerStack, size_t timeSliceID, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, fair::mq::Parts& messageCache, ChannelRetriever& channelRetriever)
184{
185 auto channelName = channelRetriever(spec, timeSliceID);
186 constexpr auto index = 0;
187 if (channelName.empty()) {
188 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec);
189 return;
190 }
191 for (auto& channelInfo : device.GetChannels()) {
192 if (channelInfo.first != channelName) {
193 continue;
194 }
195 assert(channelInfo.second.size() == 1);
196 // allocate the header message using the underlying transport of the channel
197 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[index].Transport());
198 fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
199
200 fair::mq::Parts out;
201 messageCache.AddPart(std::move(headerMessage));
202 messageCache.AddPart(std::move(payloadMessage));
203 return;
204 }
205 LOG(error) << "internal mismatch, can not find channel " << channelName << " in the list of channel infos of the device";
206}
207
208InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t /*step*/)
209{
210 return [spec](TimingInfo&, ServiceRegistryRef const& ref, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) -> bool {
211 auto* device = ref.get<RawDeviceService>().device();
212 for (int i = 0; i < parts.Size() / 2; ++i) {
213 auto dh = o2::header::get<DataHeader*>(parts.At(i * 2)->GetData());
214
215 DataProcessingHeader dph{newTimesliceId, 0};
216 o2::header::Stack headerStack{*dh, dph};
217 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(i * 2 + 1)), spec, channelRetriever);
218 }
219 return parts.Size() > 0;
220 };
221}
222
223auto getFinalIndex(DataHeader const& dh, size_t msgidx) -> size_t
224{
225 size_t finalBlockIndex = 0;
226 if (dh.splitPayloadParts > 0 && dh.splitPayloadParts == dh.splitPayloadIndex) {
227 // this is indicating a sequence of payloads following the header
228 // FIXME: we will probably also set the DataHeader version
229 // Current position + number of parts + 1 (for the header)
230 finalBlockIndex = msgidx + dh.splitPayloadParts + 1;
231 } else {
232 // We can consider the next splitPayloadParts as one block of messages pairs
233 // because we are guaranteed they are all the same.
234 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
235 // pair.
236 finalBlockIndex = msgidx + (dh.splitPayloadParts > 0 ? dh.splitPayloadParts : 1) * 2;
237 }
238 assert(finalBlockIndex >= msgidx + 2);
239 return finalBlockIndex;
240};
241
// Scan the (header, payload...) blocks in `parts` and record which of `routes` received data for the
// timeslice of the first DataProcessingHeader found. Routes whose pipeline timeslice does not match
// are ignored. Depending on the flags it then:
//  - logs the payload size per route when `doPrintSizes` is non-zero and the first DataHeader's
//    tfCounter is a multiple of it;
//  - when `doInjectMissingData` is true, appends one empty (header, payload) pair to `parts` for every
//    non-ignored route without data, raising a rate-limited alarm.
// Nothing is injected when the input carries a SourceInfoHeader (End Of Stream).
242void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::vector<OutputRoute> const& routes, bool doInjectMissingData, unsigned int doPrintSizes)
243{
244 // Check for missing data.
// These buffers are function-local statics that are cleared and reused on every call to avoid
// reallocations. They are shared by all callers, so concurrent calls would race on them.
245 static std::vector<bool> present;
246 static std::vector<bool> ignored;
247 static std::vector<size_t> dataSizes;
248 static std::vector<bool> showSize;
249 present.clear();
250 present.resize(routes.size(), false);
251 ignored.clear();
252 ignored.resize(routes.size(), false);
253 dataSizes.clear();
254 dataSizes.resize(routes.size(), 0);
255 showSize.clear();
256 showSize.resize(routes.size(), false);
257
258 static std::vector<size_t> unmatchedDescriptions;
259 unmatchedDescriptions.clear();
260 DataProcessingHeader const* dph = nullptr;
261 DataHeader const* firstDH = nullptr;
262 bool hassih = false;
263
264 // Do not check anything which has DISTSUBTIMEFRAME in it.
265 size_t expectedDataSpecs = 0;
266 for (size_t pi = 0; pi < present.size(); ++pi) {
267 auto& spec = routes[pi].matcher;
// NOTE(review): the `if (...) {` line that guards the next two statements (the DISTSUBTIMEFRAME check)
// is missing from this listing; as shown, the code would ignore every route.
269 ignored[pi] = true;
270 continue;
271 }
// Only routes of pipeline timeslice 0 are counted, so the alarm's "expected" figure counts each spec once.
272 if (routes[pi].timeslice == 0) {
273 ++expectedDataSpecs;
274 }
275 }
276
277 size_t foundDataSpecs = 0;
278 bool skipAsAllFound = false;
279 for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
280 bool allFound = true;
281 int addToSize = -1;
// NOTE(review): the header message is dereferenced here, before the nullptr check a few lines below,
// so that check cannot prevent a null dereference.
282 const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
283 auto const sih = o2::header::get<SourceInfoHeader*>(parts.At(msgidx)->GetData());
284 if (sih != nullptr) {
285 hassih = true;
286 continue;
287 }
288 if (parts.At(msgidx).get() == nullptr) {
289 LOG(error) << "unexpected nullptr found. Skipping message pair.";
290 continue;
291 }
292 if (!dh) {
293 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
// Step back one so that, with the loop's += 2, the scan advances by a single message.
294 if (msgidx > 0) {
295 --msgidx;
296 }
297 continue;
298 }
299 if (firstDH == nullptr) {
300 firstDH = dh;
// Size printing is only done for every doPrintSizes-th TF.
301 if (doPrintSizes && firstDH->tfCounter % doPrintSizes != 0) {
302 doPrintSizes = 0;
303 }
304 }
305 // Copy the DataProcessingHeader from the first message.
// NOTE(review): if the first header has no DataProcessingHeader, dph stays nullptr and is dereferenced
// right below; the `dph == nullptr` check further down comes too late to protect this.
306 if (dph == nullptr) {
307 dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
308 for (size_t pi = 0; pi < present.size(); ++pi) {
309 if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
310 ignored[pi] = true;
311 }
312 }
313 }
// Find the first route matching this header. When sizes are printed, already-present routes are
// re-checked so their payload sizes keep accumulating.
314 for (size_t pi = 0; pi < present.size(); ++pi) {
315 if ((present[pi] || ignored[pi]) && !doPrintSizes) {
316 continue;
317 }
318 // Consider uninvolved pipelines as present.
319 if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
320 ignored[pi] = true;
321 continue;
322 }
323 allFound = false;
324 auto& spec = routes[pi].matcher;
325 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
326 if (DataSpecUtils::match(spec, query)) {
327 if (!present[pi] && !ignored[pi]) {
328 ++foundDataSpecs;
329 present[pi] = true;
330 showSize[pi] = true;
331 }
332 addToSize = pi;
333 break;
334 }
335 }
// NOTE(review): for a malformed header msgidxLast can exceed parts.Size(), and the size loop below
// does not bound-check parts.At(msgidx2) -- confirm what fair::mq::Parts::At does out of range.
336 int msgidxLast = getFinalIndex(*dh, msgidx);
337 if (addToSize >= 0) {
// Payloads follow the header contiguously (step 1) in the "all payloads after one header" layout,
// otherwise they alternate with headers (step 2).
338 int increment = (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) ? 1 : 2;
339 for (int msgidx2 = msgidx + 1; msgidx2 < msgidxLast; msgidx2 += increment) {
340 dataSizes[addToSize] += parts.At(msgidx2)->GetSize();
341 }
342 }
343 // Skip the rest of the block of messages. We subtract 2 because above we increment by 2.
344 msgidx = msgidxLast - 2;
345 if (allFound && !doPrintSizes) {
346 skipAsAllFound = true;
347 break;
348 }
349 }
350
// Collect the indices of involved routes that received no data.
351 bool emptyTf = true;
352 for (size_t pi = 0; pi < present.size(); ++pi) {
353 if (present[pi] && !ignored[pi]) {
354 emptyTf = false;
355 }
356 if (!present[pi] && !ignored[pi]) {
357 showSize[pi] = true;
358 unmatchedDescriptions.push_back(pi);
359 }
360 }
361 int timeframeCompleteness = emptyTf ? 0 : (unmatchedDescriptions.size() ? -1 : 1);
362 (void)timeframeCompleteness; // To be sent as message
363
364 if (skipAsAllFound && !doPrintSizes) {
365 return;
366 }
367
368 if (firstDH && doPrintSizes) {
369 std::string sizes = "";
370 size_t totalSize = 0;
371 for (size_t pi = 0; pi < present.size(); ++pi) {
372 if (showSize[pi]) {
373 totalSize += dataSizes[pi];
374 auto& spec = routes[pi].matcher;
375 sizes += DataSpecUtils::describe(spec) + fmt::format(":{} ", fmt::group_digits(dataSizes[pi]));
376 }
377 }
378 LOGP(important, "RAW {} size report:{}- Total:{}", firstDH->tfCounter, sizes, fmt::group_digits(totalSize));
379 }
380
381 if (!doInjectMissingData) {
382 return;
383 }
384
385 if (unmatchedDescriptions.size() > 0) {
386 if (hassih) {
387 if (firstDH) {
388 LOG(error) << "Received an EndOfStream message together with data. This should not happen.";
389 }
390 LOG(detail) << "This is an End Of Stream message. Not injecting anything.";
391 return;
392 }
393 if (firstDH == nullptr) {
394 LOG(error) << "Input proxy received incomplete data without any data header. This should not happen! Cannot inject missing data as requsted.";
395 return;
396 }
// NOTE(review): the message below reads "This should happen!" (presumably "should not happen") and
// both messages misspell "requested"; left unchanged here because they are runtime strings.
397 if (dph == nullptr) {
398 LOG(error) << "Input proxy received incomplete data without any data processing header. This should happen! Cannot inject missing data as requsted.";
399 return;
400 }
401 std::string missing = "";
402 bool showAlarm = false;
403 uint32_t runNumber = 0;
// Best effort: a missing or unparsable "runNumber" property leaves runNumber at 0.
404 try {
405 runNumber = strtoul(device.fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
406 } catch (...) {
407 }
408 for (auto mi : unmatchedDescriptions) {
409 auto& spec = routes[mi].matcher;
410 missing += " " + DataSpecUtils::describe(spec);
411 // If we have a ConcreteDataMatcher, we can create a message with the correct header.
412 // If we have a ConcreteDataTypeMatcher, we use 0xdeadbeef as subSpecification.
// NOTE(review): the declaration of `concrete` (used below) is missing from this listing.
414 auto subSpec = DataSpecUtils::getOptionalSubSpec(spec);
// NOTE(review): when subSpec is empty, `*subSpec = ...` dereferences a disengaged std::optional, which is
// undefined behaviour; the intended statement is presumably `subSpec = 0xDEADBEEF;`.
415 if (subSpec == std::nullopt) {
416 *subSpec = 0xDEADBEEF;
417 }
// The injected header copies the first DataHeader, retargets it to the missing spec and marks it as
// a single, empty, unserialised payload.
418 o2::header::DataHeader dh{*firstDH};
419 dh.dataOrigin = concrete.origin;
420 dh.dataDescription = concrete.description;
421 dh.subSpecification = *subSpec;
422 dh.payloadSize = 0;
423 dh.runNumber = runNumber;
424 dh.splitPayloadParts = 0;
425 dh.splitPayloadIndex = 0;
426 dh.payloadSerializationMethod = header::gSerializationMethodNone;
427
// The header message is allocated with the transport of the route's output channel.
428 auto& channelName = routes[mi].channel;
429 auto& channelInfo = device.GetChannel(channelName);
430 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.Transport());
431 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *dph});
432 parts.AddPart(std::move(headerMessage));
433 // add empty payload message
434 parts.AddPart(device.NewMessageFor(channelName, 0, 0));
// NOTE(review): the `if (...) {` line that guards `showAlarm = true;` is missing from this listing.
436 showAlarm = true;
437 }
438 }
// The alarm is emitted at most maxWarn times per process (the counter is a function-local static).
439 static int maxWarn = 10; // Correct would be o2::conf::VerbosityConfig::Instance().maxWarnDeadBeef, but Framework does not depend on CommonUtils..., but not so critical since receives will send correct number of DEADBEEF messages
440 static int contDeadBeef = 0;
441 if (showAlarm && ++contDeadBeef <= maxWarn) {
442 LOGP(alarm, "Found {}/{} data specs, missing data specs: {}, injecting 0xDEADBEEF{}", foundDataSpecs, expectedDataSpecs, missing, contDeadBeef == maxWarn ? " - disabling alarm now to stop flooding the log" : "");
443 }
444 }
445}
446
// Build an InjectorFunction that forwards incoming O2-model blocks (a header followed by its payloads,
// see getFinalIndex) to the DPL output channel that channelRetriever selects for the first filterSpec
// matching the block's DataHeader.
// Before forwarding, each DataProcessingHeader's startTime is rewritten to newTimesliceId and
// timingInfo is updated from the headers. Unmatched data either throws (throwOnUnmatchedInputs) or
// is recorded and warned about once per new description.
// The returned function reports whether any parts were sent.
447InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config)
448{
449 bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs;
450 // structure to hold information on the unmatched data and print a warning at cleanup
451 class DroppedDataSpecs
452 {
453 public:
454 DroppedDataSpecs() = default;
455 ~DroppedDataSpecs()
456 {
457 warning();
458 }
459
// NOTE(review): this is a substring search over the concatenated descriptions, so a description that
// is a substring of one already recorded is treated as already seen.
460 [[nodiscard]] bool find(std::string const& desc) const
461 {
462 return descriptions.find(desc) != std::string::npos;
463 }
464
465 void add(std::string const& desc)
466 {
467 descriptions += "\n " + desc;
468 }
469
470 void warning() const
471 {
472 if (not descriptions.empty()) {
473 LOG(warning) << "Some input data could not be matched by filter rules to output specs\n"
474 << "Active rules: " << descriptions << "\n"
475 << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
476 }
477 }
478
479 private:
480 std::string descriptions;
481 };
482
// filterSpecs is a const reference, so std::move here still copies it into the closure.
483 return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](TimingInfo& timingInfo, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) {
484 // FIXME: this in not thread safe, but better than an alloc of a map per message...
485 std::unordered_map<std::string, fair::mq::Parts> outputs;
486 std::vector<std::string> unmatchedDescriptions;
487 auto* device = services.get<RawDeviceService>().device();
488
// Creation-time override: DPL_RAWPROXY_OVERRIDE_ORBITRESET (read once per process, since these are
// statics) takes precedence. Otherwise "orbit-reset-time" is used, but only when the whole string
// parses as a non-zero integer; the default ccdb:// URL therefore disables the override.
489 static bool override_creation_env = getenv("DPL_RAWPROXY_OVERRIDE_ORBITRESET");
490 bool override_creation = false;
491 uint64_t creationVal = 0;
492 if (override_creation_env) {
493 static uint64_t creationValBase = std::stoul(getenv("DPL_RAWPROXY_OVERRIDE_ORBITRESET"));
494 creationVal = creationValBase;
495 override_creation = true;
496 } else {
497 auto orbitResetTimeUrl = device->fConfig->GetProperty<std::string>("orbit-reset-time", "ccdb://CTP/Calib/OrbitResetTime");
498 char* err = nullptr;
499 creationVal = std::strtoll(orbitResetTimeUrl.c_str(), &err, 10);
500 if (err && *err == 0 && creationVal) {
501 override_creation = true;
502 }
503 }
504
// Best effort: stays -1 when the "runNumber" property cannot be read.
505 int fmqRunNumber = -1;
506 try {
507 fmqRunNumber = atoi(device->fConfig->GetProperty<std::string>("runNumber", "").c_str());
508 } catch (...) {
509 }
510
511 for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
512 if (parts.At(msgidx).get() == nullptr) {
513 LOG(error) << "unexpected nullptr found. Skipping message pair.";
514 continue;
515 }
516 auto* header = parts.At(msgidx)->GetData();
517 const auto dh = o2::header::get<DataHeader*>(header);
518 if (!dh) {
519 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
// Step back one so that, with the loop's += 2, the scan advances by a single message.
520 if (msgidx > 0) {
521 --msgidx;
522 }
523 continue;
524 }
525 auto dph = o2::header::get<DataProcessingHeader*>(header);
526 if (!dph) {
527 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
528 continue;
529 }
// The incoming header is patched in place (get<> yields a const pointer, hence the const_cast).
530 const_cast<DataProcessingHeader*>(dph)->startTime = newTimesliceId;
// The 0.000001f factor presumably converts LHCOrbitNS (ns) to the ms unit of `creation` -- confirm.
531 if (override_creation) {
532 const_cast<DataProcessingHeader*>(dph)->creation = creationVal + (dh->firstTForbit * o2::constants::lhc::LHCOrbitNS * 0.000001f);
533 }
534 timingInfo.timeslice = dph->startTime;
535 timingInfo.creation = dph->creation;
536 timingInfo.firstTForbit = dh->firstTForbit;
537 timingInfo.runNumber = dh->runNumber;
538 timingInfo.tfCounter = dh->tfCounter;
539 LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
// Sanity check outside FST deployments: flag a zero run number, a zero tfCounter (except EOS), or a
// run number that disagrees with the FairMQ "runNumber" property. The data is still forwarded.
540 if (DefaultsHelpers::deploymentMode() != DeploymentMode::FST && (dh->runNumber == 0 || (dh->tfCounter == 0 && dh->dataDescription.as<std::string>() != "EOS") || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber))) {
541 LOG(error) << "INVALID runNumber / tfCounter: runNumber " << dh->runNumber
542 << ", tfCounter " << dh->tfCounter << ", FMQ runNumber " << fmqRunNumber
543 << " for msgidx " << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
544 }
545
546 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
547 LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts;
548 int finalBlockIndex = 0;
549 std::string channelName = "";
550
551 for (auto const& spec : filterSpecs) {
552 // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
// NOTE(review): the `if (...` line opening this condition is missing from this listing.
554 DataSpecUtils::match(spec, query)) {
555 channelName = channelRetriever(query, dph->startTime);
556 // We do not complain about DPL/EOS/0, since it's normal not to forward it.
557 if (channelName.empty() && DataSpecUtils::describe(query) != "DPL/EOS/0") {
558 LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query);
559 }
560 break;
561 }
562 }
// NOTE(review): getFinalIndex() always returns at least msgidx + 2, so finalBlockIndex is non-zero from
// here on and the `finalBlockIndex == 0` check at the end of this iteration never fires. As a result,
// unmatched inputs are never reported and throwOnUnmatchedInputs has no effect. The check was
// presumably meant to test for "no channel found".
563 finalBlockIndex = getFinalIndex(*dh, msgidx);
// A block claiming more messages than are present is skipped header-only: the next iteration starts
// two messages later, inside the bogus block.
564 if (finalBlockIndex > parts.Size()) {
565 // TODO error handling
566 // LOGP(error, "DataHeader::splitPayloadParts invalid");
567 continue;
568 }
569
570 if (!channelName.empty()) {
571 // the checks for consistency of split payload parts are of informative nature
572 // forwarding happens independently
573 // if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
574 // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
575 // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
576 // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
577 // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
578 // << " of " << dh->splitPayloadParts;
579 // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
580 // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
581 // << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
582 // }
583 //}
584 LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size());
// Move the whole block (header and payloads) into the channel's outgoing multipart. Afterwards
// msgidx == finalBlockIndex, and the -2 compensates for the loop's += 2.
585 for (; msgidx < finalBlockIndex; ++msgidx) {
586 outputs[channelName].AddPart(std::move(parts.At(msgidx)));
587 }
588 msgidx -= 2;
589 } else {
// No channel: skip the whole block without forwarding it.
590 msgidx = finalBlockIndex - 2;
591 }
592 if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
593 unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query));
594 }
595 } // end of loop over parts
596
// One send per output channel, carrying every block associated with it.
597 bool didSendParts = false;
598 for (auto& [channelName, channelParts] : outputs) {
599 if (channelParts.Size() == 0) {
600 continue;
601 }
602 didSendParts = true;
603 sendOnChannel(*device, channelParts, channelName, newTimesliceId);
604 }
605 if (not unmatchedDescriptions.empty()) {
606 if (throwOnUnmatchedInputs) {
607 std::string descriptions;
608 for (auto const& desc : unmatchedDescriptions) {
609 descriptions += "\n " + desc;
610 }
611 throw std::runtime_error("No matching filter rule for input data " + descriptions +
612 "\n Add appropriate matcher(s) to dataspec definition or allow to drop unmatched data");
613 } else {
// Warn only when a description not seen before shows up.
614 bool changed = false;
615 for (auto const& desc : unmatchedDescriptions) {
616 if (not droppedDataSpecs->find(desc)) {
617 // a new description
618 droppedDataSpecs->add(desc);
619 changed = true;
620 }
621 }
622 if (changed) {
623 droppedDataSpecs->warning();
624 }
625 }
626 }
627 return didSendParts;
628 };
629}
630
// Build an InjectorFunction that wraps every incoming part, as a payload, with a freshly built O2
// header stack and sends it on the channel selected for `spec`. The stack holds a DataHeader built
// from `spec`, `method` and the "runNumber" property, plus a DataProcessingHeader for newTimesliceId.
// The closure tracks the expected timeslice, starting at `startTime` and advancing by `step` per
// part, and aborts via LOG(fatal) when it disagrees with newTimesliceId.
631InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step)
632{
// Shared so that the expected timeslice survives across invocations (and copies) of the closure.
633 auto timesliceId = std::make_shared<size_t>(startTime);
634 return [timesliceId, spec, step, method](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) {
635 auto* device = services.get<RawDeviceService>().device();
// Best effort: a missing or unparsable "runNumber" property leaves runNumber at 0.
636 uint32_t runNumber = 0;
637 try {
638 runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
639 } catch (...) {
640 }
641 // Every incoming part is sent on its own, as the payload of a (header, payload) pair
642 // whose O2 header stack is built here.
643 for (int i = 0; i < parts.Size(); ++i) {
644 DataHeader dh;
645 dh.payloadSerializationMethod = method;
646
647 // FIXME: this only supports fully specified output specs...
// NOTE(review): the declaration of `matcher` (used below) is missing from this listing.
649 dh.dataOrigin = matcher.origin;
650 dh.dataDescription = matcher.description;
651 dh.subSpecification = matcher.subSpec;
652 dh.payloadSize = parts.At(i)->GetSize();
653 dh.runNumber = runNumber;
654
655 DataProcessingHeader dph{newTimesliceId, 0};
// NOTE(review): newTimesliceId is the same for every part of this call while *timesliceId advances by
// `step` per part, so with more than one part and step != 0 the second part triggers LOG(fatal).
656 if (*timesliceId != newTimesliceId) {
657 LOG(fatal) << "Time slice ID provided from oldestPossible mechanism " << newTimesliceId << " is out of sync with expected value " << *timesliceId;
658 }
659 *timesliceId += step;
660 // we have to move the incoming data
661 o2::header::Stack headerStack{dh, dph};
662
663 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(i)), spec, channelRetriever);
664 }
// Returns the part count; presumably converted to bool ("sent anything") by InjectorFunction -- confirm.
665 return parts.Size();
666 };
667}
668
670 std::vector<OutputSpec> const& outputs,
671 char const* defaultChannelConfig,
672 InjectorFunction converter,
673 uint64_t minSHM,
674 bool sendTFcounter,
675 bool doInjectMissingData,
676 unsigned int doPrintSizes)
677{
679 spec.name = strdup(name);
680 spec.inputs = {};
681 spec.outputs = outputs;
682 static std::vector<std::string> channels;
683 static std::vector<int> numberOfEoS(channels.size(), 0);
684 static std::vector<int> eosPeersCount(channels.size(), 0);
685 // The Init method will register a new "Out of band" channel and
686 // attach an OnData to it which is responsible for converting incoming
687 // messages into DPL messages.
688 spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name, sendTFcounter, doInjectMissingData, doPrintSizes](InitContext& ctx) {
689 auto* device = ctx.services().get<RawDeviceService>().device();
690 // make a copy of the output routes and pass to the lambda by move
691 auto outputRoutes = ctx.services().get<RawDeviceService>().spec().outputs;
692 auto outputChannels = ctx.services().get<RawDeviceService>().spec().outputChannels;
693 assert(device);
694
695 // check that the name used for registering the OnData callback corresponds
696 // to the configured output channel, unfortunately we can not automatically
697 // deduce this from list of channels without knowing the name, because there
698 // will be multiple channels. At least we throw a more informative exception.
699 // fair::mq::Device calls the custom init before the channels have been configured
700 // so we do the check before starting in a dedicated callback
701 auto channelConfigurationChecker = [device, deviceName, services = ctx.services()]() {
702 auto& deviceState = services.get<DeviceState>();
703 channels.clear();
704 numberOfEoS.clear();
705 eosPeersCount.clear();
706 for (auto& [channelName, _] : services.get<RawDeviceService>().device()->GetChannels()) {
707 // Out of band channels must start with the proxy name, at least for now
708 if (strncmp(channelName.c_str(), deviceName.c_str(), deviceName.size()) == 0) {
709 channels.push_back(channelName);
710 }
711 }
712 for (auto& channel : channels) {
713 LOGP(detail, "Injecting channel '{}' into DPL configuration", channel);
714 // Converter should pump messages
715 auto& channelPtr = services.get<RawDeviceService>().device()->GetChannel(channel, 0);
716 deviceState.inputChannelInfos.push_back(InputChannelInfo{
718 .hasPendingEvents = false,
719 .readPolled = false,
720 .channel = &channelPtr,
721 .id = {ChannelIndex::INVALID},
722 .channelType = ChannelAccountingType::RAWFMQ,
723 });
724 }
725 numberOfEoS.resize(channels.size(), 0);
726 eosPeersCount.resize(channels.size(), 0);
727 };
728
729 auto drainMessages = [](ServiceRegistryRef registry, int state) {
730 auto* device = registry.get<RawDeviceService>().device();
731 auto& deviceState = registry.get<DeviceState>();
732 // We drop messages in input only when in ready.
733 // FIXME: should we drop messages in input the first time we are in ready?
734 static bool wasRunning = false;
735 if (fair::mq::State{state} == fair::mq::State::Running) {
736 wasRunning = true;
737 }
738 if (fair::mq::State{state} != fair::mq::State::Ready || !wasRunning) {
739 return;
740 }
741 uv_update_time(deviceState.loop);
742 bool doDrain = true;
743 // Cleanup count is set by the cleanup property of the device.
744 // It is incremented every time the device is cleaned up.
745 // We use it to detect when the device is cleaned up.
746 int64_t cleanupCount = deviceState.cleanupCount.load();
747
748 // Continue iterating we saw the cleanup property being reset or
749 // the device state changing.
750 while (doDrain) {
751 doDrain = device->NewStatePending() == false && deviceState.cleanupCount == cleanupCount;
752 fair::mq::Parts parts;
753 for (size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
754 auto& info = deviceState.inputChannelInfos[ci];
755 // We only care about rawfmq channels.
756 if (info.channelType != ChannelAccountingType::RAWFMQ) {
757 continue;
758 }
759 info.channel->Receive(parts, 10);
760 }
761 // Keep state transitions going also when running with the standalone GUI.
762 uv_run(deviceState.loop, UV_RUN_NOWAIT);
763 }
764 };
765
766 ctx.services().get<CallbackService>().set<CallbackService::Id::Start>(channelConfigurationChecker);
767 if (ctx.options().get<std::string>("ready-state-policy") == "drain") {
768 LOG(info) << "Drain mode requested while in Ready state";
769 ctx.services().get<CallbackService>().set<CallbackService::Id::DeviceStateChanged>(drainMessages);
770 }
771
772 static auto countEoS = [](fair::mq::Parts& inputs) -> int {
773 int count = 0;
774 for (int msgidx = 0; msgidx < inputs.Size() / 2; ++msgidx) {
775 // Skip when we have nullptr for the header.
776 // Not sure it can actually happen, but does not hurt.
777 if (inputs.At(msgidx * 2).get() == nullptr) {
778 continue;
779 }
780 auto const sih = o2::header::get<SourceInfoHeader*>(inputs.At(msgidx * 2)->GetData());
781 if (sih != nullptr && sih->state == InputChannelState::Completed) {
782 count++;
783 }
784 }
785 return count;
786 };
787
788 // Data handler for incoming data. Must return true if it sent any data.
789 auto dataHandler = [converter, doInjectMissingData, doPrintSizes,
790 outputRoutes = std::move(outputRoutes),
791 control = &ctx.services().get<ControlService>(),
792 deviceState = &ctx.services().get<DeviceState>(),
793 timesliceIndex = &ctx.services().get<TimesliceIndex>(),
794 outputChannels = std::move(outputChannels)](ServiceRegistryRef ref, TimingInfo& timingInfo, fair::mq::Parts& inputs, int, size_t ci, bool newRun) -> bool {
795 auto* device = ref.get<RawDeviceService>().device();
796 // pass a copy of the outputRoutes
797 auto channelRetriever = [&outputRoutes](OutputSpec const& query, DataProcessingHeader::StartTime timeslice) -> std::string const& {
798 static std::string emptyChannel = "";
799 for (auto& route : outputRoutes) {
800 LOG(debug) << "matching: " << DataSpecUtils::describe(query) << " to route " << DataSpecUtils::describe(route.matcher);
801 if (DataSpecUtils::match(route.matcher, query) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
802 return route.channel;
803 }
804 }
805 return emptyChannel;
806 };
807
808 std::string const& channel = channels[ci];
809 // we buffer the condition since the converter will forward messages by move
810 int nEos = countEoS(inputs);
811 if (newRun) {
812 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
813 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
814 }
815 numberOfEoS[ci] += nEos;
816 if (numberOfEoS[ci]) {
817 eosPeersCount[ci] = std::max<int>(eosPeersCount[ci], device->GetNumberOfConnectedPeers(channel));
818 }
819 // For reference, the oldest possible timeframe passed as newTimesliceId here comes from LifetimeHelpers::enumDrivenCreation()
820 bool shouldstop = false;
821 if (doInjectMissingData) {
822 injectMissingData(*device, inputs, outputRoutes, doInjectMissingData, doPrintSizes);
823 }
824 bool didSendParts = converter(timingInfo, ref, inputs, channelRetriever, timesliceIndex->getOldestPossibleOutput().timeslice.value, shouldstop);
825
826 // If we have enough EoS messages, we can stop the device
827 // Notice that this has a number of failure modes:
828 // * If a connection sends the EoS and then closes before the GetNumberOfConnectedPeers command above.
829 // * If a connection sends two EoS.
830 // * If a connection sends an end of stream closes and another one opens.
831 // Finally, if we didn't receive an EoS this time, out counting of the connected peers is off, so the best thing we can do is delay the EoS reporting
832 bool everyEoS = shouldstop;
833 if (!shouldstop && nEos) {
834 everyEoS = true;
835 for (unsigned int i = 0; i < numberOfEoS.size(); i++) {
836 if (numberOfEoS[i] < eosPeersCount[i]) {
837 everyEoS = false;
838 break;
839 }
840 }
841 }
842
843 if (everyEoS) {
844 LOG(info) << "Received (on channel " << ci << ") " << numberOfEoS[ci] << " end-of-stream from " << eosPeersCount[ci] << " peers, forwarding end-of-stream (shouldstop " << (int)shouldstop << ", nEos " << nEos << ", newRun " << (int)newRun << ")";
845 // Mark all input channels as closed
846 for (auto& info : deviceState->inputChannelInfos) {
847 info.state = InputChannelState::Completed;
848 }
849 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
850 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
851 control->endOfStream();
852 }
853 return didSendParts;
854 };
855
856 auto runHandler = [dataHandler, minSHM, sendTFcounter](ProcessingContext& ctx) {
857 static RateLimiter limiter;
858 static size_t currentRunNumber = -1;
859 static bool inStopTransition = false;
860 bool newRun = false;
861 auto device = ctx.services().get<RawDeviceService>().device();
862 if (limiter.check(ctx, std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit")), minSHM)) {
863 inStopTransition = true;
864 }
865
866 bool didSendParts = false;
867 for (size_t ci = 0; ci < channels.size(); ++ci) {
868 // check for state transition request every 10th input channel to avoid large delays of EoS timers
869 if (ci > 0 && ci % 10 == 0) {
870 ctx.services().get<DeviceState>().transitionHandling = DataProcessingHelpers::updateStateTransition(ctx.services(), ctx.services().get<DeviceContext>().processingPolicies);
871 }
872 std::string const& channel = channels[ci];
873 int waitTime = channels.size() == 1 ? -1 : 1;
874 int maxRead = 1000;
875 while (maxRead-- > 0) {
876 fair::mq::Parts parts;
877 auto res = device->Receive(parts, channel, 0, waitTime);
878 if (res == (size_t)fair::mq::TransferCode::error) {
879 LOGP(error, "Error while receiving on channel {}", channel);
880 }
881 // Populate TimingInfo from the first message
882 unsigned int nReceived = parts.Size();
883 if (nReceived != 0) {
884 auto const dh = o2::header::get<DataHeader*>(parts.At(0)->GetData());
885 auto& timingInfo = ctx.services().get<TimingInfo>();
886 if (dh != nullptr) {
887 if (currentRunNumber != -1 && dh->runNumber != 0 && dh->runNumber != currentRunNumber) {
888 newRun = true;
889 inStopTransition = false;
890 }
891 if (currentRunNumber == -1 || dh->runNumber != 0) {
892 currentRunNumber = dh->runNumber;
893 }
894 timingInfo.runNumber = dh->runNumber;
895 timingInfo.firstTForbit = dh->firstTForbit;
896 timingInfo.tfCounter = dh->tfCounter;
897 }
898 auto const dph = o2::header::get<DataProcessingHeader*>(parts.At(0)->GetData());
899 if (dph != nullptr) {
900 timingInfo.timeslice = dph->startTime;
901 timingInfo.creation = dph->creation;
902 }
903 if (!inStopTransition) {
904 didSendParts |= dataHandler(ctx.services(), timingInfo, parts, 0, ci, newRun);
905 }
906 if (sendTFcounter) {
907 ctx.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)timingInfo.tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
908 }
909 }
910 if (nReceived == 0 || channels.size() == 1) {
911 break;
912 }
913 waitTime = 0;
914 }
915 }
916 // In case we did not send any part at all, we need to rewind by one
917 // to avoid creating extra timeslices.
918 auto& decongestion = ctx.services().get<DecongestionService>();
919 decongestion.nextEnumerationTimesliceRewinded = !didSendParts;
920 if (didSendParts) {
921 ctx.services().get<MessageContext>().fakeDispatch();
922 } else {
923 decongestion.nextEnumerationTimeslice -= 1;
924 }
925 };
926
927 return runHandler;
928 }};
929 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
930 spec.options = {
931 ConfigParamSpec{"ready-state-policy", VariantType::String, "keep", {"What to do when the device is in ready state: *keep*, drain"}},
932 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}}};
933 return spec;
934}
935
936// Decide where to sent the output. Everything to "downstream" if there is such a channel.
937std::string defaultOutputProxyChannelSelector(InputSpec const& input, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>& channels)
938{
939 return channels.count("downstream") ? "downstream" : input.binding;
940}
941
943 Inputs const& inputSpecs,
944 const char* defaultChannelConfig)
945{
947 spec.name = name;
948 spec.inputs = inputSpecs;
949 spec.outputs = {};
950 spec.algorithm = adaptStateful([inputSpecs](FairMQDeviceProxy& proxy, CallbackService& callbacks, RawDeviceService& rds, DeviceSpec const& deviceSpec, ConfigParamRegistry const& options) {
951 // we can retrieve the channel name from the channel configuration string
952 // FIXME: even if a --channel-config option is specified on the command line, always the default string
953 // is retrieved from the config registry. The channel name thus needs to be configured in the default
954 // string AND must match the name in an optional channel config.
955 auto channelConfig = options.get<std::string>("channel-config");
956 std::regex r{R"(name=([^,]*))"};
957 std::vector<std::string> values{std::sregex_token_iterator{std::begin(channelConfig), std::end(channelConfig), r, 1},
958 std::sregex_token_iterator{}};
959 if (values.size() != 1 || values[0].empty()) {
960 throw std::runtime_error("failed to extract channel name from channel configuration parameter '" + channelConfig + "'");
961 }
962 std::string outputChannelName = values[0];
963
964 auto* device = rds.device();
965 // check that the input spec bindings have corresponding output channels
966 // fair::mq::Device calls the custom init before the channels have been configured
967 // so we do the check before starting in a dedicated callback
968 auto channelConfigurationChecker = [inputSpecs = std::move(inputSpecs), device, outputChannelName]() {
969 LOG(info) << "checking channel configuration";
970 if (device->GetChannels().count(outputChannelName) == 0) {
971 throw std::runtime_error("no corresponding output channel found for input '" + outputChannelName + "'");
972 }
973 };
974 callbacks.set<CallbackService::Id::Start>(channelConfigurationChecker);
975 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
976
977 auto& spec = const_cast<DeviceSpec&>(deviceSpec);
979 for (auto const& inputSpec : inputSpecs) {
980 // this is a prototype, in principle we want to have all spec objects const
981 // and so only the const object can be retrieved from service registry
982 ForwardRoute route{
983 .timeslice = 0,
984 .maxTimeslices = 1,
985 .matcher = inputSpec,
986 .channel = outputChannelName,
987 .policy = &policy};
988 spec.forwards.emplace_back(route);
989 }
990
991 auto forwardEos = [device, lastDataProcessingHeader, outputChannelName](EndOfStreamContext&) {
992 // DPL implements an internal end of stream signal, which is propagated through
993 // all downstream channels if a source is dry, make it available to other external
994 // devices via a message of type {DPL/EOS/0}
995 for (auto& channelInfo : device->GetChannels()) {
996 auto& channelName = channelInfo.first;
997 if (channelName != outputChannelName) {
998 continue;
999 }
1000
1001 uint32_t runNumber = 0;
1002 try {
1003 runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
1004 } catch (...) {
1005 }
1006 DataHeader dh;
1007 dh.dataOrigin = "DPL";
1008 dh.dataDescription = "EOS";
1009 dh.subSpecification = 0;
1010 dh.payloadSize = 0;
1011 dh.runNumber = runNumber;
1013 dh.tfCounter = 0;
1014 dh.firstTForbit = 0;
1015 SourceInfoHeader sih;
1017 // allocate the header message using the underlying transport of the channel
1018 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
1019 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *lastDataProcessingHeader, sih});
1020 fair::mq::Parts out;
1021 out.AddPart(std::move(headerMessage));
1022 // add empty payload message
1023 out.AddPart(device->NewMessageFor(channelName, 0, 0));
1024 sendOnChannel(*device, out, channelName, (size_t)-1);
1025 }
1026 };
1027 callbacks.set<CallbackService::Id::EndOfStream>(forwardEos);
1028
1029 return adaptStateless([lastDataProcessingHeader](InputRecord& inputs) {
1030 for (size_t ii = 0; ii != inputs.size(); ++ii) {
1031 for (size_t pi = 0; pi < inputs.getNofParts(ii); ++pi) {
1032 auto part = inputs.getByPos(ii, pi);
1033 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
1034 if (dph) {
1035 // FIXME: should we implement an assignment operator for DataProcessingHeader?
1036 lastDataProcessingHeader->startTime = dph->startTime;
1037 lastDataProcessingHeader->duration = dph->duration;
1038 lastDataProcessingHeader->creation = dph->creation;
1039 }
1040 }
1041 }
1042 });
1043 });
1044 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
1045 spec.options = {
1046 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}},
1047 };
1048
1049 return spec;
1050}
1051
1053 Inputs const& inputSpecs,
1054 const char* defaultChannelConfig,
1055 ChannelSelector channelSelector)
1056{
1057 // FIXME: this looks like a code duplication with the function above, check if the
1058 // two can be combined
1059 DataProcessorSpec spec;
1060 spec.name = name;
1061 spec.inputs = inputSpecs;
1062 spec.outputs = {};
1063 spec.algorithm = adaptStateful([inputSpecs, channelSelector](FairMQDeviceProxy& proxy, CallbackService& callbacks, RawDeviceService& rds, const DeviceSpec& deviceSpec) {
1064 auto device = rds.device();
1065 // check that the input spec bindings have corresponding output channels
1066 // fair::mq::Device calls the custom init before the channels have been configured
1067 // so we do the check before starting in a dedicated callback
1068 // also we set forwards for all input specs and keep a list of all channels so we can send EOS on them
1069 auto channelNames = std::make_shared<std::vector<std::string>>();
1070 auto channelConfigurationInitializer = [&proxy, inputSpecs = std::move(inputSpecs), device, channelSelector, &deviceSpec, channelNames]() {
1071 channelNames->clear();
1072 auto& mutableDeviceSpec = const_cast<DeviceSpec&>(deviceSpec);
1073 for (auto const& spec : inputSpecs) {
1074 auto channel = channelSelector(spec, device->GetChannels());
1075 if (device->GetChannels().count(channel) == 0) {
1076 throw std::runtime_error("no corresponding output channel found for input '" + channel + "'");
1077 }
1079 ForwardRoute route{
1080 .timeslice = 0,
1081 .maxTimeslices = 1,
1082 .matcher = spec,
1083 .channel = channel,
1084 .policy = &policy};
1085 // this we will try to fix on the framework level, there will be an API to
1086 // set external routes. Basically, this has to be added while setting up the
1087 // workflow. After that, the actual spec provided by the service is supposed
1088 // to be const by design
1089 mutableDeviceSpec.forwards.emplace_back(route);
1090
1091 channelNames->emplace_back(std::move(channel));
1092 }
1093 proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device);
1094 };
1095 // We need to clear the channels on stop, because we will check and add them
1096 auto channelConfigurationDisposer = [&deviceSpec]() {
1097 auto& mutableDeviceSpec = const_cast<DeviceSpec&>(deviceSpec);
1098 mutableDeviceSpec.forwards.clear();
1099 };
1100 callbacks.set<CallbackService::Id::Start>(channelConfigurationInitializer);
1101 callbacks.set<CallbackService::Id::Stop>(channelConfigurationDisposer);
1102
1103 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
1104 auto forwardEos = [device, lastDataProcessingHeader, channelNames](EndOfStreamContext&) {
1105 // DPL implements an internal end of stream signal, which is propagated through
1106 // all downstream channels if a source is dry, make it available to other external
1107 // devices via a message of type {DPL/EOS/0}
1108 for (auto& channelInfo : device->GetChannels()) {
1109 auto& channelName = channelInfo.first;
1110 auto checkChannel = [channelNames = std::move(*channelNames)](std::string const& name) -> bool {
1111 for (auto const& n : channelNames) {
1112 if (n == name) {
1113 return true;
1114 }
1115 }
1116 return false;
1117 };
1118 if (!checkChannel(channelName)) {
1119 continue;
1120 }
1121 uint32_t runNumber = 0;
1122 try {
1123 runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
1124 } catch (...) {
1125 }
1126 DataHeader dh;
1127 dh.dataOrigin = "DPL";
1128 dh.dataDescription = "EOS";
1129 dh.subSpecification = 0;
1130 dh.payloadSize = 0;
1132 dh.runNumber = runNumber;
1133 dh.tfCounter = 0;
1134 dh.firstTForbit = 0;
1135 SourceInfoHeader sih;
1137 // allocate the header message using the underlying transport of the channel
1138 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
1139 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *lastDataProcessingHeader, sih});
1140 fair::mq::Parts out;
1141 out.AddPart(std::move(headerMessage));
1142 // add empty payload message
1143 out.AddPart(device->NewMessageFor(channelName, 0, 0));
1144 LOGP(detail, "Forwarding EoS to {}", channelName);
1145 sendOnChannel(*device, out, channelName, (size_t)-1);
1146 }
1147 };
1148 callbacks.set<CallbackService::Id::EndOfStream>(forwardEos);
1149
1150 return adaptStateless([channelSelector, lastDataProcessingHeader](InputRecord& inputs) {
1151 // there is nothing to do if the forwarding is handled on the framework level
1152 // as forward routes but we need to keep a copy of the last DataProcessingHeader
1153 // for sending the EOS
1154 for (size_t ii = 0; ii != inputs.size(); ++ii) {
1155 for (size_t pi = 0; pi < inputs.getNofParts(ii); ++pi) {
1156 auto part = inputs.getByPos(ii, pi);
1157 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
1158 if (dph) {
1159 // FIXME: should we implement an assignment operator for DataProcessingHeader?
1160 lastDataProcessingHeader->startTime = dph->startTime;
1161 lastDataProcessingHeader->duration = dph->duration;
1162 lastDataProcessingHeader->creation = dph->creation;
1163 }
1164 }
1165 }
1166 });
1167 });
1168 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
1169 spec.options = {
1170 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}},
1171 };
1172
1173 return spec;
1174}
1175
1176} // namespace o2::framework
header::DataDescription description
benchmark::State & state
std::vector< OutputRoute > routes
std::ostringstream debug
int32_t i
Header to collect LHC related constants.
uint32_t res
Definition RawData.h:0
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, fair::mq::Device &device)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
size_t getNofParts(int pos) const
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
GLdouble n
Definition glcorearb.h:1982
GLint GLsizei count
Definition glcorearb.h:399
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean r
Definition glcorearb.h:1233
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginPHS
Definition DataHeader.h:574
constexpr o2::header::DataOrigin gDataOriginHMP
Definition DataHeader.h:569
constexpr o2::header::DataOrigin gDataOriginEMC
Definition DataHeader.h:565
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionAny
Definition DataHeader.h:596
constexpr double LHCOrbitNS
Defining PrimaryVertex explicitly as messageable.
void injectMissingData(fair::mq::Device &device, fair::mq::Parts &parts, std::vector< OutputRoute > const &routes, bool doInjectMissingData, unsigned int doPrintSizes)
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
@ RAWFMQ
A raw FairMQ channel which is not accounted by the framework.
std::function< std::string(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)> ChannelSelector
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig, ChannelSelector channelSelector=defaultOutputProxyChannelSelector)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
InjectorFunction o2DataModelAdaptor(OutputSpec const &spec, uint64_t startTime, uint64_t step)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::string defaultOutputProxyChannelSelector(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)
Default way to select an output channel for multi-output proxy.
auto getFinalIndex(DataHeader const &dh, size_t msgidx) -> size_t
std::vector< InputSpec > Inputs
void appendForSending(fair::mq::Device &device, o2::header::Stack &&headerStack, size_t timeSliceID, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, fair::mq::Parts &messageCache, ChannelRetriever &channelRetriever)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
InjectorFunction incrementalConverter(OutputSpec const &spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int INVALID
header::DataHeader::SubSpecificationType subSpec
bool throwOnUnmatchedInputs
throw runtime error if an input message is not matched by filter rules
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static DeploymentMode deploymentMode()
static std::string inputChannel2String(const InputChannelSpec &channel)
Helper to provide the channel configuration string for an input channel.
static std::string outputChannel2String(const OutputChannelSpec &channel)
Helper to provide the channel configuration string for an output channel.
std::vector< ForwardRoute > forwards
Definition DeviceSpec.h:64
Running state information of a given device.
Definition DeviceState.h:34
static ForwardingPolicy createDefaultForwardingPolicy()
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:619
TFCounterType tfCounter
Definition DataHeader.h:680
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:652
TForbitType firstTForbit
Definition DataHeader.h:675
DataDescription dataDescription
Definition DataHeader.h:637
SubSpecificationType subSpecification
Definition DataHeader.h:657
PayloadSizeType payloadSize
Definition DataHeader.h:667
RunNumberType runNumber
Definition DataHeader.h:685
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels