GPUWorkflowPipeline.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "GPUO2Interface.h"
#include "GPUDataTypes.h"
#include "GPUSettings.h"
#include "GPUWorkflowInternal.h"

#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
// The listing folds several Framework includes here; those below are restored
// from usage in this file (exact header paths assumed):
#include "Framework/RawDeviceService.h"
#include "Framework/DataProcessingHeader.h"
#include "Framework/DataAllocator.h"
#include "Framework/TimingInfo.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/Logger.h"

#include <fairmq/Device.h>
#include <fairmq/Channel.h>
#include <fairmq/States.h>

using namespace o2::framework;
using namespace o2::header;
using namespace o2::gpu;
using namespace o2::base;
using namespace o2::dataformats;

namespace o2::gpu
{

static const std::string GPURecoWorkflowSpec_FMQCallbackKey = "GPURecoWorkflowSpec_FMQCallbackKey";

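// Set up the double-pipeline machinery: register the FairMQ state-change
// callback, install a completion policy that releases time slices only in the
// order announced by the out-of-band prepare messages, and spawn the receive
// thread plus one worker thread per pipeline slot.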
void GPURecoWorkflowSpec::initPipeline(o2::framework::InitContext& ic)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->fmqDevice = ic.services().get<RawDeviceService>().device();
    mPipeline->fmqDevice->SubscribeToStateChange(GPURecoWorkflowSpec_FMQCallbackKey, [this](fair::mq::State s) { receiveFMQStateCallback(s); });
    mPolicyOrder = [this](o2::framework::DataProcessingHeader::StartTime timeslice) {
      std::unique_lock lk(mPipeline->completionPolicyMutex);
      mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
      return !mPipeline->completionPolicyQueue.empty() && mPipeline->completionPolicyQueue.front() == timeslice;
    };
    mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      mPipeline->workers[i].thread = std::thread([this, i]() { RunWorkerThread(i); });
    }
  }
}

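// Worker loop: block until a queued time frame arrives, run the GPU
// reconstruction on it, then flag the job as finished for the consumer
// waiting on jobFinishedNotify.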
void GPURecoWorkflowSpec::RunWorkerThread(int32_t id)
{
  LOG(debug) << "Running pipeline worker " << id;
  auto& workerContext = mPipeline->workers[id];
  while (!mPipeline->shouldTerminate) {
    GPURecoWorkflow_QueueObject* context; // Next queued time frame to process (declaration restored; elided in the listing)
    {
      std::unique_lock lk(workerContext.inputQueueMutex);
      workerContext.inputQueueNotify.wait(lk, [this, &workerContext]() { return mPipeline->shouldTerminate || !workerContext.inputQueue.empty(); });
      if (workerContext.inputQueue.empty()) {
        break;
      }
      context = workerContext.inputQueue.front();
      workerContext.inputQueue.pop();
    }
    context->jobReturnValue = runMain(nullptr, context->jobPtrs, context->jobOutputRegions, id, context->jobInputUpdateCallback.get());
    {
      std::lock_guard lk(context->jobFinishedMutex);
      context->jobFinished = true;
    }
    context->jobFinishedNotify.notify_one();
  }
}

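// Hand a time frame to the next worker in round-robin order. Injection is
// serialized per TF id via mayInject/mayInjectTFId, and for non-final input
// the worker receives a callback through which it blocks until the real
// pointers arrive (see finalizeInputPipelinedJob()).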
void GPURecoWorkflowSpec::enqueuePipelinedJob(GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, GPURecoWorkflow_QueueObject* context, bool inputFinal)
{
  {
    std::unique_lock lk(mPipeline->mayInjectMutex);
    mPipeline->mayInjectCondition.wait(lk, [this, context]() { return mPipeline->mayInject && mPipeline->mayInjectTFId == context->mTFId; });
    mPipeline->mayInjectTFId = mPipeline->mayInjectTFId + 1;
    mPipeline->mayInject = false;
  }
  context->jobSubmitted = true;
  context->jobInputFinal = inputFinal;
  context->jobPtrs = ptrs;
  context->jobOutputRegions = outputRegions;

  context->jobInputUpdateCallback = std::make_unique<GPUInterfaceInputUpdate>();

  if (!inputFinal) {
    // Restored line (elided in the listing): data-update callback that blocks
    // until the final input pointers are published; member name assumed.
    context->jobInputUpdateCallback->callback = [context](GPUTrackingInOutPointers*& data, GPUInterfaceOutputs*& outputs) {
      std::unique_lock lk(context->jobInputFinalMutex);
      context->jobInputFinalNotify.wait(lk, [context]() { return context->jobInputFinal; });
      data = context->jobPtrs;
      outputs = context->jobOutputRegions;
    };
  }
  context->jobInputUpdateCallback->notifyCallback = [this]() {
    {
      std::lock_guard lk(mPipeline->mayInjectMutex);
      mPipeline->mayInject = true;
    }
    mPipeline->mayInjectCondition.notify_one();
  };

  mNextThreadIndex = (mNextThreadIndex + 1) % 2;

  {
    std::lock_guard lk(mPipeline->workers[mNextThreadIndex].inputQueueMutex);
    mPipeline->workers[mNextThreadIndex].inputQueue.emplace(context);
  }
  mPipeline->workers[mNextThreadIndex].inputQueueNotify.notify_one();
}

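// Counterpart to the blocking data-update callback above: publish the final
// input pointers for a job that was enqueued early and wake the worker
// waiting on jobInputFinal.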
void GPURecoWorkflowSpec::finalizeInputPipelinedJob(GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, GPURecoWorkflow_QueueObject* context)
{
  {
    std::lock_guard lk(context->jobInputFinalMutex);
    context->jobPtrs = ptrs;
    context->jobOutputRegions = outputRegions;
    context->jobInputFinal = true;
  }
  context->jobInputFinalNotify.notify_one();
}

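// Per-time-frame pipeline hook called from the processing callback.
// Mode 1 (receiving side): pop the context prepared by the receive thread and
// cross-check time-slice ids. Mode 2 (sending side): serialize the TPC ZS
// pointers as shared-memory-region offsets into a prepare message and ship it
// over the out-of-band channel; returns 2 so local processing is skipped.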
int32_t GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS, std::unique_ptr<GPURecoWorkflow_QueueObject>& context)
{
  mPipeline->runStarted = true;
  mPipeline->stateNotify.notify_all();

  auto* device = pc.services().get<RawDeviceService>().device();
  const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
  if (mSpecConfig.enableDoublePipeline == 1) {
    std::unique_lock lk(mPipeline->queueMutex);
    mPipeline->queueNotify.wait(lk, [this] { return !mPipeline->pipelineQueue.empty(); });
    context = std::move(mPipeline->pipelineQueue.front());
    mPipeline->pipelineQueue.pop();
    lk.unlock();

    if (context->timeSliceId != tinfo.timeslice) {
      LOG(fatal) << "Prepare message for incorrect time frame received, time frames seem out of sync";
    }

    tpcZSmeta = std::move(context->tpcZSmeta);
    tpcZS = context->tpcZS;
    ptrs.tpcZS = &tpcZS;

    {
      std::lock_guard lk(mPipeline->completionPolicyMutex);
      if (mPipeline->completionPolicyQueue.empty() || mPipeline->completionPolicyQueue.front() != tinfo.timeslice) {
        LOG(fatal) << "Time frame processed does not equal the timeframe at the top of the queue, time frames seem out of sync";
      }
      mPipeline->completionPolicyQueue.pop();
    }
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    auto prepareDummyMessage = pc.outputs().make<DataAllocator::UninitializedVector<char>>(Output{gDataOriginGPU, "PIPELINEPREPARE", 0}, 0u);

    size_t ptrsTotal = 0;
    const void* firstPtr = nullptr;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        if (firstPtr == nullptr && ptrs.tpcZS->sector[i].count[j]) {
          firstPtr = ptrs.tpcZS->sector[i].zsPtr[j][0];
        }
        ptrsTotal += ptrs.tpcZS->sector[i].count[j];
      }
    }

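    // Prepare-message layout: the pipelinePrepareMessage header is followed by
    // four size_t arrays of length ptrsTotal: [pointer offset within its SHM
    // region][nZSPtr value][region managed flag][region id]. Offsets are sent
    // relative to the region base so the receiver can rebase them into its own
    // mapping.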
    size_t prepareBufferSize = sizeof(pipelinePrepareMessage) + ptrsTotal * sizeof(size_t) * 4;
    std::vector<size_t> messageBuffer(prepareBufferSize / sizeof(size_t));
    pipelinePrepareMessage& preMessage = *(pipelinePrepareMessage*)messageBuffer.data();
    preMessage.magicWord = preMessage.MAGIC_WORD;
    preMessage.timeSliceId = tinfo.timeslice;
    preMessage.pointersTotal = ptrsTotal;
    preMessage.flagEndOfStream = false;
    memcpy((void*)&preMessage.tfSettings, (const void*)ptrs.settingsTF, sizeof(preMessage.tfSettings));

    size_t* ptrBuffer = messageBuffer.data() + sizeof(preMessage) / sizeof(size_t);
    size_t ptrsCopied = 0;
    int32_t lastRegion = -1;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        preMessage.pointerCounts[i][j] = ptrs.tpcZS->sector[i].count[j];
        for (uint32_t k = 0; k < ptrs.tpcZS->sector[i].count[j]; k++) {
          const void* curPtr = ptrs.tpcZS->sector[i].zsPtr[j][k];
          bool regionFound = lastRegion != -1 && (size_t)curPtr >= (size_t)mRegionInfos[lastRegion].ptr && (size_t)curPtr < (size_t)mRegionInfos[lastRegion].ptr + mRegionInfos[lastRegion].size;
          if (!regionFound) {
            for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
              if ((size_t)curPtr >= (size_t)mRegionInfos[l].ptr && (size_t)curPtr < (size_t)mRegionInfos[l].ptr + mRegionInfos[l].size) {
                lastRegion = l;
                regionFound = true;
                break;
              }
            }
          }
          if (!regionFound) {
            LOG(fatal) << "Found a TPC ZS pointer outside of shared memory";
          }
          ptrBuffer[ptrsCopied + k] = (size_t)curPtr - (size_t)mRegionInfos[lastRegion].ptr;
          ptrBuffer[ptrsTotal + ptrsCopied + k] = ptrs.tpcZS->sector[i].nZSPtr[j][k];
          ptrBuffer[2 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].managed;
          ptrBuffer[3 * ptrsTotal + ptrsCopied + k] = mRegionInfos[lastRegion].id;
        }
        ptrsCopied += ptrs.tpcZS->sector[i].count[j];
      }
    }

235
236 auto channel = device->GetChannels().find("gpu-prepare-channel");
237 fair::mq::MessagePtr payload(device->NewMessage());
238 LOG(info) << "Sending gpu-reco-workflow prepare message of size " << prepareBufferSize;
239 payload->Rebuild(messageBuffer.data(), prepareBufferSize, nullptr, nullptr);
240 channel->second[0].Send(payload);
241 return 2;
242 }
243 return 0;
244}
245
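// End-of-stream handling: mode 1 just records the DPL EOS and wakes the
// receive thread; mode 2 forwards the EOS as a prepare message with
// flagEndOfStream set over the out-of-band channel.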
void GPURecoWorkflowSpec::handlePipelineEndOfStream(EndOfStreamContext& ec)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->endOfStreamDplReceived = true;
    mPipeline->stateNotify.notify_all();
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    auto* device = ec.services().get<RawDeviceService>().device();
    pipelinePrepareMessage preMessage;
    preMessage.flagEndOfStream = true;
    auto channel = device->GetChannels().find("gpu-prepare-channel");
    fair::mq::MessagePtr payload(device->NewMessage());
    LOG(info) << "Sending end-of-stream message over out-of-band channel";
    payload->Rebuild(&preMessage, sizeof(preMessage), nullptr, nullptr);
    channel->second[0].Send(payload);
  }
}

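// Reset the injection counter on stop so TF ids start from 0 again on the
// next run.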
void GPURecoWorkflowSpec::handlePipelineStop()
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    mPipeline->mayInjectTFId = 0;
  }
}

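// FairMQ state-change callback: track the current and previous device state
// for the receive thread, reset the EOS flags on a (re)start into Running,
// and unsubscribe once the device exits.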
void GPURecoWorkflowSpec::receiveFMQStateCallback(fair::mq::State newState)
{
  {
    std::lock_guard lk(mPipeline->stateMutex);
    if (mPipeline->fmqState != fair::mq::State::Running && newState == fair::mq::State::Running) {
      mPipeline->endOfStreamAsyncReceived = false;
      mPipeline->endOfStreamDplReceived = false;
    }
    mPipeline->fmqPreviousState = mPipeline->fmqState;
    mPipeline->fmqState = newState;
    if (newState == fair::mq::State::Exiting) {
      mPipeline->fmqDevice->UnsubscribeFromStateChange(GPURecoWorkflowSpec_FMQCallbackKey);
    }
  }
  mPipeline->stateNotify.notify_all();
}

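// Receive thread: while the device is Running (or has just left Running),
// poll the out-of-band gpu-prepare-channel, translate incoming prepare
// messages back into local pointers via the registered SHM regions, and
// queue the resulting context for the processing callback and the workers.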
void GPURecoWorkflowSpec::RunReceiveThread()
{
  auto* device = mPipeline->fmqDevice;
  while (!mPipeline->shouldTerminate) {
    bool received = false;
    int32_t recvTimeout = 1000;
    fair::mq::MessagePtr msg;
    LOG(debug) << "Waiting for out of band message";
    auto shouldReceive = [this]() { return ((mPipeline->fmqState == fair::mq::State::Running || (mPipeline->fmqState == fair::mq::State::Ready && mPipeline->fmqPreviousState == fair::mq::State::Running)) && !mPipeline->endOfStreamAsyncReceived); };
    do {
      {
        std::unique_lock lk(mPipeline->stateMutex);
        mPipeline->stateNotify.wait(lk, [this, shouldReceive]() { return shouldReceive() || mPipeline->shouldTerminate; }); // Do not check mPipeline->fmqDevice->NewStatePending() since we wait for EndOfStream!
      }
      if (mPipeline->shouldTerminate) {
        break;
      }
      try {
        do {
          std::unique_lock lk(mPipeline->stateMutex);
          if (!shouldReceive()) {
            break;
          }
          msg = device->NewMessageFor("gpu-prepare-channel", 0, 0);
          received = device->Receive(msg, "gpu-prepare-channel", 0, recvTimeout) > 0;
        } while (!received && !mPipeline->shouldTerminate);
      } catch (...) {
        usleep(1000000);
      }
    } while (!received && !mPipeline->shouldTerminate);
    if (mPipeline->shouldTerminate) {
      break;
    }
    if (msg->GetSize() < sizeof(pipelinePrepareMessage)) {
      LOG(fatal) << "Received prepare message of invalid size " << msg->GetSize() << " < " << sizeof(pipelinePrepareMessage);
    }
    const pipelinePrepareMessage* m = (const pipelinePrepareMessage*)msg->GetData();
    if (m->magicWord != m->MAGIC_WORD) {
      LOG(fatal) << "Prepare message corrupted, invalid magic word";
    }
    if (m->flagEndOfStream) {
      LOG(info) << "Received end-of-stream from out-of-band channel";
      std::lock_guard lk(mPipeline->stateMutex);
      mPipeline->endOfStreamAsyncReceived = true;
      mPipeline->mNTFReceived = 0;
      mPipeline->runStarted = false;
      continue;
    }

    {
      std::lock_guard lk(mPipeline->completionPolicyMutex);
      mPipeline->completionPolicyQueue.emplace(m->timeSliceId);
    }
    mPipeline->completionPolicyNotify.notify_one();

    {
      std::unique_lock lk(mPipeline->stateMutex);
      mPipeline->stateNotify.wait(lk, [this]() { return (mPipeline->runStarted && !mPipeline->endOfStreamAsyncReceived) || mPipeline->shouldTerminate; });
      if (!mPipeline->runStarted) {
        continue;
      }
    }

    auto context = std::make_unique<GPURecoWorkflow_QueueObject>();
    context->timeSliceId = m->timeSliceId;
    context->tfSettings = m->tfSettings;

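    // Reverse of the sender-side serialization: match each (managed, id) pair
    // to a locally registered SHM region and rebase the transmitted offsets
    // onto that region's local base pointer.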
    size_t ptrsCopied = 0;
    size_t* ptrBuffer = (size_t*)msg->GetData() + sizeof(pipelinePrepareMessage) / sizeof(size_t);
    context->tpcZSmeta.Pointers[0][0].resize(m->pointersTotal);
    context->tpcZSmeta.Sizes[0][0].resize(m->pointersTotal);
    int32_t lastRegion = -1;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        context->tpcZS.sector[i].count[j] = m->pointerCounts[i][j];
        for (uint32_t k = 0; k < context->tpcZS.sector[i].count[j]; k++) {
          bool regionManaged = ptrBuffer[2 * m->pointersTotal + ptrsCopied + k];
          size_t regionId = ptrBuffer[3 * m->pointersTotal + ptrsCopied + k];
          bool regionFound = lastRegion != -1 && mRegionInfos[lastRegion].managed == regionManaged && mRegionInfos[lastRegion].id == regionId;
          if (!regionFound) {
            for (uint32_t l = 0; l < mRegionInfos.size(); l++) {
              if (mRegionInfos[l].managed == regionManaged && mRegionInfos[l].id == regionId) {
                lastRegion = l;
                regionFound = true;
                break;
              }
            }
          }
          if (!regionFound) {
            LOG(fatal) << "Received ZS Ptr for SHM region (managed " << (int32_t)regionManaged << ", id " << regionId << "), which was not registered for us";
          }
          context->tpcZSmeta.Pointers[0][0][ptrsCopied + k] = (void*)(ptrBuffer[ptrsCopied + k] + (size_t)mRegionInfos[lastRegion].ptr);
          context->tpcZSmeta.Sizes[0][0][ptrsCopied + k] = ptrBuffer[m->pointersTotal + ptrsCopied + k];
        }
        context->tpcZS.sector[i].zsPtr[j] = context->tpcZSmeta.Pointers[0][0].data() + ptrsCopied;
        context->tpcZS.sector[i].nZSPtr[j] = context->tpcZSmeta.Sizes[0][0].data() + ptrsCopied;
        ptrsCopied += context->tpcZS.sector[i].count[j];
      }
    }
    context->ptrs.tpcZS = &context->tpcZS;
    context->ptrs.settingsTF = &context->tfSettings;
    context->mTFId = mPipeline->mNTFReceived;
    if (mPipeline->mNTFReceived++ >= mPipeline->workers.size()) { // Do not inject the first workers.size() TFs, since we need a first round of calib updates from DPL before starting
      enqueuePipelinedJob(&context->ptrs, nullptr, context.get(), false);
    }
    {
      std::lock_guard lk(mPipeline->queueMutex);
      mPipeline->pipelineQueue.emplace(std::move(context));
    }
    mPipeline->queueNotify.notify_one();
  }
  mPipeline->pipelineSenderTerminating = true;
  mPipeline->completionPolicyNotify.notify_one();
}

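// Tear down the pipeline: signal termination, wake all waiters, and join the
// receive and worker threads. fmqDevice is cleared first and doubles as a
// "not yet exited" guard.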
void GPURecoWorkflowSpec::ExitPipeline()
{
  if (mSpecConfig.enableDoublePipeline == 1 && mPipeline->fmqDevice) {
    mPipeline->fmqDevice = nullptr;
    mPipeline->shouldTerminate = true;
    mPipeline->stateNotify.notify_all();
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      mPipeline->workers[i].inputQueueNotify.notify_one();
    }
    if (mPipeline->receiveThread.joinable()) {
      mPipeline->receiveThread.join();
    }
    for (uint32_t i = 0; i < mPipeline->workers.size(); i++) {
      if (mPipeline->workers[i].thread.joinable()) {
        mPipeline->workers[i].thread.join();
      }
    }
  }
}

} // namespace o2::gpu