34#include <Monitoring/Monitoring.h>
36#include <fairmq/Device.h>
55 std::vector<InputSpec>& unmatched)
59 auto keepString = ic.options().get<std::string>(
"keep");
65 bool hasOutputsToWrite =
false;
68 for (
auto& spec : danglingOutputInputs) {
70 if (outputMatcher->match(concrete, context)) {
71 hasOutputsToWrite =
true;
74 if (hasOutputsToWrite ==
false) {
76 static bool once =
false;
78 LOG(
debug) <<
"No dangling output to be dumped.";
83 auto output = std::make_shared<std::ofstream>(
filename.c_str(), std::ios_base::binary);
86 LOG(
debug) <<
"processing data set with " << pc.inputs().size() <<
" entries";
87 for (
const auto&
entry : pc.inputs()) {
89 auto header = DataRefUtils::getHeader<header::DataHeader*>(
entry);
90 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(
entry);
91 if (matcher->match(*header, matchingContext) ==
false) {
102 std::vector<InputSpec> validBinaryInputs;
103 auto onlyTimeframe = [](
InputSpec const& input) {
105 input.lifetime == Lifetime::Timeframe;
108 auto noTimeframe = [](
InputSpec const& input) {
110 input.lifetime != Lifetime::Timeframe;
113 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
114 std::back_inserter(validBinaryInputs), onlyTimeframe);
115 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
116 std::back_inserter(unmatched), noTimeframe);
119 "internal-dpl-injected-global-binary-file-sink",
124 {
"keep",
VariantType::String,
"", {
"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
137 externalChannelSpec.
name =
"downstream";
140 externalChannelSpec.
hostname =
"localhost";
141 externalChannelSpec.
port = 0;
156 O2_SIGNPOST_EVENT_EMIT(callbacks, cid,
"rate-limiting",
"Attempting again propagating rate-limiting information.");
159 static size_t lastTimeslice = -1;
163 auto channel = device->GetChannels().find(
"metric-feedback");
165 if (channel == device->GetChannels().end()) {
169 fair::mq::MessagePtr payload(device->NewMessage());
170 payload->Rebuild(&oldestPossingTimeslice,
sizeof(int64_t),
nullptr,
nullptr);
171 auto consumed = oldestPossingTimeslice;
173 size_t start = uv_hrtime();
174 int64_t
result = channel->second[0].Send(payload, 100);
175 size_t stop = uv_hrtime();
179 int64_t ellapsed = (stop -
start) / 1000000;
180 if (ellapsed < 100) {
182 "FairMQ returned %llu earlier than expected. Sleeping %llu ms more before, retrying.",
184 uv_sleep(100 - ellapsed);
187 "FairMQ returned %llu, unable to send last consumed timeslice to source for %llu ms, retrying.",
result, ellapsed);
192 uv_async_send(async);
195 lastTimeslice = consumed;
202 .
name =
"internal-dpl-injected-dummy-sink",
203 .inputs = danglingOutputInputs,
210 LOGP(
debug,
"Domain info updated with timeslice {}", timeslice);
217 stats.processCommandQueue();
224 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{
"channel-config",
VariantType::String,
225 rateLimitingChannelConfig,
226 {
"Out-of-band channel config"}}}
227 : std::vector<ConfigParamSpec>(),
228 .labels = {{
"resilient"}}};
237 .
name =
"internal-dpl-injected-dummy-sink",
238 .inputs = danglingOutputInputs,
244 LOGP(
debug,
"Domain info updated with timeslice {}", timeslice);
249 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid,
"run",
"Consumed timeframes (domain info updated) to be set to %zu.", oldestPossingTimeslice);
252 stats.processCommandQueue();
259 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid,
"run",
"Consumed timeframes (processing) to be set to %zu.", oldestPossingTimeslice);
262 stats.processCommandQueue();
265 .labels = {{
"resilient"}}};
271 auto& raw = pcx.
services().get<RawDeviceService>();
274 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
276 "Rate limiting to %d timeframes in flight", limit);
277 limiter.
check(pcx, limit, 2000);
279 "Rate limiting passed. Invoking old callback.");
282 "Rate limited callback done.");
292 auto disposeResources = [](
int taskId,
293 std::array<ComputingQuotaOffer, 32>& offers,
299 int64_t timeslicesProcessed = 1;
300 for (
auto& offer : offers) {
301 if (offer.user != taskId) {
304 int64_t toRemove = std::min((int64_t)timeslicesProcessed, offer.timeslices);
305 offer.timeslices -= toRemove;
306 timeslicesProcessed -= toRemove;
308 if (timeslicesProcessed <= 0) {
312 return accountDisposed(disposed, stats);
struct uv_async_s uv_async_t
#define O2_SIGNPOST_EVENT_EMIT_DETAIL(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getScheduledDummySink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
enum ChannelMethod method
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"