Project
Loading...
Searching...
No Matches
O2SimDeviceRunner.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include "O2SimDevice.h"
15#include "SimSetup/SimSetup.h"
16#include <fairmq/DeviceRunner.h>
17#include <boost/program_options.hpp>
18#include <memory>
19#include <string>
20#include <fairmq/Channel.h>
21#include <fairlogger/Logger.h>
22#include <fairmq/Parts.h>
23#include <fairmq/TransportFactory.h>
24#include <TStopwatch.h>
25#include <sys/wait.h>
26#include <pthread.h> // to set cpu affinity
27#include <cmath>
28#include <csignal>
29#include <unistd.h>
31
32#include "rapidjson/document.h"
33#include "rapidjson/stringbuffer.h"
34#include "rapidjson/filereadstream.h"
35namespace bpo = boost::program_options;
36
// PIDs of the forked worker processes; main() appends the value fork() returned in the parent.
std::vector<int> gChildProcesses{};
40
41// a handler for error/termination signals
42void sigaction_handler(int signal, siginfo_t* signal_info, void*)
43{
44 auto pid = getpid();
45 LOG(info) << pid << " caught signal " << signal << " from source " << signal_info->si_pid;
46 auto groupid = getpgrp();
47 if (pid == gMasterProcess && signal_info->si_pid != gDriverProcess) {
48 // master worker forwards signal to whole worker process group
49 // (do this only if not coming from gDriverProcess since this uses killpg and already affected all children)
50 killpg(pid, signal);
51 } else {
52 if (signal_info->si_pid != gDriverProcess) {
53 // forward to master worker if coming internally
54 kill(groupid, signal);
55 }
56 }
57
58 if (signal_info->si_pid == gDriverProcess || signal == SIGTERM) {
59 // signal was sent from driver process --> not error
60 // or it was a standard SIGTERM
61
62 // need to wait for potential children before exiting itself
63 // ... in order to have correct resource accounting
64 int status, cpid;
65 while ((cpid = wait(&status))) {
66 if (cpid == -1) {
67 break;
68 }
69 }
71 _exit(0);
72 }
73
74 // we treat internal signal interruption as an error
75 // because only ordinary termination is good in the context of the distributed system
76 _exit(128 + signal);
77}
78
79void addCustomOptions(bpo::options_description& options)
80{
81}
82
// Message-deleter callback: frees the std::string that was handed over through `hint`.
// The `data` pointer is not used.
void CustomCleanup(void* data, void* hint)
{
  static_cast<void>(data);
  std::string* owned = static_cast<std::string*>(hint);
  delete owned;
}
84
85// this will initialize the simulation setup
86// once before initializing the actual FairMQ device
87bool initializeSim(std::string transport, std::string address, std::unique_ptr<FairRunSim>& simptr)
88{
89 // This needs an already running PrimaryServer
90 auto factory = fair::mq::TransportFactory::CreateTransportFactory(transport);
91 auto channel = fair::mq::Channel{"o2sim-primserv-info", "req", factory};
92 channel.Connect(address);
93 channel.Validate();
94
95 return o2::devices::O2SimDevice::initSim(channel, simptr);
96}
97
99{
 // Creates a new O2SimDevice from the globally registered VMC application and the current MC engine.
 // NOTE: the application pointer is obtained with an unchecked static_cast, i.e. it is assumed to be an O2MCApplication.
 // If no application is registered yet, only a warning is logged and the device is still created (with a null app).
 // The device is allocated with `new`; callers take care of it (the InstantiateDevice hook in initAndRunDevice wraps it in a std::unique_ptr).
100 auto app = static_cast<o2::steer::O2MCApplication*>(TVirtualMCApplication::Instance());
101 auto vmc = TVirtualMC::GetMC();
102
103 if (app == nullptr) {
104 LOG(warning) << "no vmc application found at this stage";
105 }
106 return new o2::devices::O2SimDevice(app, vmc);
107}
108
109fair::mq::Device* getDevice(const fair::mq::ProgOptions& config)
110{
111 return getDevice();
112}
113
114int initAndRunDevice(int argc, char* argv[])
115{
116 using namespace fair::mq;
117 using namespace fair::mq::hooks;
118
119 try {
120 fair::mq::DeviceRunner runner{argc, argv};
121
122 runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner& r) {
123 boost::program_options::options_description customOptions("Custom options");
124 addCustomOptions(customOptions);
125 r.fConfig.AddToCmdLineOptions(customOptions);
126 });
127
128 runner.AddHook<InstantiateDevice>([](DeviceRunner& r) {
129 r.fDevice = std::unique_ptr<fair::mq::Device>{getDevice(r.fConfig)};
130 });
131
132 return runner.Run();
133 } catch (std::exception& e) {
134 LOG(error) << "Unhandled exception reached the top of main: " << e.what()
135 << ", application will now exit";
136 return 1;
137 } catch (...) {
138 LOG(error) << "Non-exception instance being thrown. Please make sure you use std::runtime_exception() instead. "
139 << "Application will now exit.";
140 return 1;
141 }
142}
143
146 fair::mq::Channel* primchannel = nullptr; // "primary-get" req channel, passed to Kernel() as the request channel (created in initSim)
147 fair::mq::Channel* datachannel = nullptr; // "simdata" push channel connected to the merger address (created in initSim)
148 fair::mq::Channel* primstatuschannel = nullptr; // "o2sim-primserv-info" req channel to the primary server status address (created in initSim)
149 int workerID = -1; // index of the worker owning this setup; defaults to -1 (raw channel pointers above are not freed by this struct)
150};
151
152KernelSetup initSim(std::string transport, std::string primaddress, std::string primstatusaddress, std::string mergeraddress, int workerID)
153{
154 auto factory = fair::mq::TransportFactory::CreateTransportFactory(transport);
155 auto primchannel = new fair::mq::Channel{"primary-get", "req", factory};
156 primchannel->Connect(primaddress);
157 primchannel->Validate();
158
159 auto prim_status_channel = new fair::mq::Channel{"o2sim-primserv-info", "req", factory};
160 prim_status_channel->Connect(primstatusaddress);
161 prim_status_channel->Validate();
162
163 auto datachannel = new fair::mq::Channel{"simdata", "push", factory};
164 datachannel->Connect(mergeraddress);
165 datachannel->Validate();
166 // the channels are setup
167
168 // init the sim object
169 auto sim = getDevice();
170 sim->lateInit();
171
172 return KernelSetup{sim, primchannel, datachannel, prim_status_channel, workerID};
173}
174
176{
 // Drives one worker's simulation: calls Kernel() with the worker's request, data and status
 // channels over and over until it returns false, then logs completion. Always returns 0.
177 // the simplified runloop
178 while (setup.sim->Kernel(setup.workerID, *setup.primchannel, *setup.datachannel, setup.primstatuschannel)) {
179 }
180 doLogInfo(setup.workerID, "simulation is done");
182 return 0;
183}
184
185void pinToCPU(unsigned int cpuid)
186{
187 auto affinity = getenv("ALICE_CPUAFFINITY");
188 if (affinity) {
189 // MacOS does not support this API so we add a protection
190#ifndef __APPLE__
191
192 pthread_t thread;
193
194 thread = pthread_self();
195
196 cpu_set_t cpuset;
197 CPU_ZERO(&cpuset);
198 CPU_SET(cpuid, &cpuset);
199
200 auto s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
201 if (s != 0) {
202 LOG(warning) << "FAILED TO SET PTHREAD AFFINITY";
203 }
204
205 /* Check the actual affinity mask assigned to the thread */
206 s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
207 if (s != 0) {
208 LOG(warning) << "FAILED TO GET PTHREAD AFFINITY";
209 }
210
211 for (int j = 0; j < CPU_SETSIZE; j++) {
212 if (CPU_ISSET(j, &cpuset)) {
213 LOG(info) << "ENABLED CPU " << j;
214 }
215 }
216#else
217 LOG(warn) << "CPU AFFINITY NOT IMPLEMENTED ON APPLE";
218#endif
219 }
220}
221
222bool waitForControlInput(int workerID)
223{
224 static bool initialized = false;
225 static fair::mq::Channel channel;
226 if (!initialized) {
227 // we do the channel connect and initialization only once
228 // (reducing the chances that we might miss a control message from the master)
229 static auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
230 channel = fair::mq::Channel{"o2sim-control", "sub", factory};
231 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
232 channel.Connect(std::string(controlsocketname));
233 channel.Validate();
234 initialized = true;
235 }
236 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
237
238 doLogInfo(workerID, "Listening for master control input");
239 if (channel.Receive(reply) > 0) {
240 auto data = reply->GetData();
241 auto size = reply->GetSize();
242
243 std::string command(reinterpret_cast<char const*>(data), size);
244 doLogInfo(workerID, "Received control message: " + command);
245
247 o2::conf::parseSimReconfigFromString(command, reconfig);
248 if (reconfig.stop) {
249 doLogInfo(workerID, "Stop asked, shutting down");
250 return false;
251 }
252 doLogInfo(workerID, "Asked to process " + std::to_string(reconfig.nEvents) + std::string(" new events"));
253 } else {
254 doLogInfo(workerID, "No control input received ");
255 }
256 return true;
257}
258
259int main(int argc, char* argv[])
260{
261 struct sigaction act;
262 memset(&act, 0, sizeof act);
263 sigemptyset(&act.sa_mask);
264 act.sa_sigaction = &sigaction_handler;
265 act.sa_flags = SA_SIGINFO; // <--- enable sigaction
266
267 std::vector<int> handledsignals = {SIGTERM, SIGINT, SIGQUIT, SIGSEGV, SIGBUS, SIGFPE, SIGABRT}; // <--- may need to be completed
268 // remember that SIGKILL can't be handled
269 for (auto s : handledsignals) {
270 if (sigaction(s, &act, nullptr)) {
271 LOG(error) << "Could not install signal handler for " << s;
272 exit(EXIT_FAILURE);
273 }
274 }
275
276 // set the fatal callback for the logger to not do a core dump (since this might interfere with process shutdown sequence
277 // since it calls ROOT::TSystem and further child processes)
278 fair::Logger::OnFatal([] { throw fair::FatalException("Fatal error occured. Exiting without core dump..."); });
279 // initialy set logger verbosity to medium
280 FairLogger::GetLogger()->SetLogVerbosityLevel("MEDIUM");
281
282 // extract the path to FairMQ config
283 bpo::options_description desc{"Options"};
284 // clang-format off
285 desc.add_options()
286 ("control","control type")
287 ("id","ID")
288 ("config-key","config key")
289 ("mq-config",bpo::value<std::string>(),"path to FairMQ config")
290 ("severity","log severity");
291 // clang-format on
292 bpo::variables_map vm;
293 bpo::store(parse_command_line(argc, argv, desc), vm);
294 bpo::notify(vm);
295
296 std::string FMQconfig;
297 if (vm.count("mq-config")) {
298 FMQconfig = vm["mq-config"].as<std::string>();
299 }
300
301 auto internalfork = getenv("ALICE_SIMFORKINTERNAL");
302 if (internalfork) {
303 int driverPID = getppid();
304 auto pubchannel = o2::simpubsub::createPUBChannel(o2::simpubsub::getPublishAddress("o2sim-worker-notifications", driverPID));
305
306 if (FMQconfig.empty()) {
307 throw std::runtime_error("This should never be called without FairMQ config.");
308 }
309 // read the JSON config
310 FILE* fp = fopen(FMQconfig.c_str(), "r");
311 constexpr unsigned short usmax = std::numeric_limits<unsigned short>::max() - 1;
312 char readBuffer[usmax];
313 rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
314 rapidjson::Document d;
315 d.ParseStream(is);
316 fclose(fp);
317
318 // retrieve correct server and merger URLs
319 std::string serveraddress;
320 std::string mergeraddress;
321 std::string serverstatus_address;
322 std::string s;
323
324 auto& options = d["fairMQOptions"];
325 assert(options.IsObject());
326 for (auto option = options.MemberBegin(); option != options.MemberEnd(); ++option) {
327 s = option->name.GetString();
328 if (s == "devices") {
329 assert(option->value.IsArray());
330 auto devices = option->value.GetArray();
331 for (auto& device : devices) {
332 s = device["id"].GetString();
333 if (s == "primary-server") {
334 auto channels = device["channels"].GetArray();
335 auto sockets = (channels[0])["sockets"].GetArray();
336 auto address = (sockets[0])["address"].GetString();
337 serveraddress = address;
338 sockets = (channels[1])["sockets"].GetArray();
339 address = (sockets[0])["address"].GetString();
340 serverstatus_address = address;
341 }
342 if (s == "hitmerger") {
343 auto channels = device["channels"].GetArray();
344 for (auto& channel : channels) {
345 s = channel["name"].GetString();
346 if (s == "simdata") {
347 auto sockets = channel["sockets"].GetArray();
348 auto address = (sockets[0])["address"].GetString();
349 mergeraddress = address;
350 }
351 }
352 }
353 }
354 }
355 }
356
357 LOG(info) << "Parsed primary server address " << serveraddress;
358 LOG(info) << "Parsed primary server status address " << serverstatus_address;
359 LOG(info) << "Parsed merger address " << mergeraddress;
360 if (serveraddress.empty() || mergeraddress.empty()) {
361 throw std::runtime_error("Could not determine server or merger URLs.");
362 }
363 // This is a solution based on initializing the simulation once
364 // and then fork the process to share the simulation memory across
365 // many processes. Here we are not using fair::mq::Devices and just setup
366 // some channels manually and do our own runloop.
367
368 // we init the simulation first
369 std::unique_ptr<FairRunSim> simrun;
370 // TODO: take the addresses from somewhere else
371 if (!initializeSim("zeromq", serverstatus_address, simrun)) {
372 LOG(error) << "Could not initialize simulation";
373 return 1;
374 }
375
376 // should be factored out?
377 unsigned int nworkers = std::max(1u, std::thread::hardware_concurrency() / 2);
378 auto f = getenv("ALICE_NSIMWORKERS");
379 if (f) {
380 nworkers = static_cast<unsigned int>(std::stoi(f));
381 }
382 LOG(info) << "Running with " << nworkers << " sim workers ";
383
384 gMasterProcess = getpid();
385 gDriverProcess = getppid();
386 // then we fork and create a device in each fork
387 for (auto i = 0u; i < nworkers; ++i) {
388 // we use the current process as one of the workers as it has nothing else to do
389 auto pid = (i == nworkers - 1) ? 0 : fork();
390 if (pid == 0) {
391 // Each worker can publish its progress/state on a ZMQ channel.
392 // We actually use a push/pull mechanism to collect all messages in the
393 // master worker which can then publish using PUB/SUB.
394 // auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
395 auto collectAndPubThreadFunction = [driverPID, &pubchannel]() {
396 auto collectorchannel = o2::simpubsub::createPUBChannel(o2::simpubsub::getPublishAddress("o2sim-workerinternal", driverPID), "pull");
397 std::unique_ptr<fair::mq::Message> msg(collectorchannel.NewMessage());
398
399 while (true) {
400 if (collectorchannel.Receive(msg) > 0) {
401 auto data = msg->GetData();
402 auto size = msg->GetSize();
403 std::string text(reinterpret_cast<char const*>(data), size);
404 // LOG(info) << "Collector message: " << text;
405 o2::simpubsub::publishMessage(pubchannel, text);
406 }
407 }
408 };
409 if (i == nworkers - 1) { // <---- extremely important to take non-forked version since ZMQ sockets do not behave well on fork
410 std::vector<std::thread> threads;
411 threads.push_back(std::thread(collectAndPubThreadFunction));
412 threads.back().detach();
413 }
414
415 // everyone else is getting a push socket for notifications
416 auto pushchannel = o2::simpubsub::createPUBChannel(o2::simpubsub::getPublishAddress("o2sim-workerinternal", driverPID), "push");
417
418 // we will try to pin each worker to a particular CPU
419 // this can be made configurable via environment variables??
420 pinToCPU(i);
421
422 auto kernelSetup = initSim("zeromq", serveraddress, serverstatus_address, mergeraddress, i);
423
424 std::stringstream worker;
425 worker << "WORKER" << i;
426 o2::simpubsub::publishMessage(pushchannel, o2::simpubsub::simStatusString(worker.str(), "STATUS", "SETUP COMPLETED"));
427
428 auto& conf = o2::conf::SimConfig::Instance();
429
430 bool more = true;
431 while (more) {
432 runSim(kernelSetup);
433
434 if (conf.asService()) {
435 LOG(info) << "IN SERVICE MODE WAITING";
436 o2::simpubsub::publishMessage(pushchannel, o2::simpubsub::simStatusString(worker.str(), "STATUS", "AWAITING INPUT"));
437 more = waitForControlInput(kernelSetup.workerID);
438 usleep(100); // --> why? (probably to give the server some chance to come to a "serving" state)
439 } else {
440 o2::simpubsub::publishMessage(pushchannel, o2::simpubsub::simStatusString(worker.str(), "STATUS", "TERMINATING"));
441
442 LOG(info) << "FINISHING";
443 more = false;
444 }
445 }
446 sleep(10); // ---> give some time for message to be delivered to merger (destructing too early might affect the ZQM buffers)
447 // The process will in any case be terminated by the main o2-sim driver.
448
449 // destruct setup (using _exit due to problems in ROOT shutdown (segmentation violations)
450 // Clearly at some moment, a more robust solution would be appreciated
451 _exit(0);
452 } else {
453 gChildProcesses.push_back(pid);
454 }
455 }
456 int status, cpid;
457 while ((cpid = wait(&status))) {
458 // LOG(info) << "normal wait " << cpid << " returned ";
459 if (cpid == -1) {
460 break;
461 }
462 }
463 _exit(0);
464 } else {
465 // This the solution where we setup an ordinary fair::mq::Device
466 // (each if which will setup its own simulation). Parallelism
467 // is achieved outside by instantiating multiple device processes.
468 _exit(initAndRunDevice(argc, argv));
469 }
470}
int32_t i
std::vector< int > gChildProcesses
void sigaction_handler(int signal, siginfo_t *signal_info, void *)
KernelSetup initSim(std::string transport, std::string primaddress, std::string primstatusaddress, std::string mergeraddress, int workerID)
int gMasterProcess
void pinToCPU(unsigned int cpuid)
void CustomCleanup(void *data, void *hint)
bool initializeSim(std::string transport, std::string address, std::unique_ptr< FairRunSim > &simptr)
bool waitForControlInput(int workerID)
int initAndRunDevice(int argc, char *argv[])
int runSim(KernelSetup setup)
o2::devices::O2SimDevice * getDevice()
void addCustomOptions(bpo::options_description &options)
int gDriverProcess
void doLogInfo(int workerID, std::string const &message)
Definition O2SimDevice.h:34
uint32_t j
Definition RawData.h:0
uint16_t pid
Definition RawData.h:2
static SimConfig & Instance()
Definition SimConfig.h:111
bool Kernel(int workerID, fair::mq::Channel &requestchannel, fair::mq::Channel &dataoutchannel, fair::mq::Channel *statuschannel=nullptr)
static bool initSim(fair::mq::Channel &channel, std::unique_ptr< FairRunSim > &simptr)
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
GLboolean * data
Definition glcorearb.h:298
GLboolean r
Definition glcorearb.h:1233
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
std::string getPublishAddress(std::string const &base, int pid=getpid())
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
fair::mq::Channel createPUBChannel(std::string const &address, std::string const &type="pub")
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
fair::mq::Channel * datachannel
o2::devices::O2SimDevice * sim
fair::mq::Channel * primstatuschannel
fair::mq::Channel * primchannel
static void shutdown()
Definition SimSetup.cxx:77
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
#define main
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels
uint64_t const void const *restrict const msg
Definition x9.h:153