Project
Loading...
Searching...
No Matches
ExternalFairMQDeviceProxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
33#include "Headers/DataHeader.h"
34#include "Headers/Stack.h"
35#include "DecongestionService.h"
37
38#include "./DeviceSpecHelpers.h"
39#include "Monitoring/Monitoring.h"
40
41#include <fairmq/Parts.h>
42#include <fairmq/Device.h>
43#include <uv.h>
44#include <cstring>
45#include <cassert>
46#include <memory>
47#include <optional>
48#include <unordered_map>
49#include <numeric> // std::accumulate
50#include <sstream>
51#include <stdexcept>
52#include <regex>
53
54namespace o2::framework
55{
57
62
67
69
70void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, std::string const& channel, size_t timeSlice)
71{
72 // Note: DPL is only setting up one instance of a channel while FairMQ allows to have an
73 // array of channels, the index is 0 in the call
74 constexpr auto index = 0;
75 LOG(debug) << "sending " << messages.Size() << " messages on " << channel;
76 // TODO: we can make this configurable
77 const int maxTimeout = 10000;
78 int timeout = 0;
79 // try dispatch with increasing timeout in order to also drop a warning if the dispatching
80 // has been tried multiple times within max timeout
81 // since we do not want any messages to be dropped at this stage, we stay in the loop until
82 // the downstream congestion is resolved
83 // TODO: we might want to treat this error condition some levels higher up, but for
84 // the moment its an appropriate solution. The important thing is not to drop
85 // messages and to be informed about the congestion.
86 while (device.Send(messages, channel, index, timeout) < 0) {
87 if (timeout == 0) {
88 timeout = 1;
89 } else if (timeout < maxTimeout) {
90 timeout *= 10;
91 } else {
92 LOG(alarm) << "Cannot dispatch to channel " << channel << " due to DOWNSTREAM BACKPRESSURE. NO DATA IS DROPPED,"
93 << " will keep retrying. This is only a problem if downstream congestion does not resolve by itself.";
94 if (timeout == maxTimeout) {
95 // we add 1ms to disable the warning below
96 timeout += 1;
97 }
98 }
99 if (device.NewStatePending()) {
100 LOG(alarm) << "Device state change is requested, dropping " << messages.Size() << " pending message(s) "
101 << "on channel " << channel << ". "
102 << "ATTENTION: DATA IS LOST! Could not dispatch data to downstream consumer(s), check if "
103 << "consumers have been terminated too early";
104 // make sure we disable the warning below
105 timeout = maxTimeout + 1;
106 break;
107 }
108 }
109
110 // FIXME: we need a better logic for avoiding message spam
111 if (timeout > 100 && timeout <= maxTimeout) {
112 LOG(warning) << "dispatching on channel " << channel << " was delayed by " << timeout / 1000.f << " s";
113 }
114 // TODO: feeling this is a bit awkward, but the interface of fair::mq::Parts does not provide a
115 // method to clear the content.
116 // Maybe the FairMQ API can be improved at some point. Actually the ownership of all messages should be passed
117 // on to the transport and the messages should be empty after sending and the parts content can be cleared.
118 // assert(std::accumulate(messages.begin(), messages.end(), true, [](bool a, auto const& msg) {return a && (msg.get() == nullptr);}));
119 messages.fParts.clear();
120}
121
122void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, OutputSpec const& spec, DataProcessingHeader::StartTime tslice, ChannelRetriever& channelRetriever)
123{
124 // Note: DPL is only setting up one instance of a channel while FairMQ allows to have an
125 // array of channels, the index is 0 in the call
126 auto channel = channelRetriever(spec, tslice);
127 if (channel.empty()) {
128 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec) << " timeslice " << tslice;
129 return;
130 }
131 sendOnChannel(device, messages, channel, tslice);
132}
133
134void sendOnChannel(fair::mq::Device& device, o2::header::Stack&& headerStack, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, ChannelRetriever& channelRetriever)
135{
136 const auto* dph = o2::header::get<DataProcessingHeader*>(headerStack.data());
137 if (!dph) {
138 LOG(error) << "Header Stack does not follow the O2 data model, DataProcessingHeader missing";
139 return;
140 }
141 auto channelName = channelRetriever(spec, dph->startTime);
142 constexpr auto index = 0;
143 if (channelName.empty()) {
144 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec);
145 return;
146 }
147 for (auto& channelInfo : device.GetChannels()) {
148 if (channelInfo.first != channelName) {
149 continue;
150 }
151 assert(channelInfo.second.size() == 1);
152 // allocate the header message using the underlying transport of the channel
153 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[index].Transport());
154 fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
155
156 fair::mq::Parts out;
157 out.AddPart(std::move(headerMessage));
158 out.AddPart(std::move(payloadMessage));
159 sendOnChannel(device, out, channelName, dph->startTime);
160 return;
161 }
162 LOG(error) << "internal mismatch, can not find channel " << channelName << " in the list of channel infos of the device";
163}
164
165void sendOnChannel(fair::mq::Device& device, fair::mq::MessagePtr&& headerMessage, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, ChannelRetriever& channelRetriever)
166{
167 // const auto* dph = o2::header::get<DataProcessingHeader*>( *reinterpret_cast<o2::header::Stack*>(headerMessage->GetData()) );
168 const auto* dph = o2::header::get<DataProcessingHeader*>(headerMessage->GetData());
169 if (!dph) {
170 LOG(error) << "Header does not follow the O2 data model, DataProcessingHeader missing";
171 return;
172 }
173 auto tslice = dph->startTime;
174 fair::mq::Parts out;
175 out.AddPart(std::move(headerMessage));
176 out.AddPart(std::move(payloadMessage));
177 sendOnChannel(device, out, spec, tslice, channelRetriever);
178}
179
180void appendForSending(fair::mq::Device& device, o2::header::Stack&& headerStack, size_t timeSliceID, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, fair::mq::Parts& messageCache, ChannelRetriever& channelRetriever)
181{
182 auto channelName = channelRetriever(spec, timeSliceID);
183 constexpr auto index = 0;
184 if (channelName.empty()) {
185 LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec);
186 return;
187 }
188 for (auto& channelInfo : device.GetChannels()) {
189 if (channelInfo.first != channelName) {
190 continue;
191 }
192 assert(channelInfo.second.size() == 1);
193 // allocate the header message using the underlying transport of the channel
194 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[index].Transport());
195 fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
196
197 fair::mq::Parts out;
198 messageCache.AddPart(std::move(headerMessage));
199 messageCache.AddPart(std::move(payloadMessage));
200 return;
201 }
202 LOG(error) << "internal mismatch, can not find channel " << channelName << " in the list of channel infos of the device";
203}
204
205InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t /*step*/)
206{
207 return [spec](TimingInfo&, ServiceRegistryRef const& ref, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) -> bool {
208 auto* device = ref.get<RawDeviceService>().device();
209 for (int i = 0; i < parts.Size() / 2; ++i) {
210 auto dh = o2::header::get<DataHeader*>(parts.At(i * 2)->GetData());
211
212 DataProcessingHeader dph{newTimesliceId, 0};
213 o2::header::Stack headerStack{*dh, dph};
214 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(i * 2 + 1)), spec, channelRetriever);
215 }
216 return parts.Size() > 0;
217 };
218}
219
220auto getFinalIndex(DataHeader const& dh, size_t msgidx) -> size_t
221{
222 size_t finalBlockIndex = 0;
223 if (dh.splitPayloadParts > 0 && dh.splitPayloadParts == dh.splitPayloadIndex) {
224 // this is indicating a sequence of payloads following the header
225 // FIXME: we will probably also set the DataHeader version
226 // Current position + number of parts + 1 (for the header)
227 finalBlockIndex = msgidx + dh.splitPayloadParts + 1;
228 } else {
229 // We can consider the next splitPayloadParts as one block of messages pairs
230 // because we are guaranteed they are all the same.
231 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
232 // pair.
233 finalBlockIndex = msgidx + (dh.splitPayloadParts > 0 ? dh.splitPayloadParts : 1) * 2;
234 }
235 assert(finalBlockIndex >= msgidx + 2);
236 return finalBlockIndex;
237};
238
239void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::vector<OutputRoute> const& routes, bool doInjectMissingData, unsigned int doPrintSizes)
240{
241 // Check for missing data.
242 static std::vector<bool> present;
243 static std::vector<bool> ignored;
244 static std::vector<size_t> dataSizes;
245 static std::vector<bool> showSize;
246 present.clear();
247 present.resize(routes.size(), false);
248 ignored.clear();
249 ignored.resize(routes.size(), false);
250 dataSizes.clear();
251 dataSizes.resize(routes.size(), 0);
252 showSize.clear();
253 showSize.resize(routes.size(), false);
254
255 static std::vector<size_t> unmatchedDescriptions;
256 unmatchedDescriptions.clear();
257 DataProcessingHeader const* dph = nullptr;
258 DataHeader const* firstDH = nullptr;
259 bool hassih = false;
260
261 // Do not check anything which has DISTSUBTIMEFRAME in it.
262 size_t expectedDataSpecs = 0;
263 for (size_t pi = 0; pi < present.size(); ++pi) {
264 auto& spec = routes[pi].matcher;
265 if (DataSpecUtils::asConcreteDataTypeMatcher(spec).description == header::DataDescription("DISTSUBTIMEFRAME")) {
266 ignored[pi] = true;
267 continue;
268 }
269 if (routes[pi].timeslice == 0) {
270 ++expectedDataSpecs;
271 }
272 }
273
274 size_t foundDataSpecs = 0;
275 bool skipAsAllFound = false;
276 for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
277 bool allFound = true;
278 int addToSize = -1;
279 const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
280 auto const sih = o2::header::get<SourceInfoHeader*>(parts.At(msgidx)->GetData());
281 if (sih != nullptr) {
282 hassih = true;
283 continue;
284 }
285 if (parts.At(msgidx).get() == nullptr) {
286 LOG(error) << "unexpected nullptr found. Skipping message pair.";
287 continue;
288 }
289 if (!dh) {
290 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
291 if (msgidx > 0) {
292 --msgidx;
293 }
294 continue;
295 }
296 if (firstDH == nullptr) {
297 firstDH = dh;
298 if (doPrintSizes && firstDH->tfCounter % doPrintSizes != 0) {
299 doPrintSizes = 0;
300 }
301 }
302 // Copy the DataProcessingHeader from the first message.
303 if (dph == nullptr) {
304 dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
305 for (size_t pi = 0; pi < present.size(); ++pi) {
306 if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
307 ignored[pi] = true;
308 }
309 }
310 }
311 for (size_t pi = 0; pi < present.size(); ++pi) {
312 if ((present[pi] || ignored[pi]) && !doPrintSizes) {
313 continue;
314 }
315 // Consider uninvolved pipelines as present.
316 if (routes[pi].timeslice != (dph->startTime % routes[pi].maxTimeslices)) {
317 ignored[pi] = true;
318 continue;
319 }
320 allFound = false;
321 auto& spec = routes[pi].matcher;
322 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
323 if (DataSpecUtils::match(spec, query)) {
324 if (!present[pi] && !ignored[pi]) {
325 ++foundDataSpecs;
326 present[pi] = true;
327 showSize[pi] = true;
328 }
329 addToSize = pi;
330 break;
331 }
332 }
333 int msgidxLast = getFinalIndex(*dh, msgidx);
334 if (addToSize >= 0) {
335 int increment = (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) ? 1 : 2;
336 for (int msgidx2 = msgidx + 1; msgidx2 < msgidxLast; msgidx2 += increment) {
337 dataSizes[addToSize] += parts.At(msgidx2)->GetSize();
338 }
339 }
340 // Skip the rest of the block of messages. We subtract 2 because above we increment by 2.
341 msgidx = msgidxLast - 2;
342 if (allFound && !doPrintSizes) {
343 skipAsAllFound = true;
344 break;
345 }
346 }
347
348 bool emptyTf = true;
349 for (size_t pi = 0; pi < present.size(); ++pi) {
350 if (present[pi] && !ignored[pi]) {
351 emptyTf = false;
352 }
353 if (!present[pi] && !ignored[pi]) {
354 showSize[pi] = true;
355 unmatchedDescriptions.push_back(pi);
356 }
357 }
358 int timeframeCompleteness = emptyTf ? 0 : (unmatchedDescriptions.size() ? -1 : 1);
359 (void)timeframeCompleteness; // To be sent as message
360
361 if (skipAsAllFound && !doPrintSizes) {
362 return;
363 }
364
365 if (firstDH && doPrintSizes) {
366 std::string sizes = "";
367 size_t totalSize = 0;
368 for (size_t pi = 0; pi < present.size(); ++pi) {
369 if (showSize[pi]) {
370 totalSize += dataSizes[pi];
371 auto& spec = routes[pi].matcher;
372 sizes += DataSpecUtils::describe(spec) + fmt::format(":{} ", fmt::group_digits(dataSizes[pi]));
373 }
374 }
375 LOGP(important, "RAW {} size report:{}- Total:{}", firstDH->tfCounter, sizes, fmt::group_digits(totalSize));
376 }
377
378 if (!doInjectMissingData) {
379 return;
380 }
381
382 if (unmatchedDescriptions.size() > 0) {
383 if (hassih) {
384 if (firstDH) {
385 LOG(error) << "Received an EndOfStream message together with data. This should not happen.";
386 }
387 LOG(detail) << "This is an End Of Stream message. Not injecting anything.";
388 return;
389 }
390 if (firstDH == nullptr) {
391 LOG(error) << "Input proxy received incomplete data without any data header. This should not happen! Cannot inject missing data as requsted.";
392 return;
393 }
394 if (dph == nullptr) {
395 LOG(error) << "Input proxy received incomplete data without any data processing header. This should happen! Cannot inject missing data as requsted.";
396 return;
397 }
398 std::string missing = "";
399 bool showAlarm = false;
400 for (auto mi : unmatchedDescriptions) {
401 auto& spec = routes[mi].matcher;
402 missing += " " + DataSpecUtils::describe(spec);
403 // If we have a ConcreteDataMatcher, we can create a message with the correct header.
404 // If we have a ConcreteDataTypeMatcher, we use 0xdeadbeef as subSpecification.
406 auto subSpec = DataSpecUtils::getOptionalSubSpec(spec);
407 if (subSpec == std::nullopt) {
408 *subSpec = 0xDEADBEEF;
409 }
410 o2::header::DataHeader dh{*firstDH};
411 dh.dataOrigin = concrete.origin;
412 dh.dataDescription = concrete.description;
413 dh.subSpecification = *subSpec;
414 dh.payloadSize = 0;
415 dh.splitPayloadParts = 0;
416 dh.splitPayloadIndex = 0;
417 dh.payloadSerializationMethod = header::gSerializationMethodNone;
418
419 auto& channelName = routes[mi].channel;
420 auto& channelInfo = device.GetChannel(channelName);
421 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.Transport());
422 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *dph});
423 parts.AddPart(std::move(headerMessage));
424 // add empty payload message
425 parts.AddPart(device.NewMessageFor(channelName, 0, 0));
427 showAlarm = true;
428 }
429 }
430 static int maxWarn = 10; // Correct would be o2::conf::VerbosityConfig::Instance().maxWarnDeadBeef, but Framework does not depend on CommonUtils..., but not so critical since receives will send correct number of DEADBEEF messages
431 static int contDeadBeef = 0;
432 if (showAlarm && ++contDeadBeef <= maxWarn) {
433 LOGP(alarm, "Found {}/{} data specs, missing data specs: {}, injecting 0xDEADBEEF{}", foundDataSpecs, expectedDataSpecs, missing, contDeadBeef == maxWarn ? " - disabling alarm now to stop flooding the log" : "");
434 }
435 }
436}
437
438InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config)
439{
440 bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs;
441 // structure to hold information on the unmatched data and print a warning at cleanup
442 class DroppedDataSpecs
443 {
444 public:
445 DroppedDataSpecs() = default;
446 ~DroppedDataSpecs()
447 {
448 warning();
449 }
450
451 [[nodiscard]] bool find(std::string const& desc) const
452 {
453 return descriptions.find(desc) != std::string::npos;
454 }
455
456 void add(std::string const& desc)
457 {
458 descriptions += "\n " + desc;
459 }
460
461 void warning() const
462 {
463 if (not descriptions.empty()) {
464 LOG(warning) << "Some input data could not be matched by filter rules to output specs\n"
465 << "Active rules: " << descriptions << "\n"
466 << "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
467 }
468 }
469
470 private:
471 std::string descriptions;
472 };
473
474 return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](TimingInfo& timingInfo, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) {
475 // FIXME: this in not thread safe, but better than an alloc of a map per message...
476 std::unordered_map<std::string, fair::mq::Parts> outputs;
477 std::vector<std::string> unmatchedDescriptions;
478 auto* device = services.get<RawDeviceService>().device();
479
480 static bool override_creation_env = getenv("DPL_RAWPROXY_OVERRIDE_ORBITRESET");
481 bool override_creation = false;
482 uint64_t creationVal = 0;
483 if (override_creation_env) {
484 static uint64_t creationValBase = std::stoul(getenv("DPL_RAWPROXY_OVERRIDE_ORBITRESET"));
485 creationVal = creationValBase;
486 override_creation = true;
487 } else {
488 auto orbitResetTimeUrl = device->fConfig->GetProperty<std::string>("orbit-reset-time", "ccdb://CTP/Calib/OrbitResetTime");
489 char* err = nullptr;
490 creationVal = std::strtoll(orbitResetTimeUrl.c_str(), &err, 10);
491 if (err && *err == 0 && creationVal) {
492 override_creation = true;
493 }
494 }
495
496 for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
497 if (parts.At(msgidx).get() == nullptr) {
498 LOG(error) << "unexpected nullptr found. Skipping message pair.";
499 continue;
500 }
501 const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
502 if (!dh) {
503 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
504 if (msgidx > 0) {
505 --msgidx;
506 }
507 continue;
508 }
509 auto dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
510 if (!dph) {
511 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
512 continue;
513 }
514 const_cast<DataProcessingHeader*>(dph)->startTime = newTimesliceId;
515 if (override_creation) {
516 const_cast<DataProcessingHeader*>(dph)->creation = creationVal + (dh->firstTForbit * o2::constants::lhc::LHCOrbitNS * 0.000001f);
517 }
518 timingInfo.timeslice = dph->startTime;
519 timingInfo.creation = dph->creation;
520 timingInfo.firstTForbit = dh->firstTForbit;
521 timingInfo.runNumber = dh->runNumber;
522 timingInfo.tfCounter = dh->tfCounter;
523 LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
524
525 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
526 LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts;
527 int finalBlockIndex = 0;
528 std::string channelName = "";
529
530 for (auto const& spec : filterSpecs) {
531 // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
533 DataSpecUtils::match(spec, query)) {
534 channelName = channelRetriever(query, dph->startTime);
535 // We do not complain about DPL/EOS/0, since it's normal not to forward it.
536 if (channelName.empty() && DataSpecUtils::describe(query) != "DPL/EOS/0") {
537 LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query);
538 }
539 break;
540 }
541 }
542 finalBlockIndex = getFinalIndex(*dh, msgidx);
543 if (finalBlockIndex > parts.Size()) {
544 // TODO error handling
545 // LOGP(error, "DataHeader::splitPayloadParts invalid");
546 continue;
547 }
548
549 if (!channelName.empty()) {
550 // the checks for consistency of split payload parts are of informative nature
551 // forwarding happens independently
552 // if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
553 // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
554 // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
555 // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
556 // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
557 // << " of " << dh->splitPayloadParts;
558 // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
559 // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
560 // << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
561 // }
562 //}
563 LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size());
564 for (; msgidx < finalBlockIndex; ++msgidx) {
565 outputs[channelName].AddPart(std::move(parts.At(msgidx)));
566 }
567 msgidx -= 2;
568 } else {
569 msgidx = finalBlockIndex - 2;
570 }
571 if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
572 unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query));
573 }
574 } // end of loop over parts
575
576 bool didSendParts = false;
577 for (auto& [channelName, channelParts] : outputs) {
578 if (channelParts.Size() == 0) {
579 continue;
580 }
581 didSendParts = true;
582 sendOnChannel(*device, channelParts, channelName, newTimesliceId);
583 }
584 if (not unmatchedDescriptions.empty()) {
585 if (throwOnUnmatchedInputs) {
586 std::string descriptions;
587 for (auto const& desc : unmatchedDescriptions) {
588 descriptions += "\n " + desc;
589 }
590 throw std::runtime_error("No matching filter rule for input data " + descriptions +
591 "\n Add appropriate matcher(s) to dataspec definition or allow to drop unmatched data");
592 } else {
593 bool changed = false;
594 for (auto const& desc : unmatchedDescriptions) {
595 if (not droppedDataSpecs->find(desc)) {
596 // a new description
597 droppedDataSpecs->add(desc);
598 changed = true;
599 }
600 }
601 if (changed) {
602 droppedDataSpecs->warning();
603 }
604 }
605 }
606 return didSendParts;
607 };
608}
609
610InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step)
611{
612 auto timesliceId = std::make_shared<size_t>(startTime);
613 return [timesliceId, spec, step, method](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) {
614 auto* device = services.get<RawDeviceService>().device();
615 // We iterate on all the parts and we send them two by two,
616 // adding the appropriate O2 header.
617 for (int i = 0; i < parts.Size(); ++i) {
618 DataHeader dh;
619 dh.payloadSerializationMethod = method;
620
621 // FIXME: this only supports fully specified output specs...
623 dh.dataOrigin = matcher.origin;
624 dh.dataDescription = matcher.description;
625 dh.subSpecification = matcher.subSpec;
626 dh.payloadSize = parts.At(i)->GetSize();
627
628 DataProcessingHeader dph{newTimesliceId, 0};
629 if (*timesliceId != newTimesliceId) {
630 LOG(fatal) << "Time slice ID provided from oldestPossible mechanism " << newTimesliceId << " is out of sync with expected value " << *timesliceId;
631 }
632 *timesliceId += step;
633 // we have to move the incoming data
634 o2::header::Stack headerStack{dh, dph};
635
636 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(i)), spec, channelRetriever);
637 }
638 return parts.Size();
639 };
640}
641
643 std::vector<OutputSpec> const& outputs,
644 char const* defaultChannelConfig,
645 InjectorFunction converter,
646 uint64_t minSHM,
647 bool sendTFcounter,
648 bool doInjectMissingData,
649 unsigned int doPrintSizes)
650{
652 spec.name = strdup(name);
653 spec.inputs = {};
654 spec.outputs = outputs;
655 static std::vector<std::string> channels;
656 static std::vector<int> numberOfEoS(channels.size(), 0);
657 static std::vector<int> eosPeersCount(channels.size(), 0);
658 // The Init method will register a new "Out of band" channel and
659 // attach an OnData to it which is responsible for converting incoming
660 // messages into DPL messages.
661 spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name, sendTFcounter, doInjectMissingData, doPrintSizes](InitContext& ctx) {
662 auto* device = ctx.services().get<RawDeviceService>().device();
663 // make a copy of the output routes and pass to the lambda by move
664 auto outputRoutes = ctx.services().get<RawDeviceService>().spec().outputs;
665 auto outputChannels = ctx.services().get<RawDeviceService>().spec().outputChannels;
666 assert(device);
667
668 // check that the name used for registering the OnData callback corresponds
669 // to the configured output channel, unfortunately we can not automatically
670 // deduce this from list of channels without knowing the name, because there
671 // will be multiple channels. At least we throw a more informative exception.
672 // fair::mq::Device calls the custom init before the channels have been configured
673 // so we do the check before starting in a dedicated callback
674 auto channelConfigurationChecker = [device, deviceName, services = ctx.services()]() {
675 auto& deviceState = services.get<DeviceState>();
676 channels.clear();
677 numberOfEoS.clear();
678 eosPeersCount.clear();
679 for (auto& [channelName, _] : services.get<RawDeviceService>().device()->GetChannels()) {
680 // Out of band channels must start with the proxy name, at least for now
681 if (strncmp(channelName.c_str(), deviceName.c_str(), deviceName.size()) == 0) {
682 channels.push_back(channelName);
683 }
684 }
685 for (auto& channel : channels) {
686 LOGP(detail, "Injecting channel '{}' into DPL configuration", channel);
687 // Converter should pump messages
688 auto& channelPtr = services.get<RawDeviceService>().device()->GetChannel(channel, 0);
689 deviceState.inputChannelInfos.push_back(InputChannelInfo{
691 .hasPendingEvents = false,
692 .readPolled = false,
693 .channel = &channelPtr,
694 .id = {ChannelIndex::INVALID},
695 .channelType = ChannelAccountingType::RAWFMQ,
696 });
697 }
698 numberOfEoS.resize(channels.size(), 0);
699 eosPeersCount.resize(channels.size(), 0);
700 };
701
702 auto drainMessages = [](ServiceRegistryRef registry, int state) {
703 auto* device = registry.get<RawDeviceService>().device();
704 auto& deviceState = registry.get<DeviceState>();
705 // We drop messages in input only when in ready.
706 // FIXME: should we drop messages in input the first time we are in ready?
707 static bool wasRunning = false;
708 if (fair::mq::State{state} == fair::mq::State::Running) {
709 wasRunning = true;
710 }
711 if (fair::mq::State{state} != fair::mq::State::Ready || !wasRunning) {
712 return;
713 }
714 uv_update_time(deviceState.loop);
715 bool doDrain = true;
716 // Cleanup count is set by the cleanup property of the device.
717 // It is incremented every time the device is cleaned up.
718 // We use it to detect when the device is cleaned up.
719 int64_t cleanupCount = deviceState.cleanupCount.load();
720
721 // Continue iterating we saw the cleanup property being reset or
722 // the device state changing.
723 while (doDrain) {
724 doDrain = device->NewStatePending() == false && deviceState.cleanupCount == cleanupCount;
725 fair::mq::Parts parts;
726 for (size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
727 auto& info = deviceState.inputChannelInfos[ci];
728 // We only care about rawfmq channels.
729 if (info.channelType != ChannelAccountingType::RAWFMQ) {
730 continue;
731 }
732 info.channel->Receive(parts, 10);
733 }
734 // Keep state transitions going also when running with the standalone GUI.
735 uv_run(deviceState.loop, UV_RUN_NOWAIT);
736 }
737 };
738
739 ctx.services().get<CallbackService>().set<CallbackService::Id::Start>(channelConfigurationChecker);
740 if (ctx.options().get<std::string>("ready-state-policy") == "drain") {
741 LOG(info) << "Drain mode requested while in Ready state";
742 ctx.services().get<CallbackService>().set<CallbackService::Id::DeviceStateChanged>(drainMessages);
743 }
744
745 static auto countEoS = [](fair::mq::Parts& inputs) -> int {
746 int count = 0;
747 for (int msgidx = 0; msgidx < inputs.Size() / 2; ++msgidx) {
748 // Skip when we have nullptr for the header.
749 // Not sure it can actually happen, but does not hurt.
750 if (inputs.At(msgidx * 2).get() == nullptr) {
751 continue;
752 }
753 auto const sih = o2::header::get<SourceInfoHeader*>(inputs.At(msgidx * 2)->GetData());
754 if (sih != nullptr && sih->state == InputChannelState::Completed) {
755 count++;
756 }
757 }
758 return count;
759 };
760
761 // Data handler for incoming data. Must return true if it sent any data.
762 auto dataHandler = [converter, doInjectMissingData, doPrintSizes,
763 outputRoutes = std::move(outputRoutes),
764 control = &ctx.services().get<ControlService>(),
765 deviceState = &ctx.services().get<DeviceState>(),
766 timesliceIndex = &ctx.services().get<TimesliceIndex>(),
767 outputChannels = std::move(outputChannels)](ServiceRegistryRef ref, TimingInfo& timingInfo, fair::mq::Parts& inputs, int, size_t ci, bool newRun) -> bool {
768 auto* device = ref.get<RawDeviceService>().device();
769 // pass a copy of the outputRoutes
770 auto channelRetriever = [&outputRoutes](OutputSpec const& query, DataProcessingHeader::StartTime timeslice) -> std::string const& {
771 static std::string emptyChannel = "";
772 for (auto& route : outputRoutes) {
773 LOG(debug) << "matching: " << DataSpecUtils::describe(query) << " to route " << DataSpecUtils::describe(route.matcher);
774 if (DataSpecUtils::match(route.matcher, query) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
775 return route.channel;
776 }
777 }
778 return emptyChannel;
779 };
780
781 std::string const& channel = channels[ci];
782 // we buffer the condition since the converter will forward messages by move
783 int nEos = countEoS(inputs);
784 if (newRun) {
785 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
786 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
787 }
788 numberOfEoS[ci] += nEos;
789 if (numberOfEoS[ci]) {
790 eosPeersCount[ci] = std::max<int>(eosPeersCount[ci], device->GetNumberOfConnectedPeers(channel));
791 }
792 // For reference, the oldest possible timeframe passed as newTimesliceId here comes from LifetimeHelpers::enumDrivenCreation()
793 bool shouldstop = false;
794 if (doInjectMissingData) {
795 injectMissingData(*device, inputs, outputRoutes, doInjectMissingData, doPrintSizes);
796 }
797 bool didSendParts = converter(timingInfo, ref, inputs, channelRetriever, timesliceIndex->getOldestPossibleOutput().timeslice.value, shouldstop);
798
799 // If we have enough EoS messages, we can stop the device
800 // Notice that this has a number of failure modes:
801 // * If a connection sends the EoS and then closes before the GetNumberOfConnectedPeers command above.
802 // * If a connection sends two EoS.
803 // * If a connection sends an end of stream closes and another one opens.
804 // Finally, if we didn't receive an EoS this time, out counting of the connected peers is off, so the best thing we can do is delay the EoS reporting
805 bool everyEoS = shouldstop;
806 if (!shouldstop && nEos) {
807 everyEoS = true;
808 for (unsigned int i = 0; i < numberOfEoS.size(); i++) {
809 if (numberOfEoS[i] < eosPeersCount[i]) {
810 everyEoS = false;
811 break;
812 }
813 }
814 }
815
816 if (everyEoS) {
817 LOG(info) << "Received (on channel " << ci << ") " << numberOfEoS[ci] << " end-of-stream from " << eosPeersCount[ci] << " peers, forwarding end-of-stream (shouldstop " << (int)shouldstop << ", nEos " << nEos << ", newRun " << (int)newRun << ")";
818 // Mark all input channels as closed
819 for (auto& info : deviceState->inputChannelInfos) {
820 info.state = InputChannelState::Completed;
821 }
822 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
823 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
824 control->endOfStream();
825 }
826 return didSendParts;
827 };
828
829 auto runHandler = [dataHandler, minSHM, sendTFcounter](ProcessingContext& ctx) {
830 static RateLimiter limiter;
831 static size_t currentRunNumber = -1;
832 static bool inStopTransition = false;
833 bool newRun = false;
834 auto device = ctx.services().get<RawDeviceService>().device();
835 if (limiter.check(ctx, std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit")), minSHM)) {
836 inStopTransition = true;
837 }
838
839 bool didSendParts = false;
840 for (size_t ci = 0; ci < channels.size(); ++ci) {
841 std::string const& channel = channels[ci];
842 int waitTime = channels.size() == 1 ? -1 : 1;
843 int maxRead = 1000;
844 while (maxRead-- > 0) {
845 fair::mq::Parts parts;
846 auto res = device->Receive(parts, channel, 0, waitTime);
847 if (res == (size_t)fair::mq::TransferCode::error) {
848 LOGP(error, "Error while receiving on channel {}", channel);
849 }
850 // Populate TimingInfo from the first message
851 unsigned int nReceived = parts.Size();
852 if (nReceived != 0) {
853 auto const dh = o2::header::get<DataHeader*>(parts.At(0)->GetData());
854 auto& timingInfo = ctx.services().get<TimingInfo>();
855 if (dh != nullptr) {
856 if (currentRunNumber != -1 && dh->runNumber != 0 && dh->runNumber != currentRunNumber) {
857 newRun = true;
858 inStopTransition = false;
859 }
860 if (currentRunNumber == -1 || dh->runNumber != 0) {
861 currentRunNumber = dh->runNumber;
862 }
863 timingInfo.runNumber = dh->runNumber;
864 timingInfo.firstTForbit = dh->firstTForbit;
865 timingInfo.tfCounter = dh->tfCounter;
866 }
867 auto const dph = o2::header::get<DataProcessingHeader*>(parts.At(0)->GetData());
868 if (dph != nullptr) {
869 timingInfo.timeslice = dph->startTime;
870 timingInfo.creation = dph->creation;
871 }
872 if (!inStopTransition) {
873 didSendParts |= dataHandler(ctx.services(), timingInfo, parts, 0, ci, newRun);
874 }
875 if (sendTFcounter) {
876 ctx.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)timingInfo.tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
877 }
878 }
879 if (nReceived == 0 || channels.size() == 1) {
880 break;
881 }
882 waitTime = 0;
883 }
884 }
885 // In case we did not send any part at all, we need to rewind by one
886 // to avoid creating extra timeslices.
887 auto& decongestion = ctx.services().get<DecongestionService>();
888 decongestion.nextEnumerationTimesliceRewinded = !didSendParts;
889 if (didSendParts) {
890 ctx.services().get<MessageContext>().fakeDispatch();
891 } else {
892 decongestion.nextEnumerationTimeslice -= 1;
893 }
894 };
895
896 return runHandler;
897 }};
898 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
899 spec.options = {
900 ConfigParamSpec{"ready-state-policy", VariantType::String, "keep", {"What to do when the device is in ready state: *keep*, drain"}},
901 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}}};
902 return spec;
903}
904
905// Decide where to sent the output. Everything to "downstream" if there is such a channel.
906std::string defaultOutputProxyChannelSelector(InputSpec const& input, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>& channels)
907{
908 return channels.count("downstream") ? "downstream" : input.binding;
909}
910
912 Inputs const& inputSpecs,
913 const char* defaultChannelConfig)
914{
916 spec.name = name;
917 spec.inputs = inputSpecs;
918 spec.outputs = {};
919 spec.algorithm = adaptStateful([inputSpecs](FairMQDeviceProxy& proxy, CallbackService& callbacks, RawDeviceService& rds, DeviceSpec const& deviceSpec, ConfigParamRegistry const& options) {
920 // we can retrieve the channel name from the channel configuration string
921 // FIXME: even if a --channel-config option is specified on the command line, always the default string
922 // is retrieved from the config registry. The channel name thus needs to be configured in the default
923 // string AND must match the name in an optional channel config.
924 auto channelConfig = options.get<std::string>("channel-config");
925 std::regex r{R"(name=([^,]*))"};
926 std::vector<std::string> values{std::sregex_token_iterator{std::begin(channelConfig), std::end(channelConfig), r, 1},
927 std::sregex_token_iterator{}};
928 if (values.size() != 1 || values[0].empty()) {
929 throw std::runtime_error("failed to extract channel name from channel configuration parameter '" + channelConfig + "'");
930 }
931 std::string outputChannelName = values[0];
932
933 auto* device = rds.device();
934 // check that the input spec bindings have corresponding output channels
935 // fair::mq::Device calls the custom init before the channels have been configured
936 // so we do the check before starting in a dedicated callback
937 auto channelConfigurationChecker = [inputSpecs = std::move(inputSpecs), device, outputChannelName]() {
938 LOG(info) << "checking channel configuration";
939 if (device->GetChannels().count(outputChannelName) == 0) {
940 throw std::runtime_error("no corresponding output channel found for input '" + outputChannelName + "'");
941 }
942 };
943 callbacks.set<CallbackService::Id::Start>(channelConfigurationChecker);
944 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
945
946 auto& spec = const_cast<DeviceSpec&>(deviceSpec);
948 for (auto const& inputSpec : inputSpecs) {
949 // this is a prototype, in principle we want to have all spec objects const
950 // and so only the const object can be retrieved from service registry
951 ForwardRoute route{
952 .timeslice = 0,
953 .maxTimeslices = 1,
954 .matcher = inputSpec,
955 .channel = outputChannelName,
956 .policy = &policy};
957 spec.forwards.emplace_back(route);
958 }
959
960 auto forwardEos = [device, lastDataProcessingHeader, outputChannelName](EndOfStreamContext&) {
961 // DPL implements an internal end of stream signal, which is propagated through
962 // all downstream channels if a source is dry, make it available to other external
963 // devices via a message of type {DPL/EOS/0}
964 for (auto& channelInfo : device->GetChannels()) {
965 auto& channelName = channelInfo.first;
966 if (channelName != outputChannelName) {
967 continue;
968 }
969 DataHeader dh;
970 dh.dataOrigin = "DPL";
971 dh.dataDescription = "EOS";
972 dh.subSpecification = 0;
973 dh.payloadSize = 0;
975 dh.tfCounter = 0;
976 dh.firstTForbit = 0;
979 // allocate the header message using the underlying transport of the channel
980 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
981 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *lastDataProcessingHeader, sih});
982 fair::mq::Parts out;
983 out.AddPart(std::move(headerMessage));
984 // add empty payload message
985 out.AddPart(device->NewMessageFor(channelName, 0, 0));
986 sendOnChannel(*device, out, channelName, (size_t)-1);
987 }
988 };
989 callbacks.set<CallbackService::Id::EndOfStream>(forwardEos);
990
991 return adaptStateless([lastDataProcessingHeader](InputRecord& inputs) {
992 for (size_t ii = 0; ii != inputs.size(); ++ii) {
993 for (size_t pi = 0; pi < inputs.getNofParts(ii); ++pi) {
994 auto part = inputs.getByPos(ii, pi);
995 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
996 if (dph) {
997 // FIXME: should we implement an assignment operator for DataProcessingHeader?
998 lastDataProcessingHeader->startTime = dph->startTime;
999 lastDataProcessingHeader->duration = dph->duration;
1000 lastDataProcessingHeader->creation = dph->creation;
1001 }
1002 }
1003 }
1004 });
1005 });
1006 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
1007 spec.options = {
1008 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}},
1009 };
1010
1011 return spec;
1012}
1013
1015 Inputs const& inputSpecs,
1016 const char* defaultChannelConfig,
1017 ChannelSelector channelSelector)
1018{
1019 // FIXME: this looks like a code duplication with the function above, check if the
1020 // two can be combined
1021 DataProcessorSpec spec;
1022 spec.name = name;
1023 spec.inputs = inputSpecs;
1024 spec.outputs = {};
1025 spec.algorithm = adaptStateful([inputSpecs, channelSelector](FairMQDeviceProxy& proxy, CallbackService& callbacks, RawDeviceService& rds, const DeviceSpec& deviceSpec) {
1026 auto device = rds.device();
1027 // check that the input spec bindings have corresponding output channels
1028 // fair::mq::Device calls the custom init before the channels have been configured
1029 // so we do the check before starting in a dedicated callback
1030 // also we set forwards for all input specs and keep a list of all channels so we can send EOS on them
1031 auto channelNames = std::make_shared<std::vector<std::string>>();
1032 auto channelConfigurationInitializer = [&proxy, inputSpecs = std::move(inputSpecs), device, channelSelector, &deviceSpec, channelNames]() {
1033 channelNames->clear();
1034 auto& mutableDeviceSpec = const_cast<DeviceSpec&>(deviceSpec);
1035 for (auto const& spec : inputSpecs) {
1036 auto channel = channelSelector(spec, device->GetChannels());
1037 if (device->GetChannels().count(channel) == 0) {
1038 throw std::runtime_error("no corresponding output channel found for input '" + channel + "'");
1039 }
1041 ForwardRoute route{
1042 .timeslice = 0,
1043 .maxTimeslices = 1,
1044 .matcher = spec,
1045 .channel = channel,
1046 .policy = &policy};
1047 // this we will try to fix on the framework level, there will be an API to
1048 // set external routes. Basically, this has to be added while setting up the
1049 // workflow. After that, the actual spec provided by the service is supposed
1050 // to be const by design
1051 mutableDeviceSpec.forwards.emplace_back(route);
1052
1053 channelNames->emplace_back(std::move(channel));
1054 }
1055 proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device);
1056 };
1057 // We need to clear the channels on stop, because we will check and add them
1058 auto channelConfigurationDisposer = [&deviceSpec]() {
1059 auto& mutableDeviceSpec = const_cast<DeviceSpec&>(deviceSpec);
1060 mutableDeviceSpec.forwards.clear();
1061 };
1062 callbacks.set<CallbackService::Id::Start>(channelConfigurationInitializer);
1063 callbacks.set<CallbackService::Id::Stop>(channelConfigurationDisposer);
1064
1065 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
1066 auto forwardEos = [device, lastDataProcessingHeader, channelNames](EndOfStreamContext&) {
1067 // DPL implements an internal end of stream signal, which is propagated through
1068 // all downstream channels if a source is dry, make it available to other external
1069 // devices via a message of type {DPL/EOS/0}
1070 for (auto& channelInfo : device->GetChannels()) {
1071 auto& channelName = channelInfo.first;
1072 auto checkChannel = [channelNames = std::move(*channelNames)](std::string const& name) -> bool {
1073 for (auto const& n : channelNames) {
1074 if (n == name) {
1075 return true;
1076 }
1077 }
1078 return false;
1079 };
1080 if (!checkChannel(channelName)) {
1081 continue;
1082 }
1083 DataHeader dh;
1084 dh.dataOrigin = "DPL";
1085 dh.dataDescription = "EOS";
1086 dh.subSpecification = 0;
1087 dh.payloadSize = 0;
1089 dh.tfCounter = 0;
1090 dh.firstTForbit = 0;
1091 SourceInfoHeader sih;
1093 // allocate the header message using the underlying transport of the channel
1094 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
1095 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, *lastDataProcessingHeader, sih});
1096 fair::mq::Parts out;
1097 out.AddPart(std::move(headerMessage));
1098 // add empty payload message
1099 out.AddPart(device->NewMessageFor(channelName, 0, 0));
1100 LOGP(detail, "Forwarding EoS to {}", channelName);
1101 sendOnChannel(*device, out, channelName, (size_t)-1);
1102 }
1103 };
1104 callbacks.set<CallbackService::Id::EndOfStream>(forwardEos);
1105
1106 return adaptStateless([channelSelector, lastDataProcessingHeader](InputRecord& inputs) {
1107 // there is nothing to do if the forwarding is handled on the framework level
1108 // as forward routes but we need to keep a copy of the last DataProcessingHeader
1109 // for sending the EOS
1110 for (size_t ii = 0; ii != inputs.size(); ++ii) {
1111 for (size_t pi = 0; pi < inputs.getNofParts(ii); ++pi) {
1112 auto part = inputs.getByPos(ii, pi);
1113 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
1114 if (dph) {
1115 // FIXME: should we implement an assignment operator for DataProcessingHeader?
1116 lastDataProcessingHeader->startTime = dph->startTime;
1117 lastDataProcessingHeader->duration = dph->duration;
1118 lastDataProcessingHeader->creation = dph->creation;
1119 }
1120 }
1121 }
1122 });
1123 });
1124 const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
1125 spec.options = {
1126 ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}},
1127 };
1128
1129 return spec;
1130}
1131
1132} // namespace o2::framework
benchmark::State & state
int32_t i
Header to collect LHC related constants.
uint32_t res
Definition RawData.h:0
std::ostringstream debug
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, fair::mq::Device &device)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
size_t getNofParts(int pos) const
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
virtual fair::mq::Device * device()=0
GLdouble n
Definition glcorearb.h:1982
GLint GLsizei count
Definition glcorearb.h:399
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean r
Definition glcorearb.h:1233
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginPHS
Definition DataHeader.h:574
constexpr o2::header::DataOrigin gDataOriginHMP
Definition DataHeader.h:569
constexpr o2::header::DataOrigin gDataOriginEMC
Definition DataHeader.h:565
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionAny
Definition DataHeader.h:595
constexpr double LHCOrbitNS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void injectMissingData(fair::mq::Device &device, fair::mq::Parts &parts, std::vector< OutputRoute > const &routes, bool doInjectMissingData, unsigned int doPrintSizes)
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
@ RAWFMQ
A raw FairMQ channel which is not accounted by the framework.
std::function< std::string(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)> ChannelSelector
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig, ChannelSelector channelSelector=defaultOutputProxyChannelSelector)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
InjectorFunction o2DataModelAdaptor(OutputSpec const &spec, uint64_t startTime, uint64_t step)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::string defaultOutputProxyChannelSelector(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)
Default way to select an output channel for multi-output proxy.
auto getFinalIndex(DataHeader const &dh, size_t msgidx) -> size_t
std::vector< InputSpec > Inputs
void appendForSending(fair::mq::Device &device, o2::header::Stack &&headerStack, size_t timeSliceID, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, fair::mq::Parts &messageCache, ChannelRetriever &channelRetriever)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
InjectorFunction incrementalConverter(OutputSpec const &spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int INVALID
header::DataHeader::SubSpecificationType subSpec
bool throwOnUnmatchedInputs
throw runtime error if an input message is not matched by filter rules
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static std::string inputChannel2String(const InputChannelSpec &channel)
Helper to provide the channel configuration string for an input channel.
static std::string outputChannel2String(const OutputChannelSpec &channel)
Helper to provide the channel configuration string for an output channel.
std::vector< ForwardRoute > forwards
Definition DeviceSpec.h:64
Running state information of a given device.
Definition DeviceState.h:34
static ForwardingPolicy createDefaultForwardingPolicy()
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels