// Configuration fragment: reports the payload limit, reads the "tof-compressor-*"
// options from ic.options(), and forwards the decoder/encoder/checker flags to
// mCompressor.
// NOTE(review): the embedded listing numbers skip (34 -> 36, 36 -> 39, 48 -> 50, ...),
// so some original lines (presumably closing braces / an else branch) are not
// visible in this excerpt.
33 if (mPayloadLimit < 0) {
// Negative limit: log a plain init message without a limit value.
34 LOG(info) <<
"Compressor init";
// Init message that includes the configured limit (presumably the else branch;
// the line in between is not shown).
36 LOG(info) <<
"Compressor init with Payload limit at " << mPayloadLimit;
// Boolean options, each read once and kept in a local.
39 auto decoderCONET = ic.
options().
get<
bool>(
"tof-compressor-conet-mode");
40 auto decoderVerbose = ic.
options().
get<
bool>(
"tof-compressor-decoder-verbose");
41 auto encoderVerbose = ic.
options().
get<
bool>(
"tof-compressor-encoder-verbose");
42 auto checkerVerbose = ic.
options().
get<
bool>(
"tof-compressor-checker-verbose");
// Integer option stored in a member; it is later used to size the output buffer.
43 mOutputBufferSize = ic.
options().
get<
int>(
"tof-compressor-output-buffer-size");
// Hand the flags read above to the compressor instance.
45 mCompressor.setDecoderCONET(decoderCONET);
46 mCompressor.setDecoderVerbose(decoderVerbose);
47 mCompressor.setEncoderVerbose(encoderVerbose);
48 mCompressor.setCheckerVerbose(checkerVerbose);
// Lambda capturing `this` that calls mCompressor.checkSummary().
// NOTE(review): where this lambda is registered/invoked is not visible here —
// presumably an end-of-processing callback; confirm against the full file.
50 auto finishFunction = [
this]() {
51 mCompressor.checkSummary();
// Input-gathering fragment: groups incoming DataRefs by header subSpecification
// and accumulates a per-subSpecification input size.
// NOTE(review): the loop that declares `ref`, and the definitions of `payloadSize`,
// `payloadInSize` and `maxWarn`, are on lines not visible in this excerpt.
// subSpecification -> all input parts carrying that subSpecification.
63 std::map<int, std::vector<o2::framework::DataRef>> subspecPartMap;
// subSpecification -> running sum of payloadInSize for those parts.
64 std::map<int, int> subspecBufferSize;
69 auto& inputs = pc.
inputs();
// Function-local static: the counter keeps its value across calls.
70 static size_t contDeadBeef = 0;
73 const auto* dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
// Zero-size payload: emit an alarm, but only for the first maxWarn occurrences;
// the maxWarn-th message additionally announces that reporting stops.
75 if (payloadSize == 0) {
77 if (++contDeadBeef <= maxWarn) {
78 LOGP(alarm,
"Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
79 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
80 contDeadBeef == maxWarn ? fmt::format(
". {} such inputs in row received, stopping reporting", contDeadBeef) :
"");
// Regular input: file the part under its subSpecification.
101 auto headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
103 auto subspec = headerIn->subSpecification;
104 subspecPartMap[subspec].push_back(
ref);
// First part seen for this subSpecification: start its size sum at zero.
107 if (!subspecBufferSize.count(subspec)) {
108 subspecBufferSize[subspec] = 0;
// Add this part's input payload size to the sum.
110 subspecBufferSize[subspec] += payloadInSize;
// Output fragment: for each subSpecification, builds an output header, sizes an
// output buffer `v`, and points the compressor's decoder/encoder at each part.
// NOTE(review): the declaration of `v`, the call that actually runs the
// compressor, and all closing braces are on lines not visible in this excerpt.
115 for (
auto& subspecPartEntry : subspecPartMap) {
117 auto subspec = subspecPartEntry.first;
// NOTE(review): `auto` without `&` copies the vector of DataRefs on every
// iteration; a const reference would likely suffice — confirm and consider changing.
118 auto parts = subspecPartEntry.second;
119 auto& firstPart = parts.at(0);
// Output header starts as a copy of the first part's DataHeader, then gets a
// new description and has its payload size and split-part count reset.
122 auto headerOut = *DataRefUtils::getHeader<o2::header::DataHeader*>(firstPart);
123 headerOut.dataDescription =
"CRAWDATA";
124 headerOut.payloadSize = 0;
125 headerOut.splitPayloadParts = 1;
// Non-negative option: option value plus the summed input size for this
// subSpecification. Negative option: its absolute value is used as the size.
128 auto bufferSize = mOutputBufferSize >= 0 ? mOutputBufferSize + subspecBufferSize[subspec] : std::abs(mOutputBufferSize);
// The buffer is resized to twice bufferSize, while only bufferSize is
// passed to the encoder below.
129 auto bufferSizeDouble = bufferSize * 2;
130 auto output =
Output{headerOut.dataOrigin,
"CRAWDATA", headerOut.subSpecification};
132 v.resize(bufferSizeDouble);
// Write cursor into v; advanced after each part.
136 auto bufferPointer =
v.data();
139 for (
const auto&
ref : parts) {
141 auto payloadIn =
ref.payload;
// A limit of -1 or lower disables this check; otherwise oversized payloads
// are reported. What happens after the log is not visible here.
144 if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) {
145 LOG(error) <<
"Payload larger than limit (" << mPayloadLimit <<
"), payload = " << payloadInSize;
// Decoder reads this part's payload; encoder writes at the cursor with the
// remaining bufferSize as its limit.
150 mCompressor.setDecoderBuffer(payloadIn);
151 mCompressor.setDecoderBufferSize(payloadInSize);
152 mCompressor.setEncoderBuffer(bufferPointer);
153 mCompressor.setEncoderBufferSize(bufferSize);
// Account for the bytes the encoder reports: advance the cursor, shrink the
// remaining size, and grow the output payload size.
157 auto payloadOutSize = mCompressor.getEncoderByteCounter();
158 bufferPointer += payloadOutSize;
159 bufferSize -= payloadOutSize;
160 headerOut.payloadSize += payloadOutSize;
// Accumulated output larger than the allocated buffer: reset the payload size to zero.
// NOTE(review): this check runs after the encoder has already written — confirm
// that the encoder itself honours the size passed to setEncoderBufferSize.
163 if (headerOut.payloadSize > bufferSizeDouble) {
164 headerOut.payloadSize = 0;
// Shrink v to the accumulated payload size (zero after the reset above).
167 v.resize(headerOut.payloadSize);
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.