Project
Loading...
Searching...
No Matches
DataInspector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInspector.h"
16
17#include <algorithm>
18#include <cstdio>
19#include <cstdlib>
20#include <cstring>
21#include <cstdint>
22#include <fcntl.h>
23#include <iomanip>
24#include <ios>
25#include <iostream>
26#include <iterator>
27#include <memory>
28#include <sstream>
29#include <stdexcept>
30#include <string>
31#include <unistd.h>
32#include <utility>
33
34#include <rapidjson/document.h>
35#include <rapidjson/prettywriter.h>
36#include "boost/asio.hpp"
37#include <TBufferJSON.h>
38#include <boost/algorithm/string/join.hpp>
39#include <arrow/table.h>
41#include "boost/archive/iterators/base64_from_binary.hpp"
42#include "boost/archive/iterators/transform_width.hpp"
43#include "boost/predef/other/endian.h"
44
45using namespace rapidjson;
46
48{
49#if BOOST_ENDIAN_BIG_BYTE
50static const auto endianness = "BIG";
51#elif BOOST_ENDIAN_LITTLE_BYTE
52static const auto endianness = "LITTLE";
53#else
54static const auto endianness = "UNKNOWN";
55#endif
56
57inline size_t base64PaddingSize(uint64_t dataSize)
58{
59 return (3 - dataSize % 3) % 3;
60}
61
62std::string encode64(const char* data, uint64_t size)
63{
64 auto* begin = data;
65 auto* end = data + size;
66
67 using namespace boost::archive::iterators;
68 using EncodingIt = base64_from_binary<transform_width<const char*, 6, 8>>;
69 return std::string(EncodingIt(begin), EncodingIt(end)).append(base64PaddingSize(size), '=');
70}
71
72void addPayload(Document& message,
73 uint64_t payloadSize,
74 const char* payload,
75 Document::AllocatorType& alloc)
76{
77 message.AddMember("payload", Value(encode64(payload, payloadSize).c_str(), alloc), alloc);
78 message.AddMember("payloadEndianness", Value(endianness, alloc), alloc);
79}
80
81void addBasicDataHeaderInfo(Document& message, const header::DataHeader* header, Document::AllocatorType& alloc)
82{
83 std::string origin = header->dataOrigin.as<std::string>();
84 std::string description = header->dataDescription.as<std::string>();
85 std::string method = header->payloadSerializationMethod.as<std::string>();
86
87 message.AddMember("origin", Value(origin.c_str(), alloc), alloc);
88 message.AddMember("description", Value(description.c_str(), alloc), alloc);
89 message.AddMember("subSpecification", Value(header->subSpecification), alloc);
90 message.AddMember("firstTForbit", Value(header->firstTForbit), alloc);
91 message.AddMember("tfCounter", Value(header->tfCounter), alloc);
92 message.AddMember("runNumber", Value(header->runNumber), alloc);
93 message.AddMember("payloadSize", Value(header->payloadSize), alloc);
94 message.AddMember("splitPayloadParts", Value(header->splitPayloadParts), alloc);
95 message.AddMember("payloadSerialization", Value(method.c_str(), alloc), alloc);
96 message.AddMember("payloadSplitIndex", Value(header->splitPayloadIndex), alloc);
97}
98
99void addBasicDataProcessingHeaderInfo(Document& message, const DataProcessingHeader* header, Document::AllocatorType& alloc)
100{
101 message.AddMember("startTime", Value(header->startTime), alloc);
102 message.AddMember("duration", Value(header->duration), alloc);
103 message.AddMember("creationTimer", Value(header->creation), alloc);
104}
105
106void addBasicOutputObjHeaderInfo(Document& message, const OutputObjHeader* header, Document::AllocatorType& alloc)
107{
108 message.AddMember("taskHash", Value(header->mTaskHash), alloc);
109}
110
111void buildDocument(Document& message, std::string sender, const DataRef& ref)
112{
113 message.SetObject();
114 Document::AllocatorType& alloc = message.GetAllocator();
115 message.AddMember("sender", Value(sender.c_str(), alloc), alloc);
116
117 const header::BaseHeader* baseHeader = header::BaseHeader::get(reinterpret_cast<const std::byte*>(ref.header));
118 for (; baseHeader != nullptr; baseHeader = baseHeader->next()) {
119 if (baseHeader->description == header::DataHeader::sHeaderType) {
120 const auto* header = header::get<header::DataHeader*>(baseHeader->data());
121 addBasicDataHeaderInfo(message, header, alloc);
122 addPayload(message, header->payloadSize, ref.payload, alloc);
123 } else if (baseHeader->description == DataProcessingHeader::sHeaderType) {
124 const auto* header = header::get<DataProcessingHeader*>(baseHeader->data());
126 } else if (baseHeader->description == OutputObjHeader::sHeaderType) {
127 const auto* header = header::get<OutputObjHeader*>(baseHeader->data());
128 addBasicOutputObjHeaderInfo(message, header, alloc);
129 }
130 }
131}
132
133/* Callback which transforms each `DataRef` to a JSON object*/
134std::vector<DIMessage> serializeO2Messages(const std::vector<DataRef>& refs, const std::string& deviceName)
135{
136 std::vector<DIMessage> messages{};
137
138 for (auto& ref : refs) {
139 Document message;
140 StringBuffer buffer;
141 Writer<StringBuffer> writer(buffer);
142 buildDocument(message, deviceName, ref);
143 message.Accept(writer);
144
145 messages.emplace_back(DIMessage{DIMessage::Header::Type::DATA, std::string{buffer.GetString(), buffer.GetSize()}});
146 }
147
148 return messages;
149}
150} // namespace o2::framework::data_inspector
o2::monitoring::tags::Value Value
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLenum GLsizei dataSize
Definition glcorearb.h:3994
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
std::vector< DIMessage > serializeO2Messages(const std::vector< DataRef > &refs, const std::string &deviceName)
void addBasicOutputObjHeaderInfo(Document &message, const OutputObjHeader *header, Document::AllocatorType &alloc)
size_t base64PaddingSize(uint64_t dataSize)
void addBasicDataHeaderInfo(Document &message, const header::DataHeader *header, Document::AllocatorType &alloc)
std::string encode64(const char *data, uint64_t size)
void addBasicDataProcessingHeaderInfo(Document &message, const DataProcessingHeader *header, Document::AllocatorType &alloc)
void addPayload(Document &message, uint64_t payloadSize, const char *payload, Document::AllocatorType &alloc)
void buildDocument(Document &message, std::string sender, const DataRef &ref)
static constexpr const o2::header::HeaderType sHeaderType
O2 header for OutputObj metadata.
static constexpr const o2::header::HeaderType sHeaderType
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
const std::byte * data() const noexcept
Definition DataHeader.h:422
o2::header::HeaderType description
header type description, set by derived header
Definition DataHeader.h:385
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
const BaseHeader * next() const noexcept
get the next header if any (const version)
Definition DataHeader.h:425
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
RunNumberType runNumber
Definition DataHeader.h:684
static constexpr o2::header::HeaderType sHeaderType
Definition DataHeader.h:630
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301