33  if (mPayloadLimit < 0) {
 
   34    LOG(info) << 
"Compressor init";
 
   36    LOG(info) << 
"Compressor init with Payload limit at " << mPayloadLimit;
 
   39  auto decoderCONET = ic.
options().
get<
bool>(
"tof-compressor-conet-mode");
 
   40  auto decoderVerbose = ic.
options().
get<
bool>(
"tof-compressor-decoder-verbose");
 
   41  auto encoderVerbose = ic.
options().
get<
bool>(
"tof-compressor-encoder-verbose");
 
   42  auto checkerVerbose = ic.
options().
get<
bool>(
"tof-compressor-checker-verbose");
 
   43  mOutputBufferSize = ic.
options().
get<
int>(
"tof-compressor-output-buffer-size");
 
   45  mCompressor.setDecoderCONET(decoderCONET);
 
   46  mCompressor.setDecoderVerbose(decoderVerbose);
 
   47  mCompressor.setEncoderVerbose(encoderVerbose);
 
   48  mCompressor.setCheckerVerbose(checkerVerbose);
 
   50  auto finishFunction = [
this]() {
 
   51    mCompressor.checkSummary();
 
 
   63  std::map<int, std::vector<o2::framework::DataRef>> subspecPartMap;
 
   64  std::map<int, int> subspecBufferSize;
 
   69    auto& inputs = pc.
inputs();
 
   70    static size_t contDeadBeef = 0; 
 
   73      const auto* dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
 
   75      if (payloadSize == 0) {
 
   77        if (++contDeadBeef <= maxWarn) {
 
   78          LOGP(alarm, 
"Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
 
   79               dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
 
   80               contDeadBeef == maxWarn ? fmt::format(
". {} such inputs in row received, stopping reporting", contDeadBeef) : 
"");
 
  101    auto headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
 
  103    auto subspec = headerIn->subSpecification;
 
  104    subspecPartMap[subspec].push_back(
ref);
 
  107    if (!subspecBufferSize.count(subspec)) {
 
  108      subspecBufferSize[subspec] = 0;
 
  110    subspecBufferSize[subspec] += payloadInSize;
 
  115  for (
auto& subspecPartEntry : subspecPartMap) {
 
  117    auto subspec = subspecPartEntry.first;
 
  118    auto parts = subspecPartEntry.second;
 
  119    auto& firstPart = parts.at(0);
 
  122    auto headerOut = *DataRefUtils::getHeader<o2::header::DataHeader*>(firstPart);
 
  123    headerOut.dataDescription = 
"CRAWDATA";
 
  124    headerOut.payloadSize = 0;
 
  125    headerOut.splitPayloadParts = 1;
 
  128    auto bufferSize = mOutputBufferSize >= 0 ? mOutputBufferSize + subspecBufferSize[subspec] : std::abs(mOutputBufferSize);
 
  129    auto bufferSizeDouble = bufferSize * 2;
 
  130    auto output = 
Output{headerOut.dataOrigin, 
"CRAWDATA", headerOut.subSpecification};
 
  132    v.resize(bufferSizeDouble);
 
  136    auto bufferPointer = 
v.data();
 
  139    for (
const auto& 
ref : parts) {
 
  141      auto payloadIn = 
ref.payload;
 
  144      if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) {
 
  145        LOG(error) << 
"Payload larger than limit (" << mPayloadLimit << 
"), payload = " << payloadInSize;
 
  150      mCompressor.setDecoderBuffer(payloadIn);
 
  151      mCompressor.setDecoderBufferSize(payloadInSize);
 
  152      mCompressor.setEncoderBuffer(bufferPointer);
 
  153      mCompressor.setEncoderBufferSize(bufferSize);
 
  157      auto payloadOutSize = mCompressor.getEncoderByteCounter();
 
  158      bufferPointer += payloadOutSize;
 
  159      bufferSize -= payloadOutSize;
 
  160      headerOut.payloadSize += payloadOutSize;
 
  163    if (headerOut.payloadSize > bufferSizeDouble) {
 
  164      headerOut.payloadSize = 0; 
 
  167    v.resize(headerOut.payloadSize);
 
 
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.