Project
Loading...
Searching...
No Matches
ctp-qc-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
13// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
14// All rights not expressly granted are reserved.
15//
16// This software is distributed under the terms of the GNU General Public
17// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
18//
19// In applying this license CERN does not waive the privileges and immunities
20// granted to it by virtue of its status as an Intergovernmental Organization
21// or submit itself to any jurisdiction.
22
23// example to run:
24// default: processing, internal database
25// o2-ctp-qc-proxy --ctp-qc-proxy '--channel-config "name=ctp-qc-proxy,type=sub,method=connect,address=tcp://10.161.64.100:50090,rateLogging=5,transport=zeromq"' -b
26
32#include "Framework/Logger.h"
33#include "Framework/Lifetime.h"
38#include <fairmq/Device.h>
39#include <fairmq/Parts.h>
41#include <vector>
42#include <string>
43
44using namespace o2::framework;
47// InjectorFunction dcs2dpl()
// Returns the injector callback used by the proxy: it takes an incoming
// two-part FairMQ message (part 0 = textual header, part 1 = data), joins
// both into a single "<header> <data>" payload and forwards it as one
// header + payload message pair on the output channel.
// The callback returns false when no output channel is found, true otherwise.
// NOTE(review): `hdrF` and `outsp` are used below but their declarations are
// not among the lines visible in this listing — presumably they sit on the
// missing lines after the first LOG statement; confirm against the real file.
48{
49 return [](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) -> bool {
50 auto* device = services.get<RawDeviceService>().device();
 // Copy both incoming parts into strings.
 // NOTE(review): parts.At(0) and parts.At(1) are accessed without checking
 // parts.Size() first — assumes every message has at least two parts; confirm.
51 std::string messageHeader{static_cast<const char*>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
52 size_t dataSize = parts.At(1)->GetSize();
53 std::string messageData{static_cast<const char*>(parts.At(1)->GetData()), parts.At(1)->GetSize()};
54 LOG(info) << "received message " << messageHeader << " of size " << dataSize << "#parts:" << parts.Size(); // << " Payload:" << messageData;
 // Look up the output channel for this timeslice; give up on this message if there is none.
57 auto channel = channelRetriever(outsp, newTimesliceId);
58 if (channel.empty()) {
59 LOG(error) << "No output channel found for OutputSpec " << outsp;
60 return false;
61 }
62 hdrF.tfCounter = newTimesliceId; // this also
64 hdrF.splitPayloadParts = 1;
65 hdrF.splitPayloadIndex = 0;
 // The "+ 1" accounts for the single space inserted between header and data
 // when `payload` is assembled below, so payloadSize equals payload.size().
66 hdrF.payloadSize = parts.At(0)->GetSize() + parts.At(1)->GetSize() + 1;
67 hdrF.firstTForbit = 0; // this should be irrelevant for Counters ? Orbit is in payload
68
69 auto fmqFactory = device->GetChannel(channel).Transport();
70
 // Serialize the header stack and allocate the two outgoing messages
 // (both created with fair::mq::Alignment{64}).
71 o2::header::Stack headerStackF{hdrF, DataProcessingHeader{newTimesliceId, 1}};
72 auto hdMessageF = fmqFactory->CreateMessage(headerStackF.size(), fair::mq::Alignment{64});
73 auto plMessageF = fmqFactory->CreateMessage(hdrF.payloadSize, fair::mq::Alignment{64});
74 memcpy(hdMessageF->GetData(), headerStackF.data(), headerStackF.size());
75 std::string payload = (messageHeader + " " + messageData);
76 LOG(info) << messageHeader;
 // Log at most the first Nchars characters of the data part.
 // NOTE(review): Nchars is a signed int compared against an unsigned size().
77 int Nchars = 1000;
78 if (messageData.size() > Nchars) {
79 LOG(info) << messageData.substr(0, Nchars);
80 } else {
81 LOG(info) << messageData;
82 }
 // Copy exactly payloadSize bytes of the joined string into the payload
 // message; the terminating NUL of c_str() is not included in that count.
83 const char* c = payload.c_str();
84 const void* pp = static_cast<const void*>(c);
85 memcpy(plMessageF->GetData(), pp, hdrF.payloadSize);
86
87 //
88
 // Ship header and payload together as a two-part message.
89 fair::mq::Parts outParts;
90 outParts.AddPart(std::move(hdMessageF));
91 outParts.AddPart(std::move(plMessageF));
92 sendOnChannel(*device, outParts, channel, newTimesliceId);
93 LOG(info) << "Sent CTP counters DPL message" << std::flush;
94 return true;
95 };
96}
97
98// we need to add workflow options before including Framework/runDataProcessing
99void customize(std::vector<ConfigParamSpec>& workflowOptions)
100{
101 workflowOptions.push_back(ConfigParamSpec{"subscribe-to", VariantType::String, "type=sub,method=connect,address=tcp://188.184.30.57:5556,rateLogging=10,transport=zeromq", {"channel subscribe to"}});
102}
103
105
107{
 // Builds the workflow: a single proxy device that subscribes to the channel
 // given by the "subscribe-to" option and publishes CTP/CTP_COUNTERS/0.
 // NOTE(review): the signature line of this function and the line that opens
 // the call producing `ctpProxy` are not visible in this listing; `config` is
 // presumably the function's ConfigContext parameter — confirm against the real file.
108 LOG(info) << "Defining data processing";
 // Helper: force the channel configuration to start with "name=<name>,".
 // If `chan` contains "name=" anywhere, everything up to and including the
 // first comma is dropped before the new name is prepended; a channel string
 // with "name=" but no comma is rejected with std::runtime_error.
 // NOTE(review): this only strips the right token when "name=" is the first
 // key of the channel string — confirm that callers always pass it first.
109 auto setChanName = [](const std::string& chan, const std::string& name) {
110 size_t n = 0;
111 if (std::string(chan).find("name=") != std::string::npos) {
112 n = std::string(chan).find(",");
113 if (n == std::string::npos) {
114 throw std::runtime_error(fmt::format("wrongly formatted channel: {}", chan));
115 }
116 n++;
117 }
118 LOG(info) << "===>inside:" << name << " " << chan;
119 return o2::utils::Str::concat_string("name=", name, ",", chan.substr(n, chan.size()));
120 };
121 const std::string devName = "ctp-qc-proxy";
 // Read the channel configuration from the workflow option; it must not be empty.
122 auto chan = config.options().get<std::string>("subscribe-to");
123 if (chan.empty()) {
124 throw std::runtime_error("input channel is not provided");
125 }
 // Make the channel carry the device name.
126 chan = setChanName(chan, devName);
127 LOG(info) << "name:" << devName << " chan:" << chan;
128 LOG(info) << "Channels setup: " << chan;
 // Single output: origin "CTP", description "CTP_COUNTERS", subspec 0, Timeframe lifetime.
129 Outputs ctpCountersOutputs;
130 ctpCountersOutputs.emplace_back("CTP", "CTP_COUNTERS", 0, Lifetime::Timeframe);
131 LOG(info) << "===> Proxy to be set";
 // Arguments of the proxy-construction call (its opening line is missing from this listing).
133 devName.c_str(),
134 std::move(ctpCountersOutputs),
135 // this is just default, can be overridden by --ctp-config-proxy '--channel-config..'
136 chan.c_str(),
137 dcs2dpl());
138 ctpProxy.labels.emplace_back(DataProcessorLabel{"input-proxy"});
139 LOG(info) << "===> Proxy done";
 // The workflow consists of the proxy spec only.
140 WorkflowSpec workflow;
141 workflow.emplace_back(ctpProxy);
142 return workflow;
143}
uint32_t c
Definition RawData.h:2
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
ConfigParamRegistry & options() const
WorkflowSpec defineDataProcessing(ConfigContext const &config)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
InjectorFunction dcs2dpl()
GLdouble n
Definition glcorearb.h:1982
GLenum GLsizei dataSize
Definition glcorearb.h:3994
GLuint const GLchar * name
Definition glcorearb.h:781
constexpr o2::header::DataOrigin gDataOriginCTP
Definition DataHeader.h:564
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::vector< DataProcessorSpec > WorkflowSpec
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
std::vector< OutputSpec > Outputs
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"