Project
Loading...
Searching...
No Matches
tpc-aggregate-cmv.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <vector>
13#include <string>
20
21using namespace o2::framework;
22
23// customize the completion policy
24void customize(std::vector<o2::framework::CompletionPolicy>& policies)
25{
27 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-aggregate-*.*", CompletionPolicy::CompletionOp::Consume));
28}
29
30// we need to add workflow options before including Framework/runDataProcessing
31void customize(std::vector<ConfigParamSpec>& workflowOptions)
32{
33 const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
34
35 std::vector<ConfigParamSpec> options{
36 {"configFile", VariantType::String, "", {"Configuration file for configurable parameters"}},
37 {"timeframes", VariantType::Int, 2000, {"Number of TFs aggregated per calibration interval"}},
38 {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}},
39 {"input-lanes", VariantType::Int, 1, {"Number of aggregate pipelines set by --output-lanes in TPCDistributeCMVSpec"}},
40 {"use-precise-timestamp", VariantType::Bool, false, {"Use precise timestamp metadata from distribute when writing to CCDB"}},
41 {"enable-CCDB-output", VariantType::Bool, false, {"Send output to the CCDB populator"}},
42 {"n-TFs-buffer", VariantType::Int, 1, {"Buffer size that was set in TPCFLPCMVSpec"}},
43 {"configKeyValues", VariantType::String, "", {"Semicolon-separated key=value strings"}}};
44
45 std::swap(workflowOptions, options);
46}
47
49
51{
52 using namespace o2::tpc;
53
54 // set up configuration
55 o2::conf::ConfigurableParam::updateFromFile(config.options().get<std::string>("configFile"));
56 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
57 o2::conf::ConfigurableParam::writeINI("o2tpcaggregatecmv_configuration.ini");
58
59 const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
60 auto timeframes = static_cast<unsigned int>(config.options().get<int>("timeframes"));
61 int aggregateLanes = config.options().get<int>("input-lanes");
62 if (aggregateLanes <= 0) {
63 aggregateLanes = 1;
64 }
65 const bool usePreciseTimestamp = config.options().get<bool>("use-precise-timestamp");
66 const bool sendCCDB = config.options().get<bool>("enable-CCDB-output");
67
68 int nTFsBuffer = config.options().get<int>("n-TFs-buffer");
69 if (nTFsBuffer <= 0) {
70 nTFsBuffer = 1;
71 }
72
73 // convert total TFs per interval to number of buffered TFs
74 assert(timeframes >= static_cast<unsigned int>(nTFsBuffer));
75 timeframes /= static_cast<unsigned int>(nTFsBuffer);
76
77 const std::vector<uint32_t> rangeCRUs(tpcCRUs.begin(), tpcCRUs.end());
78
79 WorkflowSpec workflow;
80 workflow.reserve(static_cast<size_t>(aggregateLanes));
81 LOGP(info, "Starting CMV aggregate with {} lanes, {} timeframes, {} n-TFs-buffer", aggregateLanes, timeframes, nTFsBuffer);
82 for (int ilane = 0; ilane < aggregateLanes; ++ilane) {
83 workflow.emplace_back(getTPCAggregateCMVSpec(ilane, rangeCRUs, timeframes, sendCCDB, usePreciseTimestamp, nTFsBuffer));
84 }
85 return workflow;
86}
Helper function to tokenize sequences and ranges of integral numbers.
TPC aggregation of distributed CMVs, including preprocessing, compression and CCDB output.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
@ MaxCRU
Definition CRU.h:31
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:168
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)