25#include <fmt/format.h>
44 std::unique_ptr<io::DigitSampler> mDigitSampler;
46 bool mReadIsOk =
true;
48 size_t mNofProcessedROFs{0};
49 size_t mMinNumberOfROFsPerTF{1};
50 std::vector<ROFRecord> mROFs;
51 std::vector<Digit> mDigits;
58 mInput.open(inputFileName);
59 mDigitSampler = std::make_unique<io::DigitSampler>(mInput);
67 LOGP(info,
"Sending {} rofs with {} digits", mROFs.size(), mDigits.size());
77 bool maxROFreached = mNofProcessedROFs >= mMaxNofROFs;
78 bool lastTF = mInput.peek() == EOF;
79 return !mReadIsOk || lastTF || maxTFreached || maxROFreached;
85 throw std::invalid_argument(
"process should have ended already");
88 std::vector<ROFRecord> rofs;
90 while ((mReadIsOk = mDigitSampler->read(
digits, rofs))) {
95 mNofProcessedROFs += rofs.size();
98 auto offset = mDigits.size();
99 std::transform(rofs.begin(), rofs.end(), std::back_inserter(mROFs),
101 r.setDataRef(r.getFirstIdx() + offset, r.getNEntries());
104 mDigits.insert(mDigits.end(),
digits.begin(),
digits.end());
113 if (mROFs.size() >= mMinNumberOfROFsPerTF ||
shouldEnd()) {
130 std::string spec = fmt::format(
"digits:MCH/DIGITS{}/0",
run2 ?
"R2" :
"");
136 {
OPTNAME_MAX_NOF_ROFS, VariantType::Int, std::numeric_limits<int>::max(), {
"max number of ROFs to process"}},
137 {
OPTNAME_REPACK_ROFS, VariantType::Int, 1, {
"number of rofs to repack into a timeframe (aka min number of rofs per timeframe"}}};
138 options.insert(options.end(), commonOptions.begin(), commonOptions.end());
144 OutputSpec{{
"rofs"},
"MCH",
"DIGITROFS", 0, Lifetime::Timeframe}},
152void customize(std::vector<ConfigParamSpec>& workflowOptions)
154 workflowOptions.emplace_back(
OPTNAME_RUN2, VariantType::Bool,
false,
152void customize(std::vector<ConfigParamSpec>& workflowOptions) {
…}
void init(InitContext &ic)
void outputAndClear(DataAllocator &out)
void run(ProcessingContext &pc)
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
void printFull(gsl::span< const Digit > digits, gsl::span< const ROFRecord > rofs) const
void printSummary(gsl::span< const Digit > digits, gsl::span< const ROFRecord > rofs, const char *suffix="") const
void init(o2::framework::InitContext &ic)
void incNofProcessedTFs()
bool shouldProcess() const
WorkflowSpec defineDataProcessing(const ConfigContext &cc)
constexpr const char * OPTNAME_RUN2
constexpr const char * OPTNAME_MAX_NOF_ROFS
o2::framework::DataProcessorSpec getDigitSamplerSpec(const char *specName, bool run2)
constexpr const char * OPTNAME_REPACK_ROFS
void customize(std::vector< ConfigParamSpec > &workflowOptions)
constexpr const char * OPTNAME_INFILE
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::vector< ConfigParamSpec > getCommonOptions()
static OutputSpec asOutputSpec(InputSpec const &spec)
std::vector< o2::mch::ChannelCode > cc
std::vector< Digit > digits