41 if (!ack_chan.empty()) {
42 auto fmqFactory = device.GetChannel(ack_chan).Transport();
43 auto msg = fmqFactory->CreateMessage(what.size(), fair::mq::Alignment{64});
44 memcpy(
msg->GetData(), what.c_str(), what.size());
45 fair::mq::Parts outParts;
46 outParts.AddPart(std::move(
msg));
72 if (parts.Size() == 0) {
73 LOG(info) <<
"ignoring empty message";
77 if (parts.Size() != 2) {
78 LOG(error) <<
"received " << parts.Size() <<
" instead of 2 expected";
79 sendAnswer(
"error0: wrong number of messages", acknowledge, *device);
82 std::string
filename{
static_cast<const char*
>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
83 size_t filesize = parts.At(1)->GetSize();
84 LOG(info) <<
"received file " <<
filename <<
" of size " << filesize;
88 sendAnswer(fmt::format(
"{}:error1: unrecognized filename",
filename), acknowledge, *device);
95 auto channel = channelRetriever(outsp, newTimesliceId);
96 if (channel.empty()) {
97 LOG(error) <<
"No output channel found for OutputSpec " << outsp;
98 sendAnswer(fmt::format(
"{}:error2: no channel to send",
filename), acknowledge, *device);
116 auto fmqFactory = device->GetChannel(channel).Transport();
117 std::uint64_t creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
120 auto hdMessageF = fmqFactory->CreateMessage(headerStackF.size(), fair::mq::Alignment{64});
121 auto plMessageF = fmqFactory->CreateMessage(hdrF.
payloadSize, fair::mq::Alignment{64});
122 memcpy(hdMessageF->GetData(), headerStackF.data(), headerStackF.size());
123 memcpy(plMessageF->GetData(), parts.At(1)->GetData(), hdrF.
payloadSize);
126 auto hdMessageN = fmqFactory->CreateMessage(headerStackN.size(), fair::mq::Alignment{64});
127 auto plMessageN = fmqFactory->CreateMessage(hdrN.
payloadSize, fair::mq::Alignment{64});
128 memcpy(hdMessageN->GetData(), headerStackN.data(), headerStackN.size());
129 memcpy(plMessageN->GetData(), parts.At(0)->GetData(), hdrN.
payloadSize);
131 fair::mq::Parts outPartsF;
132 outPartsF.AddPart(std::move(hdMessageF));
133 outPartsF.AddPart(std::move(plMessageF));
136 fair::mq::Parts outPartsN;
137 outPartsN.AddPart(std::move(hdMessageN));
138 outPartsN.AddPart(std::move(plMessageN));
142 LOG(info) <<
"Sent DPL message and acknowledgment for file " <<
filename;
158 auto setChanName = [](
const std::string& chan,
const std::string&
name) {
160 if (std::string(chan).find(
"name=") != std::string::npos) {
161 n = std::string(chan).find(
",");
162 if (
n == std::string::npos) {
163 throw std::runtime_error(fmt::format(
"wrongly formatted channel: {}", chan));
170 const std::string devName =
"dcs-config-proxy";
171 auto chan = config.
options().
get<std::string>(
"subscribe-to");
173 throw std::runtime_error(
"input channel is not provided");
175 chan = setChanName(chan, devName);
177 auto chanTo = config.
options().
get<std::string>(
"acknowlege-to");
178 std::string ackChan{};
179 if (!chanTo.empty()) {
183 LOG(info) <<
"Channels setup: " << chan;
187 dcsOutputs.emplace_back(
DetID(
id).getDataOrigin(),
"DCS_CONFIG_FILE", 0, Lifetime::Sporadic);
188 dcsOutputs.emplace_back(
DetID(
id).getDataOrigin(),
"DCS_CONFIG_NAME", 0, Lifetime::Sporadic);
191 dcsOutputs.emplace_back(el,
"DCS_CONFIG_FILE", 0, Lifetime::Sporadic);
192 dcsOutputs.emplace_back(el,
"DCS_CONFIG_NAME", 0, Lifetime::Sporadic);
197 std::move(dcsOutputs),
203 workflow.emplace_back(dcsConfigProxy);
ConfigParamRegistry & options() const
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction