28#include <fairmq/Device.h>
30#include <arrow/ipc/writer.h>
31#include <arrow/type.h>
32#include <arrow/io/memory.h>
33#include <arrow/util/config.h>
35#include <TClonesArray.h>
50 : mRegistry{contextRegistry}
54RouteIndex DataAllocator::matchDataHeader(
const Output& spec,
size_t timeslice)
56 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
59 for (
auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
60 auto& route = allowedOutputRoutes[ri];
64 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"data_allocator",
"Route %" PRIu64
" (%{public}s) created for timeslice %" PRIu64,
70 "Worker is not authorised to create message with "
71 "origin(%s) description(%s) subSpec(%d)",
72 spec.
origin.
as<std::string>().c_str(),
80 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
83 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
97 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
107fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(
Output const& spec,
116 dh.subSpecification = spec.
subSpec;
117 dh.payloadSize = payloadSize;
118 dh.payloadSerializationMethod = method;
119 dh.tfCounter = timingInfo.tfCounter;
120 dh.firstTForbit = timingInfo.firstTForbit;
121 dh.runNumber = timingInfo.runNumber;
128 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
132void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payloadMessage,
const Output& spec,
135 auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0);
139 const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
142 O2_SIGNPOST_START(parts, pid,
"parts",
"addPartToContext %{public}s@%p %" PRIu64,
144 auto& context = mRegistry.
get<MessageContext>();
148 context.make_scoped<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
153 std::unique_ptr<std::string> payload(
ptr);
155 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
159 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
163 mRegistry.
get<
StringContext>().addString(std::move(header), std::move(payload), routeIndex);
164 assert(payload.get() ==
nullptr);
167void doWriteTable(std::shared_ptr<FairMQResizableBuffer>
b, arrow::Table* table)
169 auto mock = std::make_shared<arrow::io::MockOutputStream>();
171 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), table->schema());
172 arrow::Status outStatus;
174 outStatus = mockWriter.ValueOrDie()->WriteTable(*table);
176 std::vector<std::shared_ptr<arrow::Array>> columns;
177 columns.resize(table->columns().size());
178 for (
size_t ci = 0; ci < table->columns().
size(); ci++) {
179 columns[ci] = table->column(ci)->chunk(0);
181 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
182 outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
187 if (reserve.ok() ==
false) {
188 throw std::runtime_error(
"Unable to reserve memory for table");
191 auto stream = std::make_shared<FairMQOutputStream>(
b);
192 auto outBatch = arrow::ipc::MakeStreamWriter(
stream.get(), table->schema());
193 if (outBatch.ok() ==
false) {
194 throw ::std::runtime_error(
"Unable to create batch writer");
198 std::vector<std::shared_ptr<arrow::Array>> columns;
199 columns.resize(table->columns().size());
200 for (
size_t ci = 0; ci < table->columns().
size(); ci++) {
201 columns[ci] = table->column(ci)->chunk(0);
203 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
204 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
206 outStatus = outBatch.ValueOrDie()->WriteTable(*table);
209 if (outStatus.ok() ==
false) {
210 throw std::runtime_error(
"Unable to Write table");
214void doWriteBatch(std::shared_ptr<FairMQResizableBuffer>
b, arrow::RecordBatch* batch)
216 auto mock = std::make_shared<arrow::io::MockOutputStream>();
218 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
219 arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
223 if (reserve.ok() ==
false) {
224 throw std::runtime_error(
"Unable to reserve memory for table");
227 auto stream = std::make_shared<FairMQOutputStream>(
b);
230 auto outBatch = arrow::ipc::MakeStreamWriter(
stream.get(), batch->schema());
231 if (outBatch.ok() ==
false) {
232 throw ::std::runtime_error(
"Unable to create batch writer");
235 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
237 if (outStatus.ok() ==
false) {
238 throw std::runtime_error(
"Unable to Write batch");
245 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
247 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
253 auto creator = [transport = context.proxy().
getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
254 return transport->CreateMessage(s);
256 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
259 auto table = builder.finalize();
265 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
269 context.addBuffer(std::move(header),
buffer, std::move(finalizer), routeIndex);
275 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
280 auto creator = [transport = context.proxy().
getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
281 return transport->CreateMessage(s);
283 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
288 auto table =
tree.finalize();
296 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
301 context.addBuffer(std::move(header),
buffer, std::move(finalizer), routeIndex);
307 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
312 auto creator = [transport = context.proxy().
getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
313 return transport->CreateMessage(s);
315 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
320 auto batch =
source.finalize();
328 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
333 context.addBuffer(std::move(header),
buffer, std::move(finalizer), routeIndex);
339 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
343 auto creator = [transport = context.proxy().
getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
344 return transport->CreateMessage(s);
346 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
348 auto writer = [table =
ptr](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
352 context.addBuffer(std::move(header),
buffer, std::move(writer), routeIndex);
361 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
362 fair::mq::MessagePtr payloadMessage(proxy.createOutputMessage(routeIndex, payloadSize));
363 memcpy(payloadMessage->GetData(), payload, payloadSize);
365 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationMethod);
370 if (
ref.label.empty()) {
373 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
374 for (
auto ri = 0ul, re = allowedOutputRoutes.size(); ri != re; ++ri) {
375 if (allowedOutputRoutes[ri].matcher.binding.value ==
ref.label) {
376 auto spec = allowedOutputRoutes[ri].matcher;
378 return Output{dataType.
origin, dataType.description,
ref.subSpec, std::move(
ref.headerStack)};
381 std::string availableRoutes;
382 for (
auto const& route : allowedOutputRoutes) {
385 throw runtime_error_f(
"Unable to find OutputSpec with label %s. Available Routes: %s",
ref.label.c_str(), availableRoutes.c_str());
391 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
392 for (
auto const& route : allowedOutputRoutes) {
405 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
410 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
412 payloadMessage->GetSize()
425 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
#define O2_BUILTIN_UNREACHABLE
#define O2_BUILTIN_LIKELY(x)
#define O2_BUILTIN_UNLIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
void adopt(const Output &spec, std::string *)
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
DataAllocator(ServiceRegistryRef ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
fair::mq::TransportFactory * getOutputTransport(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
TrivialObject handles a message object.
auto & add(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
GLboolean GLboolean GLboolean b
GLsizei GLsizei GLchar * source
GLsizei const GLfloat * value
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
void doWriteBatch(std::shared_ptr< FairMQResizableBuffer > b, arrow::RecordBatch *batch)
void doWriteTable(std::shared_ptr< FairMQResizableBuffer > b, arrow::Table *table)
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
std::function< void(T &)> callback
header::DataHeader::SubSpecificationType subSpec
header::DataDescription description
header::DataOrigin origin
std::vector< bool > routeCreated
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))