18void customize(std::vector<CompletionPolicy>& policies)
22void customize(std::vector<ChannelConfigurationPolicy>& policies)
38#include <boost/algorithm/string.hpp>
74 {
"dataTPC",
"TPC",
"CLUSTERS"}},
76 {
"TPC",
"CLUSTERS_P"}},
86 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P"},
103 {
"TPC_CLUSTERS_S", {
"DS",
"simpleQcTask0" } },
104 {
"TPC_CLUSTERS_P_S", {
"DS",
"simpleQcTask1" } }
109 auto inputDataTpc =
reinterpret_cast<const FakeCluster*
>(ctx.
inputs().
get(
"TPC_CLUSTERS_S").payload);
111 "TPC_CLUSTERS_P_S").payload);
114 const auto* header = DataRefUtils::getHeader<DataHeader*>(
ref);
116 bool dataGood =
true;
118 float diff = std::abs(-inputDataTpc[
j].
x - inputDataTpcProcessed[
j].
x) +
119 std::abs(2 * inputDataTpc[
j].
y - inputDataTpcProcessed[
j].
y) +
120 std::abs(inputDataTpc[
j].
z * inputDataTpc[
j].q - inputDataTpcProcessed[
j].
z) +
121 std::abs(inputDataTpc[
j].q - inputDataTpcProcessed[
j].q);
128 LOG(info) <<
"simpleQcTask - received data is " << (dataGood ?
"correct" :
"wrong");
137 { {
"tsthistos"},
"TST",
"HISTOS", 0 },
138 { {
"tststring"},
"TST",
"STRING", 0 }
144 specs.swap(dataProducers);
145 specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
146 specs.push_back(sink);
147 specs.push_back(simpleQcTask);
148 specs.push_back(dummyProducer);
150 const char* o2Root = getenv(
"O2_ROOT");
151 if (o2Root ==
nullptr) {
152 throw std::runtime_error(
"The O2_ROOT environment variable is not set, probably the O2 environment has not been loaded.");
154 std::string configurationSource = std::string(
"json:/") + o2Root +
"/share/etc/exampleDataSamplingConfig.json";
163 std::this_thread::sleep_for(std::chrono::seconds(1));
170 for (
auto& cluster : tpcClusters) {
175 cluster.q = rand() % 1000;
190 for (
auto& cluster : processedTpcClusters) {
192 cluster.x = -inputDataTpc[
i].
x;
193 cluster.y = 2 * inputDataTpc[
i].
y;
194 cluster.z = inputDataTpc[
i].
z * inputDataTpc[
i].
q;
195 cluster.q = inputDataTpc[
i].
q;
Definition of O2 Data Sampling, v1.0.
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
void customize(std::vector< CompletionPolicy > &policies)
void someDataProducerAlgorithm(ProcessingContext &ctx)
void someSinkAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
GLdouble GLdouble GLdouble z
Defining PrimaryVertex explicitly as messageable.
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::function< void(ProcessingContext &)> ProcessCallback
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"