Project
Loading...
Searching...
No Matches
O2SimDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_SIMDEVICE_H_
15#define ALICEO2_DEVICES_SIMDEVICE_H_
16
17#include <memory>
18#include <fairmq/Message.h>
19#include <fairmq/Device.h>
20#include <fairmq/Parts.h>
21#include <fairlogger/Logger.h>
22#include "../macro/o2sim.C"
23#include "TVirtualMC.h"
24#include "TMessage.h"
25#include <DetectorsBase/Stack.h>
28#include <TRandom.h>
29#include <SimConfig/SimConfig.h>
30#include <cstring>
31#include "PrimaryServerState.h"
32
33// a helper for logging with worker index prefixed
34void doLogInfo(int workerID, std::string const& message)
35{
36 LOG(info) << "[W" << workerID << "] " << message;
37}
38
39namespace o2
40{
41namespace devices
42{
43
{
 public:
  // Wraps an existing byte buffer (buf, len) as a ROOT TMessage so objects can be deserialized from it.
  // The buffers passed in this file come from fair::mq messages (reply->GetData(), payload->GetData()).
  // Resetting kIsOwner tells the underlying ROOT buffer that it does not own `buf`, so the memory is
  // not released by this wrapper; the fair::mq message stays responsible for it.
  TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
  ~TMessageWrapper() override = default;
};
50
51// device representing a simulation worker
52class O2SimDevice final : public fair::mq::Device
53{
54 public:
55 O2SimDevice() = default;
56 O2SimDevice(o2::steer::O2MCApplication* vmcapp, TVirtualMC* vmc) : mVMCApp{vmcapp}, mVMC{vmc} {}
57
  {
    // Reports final wall-clock time and memory usage when the device is destroyed.
    FairSystemInfo sysinfo;
    LOG(info) << "Shutting down O2SimDevice";
    // wall-clock time of the member stopwatch (mTimer is resumed with Continue() after each event in Kernel)
    LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
    // NOTE(review): current memory is divided by 1024*1024 here while GetMaxMemory() is printed unscaled;
    // presumably FairSystemInfo already reports the maximum in MB -- confirm.
    LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " " << sysinfo.GetMaxMemory() << " MB\n";
  }
67
68 protected:
70 void InitTask() final
71 {
72 // in the initialization phase we will init the simulation
73 // NOTE: In a fair::mq::Device this is better done here (instead of outside) since
74 // we have to setup simulation + worker in the same thread (due to many threadlocal variables
75 // in the simulation) ... at least as long fair::mq::Device is not spawning workers on the master thread
76 initSim(GetChannels().at("o2sim-primserv-info").at(0), mSimRun);
77
78 // set the vmc and app pointers
79 mVMC = TVirtualMC::GetMC();
80 mVMCApp = static_cast<o2::steer::O2MCApplication*>(TVirtualMCApplication::Instance());
81 lateInit();
82 }
83
84 static void CustomCleanup(void* data, void* hint) { delete static_cast<std::string*>(hint); }
85
86 public:
87 void lateInit()
88 {
89 // late init
90 mVMCApp->initLate();
91 }
92
93 // should go into a helper
94 // this function queries the sim config data and initializes the SimConfig singleton
95 // returns true if successful / false if not
96 static bool querySimConfig(fair::mq::Channel& channel)
97 {
98 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config));
99 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
100
101 int timeoutinMS = 60000; // wait for 60s max --> should be fast reply
102 if (channel.Send(request, timeoutinMS) > 0) {
103 LOG(info) << "Waiting for configuration answer ";
104 if (channel.Receive(reply, timeoutinMS) > 0) {
105 LOG(info) << "Configuration answer received, containing " << reply->GetSize() << " bytes ";
106
107 // the answer is a TMessage containing the simulation Configuration
108 auto message = std::make_unique<o2::devices::TMessageWrapper>(reply->GetData(), reply->GetSize());
109 auto config = static_cast<o2::conf::SimConfigData*>(message.get()->ReadObjectAny(message.get()->GetClass()));
110 if (!config) {
111 return false;
112 }
113
114 LOG(info) << "COMMUNICATED ENGINE " << config->mMCEngine;
115
116 auto& conf = o2::conf::SimConfig::Instance();
117 conf.resetFromConfigData(*config);
118 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
119 delete config;
120 } else {
121 LOG(error) << "No configuration received within " << timeoutinMS << "ms\n";
122 return false;
123 }
124 } else {
125 LOG(error) << "Could not send configuration request within " << timeoutinMS << "ms\n";
126 return false;
127 }
128 return true;
129 }
130
131 // initializes the simulation classes; queries the configuration on a given channel
132 static bool initSim(fair::mq::Channel& channel, std::unique_ptr<FairRunSim>& simptr)
133 {
134 if (!querySimConfig(channel)) {
135 return false;
136 }
137
138 LOG(info) << "Setting up the simulation ...";
139 simptr = std::move(std::unique_ptr<FairRunSim>(o2sim_init(true)));
140 FairSystemInfo sysinfo;
141
142 // to finish initialization (trigger further cross section table building etc) -- which especially
143 // G4 is doing at the first ProcessRun
144 // The goal is to have everything setup before we fork
145 TVirtualMC::GetMC()->ProcessRun(0);
146
147 LOG(info) << "MEM-STAMP END OF SIM INIT" << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
148 << sysinfo.GetMaxMemory() << " MB\n";
149
150 return true;
151 }
152
  /// Polls the primary server's status on `statuschannel` (2 s send/receive timeout per attempt).
  /// The reply is interpreted as an integer O2PrimaryServerState. According to the log messages:
  ///  - serving                        -> returns true
  ///  - still initializing / waiting   -> sleeps 1 s and re-probes
  ///  - idle                           -> returns false
  ///  - unknown or stopped             -> leaves the loop and returns false
  /// @param statuschannel channel connected to the primary server's status endpoint
  /// @param workerID      worker index, only used as a log prefix
  /// @return true if work can be requested, false otherwise (including failed status requests)
  bool isWorkAvailable(fair::mq::Channel& statuschannel, int workerID = -1)
  {
    std::stringstream str;
    str << "[W" << workerID << "]";
    auto workerStr = str.str();

    int timeoutinMS = 2000; // wait for 2s max
    bool reprobe = true;
    while (reprobe) {
      reprobe = false;
      int i = -1;
      fair::mq::MessagePtr request(statuschannel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Status));
      fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(i));
      // NOTE(review): if Send fails (sendcode <= 0) the loop ends and false is returned without any log
      auto sendcode = statuschannel.Send(request, timeoutinMS);
      if (sendcode > 0) {
        LOG(info) << workerStr << " Waiting for status answer ";
        auto code = statuschannel.Receive(reply, timeoutinMS);
        if (code > 0) {
          // the status answer carries the server state as a plain int
          int state(*((int*)(reply->GetData())));
          LOG(info) << workerStr << " SERVER IS SERVING";
          return true;
          LOG(info) << workerStr << " SERVER IS STILL INITIALIZING";
          reprobe = true;
          sleep(1);
          LOG(info) << workerStr << " SERVER IS WAITING FOR EVENT";
          reprobe = true;
          sleep(1);
        } else if (state == (int)o2::O2PrimaryServerState::Idle) {
          LOG(info) << workerStr << " SERVER IS IDLE";
          return false;
        } else {
          LOG(info) << workerStr << " SERVER STATE UNKNOWN OR STOPPED";
        }
      } else {
        LOG(error) << workerStr << " STATUS REQUEST UNSUCCESSFUL";
      }
    }
  }
  return false;
  }
196
  /// Performs one work cycle of a simulation worker. It requests a chunk of primaries on `requestchannel`,
  /// transports it with the configured MC engine, and hands `dataoutchannel` to the MC application
  /// (setSimDataChannel) for the outgoing simulation data.
  /// Environment variables read here:
  ///  - O2_DISABLE_REPRODUCIBLE_SIM: if set, the per-sub-event seeding block is skipped
  ///  - O2SIM_RESTRICT_EVENTPART ("eventnum:partid"): only that event part is transported; other chunks
  ///    get an empty primary list
  /// @param workerID       worker index, used as a log prefix and in the chunk request
  /// @param requestchannel channel on which primaries are requested and received
  /// @param dataoutchannel channel handed to the MC application for outgoing simulation data
  /// @param statuschannel  NOTE(review): not used by this function
  /// @return false if the request cannot be sent, or if the server answers without payload while it is
  ///         neither Initializing nor WaitingEvent; true otherwise. This includes after a transported chunk,
  ///         after a "no more work" chunk (despite its "quitting kernel" log), and after a failed receive.
  bool Kernel(int workerID, fair::mq::Channel& requestchannel, fair::mq::Channel& dataoutchannel, fair::mq::Channel* statuschannel = nullptr)
  {
    // request counter; a function-local static, so shared by all calls (and all instances) of Kernel
    static int counter = 0;
    bool reproducibleSim = true;
    if (getenv("O2_DISABLE_REPRODUCIBLE_SIM")) {
      reproducibleSim = false;
    }

    // Mainly for debugging reasons, we allow to transport
    // a specific event + eventpart. This allows to reproduce and debug bugs faster, once
    // we know in which precise chunk they occur. The expected format for the environment variable
    // is "eventnum:partid".
    auto eventselection = getenv("O2SIM_RESTRICT_EVENTPART");
    int focus_on_event = -1;
    int focus_on_part = -1;
    if (eventselection) {
      // splits at the first ':'; without a ':' both parts stay empty and atoi() below yields 0
      auto splitString = [](const std::string& str) {
        std::pair<std::string, std::string> parts;
        size_t pos = str.find(':');
        if (pos != std::string::npos) {
          parts.first = str.substr(0, pos);
          parts.second = str.substr(pos + 1);
        }
        return parts;
      };
      auto p = splitString(eventselection);
      focus_on_event = std::atoi(p.first.c_str());
      focus_on_part = std::atoi(p.second.c_str());
    }

    fair::mq::MessagePtr request(requestchannel.NewSimpleMessage(PrimaryChunkRequest{workerID, -1, counter++})); // <-- don't need content; channel means -> give primaries
    fair::mq::Parts reply;

    mVMCApp->setSimDataChannel(&dataoutchannel);

    // we log info with workerID prepended
    auto workerStr = [workerID]() {
      std::stringstream str;
      str << "[W" << workerID << "]";
      return str.str();
    };

    doLogInfo(workerID, "Requesting work chunk");
    int timeoutinMS = 2000;
    auto sendcode = requestchannel.Send(request, timeoutinMS);
    if (sendcode > 0) {
      doLogInfo(workerID, "Waiting for answer");
      // asking for primary generation

      // NOTE(review): Receive is called without an explicit timeout (presumably blocking until a reply or an
      // error), although the log message in the failure branch below mentions a timeout.
      auto code = requestchannel.Receive(reply);
      if (code > 0) {
        doLogInfo(workerID, "Primary chunk received");
        auto rawmessage = std::move(reply.At(0));
        // NOTE(review): the header is read by casting the raw bytes without checking rawmessage->GetSize()
        auto header = *(o2::PrimaryChunkAnswer*)(rawmessage->GetData());
        if (!header.payload_attached) {
          doLogInfo(workerID, "No payload; Server in stage " + std::string(PrimStateToString[(int)header.serverstate]));
          // if no payload attached we inspect the server state, to see what to do
          if (header.serverstate == O2PrimaryServerState::Initializing || header.serverstate == O2PrimaryServerState::WaitingEvent) {
            sleep(1); // back-off and retry
            return true;
          }
          // we need to decide what to do when the server is idle ---> if this happens immediately after a new batch request it means that the server might just lag a bit behind
          return false;
        } else {
          // second message part carries the serialized o2::data::PrimaryChunk
          auto payload = std::move(reply.At(1));
          // wrap incoming bytes as a TMessageWrapper which offers "adoption" of a buffer
          // NOTE(review): message and chunk are raw-owned (deleted at the end of this branch); an exception in
          // between would leak them. chunk is also not checked for nullptr before being dereferenced.
          auto message = new TMessageWrapper(payload->GetData(), payload->GetSize());
          auto chunk = static_cast<o2::data::PrimaryChunk*>(message->ReadObjectAny(message->GetClass()));

          bool goon = true;
          // no particles and eventID == -1 --> indication for no more work
          if (chunk->mParticles.size() == 0 && chunk->mSubEventInfo.eventID == -1) {
            doLogInfo(workerID, "No particles in reply : quitting kernel");
            goon = false;
          }

          if (goon) {

            auto info = chunk->mSubEventInfo;
            LOG(info) << workerStr() << " Processing " << chunk->mParticles.size() << " primary particles "
                      << "for event " << info.eventID << "/" << info.maxEvents << " "
                      << "part " << info.part << "/" << info.nparts;

            if (eventselection == nullptr || (focus_on_event == info.eventID && focus_on_part == info.part)) {
              mVMCApp->setPrimaries(chunk->mParticles);
            } else {
              // nothing to transport here
              mVMCApp->setPrimaries(std::vector<TParticle>{});
              LOG(info) << workerStr() << " This chunk will be skipped";
            }

            // NOTE(review): passes the address of the local `info`; valid only while this call runs --
            // confirm the application does not keep the pointer beyond that.
            mVMCApp->setSubEventInfo(&info);

            if (reproducibleSim) {
              LOG(info) << workerStr() << " Setting seed for this sub-event to " << chunk->mSubEventInfo.seed;
              gRandom->SetSeed(chunk->mSubEventInfo.seed);
            }

            // Process one event
            auto& conf = o2::conf::SimConfig::Instance();
            if (strcmp(conf.getMCEngine().c_str(), "TGeant4") == 0 || strcmp(conf.getMCEngine().c_str(), "O2TrivialMCEngine") == 0) {
              // this is preferred and necessary for Geant4
              // since repeated "ProcessRun" might have significant overheads
              mVMC->ProcessEvent();
            } else {
              // for Geant3 calling ProcessEvent is not enough
              // as some hooks are not called
              mVMC->ProcessRun(1);
            }

            // RealTime() stops the stopwatch; Continue() resumes it so time keeps accumulating across events
            FairSystemInfo sysinfo;
            LOG(info) << workerStr() << " TIME-STAMP " << mTimer.RealTime() << "\t";
            mTimer.Continue();
            LOG(info) << workerStr() << " MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
                      << sysinfo.GetMaxMemory() << " MB\n";
          }
          delete message;
          delete chunk;
        }
      } else {
        LOG(info) << workerStr() << " No primary answer received from server (within timeout). Return code " << code;
      }
    } else {
      LOG(info) << workerStr() << " Requesting work from server not possible. Return code " << sendcode;
      return false;
    }
    return true;
  }
326
327 protected:
329 bool ConditionalRun() final
330 {
331 return Kernel(-1, GetChannels().at("primary-get").at(0), GetChannels().at("simdata").at(0));
332 }
333
334 void PostRun() final { LOG(info) << "Shutting down "; }
335
 private:
  TStopwatch mTimer;                             // wall-clock timer reported in the TIME-STAMP log lines
  o2::steer::O2MCApplication* mVMCApp = nullptr; // MC application; set in InitTask() or by the two-argument constructor
  TVirtualMC* mVMC = nullptr;                    // MC engine used by Kernel() to process events; set like mVMCApp
  std::unique_ptr<FairRunSim> mSimRun;           // owns the FairRunSim created by initSim() during InitTask()
};
342
343} // namespace devices
344} // namespace o2
345
346#endif
benchmark::State & state
Definition of the Stack class.
int32_t i
void doLogInfo(int workerID, std::string const &message)
Definition O2SimDevice.h:34
uint16_t pos
Definition RawData.h:3
std::vector< std::string > splitString(const std::string &src, char delim, bool trim=false)
static VMCSeederService const & instance()
static SimConfig & Instance()
Definition SimConfig.h:111
bool Kernel(int workerID, fair::mq::Channel &requestchannel, fair::mq::Channel &dataoutchannel, fair::mq::Channel *statuschannel=nullptr)
~O2SimDevice() final
Default destructor.
Definition O2SimDevice.h:59
O2SimDevice(o2::steer::O2MCApplication *vmcapp, TVirtualMC *vmc)
Definition O2SimDevice.h:56
static void CustomCleanup(void *data, void *hint)
Definition O2SimDevice.h:84
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
bool isWorkAvailable(fair::mq::Channel &statuschannel, int workerID=-1)
void InitTask() final
Overloads the InitTask() method of fair::mq::Device.
Definition O2SimDevice.h:70
static bool initSim(fair::mq::Channel &channel, std::unique_ptr< FairRunSim > &simptr)
bool ConditionalRun() final
Overloads the ConditionalRun() method of fair::mq::Device.
~TMessageWrapper() override=default
TMessageWrapper(void *buf, Int_t len)
Definition O2SimDevice.h:47
void setSimDataChannel(fair::mq::Channel *channel)
void setPrimaries(std::vector< TParticle > const &p)
void setSubEventInfo(o2::data::SubEventInfo *i)
static ShmManager & Instance()
Definition ShmManager.h:61
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint counter
Definition glcorearb.h:3987
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string mMCEngine
Definition SimConfig.h:54
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str