20void customize(std::vector<CompletionPolicy>& policies)
24void customize(std::vector<ChannelConfigurationPolicy>& policies)
35#include <boost/algorithm/string.hpp>
71 {
"dataTPC",
"TPC",
"CLUSTERS"}},
73 {
"TPC",
"CLUSTERS_P"}},
81 {
"dataTPC-proc",
"TPC",
"CLUSTERS_P", 0}},
90 {
"TPC_CLUSTERS_S", {
"DS",
"simpleQcTask0" } },
91 {
"TPC_CLUSTERS_P_S", {
"DS",
"simpleQcTask1" } }
96 auto inputDataTpc =
reinterpret_cast<const FakeCluster*
>(ctx.
inputs().
get(
"TPC_CLUSTERS_S").payload);
98 "TPC_CLUSTERS_P_S").payload);
101 const auto* header = DataRefUtils::getHeader<DataHeader*>(
ref);
103 bool dataGood =
true;
106 float diff = std::abs(-inputDataTpc[
j].
x - inputDataTpcProcessed[
j].
x) +
107 std::abs(2 * inputDataTpc[
j].
y - inputDataTpcProcessed[
j].
y) +
108 std::abs(inputDataTpc[
j].
z * inputDataTpc[
j].q - inputDataTpcProcessed[
j].
z) +
109 std::abs(inputDataTpc[
j].q - inputDataTpcProcessed[
j].q);
115 LOG(info) <<
"qcTaskTPC - received data is " << (dataGood ?
"correct" :
"wrong");
124 { {
"tsthistos"},
"TST",
"HISTOS", 0 },
125 { {
"tststring"},
"TST",
"STRING", 0 }
137 const char* o2Root = getenv(
"O2_ROOT");
138 if (o2Root ==
nullptr) {
139 throw std::runtime_error(
"The O2_ROOT environment variable is not set, probably the O2 environment has not been loaded.");
141 std::string configurationSource = std::string(
"json:/") + o2Root +
"/share/etc/exampleDataSamplingConfig.json";
151 std::this_thread::sleep_for(std::chrono::seconds(1));
158 for (
auto& cluster : tpcClusters) {
163 cluster.q = rand() % 1000;
179 for (
auto& cluster : processedTpcClusters) {
181 cluster.x = -inputDataTpc[
i].
x;
182 cluster.y = 2 * inputDataTpc[
i].
y;
183 cluster.z = inputDataTpc[
i].
z * inputDataTpc[
i].
q;
184 cluster.q = inputDataTpc[
i].
q;
Definition of O2 Data Sampling, v1.0.
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
void customize(std::vector< CompletionPolicy > &policies)
void someDataProducerAlgorithm(ProcessingContext &ctx)
void someSinkAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
GLdouble GLdouble GLdouble z
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::function< void(ProcessingContext &)> ProcessCallback
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"