Project
Loading...
Searching...
No Matches
SendingPolicy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
18#include "Framework/Logger.h"
19#include "Headers/STFHeader.h"
20#include "DeviceSpecHelpers.h"
21#include <fairmq/Device.h>
22
23#pragma GCC diagnostic push
24#pragma GCC diagnostic ignored "-Wpedantic"
25namespace o2::framework
26{
27
28std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
29{
30 return {SendingPolicy{
31 .name = "profiling",
32 .matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return getenv("DPL_DEBUG_MESSAGE_SIZE"); },
33 .send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
34 auto &proxy = registry.get<FairMQDeviceProxy>();
35 auto *channel = proxy.getOutputChannel(channelIndex);
36 auto timeout = 1000;
37 int count = 0;
38 auto& relayer = registry.get<DataRelayer>();
39 for (auto& part : parts) {
40 auto* dh = o2::header::get<o2::header::DataHeader*>(part->GetData());
41 if (dh == nullptr) {
42 // This is a payload.
43 continue;
44 }
45 LOGP(info, "Sent {}/{}/{} for a total of {} bytes", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize);
46 count+= dh->payloadSize;
47 auto* dph = o2::header::get<o2::framework::DataProcessingHeader*>(part->GetData());
48 if (dph == nullptr) {
49 // This is a payload.
50 continue;
51 }
52 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
53 if ((size_t)dph->startTime < oldestPossibleOutput.timeslice.value) {
54 LOGP(error, "Sent startTime {} while oldestPossibleOutput is {}. This should not be possible.", dph->startTime, oldestPossibleOutput.timeslice.value);
55 }
56 }
57 LOGP(info, "Sent {} parts for a total of {} bytes", parts.Size(), count);
58 auto res = channel->Send(parts, timeout);
59 if (res == (size_t)fair::mq::TransferCode::timeout) {
60 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout / 1000, channel->GetName());
61 channel->Send(parts);
62 LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
63 } else if (res == (size_t)fair::mq::TransferCode::error) {
64 LOGP(fatal, "Error while sending on channel {}", channel->GetName());
65 } }},
67 .name = "expendable",
68 .matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
69 auto has_label = [](DataProcessorLabel const& label) {
70 return label.value == "expendable";
71 };
72 return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); },
73 .send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
74 auto &proxy = registry.get<FairMQDeviceProxy>();
75 auto *channel = proxy.getOutputChannel(channelIndex);
76 OutputChannelState& state = proxy.getOutputChannelState(channelIndex);
77 auto timeout = 1000;
78 if (state.droppedMessages > 0) {
79 timeout = 0;
80 }
81 if (state.droppedMessages == 1) {
82 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.", timeout / 1000, channel->GetName());
83 }
84 if (state.droppedMessages == 0) {
85 timeout = 1000;
86 }
87 int64_t res = channel->Send(parts, timeout);
88 if (res >= 0) {
89 state.droppedMessages = 0;
90 } else {
91 state.droppedMessages++;
92 } }},
94 .name = "default",
95 .matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return true; },
96 .send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
97 auto &proxy = registry.get<FairMQDeviceProxy>();
98 auto *channel = proxy.getOutputChannel(channelIndex);
99 auto timeout = 1000;
100 auto res = channel->Send(parts, timeout);
101 if (res == (size_t)fair::mq::TransferCode::timeout) {
102 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName());
103 channel->Send(parts);
104 LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
105 } else if (res == (size_t) fair::mq::TransferCode::error) {
106 LOGP(fatal, "Error while sending on channel {}", channel->GetName());
107 } }}};
108}
109
111{
112 return ForwardingPolicy{
113 .name = "default",
114 .matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return true; },
115 .forward = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
116 auto &proxy = registry.get<FairMQDeviceProxy>();
117 auto *channel = proxy.getForwardChannel(channelIndex);
118 auto timeout = 1000;
119 auto res = channel->Send(parts, timeout);
120 if (res == (size_t)fair::mq::TransferCode::timeout) {
121 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName());
122 channel->Send(parts);
123 LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
124 } else if (res == (size_t) fair::mq::TransferCode::error) {
125 LOGP(fatal, "Error while sending on channel {}", channel->GetName());
126 } }};
127}
128
129std::vector<ForwardingPolicy> ForwardingPolicy::createDefaultPolicies()
130{
131 return {ForwardingPolicy{
132 .name = "profiling",
133 .matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return getenv("DPL_DEBUG_MESSAGE_SIZE"); },
134 .forward = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
135 auto &proxy = registry.get<FairMQDeviceProxy>();
136 auto *channel = proxy.getForwardChannel(channelIndex);
137 auto timeout = 1000;
138 int count = 0;
139 auto& relayer = registry.get<DataRelayer>();
140 for (auto& part : parts) {
141 auto* dh = o2::header::get<o2::header::DataHeader*>(part->GetData());
142 if (dh == nullptr) {
143 // This is a payload.
144 continue;
145 }
146 LOGP(info, "Sent {}/{}/{} for a total of {} bytes", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize);
147 count+= dh->payloadSize;
148 auto* dph = o2::header::get<o2::framework::DataProcessingHeader*>(part->GetData());
149 if (dph == nullptr) {
150 // This is a payload.
151 continue;
152 }
153 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
154 if ((size_t)dph->startTime < oldestPossibleOutput.timeslice.value) {
155 LOGP(error, "Sent startTime {} while oldestPossibleOutput is {}. This should not be possible.", dph->startTime, oldestPossibleOutput.timeslice.value);
156 }
157 }
158 LOGP(info, "Sent {} parts for a total of {} bytes", parts.Size(), count);
159 auto res = channel->Send(parts, timeout);
160 if (res == (size_t)fair::mq::TransferCode::timeout) {
161 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName());
162 channel->Send(parts);
163 LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
164 } else if (res == (size_t) fair::mq::TransferCode::error) {
165 LOGP(fatal, "Error while sending on channel {}", channel->GetName());
166 } }},
168 .name = "expendable",
169 .matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
170 auto has_label = [](DataProcessorLabel const& label) {
171 return label.value == "expendable";
172 };
173 return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); },
174 .forward = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
175 auto &proxy = registry.get<FairMQDeviceProxy>();
176 auto *channel = proxy.getForwardChannel(channelIndex);
177 ForwardChannelState& state = proxy.getForwardChannelState(channelIndex);
178 auto timeout = 1000;
179 if (state.droppedMessages > 0) {
180 timeout = 0;
181 }
182 if (state.droppedMessages == 1) {
183 LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.", timeout / 1000, channel->GetName());
184 }
185 if (state.droppedMessages == 0) {
186 timeout = 1000;
187 }
188 int64_t res = channel->Send(parts, timeout);
189 if (res >= 0) {
190 state.droppedMessages = 0;
191 } else {
192 state.droppedMessages++;
193 } }},
195}
196} // namespace o2::framework
benchmark::State & state
uint32_t res
Definition RawData.h:0
fair::mq::Channel * getForwardChannel(ChannelIndex channelIndex) const
fair::mq::Channel * getOutputChannel(ChannelIndex channelIndex) const
GLint GLsizei count
Definition glcorearb.h:399
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
A label that can be associated to a DataProcessorSpec.
static ForwardingPolicy createDefaultForwardingPolicy()
static std::vector< ForwardingPolicy > createDefaultPolicies()
static std::vector< SendingPolicy > createDefaultPolicies()