26#include <fairmq/Device.h> 
   27#include <fairmq/Parts.h> 
   48    if (mRDHversion == 4) {
 
   51    if (mRDHversion == 6) {
 
   54    if (mRDHversion == 7) {
 
   67  bool mDumpData = 
false;
 
   69  char mBuffer[4194304];
 
 
   74  auto infilename = ic.
options().
get<std::string>(
"atc-file-proxy-input-filename");
 
   75  auto startfrom = ic.
options().
get<
int>(
"atc-file-proxy-start-from");
 
   76  mCONET = ic.
options().
get<
bool>(
"atc-file-proxy-conet-mode");
 
   77  mDumpData = ic.
options().
get<
bool>(
"atc-file-proxy-dump-data");
 
   78  mRDHversion = ic.
options().
get<
int>(
"atc-file-proxy-rdh-version");
 
   81  std::cout << 
" --- Opening input file: " << infilename << std::endl;
 
   82  mFile.open(infilename, std::fstream::in | std::fstream::binary);
 
   83  if (!mFile.is_open()) {
 
   84    std::cout << 
" --- File is not open " << std::endl;
 
   87  std::cout << 
" --- Start reading from byte offset: " << startfrom << std::endl;
 
   88  mFile.seekg(startfrom, std::ios::beg);
 
   91    std::cout << 
" --- CONET mode " << std::endl;
 
 
   95long FileProxyTask::readFLPv4()
 
  100    std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  107    if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
 
  108      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  112    payload += (rdh->offsetToNext - rdh->headerSize);
 
  114    if (!mFile.read(
pointer, 64)) {
 
  115      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  126long FileProxyTask::readFLPv6()
 
  130  if (!mFile.read(
pointer, 64)) {
 
  131    std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  138    if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
 
  139      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  143    payload += (rdh->offsetToNext - rdh->headerSize);
 
  145    if (!mFile.read(
pointer, 64)) {
 
  146      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  157long FileProxyTask::readFLPv7()
 
  161  if (!mFile.read(
pointer, 64)) {
 
  162    std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  169    if (!mFile.read(
pointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize)) {
 
  170      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  174    payload += (rdh->offsetToNext - rdh->headerSize);
 
  176    if (!mFile.read(
pointer, 64)) {
 
  177      std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  188long FileProxyTask::readCONET()
 
  192  auto current = mFile.tellg();
 
  195  if (!mFile.read((
char*)&word, 4)) {
 
  196    std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  200  auto bytePayload = word * 4;
 
  201  std::cout << 
" --- tofBuffer: " << word << 
" (" << bytePayload << 
" bytes) at " << current << std::endl;
 
  204  if (!mFile.read(mBuffer, bytePayload)) {
 
  205    std::cout << 
" --- Cannot read input file: " << strerror(errno) << std::endl;
 
  226    payload = readCONET();
 
  236    std::cout << 
" --- dump data: " << payload << 
" bytes" << std::endl;
 
  237    uint32_t* word = 
reinterpret_cast<uint32_t*
>(mBuffer);
 
  238    for (
int i = 0; 
i < payload / 4; ++
i) {
 
  239      printf(
" 0x%08x \n", *word);
 
  242    std::cout << 
" --- end of dump data " << std::endl;
 
  248  auto fairMQChannel = outputRoutes.at(0).channel;
 
  249  auto payloadMessage = device->NewMessage(payload);
 
  250  std::memcpy(payloadMessage->GetData(), mBuffer, payload);
 
  255  auto headerMessage = device->NewMessage(headerStack.size());
 
  256  std::memcpy(headerMessage->GetData(), headerStack.data(), headerStack.size());
 
  259  fair::mq::Parts parts;
 
  260  parts.AddPart(std::move(headerMessage));
 
  261  parts.AddPart(std::move(payloadMessage));
 
  262  device->Send(parts, fairMQChannel);
 
  266    std::cout << 
" --- End of file " << std::endl;
 
 
  273void customize(std::vector<ConfigParamSpec>& workflowOptions)
 
 
  288                        {
"atc-file-proxy-input-filename", VariantType::String, 
"", {
"Input file name"}},
 
  289                        {
"atc-file-proxy-start-from", VariantType::Int, 0, {
"Start reading from byte"}},
 
  290                        {
"atc-file-proxy-dump-data", VariantType::Bool, 
false, {
"Dump data"}},
 
  291                        {
"atc-file-proxy-rdh-version", VariantType::Int, 7, {
"RDH version"}},
 
  292                        {
"atc-file-proxy-conet-mode", VariantType::Bool, 
false, {
"CONET mode"}}}}};
 
 
void init(InitContext &ic) final
void run(ProcessingContext &pc) final
~FileProxyTask() override=default
T get(const char *key) const
ConfigParamRegistry const & options()
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs