29#include <fairmq/Device.h>
31#include <arrow/ipc/writer.h>
32#include <arrow/type.h>
33#include <arrow/io/memory.h>
34#include <arrow/util/config.h>
36#include <TClonesArray.h>
51 : mRegistry{contextRegistry}
55RouteIndex DataAllocator::matchDataHeader(
const Output& spec,
size_t timeslice)
57 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
60 for (
auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
61 auto& route = allowedOutputRoutes[ri];
65 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"data_allocator",
"Route %" PRIu64
" (%{public}s) created for timeslice %" PRIu64,
71 "Worker is not authorised to create message with "
72 "origin(%s) description(%s) subSpec(%d)",
73 spec.
origin.
as<std::string>().c_str(),
81 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
84 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
98 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
108fair::mq::MessagePtr DataAllocator::headerMessageFromOutput(
Output const& spec,
117 dh.subSpecification = spec.
subSpec;
118 dh.payloadSize = payloadSize;
119 dh.payloadSerializationMethod = method;
120 dh.tfCounter = timingInfo.tfCounter;
121 dh.firstTForbit = timingInfo.firstTForbit;
122 dh.runNumber = timingInfo.runNumber;
131 auto* transport = proxy.getOutputTransport(routeIndex);
133 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
137void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr&& payloadMessage,
const Output& spec,
140 auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0);
144 const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
147 O2_SIGNPOST_START(parts, pid,
"parts",
"addPartToContext %{public}s@%p %" PRIu64,
149 auto& context = mRegistry.
get<MessageContext>();
153 context.make_scoped<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
158 std::unique_ptr<std::string> payload(
ptr);
160 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
164 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
168 mRegistry.
get<
StringContext>().addString(std::move(header), std::move(payload), routeIndex);
169 assert(payload.get() ==
nullptr);
172void doWriteTable(std::shared_ptr<FairMQResizableBuffer>
b, arrow::Table* table)
174 auto mock = std::make_shared<arrow::io::MockOutputStream>();
176 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), table->schema());
177 arrow::Status outStatus;
179 outStatus = mockWriter.ValueOrDie()->WriteTable(*table);
181 std::vector<std::shared_ptr<arrow::Array>> columns;
182 columns.resize(table->columns().size());
183 for (
size_t ci = 0; ci < table->columns().
size(); ci++) {
184 columns[ci] = table->column(ci)->chunk(0);
186 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
187 outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
192 if (reserve.ok() ==
false) {
193 throw std::runtime_error(
"Unable to reserve memory for table");
196 auto stream = std::make_shared<FairMQOutputStream>(
b);
197 auto outBatch = arrow::ipc::MakeStreamWriter(
stream.get(), table->schema());
198 if (outBatch.ok() ==
false) {
199 throw ::std::runtime_error(
"Unable to create batch writer");
203 std::vector<std::shared_ptr<arrow::Array>> columns;
204 columns.resize(table->columns().size());
205 for (
size_t ci = 0; ci < table->columns().
size(); ci++) {
206 columns[ci] = table->column(ci)->chunk(0);
208 auto batch = arrow::RecordBatch::Make(table->schema(), 0, columns);
209 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
211 outStatus = outBatch.ValueOrDie()->WriteTable(*table);
214 if (outStatus.ok() ==
false) {
215 throw std::runtime_error(
"Unable to Write table");
222 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
224 const DataHeader* cdh = o2::header::get<DataHeader*>(header->GetData());
230 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
231 return transport->CreateMessage(s);
233 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
236 auto table = builder.finalize();
242 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
246 context.addBuffer(std::move(header),
buffer, std::move(finalizer), routeIndex);
252 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
257 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
258 return transport->CreateMessage(s);
260 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
265 auto batch =
source.finalize();
266 auto mock = std::make_shared<arrow::io::MockOutputStream>();
268 auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
269 arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
273 if (reserve.ok() ==
false) {
274 throw std::runtime_error(
"Unable to reserve memory for table");
279 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
280 if (outBatch.ok() ==
false) {
281 throw ::std::runtime_error(
"Unable to create batch writer");
284 outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
286 if (outStatus.ok() ==
false) {
287 throw std::runtime_error(
"Unable to Write batch");
292 auto finalizer = [](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
297 context.addBuffer(std::move(header),
buffer, std::move(finalizer), routeIndex);
303 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
307 auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](
size_t s) -> std::unique_ptr<fair::mq::Message> {
308 return transport->CreateMessage(s);
310 auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
312 auto writer = [table =
ptr](std::shared_ptr<FairMQResizableBuffer>
b) ->
void {
316 context.addBuffer(std::move(header),
buffer, std::move(writer), routeIndex);
325 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
326 fair::mq::MessagePtr payloadMessage(proxy.createOutputMessage(routeIndex, payloadSize));
327 memcpy(payloadMessage->GetData(), payload, payloadSize);
329 addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationMethod);
334 if (
ref.label.empty()) {
337 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
338 for (
auto ri = 0ul, re = allowedOutputRoutes.size(); ri != re; ++ri) {
339 if (allowedOutputRoutes[ri].matcher.binding.value ==
ref.label) {
340 auto spec = allowedOutputRoutes[ri].matcher;
342 return Output{dataType.
origin, dataType.description,
ref.subSpec, std::move(
ref.headerStack)};
345 std::string availableRoutes;
346 for (
auto const& route : allowedOutputRoutes) {
349 throw runtime_error_f(
"Unable to find OutputSpec with label %s. Available Routes: %s",
ref.label.c_str(), availableRoutes.c_str());
355 auto& allowedOutputRoutes = mRegistry.
get<
DeviceSpec const>().outputs;
356 for (
auto const& route : allowedOutputRoutes) {
369 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
374 fair::mq::MessagePtr headerMessage = headerMessageFromOutput(spec, routeIndex,
376 payloadMessage->GetSize()
389 RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
#define O2_BUILTIN_UNREACHABLE
#define O2_BUILTIN_LIKELY(x)
#define O2_BUILTIN_UNLIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
void adopt(const Output &spec, std::string *)
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
DataAllocator(ServiceRegistryRef ref)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
bool isAllowed(Output const &query)
check if a certain output is allowed
void cookDeadBeef(const Output &spec)
OutputRoute const & getOutputRoute(RouteIndex routeIndex) const
Retrieve the transport associated to a given route.
TrivialObject handles a message object.
auto & add(Args &&... args)
std::unique_ptr< fair::mq::Message > cloneFromCache(int64_t id) const
GLboolean GLboolean GLboolean b
GLsizei GLsizei GLchar * source
GLsizei const GLfloat * value
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
void doWriteTable(std::shared_ptr< FairMQResizableBuffer > b, arrow::Table *table)
RuntimeErrorRef runtime_error_f(const char *,...)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::string describe(InputSpec const &spec)
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
std::function< void(T &)> callback
header::DataHeader::SubSpecificationType subSpec
header::DataDescription description
header::DataOrigin origin
std::vector< bool > routeCreated