88 auto timesliceId = std::make_shared<size_t>(startTime);
89 auto totalEventCounter = std::make_shared<int>(0);
90 auto eventCounter = std::make_shared<int>(0);
91 auto TFcounter = std::make_shared<size_t>(startTime);
92 auto MCHeadersMessageCache = std::make_shared<fair::mq::Parts>();
93 auto MCTracksMessageCache = std::make_shared<fair::mq::Parts>();
94 auto Nparts = std::make_shared<int>(nPerTF);
96 return [timesliceId, specs, step, nevents, nPerTF, totalEventCounter, eventCounter, TFcounter, Nparts, MCHeadersMessageCache = MCHeadersMessageCache, MCTracksMessageCache = MCTracksMessageCache](
TimingInfo& ti,
ServiceRegistryRef const& services, fair::mq::Parts& parts,
ChannelRetriever channelRetriever,
size_t newTimesliceId,
bool& stop)
mutable ->
bool {
98 bool didSendData =
false;
101 if (*timesliceId != newTimesliceId) {
102 LOG(fatal) <<
"Time slice ID provided from oldestPossible mechanism " << newTimesliceId <<
" is out of sync with expected value " << *timesliceId;
106 for (
auto i = 0U;
i < parts.Size(); ++
i) {
111 sendOnChannel(*device, std::move(headerStack), std::move(parts.At(
i)), specs[
i], channelRetriever);
112 didSendData |= parts.At(
i)->GetSize() > 0;
114 *timesliceId += step;
116 if (*eventCounter == 0) {
117 *Nparts = ((nevents - *totalEventCounter) < nPerTF) ? nevents - *totalEventCounter : nPerTF;
120 ti.timeslice = *TFcounter;
121 ti.tfCounter = *TFcounter;
123 auto headerSize = parts.At(0)->GetSize();
124 auto tracksSize = parts.At(1)->GetSize();
134 appendForSending(*device, std::move(hhs), *TFcounter, std::move(parts.At(0)), specs[0], *MCHeadersMessageCache.get(), channelRetriever);
135 appendForSending(*device, std::move(ths), *TFcounter, std::move(parts.At(1)), specs[1], *MCTracksMessageCache.get(), channelRetriever);
139 ++(*totalEventCounter);
140 if (nPerTF > 0 && *eventCounter == *Nparts) {
142 LOGP(info,
">> Events: {}; TF counter: {}", *eventCounter, *TFcounter);
144 sendOnChannel(*device, *MCHeadersMessageCache.get(), channelRetriever(specs[0], *TFcounter), *TFcounter);
145 sendOnChannel(*device, *MCTracksMessageCache.get(), channelRetriever(specs[1], *TFcounter), *TFcounter);
146 didSendData |= MCHeadersMessageCache->Size() > 0;
147 didSendData |= MCTracksMessageCache->Size() > 0;
149 MCHeadersMessageCache->Clear();
150 MCTracksMessageCache->Clear();
153 if (*totalEventCounter == nevents) {
167 std::vector<OutputSpec> outputs;
168 outputs.emplace_back(
"MC",
"MCHEADER", 0, Lifetime::Timeframe);
169 outputs.emplace_back(
"MC",
"MCTRACKS", 0, Lifetime::Timeframe);
172 auto nevents = configcontext.
options().
get<
int>(
"nevents");
173 auto nEventsPerTF = configcontext.
options().
get<
int>(
"aggregate-timeframe");
179 std::string channelspec;
180 std::string channelbase =
"type=pair,method=connect,address=ipc://";
181 if (configcontext.
options().
get<
int>(
"o2sim-pid") != -1) {
182 std::stringstream channelstr;
183 channelstr << channelbase <<
"/tmp/o2sim-hitmerger-kineforward-" << configcontext.
options().
get<
int>(
"o2sim-pid") <<
",rateLogging=100";
184 channelspec = channelstr.str();
188 LOG(info) <<
"Looking for simulation MC-tracks socket";
190 if (socketlist.size() != 1) {
191 for (
auto s : socketlist) {
194 LOG(fatal) <<
"Too many or no socket found " << socketlist.size() <<
"; Please pass sim pid via --o2sim-pid";
196 LOG(info) <<
"Found socket " << socketlist[0];
197 channelspec = channelbase + socketlist[0] +
",rateLogging=100";
202 channelspec.c_str(),
f, 0,
true);
206 if (nEventsPerTF > 0) {
207 proxy.inputs.emplace_back(
InputSpec{
"clock",
"enum",
"DPL", 0, Lifetime::Enumeration, {
ConfigParamSpec{
"repetitions", VariantType::Int64,
static_cast<int64_t
>(nEventsPerTF), {
"merged events"}}}});
209 specs.push_back(proxy);
211 if (configcontext.
options().
get<
bool>(
"enable-test-consumer") && (nEventsPerTF < 0)) {
213 std::vector<InputSpec> inputs;
214 inputs.emplace_back(
"mctracks",
"MC",
"MCTRACKS", 0., Lifetime::Timeframe);
215 inputs.emplace_back(
"mcheader",
"MC",
"MCHEADER", 0., Lifetime::Timeframe);
ConfigParamRegistry & options() const
InputRecord & inputs()
The inputs associated with this processing context.
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)