26#include <fairmq/Device.h>
27#include <fairmq/Parts.h>
48 if (mRDHversion == 4) {
51 if (mRDHversion == 6) {
54 if (mRDHversion == 7) {
67 bool mDumpData =
false;
69 char mBuffer[4194304];
74 auto infilename = ic.
options().
get<std::string>(
"atc-file-proxy-input-filename");
75 auto startfrom = ic.
options().
get<
int>(
"atc-file-proxy-start-from");
76 mCONET = ic.
options().
get<
bool>(
"atc-file-proxy-conet-mode");
77 mDumpData = ic.
options().
get<
bool>(
"atc-file-proxy-dump-data");
78 mRDHversion = ic.
options().
get<
int>(
"atc-file-proxy-rdh-version");
81 std::cout <<
" --- Opening input file: " << infilename << std::endl;
82 mFile.open(infilename, std::fstream::in | std::fstream::binary);
83 if (!mFile.is_open()) {
84 std::cout <<
" --- File is not open " << std::endl;
87 std::cout <<
" --- Start reading from byte offset: " << startfrom << std::endl;
88 mFile.seekg(startfrom, std::ios::beg);
91 std::cout <<
" --- CONET mode " << std::endl;
95long FileProxyTask::readFLPv4()
100 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
107 if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
108 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
112 payload += (rdh->offsetToNext - rdh->headerSize);
114 if (!mFile.read(
pointer, 64)) {
115 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
126long FileProxyTask::readFLPv6()
130 if (!mFile.read(
pointer, 64)) {
131 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
138 if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
139 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
143 payload += (rdh->offsetToNext - rdh->headerSize);
145 if (!mFile.read(
pointer, 64)) {
146 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
157long FileProxyTask::readFLPv7()
161 if (!mFile.read(
pointer, 64)) {
162 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
169 if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
170 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
174 payload += (rdh->offsetToNext - rdh->headerSize);
176 if (!mFile.read(
pointer, 64)) {
177 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
188long FileProxyTask::readCONET()
192 auto current = mFile.tellg();
195 if (!mFile.read((
char*)&word, 4)) {
196 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
200 auto bytePayload = word * 4;
201 std::cout <<
" --- tofBuffer: " << word <<
" (" << bytePayload <<
" bytes) at " << current << std::endl;
204 if (!mFile.read(mBuffer, bytePayload)) {
205 std::cout <<
" --- Cannot read input file: " << strerror(errno) << std::endl;
226 payload = readCONET();
236 std::cout <<
" --- dump data: " << payload <<
" bytes" << std::endl;
237 uint32_t* word =
reinterpret_cast<uint32_t*
>(mBuffer);
238 for (
int i = 0;
i < payload / 4; ++
i) {
239 printf(
" 0x%08x \n", *word);
242 std::cout <<
" --- end of dump data " << std::endl;
248 auto fairMQChannel = outputRoutes.at(0).channel;
249 auto payloadMessage = device->NewMessage(payload);
250 std::memcpy(payloadMessage->GetData(), mBuffer, payload);
255 auto headerMessage = device->NewMessage(headerStack.size());
256 std::memcpy(headerMessage->GetData(), headerStack.data(), headerStack.size());
259 fair::mq::Parts parts;
260 parts.AddPart(std::move(headerMessage));
261 parts.AddPart(std::move(payloadMessage));
262 device->Send(parts, fairMQChannel);
266 std::cout <<
" --- End of file " << std::endl;
273void customize(std::vector<ConfigParamSpec>& workflowOptions)
288 {
"atc-file-proxy-input-filename", VariantType::String,
"", {
"Input file name"}},
289 {
"atc-file-proxy-start-from", VariantType::Int, 0, {
"Start reading from byte"}},
290 {
"atc-file-proxy-dump-data", VariantType::Bool,
false, {
"Dump data"}},
291 {
"atc-file-proxy-rdh-version", VariantType::Int, 7, {
"RDH version"}},
292 {
"atc-file-proxy-conet-mode", VariantType::Bool,
false, {
"CONET mode"}}}}};
void init(InitContext &ic) final
void run(ProcessingContext &pc) final
~FileProxyTask() override=default
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs