Project
Loading...
Searching...
No Matches
dcs-config-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// example to run:
13// o2-dcs-config-proxy --dcs-config-proxy '--channel-config "name=dcs-config-proxy,type=sub,method=connect,address=tcp://127.0.0.1:5556,rateLogging=1,transport=zeromq"' \
14// --acknowlege-to "type=push,method=connect,address=tcp://127.0.0.1:5557,rateLogging=1,transport=zeromq"
15
21#include "Framework/Logger.h"
22#include "Framework/Lifetime.h"
27#include <fairmq/Device.h>
28#include <fairmq/Parts.h>
30#include <vector>
31#include <string>
32#include <chrono>
33
34using namespace o2::framework;
36
37std::array<o2::header::DataOrigin, 2> exceptionsDetID{"GRP", "AGD"};
38
39void sendAnswer(const std::string& what, const std::string& ack_chan, fair::mq::Device& device)
40{
41 if (!ack_chan.empty()) {
42 auto fmqFactory = device.GetChannel(ack_chan).Transport();
43 auto msg = fmqFactory->CreateMessage(what.size(), fair::mq::Alignment{64});
44 memcpy(msg->GetData(), what.c_str(), what.size());
45 fair::mq::Parts outParts;
46 outParts.AddPart(std::move(msg));
47 sendOnChannel(device, outParts, ack_chan, (size_t)-1);
48 }
49}
50
51auto getDataOriginFromFilename(const std::string& filename)
52{
53 // assume the filename start with detector name
54 auto dIDStr = filename.substr(0, 3);
55 auto dID = DetID::nameToID(dIDStr.c_str(), DetID::First);
56 o2::header::DataOrigin dataOrigin;
57 if (dID < 0) {
58 for (auto& el : exceptionsDetID) {
59 if (el.as<std::string>() == dIDStr) {
60 return el;
61 }
62 }
64 }
65 return DetID(dID).getDataOrigin();
66}
67
68InjectorFunction dcs2dpl(const std::string& acknowledge)
69{
70 return [acknowledge](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) -> bool {
71 auto *device = services.get<RawDeviceService>().device();
72 if (parts.Size() == 0) { // received at ^c, ignore
73 LOG(info) << "ignoring empty message";
74 return false;
75 }
76 // make sure just 2 messages received
77 if (parts.Size() != 2) {
78 LOG(error) << "received " << parts.Size() << " instead of 2 expected";
79 sendAnswer("error0: wrong number of messages", acknowledge, *device);
80 return false;
81 }
82 std::string filename{static_cast<const char*>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
83 size_t filesize = parts.At(1)->GetSize();
84 LOG(info) << "received file " << filename << " of size " << filesize;
86 if (dataOrigin == o2::header::gDataOriginInvalid) {
87 LOG(error) << "unknown detector for " << filename;
88 sendAnswer(fmt::format("{}:error1: unrecognized filename", filename), acknowledge, *device);
89 return false;
90 }
91
92 o2::header::DataHeader hdrF("DCS_CONFIG_FILE", dataOrigin, 0);
93 o2::header::DataHeader hdrN("DCS_CONFIG_NAME", dataOrigin, 0);
95 auto channel = channelRetriever(outsp, newTimesliceId);
96 if (channel.empty()) {
97 LOG(error) << "No output channel found for OutputSpec " << outsp;
98 sendAnswer(fmt::format("{}:error2: no channel to send", filename), acknowledge, *device);
99 return false;
100 }
101
102 hdrF.tfCounter = newTimesliceId;
104 hdrF.splitPayloadParts = 1;
105 hdrF.splitPayloadIndex = 0;
106 hdrF.payloadSize = filesize;
107 hdrF.firstTForbit = 0; // this should be irrelevant for DCS
108
109 hdrN.tfCounter = newTimesliceId;
111 hdrN.splitPayloadParts = 1;
112 hdrN.splitPayloadIndex = 0;
113 hdrN.payloadSize = parts.At(0)->GetSize();
114 hdrN.firstTForbit = 0; // this should be irrelevant for DCS
115
116 auto fmqFactory = device->GetChannel(channel).Transport();
117 std::uint64_t creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
118
119 o2::header::Stack headerStackF{hdrF, DataProcessingHeader{newTimesliceId, 1, creation}};
120 auto hdMessageF = fmqFactory->CreateMessage(headerStackF.size(), fair::mq::Alignment{64});
121 auto plMessageF = fmqFactory->CreateMessage(hdrF.payloadSize, fair::mq::Alignment{64});
122 memcpy(hdMessageF->GetData(), headerStackF.data(), headerStackF.size());
123 memcpy(plMessageF->GetData(), parts.At(1)->GetData(), hdrF.payloadSize);
124
125 o2::header::Stack headerStackN{hdrN, DataProcessingHeader{newTimesliceId, 1, creation}};
126 auto hdMessageN = fmqFactory->CreateMessage(headerStackN.size(), fair::mq::Alignment{64});
127 auto plMessageN = fmqFactory->CreateMessage(hdrN.payloadSize, fair::mq::Alignment{64});
128 memcpy(hdMessageN->GetData(), headerStackN.data(), headerStackN.size());
129 memcpy(plMessageN->GetData(), parts.At(0)->GetData(), hdrN.payloadSize);
130
131 fair::mq::Parts outPartsF;
132 outPartsF.AddPart(std::move(hdMessageF));
133 outPartsF.AddPart(std::move(plMessageF));
134 sendOnChannel(*device, outPartsF, channel, (size_t)-1);
135
136 fair::mq::Parts outPartsN;
137 outPartsN.AddPart(std::move(hdMessageN));
138 outPartsN.AddPart(std::move(plMessageN));
139 sendOnChannel(*device, outPartsN, channel, newTimesliceId);
140
141 sendAnswer(fmt::format("{}:ok", filename), acknowledge, *device);
142 LOG(info) << "Sent DPL message and acknowledgment for file " << filename;
143 return true;
144 };
145}
146
147// we need to add workflow options before including Framework/runDataProcessing
148void customize(std::vector<ConfigParamSpec>& workflowOptions)
149{
150 workflowOptions.push_back(ConfigParamSpec{"acknowlege-to", VariantType::String, "type=push,method=connect,address=tcp://127.0.0.1:5557,rateLogging=1,transport=zeromq", {"channel to acknowledge, no acknowledgement if empty"}});
151 workflowOptions.push_back(ConfigParamSpec{"subscribe-to", VariantType::String, "type=sub,method=connect,address=tcp://127.0.0.1:5556,rateLogging=1,transport=zeromq", {"channel subscribe to"}});
152}
153
155
157{
158 auto setChanName = [](const std::string& chan, const std::string& name) {
159 size_t n = 0;
160 if (std::string(chan).find("name=") != std::string::npos) {
161 n = std::string(chan).find(",");
162 if (n == std::string::npos) {
163 throw std::runtime_error(fmt::format("wrongly formatted channel: {}", chan));
164 }
165 n++;
166 }
167 return o2::utils::Str::concat_string("name=", name, ",", chan.substr(n, chan.size()));
168 };
169
170 const std::string devName = "dcs-config-proxy";
171 auto chan = config.options().get<std::string>("subscribe-to");
172 if (chan.empty()) {
173 throw std::runtime_error("input channel is not provided");
174 }
175 chan = setChanName(chan, devName);
176
177 auto chanTo = config.options().get<std::string>("acknowlege-to");
178 std::string ackChan{};
179 if (!chanTo.empty()) {
180 ackChan = "ackChan";
181 chan = o2::utils::Str::concat_string(chan, ";", setChanName(chanTo, ackChan));
182 }
183 LOG(info) << "Channels setup: " << chan;
184 Outputs dcsOutputs;
185
186 for (int id = DetID::First; id <= DetID::Last; id++) {
187 dcsOutputs.emplace_back(DetID(id).getDataOrigin(), "DCS_CONFIG_FILE", 0, Lifetime::Sporadic);
188 dcsOutputs.emplace_back(DetID(id).getDataOrigin(), "DCS_CONFIG_NAME", 0, Lifetime::Sporadic);
189 }
190 for (auto& el : exceptionsDetID) {
191 dcsOutputs.emplace_back(el, "DCS_CONFIG_FILE", 0, Lifetime::Sporadic);
192 dcsOutputs.emplace_back(el, "DCS_CONFIG_NAME", 0, Lifetime::Sporadic);
193 }
194
196 devName.c_str(),
197 std::move(dcsOutputs),
198 // this is just default, can be overriden by --dcs-config-proxy '--channel-config..'
199 chan.c_str(),
200 dcs2dpl(ackChan));
201
202 WorkflowSpec workflow;
203 workflow.emplace_back(dcsConfigProxy);
204 return workflow;
205}
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID First
Definition DetID.h:94
static constexpr int nameToID(char const *name, int id=First)
Definition DetID.h:154
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
ConfigParamRegistry & options() const
InjectorFunction dcs2dpl()
void sendAnswer(const std::string &what, const std::string &ack_chan, fair::mq::Device &device)
auto getDataOriginFromFilename(const std::string &filename)
o2::detectors::DetID DetID
WorkflowSpec defineDataProcessing(ConfigContext const &config)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
std::array< o2::header::DataOrigin, 2 > exceptionsDetID
GLdouble n
Definition glcorearb.h:1982
GLuint const GLchar * name
Definition glcorearb.h:781
constexpr o2::header::DataOrigin gDataOriginInvalid
Definition DataHeader.h:561
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::vector< DataProcessorSpec > WorkflowSpec
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
std::vector< OutputSpec > Outputs
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
std::string filename()
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153