Project
Loading...
Searching...
No Matches
DCStoDPLconverter.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_DCS_TO_DPL_CONVERTER
13#define O2_DCS_TO_DPL_CONVERTER
14
18#include <fairmq/Parts.h>
19#include <fairmq/Device.h>
24#include <unordered_map>
25#include <functional>
26#include <string_view>
27#include <chrono>
28
29namespace o2h = o2::header;
30namespace o2f = o2::framework;
31
32// we need to provide hash function for the DataDescription
33namespace std
34{
35template <>
36struct hash<o2h::DataDescription> {
37 std::size_t operator()(const o2h::DataDescription& d) const noexcept
38 {
39 return std::hash<std::string_view>{}({d.str, size_t(d.size)});
40 }
41};
42} // namespace std
43
44namespace o2::dcs
45{
49
52
53o2f::InjectorFunction dcs2dpl(std::unordered_map<DPID, std::vector<o2h::DataDescription>>& dpid2group, bool fbiFirst, bool verbose = false, int FBIPerInterval = 1)
54{
55
56 return [dpid2group, fbiFirst, verbose, FBIPerInterval](o2::framework::TimingInfo& tinfo, framework::ServiceRegistryRef const& services, fair::mq::Parts& parts, o2f::ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) -> bool {
57 auto *device = services.get<framework::RawDeviceService>().device();
58 static std::unordered_map<DPID, DPCOM> cache; // will keep only the latest measurement in the 1-second wide window for each DPID
59 static std::unordered_map<std::string, int> sentToChannel;
60 static auto timer = std::chrono::high_resolution_clock::now();
61 static auto timer0 = std::chrono::high_resolution_clock::now();
62 static bool seenFBI = false;
63 static uint32_t localTFCounter = 0;
64 static size_t nInp = 0, nInpFBI = 0;
65 static size_t szInp = 0, szInpFBI = 0;
66 if (verbose) {
67 LOG(info) << "In lambda function: ********* Size of unordered_map (--> number of defined groups) = " << dpid2group.size();
68 }
69 // check if we got FBI (Master) or delta (MasterDelta)
70 if (!parts.Size()) {
71 LOGP(warn, "Empty input recieved at timeslice {}", tinfo.timeslice);
72 return false;
73 }
74 std::string firstName = std::string((char*)&(reinterpret_cast<const DPCOM*>(parts.At(0)->GetData()))->id);
75
76 bool isFBI = false;
77 nInp++;
78 if (o2::utils::Str::endsWith(firstName, "Master")) {
79 isFBI = true;
80 nInpFBI++;
81 seenFBI = true;
82 } else if (o2::utils::Str::endsWith(firstName, "MasterDelta")) {
83 isFBI = false;
84 } else {
85 LOGP(error, "Cannot determine if the map is FBI or Delta, 1st DP name is {}", firstName);
86 }
87 if (verbose) {
88 LOGP(info, "New input of {} parts received, map type: {}, timeslice {}", parts.Size(), isFBI ? "FBI" : "Delta", tinfo.timeslice);
89 }
90
91 // We first iterate over the parts of the received message
92 for (size_t i = 0; i < parts.Size(); ++i) { // DCS sends only 1 part, but we should be able to receive more
93 auto sz = parts.At(i)->GetSize();
94 szInp += sz;
95 if (isFBI) {
96 szInpFBI += sz;
97 }
98 auto nDPCOM = sz / sizeof(DPCOM); // number of DPCOM in current part
99 LOGP(debug, "sz={} szof={} -> /={} %={} | {} {}", sz, sizeof(DPCOM), nDPCOM, sz % sizeof(DPCOM), sizeof(o2::dcs::DataPointIdentifier), sizeof(o2::dcs::DataPointValue));
100 for (size_t j = 0; j < nDPCOM; j++) {
101 const auto* ptr = (reinterpret_cast<const DPCOM*>(parts.At(i)->GetData()) + j);
102 DPCOM src;
103 memcpy(&src, ptr, sizeof(DPCOM));
104 // do we want to check if this DP was requested ?
105 auto mapEl = dpid2group.find(src.id);
106 if (verbose) {
107 std::string dest;
108 if (mapEl == dpid2group.end()) {
109 dest = "none";
110 } else {
111 for (const auto& ds : mapEl->second) {
112 dest += fmt::format("{}, ", ds.as<std::string>());
113 }
114 }
115 LOG(info) << "Received DP " << src.id << " (data = " << src.data << "), matched to output-> " << dest;
116 }
117 if (mapEl != dpid2group.end()) {
118 cache[src.id] = src; // this is needed in case in the 1s window we get a new value for the same DP
119 }
120 }
121 }
122 auto timerNow = std::chrono::high_resolution_clock::now();
123 if (fbiFirst && nInpFBI < 2) { // 1st FBI might be obsolete
124 seenFBI = false;
125 static int prevDelay = 0;
126 std::chrono::duration<double, std::ratio<1>> duration = timerNow - timer0;
127 int delay = duration.count();
128 if (delay > prevDelay) {
129 LOGP(info, "Waiting for requested 1st FBI since {} s", delay);
130 prevDelay = delay;
131 }
132 }
133
134 std::chrono::duration<double, std::ratio<1>> duration = timerNow - timer;
135 bool didSendMessages = false;
136 if (duration.count() > 1 && (seenFBI || !fbiFirst)) { // did we accumulate for 1 sec and have we seen FBI if it was requested?
137 std::unordered_map<o2h::DataDescription, pmr::vector<DPCOM>, std::hash<o2h::DataDescription>> outputs;
138 // in the cache we have the final values of the DPs that we should put in the output
139 // distribute DPs over the vectors for each requested output
140 for (auto& it : cache) {
141 auto mapEl = dpid2group.find(it.first);
142 if (mapEl != dpid2group.end()) {
143 for (const auto& ds : mapEl->second) {
144 outputs[ds].push_back(it.second);
145 }
146 }
147 }
148 std::uint64_t creation = std::chrono::time_point_cast<std::chrono::milliseconds>(timerNow).time_since_epoch().count();
149 std::unordered_map<std::string, std::unique_ptr<fair::mq::Parts>> messagesPerRoute;
150 // create and send output messages
151 for (auto& it : outputs) { // distribute messages per routes
152 o2h::DataHeader hdr(it.first, "DCS", 0);
154 if (it.second.empty()) {
155 LOG(warning) << "No data for OutputSpec " << outsp;
156 continue;
157 }
158 auto channel = channelRetriever(outsp, tinfo.timeslice);
159 if (channel.empty()) {
160 LOG(warning) << "No output channel found for OutputSpec " << outsp << ", discarding its data";
161 it.second.clear();
162 continue;
163 }
164
165 hdr.tfCounter = localTFCounter; // this also
167 hdr.splitPayloadParts = 1;
168 hdr.splitPayloadIndex = 1;
169 hdr.payloadSize = it.second.size() * sizeof(DPCOM);
170 hdr.firstTForbit = 0; // this should be irrelevant for DCS
171 o2h::Stack headerStack{hdr, o2::framework::DataProcessingHeader{tinfo.timeslice, 1, creation}};
172 auto fmqFactory = device->GetChannel(channel).Transport();
173 auto hdMessage = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
174 auto plMessage = fmqFactory->CreateMessage(hdr.payloadSize, fair::mq::Alignment{64});
175 memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
176 memcpy(plMessage->GetData(), it.second.data(), hdr.payloadSize);
177
178 fair::mq::Parts* parts2send = messagesPerRoute[channel].get(); // fair::mq::Parts*
179 if (!parts2send) {
180 messagesPerRoute[channel] = std::make_unique<fair::mq::Parts>();
181 parts2send = messagesPerRoute[channel].get();
182 }
183 parts2send->AddPart(std::move(hdMessage));
184 parts2send->AddPart(std::move(plMessage));
185 if (verbose) {
186 LOGP(info, "Pushing {} DPs to {} for TimeSlice {} at {}", it.second.size(), o2f::DataSpecUtils::describe(outsp), tinfo.timeslice, creation);
187 }
188 it.second.clear();
189 }
190 // push output of every route
191 for (auto& msgIt : messagesPerRoute) {
192 if (verbose) {
193 LOG(info) << "Sending " << msgIt.second->Size() / 2 << " parts to channel " << msgIt.first;
194 }
195 o2f::sendOnChannel(*device, *msgIt.second.get(), msgIt.first, tinfo.timeslice);
196 sentToChannel[msgIt.first]++;
197 didSendMessages |= msgIt.second->Size() > 0;
198 }
199 timer = timerNow;
200 cache.clear();
201 if (!messagesPerRoute.empty()) {
202 localTFCounter++;
203 }
204 }
205 if (isFBI && ((nInpFBI % FBIPerInterval) == 0 || verbose)) {
206 float runtime = 1e-3 * std::chrono::duration_cast<std::chrono::milliseconds>(timerNow - timer0).count();
207 std::string sent = "Sent since last FBI report: ";
208 for (auto& m : sentToChannel) {
209 auto pos = m.first.find("_to_");
210 sent += fmt::format("{}:{} ", m.first.substr(pos != std::string::npos ? pos + 4 : 0), m.second);
211 m.second = 0;
212 }
213 LOGP(info, "{} inputs ({} bytes) of which {} FBI ({} bytes) seen in {:.3f} s | {}", nInp, fmt::group_digits(szInp), nInpFBI, fmt::group_digits(szInpFBI), runtime, sent);
214 }
215 return didSendMessages;
216 };
217}
218
219} // namespace o2
220
221#endif /* O2_DCS_TO_DPL_CONVERTER_H */
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
TBranch * ptr
std::ostringstream debug
InjectorFunction dcs2dpl()
const GLfloat * m
Definition glcorearb.h:4066
GLenum src
Definition glcorearb.h:1767
o2::dcs::DataPointCompositeObject DPCOM
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
Defining DataPointCompositeObject explicitly as copiable.
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
static bool endsWith(const std::string &s, const std::string &ending)
std::size_t operator()(const o2h::DataDescription &d) const noexcept
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"