51 for (
auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
52 const auto* hd = (*it)->header();
53 if (hd->dataOrigin == spec.
origin && hd->dataDescription == spec.
description && hd->subSpecification == spec.
subSpec) {
57 for (
auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
58 const auto* hd = (*it)->header();
59 if (hd->dataOrigin == spec.
origin && hd->dataDescription == spec.
description && hd->subSpecification == spec.
subSpec) {
87 int noutputs = mDidDispatch ? 1 : 0;
89 for (
auto it = mMessages.rbegin(); it != mMessages.rend(); ++it) {
90 if (!excludeDPLOrigin || (*it)->header()->dataOrigin != DataOriginDPL) {
94 for (
auto it = mScheduledMessages.rbegin(); it != mScheduledMessages.rend(); ++it) {
95 if (!excludeDPLOrigin || (*it)->header()->dataOrigin != DataOriginDPL) {
135 auto const* header =
message->header();
136 if (header ==
nullptr) {
137 throw std::logic_error(
"No valid header message found");
139 mScheduledMessages.emplace_back(std::move(
message));
140 if (mDispatchControl.
dispatch !=
nullptr) {
142 if (mDispatchControl.
trigger ==
nullptr || mDispatchControl.
trigger(*header)) {
143 std::vector<fair::mq::Parts> outputsPerChannel;
145 for (
auto&
message : mScheduledMessages) {
146 fair::mq::Parts parts =
message->finalize();
148 assert(parts.Size() == 2);
149 for (
auto& part : parts) {
154 auto& parts = outputsPerChannel[ci];
155 if (parts.Size() == 0) {
160 mDidDispatch = mScheduledMessages.empty() ==
false;
161 mScheduledMessages.clear();
o2::header::Stack * findMessageHeaderStack(const Output &spec)
static constexpr int DefaultChannelIndex
void schedule(Messages::value_type &&message)
int countDeviceOutputs(bool excludeDPLOrigin=false) const
int64_t addToCache(std::unique_ptr< fair::mq::Message > &message)
o2::header::DataHeader * findMessageHeader(const Output &spec)
return the headers of the 1st (from the end) matching message checking first in mMessages then in mSc...
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
fair::mq::MessagePtr createMessage(RouteIndex routeIndex, int index, size_t size)
o2::framework::DataProcessingHeader * findMessageDataProcessingHeader(const Output &spec)
void pruneFromCache(int64_t id)