Project
Loading...
Searching...
No Matches
o2sim_mctracks_proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <boost/program_options.hpp>
13
14#include "../Framework/Core/src/ArrowSupport.h"
19#include "Framework/Task.h"
20#include "Framework/DataRef.h"
22#include "Headers/DataHeader.h"
23#include "Headers/Stack.h"
28#include <unistd.h>
29
30using namespace o2::framework;
31using namespace o2::header;
32
33void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
34{
35 workflowOptions.push_back(ConfigParamSpec{"enable-test-consumer", o2::framework::VariantType::Bool, false, {"enable a simple test consumer for injected MC tracks"}});
36 workflowOptions.push_back(ConfigParamSpec{"o2sim-pid", o2::framework::VariantType::Int, -1, {"The process id of the source o2-sim"}});
37 workflowOptions.push_back(ConfigParamSpec{"nevents", o2::framework::VariantType::Int, -1, {"The number of events expected to arrive on the proxy"}});
38 workflowOptions.push_back(ConfigParamSpec{"aggregate-timeframe", o2::framework::VariantType::Int, -1, {"The number of events to aggregate per timeframe"}});
39}
40
42
43// a simple (test) consumer task for MCTracks and MCEventHeaders injected from
44// the proxy
// NOTE(review): the class head and the signature of the member function whose
// body starts below are not present in this listing — confirm against the
// original file before editing.
46{
47 public:
50 {
51 LOG(debug) << "Running simple kinematics consumer client";
// Walk over every input part and log its payload size and serialization method.
52 for (const DataRef& ref : InputRecordWalker(pc.inputs())) {
53 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
54 LOG(debug) << "Payload size " << dh->payloadSize << " method " << dh->payloadSerializationMethod.as<std::string>();
55 }
56 try {
// Fetch the inputs bound as "mctracks" and "mcheader" and report the track
// count and the impact parameter read from the event header.
57 auto tracks = pc.inputs().get<std::vector<o2::MCTrack>>("mctracks");
58 auto eventheader = pc.inputs().get<o2::dataformats::MCEventHeader*>("mcheader");
59 LOG(info) << "Got " << tracks.size() << " tracks";
60 LOG(info) << "Got " << eventheader->GetB() << " as impact parameter in the event header";
// NOTE(review): any exception thrown while fetching or logging the inputs is
// silently discarded by the empty handler below.
61 } catch (...) {
62 }
63 }
64};
65
66static DataHeader headerFromSpec(OutputSpec const& spec, size_t size, o2::header::SerializationMethod method, int splitParts = 0, int partIndex = 0)
67{
68 DataHeader dh;
70 dh.dataOrigin = matcher.origin;
71 dh.dataDescription = matcher.description;
72 dh.subSpecification = matcher.subSpec;
73 dh.payloadSize = size;
75 if (splitParts > 0) {
76 dh.splitPayloadParts = splitParts;
77 dh.splitPayloadIndex = partIndex;
78 }
79 return dh;
80}
81
86InjectorFunction o2simKinematicsConverter(std::vector<OutputSpec> const& specs, uint64_t startTime, uint64_t step, int nevents, int nPerTF)
87{
88 auto timesliceId = std::make_shared<size_t>(startTime);
89 auto totalEventCounter = std::make_shared<int>(0);
90 auto eventCounter = std::make_shared<int>(0);
91 auto TFcounter = std::make_shared<size_t>(startTime);
92 auto MCHeadersMessageCache = std::make_shared<fair::mq::Parts>();
93 auto MCTracksMessageCache = std::make_shared<fair::mq::Parts>();
94 auto Nparts = std::make_shared<int>(nPerTF);
95
96 return [timesliceId, specs, step, nevents, nPerTF, totalEventCounter, eventCounter, TFcounter, Nparts, MCHeadersMessageCache = MCHeadersMessageCache, MCTracksMessageCache = MCTracksMessageCache](TimingInfo& ti, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) mutable -> bool {
97 auto*device = services.get<RawDeviceService>().device();
98 bool didSendData = false;
99 if (nPerTF < 0) {
100 // if no aggregation requested, forward each message with the DPL header
101 if (*timesliceId != newTimesliceId) {
102 LOG(fatal) << "Time slice ID provided from oldestPossible mechanism " << newTimesliceId << " is out of sync with expected value " << *timesliceId;
103 }
104 // We iterate on all the parts and we send them two by two,
105 // adding the appropriate O2 header.
106 for (auto i = 0U; i < parts.Size(); ++i) {
107 DataHeader dh = headerFromSpec(specs[i], parts.At(i)->GetSize(), gSerializationMethodROOT);
108 DataProcessingHeader dph{newTimesliceId, 0};
109 // we have to move the incoming data
110 o2::header::Stack headerStack{dh, dph};
111 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(i)), specs[i], channelRetriever);
112 didSendData |= parts.At(i)->GetSize() > 0;
113 }
114 *timesliceId += step;
115 } else {
116 if (*eventCounter == 0) {
117 *Nparts = ((nevents - *totalEventCounter) < nPerTF) ? nevents - *totalEventCounter : nPerTF;
118 }
119 // if aggregation is requested, colelct the payloads into a multipart message
120 ti.timeslice = *TFcounter;
121 ti.tfCounter = *TFcounter;
122
123 auto headerSize = parts.At(0)->GetSize();
124 auto tracksSize = parts.At(1)->GetSize();
125
126 DataProcessingHeader hdph{*TFcounter, 0};
127 DataHeader headerDH = headerFromSpec(specs[0], headerSize, gSerializationMethodROOT, *Nparts, *eventCounter);
128 o2::header::Stack hhs{headerDH, hdph};
129
130 DataProcessingHeader tdph{*TFcounter, 0};
131 DataHeader tracksDH = headerFromSpec(specs[1], tracksSize, gSerializationMethodROOT, *Nparts, *eventCounter);
132 o2::header::Stack ths{tracksDH, tdph};
133
134 appendForSending(*device, std::move(hhs), *TFcounter, std::move(parts.At(0)), specs[0], *MCHeadersMessageCache.get(), channelRetriever);
135 appendForSending(*device, std::move(ths), *TFcounter, std::move(parts.At(1)), specs[1], *MCTracksMessageCache.get(), channelRetriever);
136 ++(*eventCounter);
137 }
138
139 ++(*totalEventCounter);
140 if (nPerTF > 0 && *eventCounter == *Nparts) {
141 // send the events when the timeframe is accumulated
142 LOGP(info, ">> Events: {}; TF counter: {}", *eventCounter, *TFcounter);
143 *eventCounter = 0;
144 sendOnChannel(*device, *MCHeadersMessageCache.get(), channelRetriever(specs[0], *TFcounter), *TFcounter);
145 sendOnChannel(*device, *MCTracksMessageCache.get(), channelRetriever(specs[1], *TFcounter), *TFcounter);
146 didSendData |= MCHeadersMessageCache->Size() > 0;
147 didSendData |= MCTracksMessageCache->Size() > 0;
148 ++(*TFcounter);
149 MCHeadersMessageCache->Clear();
150 MCTracksMessageCache->Clear();
151 }
152
153 if (*totalEventCounter == nevents) {
154 // I am done (I don't expect more events to convert); so tell the proxy device to shut-down
155 stop = true;
156 }
157 return didSendData;
158 };
159}
160
// Assembles the workflow: a proxy device that listens on the external o2-sim
// channel and forwards MC headers/tracks, plus an optional test consumer.
// NOTE(review): the signature line of this function is not present in this
// listing — confirm against the original file.
163{
164 WorkflowSpec specs;
165
166 // make a proxy (connecting to an external channel) and forwarding in DPL speak
167 std::vector<OutputSpec> outputs;
168 outputs.emplace_back("MC", "MCHEADER", 0, Lifetime::Timeframe);
169 outputs.emplace_back("MC", "MCTRACKS", 0, Lifetime::Timeframe);
170
171 // fetch the number of events to expect
172 auto nevents = configcontext.options().get<int>("nevents");
173 auto nEventsPerTF = configcontext.options().get<int>("aggregate-timeframe");
// The converter is created with startTime 0 and step 1.
174 o2::framework::InjectorFunction f = o2simKinematicsConverter(outputs, 0, 1, nevents, nEventsPerTF);
175
176 // construct the input channel to listen on
177 // use given pid
178 // TODO: this could go away with a proper pipeline implementation
179 std::string channelspec;
180 std::string channelbase = "type=pair,method=connect,address=ipc://";
181 if (configcontext.options().get<int>("o2sim-pid") != -1) {
182 std::stringstream channelstr;
183 channelstr << channelbase << "/tmp/o2sim-hitmerger-kineforward-" << configcontext.options().get<int>("o2sim-pid") << ",rateLogging=100";
184 channelspec = channelstr.str();
185 } else {
186 // we try to detect an existing channel by name ... as long as it's unique ... else we fail
187 sleep(2); // give time for sim to startup
188 LOG(info) << "Looking for simulation MC-tracks socket";
189 auto socketlist = o2::utils::listFiles("/tmp", "o2sim-hitmerger-kineforward-.*");
190 if (socketlist.size() != 1) {
// List whatever was found before reporting the failure.
191 for (auto s : socketlist) {
192 LOG(info) << s;
193 }
// NOTE(review): presumably LOG(fatal) terminates the process; otherwise
// socketlist[0] below would be read from an empty or ambiguous list — verify.
194 LOG(fatal) << "Too many or no socket found " << socketlist.size() << "; Please pass sim pid via --o2sim-pid";
195 }
196 LOG(info) << "Found socket " << socketlist[0];
197 channelspec = channelbase + socketlist[0] + ",rateLogging=100";
198 }
199
// The two trailing arguments are minSHM = 0 and sendTFcounter = true.
200 auto proxy = specifyExternalFairMQDeviceProxy("o2sim-mctrack-proxy",
201 outputs,
202 channelspec.c_str(), f, 0, true);
203 // add monitoring service to be able to report number of timeframes sent for the rate limiting to work
204 proxy.requiredServices.push_back(o2::framework::ArrowSupport::arrowBackendSpec());
205 // if aggregation is requested, set the enumeration repetitions to aggregation size
206 if (nEventsPerTF > 0) {
207 proxy.inputs.emplace_back(InputSpec{"clock", "enum", "DPL", 0, Lifetime::Enumeration, {ConfigParamSpec{"repetitions", VariantType::Int64, static_cast<int64_t>(nEventsPerTF), {"merged events"}}}});
208 }
209 specs.push_back(proxy);
210
// The test consumer is only attached when aggregation is disabled.
211 if (configcontext.options().get<bool>("enable-test-consumer") && (nEventsPerTF < 0)) {
212 // connect a test consumer
213 std::vector<InputSpec> inputs;
// NOTE(review): the sub-specification is written as the floating literal `0.`
// here, while the outputs above use the integer 0 — confirm this is intended.
214 inputs.emplace_back("mctracks", "MC", "MCTRACKS", 0., Lifetime::Timeframe);
215 inputs.emplace_back("mcheader", "MC", "MCHEADER", 0., Lifetime::Timeframe);
216 specs.emplace_back(DataProcessorSpec{"sample-MCTrack-consumer",
217 inputs,
218 {},
219 AlgorithmSpec{adaptFromTask<ConsumerTask>()},
220 {}});
221 }
222
223 return specs;
224}
int32_t i
A helper class to iterate over all parts of all input routes.
Definition of the MCTrack class.
std::ostringstream debug
void init(o2::framework::InitContext &)
void run(o2::framework::ProcessingContext &pc)
ConfigParamRegistry & options() const
A helper class to iterate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::vector< DataProcessorSpec > WorkflowSpec
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
void appendForSending(fair::mq::Device &device, o2::header::Stack &&headerStack, size_t timeSliceID, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, fair::mq::Parts &messageCache, ChannelRetriever &channelRetriever)
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodROOT
Definition DataHeader.h:328
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
Describe the DPL workflow.
InjectorFunction o2simKinematicsConverter(std::vector< OutputSpec > const &specs, uint64_t startTime, uint64_t step, int nevents, int nPerTF)
static ServiceSpec arrowBackendSpec()
header::DataHeader::SubSpecificationType subSpec
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"