Project
Loading...
Searching...
No Matches
CompressorTask.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
24
25using namespace o2::framework;
26
27namespace o2::tof
28{
29
// Initialisation step of the compressor task.
// Logs whether a payload limit is configured, reads the "tof-compressor-*" options
// from the init context, forwards the decoder/encoder/checker flags to mCompressor
// and registers a callback for the Stop signal that calls mCompressor.checkSummary().
// NOTE(review): the line carrying the function signature (original line 31) is not
// present in this listing; `ic` below is the parameter declared on that missing line.
30template <typename RDH, bool verbose, bool paranoid>
32{
// a negative mPayloadLimit is reported as a plain init, otherwise the limit value is logged
33 if (mPayloadLimit < 0) {
34 LOG(info) << "Compressor init";
35 } else {
36 LOG(info) << "Compressor init with Payload limit at " << mPayloadLimit;
37 }
38
// read the configuration options; only the output buffer size is stored as a member
39 auto decoderCONET = ic.options().get<bool>("tof-compressor-conet-mode");
40 auto decoderVerbose = ic.options().get<bool>("tof-compressor-decoder-verbose");
41 auto encoderVerbose = ic.options().get<bool>("tof-compressor-encoder-verbose");
42 auto checkerVerbose = ic.options().get<bool>("tof-compressor-checker-verbose");
43 mOutputBufferSize = ic.options().get<int>("tof-compressor-output-buffer-size");
44
// forward the flags to the compressor instance
45 mCompressor.setDecoderCONET(decoderCONET);
46 mCompressor.setDecoderVerbose(decoderVerbose);
47 mCompressor.setEncoderVerbose(encoderVerbose);
48 mCompressor.setCheckerVerbose(checkerVerbose);
49
// the lambda captures `this`, so it relies on the task object still existing when Stop is signalled
50 auto finishFunction = [this]() {
51 mCompressor.checkSummary();
52 };
53
54 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(finishFunction);
55}
56
// Processing step of the compressor task.
// Groups all TOF/RAWDATA input parts by subspecification, runs mCompressor over every
// part of a subspecification into one output buffer and ships that buffer as
// CRAWDATA with the same origin and subspecification.
// If a zero-payload input with subspecification 0xDEADBEEF is found, a dead-beef
// output is sent instead and the function returns without compressing anything.
// NOTE(review): the line carrying the function signature (original line 58) is not
// present in this listing; `pc` below is the parameter declared on that missing line.
57template <typename RDH, bool verbose, bool paranoid>
59{
60 LOG(debug) << "Compressor run";
61
// per-subspecification bookkeeping: the input parts and the sum of their payload sizes
63 std::map<int, std::vector<o2::framework::DataRef>> subspecPartMap;
64 std::map<int, int> subspecBufferSize;
65
66 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
67 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
68 {
69 auto& inputs = pc.inputs();
70 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
71 std::vector<InputSpec> dummy{InputSpec{"dummy", ConcreteDataMatcher{"TOF", "RAWDATA", 0xDEADBEEF}}};
72 for (const auto& ref : InputRecordWalker(inputs, dummy)) {
73 const auto* dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
74 auto payloadSize = DataRefUtils::getPayloadSize(ref);
75 if (payloadSize == 0) {
// NOTE(review): maxWarn is declared on a line (original line 76) that is missing from this listing
// the alarm is emitted only for the first maxWarn consecutive occurrences; the last one announces that reporting stops
77 if (++contDeadBeef <= maxWarn) {
78 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
79 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
80 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
81 }
// send the dead-beef output and leave: no input is compressed for this invocation
82 pc.outputs().cookDeadBeef(Output{"TOF", "CRAWDATA", dh->subSpecification});
83 return;
84 }
85 }
86 contDeadBeef = 0; // if good data, reset the counter
87 }
88
// walk over every TOF/RAWDATA input part (any subspecification) and sort it into the maps
90 std::vector<InputSpec> sel{InputSpec{"filter", ConcreteDataTypeMatcher{"TOF", "RAWDATA"}}};
91 for (const auto& ref : InputRecordWalker(pc.inputs(), sel)) {
92 // for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) {
93 // if (!iit.isValid()) {
94 // continue;
95 // }
96
98 // for (auto const& ref : iit) {
99
101 auto headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
102 auto payloadInSize = DataRefUtils::getPayloadSize(ref);
103 auto subspec = headerIn->subSpecification;
104 subspecPartMap[subspec].push_back(ref);
105
// accumulate the total input payload size of this subspecification, starting from zero on first sight
107 if (!subspecBufferSize.count(subspec)) {
108 subspecBufferSize[subspec] = 0;
109 }
110 subspecBufferSize[subspec] += payloadInSize;
111 // }
112 }
113
// produce one output message per subspecification
115 for (auto& subspecPartEntry : subspecPartMap) {
116
117 auto subspec = subspecPartEntry.first;
// NOTE(review): plain `auto` copies the vector of DataRef here; a reference would avoid the copy
118 auto parts = subspecPartEntry.second;
119 auto& firstPart = parts.at(0);
120
// the output header starts as a copy of the first part's header, relabelled and with the payload size reset
122 auto headerOut = *DataRefUtils::getHeader<o2::header::DataHeader*>(firstPart);
123 headerOut.dataDescription = "CRAWDATA";
124 headerOut.payloadSize = 0;
125 headerOut.splitPayloadParts = 1;
126
// mOutputBufferSize >= 0: it is added on top of the summed input payload size;
// mOutputBufferSize < 0: its absolute value is used as the buffer size on its own
128 auto bufferSize = mOutputBufferSize >= 0 ? mOutputBufferSize + subspecBufferSize[subspec] : std::abs(mOutputBufferSize);
// the vector is allocated with twice bufferSize, while the encoder below is only told about bufferSize
129 auto bufferSizeDouble = bufferSize * 2;
130 auto output = Output{headerOut.dataOrigin, "CRAWDATA", headerOut.subSpecification};
131 auto&& v = pc.outputs().makeVector<char>(output);
132 v.resize(bufferSizeDouble);
133 // Better way of doing this would be to use an offset, so that we can resize the vector
134 // as well. However, this should be good enough because bufferSize overestimates the size
135 // of the payload.
136 auto bufferPointer = v.data();
137
// compress the parts one after the other into the same buffer
139 for (const auto& ref : parts) {
141 auto payloadIn = ref.payload;
142 auto payloadInSize = DataRefUtils::getPayloadSize(ref);
143
// parts larger than the configured limit are reported and skipped (limit active only when mPayloadLimit > -1)
144 if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) {
145 LOG(error) << "Payload larger than limit (" << mPayloadLimit << "), payload = " << payloadInSize;
146 continue;
147 }
148
// input is this part's payload, output is the not-yet-used tail of the buffer
150 mCompressor.setDecoderBuffer(payloadIn);
151 mCompressor.setDecoderBufferSize(payloadInSize);
152 mCompressor.setEncoderBuffer(bufferPointer);
153 mCompressor.setEncoderBufferSize(bufferSize);
154
// advance the write position and shrink the remaining size by what the encoder reports as written
156 mCompressor.run();
157 auto payloadOutSize = mCompressor.getEncoderByteCounter();
158 bufferPointer += payloadOutSize;
159 bufferSize -= payloadOutSize;
160 headerOut.payloadSize += payloadOutSize;
161 }
162
// NOTE(review): this check runs only after all parts were encoded; whether the encoder itself
// honours the size given via setEncoderBufferSize is not visible here — confirm
163 if (headerOut.payloadSize > bufferSizeDouble) {
164 headerOut.payloadSize = 0; // put payload to zero, otherwise it will trigger a crash
165 }
166
// trim the vector to the bytes actually produced and hand it over to the output allocator
167 v.resize(headerOut.payloadSize);
168 pc.outputs().adoptContainer(output, std::move(v));
169 }
170}
171
176
177} // namespace o2::tof
TOF raw data compressor task.
A helper class to iterate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
void cookDeadBeef(const Output &spec)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iterate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
void run(ProcessingContext &pc) final
void init(InitContext &ic) final
const GLdouble * v
Definition glcorearb.h:832
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"