Project
Loading...
Searching...
No Matches
DataSender.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
26
27using namespace o2::monitoring;
28
29namespace o2::framework
30{
31
32namespace
33{
34std::vector<size_t>
35 createDistinctOutputRouteIndex(std::vector<OutputRoute> const& routes)
36{
37 std::vector<size_t> result;
38 for (size_t ri = 0; ri < routes.size(); ++ri) {
39 auto& route = routes[ri];
40 if (route.timeslice == 0) {
41 result.push_back(ri);
42 }
43 }
44 return result;
45}
46} // namespace
47
49 : mProxy{registry.get<FairMQDeviceProxy>()},
50 mRegistry{registry},
51 mSpec{registry.get<DeviceSpec const>()},
52 mDistinctRoutesIndex{createDistinctOutputRouteIndex(mSpec.outputs)}
53{
54 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
55
56 auto numInputTypes = mDistinctRoutesIndex.size();
57 auto& routes = mSpec.outputs;
58 auto& states = registry.get<DataProcessingStates>();
59 std::string queries = "";
60 for (size_t i = 0; i < numInputTypes; ++i) {
61 char buffer[128];
62 assert(mDistinctRoutesIndex[i] < routes.size());
63 mOutputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
64 DataSpecUtils::describe(buffer, 127, mOutputs.back());
65 queries += std::string_view(buffer, strlen(buffer));
66 queries += ";";
67 }
68
70 states.registerState({.name = "output_matchers", .stateId = stateId, .sendInitialValue = true});
71 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
72 states.processCommandQueue();
74 for (size_t i = 0; i < mOutputs.size(); ++i) {
75 mPresentDefaults.push_back(mOutputs[i].lifetime != Lifetime::Timeframe);
76 }
77
82 for (auto& input : mSpec.inputs) {
83 if (input.matcher.lifetime != Lifetime::Timeframe && input.matcher.lifetime != Lifetime::Optional) {
84 LOGP(detail, "Disabling the Lifetime::timeframe check because not all the inputs are of kind Lifetime::Timeframe");
85 mPresentDefaults.resize(0);
86 break;
87 }
88 }
89 if (mSpec.completionPolicy.name != "consume-all" && mSpec.completionPolicy.name != "consume-all-ordered") {
90 LOGP(detail, "Disabling the Lifetime::timeframe check because the completion policy is not the default one");
91 mPresentDefaults.resize(0);
92 }
93}
94
95std::unique_ptr<fair::mq::Message> DataSender::create(RouteIndex routeIndex)
96{
97 auto& proxy = mRegistry.get<FairMQDeviceProxy>();
98 return proxy.getOutputTransport(routeIndex)->CreateMessage();
99}
100
101void DataSender::send(fair::mq::Parts& parts, ChannelIndex channelIndex)
102{
103 // In case the vector is empty, it means the check is disabled
104 if (mPresentDefaults.empty() == false) {
105 O2DataModelHelpers::updateMissingSporadic(parts, mOutputs, mPresent);
106 }
107 auto& dataProcessorContext = mRegistry.get<DataProcessorContext>();
108 dataProcessorContext.preSendingMessagesCallbacks(mRegistry, parts, channelIndex);
109 auto& info = mProxy.getOutputChannelInfo(channelIndex);
110 info.policy->send(parts, channelIndex, mRegistry);
111}
112
114{
115 mPresent = mPresentDefaults;
116}
117
119{
120 for (auto present : mPresent) {
121 if (!present) {
122 LOGP(debug, "{}", O2DataModelHelpers::describeMissingOutputs(mOutputs, mPresent).c_str());
123 return;
124 }
125 }
126}
127
128} // namespace o2::framework
int32_t i
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::ostringstream debug
std::unique_ptr< fair::mq::Message > create(RouteIndex index)
void verifyMissingSporadic() const
DataSender(ServiceRegistryRef registry)
void send(fair::mq::Parts &, ChannelIndex index)
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given output channel by ChannelIndex.
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::string name
Name of the policy itself.
void preSendingMessagesCallbacks(ServiceRegistryRef, fair::mq::Parts &parts, ChannelIndex channelindex)
Invoked before sending the message parts on the channel channelindex.
static std::string describe(InputSpec const &spec)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
CompletionPolicy completionPolicy
The completion policy to use for this device.
Definition DeviceSpec.h:72
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static void updateMissingSporadic(fair::mq::Parts &parts, std::vector< OutputSpec > const &specs, std::vector< bool > &present)
static std::string describeMissingOutputs(std::vector< OutputSpec > const &specs, std::vector< bool > const &present)
SendingPolicy const * policy
Definition ChannelInfo.h:77