Project
Loading...
Searching...
No Matches
DataProcessingContext.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include "Framework/Signpost.h"
17
18O2_DECLARE_DYNAMIC_LOG(data_processor_context);
20
21namespace o2::framework
22{
23
24namespace
25{
26template <typename T, typename... ARGS>
27void invokeAll(T& handles, char const* callbackName, o2::framework::DataProcessorSpec* spec, ARGS&... args)
28{
29 assert(callbackName);
30 O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, spec);
31 // FIXME: for now spec is nullptr because we don't have a list of possible DataProcessorSpecs
32 // per device.
33 char const* dataProcessorName = spec ? spec->name.c_str() : "DataProcessorContext";
34 O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting %{public}s::%{public}s", dataProcessorName, callbackName);
35 for (auto& handle : handles) {
36 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
37 O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
38 handle.callback(args..., handle.service);
39 O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending %{public}s::%{public}s::%{public}s", dataProcessorName, handle.spec.name.c_str(), callbackName);
40 }
41 O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending %{public}s::%{public}s", dataProcessorName, callbackName);
42}
43} // namespace
44
47{
48 invokeAll(preProcessingHandlers, "preProcessingCallbacks", spec, ctx);
49}
50
52{
53 invokeAll(finaliseOutputsHandles, "finaliseOutputsCallbacks", spec, ctx);
54}
55
58{
59 invokeAll(postProcessingHandlers, "postProcessingCallbacks", spec, ctx);
60}
61
64{
65 invokeAll(preDanglingHandles, "preDanglingCallbacks", spec, ctx);
66}
67
70{
71 invokeAll(postDanglingHandles, "postDanglingCallbacks", spec, ctx);
72}
73
76{
77 invokeAll(preEOSHandles, "preEOSCallbacks", spec, ctx);
78}
79
82{
83 invokeAll(postEOSHandles, "postEOSCallbacks", spec, ctx);
84}
85
88{
89 invokeAll(postDispatchingHandles, "postDispatchingCallbacks", spec, ctx);
90}
91
94{
95 invokeAll(postForwardingHandles, "postForwardingCallbacks", spec, ctx);
96}
97
100{
101 invokeAll(preStartHandles, "preStartCallbacks", spec, ref);
102}
103
105{
106 invokeAll(postStopHandles, "postStopCallbacks", spec, ref);
107}
108
110void DataProcessorContext::preExitCallbacks(std::vector<ServiceExitHandle> handles, ServiceRegistryRef ref)
111{
112 O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, &ref);
113 O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preExitCallbacks");
114 // FIXME: we need to call the callback only once for the global services
116 for (auto handle = handles.rbegin(); handle != handles.rend(); ++handle) {
117 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle->service);
118 O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
119 handle->callback(ref, handle->service);
120 O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preExitCallbacks for service %{public}s", handle->spec.name.c_str());
121 }
122 O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preExitCallbacks");
123}
124
127{
128 invokeAll(preLoopHandles, "preLoopCallbacks", spec, ref);
129}
130
132{
133 O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
134 O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext domainInfoUpdatedCallback");
135 for (auto& handle : domainInfoHandles) {
136 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
137 O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
138 handle.callback(ref, oldestPossibleTimeslice, channelIndex);
139 O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::domainInfoUpdatedCallback for service %{public}s", handle.spec.name.c_str());
140 }
141 O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext domainInfoUpdatedCallback");
142}
143
145{
146 O2_SIGNPOST_ID_FROM_POINTER(dpid, data_processor_context, this);
147 O2_SIGNPOST_START(data_processor_context, dpid, "callbacks", "Starting DataProcessorContext preSendingMessagesCallbacks");
148 for (auto& handle : preSendingMessagesHandles) {
149 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, handle.service);
150 O2_SIGNPOST_START(data_processor_context, cid, "callbacks", "Starting DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
151 handle.callback(ref, parts, channelIndex);
152 O2_SIGNPOST_END(data_processor_context, cid, "callbacks", "Ending DataProcessorContext::preSendingMessagesCallbacks for service %{public}s", handle.spec.name.c_str());
153 }
154 O2_SIGNPOST_END(data_processor_context, dpid, "callbacks", "Ending DataProcessorContext preSendingMessagesCallbacks");
155}
156
157} // namespace o2::framework
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ServicePreSendingMessagesHandle > preSendingMessagesHandles
Callbacks for services to be executed before sending messages.
std::vector< ServiceDanglingHandle > preDanglingHandles
Callbacks for services to be executed before every dangling check.
std::vector< ServiceEOSHandle > postEOSHandles
Callbacks for services to be executed after every EOS user callback invocation.
void preSendingMessagesCallbacks(ServiceRegistryRef, fair::mq::Parts &parts, ChannelIndex channelindex)
Invoke before sending messages parts on a channel channelindex.
std::vector< ServiceEOSHandle > preEOSHandles
Callbacks for services to be executed before every EOS user callback invocation.
void postDispatchingCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every data Dispatching.
std::vector< ServiceDomainInfoHandle > domainInfoHandles
Callbacks for services to be executed whenever a new DomainInfo message is received.
std::vector< ServiceStartHandle > preStartHandles
Callbacks for services to be executed before Start.
void finaliseOutputsCallbacks(ProcessingContext &)
std::vector< ServicePreLoopHandle > preLoopHandles
Callbacks for services to be executed before we enter the event loop.
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
std::vector< ServiceProcessingHandle > postProcessingHandlers
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preEOSCallbacks(EndOfStreamContext &)
Invoke callbacks to be executed before every EOS user callback invocation.
std::vector< ServiceForwardingHandle > postForwardingHandles
Callbacks for services to be executed after every forwarding.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invocation.
std::vector< ServiceDispatchingHandle > postDispatchingHandles
Callbacks for services to be executed after every dispatching.
void postEOSCallbacks(EndOfStreamContext &)
Invoke callbacks to be executed after every EOS user callback invocation.
void domainInfoUpdatedCallback(ServiceRegistryRef, size_t oldestPossibleTimeslice, ChannelIndex channelIndex)
Invoke whenever we get a new DomainInfo message.
std::vector< ServiceStopHandle > postStopHandles
Callbacks for services to be executed on the Stop transition.
static void preExitCallbacks(std::vector< ServiceExitHandle >, ServiceRegistryRef)
Invoke callback to be executed on exit, in reverse order.
std::vector< ServiceProcessingHandle > finaliseOutputsHandles
std::vector< ServiceDanglingHandle > postDanglingHandles
Callbacks for services to be executed after every dangling check.
void postProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invocation.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
std::vector< ServiceProcessingHandle > preProcessingHandlers
void postForwardingCallbacks(ProcessingContext &)
Callback invoked after the late forwarding has been done.
void preDanglingCallbacks(DanglingContext &)
Invoke callbacks to be executed before every dangling check.
void postDanglingCallbacks(DanglingContext &)
Invoke callbacks to be executed after every dangling check.