31void customize(std::vector<o2::framework::DispatchPolicy>& policies)
34 auto producerMatcher = [](
auto const& spec) {
35 return std::regex_match(spec.name.begin(), spec.name.end(), std::regex(
"producer.*"));
37 auto processorMatcher = [](
auto const& spec) {
38 return std::regex_match(spec.name.begin(), spec.name.end(), std::regex(
"processor.*"));
40 auto triggerMatcher = [](
auto const& query) {
75 std::vector<o2::header::DataHeader::SubSpecificationType> subspecs(
nChannels);
76 std::generate(subspecs.begin(), subspecs.end(), [
counter = std::make_shared<int>(0)]() { return (*counter)++; });
77 std::vector<OutputSpec> producerOutputs;
78 for (
auto const& subspec : subspecs) {
79 producerOutputs.emplace_back(
OutputSpec{
"PROD",
"CHANNEL", subspec, Lifetime::Timeframe});
80 producerOutputs.emplace_back(
OutputSpec{
"PROD",
"TRIGGER", subspec, Lifetime::Timeframe});
84 for (
auto const& subspec : subspecs) {
98 int nActiveInputs = 0;
99 LOG(info) <<
"processing ...";
100 for (
auto const& input : pc.inputs()) {
101 if (pc.inputs().isValid(input.spec->binding) ==
false) {
105 auto&
data = pc.inputs().get<MyDataType>(input.spec->binding.c_str());
106 LOG(info) <<
"processing " << input.spec->binding <<
" " <<
data;
108 if (input.spec->binding.find(
"trigger") == 0) {
109 pc.outputs().make<MyDataType>(
Output{
"PROC",
"CHANNEL",
data}) =
data;
113 LOG(info) <<
"processed " << nActiveInputs <<
" inputs";
117 auto amendSinkInput = [subspecs](
InputSpec& input,
size_t index) {
124 context.services().get<
ControlService>().readyToQuit(QuitRequest::All);
127 for (
auto const& input : inputs) {
129 LOG(info) <<
"received channel " <<
data;
136 {
InputSpec{
"input",
"PROD",
"CHANNEL", 0, Lifetime::Timeframe},
137 InputSpec{
"trigger",
"PROD",
"TRIGGER", 0, Lifetime::Timeframe}},
138 {
OutputSpec{
"PROC",
"CHANNEL", 0, Lifetime::Timeframe}},
141 [&subspecs]() {
return subspecs.size(); },
142 [&subspecs](
size_t index) {
return subspecs[
index]; });