Project
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
digits-sampler-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "DigitIOBaseTask.h"
15#include "DigitSampler.h"
20#include "Framework/Task.h"
21#include "Framework/Variant.h"
23#include "ProgOptions.h"
24#include <algorithm>
25#include <fmt/format.h>
26#include <fstream>
27#include <iostream>
28#include <memory>
29#include <string>
30#include <stdexcept>
31
32using namespace o2::framework;
33
34constexpr const char* OPTNAME_INFILE = "infile";
35constexpr const char* OPTNAME_MAX_NOF_ROFS = "max-nof-rofs";
36constexpr const char* OPTNAME_REPACK_ROFS = "repack-rofs";
37constexpr const char* OPTNAME_RUN2 = "run2";
38
39using namespace o2::mch;
40
42{
43 private:
44 std::unique_ptr<io::DigitSampler> mDigitSampler;
45 std::ifstream mInput;
46 bool mReadIsOk = true;
47 size_t mMaxNofROFs;
48 size_t mNofProcessedROFs{0};
49 size_t mMinNumberOfROFsPerTF{1};
50 std::vector<ROFRecord> mROFs;
51 std::vector<Digit> mDigits;
52
53 public:
54 void init(InitContext& ic)
55 {
56 io::DigitIOBaseTask::init(ic); // init common options
57 auto inputFileName = ic.options().get<std::string>(OPTNAME_INFILE);
58 mInput.open(inputFileName);
59 mDigitSampler = std::make_unique<io::DigitSampler>(mInput);
61 mMaxNofROFs = ic.options().get<int>(OPTNAME_MAX_NOF_ROFS);
62 mMinNumberOfROFsPerTF = ic.options().get<int>(OPTNAME_REPACK_ROFS);
63 }
64
66 {
67 LOGP(info, "Sending {} rofs with {} digits", mROFs.size(), mDigits.size());
68 out.snapshot(OutputRef{"rofs"}, mROFs);
69 out.snapshot(OutputRef{"digits"}, mDigits);
70 mDigits.clear();
71 mROFs.clear();
72 }
73
74 bool shouldEnd()
75 {
76 bool maxTFreached = mNofProcessedTFs >= mMaxNofTimeFrames;
77 bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs;
78 bool lastTF = mInput.peek() == EOF;
79 return !mReadIsOk || lastTF || maxTFreached || maxROFreached;
80 }
81
83 {
84 if (shouldEnd()) {
85 throw std::invalid_argument("process should have ended already");
86 }
87
88 std::vector<ROFRecord> rofs;
89 std::vector<Digit> digits;
90 while ((mReadIsOk = mDigitSampler->read(digits, rofs))) {
91
92 // process the current input TF if requested
93 if (shouldProcess()) {
95 mNofProcessedROFs += rofs.size();
96 // append rofs to mROFs, but shift the indices by the amount of digits
97 // we have read so far.
98 auto offset = mDigits.size();
99 std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
100 [offset](ROFRecord r) {
101 r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
102 return r;
103 });
104 mDigits.insert(mDigits.end(), digits.begin(), digits.end());
105 printSummary(mDigits, mROFs);
106 printFull(mDigits, mROFs);
107 }
108
109 // increment the input TF id for the next one
110 incTFid();
111
112 // stop here if we've accumulated enough ROFs or TFs
113 if (mROFs.size() >= mMinNumberOfROFsPerTF || shouldEnd()) {
114 break;
115 }
116 }
117
118 // output whatever has been accumulated, even if empty
120
121 if (shouldEnd()) {
122 pc.services().get<ControlService>().endOfStream();
123 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
124 }
125 }
126};
127
129{
130 std::string spec = fmt::format("digits:MCH/DIGITS{}/0", run2 ? "R2" : "");
131 InputSpec itmp = o2::framework::select(spec.c_str())[0];
132
133 auto commonOptions = o2::mch::io::getCommonOptions();
134 auto options = Options{
135 {OPTNAME_INFILE, VariantType::String, "", {"input file name"}},
136 {OPTNAME_MAX_NOF_ROFS, VariantType::Int, std::numeric_limits<int>::max(), {"max number of ROFs to process"}},
137 {OPTNAME_REPACK_ROFS, VariantType::Int, 1, {"number of rofs to repack into a timeframe (aka min number of rofs per timeframe"}}};
138 options.insert(options.end(), commonOptions.begin(), commonOptions.end());
139
140 return DataProcessorSpec{
141 specName,
142 Inputs{},
144 OutputSpec{{"rofs"}, "MCH", "DIGITROFS", 0, Lifetime::Timeframe}},
145 AlgorithmSpec{adaptFromTask<DigitSamplerTask>()},
146 options};
147}
148
152void customize(std::vector<ConfigParamSpec>& workflowOptions)
153{
154 workflowOptions.emplace_back(OPTNAME_RUN2, VariantType::Bool, false,
155 ConfigParamSpec::HelpString{"input digits use Run2 padIds"});
156}
157
159
160//_________________________________________________________________________________________________
162{
163 return WorkflowSpec{getDigitSamplerSpec("mch-digits-sampler", cc.options().get<bool>(OPTNAME_RUN2))};
164}
Definition of the MCH ROFrame record.
const char * specName
void init(InitContext &ic)
void outputAndClear(DataAllocator &out)
void run(ProcessingContext &pc)
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
void printFull(gsl::span< const Digit > digits, gsl::span< const ROFRecord > rofs) const
void printSummary(gsl::span< const Digit > digits, gsl::span< const ROFRecord > rofs, const char *suffix="") const
void init(o2::framework::InitContext &ic)
WorkflowSpec defineDataProcessing(const ConfigContext &cc)
constexpr const char * OPTNAME_RUN2
constexpr const char * OPTNAME_MAX_NOF_ROFS
o2::framework::DataProcessorSpec getDigitSamplerSpec(const char *specName, bool run2)
constexpr const char * OPTNAME_REPACK_ROFS
void customize(std::vector< ConfigParamSpec > &workflowOptions)
constexpr const char * OPTNAME_INFILE
constexpr bool run2
GLintptr offset
Definition glcorearb.h:660
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::vector< ConfigParamSpec > getCommonOptions()
static OutputSpec asOutputSpec(InputSpec const &spec)
std::vector< o2::mch::ChannelCode > cc
std::vector< Digit > digits