37 std::vector<DataProcessorSpec> specs;
42 {
OutputSpec{
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe}},
54 {
"dataTPC",
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe}},
56 {
"TPC",
"CLUSTERS_P", 0, Lifetime::Timeframe}},
67 {
"dataTPC",
"TPC",
"CLUSTERS", 0, Lifetime::Timeframe},
73 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P", 0, Lifetime::Timeframe},
78 inputsDataSampler.insert(std::end(inputsDataSampler), std::begin(inputsTpcProc), std::end(inputsTpcProc));
84 {
"TPC",
"CLUSTERS_S"},
85 {
"TPC",
"CLUSTERS_P_S"}},
90 for (
auto& input : inputs) {
97 size_t len = strlen(outputDescription.
str);
98 if (
len < outputDescription.
size - 2) {
99 outputDescription.
str[
len] =
'_';
100 outputDescription.
str[
len + 1] =
'S';
109 LOG(
debug) <<
"DataSampler sends data from subSpec: " << matcher.
subSpec;
111 const auto* inputHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
115 const char* input_ptr = input.payload;
128 {
"dataTPC-sampled",
"TPC",
"CLUSTERS_S"},
129 {
"dataTPC-proc-sampled",
"TPC",
"CLUSTERS_P_S"}},
145 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P"},
153 const FakeCluster* inputDataTpc =
reinterpret_cast<const FakeCluster*
>(ctx.inputs().get(
"dataTPC-proc").payload);
157specs.swap(dataProducers);
158specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
159specs.push_back(sink);
160specs.push_back(dataSampler);
161specs.push_back(qcTask);
182 for (
auto& cluster : tpcClusters) {
187 cluster.q = rand() % 1000;
204 for (
auto& cluster : processedTpcClusters) {
206 cluster.x = -inputDataTpc[
i].
x;
207 cluster.y = 2 * inputDataTpc[
i].
y;
208 cluster.z = inputDataTpc[
i].
z * inputDataTpc[
i].
q;
209 cluster.q = inputDataTpc[
i].
q;
header::DataDescription description
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLenum GLenum GLsizei len
Defining ITS Vertex explicitly as messageable.
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::function< void(ProcessingContext &)> ProcessCallback
header::DataDescription description
header::DataHeader::SubSpecificationType subSpec
header::DataOrigin origin
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
void someDataProducerAlgorithm(ProcessingContext &ctx)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"