Project
Loading...
Searching...
No Matches
file-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
17#include "Framework/Task.h"
26#include <fairmq/Device.h>
27#include <fairmq/Parts.h>
28
31
32#include <iostream>
33
34using namespace o2::framework;
35
36class FileProxyTask : public Task
37{
38 public:
39 FileProxyTask() = default;
40 ~FileProxyTask() override = default;
41
42 void init(InitContext& ic) final;
43 void run(ProcessingContext& pc) final;
44
45 private:
46 long readFLP()
47 {
48 if (mRDHversion == 4) {
49 return readFLPv4();
50 }
51 if (mRDHversion == 6) {
52 return readFLPv6();
53 }
54 if (mRDHversion == 7) {
55 return readFLPv7();
56 }
57 return 0;
58 };
59 long readFLPv4();
60 long readFLPv6();
61 long readFLPv7();
62 long readCONET();
63
64 bool mStatus = false;
65 bool mCONET = false;
66 int mRDHversion = 4;
67 bool mDumpData = false;
68 std::ifstream mFile;
69 char mBuffer[4194304];
70};
71
73{
74 auto infilename = ic.options().get<std::string>("atc-file-proxy-input-filename");
75 auto startfrom = ic.options().get<int>("atc-file-proxy-start-from");
76 mCONET = ic.options().get<bool>("atc-file-proxy-conet-mode");
77 mDumpData = ic.options().get<bool>("atc-file-proxy-dump-data");
78 mRDHversion = ic.options().get<int>("atc-file-proxy-rdh-version");
79
81 std::cout << " --- Opening input file: " << infilename << std::endl;
82 mFile.open(infilename, std::fstream::in | std::fstream::binary);
83 if (!mFile.is_open()) {
84 std::cout << " --- File is not open " << std::endl;
85 mStatus = true;
86 }
87 std::cout << " --- Start reading from byte offset: " << startfrom << std::endl;
88 mFile.seekg(startfrom, std::ios::beg);
89
90 if (mCONET) {
91 std::cout << " --- CONET mode " << std::endl;
92 }
93};
94
95long FileProxyTask::readFLPv4()
96{
98 char* pointer = mBuffer;
99 if (!mFile.read(pointer, 64)) {
100 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
101 mStatus = true;
102 return 0;
103 }
104 long payload = 64;
105 auto rdh = reinterpret_cast<o2::header::RAWDataHeaderV4*>(pointer);
106 while (!rdh->stop) {
107 if (!mFile.read(pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
108 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
109 mStatus = true;
110 return 0;
111 }
112 payload += (rdh->offsetToNext - rdh->headerSize);
113 pointer += rdh->offsetToNext;
114 if (!mFile.read(pointer, 64)) {
115 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
116 mStatus = true;
117 return 0;
118 }
119 payload += 64;
120 rdh = reinterpret_cast<o2::header::RAWDataHeaderV4*>(pointer);
121 }
122
123 return payload;
124}
125
126long FileProxyTask::readFLPv6()
127{
129 char* pointer = mBuffer;
130 if (!mFile.read(pointer, 64)) {
131 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
132 mStatus = true;
133 return 0;
134 }
135 long payload = 64;
136 auto rdh = reinterpret_cast<o2::header::RAWDataHeaderV6*>(pointer);
137 while (!rdh->stop) {
138 if (!mFile.read(pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
139 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
140 mStatus = true;
141 return 0;
142 }
143 payload += (rdh->offsetToNext - rdh->headerSize);
144 pointer += rdh->offsetToNext;
145 if (!mFile.read(pointer, 64)) {
146 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
147 mStatus = true;
148 return 0;
149 }
150 payload += 64;
151 rdh = reinterpret_cast<o2::header::RAWDataHeaderV6*>(pointer);
152 }
153
154 return payload;
155}
156
157long FileProxyTask::readFLPv7()
158{
160 char* pointer = mBuffer;
161 if (!mFile.read(pointer, 64)) {
162 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
163 mStatus = true;
164 return 0;
165 }
166 long payload = 64;
167 auto rdh = reinterpret_cast<o2::header::RAWDataHeaderV7*>(pointer);
168 while (!rdh->stop) {
169 if (!mFile.read(pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
170 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
171 mStatus = true;
172 return 0;
173 }
174 payload += (rdh->offsetToNext - rdh->headerSize);
175 pointer += rdh->offsetToNext;
176 if (!mFile.read(pointer, 64)) {
177 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
178 mStatus = true;
179 return 0;
180 }
181 payload += 64;
182 rdh = reinterpret_cast<o2::header::RAWDataHeaderV7*>(pointer);
183 }
184
185 return payload;
186}
187
188long FileProxyTask::readCONET()
189{
190
191 uint32_t word;
192 auto current = mFile.tellg();
193
195 if (!mFile.read((char*)&word, 4)) {
196 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
197 mStatus = true;
198 return 0;
199 }
200 auto bytePayload = word * 4;
201 std::cout << " --- tofBuffer: " << word << " (" << bytePayload << " bytes) at " << current << std::endl;
202
204 if (!mFile.read(mBuffer, bytePayload)) {
205 std::cout << " --- Cannot read input file: " << strerror(errno) << std::endl;
206 mStatus = true;
207 return 0;
208 }
209 return bytePayload;
210}
211
213{
214
216 if (mStatus) {
217 mFile.close();
219 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
220 return;
221 }
222
224 int payload = 0;
225 if (mCONET) {
226 payload = readCONET();
227 } else {
228 payload = readFLP();
229 }
230 if (payload == 0) {
231 mStatus = true;
232 return;
233 }
234
235 if (mDumpData) {
236 std::cout << " --- dump data: " << payload << " bytes" << std::endl;
237 uint32_t* word = reinterpret_cast<uint32_t*>(mBuffer);
238 for (int i = 0; i < payload / 4; ++i) {
239 printf(" 0x%08x \n", *word);
240 word++;
241 }
242 std::cout << " --- end of dump data " << std::endl;
243 }
244
246 auto device = pc.services().get<o2::framework::RawDeviceService>().device();
247 auto outputRoutes = pc.services().get<o2::framework::RawDeviceService>().spec().outputs;
248 auto fairMQChannel = outputRoutes.at(0).channel;
249 auto payloadMessage = device->NewMessage(payload);
250 std::memcpy(payloadMessage->GetData(), mBuffer, payload);
251 o2::header::DataHeader header("RAWDATA", "TOF", 0);
252 header.payloadSize = payload;
253 o2::framework::DataProcessingHeader dataProcessingHeader{0};
254 o2::header::Stack headerStack{header, dataProcessingHeader};
255 auto headerMessage = device->NewMessage(headerStack.size());
256 std::memcpy(headerMessage->GetData(), headerStack.data(), headerStack.size());
257
259 fair::mq::Parts parts;
260 parts.AddPart(std::move(headerMessage));
261 parts.AddPart(std::move(payloadMessage));
262 device->Send(parts, fairMQChannel);
263
265 if (mFile.eof()) {
266 std::cout << " --- End of file " << std::endl;
267 mStatus = true;
268 }
269};
270
271// add workflow options, note that customization needs to be declared before
272// including Framework/runDataProcessing
273void customize(std::vector<ConfigParamSpec>& workflowOptions)
274{
275}
276
277#include "Framework/runDataProcessing.h" // the main driver
278
281{
282 return WorkflowSpec{
283 DataProcessorSpec{"file-proxy",
284 Inputs{},
285 Outputs{OutputSpec(ConcreteDataTypeMatcher{"TOF", "RAWDATA"})},
286 AlgorithmSpec(adaptFromTask<FileProxyTask>()),
287 Options{
288 {"atc-file-proxy-input-filename", VariantType::String, "", {"Input file name"}},
289 {"atc-file-proxy-start-from", VariantType::Int, 0, {"Start reading from byte"}},
290 {"atc-file-proxy-dump-data", VariantType::Bool, false, {"Dump data"}},
291 {"atc-file-proxy-rdh-version", VariantType::Int, 7, {"RDH version"}},
292 {"atc-file-proxy-conet-mode", VariantType::Bool, false, {"CONET mode"}}}}};
293}
int32_t i
Definition of the RAW Data Header.
TOF raw data format.
void init(InitContext &ic) final
void run(ProcessingContext &pc) final
~FileProxyTask() override=default
FileProxyTask()=default
ConfigParamRegistry const & options()
Definition InitContext.h:33
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
GLenum void ** pointer
Definition glcorearb.h:805
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
the main header struct
Definition DataHeader.h:618
PayloadSizeType payloadSize
Definition DataHeader.h:666
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36