13#include <fairmq/Channel.h>
21 : mMaxLanes{maxLanes},
29 mPublishedVariables.resize(s);
30 mDirty.resize(s,
false);
35 assert(mVariables.size() > slot.
index);
36 mVariables[slot.
index].put({0,
static_cast<uint64_t
>(timestamp.value)});
37 mVariables[slot.
index].commit();
38 mDirty[slot.
index] =
true;
45 size_t lane = timestamp.value % mMaxLanes;
47 auto oldPVal = std::get_if<uint64_t>(&mVariables[oldest.index].get(0));
48 if (oldPVal ==
nullptr) {
51 uint64_t oldTimestamp = *oldPVal;
53 for (
size_t i = lane + mMaxLanes;
i < mVariables.size();
i += mMaxLanes) {
54 auto newPVal = std::get_if<uint64_t>(&mVariables[
i].get(0));
55 if (newPVal ==
nullptr) {
56 return TimesliceSlot{
i};
58 uint64_t newTimestamp = *newPVal;
60 if (oldTimestamp > newTimestamp) {
61 oldest = TimesliceSlot{
i};
62 oldTimestamp = newTimestamp;
70 auto oldestSlot = findOldestSlot(timestamp);
73 mVariables[oldestSlot.index] = newContext;
74 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
75 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"replaceLRUWith",
"slot %zu timeslice %zu (%" PRIu64
")", oldestSlot.index, timestamp.value, *debugTimestamp);
78 auto oldTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
79 if (oldTimestamp ==
nullptr) {
80 mVariables[oldestSlot.index] = newContext;
81 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
82 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"replaceLRUWith",
"slot %zu timeslice %zu (%" PRIu64
")", oldestSlot.index, timestamp.value, *debugTimestamp);
86 auto newTimestamp = std::get_if<uint64_t>(&newContext.
get(0));
87 if (newTimestamp ==
nullptr) {
91 if (*newTimestamp > *oldTimestamp) {
92 switch (mBackpressurePolicy) {
94 mVariables[oldestSlot.index] = newContext;
95 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
96 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"replaceLRUWith",
"slot %zu timeslice %zu (%" PRIu64
")", oldestSlot.index, timestamp.value, *debugTimestamp);
105 switch (mBackpressurePolicy) {
107 mVariables[oldestSlot.index] = newContext;
108 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
109 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"replaceLRUWith",
"slot %zu timeslice %zu (%" PRIu64
")", oldestSlot.index, timestamp.value, *debugTimestamp);
123 bool expectsData =
false;
124 for (
int ci = 0; ci < mChannels.size(); ci++) {
125 auto& channel = mChannels[ci];
134 if (channel.oldestForChannel.value != 0) {
138 return expectsData ==
false;
145 if (timestamp.value < mChannels[channel.
value].oldestForChannel.value) {
147 "Received bogus oldest possible timeslice %zu for channel %d. Expected >= %zu.",
148 timestamp.value, channel.
value, mChannels[channel.
value].oldestForChannel.value);
150 mChannels[channel.
value].oldestForChannel = timestamp;
152 bool changed =
false;
153 for (
int ci = 0; ci < mChannels.size(); ci++) {
155 auto& channelRef = mChannels[ci];
159 auto&
a = channelRef.oldestForChannel;
160 if (
a.value <
result.timeslice.value) {
166 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"setOldestPossibleInput",
"Success (channel %d): Oldest possible input is %zu due to channel %d",
169 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"setOldestPossibleInput",
"channel %d: Oldest possible input updated from timestamp: %zu --> %zu",
172 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"setOldestPossibleInput",
"No change in oldest possible input");
175 LOG(error) <<
"DPL internal error - oldestPossibleInput of channel " << channel.
value <<
": " <<
getChannelInfo(channel).
channel->GetName().c_str() <<
" decreased from " << mOldestPossibleOutput.
timeslice.
value <<
" to " <<
result.timeslice.value;
177 mOldestPossibleInput =
result;
178 return mOldestPossibleInput;
183 if (mDirty[slot.
index] ==
true) {
187 auto timestamp = std::get_if<uint64_t>(&mVariables[slot.
index].get(0));
188 if (timestamp !=
nullptr && *timestamp < mOldestPossibleInput.
timeslice.
value) {
200 bool changed =
false;
201 for (
size_t i = 0;
i < mVariables.size();
i++) {
206 auto timestamp = std::get_if<uint64_t>(&mVariables[
i].get(0));
207 if (timestamp !=
nullptr && *timestamp <
result.timeslice.value) {
217 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"updateOldestPossibleOutput",
"Oldest possible output %zu (before %zu) due to %s %zu",
219 result.channel.value == -1 ?
"slot" :
"channel",
222 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid,
"updateOldestPossibleOutput",
"Oldest possible output updated from oldest Input : %zu --> %zu",
227 LOG(error) <<
"DPL internal error - oldestPossibleOutput decreased from " << mOldestPossibleOutput.
timeslice.
value <<
" to " <<
result.timeslice.value;
229 mOldestPossibleOutput =
result;
236 return mChannels[channel.
value];
241 mOldestPossibleInput = {
245 mOldestPossibleOutput = {
248 .slot = {(size_t)-1}};
249 for (
auto& channel : mChannels) {
250 channel.oldestForChannel = {0};
#define O2_BUILTIN_UNREACHABLE
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
bool didReceiveData() const
TimesliceIndex(size_t maxLanes, std::vector< InputChannelInfo > &channels)
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
void associate(TimesliceId timestamp, TimesliceSlot slot)
OldestInputInfo getOldestPossibleInput() const
std::tuple< ActionTaken, TimesliceSlot > replaceLRUWith(data_matcher::VariableContext &newContext, TimesliceId timestamp)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
ContextElement::Value const & get(size_t pos) const
GLboolean GLboolean GLboolean GLboolean a
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
static constexpr int INVALID
static constexpr uint64_t INVALID
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels