Project
Loading...
Searching...
No Matches
o2sim_parallel.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <fairmq/TransportFactory.h>
15#include <fairmq/Channel.h>
16#include <fairmq/Message.h>
17
18#include <cstdlib>
19#include <unistd.h>
20#include <ctime>
21#include <sstream>
22#include <iostream>
23#include <cstdio>
24#include <fcntl.h>
25#include <SimConfig/SimConfig.h>
26#include <sys/wait.h>
27#include <vector>
28#include <functional>
29#include <thread>
30#include <csignal>
31#include "TStopwatch.h"
32#include <fairlogger/Logger.h>
34#include "TFile.h"
35#include "TTree.h"
36#include <sys/types.h>
39#include "O2Version.h"
40#include <cstdio>
41#include <unordered_map>
42#include <filesystem>
43#include <atomic>
45#include "Headers/Stack.h"
46
50
51std::string getServerLogName()
52{
53 auto& conf = o2::conf::SimConfig::Instance();
54 std::stringstream str;
55 str << conf.getOutPrefix() << "_serverlog";
56 return str.str();
57}
58
59std::string getWorkerLogName()
60{
61 auto& conf = o2::conf::SimConfig::Instance();
62 std::stringstream str;
63 str << conf.getOutPrefix() << "_workerlog";
64 return str.str();
65}
66
67std::string getMergerLogName()
68{
69 auto& conf = o2::conf::SimConfig::Instance();
70 std::stringstream str;
71 str << conf.getOutPrefix() << "_mergerlog";
72 return str.str();
73}
74
76{
77 // remove all (known) socket files in /tmp
78 // using the naming convention /tmp/o2sim-.*PID
79 std::stringstream searchstr;
80 searchstr << "o2sim-.*-" << getpid() << "$";
81 auto filenames = o2::utils::listFiles("/tmp/", searchstr.str());
82 // remove those files
83 for (auto& fn : filenames) {
84 try {
85 std::filesystem::remove(std::filesystem::path(fn));
86 } catch (...) {
87 LOG(warn) << "Couldn't remove tmp file " << fn;
88 }
89 }
90}
91
// Final clean-up of the driver, called at the end of main() and from the signal handler.
// 1) If kinematics forwarding is enabled, binds a zeromq "pair" channel on
//    ipc:///tmp/o2sim-hitmerger-kineforward-<pid> and sends a two-part message
//    (header stack + empty payload) as end-of-stream, blocking at most 1 s.
// 2) If ALICE_O2SIM_DUMPLOG is set, prints the server, worker and merger log files
//    via `cat` (mainly for CI).
92void cleanup()
93{
94 auto& conf = o2::conf::SimConfig::Instance();
95 if (conf.forwardKine()) {
96 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
97 auto forwardchannel = fair::mq::Channel{"kineforward", "pair", factory};
98 auto address = std::string{"ipc:///tmp/o2sim-hitmerger-kineforward-"} + std::to_string(getpid());
99 forwardchannel.Bind(address.c_str());
100 forwardchannel.Validate();
101 fair::mq::Parts parts;
102 fair::mq::MessagePtr payload(forwardchannel.NewMessage());
// 'sih' (used in the header stack below) is declared on lines not shown in this listing;
// presumably a source-info header flagging end of stream — confirm
105 auto channelAlloc = o2::pmr::getTransportAllocator(forwardchannel.Transport());
106 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
107 parts.AddPart(std::move(header));
108 parts.AddPart(std::move(payload));
109 int timeoutinMS = 1000; // block for 1s max (other side might have disconnected already)
110 if (forwardchannel.Send(parts, timeoutinMS) > 0) {
111 LOGP(info, "SENDING END-OF-STREAM TO PROXY AT {}", address.c_str());
112 } else {
113 LOGP(warn, "SENDING END-OF-STREAM TIMED OUT; PEER PROBABLY NO LONGER CONNECTED");
114 }
115 }
118
119 // special mode in which we dump the output from various
120 // log files to terminal (mainly interesting for CI mode)
// NOTE(review): the log file names are handed to the shell unquoted; an output prefix
// containing spaces or shell metacharacters breaks (or is interpreted by) these commands
121 if (getenv("ALICE_O2SIM_DUMPLOG")) {
122 std::cerr << "------------- START OF EVENTSERVER LOG ----------" << std::endl;
123 std::stringstream catcommand1;
124 catcommand1 << "cat " << getServerLogName() << ";";
125 if (system(catcommand1.str().c_str()) != 0) {
126 LOG(warn) << "error executing system call";
127 }
128
129 std::cerr << "------------- START OF SIM WORKER(S) LOG --------" << std::endl;
130 std::stringstream catcommand2;
// the trailing '*' lets the shell glob all per-worker files (<prefix>_workerlog<id>)
131 catcommand2 << "cat " << getWorkerLogName() << "*;";
132 if (system(catcommand2.str().c_str()) != 0) {
133 LOG(warn) << "error executing system call";
134 }
135
136 std::cerr << "------------- START OF MERGER LOG ---------------" << std::endl;
137 std::stringstream catcommand3;
138 catcommand3 << "cat " << getMergerLogName() << ";";
139 if (system(catcommand3.str().c_str()) != 0) {
140 LOG(warn) << "error executing system call";
141 }
142 }
143}
144
145// quick cross check of simulation output
147{
148 int errors = 0;
149 // We can put more or less complex things
150 // here.
151 auto& conf = o2::conf::SimConfig::Instance();
152 if (!conf.writeToDisc()) {
153 return 0;
154 }
155 // easy check: see if we have number of entries in output tree == number of events asked
156 std::string filename = o2::base::NameConf::getMCKinematicsFileName(conf.getOutPrefix().c_str());
157 TFile f(filename.c_str(), "OPEN");
158 if (f.IsZombie()) {
159 LOG(warn) << "Kinematics file corrupted or does not exist";
160 return 1;
161 }
162 auto tr = static_cast<TTree*>(f.Get("o2sim"));
163 if (!tr) {
164 errors++;
165 } else {
166 if (!conf.isFilterOutNoHitEvents()) {
167 if (tr->GetEntries() != conf.getNEvents()) {
168 LOG(warn) << "There are fewer events in the output than asked";
169 }
170 }
171 }
172 // add more simple checks
173
174 return errors;
175}
176
177// ---> THE FOLLOWING CAN BE PUT INTO A "STATE" STRUCT
// pid of every forked child; main() makes each child the leader of its own process group
// (setpgid(pid, pid)), so killpg(pid, ...) reaches every process in that group
178std::vector<int> gChildProcesses; // global vector of child pids
179// record distributed events in a container
// (appended to by the thread monitoring the server->driver pipe)
180std::vector<int> gDistributedEvents;
181// record finished events in a container
// (appended to by the thread monitoring the merger->driver pipe; its size is compared
// against gAskedEvents to decide whether the simulation is done)
// NOTE(review): both vectors are appended to by monitoring threads and read/cleared by the
// control thread without any locking
182std::vector<int> gFinishedEvents;
// polled by main() before the workers are launched; atomic because it is accessed from
// several threads (presumably set from the server-pipe monitoring callback — confirm)
184std::atomic<bool> gPrimServerIsInitialized = false;
185
// Address of the "public" control socket of this driver; the PID suffix keeps it unique
// per running o2-sim instance.
std::string getControlAddress()
{
  const std::string base("ipc:///tmp/o2sim-control-");
  return base + std::to_string(getpid());
}
// Address of a socket meant for internal use only. Its file name is the first 10
// characters of the decimal std::hash of "o2sim-internal_<pid>", followed by "_<pid>".
// This keeps it distinguishable from the more "public" o2sim-* sockets.
std::string getInternalControlAddress()
{
  const std::string pid = std::to_string(getpid());
  const std::string digest = std::to_string(std::hash<std::string>{}("o2sim-internal_" + pid));
  return "ipc:///tmp/" + digest.substr(0, 10) + "_" + pid;
}
204
205bool isBusy()
206{
207 if (gFinishedEvents.size() != gAskedEvents) {
208 return true;
209 }
210 return false;
211}
212
213// launches a thread that listens for control command from outside
214// or that propagates control strings to all children
// Details: a "rep" socket is bound on getControlAddress() for external requests and a
// "pub" socket on getInternalControlAddress(); the latter is exported via the environment
// variable ALICE_O2SIMCONTROL so that processes started afterwards can find it.
// Every request is answered with an int code:
//   0 = accepted, 1 = busy (previous batch not finished), 2 = request could not be parsed.
// An accepted request resets the event bookkeeping, sets gAskedEvents and is re-published
// unchanged on the internal channel; the thread ends after an accepted request whose
// parsed 'stop' flag is set. The thread is detached.
216{
217 static std::vector<std::thread> threads;
218 auto controladdress = getControlAddress();
219 auto internalcontroladdress = getInternalControlAddress();
220 LOG(info) << "Control address is: " << controladdress;
221 setenv("ALICE_O2SIMCONTROL", internalcontroladdress.c_str(), 1);
222
223 auto lambda = [controladdress, internalcontroladdress]() {
224 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
225
226 // used for internal distribution of control commands
227 auto internalchannel = fair::mq::Channel{"o2sim-internal", "pub", factory};
228 internalchannel.Bind(internalcontroladdress);
229 internalchannel.Validate();
// NOTE(review): 'message' is never used below
230 std::unique_ptr<fair::mq::Message> message(internalchannel.NewMessage());
231
232 // the channel with which outside entities can control this simulator
233 auto outsidechannel = fair::mq::Channel{"o2sim-control", "rep", factory};
234 outsidechannel.Bind(controladdress);
235 outsidechannel.Validate();
236 std::unique_ptr<fair::mq::Message> request(outsidechannel.NewMessage());
237
238 bool keepgoing = true;
239 while (keepgoing) {
// NOTE(review): the reply socket is re-initialised and re-bound before every Receive;
// the reason for this is not visible here
240 outsidechannel.Init();
241 outsidechannel.Bind(controladdress);
242 outsidechannel.Validate();
243 if (outsidechannel.Receive(request) > 0) {
244 std::string command(reinterpret_cast<char const*>(request->GetData()), request->GetSize());
245 LOG(info) << "Control message: " << command << " received ";
246 int code = -1;
247 if (isBusy()) {
248 code = 1; // code = 1 --> busy
249 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
250 outsidechannel.Send(reply);
251 } else {
252 code = 0; // code = 0 --> ok
253
// 'reconfig' is declared on a line not shown in this listing (presumably an
// o2::conf::SimReconfigData filled by the parser below — confirm)
255 auto success = o2::conf::parseSimReconfigFromString(command, reconfig);
256 if (!success) {
257 LOG(warn) << "CONTROL REQUEST COULD NOT BE PARSED";
258 code = 2; // code = 2 --> error with request data
259 }
260 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
261 outsidechannel.Send(reply);
262
263 if (code == 0) {
// NOTE(review): these containers are also appended to by the pipe-monitoring threads;
// neither these clear() calls nor isBusy() take a lock
264 gAskedEvents = reconfig.nEvents;
265 gDistributedEvents.clear();
266 gFinishedEvents.clear();
267 // forward request from outside to all internal processes
268 internalchannel.Send(request);
269 keepgoing = !reconfig.stop;
270 }
271 }
272 }
273 }
274 };
275 threads.push_back(std::thread(lambda));
276 threads.back().detach();
277}
278
279// launches a thread that listens for control command from outside
280// or that propagates control strings to all children
282{
283 static std::vector<std::thread> threads;
284 auto lambda = []() {
285 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
286
287 auto listenchannel = fair::mq::Channel{"channel0", "sub", factory};
288 listenchannel.Init();
289 std::stringstream address;
290 address << "ipc:///tmp/o2sim-worker-notifications-" << getpid();
291 listenchannel.Connect(address.str());
292 listenchannel.Validate();
293 std::unique_ptr<fair::mq::Message> message(listenchannel.NewMessage());
294
295 while (true) {
296 if (listenchannel.Receive(message) > 0) {
297 std::string msg(reinterpret_cast<char const*>(message->GetData()), message->GetSize());
298 LOG(info) << "Worker message: " << msg;
299 }
300 }
301 };
302 threads.push_back(std::thread(lambda));
303 threads.back().detach();
304}
305
306// monitors a certain incoming event pipes and displays new information
307// gives possibility to exec a callback at these events
309 int pipefd, std::string text, std::vector<int>& eventcontainer,
310 std::function<bool(std::vector<int> const&)> callback = [](std::vector<int> const&) { return true; })
311{
312 static std::vector<std::thread> threads;
313 auto lambda = [pipefd, text, callback, &eventcontainer]() {
314 int eventcounter; // event id or some other int message
315 while (1) {
316 ssize_t count = read(pipefd, &eventcounter, sizeof(eventcounter));
317 if (count == -1) {
318 LOG(info) << "ERROR READING";
319 if (errno == EINTR) {
320 continue;
321 } else {
322 return;
323 }
324 } else if (count == 0) {
325 break;
326 } else {
327 eventcontainer.push_back(eventcounter);
328 if (callback(eventcontainer)) {
329 LOG(info) << text.c_str() << eventcounter;
330 }
331 }
332 };
333 };
334 threads.push_back(std::thread(lambda));
335 threads.back().detach();
336}
337
339{
340 static std::vector<std::thread> threads;
341 auto lambda = []() {
342 // once started ... we are waiting for some seconds
343 // then **force** shutdown all remaining children by killing them.
344 // This is to make sure that the process does not hang during a final wait
345 // and interrupted/blocked signal delivery.
346
347 struct timespec initial, remaining;
348 initial.tv_sec = 5;
349 // wait for specified time ... (and account for possible signal interruptions)
350 while (nanosleep(&initial, &remaining) == -1 && remaining.tv_sec > 0) {
351 initial = remaining;
352 }
353 LOG(info) << "Shutdown timer expired ... force killing remaining children";
354 for (auto p : gChildProcesses) {
355 killpg(p, SIGKILL);
356 }
357 };
358 threads.push_back(std::thread(lambda));
359 threads.back().detach();
360}
361
// Deliberate no-op signal handler. sighandler() installs it for a signal it has already
// received, so that further deliveries of that signal do nothing.
void empty(int /*sig*/)
{
}
363
364// signal handler for graceful exit
// Acts on SIGINT/SIGTERM only. It forwards the signal to the process group of every child,
// runs cleanup(), then calls wait() until it fails (normally: no children left) and
// exits with code 1.
// NOTE(review): LOG, cleanup() (zeromq I/O, system()) and exit() are not async-signal-safe;
// calling them from a signal handler is formally undefined behaviour.
365void sighandler(int sig)
366{
367 if (sig == SIGINT || sig == SIGTERM) {
// NOTE(review): only the signal just received is redirected to 'empty'; the other one of
// the pair (e.g. SIGTERM after SIGINT) still reaches this handler
368 signal(sig, empty); // ignore further deliveries of these signals
369 LOG(info) << "o2-sim driver: Signal caught ... clean up and exit (please be patient)";
370 // forward signal to all children
371 for (auto& pid : gChildProcesses) {
372 killpg(pid, sig);
373 }
374 cleanup();
375
376 // make sure everyone is really shutting down
377 int status, cpid;
// wait() never returns 0; the loop ends on -1 (no child left, or interrupted)
379 while ((cpid = wait(&status))) {
380 if (cpid == -1) {
381 break;
382 }
383 }
384
385 exit(1); // exiting upon external signal is abnormal so exit code != 0
386 }
387}
388
389// We do some early checks on the arguments passed. In particular we fix
390// missing timestamps for consistent application in all sub-processes. An empty
391// vector is returned upon errors.
392std::vector<char*> checkArgs(int argc, char* argv[])
393{
394 auto conf = o2::conf::SimConfig::make();
395 std::vector<std::string> modifiedArgs;
396#ifdef SIM_RUN5
397 conf.setRun5();
398#endif
399 if (conf.resetFromArguments(argc, argv)) {
400 for (int i = 0; i < argc; ++i) {
401 modifiedArgs.push_back(argv[i]);
402 }
403
404 // Check the run and the time arguments and enforce consistency.
405 // This is important as queries to CCDB are done using the timestamp.
406 if (conf.getRunNumber() != -1) {
407 // if we have a run number we should fix or check the timestamp
408
409 // fetch the actual timestamp ranges for this run
410 auto& ccdbmgr = o2::ccdb::BasicCCDBManager::instance();
411 auto soreor = ccdbmgr.getRunDuration(conf.getRunNumber());
412 auto timestamp = conf.getTimestamp();
413 if (conf.getConfigData().mTimestampMode == o2::conf::TimeStampMode::kNow) {
414 timestamp = soreor.first;
415 LOG(info) << "Fixing timestamp to " << timestamp << " based on run number";
416 modifiedArgs.push_back("--timestamp");
417 modifiedArgs.push_back(std::to_string(timestamp));
418 } else if (conf.getConfigData().mTimestampMode == o2::conf::TimeStampMode::kManual && (timestamp < soreor.first || timestamp > soreor.second)) {
419 LOG(fatal) << "The given timestamp " << timestamp << " is incompatible with the given run number " << conf.getRunNumber() << " starting at " << soreor.first << " and ending at " << soreor.second;
420 }
421 }
422 }
423 std::vector<char*> final(modifiedArgs.size(), nullptr);
424 for (int i = 0; i < modifiedArgs.size(); ++i) {
425 final[i] = new char[modifiedArgs[i].size() + 1];
426 strcpy(final[i], modifiedArgs[i].c_str());
427 }
428 return final;
429}
430
431// helper executable to launch all the devices/processes
432// for parallel simulation
// Flow: write a PID-specific copy of the topology template, validate/complete the
// arguments (checkArgs), then fork+exec the primary server, the sim worker(s) and the
// hit merger. Each child runs in its own process group with stdout/stderr redirected
// to a log file. Pipes from server and merger report event ids back to this driver.
// The driver waits for the merger, shuts the remaining children down and returns 0 only
// if no component failed ('errored') and checkresult() found no problem.
// With zero events asked (and not running as a service) the serial executable is exec'ed instead.
433int main(int argc, char* argv[])
434{
435 LOG(info) << "This is o2-sim version " << o2::fullVersion() << " (" << o2::gitRevision() << ")";
436 LOG(info) << o2::getBuildInfo();
437
438 signal(SIGINT, sighandler);
439 signal(SIGTERM, sighandler);
440 // we enable the forked version of the code by default
441 setenv("ALICE_SIMFORKINTERNAL", "ON", 1);
442
443 TStopwatch timer;
444 timer.Start();
445 auto o2env = getenv("O2_ROOT");
446 if (!o2env) {
447 LOG(fatal) << "O2_ROOT environment not defined";
448 }
449 std::string rootpath(o2env);
450 std::string installpath = rootpath + "/bin";
451
452 // copy topology file to working dir and update ports
453 std::stringstream configss;
454 configss << rootpath << "/share/config/o2simtopology_template.json";
455 auto localconfig = std::string("o2simtopology_") + std::to_string(getpid()) + std::string(".json");
456
457 // need to add pid to channel urls to allow simultaneous deploys!
458 // we simply insert the PID into the topology template
459 std::ifstream in(configss.str());
460 std::ofstream out(localconfig);
461 std::string wordToReplace("#PID#");
462 std::string wordToReplaceWith = std::to_string(getpid());
463 std::string line;
464 size_t len = wordToReplace.length();
// NOTE(review): only the first "#PID#" of each line is substituted, and a template that
// cannot be opened silently yields an empty local config
465 while (std::getline(in, line)) {
466 size_t pos = line.find(wordToReplace);
467 if (pos != std::string::npos) {
468 line.replace(pos, len, wordToReplaceWith);
469 }
470 out << line << '\n';
471 }
472 in.close();
473 out.close();
474
475 // create a channel for outside event notifications --> factor out into common function
476 // auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
477 auto externalpublishchannel = o2::simpubsub::createPUBChannel(o2::simpubsub::getPublishAddress("o2sim-notifications"));
478
479 // check initial arguments and complete
480 auto finalArgs = checkArgs(argc, argv);
481 if (finalArgs.size() == 0) {
482 return 1;
483 }
484
485 auto& conf = o2::conf::SimConfig::Instance();
486#ifdef SIM_RUN5
487 conf.setRun5();
488#endif
489 if (!conf.resetFromArguments(finalArgs.size(), &finalArgs[0])) {
490 return 1;
491 }
492 // in case of zero events asked (only setup geometry etc) we just call the non-distributed version
493 // (otherwise we would need to add more synchronization between the actors)
494 if (conf.getNEvents() <= 0 && !conf.asService()) {
495 LOG(info) << "No events to be simulated; Switching to non-distributed mode";
496 const int Nargs = finalArgs.size() + 1;
497#ifdef SIM_RUN5
498 std::string name("o2-sim-serial-run5");
499#else
500 std::string name("o2-sim-serial");
501#endif
// NOTE(review): 'arguments' is a variable-length array (compiler extension, not ISO C++)
502 const char* arguments[Nargs];
503 arguments[0] = name.c_str();
504 for (int i = 1; i < finalArgs.size(); ++i) {
505 arguments[i] = finalArgs[i];
506 }
507 arguments[finalArgs.size()] = nullptr;
508 std::string path = installpath + "/" + name;
509 auto r = execv(path.c_str(), (char* const*)arguments);
// execv only returns on failure (then r == -1)
510 if (r != 0) {
511 perror(nullptr);
512 }
513 return r;
514 }
515
516 gAskedEvents = conf.getNEvents();
517 if (conf.asService()) {
519 // launchWorkerListenerThread();
520 }
521
522 // we create the global shared mem pool; just enough to serve
523 // n simulation workers
524 int nworkers = conf.getNSimWorkers();
525 setenv("ALICE_NSIMWORKERS", std::to_string(nworkers).c_str(), 1);
526 LOG(info) << "Running with " << nworkers << " sim workers ";
527
529
530 // we can try to disable it here
531 if (getenv("ALICE_NOSIMSHM")) {
533 }
534
// NOTE(review): a failing pipe() is only reported; execution continues with unset fds
535 int pipe_serverdriver_fd[2];
536 if (pipe(pipe_serverdriver_fd) != 0) {
537 perror("problem in creating pipe");
538 }
539
540 // the server
541 int pid = fork();
542 if (pid == 0) {
// NOTE(review): O_CREAT without O_TRUNC keeps the tail of a longer log left by an earlier
// run with the same prefix; the result of open() is not checked either
543 int fd = open(getServerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
544 setenv("ALICE_O2SIMSERVERTODRIVER_PIPE", std::to_string(pipe_serverdriver_fd[1]).c_str(), 1);
545
546 dup2(fd, 1); // make stdout go to file
547 dup2(fd, 2); // make stderr go to file - you may choose to not do this
548 // or perhaps send stderr to another file
549 close(pipe_serverdriver_fd[0]);
550 close(fd); // fd no longer needed - the dup'ed handles are sufficient
551
552 const std::string name("o2-sim-primary-server-device-runner");
553 const std::string path = installpath + "/" + name;
554 const std::string config = localconfig;
555
556 // copy all arguments into a common vector
557#ifdef SIM_RUN5
558 const int addNArgs = 12;
559#else
560 const int addNArgs = 11;
561#endif
562 const int Nargs = finalArgs.size() + addNArgs;
563 const char* arguments[Nargs];
564 arguments[0] = name.c_str();
565 arguments[1] = "--control";
566 arguments[2] = "static";
567 arguments[3] = "--id";
568 arguments[4] = "primary-server";
569 arguments[5] = "--mq-config";
570 arguments[6] = config.c_str();
571 arguments[7] = "--severity";
572 arguments[8] = "debug";
573 arguments[9] = "--color";
574 arguments[10] = "false"; // switch off colored output
575#ifdef SIM_RUN5
576 arguments[11] = "--isRun5";
577#endif
// user arguments (without argv[0]) follow the fixed ones; the last slot is the nullptr terminator
578 for (int i = 1; i < finalArgs.size(); ++i) {
579 arguments[addNArgs - 1 + i] = finalArgs[i];
580 }
581 arguments[Nargs - 1] = nullptr;
582 for (int i = 0; i < Nargs; ++i) {
583 if (arguments[i]) {
584 std::cerr << arguments[i] << "\n";
585 }
586 }
587 std::cerr << "$$$$\n";
588 auto r = execv(path.c_str(), (char* const*)arguments);
// NOTE(review): only reached if execv failed, so this "Starting the server" message is misleading
589 LOG(info) << "Starting the server"
590 << "\n";
591 if (r != 0) {
592 perror(nullptr);
593 }
594 return r;
595 } else {
596 gChildProcesses.push_back(pid);
597 setpgid(pid, pid);
598 close(pipe_serverdriver_fd[1]);
599 std::cout << "Spawning particle server on PID " << pid << "; Redirect output to " << getServerLogName() << "\n";
600
601 // A simple callback for distributed primary-chunk "events"
602 auto distributionCallback = [&conf, &externalpublishchannel](std::vector<int> const& v) {
603 std::stringstream str;
604 if (v.back() == -111) {
605 // message that server is initialized
607 return false; // silent
608 } else {
609 str << "EVENT " << v.back() << " DISTRIBUTED";
610 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "INFO", str.str()));
611 return true;
612 }
613 };
614 launchThreadMonitoringEvents(pipe_serverdriver_fd[0], "DISTRIBUTING EVENT : ", gDistributedEvents, distributionCallback);
615 }
616
617 // we wait until the particle server is initialized before constructing the worker
618 // since the worker needs an operating server to initialize
// (gPrimServerIsInitialized is presumably set in distributionCallback on the -111 message;
// that assignment is not visible here — confirm)
619 while (!gPrimServerIsInitialized) {
620 int status;
621 auto result = waitpid(gChildProcesses.back(), &status, WNOHANG);
622 if (result != 0) {
623 break; // exit this busy loop if the server process exited for some reason
624 }
625 sleep(1); // otherwise wait until server is initialized
626 }
627
// NOTE(review): ALICE_SIMFORKINTERNAL is set unconditionally at the top of main(), so as
// written exactly one worker process is launched here
628 auto internalfork = getenv("ALICE_SIMFORKINTERNAL");
629 if (internalfork) {
630 // forking will be done internally to profit from copy-on-write
631 nworkers = 1;
632 }
633 for (int id = 0; id < nworkers; ++id) {
634 // the workers
635 std::stringstream workerlogss;
636 workerlogss << getWorkerLogName() << id;
637
638 // the workers
639 std::stringstream workerss;
640 workerss << "worker" << id;
641
642 pid = fork();
643 if (pid == 0) {
644 int fd = open(workerlogss.str().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
645 dup2(fd, 1); // make stdout go to file
646 dup2(fd, 2); // make stderr go to file - you may choose to not do this
647 // or perhaps send stderr to another file
648 close(fd); // fd no longer needed - the dup'ed handles are sufficient
649
650 const std::string name("o2-sim-device-runner");
651 const std::string path = installpath + "/" + name;
652
653 execl(path.c_str(), name.c_str(), "--control", "static", "--id", workerss.str().c_str(), "--config-key",
654 "worker", "--mq-config", localconfig.c_str(), "--severity", "info", (char*)nullptr);
// NOTE(review): reached only if execl failed; returning 0 hides that failure
655 return 0;
656 } else {
657 gChildProcesses.push_back(pid);
658 setpgid(pid, pid); // the worker processes will form their own group
659 std::cout << "Spawning sim worker " << id << " on PID " << pid
660 << "; Redirect output to " << workerlogss.str() << "\n";
661 }
662 }
663
664 // the hit merger
665 int pipe_mergerdriver_fd[2];
666 if (pipe(pipe_mergerdriver_fd) != 0) {
667 perror("problem in creating pipe");
668 }
669
670 pid = fork();
671
672 std::atomic<bool> shutdown_initiated = false;
673 if (pid == 0) {
674 int fd = open(getMergerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
675 dup2(fd, 1); // make stdout go to file
676 dup2(fd, 2); // make stderr go to file - you may choose to not do this
677 // or perhaps send stderr to another file
678 close(fd); // fd no longer needed - the dup'ed handles are sufficient
679 close(pipe_mergerdriver_fd[0]);
680 setenv("ALICE_O2SIMMERGERTODRIVER_PIPE", std::to_string(pipe_mergerdriver_fd[1]).c_str(), 1);
681 const std::string name("o2-sim-hit-merger-runner");
682 const std::string path = installpath + "/" + name;
683 execl(path.c_str(), name.c_str(), "--control", "static", "--catch-signals", "0", "--id", "hitmerger", "--mq-config", localconfig.c_str(), "--color", "false",
684 (char*)nullptr);
// NOTE(review): reached only if execl failed; returning 0 hides that failure
685 return 0;
686 } else {
687 std::cout << "Spawning hit merger on PID " << pid << "; Redirect output to " << getMergerLogName() << "\n";
688 setpgid(pid, pid);
689 gChildProcesses.push_back(pid);
690 close(pipe_mergerdriver_fd[1]);
691
692 // A simple callback that determines if the simulation is complete and triggers
693 // a shutdown of all child processes. This appears to be more robust than leaving
694 // that decision upon the children (sometimes there are problems with that).
695 auto finishCallback = [&shutdown_initiated, &conf, &externalpublishchannel](std::vector<int> const& v) {
696 std::stringstream str;
697 str << "EVENT " << v.back() << " FINISHED " << gAskedEvents << " " << v.size();
698 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "INFO", str.str()));
699 if (gAskedEvents == v.size()) {
700 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "STATE", "DONE"));
701 if (!conf.asService()) {
702 LOG(info) << "SIMULATION IS DONE. INITIATING SHUTDOWN.";
703 if (!shutdown_initiated) {
704 shutdown_initiated = true;
705 for (auto p : gChildProcesses) {
706 killpg(p, SIGTERM);
707 }
708 }
709 } else {
710 LOG(info) << "SIMULATION DONE. STAYING AS DAEMON.";
711 }
712 }
713 return true;
714 };
715
716 launchThreadMonitoringEvents(pipe_mergerdriver_fd[0], "EVENT FINISHED : ", gFinishedEvents, finishCallback);
717 }
718
719 // wait on merger (which when exiting completes the workflow)
720 auto mergerpid = gChildProcesses.back();
721
722 int status, cpid;
723 // wait just blocks and waits until any child returns; but we make sure to wait until merger is here
// NOTE(review): WEXITSTATUS is only meaningful if WIFEXITED(status); a -1 return of wait()
// (e.g. EINTR) is not treated specially; the merger's own exit status is never inspected,
// so a failing merger does not set 'errored'
724 bool errored = false;
725 while ((cpid = wait(&status)) != mergerpid) {
726 if (WEXITSTATUS(status) || WIFSIGNALED(status)) {
727 if (!shutdown_initiated) {
728 LOG(info) << "Process " << cpid << " EXITED WITH CODE " << WEXITSTATUS(status) << " SIGNALED "
729 << WIFSIGNALED(status) << " SIGNAL " << WTERMSIG(status);
730
731 // we bring down all processes if one of them had problems or got a termination signal
732 // if (WTERMSIG(status) == SIGABRT || WTERMSIG(status) == SIGSEGV || WTERMSIG(status) == SIGBUS || WTERMSIG(status) == SIGTERM) {
733 LOG(info) << "Problem detected (or child received termination signal) ... shutting down whole system ";
734 for (auto p : gChildProcesses) {
735 LOG(info) << "TERMINATING " << p;
736 killpg(p, SIGTERM); // <--- makes sure to shutdown "unknown" child pids via the group property
737 }
738 LOG(error) << "SHUTTING DOWN DUE TO SIGNALED EXIT IN COMPONENT " << cpid;
739 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "STATE", "FAILURE"));
740 errored = true;
741 }
742 }
743 }
744 // This marks the actual end of the computation (since results are available)
745 LOG(info) << "Merger process " << mergerpid << " returned";
746 LOG(info) << "Simulation process took " << timer.RealTime() << " s";
747
748 if (!errored && !shutdown_initiated) {
749 shutdown_initiated = true;
750 // ordinary shutdown of the rest
751 for (auto p : gChildProcesses) {
752 if (p != mergerpid) {
753 LOG(info) << "SHUTTING DOWN CHILD PROCESS (normal thread)" << p;
754 killpg(p, SIGTERM);
755 }
756 }
757 }
758
759 // Final shutdown section. Here we definitely wait on all children
760 // otherwise this breaks accounting in the /usr/bin/time command. But we install
761 // an asynchronous timeout thread which triggers an emergency kill after some seconds in order to not block.
// NOTE(review): the call starting that timeout thread is not visible here — confirm it precedes
// this loop. wait() never returns 0; the loop ends when it returns -1 (no child left or interrupted).
763 while ((cpid = wait(&status))) {
764 if (cpid == -1) {
765 break;
766 }
767 }
768
769 LOG(debug) << "ShmManager operation " << o2::utils::ShmManager::Instance().isOperational() << "\n";
770
771 // do a quick check to see if simulation produced something reasonable
772 // (mainly useful for continuous integration / automated testing suite)
773 auto returncode = errored ? 1 : checkresult();
774 if (returncode == 0) {
775 LOG(info) << "SIMULATION RETURNED SUCCESFULLY";
776 }
777 cleanup();
778 return returncode;
779}
int32_t i
Definition of the Names Generator class.
uint16_t pos
Definition RawData.h:3
uint16_t pid
Definition RawData.h:2
std::ostringstream debug
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static BasicCCDBManager & instance()
static SimConfig make()
Definition SimConfig.h:118
static SimConfig & Instance()
Definition SimConfig.h:111
static ShmManager & Instance()
Definition ShmManager.h:61
bool isOperational() const
Definition ShmManager.h:97
bool createGlobalSegment(int nsubsegments=1)
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLuint id
Definition glcorearb.h:650
bpo::variables_map arguments
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
DeliveryType read(const std::string &str)
@ Completed
The channel was signaled it will not receive any data.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
std::string getPublishAddress(std::string const &base, int pid=getpid())
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
fair::mq::Channel createPUBChannel(std::string const &address, std::string const &type="pub")
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
std::string getBuildInfo()
get information about build platform (for example OS and alidist release when used)
std::string gitRevision()
get O2 git commit used to build this
std::string fullVersion()
get full version information (official O2 release and git commit)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
bool isBusy()
void empty(int)
std::string getServerLogName()
std::vector< int > gFinishedEvents
std::vector< int > gChildProcesses
void cleanup()
int gAskedEvents
int checkresult()
void remove_tmp_files()
void launchWorkerListenerThread()
std::atomic< bool > gPrimServerIsInitialized
std::string getInternalControlAddress()
std::vector< int > gDistributedEvents
void launchShutdownThread()
void launchThreadMonitoringEvents(int pipefd, std::string text, std::vector< int > &eventcontainer, std::function< bool(std::vector< int > const &)> callback=[](std::vector< int > const &) { return true;})
std::string getMergerLogName()
void sighandler(int sig)
std::string getWorkerLogName()
std::vector< char * > checkArgs(int argc, char *argv[])
void launchControlThread()
std::string getControlAddress()
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
#define main
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153