Project
Loading...
Searching...
No Matches
O2SimDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef ALICEO2_DEVICES_SIMDEVICE_H_
15#define ALICEO2_DEVICES_SIMDEVICE_H_
16
17#include <memory>
18#include <fairmq/Message.h>
19#include <fairmq/Device.h>
20#include <fairmq/Parts.h>
21#include <fairlogger/Logger.h>
22#include "../macro/o2sim.C"
23#include "TVirtualMC.h"
24#include "TMessage.h"
25#include <DetectorsBase/Stack.h>
28#include <TRandom.h>
29#include <SimConfig/SimConfig.h>
30#include <cstring>
31#include "PrimaryServerState.h"
32
33// a helper for logging with worker index prefixed
34void doLogInfo(int workerID, std::string const& message)
35{
36 LOG(info) << "[W" << workerID << "] " << message;
37}
38
39namespace o2
40{
41namespace devices
42{
43
// NOTE(review): the class head (source line 44) is missing from this extraction; the
// constructor's initializer list below shows that TMessage is a base class — restore the
// head from the original file.
45{
46 public:
// Wraps an existing serialized byte buffer (buf, len) as a ROOT TMessage so objects can be
// read from it (callers use ReadObjectAny). ResetBit(kIsOwner) clears TBuffer's ownership
// flag, i.e. the wrapper is not meant to free `buf`; presumably the caller must keep the
// buffer (e.g. the fair::mq message holding the bytes) alive while the wrapper is used — confirm.
47 TMessageWrapper(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
48 ~TMessageWrapper() override = default;
49};
50
51// device representing a simulation worker
52class O2SimDevice final : public fair::mq::Device
53{
54 public:
55 O2SimDevice() = default;
56 O2SimDevice(o2::steer::O2MCApplication* vmcapp, TVirtualMC* vmc) : mVMCApp{vmcapp}, mVMC{vmc} {}
57
// NOTE(review): the destructor's signature (source lines 58-59, per the cross-reference
// `~O2SimDevice() final`, "Default destructor.") and source line 62 are missing from this
// extraction — restore them from the original file.
60 {
// Logs a final shutdown message with the stopwatch's elapsed real time and memory usage.
61 FairSystemInfo sysinfo;
63 LOG(info) << "Shutting down O2SimDevice";
64 LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
// NOTE(review): GetCurrentMemory() is divided by 1024*1024 while GetMaxMemory() is printed
// as-is, both under the "MB" label — presumably GetMaxMemory() already reports MB; confirm.
65 LOG(info) << "MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " " << sysinfo.GetMaxMemory() << " MB\n";
66 }
67
68 protected:
70 void InitTask() final
71 {
72 // in the initialization phase we will init the simulation
73 // NOTE: In a fair::mq::Device this is better done here (instead of outside) since
74 // we have to setup simulation + worker in the same thread (due to many threadlocal variables
75 // in the simulation) ... at least as long fair::mq::Device is not spawning workers on the master thread
76 initSim(GetChannels().at("o2sim-primserv-info").at(0), mSimRun);
77
78 // set the vmc and app pointers
79 mVMC = TVirtualMC::GetMC();
80 mVMCApp = static_cast<o2::steer::O2MCApplication*>(TVirtualMCApplication::Instance());
81 lateInit();
82 }
83
/// Deallocation callback for fair::mq messages whose bytes live in a heap-allocated
/// std::string passed as `hint`: destroys that string. The `data` pointer is not used.
static void CustomCleanup(void* data, void* hint)
{
  std::string* owningString = static_cast<std::string*>(hint);
  delete owningString;
}
85
86 public:
87 void lateInit()
88 {
89 // late init
90 mVMCApp->initLate();
91 }
92
93 // should go into a helper
94 // this function queries the sim config data and initializes the SimConfig singleton
95 // returns true if successful / false if not
96 static bool querySimConfig(fair::mq::Channel& channel)
97 {
98 // auto text = new std::string("configrequest");
99 // std::unique_ptr<fair::mq::Message> request(channel.NewMessage(const_cast<char*>(text->c_str()),
100 // text->length(), CustomCleanup, text));
101 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(O2PrimaryServerInfoRequest::Config));
102 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
103
104 int timeoutinMS = 60000; // wait for 60s max --> should be fast reply
105 if (channel.Send(request, timeoutinMS) > 0) {
106 LOG(info) << "Waiting for configuration answer ";
107 if (channel.Receive(reply, timeoutinMS) > 0) {
108 LOG(info) << "Configuration answer received, containing " << reply->GetSize() << " bytes ";
109
110 // the answer is a TMessage containing the simulation Configuration
111 auto message = std::make_unique<o2::devices::TMessageWrapper>(reply->GetData(), reply->GetSize());
112 auto config = static_cast<o2::conf::SimConfigData*>(message.get()->ReadObjectAny(message.get()->GetClass()));
113 if (!config) {
114 return false;
115 }
116
117 LOG(info) << "COMMUNICATED ENGINE " << config->mMCEngine;
118
119 auto& conf = o2::conf::SimConfig::Instance();
120 conf.resetFromConfigData(*config);
121 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
122 delete config;
123 } else {
124 LOG(error) << "No configuration received within " << timeoutinMS << "ms\n";
125 return false;
126 }
127 } else {
128 LOG(error) << "Could not send configuration request within " << timeoutinMS << "ms\n";
129 return false;
130 }
131 return true;
132 }
133
134 // initializes the simulation classes; queries the configuration on a given channel
135 static bool initSim(fair::mq::Channel& channel, std::unique_ptr<FairRunSim>& simptr)
136 {
137 if (!querySimConfig(channel)) {
138 return false;
139 }
140
141 LOG(info) << "Setting up the simulation ...";
142 simptr = std::move(std::unique_ptr<FairRunSim>(o2sim_init(true)));
143 FairSystemInfo sysinfo;
144
145 // to finish initialization (trigger further cross section table building etc) -- which especially
146 // G4 is doing at the first ProcessRun
147 // The goal is to have everything setup before we fork
148 TVirtualMC::GetMC()->ProcessRun(0);
149
150 LOG(info) << "MEM-STAMP END OF SIM INIT" << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
151 << sysinfo.GetMaxMemory() << " MB\n";
152
153 return true;
154 }
155
// Polls the primary server's status on `statuschannel` and reports whether work can be requested.
// Returns true when the server reports that it is serving, false when it is idle, in an
// unknown/stopped state, or when the status request fails. While the server is initializing
// or waiting for an event, it sleeps 1 s and probes again.
// NOTE(review): when the request cannot be sent (sendcode <= 0) the loop ends and false is
// returned without any log message.
156 bool isWorkAvailable(fair::mq::Channel& statuschannel, int workerID = -1)
157 {
158 std::stringstream str;
159 str << "[W" << workerID << "]";
160 auto workerStr = str.str();
161
162 int timeoutinMS = 2000; // wait for 2s max
163 bool reprobe = true;
164 while (reprobe) {
165 reprobe = false;
166 int i = -1;
167 fair::mq::MessagePtr request(statuschannel.NewSimpleMessage(O2PrimaryServerInfoRequest::Status));
168 fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(i));
169 auto sendcode = statuschannel.Send(request, timeoutinMS);
170 if (sendcode > 0) {
171 LOG(info) << workerStr << " Waiting for status answer ";
172 auto code = statuschannel.Receive(reply, timeoutinMS);
173 if (code > 0) {
// NOTE(review): the reply is read as an int without checking reply->GetSize() >= sizeof(int).
174 int state(*((int*)(reply->GetData())));
// NOTE(review): source lines 175, 178 and 182 (the heads of the if / else-if chain that
// compares `state` against the server states) are missing from this extraction — restore
// them from the original file.
176 LOG(info) << workerStr << " SERVER IS SERVING";
177 return true;
179 LOG(info) << workerStr << " SERVER IS STILL INITIALIZING";
180 reprobe = true;
181 sleep(1);
183 LOG(info) << workerStr << " SERVER IS WAITING FOR EVENT";
184 reprobe = true;
185 sleep(1);
186 } else if (state == (int)o2::O2PrimaryServerState::Idle) {
187 LOG(info) << workerStr << " SERVER IS IDLE";
188 return false;
189 } else {
190 LOG(info) << workerStr << " SERVER STATE UNKNOWN OR STOPPED";
191 }
192 } else {
193 LOG(error) << workerStr << " STATUS REQUEST UNSUCCESSFUL";
194 }
195 }
196 }
197 return false;
198 }
199
// Processes one work unit: requests a chunk of primaries on `requestchannel`, sets them on the
// MC application (whose sim-data channel is set to `dataoutchannel`) and transports them with the VMC.
// Return value (used by ConditionalRun as "keep running"):
// - false: the request could not be sent, or the server answered without payload while not
//   initializing / waiting for an event;
// - true: otherwise, i.e. after processing, after a 1 s back-off, when no answer was received,
//   or on the "no more work" chunk.
// Environment:
// - O2_DISABLE_REPRODUCIBLE_SIM set: do not reseed gRandom per sub-event;
// - O2SIM_RESTRICT_EVENTPART="eventnum:partid": transport only that chunk.
// NOTE(review): `statuschannel` is not used in the visible body.
200 bool Kernel(int workerID, fair::mq::Channel& requestchannel, fair::mq::Channel& dataoutchannel, fair::mq::Channel* statuschannel = nullptr)
201 {
// NOTE(review): function-local static in a member function -> one counter shared by all
// O2SimDevice instances and calls, with no synchronization.
202 static int counter = 0;
203 bool reproducibleSim = true;
204 if (getenv("O2_DISABLE_REPRODUCIBLE_SIM")) {
205 reproducibleSim = false;
206 }
207
208 // Mainly for debugging reasons, we allow to transport
209 // a specific event + eventpart. This allows to reproduce and debug bugs faster, once
210 // we know in which precise chunk they occur. The expected format for the environment variable
211 // is "eventnum:partid".
// NOTE(review): if the value has no ':', both parts stay empty and std::atoi("") yields 0,
// so event 0 / part 0 is selected silently.
212 auto eventselection = getenv("O2SIM_RESTRICT_EVENTPART");
213 int focus_on_event = -1;
214 int focus_on_part = -1;
215 if (eventselection) {
216 auto splitString = [](const std::string& str) {
217 std::pair<std::string, std::string> parts;
218 size_t pos = str.find(':');
219 if (pos != std::string::npos) {
220 parts.first = str.substr(0, pos);
221 parts.second = str.substr(pos + 1);
222 }
223 return parts;
224 };
225 auto p = splitString(eventselection);
226 focus_on_event = std::atoi(p.first.c_str());
227 focus_on_part = std::atoi(p.second.c_str());
228 }
229
230 fair::mq::MessagePtr request(requestchannel.NewSimpleMessage(PrimaryChunkRequest{workerID, -1, counter++})); // <-- don't need content; channel means -> give primaries
231 fair::mq::Parts reply;
232
233 mVMCApp->setSimDataChannel(&dataoutchannel);
234
235 // we log info with workerID prepended
236 auto workerStr = [workerID]() {
237 std::stringstream str;
238 str << "[W" << workerID << "]";
239 return str.str();
240 };
241
242 doLogInfo(workerID, "Requesting work chunk");
243 int timeoutinMS = 2000;
244 auto sendcode = requestchannel.Send(request, timeoutinMS);
245 if (sendcode > 0) {
246 doLogInfo(workerID, "Waiting for answer");
247 // asking for primary generation
248
// NOTE(review): Receive is called without a timeout argument (unlike Send above), although
// the failure log further down mentions "within timeout" — confirm the intended behavior.
249 auto code = requestchannel.Receive(reply);
250 if (code > 0) {
251 doLogInfo(workerID, "Primary chunk received");
// NOTE(review): reply.At(0) / reply.At(1) are used without checking reply.Size(), and the
// header is read through a C-style cast without checking the part's size.
252 auto rawmessage = std::move(reply.At(0));
253 auto header = *(o2::PrimaryChunkAnswer*)(rawmessage->GetData());
254 if (!header.payload_attached) {
255 doLogInfo(workerID, "No payload; Server in stage " + std::string(PrimStateToString[(int)header.serverstate]));
256 // if no payload attached we inspect the server state, to see what to do
257 if (header.serverstate == O2PrimaryServerState::Initializing || header.serverstate == O2PrimaryServerState::WaitingEvent) {
258 sleep(1); // back-off and retry
259 return true;
260 }
261 // we need to decide what to do when the server is idle ---> if this happens immediately after a new batch request it means that the server might just lag a bit behind
262 return false;
263 } else {
264 auto payload = std::move(reply.At(1));
265 // wrap incoming bytes as a TMessageWrapper which offers "adoption" of a buffer
// NOTE(review): `message` and `chunk` are raw owning pointers deleted at the end of this branch;
// they leak if anything in between throws. `chunk` is dereferenced without a null check.
266 auto message = new TMessageWrapper(payload->GetData(), payload->GetSize());
267 auto chunk = static_cast<o2::data::PrimaryChunk*>(message->ReadObjectAny(message->GetClass()));
268
269 bool goon = true;
270 // no particles and eventID == -1 --> indication for no more work
// NOTE(review): on this "quitting kernel" path the function still reaches `return true`
// below — confirm whether returning false was intended.
271 if (chunk->mParticles.size() == 0 && chunk->mSubEventInfo.eventID == -1) {
272 doLogInfo(workerID, "No particles in reply : quitting kernel");
273 goon = false;
274 }
275
276 if (goon) {
277
278 auto info = chunk->mSubEventInfo;
279 LOG(info) << workerStr() << " Processing " << chunk->mParticles.size() << " primary particles "
280 << "for event " << info.eventID << "/" << info.maxEvents << " "
281 << "part " << info.part << "/" << info.nparts;
282
283 if (eventselection == nullptr || (focus_on_event == info.eventID && focus_on_part == info.part)) {
284 mVMCApp->setPrimaries(chunk->mParticles);
285 } else {
286 // nothing to transport here
287 mVMCApp->setPrimaries(std::vector<TParticle>{});
288 LOG(info) << workerStr() << " This chunk will be skipped";
289 }
290
// NOTE(review): passes the address of the local `info`; if setSubEventInfo keeps the pointer
// beyond this block it dangles — confirm against O2MCApplication.
291 mVMCApp->setSubEventInfo(&info);
292
293 if (reproducibleSim) {
294 LOG(info) << workerStr() << " Setting seed for this sub-event to " << chunk->mSubEventInfo.seed;
295 gRandom->SetSeed(chunk->mSubEventInfo.seed);
// NOTE(review): source line 296 is missing from this extraction (the page's cross-reference
// lists a VMCSeederService::instance(), possibly used here) — restore it from the original file.
297 }
298
299 // Process one event
300 auto& conf = o2::conf::SimConfig::Instance();
301 if (strcmp(conf.getMCEngine().c_str(), "TGeant4") == 0 || strcmp(conf.getMCEngine().c_str(), "O2TrivialMCEngine") == 0) {
302 // this is preferred and necessary for Geant4
303 // since repeated "ProcessRun" might have significant overheads
304 mVMC->ProcessEvent();
305 } else {
306 // for Geant3 calling ProcessEvent is not enough
307 // as some hooks are not called
308 mVMC->ProcessRun(1);
309 }
310
311 FairSystemInfo sysinfo;
312 LOG(info) << workerStr() << " TIME-STAMP " << mTimer.RealTime() << "\t";
313 mTimer.Continue();
314 LOG(info) << workerStr() << " MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
315 << sysinfo.GetMaxMemory() << " MB\n";
316 }
317 delete message;
318 delete chunk;
319 }
320 } else {
321 LOG(info) << workerStr() << " No primary answer received from server (within timeout). Return code " << code;
322 }
323 } else {
324 LOG(info) << workerStr() << " Requesting work from server not possible. Return code " << sendcode;
325 return false;
326 }
327 return true;
328 }
329
330 protected:
332 bool ConditionalRun() final
333 {
334 return Kernel(-1, GetChannels().at("primary-get").at(0), GetChannels().at("simdata").at(0));
335 }
336
337 void PostRun() final { LOG(info) << "Shutting down "; }
338
339 private:
340 TStopwatch mTimer;
341 o2::steer::O2MCApplication* mVMCApp = nullptr;
342 TVirtualMC* mVMC = nullptr;
343 std::unique_ptr<FairRunSim> mSimRun;
344};
345
346} // namespace devices
347} // namespace o2
348
349#endif
benchmark::State & state
Definition of the Stack class.
int32_t i
void doLogInfo(int workerID, std::string const &message)
Definition O2SimDevice.h:34
uint16_t pos
Definition RawData.h:3
std::vector< std::string > splitString(const std::string &src, char delim, bool trim=false)
static VMCSeederService const & instance()
static SimConfig & Instance()
Definition SimConfig.h:111
bool Kernel(int workerID, fair::mq::Channel &requestchannel, fair::mq::Channel &dataoutchannel, fair::mq::Channel *statuschannel=nullptr)
~O2SimDevice() final
Default destructor.
Definition O2SimDevice.h:59
O2SimDevice(o2::steer::O2MCApplication *vmcapp, TVirtualMC *vmc)
Definition O2SimDevice.h:56
static void CustomCleanup(void *data, void *hint)
Definition O2SimDevice.h:84
static bool querySimConfig(fair::mq::Channel &channel)
Definition O2SimDevice.h:96
bool isWorkAvailable(fair::mq::Channel &statuschannel, int workerID=-1)
void InitTask() final
Overloads the InitTask() method of fair::mq::Device.
Definition O2SimDevice.h:70
static bool initSim(fair::mq::Channel &channel, std::unique_ptr< FairRunSim > &simptr)
bool ConditionalRun() final
Overloads the ConditionalRun() method of fair::mq::Device.
~TMessageWrapper() override=default
TMessageWrapper(void *buf, Int_t len)
Definition O2SimDevice.h:47
void setSimDataChannel(fair::mq::Channel *channel)
void setPrimaries(std::vector< TParticle > const &p)
void setSubEventInfo(o2::data::SubEventInfo *i)
static ShmManager & Instance()
Definition ShmManager.h:61
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint counter
Definition glcorearb.h:3987
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string mMCEngine
Definition SimConfig.h:54
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str