Project
Loading...
Searching...
No Matches
CCDBFetcherTestWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include "Framework/Lifetime.h"
17#include "Framework/Task.h"
18#include "Framework/Logger.h"
22#include <iostream>
23
24using namespace o2::framework;
25
26std::vector<std::string> objects{"GLO/Calib/MeanVertex"};
27std::vector<unsigned long> times{1657152944347};
28int gRunNumber = 30000;
29int gOrbitsPerTF = 32;
30
31void ReadObjectList(std::string const& filename)
32{
33 std::ifstream file(filename.c_str());
34 if (file.is_open()) {
35 objects.clear();
36 std::string line;
37 while (std::getline(file, line)) {
38 objects.push_back(line);
39 }
40 file.close();
41 } else {
42 std::cerr << "Failed to open the file... using default times" << std::endl;
43 }
44}
45
46void ReadTimesList(std::string const& filename)
47{
48 // extract times
49 std::ifstream file(filename.c_str());
50 if (file.is_open()) {
51 std::string line;
52 times.clear();
53 while (std::getline(file, line)) {
54 times.push_back(std::atol(line.c_str()));
55 }
56 file.close();
57 } else {
58 std::cerr << "Failed to open the times file ... using default times" << std::endl;
59 }
60}
61
62// workflow options
63void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
64{
65 // put here options for the workflow
66 workflowOptions.push_back(ConfigParamSpec{"run-number", o2::framework::VariantType::Int, 30000, {"run number"}});
67 workflowOptions.push_back(ConfigParamSpec{"tforbits", o2::framework::VariantType::Int, 32, {"orbits per tf"}});
68 workflowOptions.push_back(ConfigParamSpec{"objects", o2::framework::VariantType::String, "ccdb-object.dat", {"file with rows of object path to fetch"}});
69 workflowOptions.push_back(ConfigParamSpec{"times", o2::framework::VariantType::String, "ccdb-times.dat", {"file with times to use"}});
70}
71
72// customization to inject time information in the data source device
73void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
74{
75 // we customize the time information sent in DPL headers
76 policies.push_back(o2::framework::CallbacksPolicy{
77 [](o2::framework::DeviceSpec const& spec, o2::framework::ConfigContext const& context) -> bool {
78 return true;
79 },
81 // simple linear enumeration from already updated HBFUtils (set via config key values)
84 static int counter = 0;
85 const auto offset = int64_t(0);
86 const auto increment = int64_t(gOrbitsPerTF);
87 dh.firstTForbit = offset + increment * dh.tfCounter;
88 LOG(info) << "Setting firstTForbit to " << dh.firstTForbit;
89 dh.runNumber = gRunNumber; // hbfu.runNumber;
90 LOG(info) << "Setting runNumber to " << dh.runNumber;
91 dph.creation = times[counter]; // ; we are taking the times from the timerecord
92 counter++;
93 });
94 }} // end of struct
95 );
96}
97
99
101 void run(ProcessingContext& ctx) final
102 {
103 static int counter = 1;
104 auto& inputs = ctx.inputs();
105 auto msg = inputs.get<unsigned long>("datainput");
106 LOG(info) << "Doing compute with conditions for time " << msg << " (" << counter++ << "/" << times.size() << ")";
107 }
109 {
110 LOG(error) << "Deserialization callback invoked";
111 }
112};
113
115 void run(ProcessingContext& ctx) final
116 {
117 static int counter = 0;
118 LOG(info) << "Run function of Producer";
119 ctx.outputs().snapshot(Output{"TST", "A1", 0}, times[counter]);
120 counter++;
121 if (counter == times.size()) {
122 ctx.services().get<ControlService>().endOfStream();
123 }
124 }
125};
126
128{
129 std::vector<o2::framework::DataProcessorSpec> workflow;
130
131 ReadObjectList(configcontext.options().get<std::string>("objects"));
132 ReadTimesList(configcontext.options().get<std::string>("times"));
133 gRunNumber = configcontext.options().get<int>("run-number");
134 gOrbitsPerTF = configcontext.options().get<int>("tforbits");
135
136 // putting Producer
137 workflow.emplace_back(
139 "Producer",
140 {},
141 {OutputSpec{"TST", "A1", 0, Lifetime::Timeframe}},
142 adaptFromTask<Producer>(),
143 Options{}});
144
146 inputs.emplace_back(InputSpec{"datainput", "TST", "A1", 0, Lifetime::Timeframe});
147 // now put all conditions
148 int condcounter = 0;
149 for (auto& obj : objects) {
150 std::string name("cond");
151 name += std::to_string(condcounter);
152 condcounter++;
153 char descr[9]; // Data description field
154 if (name.length() >= 9) {
155 std::string lastNineChars = name.substr(name.length() - 9);
156 std::strcpy(descr, lastNineChars.c_str());
157 } else {
158 std::strcpy(descr, name.c_str());
159 }
160 inputs.emplace_back(InputSpec{name, "TST", descr, 0, Lifetime::Condition, ccdbParamSpec(obj)});
161 }
162
163 // putting Consumer
164 workflow.emplace_back(
166 "Consumer",
167 inputs,
168 {},
169 adaptFromTask<Consumer>(),
170 Options{}});
171
172 return workflow;
173}
void ReadObjectList(std::string const &filename)
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
std::vector< unsigned long > times
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void ReadTimesList(std::string const &filename)
std::vector< std::string > objects
ConfigParamRegistry & options() const
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
GLuint const GLchar * name
Definition glcorearb.h:781
GLintptr offset
Definition glcorearb.h:660
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
void finaliseCCDB(ConcreteDataMatcher &, void *) final
void run(ProcessingContext &ctx) final
void run(ProcessingContext &ctx) final
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153