102 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
104 int timeoutinMS = 60000;
105 if (channel.Send(request, timeoutinMS) > 0) {
106 LOG(info) <<
"Waiting for configuration answer ";
107 if (channel.Receive(reply, timeoutinMS) > 0) {
108 LOG(info) <<
"Configuration answer received, containing " << reply->GetSize() <<
" bytes ";
111 auto message = std::make_unique<o2::devices::TMessageWrapper>(reply->GetData(), reply->GetSize());
117 LOG(info) <<
"COMMUNICATED ENGINE " << config->
mMCEngine;
120 conf.resetFromConfigData(*config);
121 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
124 LOG(error) <<
"No configuration received within " << timeoutinMS <<
"ms\n";
128 LOG(error) <<
"Could not send configuration request within " << timeoutinMS <<
"ms\n";
// Creates the simulation run object and hands ownership of it to the caller through `simptr`.
// NOTE(review): this listing elides several original lines of the function (braces, the
// return statement and any use of `channel`), so only the visible statements are described.
135 static bool initSim(fair::mq::Channel& channel, std::unique_ptr<FairRunSim>& simptr)
141 LOG(info) <<
"Setting up the simulation ...";
// Wrap the pointer returned by o2sim_init(true) in a unique_ptr and move it into `simptr`.
142 simptr = std::move(std::unique_ptr<FairRunSim>(o2sim_init(
true)));
143 FairSystemInfo sysinfo;
// Run number 0 is processed on the global TVirtualMC instance before memory is reported.
// NOTE(review): presumably this forces the engine to complete its initialisation - confirm.
148 TVirtualMC::GetMC()->ProcessRun(0);
// The current-memory value is divided by 1024*1024; the max-memory value is printed as returned.
// NOTE(review): the label has no trailing separator, so the first number is glued to INIT.
150 LOG(info) <<
"MEM-STAMP END OF SIM INIT" << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
151 << sysinfo.GetMaxMemory() <<
" MB\n";
// Body of a server status query over `statuschannel`.
// NOTE(review): the function signature, the declarations of `request` and `i`, and the
// conditions selecting each log message are on lines elided from this listing - confirm
// against the full file before relying on these notes.
// Build the [W<workerID>] prefix used on every log line below.
158 std::stringstream
str;
159 str <<
"[W" << workerID <<
"]";
160 auto workerStr =
str.str();
// The same timeout (in milliseconds) is passed to both Send and Receive below.
162 int timeoutinMS = 2000;
// Reply message created from the value of `i`; it is filled in by Receive further down.
168 fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(
i));
169 auto sendcode = statuschannel.Send(request, timeoutinMS);
171 LOG(info) << workerStr <<
" Waiting for status answer ";
172 auto code = statuschannel.Receive(reply, timeoutinMS);
// The start of the reply payload is reinterpreted as an int holding the server state.
// NOTE(review): assumes the reply carries at least sizeof(int) bytes - TODO confirm.
174 int state(*((
int*)(reply->GetData())));
// One of the following messages is logged depending on `state` (branch conditions elided).
176 LOG(info) << workerStr <<
" SERVER IS SERVING";
179 LOG(info) << workerStr <<
" SERVER IS STILL INITIALIZING";
183 LOG(info) << workerStr <<
" SERVER IS WAITING FOR EVENT";
187 LOG(info) << workerStr <<
" SERVER IS IDLE";
190 LOG(info) << workerStr <<
" SERVER STATE UNKNOWN OR STOPPED";
// Error path; presumably reached when the send or receive above did not succeed - confirm.
193 LOG(error) << workerStr <<
" STATUS REQUEST UNSUCCESSFUL";
// Worker kernel: sends a PrimaryChunkRequest on `requestchannel`, receives a multi-part reply
// and runs the Monte Carlo engine on the received chunk of primaries.
// `statuschannel` defaults to nullptr; how it and `dataoutchannel` are used is not visible here.
// NOTE(review): this listing elides many original lines (braces, return statements and the
// declarations of `str`, `p`, `counter`, `header`, `chunk`, `conf`, `mVMC`, `mTimer`), so the
// comments below describe only the visible statements.
200 bool Kernel(
int workerID, fair::mq::Channel& requestchannel, fair::mq::Channel& dataoutchannel, fair::mq::Channel* statuschannel =
nullptr)
// Reproducible mode is on by default and is switched off whenever the environment
// variable O2_DISABLE_REPRODUCIBLE_SIM is set (its value is not inspected).
203 bool reproducibleSim =
true;
204 if (getenv(
"O2_DISABLE_REPRODUCIBLE_SIM")) {
205 reproducibleSim =
false;
// Optional restriction to a single event/part read from O2SIM_RESTRICT_EVENTPART.
// Both selectors start at -1 and are only overwritten when the variable is set.
212 auto eventselection = getenv(
"O2SIM_RESTRICT_EVENTPART");
213 int focus_on_event = -1;
214 int focus_on_part = -1;
215 if (eventselection) {
// Split `str` at the first colon into a (first, second) pair of strings.
// NOTE(review): `str` is presumably built from `eventselection` on an elided line - confirm.
217 std::pair<std::string, std::string> parts;
218 size_t pos =
str.find(
':');
219 if (
pos != std::string::npos) {
220 parts.first =
str.substr(0,
pos);
221 parts.second =
str.substr(
pos + 1);
// atoi performs no error reporting; non-numeric text converts to 0.
// NOTE(review): `p` is presumably the pair produced by the split above - confirm.
226 focus_on_event = std::atoi(p.first.c_str());
227 focus_on_part = std::atoi(p.second.c_str());
// Request message carrying this worker id, -1 and a post-incremented `counter`.
230 fair::mq::MessagePtr request(requestchannel.NewSimpleMessage(
PrimaryChunkRequest{workerID, -1, counter++}));
231 fair::mq::Parts reply;
// Helper lambda that builds the [W<workerID>] prefix used in the log lines below.
236 auto workerStr = [workerID]() {
237 std::stringstream
str;
238 str <<
"[W" << workerID <<
"]";
242 doLogInfo(workerID,
"Requesting work chunk");
// The send is bounded by a timeout given in milliseconds.
243 int timeoutinMS = 2000;
244 auto sendcode = requestchannel.Send(request, timeoutinMS);
246 doLogInfo(workerID,
"Waiting for answer");
// NOTE(review): Receive is called without a timeout argument here, although the failure
// message further down mentions a timeout - confirm whether `timeoutinMS` was intended.
249 auto code = requestchannel.Receive(reply);
251 doLogInfo(workerID,
"Primary chunk received");
// First part of the reply; `header` is presumably decoded from it on an elided line.
252 auto rawmessage = std::move(reply.At(0));
// Without an attached payload only the server stage reported in the header is logged.
254 if (!header.payload_attached) {
255 doLogInfo(workerID,
"No payload; Server in stage " + std::string(PrimStateToString[(
int)header.serverstate]));
// Second part of the reply holds the payload.
264 auto payload = std::move(reply.At(1));
// A chunk with no particles and event id -1 is logged as the signal to quit the kernel.
271 if (chunk->mParticles.size() == 0 && chunk->mSubEventInfo.eventID == -1) {
272 doLogInfo(workerID,
"No particles in reply : quitting kernel");
278 auto info = chunk->mSubEventInfo;
279 LOG(info) << workerStr() <<
" Processing " << chunk->mParticles.size() <<
" primary particles "
280 <<
"for event " << info.eventID <<
"/" << info.maxEvents <<
" "
281 <<
"part " << info.part <<
"/" << info.nparts;
// True when no restriction is configured or this chunk matches the selected event and part.
283 if (eventselection ==
nullptr || (focus_on_event == info.eventID && focus_on_part == info.part)) {
288 LOG(info) << workerStr() <<
" This chunk will be skipped";
// In reproducible mode gRandom is seeded with the seed carried by the sub-event info.
293 if (reproducibleSim) {
294 LOG(info) << workerStr() <<
" Setting seed for this sub-event to " << chunk->mSubEventInfo.seed;
295 gRandom->SetSeed(chunk->mSubEventInfo.seed);
// Engine-specific branch for TGeant4 and O2TrivialMCEngine (its body is elided).
301 if (strcmp(conf.getMCEngine().c_str(),
"TGeant4") == 0 || strcmp(conf.getMCEngine().c_str(),
"O2TrivialMCEngine") == 0) {
304 mVMC->ProcessEvent();
// Log the timer reading and the memory figures; the current-memory value is divided by
// 1024*1024 and the max-memory value is printed as returned.
311 FairSystemInfo sysinfo;
312 LOG(info) << workerStr() <<
" TIME-STAMP " << mTimer.RealTime() <<
"\t";
314 LOG(info) << workerStr() <<
" MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
315 << sysinfo.GetMaxMemory() <<
" MB\n";
// Failure paths: report the return code of Receive, respectively of Send.
321 LOG(info) << workerStr() <<
" No primary answer received from server (within timeout). Return code " << code;
324 LOG(info) << workerStr() <<
" Requesting work from server not possible. Return code " << sendcode;