34#include <Monitoring/Monitoring.h>
36#include <fairmq/Device.h>
53 std::vector<InputSpec>& unmatched)
57 auto keepString = ic.options().get<std::string>(
"keep");
63 bool hasOutputsToWrite =
false;
66 for (
auto& spec : danglingOutputInputs) {
68 if (outputMatcher->match(concrete, context)) {
69 hasOutputsToWrite =
true;
72 if (hasOutputsToWrite ==
false) {
74 static bool once =
false;
76 LOG(
debug) <<
"No dangling output to be dumped.";
81 auto output = std::make_shared<std::ofstream>(
filename.c_str(), std::ios_base::binary);
84 LOG(
debug) <<
"processing data set with " << pc.inputs().size() <<
" entries";
85 for (
const auto&
entry : pc.inputs()) {
87 auto header = DataRefUtils::getHeader<header::DataHeader*>(
entry);
88 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(
entry);
89 if (matcher->match(*header, matchingContext) ==
false) {
100 std::vector<InputSpec> validBinaryInputs;
101 auto onlyTimeframe = [](
InputSpec const& input) {
103 input.lifetime == Lifetime::Timeframe;
106 auto noTimeframe = [](
InputSpec const& input) {
108 input.lifetime != Lifetime::Timeframe;
111 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
112 std::back_inserter(validBinaryInputs), onlyTimeframe);
113 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
114 std::back_inserter(unmatched), noTimeframe);
117 "internal-dpl-injected-global-binary-file-sink",
122 {
"keep",
VariantType::String,
"", {
"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
135 externalChannelSpec.
name =
"downstream";
138 externalChannelSpec.
hostname =
"localhost";
139 externalChannelSpec.
port = 0;
154 O2_SIGNPOST_EVENT_EMIT(callbacks, cid,
"rate-limiting",
"Attempting again propagating rate-limiting information.");
157 static size_t lastTimeslice = -1;
161 auto channel = device->GetChannels().find(
"metric-feedback");
163 if (channel == device->GetChannels().end()) {
167 fair::mq::MessagePtr payload(device->NewMessage());
168 payload->Rebuild(&oldestPossingTimeslice,
sizeof(int64_t),
nullptr,
nullptr);
169 auto consumed = oldestPossingTimeslice;
171 size_t start = uv_hrtime();
172 int64_t
result = channel->second[0].Send(payload, 100);
173 size_t stop = uv_hrtime();
177 int64_t ellapsed = (stop -
start) / 1000000;
178 if (ellapsed < 100) {
180 "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.",
182 uv_sleep(100 - ellapsed);
185 "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.",
result, ellapsed);
190 uv_async_send(async);
193 lastTimeslice = consumed;
200 .
name =
"internal-dpl-injected-dummy-sink",
201 .inputs = danglingOutputInputs,
208 LOGP(
debug,
"Domain info updated with timeslice {}", timeslice);
220 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{
"channel-config",
VariantType::String,
221 rateLimitingChannelConfig,
222 {
"Out-of-band channel config"}}}
223 : std::vector<ConfigParamSpec>(),
224 .labels = {{
"resilient"}}};
230 auto& raw = pcx.
services().get<RawDeviceService>();
232 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
233 LOG(detail) <<
"Rate limiting to " << limit <<
" timeframes in flight";
234 limiter.
check(pcx, limit, 2000);
235 LOG(detail) <<
"Rate limiting passed. Invoking old callback";
237 LOG(detail) <<
"Rate limited callback done";
struct uv_async_s uv_async_t
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
enum ChannelMethod method
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"