20#include <boost/algorithm/string.hpp>
39 std::vector<DataProcessorSpec> specs;
44 {
OutputSpec{
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe}},
56 {
"dataTPC",
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe}},
58 {
"TPC",
"CLUSTERS_P", 0, Lifetime::Timeframe}},
69 {
"dataTPC",
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe},
75 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P", 0, Lifetime::Timeframe},
80 inputsDataSampler.insert(std::end(inputsDataSampler), std::begin(inputsTpcProc), std::end(inputsTpcProc));
86 {
"TPC",
"CLUSTERS_S"},
87 {
"TPC",
"CLUSTERS_P_S"}},
92 for (
auto& input : inputs) {
99 size_t len = strlen(outputDescription.
str);
100 if (
len < outputDescription.
size - 2) {
101 outputDescription.
str[
len] =
'_';
102 outputDescription.
str[
len + 1] =
'S';
111 LOG(
debug) <<
"DataSampler sends data from subSpec: " << matcher.subSpec;
113 const auto* inputHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
117 const char* input_ptr = input.payload;
130 {
"dataTPC-sampled",
"TPC",
"CLUSTERS_S"},
131 {
"dataTPC-proc-sampled",
"TPC",
"CLUSTERS_P_S"}},
138LOG(
debug) <<
"qcTask received data with subSpec: " << matcher.subSpec;
147 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P"},
155 const FakeCluster* inputDataTpc =
reinterpret_cast<const FakeCluster*
>(ctx.inputs().get(
"dataTPC-proc").payload);
159specs.swap(dataProducers);
160specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
161specs.push_back(sink);
162specs.push_back(dataSampler);
163specs.push_back(qcTask);
184 for (
auto& cluster : tpcClusters) {
189 cluster.q = rand() % 1000;
206 for (
auto& cluster : processedTpcClusters) {
208 cluster.x = -inputDataTpc[
i].
x;
209 cluster.y = 2 * inputDataTpc[
i].
y;
210 cluster.z = inputDataTpc[
i].
z * inputDataTpc[
i].
q;
211 cluster.q = inputDataTpc[
i].
q;
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLenum GLenum GLsizei len
Defining PrimaryVertex explicitly as messageable.
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::function< void(ProcessingContext &)> ProcessCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
header::DataOrigin origin
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
void someDataProducerAlgorithm(ProcessingContext &ctx)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"