// Excerpt (original source lines 242-439; many lines are missing, including the
// function header) of the routine that checks which output routes received data
// in `parts`, optionally reports per-route payload sizes, and, for routes that
// got no data, appends a header message followed by a zero-size payload message
// (NewMessageFor(channelName, 0, 0)) to `parts`.
// NOTE(review): the scratch containers are function-local statics, so this code
// is not reentrant; confirm it only ever runs on a single thread.
242 static std::vector<bool> present;
243 static std::vector<bool> ignored;
244 static std::vector<size_t> dataSizes;
245 static std::vector<bool> showSize;
// NOTE(review): resize() only initializes newly added elements; entries left over
// from a previous call keep their values unless the vectors are cleared first
// (no such clear() is visible in this excerpt).
247 present.resize(routes.size(),
false);
249 ignored.resize(routes.size(),
false);
251 dataSizes.resize(routes.size(), 0);
253 showSize.resize(routes.size(),
false);
// Indices into `routes` for which no data was found in this call.
255 static std::vector<size_t> unmatchedDescriptions;
256 unmatchedDescriptions.clear();
// Count the expected data specs; the per-route body is not visible here
// (presumably each route is counted only for its timeslice-0 copy).
262 size_t expectedDataSpecs = 0;
263 for (
size_t pi = 0; pi < present.size(); ++pi) {
264 auto& spec = routes[pi].matcher;
269 if (routes[pi].timeslice == 0) {
// Walk `parts` header by header (even indices); msgidx is advanced past each
// message group inside the loop body.
274 size_t foundDataSpecs = 0;
275 bool skipAsAllFound =
false;
276 for (
int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
277 bool allFound =
true;
// NOTE(review): parts.At(msgidx)->GetData() is dereferenced on the next two lines,
// before the `parts.At(msgidx).get() == nullptr` check further down; if that check
// is meant to guard against null messages, it comes too late.
279 const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
280 auto const sih = o2::header::get<SourceInfoHeader*>(parts.At(msgidx)->GetData());
// Messages carrying a SourceInfoHeader get special handling (branch body not
// visible; presumably end-of-stream markers, cf. the EndOfStream messages below).
281 if (sih !=
nullptr) {
285 if (parts.At(msgidx).get() ==
nullptr) {
286 LOG(error) <<
"unexpected nullptr found. Skipping message pair.";
290 LOG(error) <<
"data on input " << msgidx <<
" does not follow the O2 data model, DataHeader missing";
296 if (firstDH ==
nullptr) {
// With doPrintSizes = N, presumably only every N-th timeframe (by tfCounter)
// takes part in the size report.
298 if (doPrintSizes && firstDH->
tfCounter % doPrintSizes != 0) {
// The first DataProcessingHeader seen fixes startTime; routes whose timeslice
// differs from startTime % maxTimeslices are presumably excluded.
303 if (dph ==
nullptr) {
304 dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
305 for (
size_t pi = 0; pi < present.size(); ++pi) {
306 if (routes[pi].timeslice != (dph->
startTime % routes[pi].maxTimeslices)) {
// Match this header against the routes; already-resolved routes are presumably
// skipped unless a size report (which needs every route) is requested.
311 for (
size_t pi = 0; pi < present.size(); ++pi) {
312 if ((present[pi] || ignored[pi]) && !doPrintSizes) {
316 if (routes[pi].timeslice != (dph->
startTime % routes[pi].maxTimeslices)) {
321 auto& spec = routes[pi].matcher;
322 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
324 if (!present[pi] && !ignored[pi]) {
334 if (addToSize >= 0) {
// Accumulate payload sizes. When splitPayloadParts == splitPayloadIndex (> 0) the
// payloads presumably follow one header back-to-back (step 1); otherwise headers
// and payloads alternate (step 2, starting at the first payload).
335 int increment = (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) ? 1 : 2;
336 for (
int msgidx2 = msgidx + 1; msgidx2 < msgidxLast; msgidx2 += increment) {
337 dataSizes[addToSize] += parts.At(msgidx2)->GetSize();
// Offset by -2 so that the outer loop's `msgidx += 2` lands on msgidxLast
// (presumably the first index after this message group).
341 msgidx = msgidxLast - 2;
342 if (allFound && !doPrintSizes) {
343 skipAsAllFound =
true;
// Collect routes that received nothing and are not ignored.
349 for (
size_t pi = 0; pi < present.size(); ++pi) {
350 if (present[pi] && !ignored[pi]) {
353 if (!present[pi] && !ignored[pi]) {
355 unmatchedDescriptions.push_back(pi);
// 0 = empty TF, -1 = incomplete, 1 = complete; appears unused here (discarded via void cast).
358 int timeframeCompleteness = emptyTf ? 0 : (unmatchedDescriptions.size() ? -1 : 1);
359 (
void)timeframeCompleteness;
361 if (skipAsAllFound && !doPrintSizes) {
// Optional size report: one log line with per-route sizes and the total.
365 if (firstDH && doPrintSizes) {
366 std::string
sizes =
"";
367 size_t totalSize = 0;
368 for (
size_t pi = 0; pi < present.size(); ++pi) {
370 totalSize += dataSizes[pi];
371 auto& spec = routes[pi].matcher;
375 LOGP(important,
"RAW {} size report:{}- Total:{}", firstDH->
tfCounter,
sizes, fmt::group_digits(totalSize));
// Everything below concerns placeholder injection; the early-exit body for
// !doInjectMissingData is not visible in this excerpt.
378 if (!doInjectMissingData) {
382 if (unmatchedDescriptions.size() > 0) {
385 LOG(error) <<
"Received an EndOfStream message together with data. This should not happen.";
387 LOG(detail) <<
"This is an End Of Stream message. Not injecting anything.";
390 if (firstDH ==
nullptr) {
391 LOG(error) <<
"Input proxy received incomplete data without any data header. This should not happen! Cannot inject missing data as requsted.";
// NOTE(review): the message below says "This should happen!" while the parallel
// message above says "This should not happen!"; a missing "not" is likely. Both
// messages also misspell "requested".
394 if (dph ==
nullptr) {
395 LOG(error) <<
"Input proxy received incomplete data without any data processing header. This should happen! Cannot inject missing data as requsted.";
// Build and append a placeholder (header + empty payload) for every unmatched route.
398 std::string missing =
"";
399 bool showAlarm =
false;
400 uint32_t runNumber = 0;
// Run number from the FairMQ "runNumber" property (base 10); 0 if it is empty or unparsable.
402 runNumber = strtoul(device.fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
405 for (
auto mi : unmatchedDescriptions) {
406 auto& spec = routes[mi].matcher;
// NOTE(review): this dereferences `subSpec` exactly when it is std::nullopt;
// operator* on an empty std::optional is undefined behavior. The intent is
// presumably `subSpec = 0xDEADBEEF;` so the later `*subSpec` read is valid.
412 if (subSpec == std::nullopt) {
413 *subSpec = 0xDEADBEEF;
418 dh.subSpecification = *subSpec;
420 dh.runNumber = runNumber;
421 dh.splitPayloadParts = 0;
422 dh.splitPayloadIndex = 0;
425 auto& channelName = routes[mi].channel;
426 auto& channelInfo = device.GetChannel(channelName);
427 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.Transport());
429 parts.AddPart(std::move(headerMessage));
431 parts.AddPart(device.NewMessageFor(channelName, 0, 0));
// Rate-limited alarm: logged at most maxWarn (10) times per process (static
// counter); the last occurrence says that it is being disabled.
436 static int maxWarn = 10;
437 static int contDeadBeef = 0;
438 if (showAlarm && ++contDeadBeef <= maxWarn) {
439 LOGP(alarm,
"Found {}/{} data specs, missing data specs: {}, injecting 0xDEADBEEF{}", foundDataSpecs, expectedDataSpecs, missing, contDeadBeef == maxWarn ?
" - disabling alarm now to stop flooding the log" :
"");
// Excerpt (original lines 448-477, partially missing) of a helper that remembers
// which unmatched input descriptions have already been reported. All descriptions
// are stored in one string, each on its own "\n "-prefixed line.
448 class DroppedDataSpecs
451 DroppedDataSpecs() =
default;
// NOTE(review): find() is a substring search over the concatenated string, so a
// description contained in a previously added one (or spanning the separator)
// counts as already present; a std::set<std::string> would give exact matches.
457 [[nodiscard]]
bool find(std::string
const& desc)
const
459 return descriptions.find(desc) != std::string::npos;
// Appends `desc` on a new indented line.
462 void add(std::string
const& desc)
464 descriptions +=
"\n " + desc;
// Emits one warning that lists every description collected so far (nothing if empty).
469 if (not descriptions.empty()) {
470 LOG(warning) <<
"Some input data could not be matched by filter rules to output specs\n"
471 <<
"Active rules: " << descriptions <<
"\n"
472 <<
"DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
477 std::string descriptions;
// Excerpt (original lines 480-620, partially missing) of the returned lambda that
// distributes the O2 header/payload messages in `parts` to output channels.
// Inputs matching no filter either throw (throwOnUnmatchedInputs) or are recorded
// in `droppedDataSpecs`, which all copies of this closure share through shared_ptr.
480 return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](
TimingInfo& timingInfo,
ServiceRegistryRef const& services, fair::mq::Parts& parts,
ChannelRetriever channelRetriever,
size_t newTimesliceId,
bool& stop) {
482 std::unordered_map<std::string, fair::mq::Parts> outputs;
483 std::vector<std::string> unmatchedDescriptions;
// The DPL_RAWPROXY_OVERRIDE_ORBITRESET environment variable is evaluated only once
// (function-local statics). NOTE(review): std::stoul throws std::invalid_argument
// or std::out_of_range if the variable is set to a non-numeric or too-large value.
486 static bool override_creation_env = getenv(
"DPL_RAWPROXY_OVERRIDE_ORBITRESET");
487 bool override_creation =
false;
488 uint64_t creationVal = 0;
489 if (override_creation_env) {
490 static uint64_t creationValBase = std::stoul(getenv(
"DPL_RAWPROXY_OVERRIDE_ORBITRESET"));
491 creationVal = creationValBase;
492 override_creation =
true;
// Otherwise (the branch structure is not fully visible), a purely numeric
// "orbit-reset-time" option is used: strtoll must consume the whole string and
// yield non-zero. The default "ccdb://..." URL parses to 0, so it gives no override.
494 auto orbitResetTimeUrl = device->fConfig->GetProperty<std::string>(
"orbit-reset-time",
"ccdb://CTP/Calib/OrbitResetTime");
496 creationVal = std::strtoll(orbitResetTimeUrl.c_str(), &err, 10);
497 if (err && *err == 0 && creationVal) {
498 override_creation =
true;
// FairMQ run number for the consistency check below; the check only applies when
// fmqRunNumber > 0 (atoi yields 0 for an empty or non-numeric property).
502 int fmqRunNumber = -1;
504 fmqRunNumber = atoi(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str());
// Visit each header position; msgidx is moved past each message group in the body.
508 for (
int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
509 if (parts.At(msgidx).get() ==
nullptr) {
510 LOG(error) <<
"unexpected nullptr found. Skipping message pair.";
513 auto* header = parts.At(msgidx)->GetData();
514 const auto dh = o2::header::get<DataHeader*>(header);
516 LOG(error) <<
"data on input " << msgidx <<
" does not follow the O2 data model, DataHeader missing";
522 auto dph = o2::header::get<DataProcessingHeader*>(header);
524 LOG(error) <<
"data on input " << msgidx <<
" does not follow the O2 data model, DataProcessingHeader missing";
528 if (override_creation) {
532 timingInfo.creation = dph->creation;
533 timingInfo.firstTForbit = dh->firstTForbit;
534 timingInfo.runNumber = dh->runNumber;
535 timingInfo.tfCounter = dh->tfCounter;
// NOTE(review): parts.At(msgidx + 1) assumes every header is followed by a payload;
// an odd number of parts would index past the end here and in the error log below.
536 LOG(
debug) << msgidx <<
": " <<
DataSpecUtils::describe(
OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) <<
" part " << dh->splitPayloadIndex <<
" of " << dh->splitPayloadParts <<
" payload " << parts.At(msgidx + 1)->GetSize();
// Sanity check of run number / TF counter (TF counter 0 is allowed for "EOS");
// logged as an error, and any further handling is not visible in this excerpt.
537 if (dh->runNumber == 0 || (dh->tfCounter == 0 && dh->dataDescription.as<std::string>() !=
"EOS") || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) {
538 LOG(error) <<
"INVALID runNumber / tfCounter: runNumber " << dh->runNumber
539 <<
", tfCounter " << dh->tfCounter <<
", FMQ runNumber " << fmqRunNumber
540 <<
" for msgidx " << msgidx <<
": " <<
DataSpecUtils::describe(
OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) <<
" part " << dh->splitPayloadIndex <<
" of " << dh->splitPayloadParts <<
" payload " << parts.At(msgidx + 1)->GetSize();
543 OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
544 LOG(
debug) <<
"processing " <<
DataSpecUtils::describe(
OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) <<
" time slice " << dph->startTime <<
" part " << dh->splitPayloadIndex <<
" of " << dh->splitPayloadParts;
545 int finalBlockIndex = 0;
546 std::string channelName =
"";
// Resolve the output channel for this header via the filter specs;
// channelRetriever maps (query, startTime) to a channel name.
548 for (
auto const& spec : filterSpecs) {
552 channelName = channelRetriever(query, dph->startTime);
561 if (finalBlockIndex > parts.Size()) {
// Move the message group [msgidx, finalBlockIndex) into that channel's outgoing Parts.
567 if (!channelName.empty()) {
581 LOGP(
debug,
"associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].
Size());
582 for (; msgidx < finalBlockIndex; ++msgidx) {
583 outputs[channelName].AddPart(std::move(parts.At(msgidx)));
// Presumably compensates for the outer loop's `msgidx += 2` (context not fully visible).
587 msgidx = finalBlockIndex - 2;
// Send every non-empty per-channel Parts collection.
594 bool didSendParts =
false;
595 for (
auto& [channelName, channelParts] : outputs) {
596 if (channelParts.Size() == 0) {
600 sendOnChannel(*device, channelParts, channelName, newTimesliceId);
// Unmatched inputs: fail hard when configured to, otherwise record new
// descriptions in droppedDataSpecs and emit its warning.
602 if (not unmatchedDescriptions.empty()) {
603 if (throwOnUnmatchedInputs) {
604 std::string descriptions;
605 for (
auto const& desc : unmatchedDescriptions) {
606 descriptions +=
"\n " + desc;
608 throw std::runtime_error(
"No matching filter rule for input data " + descriptions +
609 "\n Add appropriate matcher(s) to dataspec definition or allow to drop unmatched data");
611 bool changed =
false;
612 for (
auto const& desc : unmatchedDescriptions) {
613 if (not droppedDataSpecs->find(desc)) {
615 droppedDataSpecs->add(desc);
620 droppedDataSpecs->warning();
// Excerpt (original lines 667-922, many lines missing) of the input proxy device
// setup. It receives FairMQ messages on the device's input channels, optionally
// injects placeholders for missing data, converts them via `converter`, and
// forwards end-of-stream once all peers have sent it.
667 std::vector<OutputSpec>
const& outputs,
668 char const* defaultChannelConfig,
672 bool doInjectMissingData,
673 unsigned int doPrintSizes)
// NOTE(review): numberOfEoS and eosPeersCount are sized from channels.size() on
// first initialization, right after `channels` is created empty, so they start
// empty. They are resized once the channel list is built in channelConfigurationChecker.
679 static std::vector<std::string>
channels;
680 static std::vector<int> numberOfEoS(
channels.size(), 0);
681 static std::vector<int> eosPeersCount(
channels.size(), 0);
689 auto outputChannels = ctx.services().get<
RawDeviceService>().spec().outputChannels;
// Start callback: collect the device channels whose names start with deviceName,
// register them with the device state ("Injecting channel ... into DPL
// configuration"), and resize the EoS bookkeeping to match.
698 auto channelConfigurationChecker = [device, deviceName, services = ctx.services()]() {
702 eosPeersCount.clear();
703 for (
auto& [channelName, _] : services.get<
RawDeviceService>().device()->GetChannels()) {
705 if (strncmp(channelName.c_str(), deviceName.c_str(), deviceName.size()) == 0) {
710 LOGP(detail,
"Injecting channel '{}' into DPL configuration", channel);
712 auto& channelPtr = services.get<
RawDeviceService>().device()->GetChannel(channel, 0);
715 .hasPendingEvents =
false,
717 .channel = &channelPtr,
722 numberOfEoS.resize(
channels.size(), 0);
723 eosPeersCount.resize(
channels.size(), 0);
// Device-state callback for "ready-state-policy" == "drain": after the device has
// been Running, while it sits in Ready it keeps receiving on every input channel
// (10 ms timeout) and runs the uv loop without blocking. The received parts are
// presumably dropped.
731 static bool wasRunning =
false;
732 if (fair::mq::State{
state} == fair::mq::State::Running) {
735 if (fair::mq::State{
state} != fair::mq::State::Ready || !wasRunning) {
738 uv_update_time(deviceState.loop);
743 int64_t cleanupCount = deviceState.cleanupCount.load();
748 doDrain = device->NewStatePending() ==
false && deviceState.cleanupCount == cleanupCount;
749 fair::mq::Parts parts;
750 for (
size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
751 auto& info = deviceState.inputChannelInfos[ci];
756 info.channel->Receive(parts, 10);
759 uv_run(deviceState.loop, UV_RUN_NOWAIT);
763 ctx.services().get<
CallbackService>().set<CallbackService::Id::Start>(channelConfigurationChecker);
764 if (ctx.options().get<std::string>(
"ready-state-policy") ==
"drain") {
765 LOG(info) <<
"Drain mode requested while in Ready state";
766 ctx.services().get<
CallbackService>().set<CallbackService::Id::DeviceStateChanged>(drainMessages);
// Counts the headers (every other part) that carry a SourceInfoHeader; the exact
// per-header condition is not fully visible (presumably end-of-stream markers).
769 static auto countEoS = [](fair::mq::Parts& inputs) ->
int {
771 for (
int msgidx = 0; msgidx < inputs.Size() / 2; ++msgidx) {
774 if (inputs.At(msgidx * 2).get() ==
nullptr) {
777 auto const sih = o2::header::get<SourceInfoHeader*>(inputs.At(msgidx * 2)->GetData());
// dataHandler: processes one received Parts from input channel `ci`.
786 auto dataHandler = [converter, doInjectMissingData, doPrintSizes,
787 outputRoutes = std::move(outputRoutes),
// channelRetriever: returns the first route whose matcher accepts the query and
// whose timeslice equals timeslice % maxTimeslices; otherwise presumably emptyChannel.
795 static std::string emptyChannel =
"";
796 for (
auto& route : outputRoutes) {
798 if (
DataSpecUtils::match(route.matcher, query) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
799 return route.channel;
805 std::string
const& channel =
channels[ci];
// End-of-stream bookkeeping: count EoS messages per channel and record how many
// peers are connected on that channel.
807 int nEos = countEoS(inputs);
809 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
810 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
812 numberOfEoS[ci] += nEos;
813 if (numberOfEoS[ci]) {
814 eosPeersCount[ci] = std::max<int>(eosPeersCount[ci], device->GetNumberOfConnectedPeers(channel));
// NOTE(review): injectMissingData is only called when doInjectMissingData is true.
// If it is also what emits the doPrintSizes size report, that report cannot be
// reached with doPrintSizes alone through this call site; confirm this is intended.
817 bool shouldstop =
false;
818 if (doInjectMissingData) {
819 injectMissingData(*device, inputs, outputRoutes, doInjectMissingData, doPrintSizes);
821 bool didSendParts = converter(timingInfo,
ref, inputs, channelRetriever, timesliceIndex->getOldestPossibleOutput().timeslice.value, shouldstop);
// Forward end-of-stream when the converter requested a stop or, presumably, when
// every channel has seen at least as many EoS messages as it has peers.
829 bool everyEoS = shouldstop;
830 if (!shouldstop && nEos) {
832 for (
unsigned int i = 0;
i < numberOfEoS.size();
i++) {
833 if (numberOfEoS[
i] < eosPeersCount[
i]) {
841 LOG(info) <<
"Received (on channel " << ci <<
") " << numberOfEoS[ci] <<
" end-of-stream from " << eosPeersCount[ci] <<
" peers, forwarding end-of-stream (shouldstop " << (
int)shouldstop <<
", nEos " << nEos <<
", newRun " << (
int)newRun <<
")";
843 for (
auto& info : deviceState->inputChannelInfos) {
846 std::fill(numberOfEoS.begin(), numberOfEoS.end(), 0);
847 std::fill(eosPeersCount.begin(), eosPeersCount.end(), 0);
848 control->endOfStream();
// currentRunNumber uses (size_t)-1 as "no run seen yet".
855 static size_t currentRunNumber = -1;
856 static bool inStopTransition =
false;
// When limiter.check(...) returns true (limit from "timeframes-rate-limit"), data is
// no longer handed to dataHandler until a different non-zero run number arrives.
859 if (limiter.
check(ctx, std::stoi(device->fConfig->GetValue<std::string>(
"timeframes-rate-limit")), minSHM)) {
860 inStopTransition =
true;
// Poll each input channel. With a single channel the wait time is -1 (presumably
// blocking until data arrives); otherwise 1 ms, so the other channels get serviced.
863 bool didSendParts =
false;
864 for (
size_t ci = 0; ci <
channels.size(); ++ci) {
865 std::string
const& channel =
channels[ci];
866 int waitTime =
channels.size() == 1 ? -1 : 1;
868 while (maxRead-- > 0) {
869 fair::mq::Parts parts;
870 auto res = device->Receive(parts, channel, 0, waitTime);
871 if (
res == (
size_t)fair::mq::TransferCode::error) {
872 LOGP(error,
"Error while receiving on channel {}", channel);
875 unsigned int nReceived = parts.Size();
876 if (nReceived != 0) {
877 auto const dh = o2::header::get<DataHeader*>(parts.At(0)->GetData());
878 auto& timingInfo = ctx.services().get<
TimingInfo>();
880 if (currentRunNumber != -1 && dh->runNumber != 0 && dh->runNumber != currentRunNumber) {
882 inStopTransition =
false;
884 if (currentRunNumber == -1 || dh->runNumber != 0) {
887 timingInfo.runNumber = dh->runNumber;
888 timingInfo.firstTForbit = dh->firstTForbit;
889 timingInfo.tfCounter = dh->tfCounter;
891 auto const dph = o2::header::get<DataProcessingHeader*>(parts.At(0)->GetData());
892 if (dph !=
nullptr) {
893 timingInfo.timeslice = dph->startTime;
894 timingInfo.creation = dph->creation;
896 if (!inStopTransition) {
897 didSendParts |= dataHandler(ctx.services(), timingInfo, parts, 0, ci, newRun);
900 ctx.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)timingInfo.tfCounter,
"df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
903 if (nReceived == 0 ||
channels.size() == 1) {
916 decongestion.nextEnumerationTimeslice -= 1;
// Prefix the default channel configuration with "name=<name>," unless it already
// contains "name=". NOTE(review): the strdup'd buffer is not freed in the visible
// code; presumably it must outlive the configuration. Confirm ownership.
922 const char* d = strdup(((std::string(defaultChannelConfig).find(
"name=") == std::string::npos ? (std::string(
"name=") +
name +
",") :
"") + std::string(defaultChannelConfig)).c_str());
// Excerpt (original lines 937-1037, partially missing) of the output proxy setup.
// The single output channel name comes from the "name=" entry of the
// "channel-config" option, and every input spec is forwarded to that channel.
// End-of-stream is forwarded with the timing of the last DataProcessingHeader
// seen (presumably, via lastDataProcessingHeader).
937 const char* defaultChannelConfig)
948 auto channelConfig = options.
get<std::string>(
"channel-config");
// Capture group 1 of name=([^,]*) is the channel name; values[0], the first match,
// is used, and a configuration without any match throws.
949 std::regex
r{R
"(name=([^,]*))"};
950 std::vector<std::string> values{std::sregex_token_iterator{std::begin(channelConfig), std::end(channelConfig), r, 1},
951 std::sregex_token_iterator{}};
953 throw std::runtime_error(
"failed to extract channel name from channel configuration parameter '" + channelConfig +
"'");
955 std::string outputChannelName =
values[0];
957 auto* device = rds.
device();
// Verifies that the output channel actually exists on the device.
961 auto channelConfigurationChecker = [inputSpecs = std::move(inputSpecs), device, outputChannelName]() {
962 LOG(info) <<
"checking channel configuration";
963 if (device->GetChannels().count(outputChannelName) == 0) {
964 throw std::runtime_error(
"no corresponding output channel found for input '" + outputChannelName +
"'");
// Timing of the last processed input, shared with the EoS forwarder below.
968 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
// Register one forward route per input spec, all targeting outputChannelName.
970 auto& spec =
const_cast<DeviceSpec&
>(deviceSpec);
972 for (
auto const& inputSpec : inputSpecs) {
978 .matcher = inputSpec,
979 .channel = outputChannelName,
981 spec.forwards.emplace_back(route);
// Sends an end-of-stream message (header + zero-size payload) per channel; channels
// other than outputChannelName are presumably skipped by the check below.
984 auto forwardEos = [device, lastDataProcessingHeader, outputChannelName](
EndOfStreamContext&) {
988 for (
auto& channelInfo : device->GetChannels()) {
989 auto& channelName = channelInfo.first;
990 if (channelName != outputChannelName) {
// Run number from the FairMQ "runNumber" property (base 10); 0 if empty or unparsable.
994 uint32_t runNumber = 0;
996 runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
1011 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
1013 fair::mq::Parts out;
1014 out.AddPart(std::move(headerMessage));
1016 out.AddPart(device->NewMessageFor(channelName, 0, 0));
// Record the DataProcessingHeader timing of every input part (the last one wins).
1023 for (
size_t ii = 0; ii != inputs.
size(); ++ii) {
1024 for (
size_t pi = 0; pi < inputs.
getNofParts(ii); ++pi) {
1025 auto part = inputs.
getByPos(ii, pi);
1026 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
1029 lastDataProcessingHeader->startTime = dph->startTime;
1030 lastDataProcessingHeader->duration = dph->duration;
1031 lastDataProcessingHeader->creation = dph->creation;
// Prefix the default channel configuration with "name=<name>," unless it already
// contains "name=". NOTE(review): the strdup'd buffer is not freed in the visible code.
1037 const char* d = strdup(((std::string(defaultChannelConfig).find(
"name=") == std::string::npos ? (std::string(
"name=") +
name +
",") :
"") + std::string(defaultChannelConfig)).c_str());
// Excerpt (original lines 1046-1161, partially missing) of the multi-output proxy
// setup. channelSelector maps each input spec to an output channel, the
// corresponding forward routes are registered on the DeviceSpec, and end-of-stream
// is forwarded on the selected channels.
1046 Inputs const& inputSpecs,
1047 const char* defaultChannelConfig,
1054 spec.
inputs = inputSpecs;
1057 auto device = rds.
device();
// Output channel names chosen by channelSelector; shared with forwardEos below.
1062 auto channelNames = std::make_shared<std::vector<std::string>>();
// NOTE(review): inputSpecs is `Inputs const&`, so std::move(inputSpecs) in this capture
// copies. The closure also captures `proxy` and `deviceSpec` by reference; confirm
// that both outlive the callback.
1063 auto channelConfigurationInitializer = [&proxy, inputSpecs = std::move(inputSpecs), device, channelSelector, &deviceSpec, channelNames]() {
1064 channelNames->clear();
1065 auto& mutableDeviceSpec =
const_cast<DeviceSpec&
>(deviceSpec);
1066 for (
auto const& spec : inputSpecs) {
1067 auto channel = channelSelector(spec, device->GetChannels());
1068 if (device->GetChannels().count(channel) == 0) {
1069 throw std::runtime_error(
"no corresponding output channel found for input '" + channel +
"'");
1082 mutableDeviceSpec.forwards.emplace_back(route);
1084 channelNames->emplace_back(std::move(channel));
1086 proxy.
bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device);
// Clears all forward routes on the DeviceSpec.
1089 auto channelConfigurationDisposer = [&deviceSpec]() {
1090 auto& mutableDeviceSpec =
const_cast<DeviceSpec&
>(deviceSpec);
1091 mutableDeviceSpec.
forwards.clear();
// Timing of the last processed input, shared with the EoS forwarder below.
1096 auto lastDataProcessingHeader = std::make_shared<DataProcessingHeader>(0, 0);
// Sends end-of-stream (header + zero-size payload) on every device channel that
// checkChannel accepts, i.e. one listed in channelNames.
1097 auto forwardEos = [device, lastDataProcessingHeader, channelNames](
EndOfStreamContext&) {
1101 for (
auto& channelInfo : device->GetChannels()) {
1102 auto& channelName = channelInfo.first;
// NOTE(review): checkChannel is re-created on every loop iteration with
// `std::move(*channelNames)`. The first iteration moves the shared vector into its
// closure, which leaves *channelNames moved-from (in practice empty) for the remaining
// channels and for later forwardEos calls, so those channels are likely skipped.
// Capturing by reference (e.g. `[&names = *channelNames]`) or by copy avoids this.
1103 auto checkChannel = [channelNames = std::move(*channelNames)](std::string
const&
name) ->
bool {
1104 for (
auto const&
n : channelNames) {
1111 if (!checkChannel(channelName)) {
// Run number from the FairMQ "runNumber" property (base 10); 0 if empty or unparsable.
1114 uint32_t runNumber = 0;
1116 runNumber = strtoul(device->fConfig->GetProperty<std::string>(
"runNumber",
"").c_str(),
nullptr, 10);
1131 auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[0].Transport());
1133 fair::mq::Parts out;
1134 out.AddPart(std::move(headerMessage));
1136 out.AddPart(device->NewMessageFor(channelName, 0, 0));
1137 LOGP(detail,
"Forwarding EoS to {}", channelName);
// Record the DataProcessingHeader timing of every input part (the last one wins).
1147 for (
size_t ii = 0; ii != inputs.
size(); ++ii) {
1148 for (
size_t pi = 0; pi < inputs.
getNofParts(ii); ++pi) {
1149 auto part = inputs.
getByPos(ii, pi);
1150 const auto* dph = o2::header::get<DataProcessingHeader*>(part.header);
1153 lastDataProcessingHeader->startTime = dph->startTime;
1154 lastDataProcessingHeader->duration = dph->duration;
1155 lastDataProcessingHeader->creation = dph->creation;
// Prefix the default channel configuration with "name=<name>," unless it already
// contains "name=". NOTE(review): the strdup'd buffer is not freed in the visible code.
1161 const char* d = strdup(((std::string(defaultChannelConfig).find(
"name=") == std::string::npos ? (std::string(
"name=") +
name +
",") :
"") + std::string(defaultChannelConfig)).c_str());