99 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
101 int timeoutinMS = 60000;
102 if (channel.Send(request, timeoutinMS) > 0) {
103 LOG(info) <<
"Waiting for configuration answer ";
104 if (channel.Receive(reply, timeoutinMS) > 0) {
105 LOG(info) <<
"Configuration answer received, containing " << reply->GetSize() <<
" bytes ";
108 auto message = std::make_unique<o2::devices::TMessageWrapper>(reply->GetData(), reply->GetSize());
114 LOG(info) <<
"COMMUNICATED ENGINE " << config->
mMCEngine;
117 conf.resetFromConfigData(*config);
118 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
121 LOG(error) <<
"No configuration received within " << timeoutinMS <<
"ms\n";
125 LOG(error) <<
"Could not send configuration request within " << timeoutinMS <<
"ms\n";
132 static bool initSim(fair::mq::Channel& channel, std::unique_ptr<FairRunSim>& simptr)
138 LOG(info) <<
"Setting up the simulation ...";
139 simptr = std::move(std::unique_ptr<FairRunSim>(o2sim_init(
true)));
140 FairSystemInfo sysinfo;
145 TVirtualMC::GetMC()->ProcessRun(0);
147 LOG(info) <<
"MEM-STAMP END OF SIM INIT" << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
148 << sysinfo.GetMaxMemory() <<
" MB\n";
155 std::stringstream
str;
156 str <<
"[W" << workerID <<
"]";
157 auto workerStr =
str.str();
159 int timeoutinMS = 2000;
165 fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(
i));
166 auto sendcode = statuschannel.Send(request, timeoutinMS);
168 LOG(info) << workerStr <<
" Waiting for status answer ";
169 auto code = statuschannel.Receive(reply, timeoutinMS);
171 int state(*((
int*)(reply->GetData())));
173 LOG(info) << workerStr <<
" SERVER IS SERVING";
176 LOG(info) << workerStr <<
" SERVER IS STILL INITIALIZING";
180 LOG(info) << workerStr <<
" SERVER IS WAITING FOR EVENT";
184 LOG(info) << workerStr <<
" SERVER IS IDLE";
187 LOG(info) << workerStr <<
" SERVER STATE UNKNOWN OR STOPPED";
190 LOG(error) << workerStr <<
" STATUS REQUEST UNSUCCESSFUL";
197 bool Kernel(
int workerID, fair::mq::Channel& requestchannel, fair::mq::Channel& dataoutchannel, fair::mq::Channel* statuschannel =
nullptr)
200 bool reproducibleSim =
true;
201 if (getenv(
"O2_DISABLE_REPRODUCIBLE_SIM")) {
202 reproducibleSim =
false;
209 auto eventselection = getenv(
"O2SIM_RESTRICT_EVENTPART");
210 int focus_on_event = -1;
211 int focus_on_part = -1;
212 if (eventselection) {
214 std::pair<std::string, std::string> parts;
215 size_t pos =
str.find(
':');
216 if (
pos != std::string::npos) {
217 parts.first =
str.substr(0,
pos);
218 parts.second =
str.substr(
pos + 1);
223 focus_on_event = std::atoi(p.first.c_str());
224 focus_on_part = std::atoi(p.second.c_str());
227 fair::mq::MessagePtr request(requestchannel.NewSimpleMessage(
PrimaryChunkRequest{workerID, -1, counter++}));
228 fair::mq::Parts reply;
233 auto workerStr = [workerID]() {
234 std::stringstream
str;
235 str <<
"[W" << workerID <<
"]";
239 doLogInfo(workerID,
"Requesting work chunk");
240 int timeoutinMS = 2000;
241 auto sendcode = requestchannel.Send(request, timeoutinMS);
243 doLogInfo(workerID,
"Waiting for answer");
246 auto code = requestchannel.Receive(reply);
248 doLogInfo(workerID,
"Primary chunk received");
249 auto rawmessage = std::move(reply.At(0));
251 if (!header.payload_attached) {
252 doLogInfo(workerID,
"No payload; Server in stage " + std::string(PrimStateToString[(
int)header.serverstate]));
261 auto payload = std::move(reply.At(1));
268 if (chunk->mParticles.size() == 0 && chunk->mSubEventInfo.eventID == -1) {
269 doLogInfo(workerID,
"No particles in reply : quitting kernel");
275 auto info = chunk->mSubEventInfo;
276 LOG(info) << workerStr() <<
" Processing " << chunk->mParticles.size() <<
" primary particles "
277 <<
"for event " << info.eventID <<
"/" << info.maxEvents <<
" "
278 <<
"part " << info.part <<
"/" << info.nparts;
280 if (eventselection ==
nullptr || (focus_on_event == info.eventID && focus_on_part == info.part)) {
285 LOG(info) << workerStr() <<
" This chunk will be skipped";
290 if (reproducibleSim) {
291 LOG(info) << workerStr() <<
" Setting seed for this sub-event to " << chunk->mSubEventInfo.seed;
292 gRandom->SetSeed(chunk->mSubEventInfo.seed);
298 if (strcmp(conf.getMCEngine().c_str(),
"TGeant4") == 0 || strcmp(conf.getMCEngine().c_str(),
"O2TrivialMCEngine") == 0) {
301 mVMC->ProcessEvent();
308 FairSystemInfo sysinfo;
309 LOG(info) << workerStr() <<
" TIME-STAMP " << mTimer.RealTime() <<
"\t";
311 LOG(info) << workerStr() <<
" MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) <<
" "
312 << sysinfo.GetMaxMemory() <<
" MB\n";
318 LOG(info) << workerStr() <<
" No primary answer received from server (within timeout). Return code " << code;
321 LOG(info) << workerStr() <<
" Requesting work from server not possible. Return code " << sendcode;