Project
Loading...
Searching...
No Matches
cru-page-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
24
25#include <random>
26#include <iostream>
27#include <queue>
28#include <fstream>
29#include <stdexcept>
34#include "Framework/Lifetime.h"
35#include "Framework/Output.h"
36#include "Framework/Task.h"
38
44
45using namespace o2;
46using namespace o2::framework;
47using namespace o2::raw;
48
49namespace o2
50{
51namespace mch
52{
53namespace raw
54{
55
57
58static const int NFEEID = 64;
59static const int NLINKS = 16;
60
61struct TimeFrame {
62 char* buf{nullptr};
63 size_t tfSize{0};
64 size_t totalSize{0};
65 size_t payloadSize{0};
66 uint32_t firstOrbit{0xFFFFFFFF};
67 std::vector<std::pair<size_t, size_t>> hbframes;
68
70 {
71 payloadSize = 0;
72 if (buf == nullptr) {
73 return;
74 }
75
76 size_t offset = 0;
77 while (offset < totalSize) {
78 char* ptr = buf + offset;
79 RDH* rdh = (RDH*)ptr;
80
81 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
82 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
83 auto memorySize = o2::raw::RDHUtils::getMemorySize(rdh);
84 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
85
86 payloadSize += memorySize - rdhHeaderSize;
87
88 offset += pageSize;
89 }
90 }
91
92 void print()
93 {
94 if (buf == nullptr) {
95 return;
96 }
97
98 int nPrinted = 0;
99
100 printf("\n//////////////////////\n");
101 size_t offset = 0;
102 size_t nStop = 0;
103 while (offset < totalSize) {
104 char* ptr = buf + offset;
105 RDH* rdh = (RDH*)ptr;
106
107 auto stopBit = o2::raw::RDHUtils::getStop(rdh);
108 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
109 if (stopBit > 0) {
110 nStop += 1;
111 }
112
113 offset += pageSize;
114 }
115
116 offset = 0;
117 bool doPrint = false;
118 size_t iStop = 0;
119 while (offset < totalSize) {
120 char* ptr = buf + offset;
121 RDH* rdh = (RDH*)ptr;
122
123 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
124 uint16_t cruID = o2::raw::RDHUtils::getCRUID(rdh) & 0x3F;
125 uint8_t endPointID = o2::raw::RDHUtils::getEndPointID(rdh);
126 uint8_t linkID = o2::raw::RDHUtils::getLinkID(rdh);
127 uint16_t feeID = cruID * 2 + endPointID;
128 auto stopBit = o2::raw::RDHUtils::getStop(rdh);
129 auto triggerType = o2::raw::RDHUtils::getTriggerType(rdh);
130 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
131
132 if (iStop < 2 || iStop > (nStop - 3)) {
133
134 printf("%6d: version %X offset %4d packet %3d srcID %d cruID %2d dp %d link %2d orbit %u bc %4d trig 0x%08X page %d stop %d",
135 (int)0, (int)rdhVersion, (int)pageSize,
136 (int)RDHUtils::getPacketCounter(rdh), (int)RDHUtils::getSourceID(rdh),
137 (int)cruID, (int)endPointID, (int)linkID,
138 (uint32_t)RDHUtils::getHeartBeatOrbit(rdh), (int)RDHUtils::getTriggerBC(rdh),
139 (int)triggerType, (int)RDHUtils::getPageCounter(rdh), (int)stopBit);
140 if ((triggerType & 0x800) != 0) {
141 printf(" <===");
142 }
143 printf("\n");
144 }
145 if (stopBit > 0 && iStop == 3) {
146 printf("........................\n");
147 }
148
149 if (stopBit > 0) {
150 iStop += 1;
151 }
152
153 offset += pageSize;
154 }
155 fmt::printf("total size: {}\n", totalSize);
156 printf("//////////////////////\n");
157 }
158};
159
160using TFQueue = std::queue<TimeFrame>;
161
162TFQueue tfQueues[NFEEID][NLINKS];
163
165{
166 public:
167 //_________________________________________________________________________________________________
169 {
171 LOG(info) << "initializing file reader";
172 mFrameMax = ic.options().get<int>("nframes");
173 mTimeFrameMax = ic.options().get<int>("max-time-frame");
174 mPrint = ic.options().get<bool>("print");
175 mFullHBF = ic.options().get<bool>("full-hbf");
176 mFullTF = ic.options().get<bool>("full-tf");
177 mSaveTF = ic.options().get<bool>("save-tf");
178 mOverlap = ic.options().get<int>("overlap");
179
180 auto inputFileName = ic.options().get<std::string>("infile");
181 mInputFile.open(inputFileName, std::ios::binary);
182 if (!mInputFile.is_open()) {
183 throw std::invalid_argument("Cannot open input file \"" + inputFileName + "\"");
184 }
185
186 auto stop = [this]() {
188 LOG(info) << "stop file reader";
189 this->mInputFile.close();
190 };
191 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(stop);
192
193 const auto& hbfu = o2::raw::HBFUtils::Instance();
194 if (hbfu.runNumber != 0) {
195 mTFIDInfo.runNumber = hbfu.runNumber;
196 }
197 if (hbfu.orbitFirst != 0) {
198 mTFIDInfo.firstTForbit = hbfu.orbitFirst;
199 }
200 if (hbfu.startTime != 0) {
201 mTFIDInfo.creation = hbfu.startTime;
202 }
203 }
204
205 void printHBF(char* framePtr, size_t frameSize)
206 {
207 size_t pageStart = 0;
208 std::cout << "----\n";
209 while (pageStart < frameSize) {
210 RDH* rdh = (RDH*)(&(framePtr[pageStart]));
211 // check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
212 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
213 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
214 uint16_t cruID = o2::raw::RDHUtils::getCRUID(rdh) & 0x3F;
215 uint8_t endPointID = o2::raw::RDHUtils::getEndPointID(rdh);
216 uint8_t linkID = o2::raw::RDHUtils::getLinkID(rdh);
217 uint16_t feeID = cruID * 2 + endPointID;
218 auto stopBit = o2::raw::RDHUtils::getStop(rdh);
219 auto triggerType = o2::raw::RDHUtils::getTriggerType(rdh);
220 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
221 auto pageCounter = RDHUtils::getPageCounter(rdh);
222
223 printf("%6d: V %X offset %4d packet %3d srcID %d cruID %2d dp %d link %2d orbit %u bc %4d trig 0x%08X p %d s %d",
224 (int)0, (int)rdhVersion, (int)pageSize,
225 (int)RDHUtils::getPacketCounter(rdh), (int)RDHUtils::getSourceID(rdh),
226 (int)cruID, (int)endPointID, (int)linkID,
227 (uint32_t)RDHUtils::getHeartBeatOrbit(rdh), (int)RDHUtils::getTriggerBC(rdh),
228 (int)triggerType, (int)pageCounter, (int)stopBit);
229 if ((triggerType & 0x800) != 0) {
230 printf(" <===");
231 }
232 printf("\n");
233 pageStart += pageSize;
234 }
235 std::cout << "----\n";
236 }
237
238 bool appendHBF(TimeFrame& tf, char* framePtr, size_t frameSize, bool addHBF)
239 {
240 // new size of the TimeFrame buffer after appending the HBFrame
241 size_t newSize = tf.totalSize + frameSize;
242 // increase the size of the memory buffer
243 tf.buf = (char*)realloc(tf.buf, newSize);
244 if (tf.buf == nullptr) {
245 std::cout << "failed to allocate TimeFrame buffer" << std::endl;
246 return false;
247 }
248
249 // copy the HBFrame into the TimeFrame buffer
250 char* bufPtr = tf.buf + tf.totalSize;
251 memcpy(bufPtr, framePtr, frameSize);
252
253 if (addHBF) {
254 // add the offset and size of the HBFrame to the vector
255 tf.hbframes.emplace_back(std::make_pair(tf.totalSize, frameSize));
256 // increase the TimeFrame sizes
257 tf.tfSize += frameSize;
258 }
259
260 // increase the total buffer sizes
261 tf.totalSize += frameSize;
262
263 return true;
264 }
265
266 //_________________________________________________________________________________________________
268 {
269 uint32_t orbitMin = 0xFFFFFFFF;
270 int maxQueueSize = 0;
271 for (int feeId = 0; feeId < NFEEID; feeId++) {
272 for (int linkId = 0; linkId < NLINKS; linkId++) {
273 TFQueue& tfQueue = tfQueues[feeId][linkId];
274 if (tfQueue.empty()) {
275 continue;
276 }
277 if (mPrint) {
278 std::cout << fmt::format("FEE ID {} LINK {} orbit {} queue size {}", feeId, linkId, tfQueue.front().firstOrbit, tfQueue.size()) << std::endl;
279 }
280 if (tfQueue.front().firstOrbit < orbitMin) {
281 orbitMin = tfQueue.front().firstOrbit;
282 }
283 if (tfQueue.size() > maxQueueSize) {
284 maxQueueSize = tfQueue.size();
285 }
286 }
287 }
288
289 if (maxQueueSize < 3) {
290 return false;
291 }
292
293 char* outBuf{nullptr};
294 size_t outSize{0};
295 for (int feeId = 0; feeId < NFEEID; feeId++) {
296 for (int linkId = 0; linkId < NLINKS; linkId++) {
297 TFQueue& tfQueue = tfQueues[feeId][linkId];
298 TimeFrame& tf = tfQueue.front();
299 if (tf.firstOrbit != orbitMin) {
300 continue;
301 }
302
303 size_t newSize = outSize + tf.totalSize;
304 // increase the size of the memory buffer
305 outBuf = (char*)realloc(outBuf, newSize);
306 if (outBuf == nullptr) {
307 std::cout << "failed to allocate output buffer of size " << newSize << " bytes" << std::endl;
308 return false;
309 }
310 if (mPrint) {
311 std::cout << fmt::format("Appending FEE ID {} LINK {} orbit {} to current TF", feeId, linkId, tf.firstOrbit) << std::endl;
312 }
313 // copy the SubTimeFrame into the TimeFrame buffer
314 char* bufPtr = outBuf + outSize;
315 memcpy(bufPtr, tf.buf, tf.totalSize);
316
317 outSize += tf.totalSize;
318 tfQueue.pop();
319 }
320 }
321
322 if (mPrint) {
323 std::cout << "Sending TF " << orbitMin << " (previous " << mLastTForbit << " delta " << (orbitMin - mLastTForbit) << ")" << std::endl
324 << std::endl;
325 }
326 mLastTForbit = orbitMin;
327 auto freefct = [](void* data, void* /*hint*/) { free(data); };
328 pc.outputs().adoptChunk(Output{"RDT", "RAWDATA"}, outBuf, outSize, freefct, nullptr);
329
330 return true;
331 }
332
333 //_________________________________________________________________________________________________
335 {
337 RDH rdh;
338
339 static int TFid = 0;
340
341 if (mTimeFrameMax > 0 && TFid == mTimeFrameMax) {
342 pc.services().get<ControlService>().endOfStream();
343 return;
344 }
345
346 while (true) {
347
348 // stop if the required number of frames has been reached
349 if (mFrameMax == 0) {
350 pc.services().get<ControlService>().endOfStream();
351 return;
352 }
353
354 if (mPrint && false) {
355 printf("mFrameMax: %d\n", mFrameMax);
356 }
357 if (mFrameMax > 0) {
358 mFrameMax -= 1;
359 }
360
361 // read the next RDH, stop if no more data is available
362 if (mPrint) {
363 std::cout << "Reading " << sizeof(RDH) << " for RDH from input file\n";
364 }
365 mInputFile.read((char*)(&rdh), sizeof(RDH));
366 if (mInputFile.fail()) {
367 if (mPrint) {
368 std::cout << "end of file reached" << std::endl;
369 }
370 pc.services().get<ControlService>().endOfStream();
371 return; // probably reached eof
372 }
373
374 // check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
375 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
376 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
377 uint16_t cruID = o2::raw::RDHUtils::getCRUID(rdh) & 0x3F;
378 uint8_t endPointID = o2::raw::RDHUtils::getEndPointID(rdh);
379 uint8_t linkID = o2::raw::RDHUtils::getLinkID(rdh);
380 uint16_t feeID = cruID * 2 + endPointID;
381 auto stopBit = o2::raw::RDHUtils::getStop(rdh);
382 auto triggerType = o2::raw::RDHUtils::getTriggerType(rdh);
383 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
384 auto pageCounter = RDHUtils::getPageCounter(rdh);
385 auto orbit = RDHUtils::getHeartBeatOrbit(rdh);
386 int bc = (int)RDHUtils::getTriggerBC(rdh);
387
388 if (mPrint) {
389 printf("%6d: V %X offset %4d packet %3d srcID %d cruID %2d dp %d link %2d orbit %u bc %4d trig 0x%08X p %d s %d",
390 (int)0, (int)rdhVersion, (int)pageSize,
391 (int)RDHUtils::getPacketCounter(rdh), (int)RDHUtils::getSourceID(rdh),
392 (int)cruID, (int)endPointID, (int)linkID,
393 orbit, bc, (int)triggerType, (int)pageCounter, (int)stopBit);
394 if ((triggerType & 0x800) != 0) {
395 printf(" <===");
396 }
397 printf("\n");
398 }
399 if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
400 return;
401 }
402
403 TFQueue& tfQueue = tfQueues[feeID][linkID];
404
405 // get the frame size from the RDH offsetToNext field
406 if (mPrint && false) {
407 std::cout << "pageSize=" << pageSize << std::endl;
408 }
409
410 // stop if the frame size is too small
411 if (pageSize < rdhHeaderSize) {
412 std::cout << mFrameMax << " - pageSize too small: " << pageSize << std::endl;
413 pc.services().get<ControlService>().endOfStream();
414 return;
415 }
416
417 // allocate or extend the output buffer
418 mTimeFrameBufs[feeID][linkID] = (char*)realloc(mTimeFrameBufs[feeID][linkID], mTimeFrameSizes[feeID][linkID] + pageSize);
419 if (mTimeFrameBufs[feeID][linkID] == nullptr) {
420 std::cout << mFrameMax << " - failed to allocate buffer" << std::endl;
421 pc.services().get<ControlService>().endOfStream();
422 return;
423 }
424
425 if (mPrint) {
426 std::cout << "Copying RDH into buf " << (int)feeID << "," << (int)linkID << std::endl;
427 std::cout << " frame size: " << mTimeFrameSizes[feeID][linkID] << std::endl;
428 }
429 // copy the RDH into the output buffer
430 memcpy(mTimeFrameBufs[feeID][linkID] + mTimeFrameSizes[feeID][linkID], &rdh, rdhHeaderSize);
431
432 // read the frame payload into the output buffer
433 if (mPrint) {
434 std::cout << "Reading " << pageSize - rdhHeaderSize << " for payload from input file\n";
435 std::cout << "Copying payload into buf " << (int)feeID << "," << (int)linkID << std::endl;
436 std::cout << " frame size: " << mTimeFrameSizes[feeID][linkID] << std::endl;
437 }
438 mInputFile.read(mTimeFrameBufs[feeID][linkID] + mTimeFrameSizes[feeID][linkID] + rdhHeaderSize, pageSize - rdhHeaderSize);
439
440 // stop if data cannot be read completely
441 if (mInputFile.fail()) {
442 if (mPrint) {
443 std::cout << "end of file reached" << std::endl;
444 }
445 pc.services().get<ControlService>().endOfStream();
446 return; // probably reached eof
447 }
448
449 // increment the total buffer size
450 mTimeFrameSizes[feeID][linkID] += pageSize;
451
452 if ((triggerType & 0x800) != 0 && /*stopBit == 0 && pageCounter == 0 &&*/ bc == 0) {
453 // This is the start of a new TimeFrame, so we need to push a new empty TimeFrame in the queue
454 if (mPrint) {
455 std::cout << "tfQueue.size(): " << tfQueue.size() << std::endl;
456 }
457 tfQueue.emplace();
458 tfQueue.back().firstOrbit = orbit;
459 }
460
461 if (stopBit && tfQueue.size() > 0) {
462 // we reached the end of the current HBFrame, we need to append it to the TimeFrame
463
464 if (mPrint) {
465 std::cout << "Appending HBF from " << (int)feeID << "," << (int)linkID << " to TF #" << tfQueue.size() << std::endl;
466 std::cout << " frame size: " << mTimeFrameSizes[feeID][linkID] << std::endl;
467 printHBF(mTimeFrameBufs[feeID][linkID], mTimeFrameSizes[feeID][linkID]);
468 }
469 if (!appendHBF(tfQueue.back(), mTimeFrameBufs[feeID][linkID], mTimeFrameSizes[feeID][linkID], true)) {
470 std::cout << mFrameMax << " - failed to append HBframe" << std::endl;
471 pc.services().get<ControlService>().endOfStream();
472 return;
473 }
474
475 // free the HBFrame buffer
476 free(mTimeFrameBufs[feeID][linkID]);
477 mTimeFrameBufs[feeID][linkID] = nullptr;
478 mTimeFrameSizes[feeID][linkID] = 0;
479
480 if (sendTF(pc)) {
481 break;
482 }
483 }
484 }
485 }
486
487 //_________________________________________________________________________________________________
489 {
490 setMessageHeader(pc, mTFIDInfo);
491
492 if (mFullTF) {
493 appendSTF(pc);
494 // pc.services().get<ControlService>().endOfStream();
495 return;
496 }
497
499 RDH rdh;
500 char* buf{nullptr};
501 size_t bufSize{0};
502
503 while (true) {
504
505 // stop if the required number of frames has been reached
506 if (mFrameMax == 0) {
507 pc.services().get<ControlService>().endOfStream();
508 return;
509 }
510
511 if (mPrint) {
512 printf("mFrameMax: %d\n", mFrameMax);
513 }
514 if (mFrameMax > 0) {
515 mFrameMax -= 1;
516 }
517
518 // read the next RDH, stop if no more data is available
519 mInputFile.read((char*)(&rdh), sizeof(RDH));
520 if (mInputFile.fail()) {
521 if (mPrint) {
522 std::cout << "end of file reached" << std::endl;
523 }
524 pc.services().get<ControlService>().endOfStream();
525 return; // probably reached eof
526 }
527
528 // check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
529 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
530 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
531 if (mPrint) {
532 std::cout << "header_version=" << (int)rdhVersion << std::endl;
533 }
534 if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
535 return;
536 }
537
538 // get the frame size from the RDH offsetToNext field
539 auto frameSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
540 if (mPrint) {
541 std::cout << "frameSize=" << frameSize << std::endl;
542 }
543
544 // stop if the frame size is too small
545 if (frameSize < rdhHeaderSize) {
546 std::cout << mFrameMax << " - frameSize too small: " << frameSize << std::endl;
547 pc.services().get<ControlService>().endOfStream();
548 return;
549 }
550
551 // allocate the output buffer
552 buf = (char*)realloc(buf, bufSize + frameSize);
553 if (buf == nullptr) {
554 std::cout << mFrameMax << " - failed to allocate buffer" << std::endl;
555 pc.services().get<ControlService>().endOfStream();
556 return;
557 }
558
559 // copy the RDH into the output buffer
560 memcpy(buf + bufSize, &rdh, rdhHeaderSize);
561
562 // read the frame payload into the output buffer
563 mInputFile.read(buf + bufSize + rdhHeaderSize, frameSize - rdhHeaderSize);
564
565 // stop if data cannot be read completely
566 if (mInputFile.fail()) {
567 if (mPrint) {
568 std::cout << "end of file reached" << std::endl;
569 }
570 free(buf);
571 pc.services().get<ControlService>().endOfStream();
572 return; // probably reached eof
573 }
574
575 // increment the total buffer size
576 bufSize += frameSize;
577
578 auto stopBit = o2::raw::RDHUtils::getStop(rdh);
579
580 // when requesting full HBframes, the output message is sent only when the stop RDH is reached
581 // otherwise we send one message for each CRU page
582 if ((stopBit != 0) || (mFullHBF == false)) {
583 // create the output message
584 auto freefct = [](void* data, void* /*hint*/) { free(data); };
585 pc.outputs().adoptChunk(Output{"RDT", "RAWDATA"}, buf, bufSize, freefct, nullptr);
586
587 // stop the readout loop
588 break;
589 }
590 } // while (true)
591 }
592
594 {
595 auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
596 if (tfid.firstTForbit != -1U) {
597 timingInfo.firstTForbit = tfid.firstTForbit;
598 }
599 if (tfid.tfCounter != -1U) {
600 timingInfo.tfCounter = tfid.tfCounter;
601 }
602 if (tfid.runNumber != -1U) {
603 timingInfo.runNumber = tfid.runNumber;
604 }
605 if (tfid.creation != -1U) {
606 timingInfo.creation = tfid.creation;
607 }
608 // LOGP(info, "TimingInfo set to : firstTForbit {}, tfCounter {}, runNumber {}, creatio {}", timingInfo.firstTForbit, timingInfo.tfCounter, timingInfo.runNumber, timingInfo.creation);
609 }
610
611 private:
612 std::ifstream mInputFile{};
613 int mFrameMax;
614 int mTimeFrameMax;
615 bool mFullHBF;
616 bool mFullTF;
617 bool mSaveTF;
618 int mOverlap;
619 int mLastTForbit{0};
620 bool mPrint = false;
621 o2::dataformats::TFIDInfo mTFIDInfo{}; // struct to modify output headers
622
623 char* mTimeFrameBufs[NFEEID][NLINKS] = {nullptr};
624 size_t mTimeFrameSizes[NFEEID][NLINKS] = {0};
625};
626
627//_________________________________________________________________________________________________
628// clang-format off
630{
631 return DataProcessorSpec{
632 specName,
633 Inputs{},
634 Outputs{OutputSpec{"RDT", "RAWDATA", 0, Lifetime::Sporadic}},
635 AlgorithmSpec{adaptFromTask<FileReaderTask>()},
636 Options{{"infile", VariantType::String, "", {"input file name"}},
637 {"nframes", VariantType::Int, -1, {"number of frames to process"}},
638 {"max-time-frame", VariantType::Int, -1, {"number of time frames to process"}},
639 {"full-hbf", VariantType::Bool, false, {"send full HeartBeat frames"}},
640 {"full-tf", VariantType::Bool, false, {"send full time frames"}},
641 {"save-tf", VariantType::Bool, false, {"save individual time frames to file"}},
642 {"overlap", VariantType::Int, 0, {"overlap between contiguous TimeFrames"}},
643 {"print", VariantType::Bool, false, {"verbose output"}}}};
644}
645// clang-format on
646
647} // end namespace raw
648} // end namespace mch
649} // end namespace o2
650
651// we need to add workflow options before including Framework/runDataProcessing
652void customize(std::vector<ConfigParamSpec>& workflowOptions)
653{
654 std::vector<ConfigParamSpec> options{{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}};
655 workflowOptions.insert(workflowOptions.end(), options.begin(), options.end());
656}
657
659
660using namespace o2;
661using namespace o2::framework;
662
664{
665 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
666 WorkflowSpec specs;
667
668 // The producer to generate some data in the workflow
669 DataProcessorSpec producer = mch::raw::getFileReaderSpec("mch-cru-page-reader");
670 specs.push_back(producer);
671
672 return specs;
673}
A raw page parser for DPL input.
uint64_t orbit
Definition RawEventData.h:6
uint64_t bc
Definition RawEventData.h:5
Definition of the RAW Data Header.
TBranch * ptr
const char * specName
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
bool appendHBF(TimeFrame &tf, char *framePtr, size_t frameSize, bool addHBF)
void printHBF(char *framePtr, size_t frameSize)
void init(framework::InitContext &ic)
void appendSTF(framework::ProcessingContext &pc)
bool sendTF(framework::ProcessingContext &pc)
void run(framework::ProcessingContext &pc)
void setMessageHeader(ProcessingContext &pc, const o2::dataformats::TFIDInfo &tfid) const
void customize(std::vector< ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(const ConfigContext &cfgc)
GLuint GLsizei bufSize
Definition glcorearb.h:790
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
o2::framework::DataProcessorSpec getFileReaderSpec(const char *specName)
TFQueue tfQueues[NFEEID][NLINKS]
std::queue< TimeFrame > TFQueue
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::unique_ptr< GPUReconstructionTimeframe > tf
std::vector< std::pair< size_t, size_t > > hbframes
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:58
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"