Project
Loading...
Searching...
No Matches
TimesliceIndex.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/Signpost.h"
13#include <fairmq/Channel.h>
14
15O2_DECLARE_DYNAMIC_LOG(timeslice_index);
16
17namespace o2::framework
18{
19
20TimesliceIndex::TimesliceIndex(size_t maxLanes, std::vector<InputChannelInfo>& channels)
21 : mMaxLanes{maxLanes},
22 mChannels{channels}
23{
24}
25
// Body of the slot-resizing routine (presumably TimesliceIndex::resize; the
// signature line is absent from this listing - TODO confirm).
// Brings the three per-slot containers to `s` entries so they stay index
// aligned: the matcher contexts, the published contexts and the dirty flags.
27{
28 mVariables.resize(s);
29 mPublishedVariables.resize(s);
   // Any flag added by the resize starts out as "not dirty".
30 mDirty.resize(s, false);
31}
32
// Body of TimesliceIndex::associate (the signature line is absent from this
// listing; the name is the one used in the signpost below).
// Binds `timestamp` to `slot`: the timestamp is written at position 0 of the
// slot's variable context, which is the position the rest of this file reads
// back with get(0), and the slot is flagged dirty.
34{
   // Only checked in builds where assert() is active.
35 assert(mVariables.size() > slot.index);
36 mVariables[slot.index].put({0, static_cast<uint64_t>(timestamp.value)});
   // NOTE(review): commit() presumably makes the pending put() visible to
   // get(); confirm against VariableContext.
37 mVariables[slot.index].commit();
38 mDirty[slot.index] = true;
39 O2_SIGNPOST_ID_GENERATE(tid, timeslice_index);
40 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "associate", "Associating timestamp %zu to slot %zu", timestamp.value, slot.index);
41}
42
43TimesliceSlot TimesliceIndex::findOldestSlot(TimesliceId timestamp) const
44{
45 size_t lane = timestamp.value % mMaxLanes;
46 TimesliceSlot oldest{lane};
47 auto oldPVal = std::get_if<uint64_t>(&mVariables[oldest.index].get(0));
48 if (oldPVal == nullptr) {
49 return oldest;
50 }
51 uint64_t oldTimestamp = *oldPVal;
52
53 for (size_t i = lane + mMaxLanes; i < mVariables.size(); i += mMaxLanes) {
54 auto newPVal = std::get_if<uint64_t>(&mVariables[i].get(0));
55 if (newPVal == nullptr) {
56 return TimesliceSlot{i};
57 }
58 uint64_t newTimestamp = *newPVal;
59
60 if (oldTimestamp > newTimestamp) {
61 oldest = TimesliceSlot{i};
62 oldTimestamp = newTimestamp;
63 }
64 }
65 return oldest;
66}
67
// Tries to place `newContext` (carrying timeslice `timestamp`) in the slot
// chosen by findOldestSlot() and reports what was done.
//
// Returns a tuple of the action taken and the slot concerned. The branches
// visible here return ReplaceUnused when the chosen slot is invalid or holds
// no timestamp, and ReplaceObsolete when an occupied slot is overwritten.
//
// NOTE: this listing is incomplete. Listing lines 88, 93, 99-102, 106,
// 112-115 and 118 are absent (they appear to hold the `case` labels and the
// remaining `return` statements), so the non-replacing outcomes cannot be
// documented from here.
68std::tuple<TimesliceIndex::ActionTaken, TimesliceSlot> TimesliceIndex::replaceLRUWith(data_matcher::VariableContext& newContext, TimesliceId timestamp)
69{
70 auto oldestSlot = findOldestSlot(timestamp);
71 O2_SIGNPOST_ID_GENERATE(tid, timeslice_index);
   // Case 1: the chosen slot is not valid, so it can be reused directly.
72 if (TimesliceIndex::isValid(oldestSlot) == false) {
73 mVariables[oldestSlot.index] = newContext;
   // NOTE(review): debugTimestamp is dereferenced below without a null
   // check, and newContext is only checked for a timestamp further down.
   // Confirm callers always provide a uint64_t at position 0.
74 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
75 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "replaceLRUWith", "slot %zu timeslice %zu (%" PRIu64 ")", oldestSlot.index, timestamp.value, *debugTimestamp);
76 return std::make_tuple(ActionTaken::ReplaceUnused, oldestSlot);
77 }
   // Case 2: the slot is valid but holds no timestamp; handled like case 1.
78 auto oldTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
79 if (oldTimestamp == nullptr) {
80 mVariables[oldestSlot.index] = newContext;
81 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
82 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "replaceLRUWith", "slot %zu timeslice %zu (%" PRIu64 ")", oldestSlot.index, timestamp.value, *debugTimestamp);
83 return std::make_tuple(ActionTaken::ReplaceUnused, oldestSlot);
84 }
85
   // The incoming context must carry a timestamp itself. The body of this
   // check (listing line 88) is absent from the listing.
86 auto newTimestamp = std::get_if<uint64_t>(&newContext.get(0));
87 if (newTimestamp == nullptr) {
89 }
90
   // Case 3: both timestamps exist. The outcome depends on which is newer and
   // on mBackpressurePolicy; the `case` labels are absent from this listing.
91 if (*newTimestamp > *oldTimestamp) {
92 switch (mBackpressurePolicy) {
94 mVariables[oldestSlot.index] = newContext;
95 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
96 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "replaceLRUWith", "slot %zu timeslice %zu (%" PRIu64 ")", oldestSlot.index, timestamp.value, *debugTimestamp);
97 return std::make_tuple(ActionTaken::ReplaceObsolete, oldestSlot);
98 }
103 }
104 } else {
   // The incoming timeslice is not newer than the stored one.
105 switch (mBackpressurePolicy) {
107 mVariables[oldestSlot.index] = newContext;
108 auto debugTimestamp = std::get_if<uint64_t>(&mVariables[oldestSlot.index].get(0));
109 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "replaceLRUWith", "slot %zu timeslice %zu (%" PRIu64 ")", oldestSlot.index, timestamp.value, *debugTimestamp);
110 return std::make_tuple(ActionTaken::ReplaceObsolete, oldestSlot);
111 }
116 }
117 }
119}
120
// Body of a predicate over the input channels (presumably
// TimesliceIndex::didReceiveData; the signature line is absent from this
// listing - TODO confirm).
// Returns true when at least one DPL channel has a non-zero oldestForChannel,
// or when there is no DPL channel at all. Returns false only when DPL
// channels exist and every one of them still reports 0.
122{
123 bool expectsData = false;
124 for (int ci = 0; ci < mChannels.size(); ci++) {
125 auto& channel = mChannels[ci];
126 // Ignore non data channels
127 if (channel.channelType != ChannelAccountingType::DPL) {
128 continue;
129 }
    // At least one data channel exists, so data is expected.
130 expectsData = true;
131 // A data channel provided the oldest possible timeframe information
132 // we therefore can safely assume that we got some
133 // data from it.
134 if (channel.oldestForChannel.value != 0) {
135 return true;
136 }
137 }
    // Nothing was reported: true only if no data channel was found.
138 return expectsData == false;
139}
140
142{
143 O2_SIGNPOST_ID_GENERATE(tid, timeslice_index);
144 // Each channel oldest possible input must be monotoically increasing.
145 if (timestamp.value < mChannels[channel.value].oldestForChannel.value) {
146 O2_SIGNPOST_EVENT_EMIT_ERROR(timeslice_index, tid, "setOldestPossibleInput",
147 "Received bogus oldest possible timeslice %zu for channel %d. Expected >= %zu.",
148 timestamp.value, channel.value, mChannels[channel.value].oldestForChannel.value);
149 }
150 mChannels[channel.value].oldestForChannel = timestamp;
151 OldestInputInfo result{timestamp, channel};
152 bool changed = false;
153 for (int ci = 0; ci < mChannels.size(); ci++) {
154 // Check if this is a real channel. Skip otherwise.
155 auto& channelRef = mChannels[ci];
156 if (channelRef.channelType != ChannelAccountingType::DPL) {
157 continue;
158 }
159 auto& a = channelRef.oldestForChannel;
160 if (a.value < result.timeslice.value) {
161 changed = true;
162 result = {a, ChannelIndex{ci}};
163 }
164 }
165 if (changed && mOldestPossibleInput.timeslice.value != result.timeslice.value) {
166 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "setOldestPossibleInput", "Success (channel %d): Oldest possible input is %zu due to channel %d",
167 channel.value, result.timeslice.value, result.channel.value);
168 } else if (mOldestPossibleInput.timeslice.value != result.timeslice.value) {
169 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "setOldestPossibleInput", "channel %d: Oldest possible input updated from timestamp: %zu --> %zu",
170 channel.value, mOldestPossibleInput.timeslice.value, result.timeslice.value);
171 } else {
172 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "setOldestPossibleInput", "No change in oldest possible input");
173 }
174 if (mOldestPossibleInput.timeslice.value > result.timeslice.value) {
175 LOG(error) << "DPL internal error - oldestPossibleInput of channel " << channel.value << ": " << getChannelInfo(channel).channel->GetName().c_str() << " decreased from " << mOldestPossibleOutput.timeslice.value << " to " << result.timeslice.value;
176 }
177 mOldestPossibleInput = result;
178 return mOldestPossibleInput;
179}
180
// Body of a slot validity check (presumably TimesliceIndex::validateSlot,
// the only member in this file's index that takes a slot and returns bool;
// the signature line is absent from this listing - TODO confirm).
// Returns false, after marking the slot invalid, when the slot is not dirty
// and its timestamp is older than the current oldest possible input.
// Returns true in every other case.
182{
    // Dirty slots are accepted without looking at their timestamp.
183 if (mDirty[slot.index] == true) {
184 return true;
185 }
186
187 auto timestamp = std::get_if<uint64_t>(&mVariables[slot.index].get(0));
    // A slot without a timestamp is never invalidated here.
188 if (timestamp != nullptr && *timestamp < mOldestPossibleInput.timeslice.value) {
189 markAsInvalid(slot);
190 return false;
191 }
192 return true;
193}
194
// Body of TimesliceIndex::updateOldestPossibleOutput (the signature line is
// absent from this listing; the name is the one used in the signposts below).
// Recomputes the oldest timeslice that can still be produced: it starts from
// the oldest possible input and lowers it to the smallest timestamp held by
// any valid slot. The result is stored in mOldestPossibleOutput and returned.
// `rewinded`: when true, the "decreased" error at the end is suppressed.
196{
197 auto oldestInput = getOldestPossibleInput();
198 OldestOutputInfo result{oldestInput.timeslice, oldestInput.channel};
199
200 bool changed = false;
201 for (size_t i = 0; i < mVariables.size(); i++) {
202 // We do not check invalid slots.
203 if (isValid(TimesliceSlot{i}) == false) {
204 continue;
205 }
206 auto timestamp = std::get_if<uint64_t>(&mVariables[i].get(0));
207 if (timestamp != nullptr && *timestamp < result.timeslice.value) {
208 changed = true;
209 result.timeslice = TimesliceId{*timestamp};
210 result.slot = {i};
    // Channel -1 records that a slot, not a channel, set the limit; the
    // signpost below tests for this value.
211 result.channel = {(int)-1};
212 }
213 }
214 O2_SIGNPOST_ID_GENERATE(tid, timeslice_index);
    // Only emit a signpost when the value actually moved.
215 if (mOldestPossibleOutput.timeslice.value != result.timeslice.value) {
216 if (changed) {
217 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "updateOldestPossibleOutput", "Oldest possible output %zu (before %zu) due to %s %zu",
218 result.timeslice.value, mOldestPossibleOutput.timeslice.value,
219 result.channel.value == -1 ? "slot" : "channel",
220 result.channel.value == -1 ? result.slot.index : result.channel.value);
221 } else {
222 O2_SIGNPOST_EVENT_EMIT(timeslice_index, tid, "updateOldestPossibleOutput", "Oldest possible output updated from oldest Input : %zu --> %zu",
223 mOldestPossibleOutput.timeslice.value, result.timeslice.value);
224 }
225 }
    // A decrease is logged as an internal error unless the caller said the
    // stream was rewound. The new value is stored either way.
226 if (rewinded == false && mOldestPossibleOutput.timeslice.value > result.timeslice.value) {
227 LOG(error) << "DPL internal error - oldestPossibleOutput decreased from " << mOldestPossibleOutput.timeslice.value << " to " << result.timeslice.value;
228 }
229 mOldestPossibleOutput = result;
230
231 return result;
232}
233
// Body of the channel accessor (presumably TimesliceIndex::getChannelInfo;
// the signature line is absent from this listing - TODO confirm).
// Returns the entry of mChannels at index `channel.value`. The index is not
// bounds checked.
235{
236 return mChannels[channel.value];
237}
238
// Body of a routine that reinitialises the oldest-possible bookkeeping
// (presumably TimesliceIndex::reset; the signature line is absent from this
// listing - TODO confirm). Listing line 243 is also absent.
// Sets both oldest-possible records back to timeslice 0 with an invalid
// channel, and clears every channel's oldestForChannel to 0.
240{
241 mOldestPossibleInput = {
242 .timeslice = {0},
244 .channel = {ChannelIndex::INVALID}};
245 mOldestPossibleOutput = {
246 .timeslice = {0},
247 .channel = {ChannelIndex::INVALID},
    // (size_t)-1 is the largest size_t value.
    // NOTE(review): presumably a "no slot" marker - confirm.
248 .slot = {(size_t)-1}};
249 for (auto& channel : mChannels) {
250 channel.oldestForChannel = {0};
251 }
252}
253
254} // namespace o2::framework
#define O2_BUILTIN_UNREACHABLE
int32_t i
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:515
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
TimesliceIndex(size_t maxLanes, std::vector< InputChannelInfo > &channels)
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
void associate(TimesliceId timestamp, TimesliceSlot slot)
OldestInputInfo getOldestPossibleInput() const
std::tuple< ActionTaken, TimesliceSlot > replaceLRUWith(data_matcher::VariableContext &newContext, TimesliceId timestamp)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
ContextElement::Value const & get(size_t pos) const
GLuint64EXT * result
Definition glcorearb.h:5662
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ DPL
The channel is a normal input channel.
static constexpr int INVALID
fair::mq::Channel * channel
Definition ChannelInfo.h:51
static constexpr uint64_t INVALID
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels