39 for (
auto& part : parts) {
40 auto* dh = o2::header::get<o2::header::DataHeader*>(part->GetData());
45 LOGP(info,
"Sent {}/{}/{} for a total of {} bytes", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize);
46 count+= dh->payloadSize;
47 auto* dph = o2::header::get<o2::framework::DataProcessingHeader*>(part->GetData());
52 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
53 if ((
size_t)dph->startTime < oldestPossibleOutput.timeslice.value) {
54 LOGP(error,
"Sent startTime {} while oldestPossibleOutput is {}. This should not be possible.", dph->startTime, oldestPossibleOutput.timeslice.value);
57 LOGP(info,
"Sent {} parts for a total of {} bytes", parts.Size(),
count);
59 if (
res == (
size_t)fair::mq::TransferCode::timeout) {
60 LOGP(warning,
"Timed out sending after {}s. Downstream backpressure detected on {}.",
timeout / 1000, channel->GetName());
62 LOGP(info,
"Downstream backpressure on {} recovered.", channel->GetName());
63 }
else if (
res == (
size_t)fair::mq::TransferCode::error) {
64 LOGP(fatal,
"Error while sending on channel {}", channel->GetName());
70 return label.value ==
"expendable";
72 return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); },
78 if (
state.droppedMessages > 0) {
81 if (
state.droppedMessages == 1) {
82 LOGP(warning,
"Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.",
timeout / 1000, channel->GetName());
84 if (
state.droppedMessages == 0) {
89 state.droppedMessages = 0;
91 state.droppedMessages++;
101 if (
res == (
size_t)fair::mq::TransferCode::timeout) {
102 LOGP(warning,
"Timed out sending after {}s. Downstream backpressure detected on {}.",
timeout/1000, channel->GetName());
103 channel->Send(parts);
104 LOGP(info,
"Downstream backpressure on {} recovered.", channel->GetName());
105 }
else if (
res == (
size_t) fair::mq::TransferCode::error) {
106 LOGP(fatal,
"Error while sending on channel {}", channel->GetName());
140 for (
auto& part : parts) {
141 auto* dh = o2::header::get<o2::header::DataHeader*>(part->GetData());
146 LOGP(info,
"Sent {}/{}/{} for a total of {} bytes", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize);
147 count+= dh->payloadSize;
148 auto* dph = o2::header::get<o2::framework::DataProcessingHeader*>(part->GetData());
149 if (dph ==
nullptr) {
153 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
154 if ((
size_t)dph->startTime < oldestPossibleOutput.timeslice.value) {
155 LOGP(error,
"Sent startTime {} while oldestPossibleOutput is {}. This should not be possible.", dph->startTime, oldestPossibleOutput.timeslice.value);
158 LOGP(info,
"Sent {} parts for a total of {} bytes", parts.Size(),
count);
160 if (
res == (
size_t)fair::mq::TransferCode::timeout) {
161 LOGP(warning,
"Timed out sending after {}s. Downstream backpressure detected on {}.",
timeout/1000, channel->GetName());
162 channel->Send(parts);
163 LOGP(info,
"Downstream backpressure on {} recovered.", channel->GetName());
164 }
else if (
res == (
size_t) fair::mq::TransferCode::error) {
165 LOGP(fatal,
"Error while sending on channel {}", channel->GetName());
168 .
name =
"expendable",
171 return label.value ==
"expendable";
173 return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); },
179 if (
state.droppedMessages > 0) {
182 if (
state.droppedMessages == 1) {
183 LOGP(warning,
"Timed out sending after {}s. Downstream backpressure detected on expendable channel {}. Switching to dropping mode.",
timeout / 1000, channel->GetName());
185 if (
state.droppedMessages == 0) {
190 state.droppedMessages = 0;
192 state.droppedMessages++;