33#include <Monitoring/Monitoring.h>
35#include <fairmq/Device.h>
48 std::vector<InputSpec>& unmatched)
52 auto keepString = ic.options().get<std::string>(
"keep");
58 bool hasOutputsToWrite =
false;
61 for (
auto& spec : danglingOutputInputs) {
63 if (outputMatcher->match(concrete, context)) {
64 hasOutputsToWrite =
true;
67 if (hasOutputsToWrite ==
false) {
69 static bool once =
false;
71 LOG(
debug) <<
"No dangling output to be dumped.";
76 auto output = std::make_shared<std::ofstream>(
filename.c_str(), std::ios_base::binary);
79 LOG(
debug) <<
"processing data set with " << pc.inputs().size() <<
" entries";
80 for (
const auto&
entry : pc.inputs()) {
82 auto header = DataRefUtils::getHeader<header::DataHeader*>(
entry);
83 auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(
entry);
84 if (matcher->match(*header, matchingContext) ==
false) {
95 std::vector<InputSpec> validBinaryInputs;
96 auto onlyTimeframe = [](
InputSpec const& input) {
98 input.lifetime == Lifetime::Timeframe;
101 auto noTimeframe = [](
InputSpec const& input) {
103 input.lifetime != Lifetime::Timeframe;
106 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
107 std::back_inserter(validBinaryInputs), onlyTimeframe);
108 std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
109 std::back_inserter(unmatched), noTimeframe);
112 "internal-dpl-injected-global-binary-file-sink",
117 {
"keep",
VariantType::String,
"", {
"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile"}}}};
130 externalChannelSpec.
name =
"downstream";
133 externalChannelSpec.
hostname =
"localhost";
134 externalChannelSpec.
port = 0;
148 static size_t lastTimeslice = -1;
152 auto channel = device->GetChannels().find(
"metric-feedback");
154 if (channel == device->GetChannels().end()) {
157 fair::mq::MessagePtr payload(device->NewMessage());
158 payload->Rebuild(&oldestPossingTimeslice,
sizeof(int64_t),
nullptr,
nullptr);
159 auto consumed = oldestPossingTimeslice;
161 int64_t
result = channel->second[0].Send(payload, 100);
167 uv_async_send(async);
169 lastTimeslice = consumed;
176 .
name =
"internal-dpl-injected-dummy-sink",
177 .inputs = danglingOutputInputs,
184 LOGP(
debug,
"Domain info updated with timeslice {}", timeslice);
196 .options = !rateLimitingChannelConfig.empty() ? std::vector<ConfigParamSpec>{{
"channel-config",
VariantType::String,
197 rateLimitingChannelConfig,
198 {
"Out-of-band channel config"}}}
199 : std::vector<ConfigParamSpec>(),
200 .labels = {{
"resilient"}}};
206 auto& raw = pcx.
services().get<RawDeviceService>();
208 auto limit = std::stoi(raw.device()->fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
209 LOG(detail) <<
"Rate limiting to " << limit <<
" timeframes in flight";
210 limiter.
check(pcx, limit, 2000);
211 LOG(detail) <<
"Rate limiting passed. Invoking old callback";
213 LOG(detail) <<
"Rate limited callback done";
struct uv_async_s uv_async_t
@ DomainInfoUpdated
Invoked when new domain info is available.
ServiceRegistryRef services()
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
OldestOutputInfo getOldestPossibleOutput() const
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void retryMetricCallback(uv_async_t *async)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::function< void(ProcessingContext &)> ProcessCallback
static DataProcessorSpec getGlobalFileSink(std::vector< InputSpec > const &danglingInputs, std::vector< InputSpec > &unmatched)
static AlgorithmSpec wrapWithRateLimiting(AlgorithmSpec spec)
static DataProcessorSpec getGlobalFairMQSink(std::vector< InputSpec > const &danglingInputs)
static DataProcessorSpec getDummySink(std::vector< InputSpec > const &danglingInputs, std::string rateLimitingChannelConfig)
static DataDescriptorQuery buildFromKeepConfig(std::string const &config)
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
Running state information of a given device.
enum ChannelMethod method
static auto wrapAlgorithm(AlgorithmSpec const &spec, WrapperProcessCallback &&wrapper) -> AlgorithmSpec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"