Project
Loading...
Searching...
No Matches
LifetimeHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "DecongestionService.h"
14#include "Framework/InputSpec.h"
17#include "Framework/Logger.h"
29#include "Framework/Signpost.h"
30
31#include "Headers/DataHeader.h"
33#include "Headers/Stack.h"
36#include <typeinfo>
37#include <TError.h>
38#include <TMemFile.h>
39#include <curl/curl.h>
40
41#include <fairmq/Device.h>
42
43#include <cstdlib>
44#include <random>
45
46using namespace o2::header;
47using namespace fair;
48
50
51namespace o2::framework
52{
53
54namespace
55{
/// Wall-clock time elapsed since the std::chrono::system_clock epoch,
/// expressed in microseconds.
///
/// @return the microsecond tick count of system_clock::now(), as size_t.
size_t getCurrentTime()
{
  using namespace std::chrono;
  const auto sinceEpoch = system_clock::now().time_since_epoch();
  // count() yields a signed tick count: make the conversion to size_t explicit.
  return static_cast<size_t>(duration_cast<microseconds>(sinceEpoch).count());
}
62} // namespace
63
70
// Build a Creator which hands out enumerated timeslices.
//
// The first timeslice is `start + inputTimeslice * step`. On every invocation
// the returned callback scans the TimesliceIndex for the first slot which is
// not valid, associates the current DecongestionService::nextEnumerationTimeslice
// to it and returns that slot. Every `maxRepetitions` associations the
// enumeration is advanced by `step * maxInputTimeslices`.
//
// NOTE(review): listing lines 86 and 108 are absent from this extract; the
// "Last greater than end" and "No slots available" paths presumably return a
// slot value there - confirm against the original file.
71ExpirationHandler::Creator LifetimeHelpers::enumDrivenCreation(size_t start, size_t end, size_t step, size_t inputTimeslice, size_t maxInputTimeslices, size_t maxRepetitions)
72{
73 size_t firstTimeslice = start + inputTimeslice * step;
// Association counter owned by the closure (shared_ptr so that copies of the
// callback keep using the same counter).
74 auto repetition = std::make_shared<size_t>(0);
75
76 return [end, step, firstTimeslice, maxInputTimeslices, maxRepetitions, repetition](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot {
77 auto& index = services.get<TimesliceIndex>();
78 auto& decongestion = services.get<DecongestionService>();
// A value of 0 is treated as "enumeration not started yet".
79 if (decongestion.nextEnumerationTimeslice == 0) {
80 decongestion.nextEnumerationTimeslice = firstTimeslice;
81 }
82
83 for (size_t si = 0; si < index.size(); si++) {
84 if (decongestion.nextEnumerationTimeslice > end) {
85 LOGP(debug, "Last greater than end");
87 }
88 auto slot = TimesliceSlot{si};
89 if (index.isValid(slot) == false) {
// The timestamp is taken before the enumeration is (possibly) advanced below.
90 TimesliceId timestamp{decongestion.nextEnumerationTimeslice};
91 *repetition += 1;
// NOTE(review): this modulo divides by zero if maxRepetitions == 0 -
// confirm callers never pass 0.
92 if (*repetition % maxRepetitions == 0) {
93 decongestion.nextEnumerationTimeslice += step * maxInputTimeslices;
94 }
95 LOGP(debug, "Associating timestamp {} to slot {}", timestamp.value, slot.index);
96 index.associate(timestamp, slot);
97 // We know that next association will bring in last
98 // so we can state this will be the latest possible input for the channel
99 // associated with this.
100 LOG(debug) << "Oldest possible input is " << decongestion.nextEnumerationTimeslice;
101 [[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
102 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
103 return slot;
104 }
105 }
106
107 LOGP(debug, "No slots available");
109 };
110}
111
// Build a Creator which creates a new timeslice, stamped with the current
// wall-clock time in microseconds, whenever `hasTimerFired()` reports true.
//
// `periods` / `intervals` are consumed front-first: the first period is used
// until more than `intervals.front()` seconds have elapsed since the first
// invocation, then the first element of both vectors is erased and the timer
// is re-armed through `updateTimerPeriod` with the next period.
// `updateTimerPeriod` is always called with the period in microseconds divided
// by 1000.
//
// NOTE(review): `periods.front()` / `intervals.front()` are used without an
// emptiness check - presumably callers always pass at least one element; confirm.
// NOTE(review): listing lines 148, 173, 180, 185-186 and 189-191 are absent from
// this extract (early returns, the declaration of `newContext` and the `case`
// labels of the switch, presumably) - confirm against the original file.
112ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::chrono::microseconds> periods, std::vector<std::chrono::seconds> intervals, std::function<bool(void)> hasTimerFired, std::function<void(uint64_t, uint64_t)> updateTimerPeriod)
113{
// Tracks whether the timer has already been switched from the randomized
// first period to the nominal one.
114 std::shared_ptr<bool> stablePeriods = std::make_shared<bool>(false);
115 // FIXME: should create timeslices when period expires....
// The lambda is `mutable` because it erases elements from its own copies of
// `periods` and `intervals`.
116 return [stablePeriods, periods, intervals, hasTimerFired, updateTimerPeriod](ServiceRegistryRef services, ChannelIndex channelIndex) mutable -> TimesliceSlot {
117 // We start with a random offset to avoid all the devices
118 // send their first message at the same time, bring down
119 // the QC machine.
120 // We reduce the first interval rather than increasing it
121 // to avoid having a triggered timer which appears to be in
122 // the future.
123 // We do it here because if we do it in configure, long delays
124 // between configure and run will cause this to behave
125 // incorrectly.
126 auto& index = services.get<TimesliceIndex>();
127 auto& decongestion = services.get<DecongestionService>();
128
129 bool timerHasFired = hasTimerFired();
130 if (decongestion.nextEnumerationTimeslice == 0ULL || (index.didReceiveData() == false && timerHasFired)) {
131 std::random_device r;
132 std::default_random_engine e1(r());
// Random part in [0, 0.9 * period], plus 0.1 * period below: the resulting
// randomized period lies between 10% and 100% of the first period.
133 std::uniform_int_distribution<uint64_t> dist(0, periods.front().count() * 0.9);
134 auto randomizedPeriodUs = static_cast<int64_t>(dist(e1) + periods.front().count() * 0.1);
135 decongestion.nextEnumerationTimeslice = getCurrentTime() - randomizedPeriodUs;
136 updateTimerPeriod(randomizedPeriodUs / 1000, randomizedPeriodUs / 1000);
137 *stablePeriods = false;
138 LOG(debug) << "Timer updated to a randomized period of " << randomizedPeriodUs << "us";
139 } else if (timerHasFired && *stablePeriods == false) {
140 updateTimerPeriod(periods.front().count() / 1000, periods.front().count() / 1000);
141 *stablePeriods = true;
142 LOG(debug) << "Timer updated to a stable period of " << periods.front().count() << "us";
143 }
144 // Nothing to do if the time has not expired yet.
145 if (timerHasFired == false) {
146 [[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
147 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
149 }
150 // Get the first time we were invoked.
// NOTE(review): a function-local static inside the lambda body is initialised
// once and shared by every closure created from this lambda expression, not
// per returned callback - confirm this is intended.
151 static auto firstTime = getCurrentTime();
152 auto current = getCurrentTime();
// Elapsed microseconds are converted to seconds before the comparison.
153 if ((current - firstTime) / 1000000 > intervals.front().count() && periods.size() > 1) {
154 LOGP(detail, "First {} seconds with period {} elapsed, switching to new interval.", intervals.front().count(), periods.front().count());
155 // Remove the first period and the first interval
156 periods.erase(periods.begin());
157 intervals.erase(intervals.begin());
158 LOGP(detail, "New period for timer is {}.", periods.front().count());
159 updateTimerPeriod(periods.front().count() / 1000, periods.front().count() / 1000);
160 }
161 // We first check if the current time is not already present
162 // FIXME: this should really be done by query matching? Ok
163 // for now to avoid duplicate entries.
164 for (size_t i = 0; i < index.size(); ++i) {
165 TimesliceSlot slot{i};
166 if (index.isValid(slot) == false) {
167 continue;
168 }
169 auto& variables = index.getVariablesForSlot(slot);
170 if (VariableContextHelpers::getTimeslice(variables).value == current) {
171 [[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
172 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
174 }
175 }
176
177 decongestion.nextEnumerationTimeslice = current;
178 // If we are here the timer has expired and a new slice needs
179 // to be created.
181 newContext.put({0, static_cast<uint64_t>(current)});
182 newContext.commit();
// Ask the index for a slot to hold the new context; `action` says what was
// done with the slot that got selected.
183 auto [action, slot] = index.replaceLRUWith(newContext, TimesliceId{current});
184 switch (action) {
187 index.associate(TimesliceId{current}, slot);
188 break;
192 break;
193 }
194
195 auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
196 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
197 return slot;
198 };
199}
200
202{
// Returns a checker callback which ignores its arguments and always answers
// false.
// NOTE(review): the signature line (listing line 201) is absent from this
// extract; presumably this is LifetimeHelpers::expireNever - confirm.
203 return [](ServiceRegistryRef, int64_t, InputSpan const&) -> bool { return false; };
204}
205
207{
// Returns a checker callback which ignores its arguments and always answers
// true.
// NOTE(review): the signature line (listing line 206) is absent from this
// extract; presumably this is LifetimeHelpers::expireAlways - confirm.
208 return [](ServiceRegistryRef, int64_t, InputSpan const&) -> bool { return true; };
209}
210
212{
// NOTE(review): the signature line (listing line 211) is absent from this
// extract; presumably this is LifetimeHelpers::expireIfPresent and `routes`
// is its std::vector<InputRoute> parameter - confirm.
//
// Splits the routes with timeslice == 0 into non-optional and optional
// positions, then returns a checker which answers true only when every
// non-optional position has a header while at least one optional position
// is still without one.
213 // find all the input routes which have timeframe data
214 // and store it in a vector for use inside the lambda
215 std::vector<InputRecord::InputPos> inputPositions;
216 std::vector<InputRecord::InputPos> optionalPositions;
// `index` only advances for routes with timeslice == 0: routes skipped by the
// `continue` below do not consume a position.
217 size_t index = 0;
218 for (auto& route : routes) {
219 if (route.timeslice != 0) {
220 continue;
221 }
222 if (route.matcher.lifetime != Lifetime::Optional) {
223 LOGP(debug, "Lifetime of input route {} is not optional at position {}", route.matcher.binding, index);
224 inputPositions.push_back({index});
225 } else {
226 LOGP(debug, "Lifetime of input route {} is optional at position {}", route.matcher.binding, index);
227 optionalPositions.push_back({index});
228 }
229 index++;
230 }
231
// `routes` is captured by copy, so the callback does not depend on the
// lifetime of the caller's vector.
232 return [inputPositions, optionalPositions, routes](ServiceRegistryRef, int64_t, InputSpan const& span) -> bool {
233 // Check if timeframe data is fully present.
234 // If yes, we expire the optional data.
235 // If not, we continue to wait for the data.
236 size_t requiredCount = 0;
237 size_t optionalCount = 0;
// An input counts as present when its first part has a non-null header.
238 for (auto& inputPos : inputPositions) {
239 auto ref = InputRecord::getByPos(routes, span, inputPos.index, 0);
240 if (ref.header != nullptr) {
241 requiredCount++;
242 }
243 }
244 for (auto& inputPos : optionalPositions) {
245 auto ref = InputRecord::getByPos(routes, span, inputPos.index, 0);
246 if (ref.header != nullptr) {
247 optionalCount++;
248 }
249 }
250 LOGP(debug, "ExpireIfPresent: allRequired={}/{}, allOptional={}/{}", requiredCount, inputPositions.size(), optionalCount, optionalPositions.size());
// Expire only if all required inputs arrived and some optional one is missing.
251 return (requiredCount == inputPositions.size()) && (optionalCount != optionalPositions.size());
252 };
253}
254
256{
// NOTE(review): the signature line (listing line 255) is absent from this
// extract; presumably this is LifetimeHelpers::uvDrivenCreation(int
// requestedLoopReason, DeviceState& state) - confirm.
// NOTE(review): listing lines 258, 265, 279, 290, 295-296 and 299-301 are also
// absent (early returns, the declaration of `newContext` and the `case` labels
// of the switch, presumably) - confirm against the original file.
//
// Returns a Creator which creates a slot stamped with the current time when
// `state.loopReason` has at least one bit in common with `requestedLoopReason`.
// `state` is captured by reference: the DeviceState must outlive the returned
// callback.
257 return [requestedLoopReason, &state](ServiceRegistryRef services, ChannelIndex) -> TimesliceSlot {
259 auto& index = services.get<TimesliceIndex>();
260 if ((state.loopReason & requestedLoopReason) == 0) {
261 LOGP(debug, "No expiration due to a loop event. Requested: {:b}, reported: {:b}, matching: {:b}",
262 requestedLoopReason,
263 state.loopReason,
264 requestedLoopReason & state.loopReason);
266 }
267 auto current = getCurrentTime();
268
269 // We first check if the current time is not already present
270 // FIXME: this should really be done by query matching? Ok
271 // for now to avoid duplicate entries.
272 for (size_t i = 0; i < index.size(); ++i) {
273 TimesliceSlot slot{i};
274 if (index.isValid(slot) == false) {
275 continue;
276 }
277 auto& variables = index.getVariablesForSlot(slot);
278 if (VariableContextHelpers::getTimeslice(variables).value == current) {
280 }
281 }
282
283 LOGP(debug, "Record was expired due to a loop event. Requested: {:b}, reported: {:b}, matching: {:b}",
284 requestedLoopReason,
285 state.loopReason,
286 requestedLoopReason & state.loopReason);
287
288 // If we are here the loop has triggered with the expected
289 // event so we need to create a slot.
291 newContext.put({0, static_cast<uint64_t>(current)});
292 newContext.commit();
293 auto [action, slot] = index.replaceLRUWith(newContext, TimesliceId{current});
294 switch (action) {
297 index.associate(TimesliceId{current}, slot);
298 break;
302 break;
303 }
304 return slot;
305 };
306}
307
309{
// NOTE(review): the signature line (listing line 308) is absent from this
// extract; presumably this is LifetimeHelpers::expireTimed(std::chrono::microseconds
// period) - confirm.
//
// Returns a checker which answers true when more than `period.count()`
// microseconds elapsed since the last time it answered true (or since this
// function was called, for the first expiration).
310 auto start = getCurrentTime();
// Reference time, shared between copies of the callback.
311 auto last = std::make_shared<decltype(start)>(start);
312 return [last, period](ServiceRegistryRef, int64_t, InputSpan const&) -> bool {
313 auto current = getCurrentTime();
314 auto delta = current - *last;
315 if (delta > period.count()) {
// Restart the measurement from the moment of this expiration.
316 *last = current;
317 return true;
318 }
319 return false;
320 };
321}
322
330
/// Append an incoming chunk of bytes to a std::vector<char>.
///
/// The signature matches the one of a libcurl write callback
/// (presumably this is installed via CURLOPT_WRITEFUNCTION - confirm at the
/// call site).
///
/// @param p        start of the incoming chunk
/// @param size     size in bytes of one element of the chunk
/// @param nmemb    number of elements in the chunk
/// @param userdata must point to the std::vector<char> to append to
/// @return the number of bytes appended; 0 if the chunk is empty or if
///         `size * nmemb` would not fit in a size_t
size_t readToBuffer(void* p, size_t size, size_t nmemb, void* userdata)
{
  if (size == 0 || nmemb == 0) {
    return 0;
  }
  // Refuse chunks whose byte count would wrap around, rather than
  // appending a truncated amount of data.
  if (nmemb > static_cast<size_t>(-1) / size) {
    return 0;
  }
  const size_t chunkBytes = size * nmemb;
  auto* buffer = static_cast<std::vector<char>*>(userdata);
  auto const* chunk = static_cast<char const*>(p);
  // Range insert grows the vector and copies in one step, without
  // zero-filling the new elements first.
  buffer->insert(buffer->end(), chunk, chunk + chunkBytes);
  return chunkBytes;
}
346
347// We simply put everything in a stringstream and read it afterwards.
348size_t readToMessage(void* p, size_t size, size_t nmemb, void* userdata)
349{
350 if (nmemb == 0) {
351 return 0;
352 }
353 if (size == 0) {
354 return 0;
355 }
356 auto* buffer = (o2::pmr::vector<char>*)userdata;
357 size_t oldSize = buffer->size();
358 buffer->resize(oldSize + nmemb * size);
359 memcpy(buffer->data() + oldSize, p, nmemb * size);
360 return size * nmemb;
361}
362
365 std::string const& channelName)
366{
// NOTE(review): the first signature lines (listing lines 363-364) are absent
// from this extract; presumably this is LifetimeHelpers::fetchFromFairMQ(
// InputSpec const& spec, std::string const& channelName) - confirm.
//
// Returns a handler which receives a multipart message from `channelName` and
// stores its first two parts as header and payload of the PartRef.
// `spec` is captured but not used in the visible body.
367 return [spec, channelName](ServiceRegistryRef services, PartRef& ref, data_matcher::VariableContext&) -> void {
368 auto& rawDeviceService = services.get<RawDeviceService>();
369 auto device = rawDeviceService.device();
370
371 // Receive parts and put them in the PartRef
372 // we know this is not blocking because we were polled
373 // on the channel.
374 fair::mq::Parts parts;
// NOTE(review): the result of Receive is not checked and the message is
// assumed to have at least two parts - confirm this always holds.
375 device->Receive(parts, channelName, 0);
376 ref.header = std::move(parts.At(0));
377 ref.payload = std::move(parts.At(1));
378 };
379}
380
385{
// Placeholder: the visible body unconditionally throws a runtime_error.
// NOTE(review): the signature and the lambda header (listing lines 384 and
// 386) are absent from this extract - confirm against the original file.
387 throw runtime_error("fetchFromQARegistry: Not yet implemented");
388 return;
389 };
390}
391
396{
// Placeholder: the visible body unconditionally throws a runtime_error.
// NOTE(review): the signature and the lambda header (listing lines 395 and
// 397) are absent from this extract - confirm against the original file.
398 throw runtime_error("fetchFromObjectRegistry: Not yet implemented");
399 return;
400 };
401}
402
// Build a Handler which materialises an expired input as a message carrying a
// running counter.
//
// The header is a DataHeader / DataProcessingHeader stack filled from `matcher`
// and from the timeslice found in the variable context; the payload is one
// int64_t holding the number of times the handler was invoked before.
// `firstTForbit` is computed as `timestamp * orbitMultiplier + orbitOffset`.
// Messages are created on the transport of the input channel named
// `sourceChannel`.
//
// NOTE(review): listing line 423 is absent from this extract (presumably one
// more DataHeader field assignment) - confirm against the original file.
404ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const& matcher, std::string const& sourceChannel,
405 int64_t orbitOffset, int64_t orbitMultiplier)
406{
407 using counter_t = int64_t;
// Invocation counter owned by the closure (shared between its copies).
408 auto counter = std::make_shared<counter_t>(0);
409 return [matcher, counter, sourceChannel, orbitOffset, orbitMultiplier](ServiceRegistryRef services, PartRef& ref, data_matcher::VariableContext& variables) -> void {
410 // Get the ChannelIndex associated to a given channel name
411 auto& deviceProxy = services.get<FairMQDeviceProxy>();
412 auto channelIndex = deviceProxy.getInputChannelIndexByName(sourceChannel);
413 // We should invoke the handler only once.
414 assert(!ref.header);
415 assert(!ref.payload);
416
417 auto timestamp = VariableContextHelpers::getTimeslice(variables).value;
418 DataHeader dh;
419 dh.dataOrigin = matcher.origin;
420 dh.dataDescription = matcher.description;
421 dh.subSpecification = matcher.subSpec;
422 dh.payloadSize = sizeof(counter_t);
424 dh.tfCounter = timestamp;
// Fall back to run number 0 if anything in the lookup throws (atoi itself
// does not throw, so this presumably guards the service lookup).
425 try {
426 dh.runNumber = atoi(services.get<DataTakingContext>().runNumber.c_str());
427 } catch (...) {
428 dh.runNumber = 0;
429 }
430 dh.firstTForbit = timestamp * orbitMultiplier + orbitOffset;
431 DataProcessingHeader dph{timestamp, 1};
// The callback receives the headers by argument before the variable context
// is filled from them below.
432 services.get<CallbackService>().call<CallbackService::Id::NewTimeslice>(dh, dph);
433
434 variables.put({data_matcher::FIRSTTFORBIT_POS, dh.firstTForbit});
435 variables.put({data_matcher::TFCOUNTER_POS, dh.tfCounter});
436 variables.put({data_matcher::RUNNUMBER_POS, dh.runNumber});
437 variables.put({data_matcher::STARTTIME_POS, dph.startTime});
438 variables.put({data_matcher::CREATIONTIME_POS, dph.creation});
439
440 auto&& transport = deviceProxy.getInputChannel(channelIndex)->Transport();
441 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
442 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
// The signposts read header->GetData(), so they must stay before the move.
443 O2_SIGNPOST_ID_FROM_POINTER(hid, parts, header->GetData());
444 O2_SIGNPOST_START(parts, hid, "parts", "Enumerating part %p with timestamp %zu", header->GetData(), timestamp);
445 ref.header = std::move(header);
446
447 auto payload = transport->CreateMessage(sizeof(counter_t));
448 *(counter_t*)payload->GetData() = *counter;
449 ref.payload = std::move(payload);
450 (*counter)++;
451 };
452}
453
455ExpirationHandler::Handler LifetimeHelpers::dummy(ConcreteDataMatcher const& matcher, std::string const& sourceChannel)
456{
457 using counter_t = int64_t;
458 auto counter = std::make_shared<counter_t>(0);
459 auto f = [matcher, counter, sourceChannel](ServiceRegistryRef services, PartRef& ref, data_matcher::VariableContext& variables) -> void {
460 // We should invoke the handler only once.
461 assert(!ref.header);
462 assert(!ref.payload);
463 // Get the ChannelIndex associated to a given channel name
464 auto& deviceProxy = services.get<FairMQDeviceProxy>();
465 auto channelIndex = deviceProxy.getInputChannelIndexByName(sourceChannel);
466
467 auto timestamp = VariableContextHelpers::getTimeslice(variables).value;
468 DataHeader dh;
469 dh.dataOrigin = matcher.origin;
470 dh.dataDescription = matcher.description;
471 dh.subSpecification = matcher.subSpec;
472 dh.payloadSize = 0;
474
475 {
476 auto pval = std::get_if<uint32_t>(&variables.get(data_matcher::FIRSTTFORBIT_POS));
477 if (pval == nullptr) {
478 dh.firstTForbit = -1;
479 } else {
480 dh.firstTForbit = *pval;
481 }
482 }
483 {
484 auto pval = std::get_if<uint32_t>(&variables.get(data_matcher::TFCOUNTER_POS));
485 if (pval == nullptr) {
486 dh.tfCounter = timestamp;
487 } else {
488 dh.tfCounter = *pval;
489 }
490 }
491
492 DataProcessingHeader dph{timestamp, 1};
493
494 auto&& transport = deviceProxy.getInputChannel(channelIndex)->Transport();
495 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
496 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
497 ref.header = std::move(header);
498 O2_SIGNPOST_ID_FROM_POINTER(hid, parts, header->GetData());
499 O2_SIGNPOST_START(parts, hid, "parts", "Enumerating part %p with timestamp %zu", header->GetData(), timestamp);
500 auto payload = transport->CreateMessage(0);
501 ref.payload = std::move(payload);
502 };
503 return f;
504}
505
506std::ostream& operator<<(std::ostream& oss, Lifetime const& val)
507{
508 oss << fmt::format("{}", val);
509 return oss;
510}
511
512} // namespace o2::framework
benchmark::State & state
int32_t i
Header to collect LHC related constants.
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
std::ostringstream debug
ChannelIndex getInputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
virtual fair::mq::Device * device()=0
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLfloat * val
Definition glcorearb.h:1582
GLboolean r
Definition glcorearb.h:1233
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLuint counter
Definition glcorearb.h:3987
@ CREATIONTIME_POS
The DataHeader::runNumber associated to the timeslice.
@ FIRSTTFORBIT_POS
The DataHeader::tfCounter associated to the timeslice.
@ TFCOUNTER_POS
The DataProcessingHeader::startTime associated to the timeslice.
@ RUNNUMBER_POS
The DataHeader::firstTForbit associated to the timeslice.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
size_t readToMessage(void *p, size_t size, size_t nmemb, void *userdata)
RuntimeErrorRef runtime_error(const char *)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
size_t readToBuffer(void *p, size_t size, size_t nmemb, void *userdata)
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
unsigned long long ULL
header::DataHeader::SubSpecificationType subSpec
std::string runNumber
The current run number.
Running state information of a given device.
Definition DeviceState.h:34
std::function< bool(ServiceRegistryRef, uint64_t timestamp, InputSpan const &record)> Checker
Callback type to check if the record must be expired.
std::function< void(ServiceRegistryRef, PartRef &expiredInput, data_matcher::VariableContext &variables)> Handler
Callback type to actually materialise a given record.
std::function< TimesliceSlot(ServiceRegistryRef, ChannelIndex)> Creator
static ExpirationHandler::Handler dummy(ConcreteDataMatcher const &spec, std::string const &sourceChannel)
Create a dummy message with the provided ConcreteDataMatcher.
static ExpirationHandler::Handler fetchFromQARegistry()
static ExpirationHandler::Creator enumDrivenCreation(size_t first, size_t last, size_t step, size_t inputTimeslice, size_t maxTimeSliceId, size_t repetitions)
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const &spec, std::string const &sourceChannel, int64_t orbitOffset, int64_t orbitMultiplier)
Enumerate entries on every invocation.
static ExpirationHandler::Creator timeDrivenCreation(std::vector< std::chrono::microseconds > periods, std::vector< std::chrono::seconds > intervals, std::function< bool(void)> hasTimerFired, std::function< void(uint64_t, uint64_t)> updateTimerPeriod)
static ExpirationHandler::Creator dataDrivenCreation()
Callback which does nothing, waiting for data to arrive.
static ExpirationHandler::Checker expireAlways()
static ExpirationHandler::Handler fetchFromObjectRegistry()
static ExpirationHandler::Checker expireNever()
static ExpirationHandler::Creator uvDrivenCreation(int loopReason, DeviceState &state)
Callback which creates a new timeslice whenever some libuv event happens.
static ExpirationHandler::Handler fetchFromFairMQ(InputSpec const &spec, std::string const &channelName)
static ExpirationHandler::Handler doNothing()
static ExpirationHandler::Checker expireIfPresent(std::vector< InputRoute > const &schema, ConcreteDataMatcher matcher)
static ExpirationHandler::Checker expireTimed(std::chrono::microseconds period)
Reference to an inflight part.
Definition PartRef.h:24
static constexpr uint64_t INVALID
static constexpr uint64_t ANY
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
RunNumberType runNumber
Definition DataHeader.h:684
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"