#include <fairmq/Device.h>
#include <fairmq/Channel.h>
#include <fairmq/States.h>
static const std::string GPURecoWorkflowSpec_FMQCallbackKey = "GPURecoWorkflowSpec_FMQCallbackKey";
  mPipeline->fmqDevice->SubscribeToStateChange(GPURecoWorkflowSpec_FMQCallbackKey, [this](fair::mq::State s) { receiveFMQStateCallback(s); });
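  // The callback key above identifies this subscription; receiveFMQStateCallback() (defined further
  // down) records FairMQ state transitions and wakes threads waiting on mPipeline->stateNotify.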
  std::unique_lock lk(mPipeline->completionPolicyMutex);
  mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
  if (mPipeline->completionPolicyQueue.front() == timeslice) {
    mPipeline->completionPolicyQueue.pop();
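  // Completion-policy path: block until the receive thread has queued the prepare message for this
  // timeslice (or the sender is terminating), then consume the matching queue entry.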
  mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
  for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
    mPipeline->workers[i].thread = std::thread([this, i]() { RunWorkerThread(i); });
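  // One out-of-band receive thread plus one thread per pipeline worker; all of them are joined
  // again in ExitPipeline().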
void GPURecoWorkflowSpec::RunWorkerThread(int32_t id)
{
  LOG(debug) << "Running pipeline worker " << id;
  auto& workerContext = mPipeline->workers[id];
  while (!mPipeline->shouldTerminate) {
    std::unique_lock lk(workerContext.inputQueueMutex);
    workerContext.inputQueueNotify.wait(lk, [this, &workerContext]() { return mPipeline->shouldTerminate || !workerContext.inputQueue.empty(); });
    if (workerContext.inputQueue.empty()) {
    context = workerContext.inputQueue.front();
    workerContext.inputQueue.pop();
    std::unique_lock lk(mPipeline->mayInjectMutex);
    mPipeline->mayInjectCondition.wait(lk, [this, context]() { return mPipeline->mayInject && mPipeline->mayInjectTFId == context->mTFId; });
    mPipeline->mayInjectTFId = mPipeline->mayInjectTFId + 1;
    mPipeline->mayInject = false;
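    // Injection is serialized: a worker only proceeds when mayInject is set and mayInjectTFId
    // matches its own TF id, so time frames are injected strictly in order.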
    std::lock_guard lk(mPipeline->mayInjectMutex);
    mPipeline->mayInject = true;
    mPipeline->mayInjectCondition.notify_one();
  mNextThreadIndex = (mNextThreadIndex + 1) % 2;
  std::lock_guard lk(mPipeline->workers[mNextThreadIndex].inputQueueMutex);
  mPipeline->workers[mNextThreadIndex].inputQueue.emplace(context);
  mPipeline->workers[mNextThreadIndex].inputQueueNotify.notify_one();
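  // Contexts are handed to the workers round-robin (modulo 2, i.e. two workers) and the chosen
  // worker is woken up via its input-queue condition variable.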
  mPipeline->runStarted = true;
  mPipeline->stateNotify.notify_all();
  std::unique_lock lk(mPipeline->queueMutex);
  mPipeline->queueNotify.wait(lk, [this] { return !mPipeline->pipelineQueue.empty(); });
  context = std::move(mPipeline->pipelineQueue.front());
  mPipeline->pipelineQueue.pop();
  if (context->timeSliceId != tinfo.timeslice) {
    LOG(fatal) << "Prepare message for incorrect time frame received, time frames seem out of sync";
  tpcZSmeta = std::move(context->tpcZSmeta);
  tpcZS = context->tpcZS;
  size_t ptrsTotal = 0;
  const void* firstPtr = nullptr;
  std::vector<size_t> messageBuffer(prepareBufferSize / sizeof(size_t));
  size_t* ptrBuffer = messageBuffer.data() + sizeof(preMessage) / sizeof(size_t);
  size_t ptrsCopied = 0;
  int32_t lastRegion = -1;
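  // Apparent layout of the prepare buffer: the preMessage header followed by four blocks of
  // ptrsTotal entries each - pointer offsets, pointer sizes, region "managed" flags, and region ids
  // (compare the indices used below with the decoding in RunReceiveThread).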
  bool regionFound = lastRegion != -1 && (size_t)curPtr >= (size_t)mRegionInfos[lastRegion].ptr && (size_t)curPtr < (size_t)mRegionInfos[lastRegion].ptr + mRegionInfos[lastRegion].size;
  for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
    if ((size_t)curPtr >= (size_t)mRegionInfos[l].ptr && (size_t)curPtr < (size_t)mRegionInfos[l].ptr + mRegionInfos[l].size) {
  LOG(fatal) << "Found a TPC ZS pointer outside of shared memory";
  ptrBuffer[ptrsCopied + k] = (size_t)curPtr - (size_t)mRegionInfos[lastRegion].ptr;
  ptrBuffer[2 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].managed;
  ptrBuffer[3 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].id;
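  // Each TPC ZS pointer is stored as an offset into the shared-memory region that contains it,
  // together with that region's managed flag and id, so the receiver can relocate it into its own
  // address space.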
  auto channel = device->GetChannels().find("gpu-prepare-channel");
  fair::mq::MessagePtr payload(device->NewMessage());
  LOG(info) << "Sending gpu-reco-workflow prepare message of size " << prepareBufferSize;
  payload->Rebuild(messageBuffer.data(), prepareBufferSize, nullptr, nullptr);
  channel->second[0].Send(payload);
  mPipeline->endOfStreamDplReceived = true;
  mPipeline->stateNotify.notify_all();
  auto channel = device->GetChannels().find("gpu-prepare-channel");
  fair::mq::MessagePtr payload(device->NewMessage());
  LOG(info) << "Sending end-of-stream message over out-of-band channel";
  payload->Rebuild(&preMessage, sizeof(preMessage), nullptr, nullptr);
  channel->second[0].Send(payload);
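  // End of stream is announced on the same "gpu-prepare-channel" by sending only the preMessage
  // header; the receive thread below checks m->flagEndOfStream to detect it.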
void GPURecoWorkflowSpec::handlePipelineStop()
{
  mPipeline->mayInjectTFId = 0;
void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
{
  std::lock_guard lk(mPipeline->stateMutex);
  if (mPipeline->fmqState != fair::mq::State::Running && newState == fair::mq::State::Running) {
    mPipeline->endOfStreamAsyncReceived = false;
    mPipeline->endOfStreamDplReceived = false;
  }
  mPipeline->fmqPreviousState = mPipeline->fmqState;
  mPipeline->fmqState = newState;
  if (newState == fair::mq::State::Exiting) {
    mPipeline->fmqDevice->UnsubscribeFromStateChange(GPURecoWorkflowSpec_FMQCallbackKey);
  }
  mPipeline->stateNotify.notify_all();
void GPURecoWorkflowSpec::RunReceiveThread()
{
  auto* device = mPipeline->fmqDevice;
  while (!mPipeline->shouldTerminate) {
    bool received = false;
    int32_t recvTimeout = 1000;
    fair::mq::MessagePtr msg;
    LOG(debug) << "Waiting for out of band message";
    auto shouldReceive = [this]() { return ((mPipeline->fmqState == fair::mq::State::Running || (mPipeline->fmqState == fair::mq::State::Ready && mPipeline->fmqPreviousState == fair::mq::State::Running)) && !mPipeline->endOfStreamAsyncReceived); };
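    // Receive only while the device is Running (or still Ready immediately after Running) and no
    // asynchronous end of stream has been seen yet; otherwise the thread parks on stateNotify below.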
        std::unique_lock lk(mPipeline->stateMutex);
        mPipeline->stateNotify.wait(lk, [this, shouldReceive]() { return shouldReceive() || mPipeline->shouldTerminate; });
        if (mPipeline->shouldTerminate) {
        std::unique_lock lk(mPipeline->stateMutex);
        if (!shouldReceive()) {
        msg = device->NewMessageFor("gpu-prepare-channel", 0, 0);
        received = device->Receive(msg, "gpu-prepare-channel", 0, recvTimeout) > 0;
      } while (!received && !mPipeline->shouldTerminate);
    } while (!received && !mPipeline->shouldTerminate);
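    // Nested retry loops: wait for a state in which receiving is allowed, then poll Receive() on
    // "gpu-prepare-channel" with a 1000 ms timeout until a message arrives or termination is requested.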
    if (mPipeline->shouldTerminate) {
    if (m->magicWord != m->MAGIC_WORD) {
      LOG(fatal) << "Prepare message corrupted, invalid magic word";
    if (m->flagEndOfStream) {
      LOG(info) << "Received end-of-stream from out-of-band channel";
      std::lock_guard lk(mPipeline->stateMutex);
      mPipeline->endOfStreamAsyncReceived = true;
      mPipeline->mNTFReceived = 0;
      mPipeline->runStarted = false;
      std::lock_guard lk(mPipeline->completionPolicyMutex);
      mPipeline->completionPolicyQueue.emplace(m->timeSliceId);
      mPipeline->completionPolicyNotify.notify_one();
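    // The time slice id of every prepare message is forwarded to the completion-policy queue that
    // the completion callback near the top waits on via completionPolicyNotify.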
      std::unique_lock lk(mPipeline->stateMutex);
      mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->runStarted && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; });
      if (!mPipeline->runStarted) {
    auto context = std::make_unique<GPURecoWorkflow_QueueObject>();
    context->timeSliceId = m->timeSliceId;
    context->tfSettings = m->tfSettings;
    size_t ptrsCopied = 0;
    context->tpcZSmeta.Pointers[0][0].resize(m->pointersTotal);
    context->tpcZSmeta.Sizes[0][0].resize(m->pointersTotal);
    int32_t lastRegion = -1;
        context->tpcZS.sector[i].count[j] = m->pointerCounts[i][j];
        for (uint32_t k = 0; k < context->tpcZS.sector[i].count[j]; k++) {
          bool regionManaged = ptrBuffer[2 * m->pointersTotal + ptrsCopied + k];
          size_t regionId = ptrBuffer[3 * m->pointersTotal + ptrsCopied + k];
          bool regionFound = lastRegion != -1 && mRegionInfos[lastRegion].managed == regionManaged && mRegionInfos[lastRegion].id == regionId;
          for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
            if (mRegionInfos[l].managed == regionManaged && mRegionInfos[l].id == regionId) {
          LOG(fatal) << "Received ZS Ptr for SHM region (managed " << (int32_t)regionManaged << ", id " << regionId << "), which was not registered for us";
          context->tpcZSmeta.Pointers[0][0][ptrsCopied + k] = (void*)(ptrBuffer[ptrsCopied + k] + (size_t)mRegionInfos[lastRegion].ptr);
          context->tpcZSmeta.Sizes[0][0][ptrsCopied + k] = ptrBuffer[m->pointersTotal + ptrsCopied + k];
        context->tpcZS.sector[i].zsPtr[j] = context->tpcZSmeta.Pointers[0][0].data() + ptrsCopied;
        context->tpcZS.sector[i].nZSPtr[j] = context->tpcZSmeta.Sizes[0][0].data() + ptrsCopied;
        ptrsCopied += context->tpcZS.sector[i].count[j];
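    // Reverse of the packing step on the sender side: offsets are turned back into absolute
    // pointers using the locally registered region (matched via managed flag and id), and the
    // per-sector / per-endpoint zsPtr, nZSPtr and count arrays of tpcZS are wired up to the freshly
    // filled tpcZSmeta vectors.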
    context->ptrs.tpcZS = &context->tpcZS;
    context->ptrs.settingsTF = &context->tfSettings;
    context->mTFId = mPipeline->mNTFReceived;
    if (mPipeline->mNTFReceived++ >= mPipeline->workers.size()) {
      enqueuePipelinedJob(&context->ptrs, nullptr, context.get(), false);
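    // Once more time frames have been received than there are workers, the job appears to be
    // enqueued for pipelined processing right away; the context is then published on pipelineQueue
    // for the DPL processing path.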
    std::lock_guard lk(mPipeline->queueMutex);
    mPipeline->pipelineQueue.emplace(std::move(context));
    mPipeline->queueNotify.notify_one();
  mPipeline->pipelineSenderTerminating = true;
  mPipeline->completionPolicyNotify.notify_one();
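  // On termination, the completion-policy waiters are woken so they do not block forever on an
  // empty queue (their wait predicate also checks pipelineSenderTerminating).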
void GPURecoWorkflowSpec::ExitPipeline()
{
  mPipeline->fmqDevice = nullptr;
  mPipeline->shouldTerminate = true;
  mPipeline->stateNotify.notify_all();
  for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
    mPipeline->workers[i].inputQueueNotify.notify_one();
  }
  if (mPipeline->receiveThread.joinable()) {
    mPipeline->receiveThread.join();
  }
  for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
    if (mPipeline->workers[i].thread.joinable()) {
      mPipeline->workers[i].thread.join();
    }
  }
}