GPUWorkflowPipeline.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "GPUO2Interface.h"
#include "GPUDataTypes.h"
#include "GPUSettings.h"
#include "GPUWorkflowInternal.h"

#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
#include "Framework/Logger.h"

#include <fairmq/Device.h>
#include <fairmq/Channel.h>
#include <fairmq/States.h>

using namespace o2::framework;
using namespace o2::header;
using namespace o2::gpu;
using namespace o2::base;
using namespace o2::dataformats;

namespace o2::gpu
{

static const std::string GPURecoWorkflowSpec_FMQCallbackKey = "GPURecoWorkflowSpec_FMQCallbackKey";
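
// Overview (reconstructed from the code below, not from external documentation): the
// "double pipeline" couples two DPL devices through the out-of-band FairMQ channel
// "gpu-prepare-channel". With enableDoublePipeline == 2, a preparing device serializes
// the locations of the TPC ZS pages of a time frame into a pipelinePrepareMessage
// (definition not shown in this listing; from its usage it carries a magic word, the
// timeslice id, the GPUSettingsTF, per-endpoint pointer counts, the total pointer
// count, and an end-of-stream flag) and sends it ahead of the regular DPL data path.
// With enableDoublePipeline == 1, the processing device receives these messages in
// RunReceiveThread(), relocates the pointers into its own mapping of the shared-memory
// regions, and dispatches the job to one of the worker threads.
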
void GPURecoWorkflowSpec::initPipeline(o2::framework::InitContext& ic)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->fmqDevice = ic.services().get<RawDeviceService>().device();
    mPipeline->fmqDevice->SubscribeToStateChange(GPURecoWorkflowSpec_FMQCallbackKey, [this](fair::mq::State s) { receiveFMQStateCallback(s); });
    mPolicyOrder = [this](o2::framework::DataProcessingHeader::StartTime timeslice) {
      std::unique_lock lk(mPipeline->completionPolicyMutex);
      mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
      if (!mPipeline->completionPolicyQueue.empty() && mPipeline->completionPolicyQueue.front() == timeslice) { // empty() guard: the wait may also return because pipelineSenderTerminating was set
        mPipeline->completionPolicyQueue.pop();
        return true;
      }
      return false;
    };
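    // Sketch of the intended use (inferred from this file): the DPL completion policy
    // consults mPolicyOrder with a candidate timeslice. RunReceiveThread() pushes
    // timeslice ids into completionPolicyQueue in out-of-band arrival order, so a
    // timeslice may only complete when it is at the head of that queue; this keeps
    // DPL processing in sync with the prepare messages.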
    mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      mPipeline->workers[i].thread = std::thread([this, i]() { RunWorkerThread(i); });
    }
  }
}

void GPURecoWorkflowSpec::RunWorkerThread(int32_t id)
{
  LOG(debug) << "Running pipeline worker " << id;
  auto& workerContext = mPipeline->workers[id];
  while (!mPipeline->shouldTerminate) {
    GPURecoWorkflow_QueueObject* context = nullptr;
    {
      std::unique_lock lk(workerContext.inputQueueMutex);
      workerContext.inputQueueNotify.wait(lk, [this, &workerContext]() { return mPipeline->shouldTerminate || !workerContext.inputQueue.empty(); });
      if (workerContext.inputQueue.empty()) {
        break;
      }
      context = workerContext.inputQueue.front();
      workerContext.inputQueue.pop();
    }
    context->jobReturnValue = runMain(nullptr, context->jobPtrs, context->jobOutputRegions, id, context->jobInputUpdateCallback.get());
    {
      std::lock_guard lk(context->jobFinishedMutex);
      context->jobFinished = true;
    }
    context->jobFinishedNotify.notify_one();
  }
}
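
// Note (the consumer side is not part of this file): whoever submitted the job is
// expected to wait on jobFinishedNotify / jobFinished to pick up jobReturnValue once
// runMain() has completed.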

void GPURecoWorkflowSpec::enqueuePipelinedJob(GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, GPURecoWorkflow_QueueObject* context, bool inputFinal)
{
  {
    std::unique_lock lk(mPipeline->mayInjectMutex);
    mPipeline->mayInjectCondition.wait(lk, [this, context]() { return mPipeline->mayInject && mPipeline->mayInjectTFId == context->mTFId; });
    mPipeline->mayInjectTFId = mPipeline->mayInjectTFId + 1;
    mPipeline->mayInject = false;
  }
  context->jobSubmitted = true;
  context->jobInputFinal = inputFinal;
  context->jobPtrs = ptrs;
  context->jobOutputRegions = outputRegions;

  context->jobInputUpdateCallback = std::make_unique<GPUInterfaceInputUpdate>();

  if (!inputFinal) {
    // NOTE: the assignment below is reconstructed; the member name and lambda signature
    // are inferred from the surviving lambda body and from GPUInterfaceInputUpdate usage.
    // The callback blocks until finalizeInputPipelinedJob() has delivered the final
    // input, then hands the updated pointers back to the caller.
    context->jobInputUpdateCallback->callback = [context](GPUTrackingInOutPointers*& data, GPUInterfaceOutputs*& outputs) {
      std::unique_lock lk(context->jobInputFinalMutex);
      context->jobInputFinalNotify.wait(lk, [context]() { return context->jobInputFinal; });
      data = context->jobPtrs;
      outputs = context->jobOutputRegions;
    };
  }
  context->jobInputUpdateCallback->notifyCallback = [this]() {
    {
      std::lock_guard lk(mPipeline->mayInjectMutex);
      mPipeline->mayInject = true;
    }
    mPipeline->mayInjectCondition.notify_one();
  };

  mNextThreadIndex = (mNextThreadIndex + 1) % 2;

  {
    std::lock_guard lk(mPipeline->workers[mNextThreadIndex].inputQueueMutex);
    mPipeline->workers[mNextThreadIndex].inputQueue.emplace(context);
  }
  mPipeline->workers[mNextThreadIndex].inputQueueNotify.notify_one();
}
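
// Injection handshake: jobs enter the worker queues strictly in time-frame order.
// enqueuePipelinedJob() blocks until mayInjectTFId matches the job's mTFId, and the
// worker re-enables injection via notifyCallback once the GPU reconstruction has
// consumed its input, so at most one job is being injected at any time.
// mNextThreadIndex alternates jobs between the worker queues (the hard-coded % 2
// matches a two-worker pipeline).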

void GPURecoWorkflowSpec::finalizeInputPipelinedJob(GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, GPURecoWorkflow_QueueObject* context)
{
  {
    std::lock_guard lk(context->jobInputFinalMutex);
    context->jobPtrs = ptrs;
    context->jobOutputRegions = outputRegions;
    context->jobInputFinal = true;
  }
  context->jobInputFinalNotify.notify_one();
}

int32_t GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS, std::unique_ptr<GPURecoWorkflow_QueueObject>& context)
{
  mPipeline->runStarted = true;
  mPipeline->stateNotify.notify_all();

  auto* device = pc.services().get<RawDeviceService>().device();
  const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
  if (mSpecConfig.enableDoublePipeline == 1) {
    std::unique_lock lk(mPipeline->queueMutex);
    mPipeline->queueNotify.wait(lk, [this] { return !mPipeline->pipelineQueue.empty(); });
    context = std::move(mPipeline->pipelineQueue.front());
    mPipeline->pipelineQueue.pop();
    lk.unlock();

    if (context->timeSliceId != tinfo.timeslice) {
      LOG(fatal) << "Prepare message for incorrect time frame received, time frames seem out of sync";
    }

    tpcZSmeta = std::move(context->tpcZSmeta);
    tpcZS = context->tpcZS;
    ptrs.tpcZS = &tpcZS;
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    auto prepareBuffer = pc.outputs().make<DataAllocator::UninitializedVector<char>>(Output{gDataOriginGPU, "PIPELINEPREPARE", 0}, 0u); // zero-length DPL output, presumably so the PIPELINEPREPARE route still carries a message per TF

    size_t ptrsTotal = 0;
    const void* firstPtr = nullptr;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        if (firstPtr == nullptr && ptrs.tpcZS->sector[i].count[j]) {
          firstPtr = ptrs.tpcZS->sector[i].zsPtr[j][0];
        }
        ptrsTotal += ptrs.tpcZS->sector[i].count[j];
      }
    }
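
    // Layout of the prepare message assembled below: the pipelinePrepareMessage header
    // is followed by four arrays of ptrsTotal size_t entries each:
    //   [0 * ptrsTotal]: offset of each ZS page pointer within its SHM region
    //   [1 * ptrsTotal]: number of ZS pages behind the pointer (nZSPtr)
    //   [2 * ptrsTotal]: "managed" flag of the SHM region
    //   [3 * ptrsTotal]: id of the SHM region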

    size_t prepareBufferSize = sizeof(pipelinePrepareMessage) + ptrsTotal * sizeof(size_t) * 4;
    std::vector<size_t> messageBuffer(prepareBufferSize / sizeof(size_t));
    pipelinePrepareMessage& preMessage = *(pipelinePrepareMessage*)messageBuffer.data();
    preMessage.magicWord = preMessage.MAGIC_WORD;
    preMessage.timeSliceId = tinfo.timeslice;
    preMessage.pointersTotal = ptrsTotal;
    preMessage.flagEndOfStream = false;
    memcpy((void*)&preMessage.tfSettings, (const void*)ptrs.settingsTF, sizeof(preMessage.tfSettings));

    size_t* ptrBuffer = messageBuffer.data() + sizeof(preMessage) / sizeof(size_t);
    size_t ptrsCopied = 0;
    int32_t lastRegion = -1;
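    // Translate every ZS page pointer into a (region, offset) pair, so the receiver can
    // relocate it into its own mapping of the same shared-memory region. lastRegion
    // caches the most recent match, assuming consecutive pages usually come from the
    // same region.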
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        preMessage.pointerCounts[i][j] = ptrs.tpcZS->sector[i].count[j];
        for (uint32_t k = 0; k < ptrs.tpcZS->sector[i].count[j]; k++) {
          const void* curPtr = ptrs.tpcZS->sector[i].zsPtr[j][k];
          bool regionFound = lastRegion != -1 && (size_t)curPtr >= (size_t)mRegionInfos[lastRegion].ptr && (size_t)curPtr < (size_t)mRegionInfos[lastRegion].ptr + mRegionInfos[lastRegion].size;
          if (!regionFound) {
            for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
              if ((size_t)curPtr >= (size_t)mRegionInfos[l].ptr && (size_t)curPtr < (size_t)mRegionInfos[l].ptr + mRegionInfos[l].size) {
                lastRegion = l;
                regionFound = true;
                break;
              }
            }
          }
          if (!regionFound) {
            LOG(fatal) << "Found a TPC ZS pointer outside of shared memory";
          }
          ptrBuffer[ptrsCopied + k] = (size_t)curPtr - (size_t)mRegionInfos[lastRegion].ptr;
          ptrBuffer[ptrsTotal + ptrsCopied + k] = ptrs.tpcZS->sector[i].nZSPtr[j][k];
          ptrBuffer[2 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].managed;
          ptrBuffer[3 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].id;
        }
        ptrsCopied += ptrs.tpcZS->sector[i].count[j];
      }
    }

    auto channel = device->GetChannels().find("gpu-prepare-channel");
    fair::mq::MessagePtr payload(device->NewMessage());
    LOG(info) << "Sending gpu-reco-workflow prepare message of size " << prepareBufferSize;
    payload->Rebuild(messageBuffer.data(), prepareBufferSize, nullptr, nullptr);
    channel->second[0].Send(payload);
    return 2;
  }
  return 0;
}

void GPURecoWorkflowSpec::handlePipelineEndOfStream(EndOfStreamContext& ec)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->endOfStreamDplReceived = true;
    mPipeline->stateNotify.notify_all();
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    auto* device = ec.services().get<RawDeviceService>().device();
    pipelinePrepareMessage preMessage;
    preMessage.flagEndOfStream = true;
    auto channel = device->GetChannels().find("gpu-prepare-channel");
    fair::mq::MessagePtr payload(device->NewMessage());
    LOG(info) << "Sending end-of-stream message over out-of-band channel";
    payload->Rebuild(&preMessage, sizeof(preMessage), nullptr, nullptr);
    channel->second[0].Send(payload);
  }
}
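
// End-of-stream propagation: the processing device (mode 1) only records that DPL has
// delivered EOS and wakes the receive thread, while the preparing device (mode 2)
// forwards EOS over the out-of-band channel, where RunReceiveThread() picks it up as a
// prepare message with flagEndOfStream set.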

void GPURecoWorkflowSpec::handlePipelineStop()
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->mayInjectTFId = 0;
  }
}

void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
{
  {
    std::lock_guard lk(mPipeline->stateMutex);
    if (mPipeline->fmqState != fair::mq::State::Running && newState == fair::mq::State::Running) {
      mPipeline->endOfStreamAsyncReceived = false;
      mPipeline->endOfStreamDplReceived = false;
    }
    mPipeline->fmqPreviousState = mPipeline->fmqState;
    mPipeline->fmqState = newState;
    if (newState == fair::mq::State::Exiting) {
      mPipeline->fmqDevice->UnsubscribeFromStateChange(GPURecoWorkflowSpec_FMQCallbackKey);
    }
  }
  mPipeline->stateNotify.notify_all();
}
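
// RunReceiveThread() below gates its Receive() calls on the FairMQ state tracked here:
// it keeps draining "gpu-prepare-channel" while the device is Running, and also while
// it is Ready coming directly from Running, so that a late end-of-stream message is
// not lost across the state transition (see shouldReceive()).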

void GPURecoWorkflowSpec::RunReceiveThread()
{
  auto* device = mPipeline->fmqDevice;
  while (!mPipeline->shouldTerminate) {
    bool received = false;
    int32_t recvTimeout = 1000;
    fair::mq::MessagePtr msg;
    LOG(debug) << "Waiting for out-of-band message";
    auto shouldReceive = [this]() { return ((mPipeline->fmqState == fair::mq::State::Running || (mPipeline->fmqState == fair::mq::State::Ready && mPipeline->fmqPreviousState == fair::mq::State::Running)) && !mPipeline->endOfStreamAsyncReceived); };
    do {
      {
        std::unique_lock lk(mPipeline->stateMutex);
        mPipeline->stateNotify.wait(lk, [this, shouldReceive]() { return shouldReceive() || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
      }
      if (mPipeline->shouldTerminate) {
        break;
      }
      try {
        do {
          std::unique_lock lk(mPipeline->stateMutex);
          if (!shouldReceive()) {
            break;
          }
          msg = device->NewMessageFor("gpu-prepare-channel", 0, 0);
          received = device->Receive(msg, "gpu-prepare-channel", 0, recvTimeout) > 0;
        } while (!received && !mPipeline->shouldTerminate);
      } catch (...) {
        usleep(1000000);
      }
    } while (!received && !mPipeline->shouldTerminate);
    if (mPipeline->shouldTerminate) {
      break;
    }
    if (msg->GetSize() < sizeof(pipelinePrepareMessage)) {
      LOG(fatal) << "Received prepare message of invalid size " << msg->GetSize() << " < " << sizeof(pipelinePrepareMessage);
    }
    const pipelinePrepareMessage* m = (const pipelinePrepareMessage*)msg->GetData();
    if (m->magicWord != m->MAGIC_WORD) {
      LOG(fatal) << "Prepare message corrupted, invalid magic word";
    }
    if (m->flagEndOfStream) {
      LOG(info) << "Received end-of-stream from out-of-band channel";
      std::lock_guard lk(mPipeline->stateMutex);
      mPipeline->endOfStreamAsyncReceived = true;
      mPipeline->mNTFReceived = 0;
      mPipeline->runStarted = false;
      continue;
    }

    {
      std::lock_guard lk(mPipeline->completionPolicyMutex);
      mPipeline->completionPolicyQueue.emplace(m->timeSliceId);
    }
    mPipeline->completionPolicyNotify.notify_one();

    {
      std::unique_lock lk(mPipeline->stateMutex);
      mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->runStarted && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; });
      if (!mPipeline->runStarted) {
        continue;
      }
    }

    auto context = std::make_unique<GPURecoWorkflow_QueueObject>();
    context->timeSliceId = m->timeSliceId;
    context->tfSettings = m->tfSettings;

    size_t ptrsCopied = 0;
    size_t* ptrBuffer = (size_t*)msg->GetData() + sizeof(pipelinePrepareMessage) / sizeof(size_t);
    context->tpcZSmeta.Pointers[0][0].resize(m->pointersTotal);
    context->tpcZSmeta.Sizes[0][0].resize(m->pointersTotal);
    int32_t lastRegion = -1;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        context->tpcZS.sector[i].count[j] = m->pointerCounts[i][j];
        for (uint32_t k = 0; k < context->tpcZS.sector[i].count[j]; k++) {
          bool regionManaged = ptrBuffer[2 * m->pointersTotal + ptrsCopied + k];
          size_t regionId = ptrBuffer[3 * m->pointersTotal + ptrsCopied + k];
          bool regionFound = lastRegion != -1 && mRegionInfos[lastRegion].managed == regionManaged && mRegionInfos[lastRegion].id == regionId;
          if (!regionFound) {
            for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
              if (mRegionInfos[l].managed == regionManaged && mRegionInfos[l].id == regionId) {
                lastRegion = l;
                regionFound = true;
                break;
              }
            }
          }
          if (!regionFound) {
            LOG(fatal) << "Received ZS pointer for SHM region (managed " << (int32_t)regionManaged << ", id " << regionId << ") that was not registered with this device";
          }
          context->tpcZSmeta.Pointers[0][0][ptrsCopied + k] = (void*)(ptrBuffer[ptrsCopied + k] + (size_t)mRegionInfos[lastRegion].ptr);
          context->tpcZSmeta.Sizes[0][0][ptrsCopied + k] = ptrBuffer[m->pointersTotal + ptrsCopied + k];
        }
        context->tpcZS.sector[i].zsPtr[j] = context->tpcZSmeta.Pointers[0][0].data() + ptrsCopied;
        context->tpcZS.sector[i].nZSPtr[j] = context->tpcZSmeta.Sizes[0][0].data() + ptrsCopied;
        ptrsCopied += context->tpcZS.sector[i].count[j];
      }
    }
    context->ptrs.tpcZS = &context->tpcZS;
    context->ptrs.settingsTF = &context->tfSettings;
    context->mTFId = mPipeline->mNTFReceived;
    if (mPipeline->mNTFReceived++ >= mPipeline->workers.size()) { // Do not inject the first workers.size() TFs, since we need a first round of calib updates from DPL before starting
      enqueuePipelinedJob(&context->ptrs, nullptr, context.get(), false);
    }
    {
      std::lock_guard lk(mPipeline->queueMutex);
      mPipeline->pipelineQueue.emplace(std::move(context));
    }
    mPipeline->queueNotify.notify_one();
  }
  mPipeline->pipelineSenderTerminating = true;
  mPipeline->completionPolicyNotify.notify_one();
}
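
// On shutdown, pipelineSenderTerminating together with the completionPolicyNotify
// wakeup unblocks a DPL thread that may be waiting in mPolicyOrder (see initPipeline),
// so it does not wait forever for another prepare message.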

void GPURecoWorkflowSpec::ExitPipeline()
{
  if (mSpecConfig.enableDoublePipeline == 1 && mPipeline->fmqDevice) {
    mPipeline->fmqDevice = nullptr;
    mPipeline->shouldTerminate = true;
    mPipeline->stateNotify.notify_all();
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      mPipeline->workers[i].inputQueueNotify.notify_one();
    }
    if (mPipeline->receiveThread.joinable()) {
      mPipeline->receiveThread.join();
    }
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      if (mPipeline->workers[i].thread.joinable()) {
        mPipeline->workers[i].thread.join();
      }
    }
  }
}

} // namespace o2::gpu