Project
Loading...
Searching...
No Matches
GRPUpdaterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include "GRPUpdaterSpec.h"
17#include "Framework/Task.h"
18#include "Framework/Lifetime.h"
23#include <TFile.h>
24#include <fairlogger/Logger.h>
25#include <memory> // for make_shared, make_unique, unique_ptr
26#include <string>
28
29// this is for some process synchronization, since
30// we need to prevent writing concurrently to the same global GRP file
31#include <boost/interprocess/sync/named_semaphore.hpp>
32#include <filesystem>
33#include <unordered_map> // for the hashing utility
34
35using namespace o2::framework;
36
37namespace o2
38{
39namespace parameters
40{
41
43
44static std::vector<o2::detectors::DetID> sDetList;
45
47{
48
49 public:
50 GRPDPLUpdatedTask(const std::string& prefix) : mPrefix{prefix} {}
51
53 {
54 // nothing special to be set up
55 }
56
58 {
59 const std::string grpName{o2::base::NameConf::CCDBOBJECT};
60 auto grpFileName = o2::base::NameConf::getGRPFileName(mPrefix);
61 auto grpECSFileName = o2::base::NameConf::getGRPECSFileName(mPrefix);
62 // a standardized semaphore convention --> taking the current execution path should be enough
63 // (the user enables this via O2_USEGRP_SEMA environment)
64 boost::interprocess::named_semaphore* sem = nullptr;
65 std::string semhashedstring;
66 try {
67 const auto semname = std::filesystem::current_path().string() + grpECSFileName;
68 std::hash<std::string> hasher;
69 semhashedstring = "alice_grp_" + std::to_string(hasher(semname)).substr(0, 16);
70 sem = new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
71 } catch (std::exception e) {
72 LOG(warn) << "Could not setup GRP semaphore; Continuing without";
73 sem = nullptr;
74 }
75 try {
76 if (sem) {
77 sem->wait(); // wait until we can enter (no one else there)
78 }
79
80 auto postSem = [sem, &semhashedstring] {
81 if (sem) {
82 sem->post();
83 if (sem->try_wait()) {
84 // if nobody else is waiting remove the semaphore resource
85 sem->post();
86 boost::interprocess::named_semaphore::remove(semhashedstring.c_str());
87 }
88 delete sem;
89 }
90 };
91 updateECSData<o2::parameters::GRPECSObject>(grpECSFileName, pc);
92 updateECSData<o2::parameters::GRPObject>(grpFileName, pc); // RS FIXME: suppress once we completely switch to GRPs triplet
93 updateLHCIFData(pc);
94 postSem();
95 } catch (boost::interprocess::interprocess_exception e) {
96 LOG(error) << "Caught semaphore exception " << e.what();
97 }
98 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
99 }
100
101 private:
102 template <typename GRP>
103 void updateECSData(const std::string& grpFileName, framework::ProcessingContext& pc)
104 {
105 const std::string grpName{o2::base::NameConf::CCDBOBJECT};
106 TFile flGRP(grpFileName.c_str(), "update");
107 if (flGRP.IsZombie()) {
108 LOG(error) << "Failed to open in update mode " << grpFileName;
109 return;
110 }
111 std::unique_ptr<GRP> grp(static_cast<GRP*>(flGRP.GetObjectChecked(grpName.c_str(), GRP::Class())));
112 for (auto det : sDetList) { // get readout mode data from different detectors
113 auto roMode = pc.inputs().get<typename GRP::ROMode>(det.getName());
114 if (!(roMode & GRP::PRESENT)) {
115 LOG(error) << "Detector " << det.getName() << " is read out while processor set ABSENT";
116 continue;
117 }
118 grp->setDetROMode(det, roMode);
119 }
120 grp->setIsMC(true);
121 grp->setNHBFPerTF(o2::raw::HBFUtils::Instance().nHBFPerTF);
122 LOG(info) << "Updated " << grpFileName << " with detectors RO modes";
123 grp->print();
124 flGRP.WriteObjectAny(grp.get(), grp->Class(), grpName.c_str());
125 flGRP.Close();
126 }
127
128 void updateLHCIFData(framework::ProcessingContext& pc)
129 {
130 using GRPLHCIF = o2::parameters::GRPLHCIFData;
131 auto grpFileName = o2::base::NameConf::getGRPLHCIFFileName(mPrefix);
132 const std::string grpName{o2::base::NameConf::CCDBOBJECT};
133 if (!std::filesystem::exists(grpFileName)) {
134 LOGP(info, "GRPLHCIF file {} is absent, abandon setting bunch-filling", grpFileName);
135 return;
136 }
137 TFile flGRP(grpFileName.c_str(), "update");
138 if (flGRP.IsZombie()) {
139 LOG(fatal) << "Failed to open in update mode " << grpFileName;
140 }
141 std::unique_ptr<GRPLHCIF> grp(static_cast<GRPLHCIF*>(flGRP.GetObjectChecked(grpName.c_str(), GRPLHCIF::Class())));
142 grp->setBunchFillingWithTime(grp->getBeamEnergyPerZTime(), pc.inputs().get<o2::BunchFilling>("bunchfilling")); // borrow the time from the existing entry
143 flGRP.WriteObjectAny(grp.get(), grp->Class(), grpName.c_str());
144 flGRP.Close();
145 LOG(info) << "Updated " << grpFileName << " with bunch filling";
146 }
147
148 std::string mPrefix = "o2sim";
149};
150
152o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string& prefix, const std::vector<o2::detectors::DetID>& detList)
153{
154 sDetList = detList;
155
156 // prepare specs
157 std::vector<InputSpec> inputs;
158 for (const auto det : detList) {
159 inputs.emplace_back(InputSpec{det.getName(), det.getDataOrigin(), "ROMode", 0, Lifetime::Timeframe});
160 }
161 inputs.emplace_back(InputSpec{"bunchfilling", "SIM", "BUNCHFILLING", 0, Lifetime::Timeframe});
162
163 return DataProcessorSpec{
164 "GRPUpdater",
165 inputs, // input status from each detector
166 {}, // no output
167 AlgorithmSpec{adaptFromTask<GRPDPLUpdatedTask>(prefix)},
168 Options{/* for the moment no options */}};
169}
170
171} // end namespace parameters
172} // end namespace o2
Header of the AggregatedRunInfo struct.
container for the LHC InterFace data
Header of the General Run Parameters object.
Definition of the Names Generator class.
static std::string getGRPECSFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.cxx:64
static std::string getGRPLHCIFFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.cxx:70
static constexpr std::string_view CCDBOBJECT
Definition NameConf.h:66
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.cxx:58
o2::header::DataHeader::SubSpecificationType SubSpecificationType
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void run(framework::ProcessingContext &pc)
GRPDPLUpdatedTask(const std::string &prefix)
void init(framework::InitContext &ic)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
framework::DataAllocator::SubSpecificationType SubSpecificationType
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"