41#include <fairmq/Device.h>
/// Current wall-clock time, expressed as the number of microseconds elapsed
/// since the std::chrono::system_clock epoch.
///
/// @return the microsecond tick count of system_clock::now(), as a size_t.
size_t getCurrentTime()
{
  auto const sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
  auto const asMicroseconds = std::chrono::duration_cast<std::chrono::microseconds>(sinceEpoch);
  // count() returns a signed representation; spell out the conversion to the
  // unsigned return type rather than relying on an implicit one.
  return static_cast<size_t>(asMicroseconds.count());
}
73 size_t firstTimeslice =
start + inputTimeslice * step;
74 auto repetition = std::make_shared<size_t>(0);
79 if (decongestion.nextEnumerationTimeslice == 0) {
80 decongestion.nextEnumerationTimeslice = firstTimeslice;
83 for (
size_t si = 0; si <
index.size(); si++) {
84 if (decongestion.nextEnumerationTimeslice >
end) {
85 LOGP(
debug,
"Last greater than end");
89 if (
index.isValid(slot) ==
false) {
90 TimesliceId timestamp{decongestion.nextEnumerationTimeslice};
92 if (*repetition % maxRepetitions == 0) {
93 decongestion.nextEnumerationTimeslice += step * maxInputTimeslices;
95 LOGP(
debug,
"Associating timestamp {} to slot {}", timestamp.value, slot.index);
96 index.associate(timestamp, slot);
100 LOG(
debug) <<
"Oldest possible input is " << decongestion.nextEnumerationTimeslice;
102 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
107 LOGP(
debug,
"No slots available");
114 std::shared_ptr<bool> stablePeriods = std::make_shared<bool>(
false);
129 bool timerHasFired = hasTimerFired();
130 if (decongestion.nextEnumerationTimeslice == 0
ULL || (
index.didReceiveData() ==
false && timerHasFired)) {
131 std::random_device
r;
132 std::default_random_engine e1(
r());
133 std::uniform_int_distribution<uint64_t> dist(0, periods.front().count() * 0.9);
134 auto randomizedPeriodUs =
static_cast<int64_t
>(dist(e1) + periods.front().count() * 0.1);
135 decongestion.nextEnumerationTimeslice = getCurrentTime() - randomizedPeriodUs;
136 updateTimerPeriod(randomizedPeriodUs / 1000, randomizedPeriodUs / 1000);
137 *stablePeriods =
false;
138 LOG(
debug) <<
"Timer updated to a randomized period of " << randomizedPeriodUs <<
"us";
139 }
else if (timerHasFired && *stablePeriods ==
false) {
140 updateTimerPeriod(periods.front().count() / 1000, periods.front().count() / 1000);
141 *stablePeriods =
true;
142 LOG(
debug) <<
"Timer updated to a stable period of " << periods.front().count() <<
"us";
145 if (timerHasFired ==
false) {
146 [[maybe_unused]]
auto newOldest =
index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
147 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
151 static auto firstTime = getCurrentTime();
152 auto current = getCurrentTime();
153 if ((current - firstTime) / 1000000 > intervals.front().count() && periods.size() > 1) {
154 LOGP(detail,
"First {} seconds with period {} elapsed, switching to new interval.", intervals.front().count(), periods.front().count());
156 periods.erase(periods.begin());
157 intervals.erase(intervals.begin());
158 LOGP(detail,
"New period for timer is {}.", periods.front().count());
159 updateTimerPeriod(periods.front().count() / 1000, periods.front().count() / 1000);
164 for (
size_t i = 0;
i <
index.size(); ++
i) {
166 if (
index.isValid(slot) ==
false) {
169 auto& variables =
index.getVariablesForSlot(slot);
171 [[maybe_unused]]
auto newOldest =
index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
172 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
177 decongestion.nextEnumerationTimeslice = current;
181 newContext.
put({0,
static_cast<uint64_t
>(current)});
183 auto [action, slot] =
index.replaceLRUWith(newContext,
TimesliceId{current});
195 auto newOldest =
index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
196 index.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
215 std::vector<InputRecord::InputPos> inputPositions;
216 std::vector<InputRecord::InputPos> optionalPositions;
218 for (
auto& route : routes) {
219 if (route.timeslice != 0) {
222 if (route.matcher.lifetime != Lifetime::Optional) {
223 LOGP(
debug,
"Lifetime of input route {} is not optional at position {}", route.matcher.binding,
index);
224 inputPositions.push_back({
index});
226 LOGP(
debug,
"Lifetime of input route {} is optional at position {}", route.matcher.binding,
index);
227 optionalPositions.push_back({
index});
236 size_t requiredCount = 0;
237 size_t optionalCount = 0;
238 for (
auto& inputPos : inputPositions) {
240 if (
ref.header !=
nullptr) {
244 for (
auto& inputPos : optionalPositions) {
246 if (
ref.header !=
nullptr) {
250 LOGP(
debug,
"ExpireIfPresent: allRequired={}/{}, allOptional={}/{}", requiredCount, inputPositions.size(), optionalCount, optionalPositions.size());
251 return (requiredCount == inputPositions.size()) && (optionalCount != optionalPositions.size());
260 if ((
state.loopReason & requestedLoopReason) == 0) {
261 LOGP(
debug,
"No expiration due to a loop event. Requested: {:b}, reported: {:b}, matching: {:b}",
264 requestedLoopReason &
state.loopReason);
267 auto current = getCurrentTime();
272 for (
size_t i = 0;
i <
index.size(); ++
i) {
274 if (
index.isValid(slot) ==
false) {
277 auto& variables =
index.getVariablesForSlot(slot);
283 LOGP(
debug,
"Record was expired due to a loop event. Requested: {:b}, reported: {:b}, matching: {:b}",
286 requestedLoopReason &
state.loopReason);
291 newContext.
put({0,
static_cast<uint64_t
>(current)});
293 auto [action, slot] =
index.replaceLRUWith(newContext,
TimesliceId{current});
310 auto start = getCurrentTime();
311 auto last = std::make_shared<decltype(start)>(
start);
313 auto current = getCurrentTime();
314 auto delta = current - *last;
315 if (delta > period.count()) {
340 auto*
buffer = (std::vector<char>*)userdata;
341 size_t oldSize =
buffer->size();
343 memcpy(
buffer->data() + oldSize, p, nmemb *
size);
357 size_t oldSize =
buffer->size();
359 memcpy(
buffer->data() + oldSize, p, nmemb *
size);
365 std::string
const& channelName)
369 auto device = rawDeviceService.
device();
374 fair::mq::Parts parts;
375 device->Receive(parts, channelName, 0);
376 ref.header = std::move(parts.At(0));
377 ref.payload = std::move(parts.At(1));
387 throw runtime_error(
"fetchFromQARegistry: Not yet implemented");
398 throw runtime_error(
"fetchFromObjectRegistry: Not yet implemented");
405 int64_t orbitOffset, int64_t orbitMultiplier)
407 using counter_t = int64_t;
408 auto counter = std::make_shared<counter_t>(0);
415 assert(!
ref.payload);
430 dh.
firstTForbit = timestamp * orbitMultiplier + orbitOffset;
432 services.get<
CallbackService>().call<CallbackService::Id::NewTimeslice>(dh, dph);
440 auto&& transport = deviceProxy.getInputChannel(channelIndex)->Transport();
441 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
444 O2_SIGNPOST_START(parts, hid,
"parts",
"Enumerating part %p with timestamp %zu", header->GetData(), timestamp);
445 ref.header = std::move(header);
447 auto payload = transport->CreateMessage(
sizeof(counter_t));
448 *(counter_t*)payload->GetData() = *
counter;
449 ref.payload = std::move(payload);
457 using counter_t = int64_t;
458 auto counter = std::make_shared<counter_t>(0);
462 assert(!
ref.payload);
477 if (pval ==
nullptr) {
485 if (pval ==
nullptr) {
494 auto&& transport = deviceProxy.getInputChannel(channelIndex)->Transport();
495 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
497 ref.header = std::move(header);
499 O2_SIGNPOST_START(parts, hid,
"parts",
"Enumerating part %p with timestamp %zu", header->GetData(), timestamp);
500 auto payload = transport->CreateMessage(0);
501 ref.payload = std::move(payload);
508 oss << fmt::format(
"{}",
val);
Header to collect LHC related constants.
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_START(log, id, name, format,...)
ChannelIndex getInputChannelIndexByName(std::string const &channelName) const
ChannelIndex from a given channel name.
virtual fair::mq::Device * device()=0
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void put(ContextUpdate &&update)
GLsizei const GLfloat * value
@ CREATIONTIME_POS
The DataHeader::runNumber associated to the timeslice.
@ FIRSTTFORBIT_POS
The DataHeader::tfCounter associated to the timeslice.
@ TFCOUNTER_POS
The DataProcessingHeader::startTime associated to the timeslice.
@ RUNNUMBER_POS
The DataHeader::firstTForbit associated to the timeslice.
Defining PrimaryVertex explicitly as messageable.
size_t readToMessage(void *p, size_t size, size_t nmemb, void *userdata)
RuntimeErrorRef runtime_error(const char *)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
size_t readToBuffer(void *p, size_t size, size_t nmemb, void *userdata)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
header::DataDescription description
header::DataHeader::SubSpecificationType subSpec
header::DataOrigin origin
std::string runNumber
The current run number.
Running state information of a given device.
std::function< bool(ServiceRegistryRef, uint64_t timestamp, InputSpan const &record)> Checker
Callback type to check if the record must be expired.
std::function< void(ServiceRegistryRef, PartRef &expiredInput, data_matcher::VariableContext &variables)> Handler
Callback type to actually materialise a given record.
std::function< TimesliceSlot(ServiceRegistryRef, ChannelIndex)> Creator
static ExpirationHandler::Handler dummy(ConcreteDataMatcher const &spec, std::string const &sourceChannel)
Create a dummy message with the provided ConcreteDataMatcher.
static ExpirationHandler::Handler fetchFromQARegistry()
static ExpirationHandler::Creator enumDrivenCreation(size_t first, size_t last, size_t step, size_t inputTimeslice, size_t maxTimeSliceId, size_t repetitions)
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const &spec, std::string const &sourceChannel, int64_t orbitOffset, int64_t orbitMultiplier)
Enumerate entries on every invocation.
static ExpirationHandler::Creator timeDrivenCreation(std::vector< std::chrono::microseconds > periods, std::vector< std::chrono::seconds > intervals, std::function< bool(void)> hasTimerFired, std::function< void(uint64_t, uint64_t)> updateTimerPeriod)
static ExpirationHandler::Creator dataDrivenCreation()
Callback which does nothing, waiting for data to arrive.
static ExpirationHandler::Checker expireAlways()
static ExpirationHandler::Handler fetchFromObjectRegistry()
static ExpirationHandler::Checker expireNever()
static ExpirationHandler::Creator uvDrivenCreation(int loopReason, DeviceState &state)
Callback which creates a new timeslice whenever some libuv event happens.
static ExpirationHandler::Handler fetchFromFairMQ(InputSpec const &spec, std::string const &channelName)
static ExpirationHandler::Handler doNothing()
static ExpirationHandler::Checker expireIfPresent(std::vector< InputRoute > const &schema, ConcreteDataMatcher matcher)
static ExpirationHandler::Checker expireTimed(std::chrono::microseconds period)
Reference to an inflight part.
static constexpr uint64_t INVALID
static constexpr uint64_t ANY
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"