35#include <fairmq/Device.h>
36#include <fairmq/Channel.h>
37#include <fairmq/States.h>
// Key identifying this workflow's subscription to FairMQ device state-change
// callbacks; passed to SubscribeToStateChange() on setup and to
// UnsubscribeFromStateChange() when the device reaches the Exiting state.
// NOTE(review): the excerpt had the original line number "49" fused onto the
// declaration ("49static ..."); removed here so the definition parses.
static const std::string GPURecoWorkflowSpec_FMQCallbackKey = "GPURecoWorkflowSpec_FMQCallbackKey";
// NOTE(review): non-contiguous excerpt — original source line numbers are fused
// into the text (e.g. "65", "67") and lines between them are missing, so these
// statements belong to more than one enclosing function we cannot see.
// Comments below describe only the visible fragments.
//
// Subscribe to FairMQ device state changes; the lambda forwards each new state
// to receiveFMQStateCallback().
65 mPipeline->fmqDevice->SubscribeToStateChange(GPURecoWorkflowSpec_FMQCallbackKey, [
this](fair::mq::State s) { receiveFMQStateCallback(s); });
// Completion-policy predicate fragment: block until the sender thread is
// terminating or the completion queue is non-empty, then report whether the
// given timeslice is at the front of the queue.
67 std::unique_lock lk(mPipeline->completionPolicyMutex);
68 mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
69 return !mPipeline->completionPolicyQueue.empty() && mPipeline->completionPolicyQueue.front() == timeslice;
// Spawn the out-of-band receive thread and one worker thread per pipeline
// worker, each running this object's thread entry points.
71 mPipeline->receiveThread = std::thread([
this]() { RunReceiveThread(); });
72 for (uint32_t
i = 0;
i < mPipeline->workers.size();
i++) {
73 mPipeline->workers[
i].thread = std::thread([
this,
i]() { RunWorkerThread(
i); });
// Worker-thread entry point for pipeline worker `id`.
// NOTE(review): non-contiguous excerpt — original lines 83-84, 88-104, 109-126,
// 129, 131-132, 134-135, 138 are missing (including the actual per-timeframe
// processing between the queue pop and the inject handshake), and braces were
// lost; do not infer behavior of the elided sections from this fragment.
78void GPURecoWorkflowSpec::RunWorkerThread(int32_t
id)
80 LOG(
debug) <<
"Running pipeline worker " <<
id;
81 auto& workerContext = mPipeline->workers[
id];
// Main loop: run until the pipeline is told to terminate.
82 while (!mPipeline->shouldTerminate) {
// Wait for work: woken either by a new input-queue entry or by termination.
85 std::unique_lock lk(workerContext.inputQueueMutex);
86 workerContext.inputQueueNotify.wait(lk, [
this, &workerContext]() {
return mPipeline->shouldTerminate || !workerContext.inputQueue.empty(); });
// Empty queue after wakeup means we were woken for termination.
87 if (workerContext.inputQueue.empty()) {
90 context = workerContext.inputQueue.front();
91 workerContext.inputQueue.pop();
// Injection handshake: wait until injection is allowed AND it is this
// context's turn (mayInjectTFId matches the context's TF id), then claim
// the slot by advancing the TF id and clearing mayInject.
105 std::unique_lock lk(mPipeline->mayInjectMutex);
106 mPipeline->mayInjectCondition.wait(lk, [
this, context]() {
return mPipeline->mayInject && mPipeline->mayInjectTFId == context->
mTFId; });
107 mPipeline->mayInjectTFId = mPipeline->mayInjectTFId + 1;
108 mPipeline->mayInject =
false;
// (elided section ran here) Re-enable injection and wake one waiter.
127 std::lock_guard lk(mPipeline->mayInjectMutex);
128 mPipeline->mayInject =
true;
130 mPipeline->mayInjectCondition.notify_one();
// Round-robin between the two worker input queues — presumably hands the
// context to the next worker; TODO confirm against the full source.
133 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
136 std::lock_guard lk(mPipeline->workers[mNextThreadIndex].inputQueueMutex);
137 mPipeline->workers[mNextThreadIndex].inputQueue.emplace(context);
139 mPipeline->workers[mNextThreadIndex].inputQueueNotify.notify_one();
// NOTE(review): non-contiguous excerpt of the DPL-side processing / prepare-
// message sender (original lines 155-260 with many lines elided, including the
// loops that define `curPtr`, `k`, `ptrsTotal`, and `prepareBufferSize`). The
// fragments below: mark the run as started, pop the next queued context, sanity
// check timeslice ordering, serialize TPC ZS pointers as shared-memory-region
// offsets, and send the prepare / end-of-stream messages over the out-of-band
// "gpu-prepare-channel".
//
// Mark run started and wake threads blocked on stateNotify.
155 mPipeline->runStarted =
true;
156 mPipeline->stateNotify.notify_all();
// Pop the next pipeline context (blocks until one is queued).
161 std::unique_lock lk(mPipeline->queueMutex);
162 mPipeline->queueNotify.wait(lk, [
this] {
return !mPipeline->pipelineQueue.empty(); });
163 context = std::move(mPipeline->pipelineQueue.front());
164 mPipeline->pipelineQueue.pop();
// Hard consistency check: the queued context must match the current timeslice.
167 if (context->timeSliceId != tinfo.timeslice) {
168 LOG(fatal) <<
"Prepare message for incorrect time frame received, time frames seem out of sync";
171 tpcZSmeta = std::move(context->tpcZSmeta);
172 tpcZS = context->tpcZS;
// Pop this timeslice from the completion-policy queue, verifying ordering.
176 std::lock_guard lk(mPipeline->completionPolicyMutex);
177 if (mPipeline->completionPolicyQueue.empty() || mPipeline->completionPolicyQueue.front() != tinfo.timeslice) {
178 LOG(fatal) <<
"Time frame processed does not equal the timeframe at the top of the queue, time frames seem out of sync";
180 mPipeline->completionPolicyQueue.pop();
// Build the prepare message: a size_t buffer holding, after the header,
// ptrsTotal offsets, then sizes, then managed flags, then region ids
// (see the ptrBuffer[2 * ptrsTotal + ...] / [3 * ptrsTotal + ...] writes).
186 size_t ptrsTotal = 0;
187 const void* firstPtr =
nullptr;
198 std::vector<size_t> messageBuffer(prepareBufferSize /
sizeof(
size_t));
206 size_t* ptrBuffer = messageBuffer.data() +
sizeof(preMessage) /
sizeof(
size_t);
207 size_t ptrsCopied = 0;
208 int32_t lastRegion = -1;
// Fast path: check whether curPtr still lies in the last matched region
// before scanning all registered regions.
214 bool regionFound = lastRegion != -1 && (size_t)curPtr >= (
size_t)mRegionInfos[lastRegion].ptr && (size_t)curPtr < (
size_t)mRegionInfos[lastRegion].ptr + mRegionInfos[lastRegion].size;
216 for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
217 if ((
size_t)curPtr >= (size_t)mRegionInfos[l].
ptr && (
size_t)curPtr < (size_t)mRegionInfos[l].
ptr + mRegionInfos[l].
size) {
// A ZS pointer outside every registered SHM region cannot be relocated.
225 LOG(fatal) <<
"Found a TPC ZS pointer outside of shared memory";
// Store region-relative offset plus the region's managed flag and id so the
// receiver can translate the offset back to a pointer in its mapping.
227 ptrBuffer[ptrsCopied + k] = (size_t)curPtr - (
size_t)mRegionInfos[lastRegion].ptr;
229 ptrBuffer[2 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].managed;
230 ptrBuffer[3 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].id;
// Send the prepare message over the out-of-band channel.
236 auto channel = device->GetChannels().find(
"gpu-prepare-channel");
237 fair::mq::MessagePtr payload(device->NewMessage());
238 LOG(info) <<
"Sending gpu-reco-workflow prepare message of size " << prepareBufferSize;
239 payload->Rebuild(messageBuffer.data(), prepareBufferSize,
nullptr,
nullptr);
240 channel->second[0].Send(payload);
// End-of-stream handling: record DPL EOS and send an EOS header-only message
// over the same out-of-band channel.
249 mPipeline->endOfStreamDplReceived =
true;
250 mPipeline->stateNotify.notify_all();
256 auto channel = device->GetChannels().find(
"gpu-prepare-channel");
257 fair::mq::MessagePtr payload(device->NewMessage());
258 LOG(info) <<
"Sending end-of-stream message over out-of-bands channel";
259 payload->Rebuild(&preMessage,
sizeof(preMessage),
nullptr,
nullptr);
260 channel->second[0].Send(payload);
// Reset the inject-turn counter when the pipeline stops.
// NOTE(review): incomplete — original lines 265-266 and 268-270 are missing and
// may contain additional statements/guards; do not assume this is the whole body.
264void GPURecoWorkflowSpec::handlePipelineStop()
267 mPipeline->mayInjectTFId = 0;
271void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
274 std::lock_guard lk(mPipeline->stateMutex);
275 if (mPipeline->fmqState != fair::mq::State::Running && newState == fair::mq::State::Running) {
276 mPipeline->endOfStreamAsyncReceived =
false;
277 mPipeline->endOfStreamDplReceived =
false;
279 mPipeline->fmqPreviousState = mPipeline->fmqState;
280 mPipeline->fmqState = newState;
281 if (newState == fair::mq::State::Exiting) {
282 mPipeline->fmqDevice->UnsubscribeFromStateChange(GPURecoWorkflowSpec_FMQCallbackKey);
285 mPipeline->stateNotify.notify_all();
// Receive-thread entry point: listens on the out-of-band "gpu-prepare-channel",
// validates incoming prepare messages, translates shared-memory offsets back to
// pointers, and enqueues per-timeframe contexts for the pipeline.
// NOTE(review): non-contiguous excerpt — many original lines (289, 297-298,
// 301-306, 309-310, 314-316, 319-324, 327, 334-337, 340-343, 347-350, 354, 356,
// 360-361, 367, 370-376, 378, 381, 385-386, 392-393, 396, 398, 401-402) and the
// enclosing braces are missing; loop variables `i`, `j` and the local `m` /
// `ptrBuffer` setup are defined in elided lines. Comments describe only what is
// visible.
288void GPURecoWorkflowSpec::RunReceiveThread()
290 auto* device = mPipeline->fmqDevice;
291 while (!mPipeline->shouldTerminate) {
292 bool received =
false;
// Receive timeout in ms, so the loop can re-check shouldTerminate.
// (Variable name "recvTimeot" is a typo in the original; kept byte-identical.)
293 int32_t recvTimeot = 1000;
294 fair::mq::MessagePtr
msg;
295 LOG(
debug) <<
"Waiting for out of band message";
// Receiving is only valid while Running (or just left Running into Ready)
// and before an async end-of-stream was seen.
296 auto shouldReceive = [
this]() {
return ((mPipeline->fmqState == fair::mq::State::Running || (mPipeline->fmqState == fair::mq::State::Ready && mPipeline->fmqPreviousState == fair::mq::State::Running)) && !mPipeline->endOfStreamAsyncReceived); };
299 std::unique_lock lk(mPipeline->stateMutex);
300 mPipeline->stateNotify.wait(lk, [
this, shouldReceive]() {
return shouldReceive() || mPipeline->shouldTerminate; });
302 if (mPipeline->shouldTerminate) {
// Re-check state under the lock before each timed receive attempt.
307 std::unique_lock lk(mPipeline->stateMutex);
308 if (!shouldReceive()) {
311 msg = device->NewMessageFor(
"gpu-prepare-channel", 0, 0);
312 received = device->Receive(
msg,
"gpu-prepare-channel", 0, recvTimeot) > 0;
// Two nested do/while loops retry the receive until a message arrives or
// termination is requested (their opening "do {" lines are elided).
313 }
while (!received && !mPipeline->shouldTerminate);
317 }
while (!received && !mPipeline->shouldTerminate);
318 if (mPipeline->shouldTerminate) {
// Validate the prepare-message header magic word.
325 if (
m->magicWord !=
m->MAGIC_WORD) {
326 LOG(fatal) <<
"Prepare message corrupted, invalid magic word";
// End-of-stream message: latch the async EOS flag and reset run bookkeeping.
328 if (
m->flagEndOfStream) {
329 LOG(info) <<
"Received end-of-stream from out-of-band channel";
330 std::lock_guard lk(mPipeline->stateMutex);
331 mPipeline->endOfStreamAsyncReceived =
true;
332 mPipeline->mNTFReceived = 0;
333 mPipeline->runStarted =
false;
// Announce the incoming timeslice to the completion policy.
338 std::lock_guard lk(mPipeline->completionPolicyMutex);
339 mPipeline->completionPolicyQueue.emplace(
m->timeSliceId);
341 mPipeline->completionPolicyNotify.notify_one();
// Wait for the run to actually start (or for termination).
344 std::unique_lock lk(mPipeline->stateMutex);
345 mPipeline->stateNotify.wait(lk, [
this]() {
return (mPipeline->runStarted && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; });
346 if (!mPipeline->runStarted) {
// Build the queue object for this timeframe from the message contents.
351 auto context = std::make_unique<GPURecoWorkflow_QueueObject>();
352 context->timeSliceId =
m->timeSliceId;
353 context->tfSettings =
m->tfSettings;
355 size_t ptrsCopied = 0;
357 context->tpcZSmeta.Pointers[0][0].resize(
m->pointersTotal);
358 context->tpcZSmeta.Sizes[0][0].resize(
m->pointersTotal);
359 int32_t lastRegion = -1;
// Per (sector i, endpoint j): copy counts, then translate each transmitted
// (offset, managed, id) triple back into a pointer in this process's mapping
// of the matching registered SHM region.
362 context->tpcZS.sector[
i].count[
j] =
m->pointerCounts[
i][
j];
363 for (uint32_t k = 0; k < context->tpcZS.sector[
i].count[
j]; k++) {
364 bool regionManaged = ptrBuffer[2 *
m->pointersTotal + ptrsCopied + k];
365 size_t regionId = ptrBuffer[3 *
m->pointersTotal + ptrsCopied + k];
// Fast path: reuse the last matched region before scanning all regions.
366 bool regionFound = lastRegion != -1 && mRegionInfos[lastRegion].managed == regionManaged && mRegionInfos[lastRegion].id == regionId;
368 for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
369 if (mRegionInfos[l].managed == regionManaged && mRegionInfos[l].
id == regionId) {
377 LOG(fatal) <<
"Received ZS Ptr for SHM region (managed " << (int32_t)regionManaged <<
", id " << regionId <<
"), which was not registered for us";
// offset + region base -> pointer; size comes from the second table.
379 context->tpcZSmeta.Pointers[0][0][ptrsCopied + k] = (
void*)(ptrBuffer[ptrsCopied + k] + (
size_t)mRegionInfos[lastRegion].ptr);
380 context->tpcZSmeta.Sizes[0][0][ptrsCopied + k] = ptrBuffer[
m->pointersTotal + ptrsCopied + k];
382 context->tpcZS.sector[
i].zsPtr[
j] = context->tpcZSmeta.Pointers[0][0].data() + ptrsCopied;
383 context->tpcZS.sector[
i].nZSPtr[
j] = context->tpcZSmeta.Sizes[0][0].data() + ptrsCopied;
384 ptrsCopied += context->tpcZS.sector[
i].count[
j];
387 context->ptrs.tpcZS = &context->tpcZS;
388 context->ptrs.settingsTF = &context->tfSettings;
389 context->mTFId = mPipeline->mNTFReceived;
// Once more TFs than workers have arrived, enqueue the job for pipelining.
390 if (mPipeline->mNTFReceived++ >= mPipeline->workers.size()) {
391 enqueuePipelinedJob(&context->ptrs,
nullptr, context.get(),
false);
// Hand the context to the pipeline queue and wake a consumer.
394 std::lock_guard lk(mPipeline->queueMutex);
395 mPipeline->pipelineQueue.emplace(std::move(context));
397 mPipeline->queueNotify.notify_one();
// On exit: flag the sender as terminating so completion-policy waiters wake.
399 mPipeline->pipelineSenderTerminating =
true;
400 mPipeline->completionPolicyNotify.notify_one();
// Tear down the pipeline: request termination, wake all blocked threads, and
// join the receive thread and every worker thread.
// NOTE(review): non-contiguous excerpt — original lines 404-405, 411, 414, 418+
// (braces and possibly a guard condition) are missing.
403void GPURecoWorkflowSpec::ExitPipeline()
406 mPipeline->fmqDevice =
nullptr;
407 mPipeline->shouldTerminate =
true;
// Wake threads blocked on the state condition variable...
408 mPipeline->stateNotify.notify_all();
// ...and each worker blocked on its input-queue condition variable.
409 for (uint32_t
i = 0;
i < mPipeline->workers.size();
i++) {
410 mPipeline->workers[
i].inputQueueNotify.notify_one();
// Join the receive thread first, then the workers (joinable() guards against
// threads that were never started).
412 if (mPipeline->receiveThread.joinable()) {
413 mPipeline->receiveThread.join();
415 for (uint32_t
i = 0;
i < mPipeline->workers.size();
i++) {
416 if (mPipeline->workers[
i].thread.joinable()) {
417 mPipeline->workers[
i].thread.join();
Type wrappers for enforcing a specific serialization method.
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
o2::framework::Outputs outputs()
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
int32_t enableDoublePipeline
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
std::mutex jobInputFinalMutex
std::unique_ptr< GPUInterfaceInputUpdate > jobInputUpdateCallback
std::condition_variable jobInputFinalNotify
std::condition_variable jobFinishedNotify
GPUInterfaceOutputs * jobOutputRegions
std::mutex jobFinishedMutex
GPUTrackingInOutPointers * jobPtrs
static constexpr size_t MAGIC_WORD
size_t pointerCounts[GPUTrackingInOutZS::NSECTORS][GPUTrackingInOutZS::NENDPOINTS]
DataProcessingHeader::StartTime timeSliceId
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg