54 std::vector<DataProcessorSpec> workflowSpecs{
57 {
"input",
"TST",
"TRIGGER", 0, Lifetime::Timeframe}},
59 {{
"output"},
"TST",
"PREPROC", 0, Lifetime::Timeframe}},
61 for (
auto const& input : ctx.inputs()) {
63 LOG(
debug) <<
"instance " << parallelContext.
index1D() <<
" of " << parallelContext.index1DSize() <<
": "
64 << *input.spec <<
": " << *((
int*)input.payload);
65 auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
66 auto&
data = ctx.outputs().make<
int>(
Output{
"TST",
"PREPROC", dataheader->subSpecification});
67 ASSERT_ERROR(ctx.inputs().get<
int>(input.spec->binding.c_str()) == parallelContext.index1D());
68 data = parallelContext.index1D();
73 {
"input",
"TST",
"PREPROC", 0, Lifetime::Timeframe}},
75 {{
"output"},
"TST",
"DATA", 0, Lifetime::Timeframe},
76 {{
"metadt"},
"TST",
"META", 0, Lifetime::Timeframe}},
78 for (
auto const& input : ctx.inputs()) {
80 LOG(
debug) <<
"instance " << parallelContext.
index1D() <<
" of " << parallelContext.index1DSize() <<
": "
81 << *input.spec <<
": " << *((
int*)input.payload);
82 ASSERT_ERROR(ctx.inputs().get<
int>(input.spec->binding.c_str()) == parallelContext.index1D());
83 auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
86 auto&
data = ctx.outputs().make<
int>(
Output{
"TST",
"DATA", dataheader->subSpecification});
87 data = ctx.inputs().get<
int>(input.spec->binding.c_str());
88 auto& meta = ctx.outputs().make<
int>(
Output{
"TST",
"META", dataheader->subSpecification});
89 meta = dataheader->subSpecification;
96 std::vector<o2::header::DataHeader::SubSpecificationType> subspecs(
nParallelChannels);
97 std::generate(subspecs.begin(), subspecs.end(), [
counter = std::make_shared<int>(0)]() { return 0x1 << (*counter)++; });
100 auto checkMap = std::make_shared<std::unordered_map<o2::header::DataHeader::SubSpecificationType, int>>();
103 for (
auto const& subspec : subspecs) {
104 (*checkMap)[subspec] = pipeline;
113 [&subspecs]() {
return subspecs.size(); },
114 [&subspecs](
size_t index) {
return subspecs[
index]; });
117 auto producerOutputs = [&subspecs]() {
119 for (
auto const& subspec : subspecs) {
120 outputs.emplace_back(
"TST",
"TRIGGER", subspec, Lifetime::Timeframe);
133 std::vector<size_t> multiplicities(
nPipelines);
134 for (pipeline = 0; pipeline <
nPipelines; pipeline++) {
136 channels -= multiplicities[pipeline];
140 auto end = subspecs.size();
142 if (multiplicities[pipeline] == 0) {
145 ctx.outputs().make<
int>(
Output{
"TST",
"TRIGGER", subspecs[
index]}) = pipeline;
146 multiplicities[pipeline++]--;
156 ctx.services().get<
ControlService>().readyToQuit(QuitRequest::Me);
165 std::unordered_map<o2::header::DataHeader::SubSpecificationType, std::string> bindings;
168 mergeInputs({{
"datain",
"TST",
"DATA", 0, Lifetime::Timeframe},
169 {
"metain",
"TST",
"META", 0, Lifetime::Timeframe}},
174 if (input.
binding.compare(0, 6,
"datain") == 0) {
181 for (
auto const& [subspec, pipeline] : *checkMap) {
187 callbacks.
set<CallbackService::Id::Stop>([checkMap]() {
191 bool haveDataIn =
false;
193 for (
auto const& input : inputs) {
197 LOG(info) <<
"consuming : " << *input.spec <<
": " << *((
int*)input.payload);
198 auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
199 if (input.spec->
binding.compare(0, 6,
"datain") == 0) {
200 if (input.spec->
binding != bindings.at(dataheader->subSpecification)) {
201 LOG(error) <<
"data with subspec " << dataheader->subSpecification <<
" at unexpected binding " << input.spec->
binding <<
", expected " << bindings.at(dataheader->subSpecification);
207 auto pipeline = checkMap->at(dataheader->subSpecification);
209 (*checkMap)[dataheader->subSpecification] = -1;
211 if (inputs.
isValid(bindings.at(dataheader->subSpecification))) {
212 ASSERT_ERROR(inputs.
get<
int>(bindings.at(dataheader->subSpecification)) == pipeline);
221 return workflowSpecs;