51 std::string messageHeader{
static_cast<const char*
>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
52 size_t dataSize = parts.At(1)->GetSize();
53 std::string messageData{
static_cast<const char*
>(parts.At(1)->GetData()), parts.At(1)->GetSize()};
54 LOG(info) <<
"received message " << messageHeader <<
" of size " <<
dataSize <<
"#parts:" << parts.Size();
57 auto channel = channelRetriever(outsp, newTimesliceId);
58 if (channel.empty()) {
59 LOG(error) <<
"No output channel found for OutputSpec " << outsp;
66 hdrF.
payloadSize = parts.At(0)->GetSize() + parts.At(1)->GetSize() + 1;
69 auto fmqFactory = device->GetChannel(channel).Transport();
72 auto hdMessageF = fmqFactory->CreateMessage(headerStackF.size(), fair::mq::Alignment{64});
73 auto plMessageF = fmqFactory->CreateMessage(hdrF.
payloadSize, fair::mq::Alignment{64});
74 memcpy(hdMessageF->GetData(), headerStackF.data(), headerStackF.size());
75 std::string payload = (messageHeader +
" " + messageData);
76 LOG(info) << messageHeader;
78 if (messageData.size() > Nchars) {
79 LOG(info) << messageData.substr(0, Nchars);
81 LOG(info) << messageData;
83 const char*
c = payload.c_str();
84 const void* pp =
static_cast<const void*
>(
c);
85 memcpy(plMessageF->GetData(), pp, hdrF.
payloadSize);
89 fair::mq::Parts outParts;
90 outParts.AddPart(std::move(hdMessageF));
91 outParts.AddPart(std::move(plMessageF));
93 LOG(info) <<
"Sent CTP counters DPL message" << std::flush;
108 LOG(info) <<
"Defining data processing";
109 auto setChanName = [](
const std::string& chan,
const std::string&
name) {
111 if (std::string(chan).find(
"name=") != std::string::npos) {
112 n = std::string(chan).find(
",");
113 if (
n == std::string::npos) {
114 throw std::runtime_error(fmt::format(
"wrongly formatted channel: {}", chan));
118 LOG(info) <<
"===>inside:" <<
name <<
" " << chan;
121 const std::string devName =
"ctp-qc-proxy";
122 auto chan = config.
options().
get<std::string>(
"subscribe-to");
124 throw std::runtime_error(
"input channel is not provided");
126 chan = setChanName(chan, devName);
127 LOG(info) <<
"name:" << devName <<
" chan:" << chan;
128 LOG(info) <<
"Channels setup: " << chan;
130 ctpCountersOutputs.emplace_back(
"CTP",
"CTP_COUNTERS", 0, Lifetime::Timeframe);
131 LOG(info) <<
"===> Proxy to be set";
134 std::move(ctpCountersOutputs),
139 LOG(info) <<
"===> Proxy done";
141 workflow.emplace_back(ctpProxy);
ConfigParamRegistry & options() const
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction