Project
Loading...
Searching...
No Matches
o2sim_parallel.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <fairmq/TransportFactory.h>
15#include <fairmq/Channel.h>
16#include <fairmq/Message.h>
17
18#include <cstdlib>
19#include <unistd.h>
20#include <ctime>
21#include <sstream>
22#include <iostream>
23#include <cstdio>
24#include <fcntl.h>
25#include <SimConfig/SimConfig.h>
26#include <sys/wait.h>
27#include <vector>
28#include <functional>
29#include <thread>
30#include <csignal>
31#include "TStopwatch.h"
32#include <fairlogger/Logger.h>
34#include "TFile.h"
35#include "TTree.h"
36#include <sys/types.h>
39#include "O2Version.h"
40#include <cstdio>
41#include <unordered_map>
42#include <filesystem>
43#include <atomic>
45#include "Headers/Stack.h"
46
50
51std::string getServerLogName()
52{
53 auto& conf = o2::conf::SimConfig::Instance();
54 std::stringstream str;
55 str << conf.getOutPrefix() << "_serverlog";
56 return str.str();
57}
58
59std::string getWorkerLogName()
60{
61 auto& conf = o2::conf::SimConfig::Instance();
62 std::stringstream str;
63 str << conf.getOutPrefix() << "_workerlog";
64 return str.str();
65}
66
67std::string getMergerLogName()
68{
69 auto& conf = o2::conf::SimConfig::Instance();
70 std::stringstream str;
71 str << conf.getOutPrefix() << "_mergerlog";
72 return str.str();
73}
74
76{
77 // remove all (known) socket files in /tmp
78 // using the naming convention /tmp/o2sim-.*PID
79 std::stringstream searchstr;
80 searchstr << "o2sim-.*-" << getpid() << "$";
81 auto filenames = o2::utils::listFiles("/tmp/", searchstr.str());
82 // remove those files
83 for (auto& fn : filenames) {
84 try {
85 std::filesystem::remove(std::filesystem::path(fn));
86 } catch (...) {
87 LOG(warn) << "Couldn't remove tmp file " << fn;
88 }
89 }
90}
91
// Final clean-up of the driver; called at the end of main() and from sighandler().
// 1. If kinematics forwarding is enabled, bind a "pair" channel on
//    ipc:///tmp/o2sim-hitmerger-kineforward-<pid> and try (for at most 1 s) to send
//    a two-part message (header stack + payload) that the log text calls END-OF-STREAM.
// 2. If the environment variable ALICE_O2SIM_DUMPLOG is set, print the server,
//    worker(s) and merger log files to the terminal using `cat`.
92void cleanup()
93{
94 auto& conf = o2::conf::SimConfig::Instance();
95 if (conf.forwardKine()) {
96 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
97 auto forwardchannel = fair::mq::Channel{"kineforward", "pair", factory};
98 auto address = std::string{"ipc:///tmp/o2sim-hitmerger-kineforward-"} + std::to_string(getpid());
99 forwardchannel.Bind(address.c_str());
100 forwardchannel.Validate();
101 fair::mq::Parts parts;
102 fair::mq::MessagePtr payload(forwardchannel.NewMessage());
// NOTE(review): `sih` is defined on source lines not visible in this excerpt; judging by the
// log message below it presumably carries an end-of-stream / "completed" state — confirm.
105 auto channelAlloc = o2::pmr::getTransportAllocator(forwardchannel.Transport());
106 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
107 parts.AddPart(std::move(header));
108 parts.AddPart(std::move(payload));
109 int timeoutinMS = 1000; // block for 1s max (other side might have disconnected already)
110 if (forwardchannel.Send(parts, timeoutinMS) > 0) {
111 LOGP(info, "SENDING END-OF-STREAM TO PROXY AT {}", address.c_str());
112 } else {
113 LOGP(warn, "SENDING END-OF-STREAM TIMED OUT; PEER PROBABLY NO LONGER CONNECTED");
114 }
115 }
118
119 // special mode in which we dump the output from various
120 // log files to terminal (mainly interesting for CI mode)
// NOTE(review): the log file names are pasted unquoted into shell commands; an output
// prefix containing spaces or shell metacharacters would break (or be interpreted by) the shell.
121 if (getenv("ALICE_O2SIM_DUMPLOG")) {
122 std::cerr << "------------- START OF EVENTSERVER LOG ----------" << std::endl;
123 std::stringstream catcommand1;
124 catcommand1 << "cat " << getServerLogName() << ";";
125 if (system(catcommand1.str().c_str()) != 0) {
126 LOG(warn) << "error executing system call";
127 }
128
129 std::cerr << "------------- START OF SIM WORKER(S) LOG --------" << std::endl;
130 std::stringstream catcommand2;
// the trailing "*" glob picks up every per-worker log (<prefix>_workerlog<id>)
131 catcommand2 << "cat " << getWorkerLogName() << "*;";
132 if (system(catcommand2.str().c_str()) != 0) {
133 LOG(warn) << "error executing system call";
134 }
135
136 std::cerr << "------------- START OF MERGER LOG ---------------" << std::endl;
137 std::stringstream catcommand3;
138 catcommand3 << "cat " << getMergerLogName() << ";";
139 if (system(catcommand3.str().c_str()) != 0) {
140 LOG(warn) << "error executing system call";
141 }
142 }
143}
144
145// quick cross check of simulation output
147{
148 int errors = 0;
149 // We can put more or less complex things
150 // here.
151 auto& conf = o2::conf::SimConfig::Instance();
152 if (!conf.writeToDisc()) {
153 return 0;
154 }
155 // easy check: see if we have number of entries in output tree == number of events asked
156 std::string filename = o2::base::NameConf::getMCKinematicsFileName(conf.getOutPrefix().c_str());
157 TFile f(filename.c_str(), "OPEN");
158 if (f.IsZombie()) {
159 LOG(warn) << "Kinematics file corrupted or does not exist";
160 return 1;
161 }
162 auto tr = static_cast<TTree*>(f.Get("o2sim"));
163 if (!tr) {
164 errors++;
165 } else {
166 if (!conf.isFilterOutNoHitEvents()) {
167 if (tr->GetEntries() != conf.getNEvents()) {
168 LOG(warn) << "There are fewer events in the output than asked";
169 }
170 }
171 }
172 // add more simple checks
173
174 return errors;
175}
176
177// ---> THE FOLLOWING CAN BE PUT INTO A "STATE" STRUCT
// Global driver state shared between main(), the signal handler and the detached helper threads.
// NOTE(review): the event vectors below are appended to by the pipe-monitoring threads
// (launchThreadMonitoringEvents) and cleared/read by the control thread (launchControlThread,
// isBusy) with no visible locking; concurrent access would be a data race — consider a mutex.
// Each pid stored here is also made the leader of its own process group (setpgid(pid, pid) in main),
// which is why the driver signals them with killpg.
178std::vector<int> gChildProcesses; // global vector of child pids
179// record distributed events in a container
180std::vector<int> gDistributedEvents;
181// record finished events in a container
182std::vector<int> gFinishedEvents;
// Polled by main() before the workers are launched; meant to become true once the primary
// server reports that it is initialized (the assignment is not visible in this excerpt).
184std::atomic<bool> gPrimServerIsInitialized = false;
185
// Address of the REP socket on which outside entities can send control commands
// to this o2-sim driver; made unique per driver by appending the process id.
std::string getControlAddress()
{
  const std::string prefix{"ipc:///tmp/o2sim-control-"};
  return prefix + std::to_string(getpid());
}
193{
194 // creates names for an internal-only socket
195 // hashing to distinguish from more "public" sockets
196 std::hash<std::string> hasher;
197 std::stringstream str;
198 str << "o2sim-internal_" << getpid();
199 std::string tmp(std::to_string(hasher(str.str())));
200 std::stringstream controlsocketname;
201 controlsocketname << "ipc:///tmp/" << tmp.substr(0, 10) << "_" << getpid();
202 return controlsocketname.str();
203}
204
205bool isBusy()
206{
207 if (gFinishedEvents.size() != gAskedEvents) {
208 return true;
209 }
210 return false;
211}
212
213// launches a detached thread that listens for control commands from outside
214// and propagates accepted control strings to all children
// The internal PUB address is exported as ALICE_O2SIMCONTROL through the environment,
// so processes forked afterwards inherit it. Each request on the REP socket is answered
// with an int code: 0 = accepted, 1 = busy (isBusy()), 2 = request could not be parsed.
// An accepted request resets gAskedEvents and the event bookkeeping, and is re-published
// verbatim on the internal channel; the loop ends after a request whose parsed `stop` flag is set.
216{
217 static std::vector<std::thread> threads;
218 auto controladdress = getControlAddress();
219 auto internalcontroladdress = getInternalControlAddress();
220 LOG(info) << "Control address is: " << controladdress;
221 setenv("ALICE_O2SIMCONTROL", internalcontroladdress.c_str(), 1);
222
223 auto lambda = [controladdress, internalcontroladdress]() {
224 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
225
226 // used for internal distribution of control commands
227 auto internalchannel = fair::mq::Channel{"o2sim-internal", "pub", factory};
228 internalchannel.Bind(internalcontroladdress);
229 internalchannel.Validate();
// NOTE(review): `message` is never used below.
230 std::unique_ptr<fair::mq::Message> message(internalchannel.NewMessage());
231
232 // the channel with which outside entities can control this simulator
233 auto outsidechannel = fair::mq::Channel{"o2sim-control", "rep", factory};
234 outsidechannel.Bind(controladdress);
235 outsidechannel.Validate();
236 std::unique_ptr<fair::mq::Message> request(outsidechannel.NewMessage());
237
238 bool keepgoing = true;
239 while (keepgoing) {
// NOTE(review): Init()/Bind()/Validate() are re-run on every iteration although the channel
// was already bound above — presumably a workaround to reset the REP state; confirm intent.
240 outsidechannel.Init();
241 outsidechannel.Bind(controladdress);
242 outsidechannel.Validate();
243 if (outsidechannel.Receive(request) > 0) {
244 std::string command(reinterpret_cast<char const*>(request->GetData()), request->GetSize());
245 LOG(info) << "Control message: " << command << " received ";
246 int code = -1;
247 if (isBusy()) {
248 code = 1; // code = 1 --> busy
249 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
250 outsidechannel.Send(reply);
251 } else {
252 code = 0; // code = 0 --> ok
253
// `reconfig` is declared on a source line not visible in this excerpt.
255 auto success = o2::conf::parseSimReconfigFromString(command, reconfig);
256 if (!success) {
257 LOG(warn) << "CONTROL REQUEST COULD NOT BE PARSED";
258 code = 2; // code = 2 --> error with request data
259 }
260 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
261 outsidechannel.Send(reply);
262
263 if (code == 0) {
// NOTE(review): these globals are also touched by the pipe-monitoring threads without locking.
264 gAskedEvents = reconfig.nEvents;
265 gDistributedEvents.clear();
266 gFinishedEvents.clear();
267 // forward request from outside to all internal processes
268 internalchannel.Send(request);
269 keepgoing = !reconfig.stop;
270 }
271 }
272 }
273 }
274 };
275 threads.push_back(std::thread(lambda));
276 threads.back().detach();
277}
278
279// launches a thread that listens for control command from outside
280// or that propagates control strings to all children
282{
283 static std::vector<std::thread> threads;
284 auto lambda = []() {
285 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
286
287 auto listenchannel = fair::mq::Channel{"channel0", "sub", factory};
288 listenchannel.Init();
289 std::stringstream address;
290 address << "ipc:///tmp/o2sim-worker-notifications-" << getpid();
291 listenchannel.Connect(address.str());
292 listenchannel.Validate();
293 std::unique_ptr<fair::mq::Message> message(listenchannel.NewMessage());
294
295 while (true) {
296 if (listenchannel.Receive(message) > 0) {
297 std::string msg(reinterpret_cast<char const*>(message->GetData()), message->GetSize());
298 LOG(info) << "Worker message: " << msg;
299 }
300 }
301 };
302 threads.push_back(std::thread(lambda));
303 threads.back().detach();
304}
305
306// monitors a certain incoming event pipes and displays new information
307// gives possibility to exec a callback at these events
309 int pipefd, std::string text, std::vector<int>& eventcontainer,
310 std::function<bool(std::vector<int> const&)> callback = [](std::vector<int> const&) { return true; })
311{
312 static std::vector<std::thread> threads;
313 auto lambda = [pipefd, text, callback, &eventcontainer]() {
314 int eventcounter; // event id or some other int message
315 while (1) {
316 ssize_t count = read(pipefd, &eventcounter, sizeof(eventcounter));
317 if (count == -1) {
318 LOG(info) << "ERROR READING";
319 if (errno == EINTR) {
320 continue;
321 } else {
322 return;
323 }
324 } else if (count == 0) {
325 break;
326 } else {
327 eventcontainer.push_back(eventcounter);
328 if (callback(eventcontainer)) {
329 LOG(info) << text.c_str() << eventcounter;
330 }
331 }
332 };
333 };
334 threads.push_back(std::thread(lambda));
335 threads.back().detach();
336}
337
339{
340 static std::vector<std::thread> threads;
341 auto lambda = []() {
342 // once started ... we are waiting for some seconds
343 // then **force** shutdown all remaining children by killing them.
344 // This is to make sure that the process does not hang during a final wait
345 // and interrupted/blocked signal delivery.
346
347 struct timespec initial, remaining;
348 initial.tv_sec = 5;
349 // wait for specified time ... (and account for possible signal interruptions)
350 while (nanosleep(&initial, &remaining) == -1 && remaining.tv_sec > 0) {
351 initial = remaining;
352 }
353 LOG(info) << "Shutdown timer expired ... force killing remaining children";
354 for (auto p : gChildProcesses) {
355 if (p != 0 && killpg(p, 0) == 0) { // see if process still exists
356 killpg(p, SIGKILL);
357 }
358 }
359 };
360 threads.push_back(std::thread(lambda));
361 threads.back().detach();
362}
363
// No-op signal handler: sighandler() installs it for a signal it has already caught
// so that further deliveries of that signal are ignored.
void empty(int /*sig*/)
{
  // intentionally does nothing
}
365
366// signal handler for graceful exit
// On SIGINT/SIGTERM: ignore further deliveries of the caught signal, forward it to the process
// group of every known child, run cleanup(), reap all children and exit with code 1.
// Any other signal routed here is ignored.
// NOTE(review): LOG, cleanup() (ZeroMQ, system()) and exit() are not async-signal-safe; this
// handler relies on that working in practice.
367void sighandler(int sig)
368{
369 if (sig == SIGINT || sig == SIGTERM) {
370 signal(sig, empty); // ignore further deliveries of this signal (only `sig` is re-routed to the no-op handler)
371 LOG(info) << "o2-sim driver: Signal caught ... clean up and exit (please be patient)";
372 // forward signal to all children
373 for (auto& pid : gChildProcesses) {
374 killpg(pid, sig);
375 }
376 cleanup();
377
378 // make sure everyone is really shutting down
379 int status, cpid;
// wait() returns -1 once no children are left (or on error), which ends this loop
381 while ((cpid = wait(&status))) {
382 if (cpid == -1) {
383 break;
384 }
385 }
386
387 exit(1); // exiting upon external signal is abnormal so exit code != 0
388 }
389}
390
391// We do some early checks on the arguments passed. In particular we fix
392// missing timestamps for consistent application in all sub-processes. An empty
393// vector is returned upon errors.
394std::vector<char*> checkArgs(int argc, char* argv[])
395{
396 auto conf = o2::conf::SimConfig::make();
397 std::vector<std::string> modifiedArgs;
398#ifdef SIM_RUN5
399 conf.setRun5();
400#endif
401 if (conf.resetFromArguments(argc, argv)) {
402 for (int i = 0; i < argc; ++i) {
403 modifiedArgs.push_back(argv[i]);
404 }
405
406 // Check the run and the time arguments and enforce consistency.
407 // This is important as queries to CCDB are done using the timestamp.
408 if (conf.getRunNumber() != -1) {
409 // if we have a run number we should fix or check the timestamp
410
411 // fetch the actual timestamp ranges for this run
412 auto& ccdbmgr = o2::ccdb::BasicCCDBManager::instance();
413 auto soreor = ccdbmgr.getRunDuration(conf.getRunNumber());
414 auto timestamp = conf.getTimestamp();
415 if (conf.getConfigData().mTimestampMode == o2::conf::TimeStampMode::kNow) {
416 timestamp = soreor.first;
417 LOG(info) << "Fixing timestamp to " << timestamp << " based on run number";
418 modifiedArgs.push_back("--timestamp");
419 modifiedArgs.push_back(std::to_string(timestamp));
420 } else if (conf.getConfigData().mTimestampMode == o2::conf::TimeStampMode::kManual && (timestamp < soreor.first || timestamp > soreor.second)) {
421 LOG(fatal) << "The given timestamp " << timestamp << " is incompatible with the given run number " << conf.getRunNumber() << " starting at " << soreor.first << " and ending at " << soreor.second;
422 }
423 }
424 }
425 std::vector<char*> final(modifiedArgs.size(), nullptr);
426 for (int i = 0; i < modifiedArgs.size(); ++i) {
427 final[i] = new char[modifiedArgs[i].size() + 1];
428 strcpy(final[i], modifiedArgs[i].c_str());
429 }
430 return final;
431}
432
433// helper executable to launch all the devices/processes
434// for parallel simulation
// Overall flow:
//  - install sighandler for SIGINT/SIGTERM and make this process its own process group
//  - write a per-PID topology file (o2simtopology_<pid>.json) from the installed template
//  - validate/complete the arguments (checkArgs) and configure the global SimConfig
//  - zero events and not a service: exec the serial o2-sim binary instead
//  - otherwise fork the primary server, the sim worker(s) and the hit merger (each with
//    stdout/stderr redirected to its own log file) and monitor their status pipes
//  - wait for the merger, shut the remaining children down, check the output
//    (checkresult), run cleanup() and return 0 on success / non-zero on failure
435int main(int argc, char* argv[])
436{
437 LOG(info) << "This is o2-sim version " << o2::fullVersion() << " (" << o2::gitRevision() << ")";
438 LOG(info) << o2::getBuildInfo();
439
440 signal(SIGINT, sighandler);
441 signal(SIGTERM, sighandler);
442 // we enable the forked version of the code by default
443 setenv("ALICE_SIMFORKINTERNAL", "ON", 1);
444
445 // force execution as own process group
446 if (setpgid(0, 0) == -1) {
447 perror("setpgid");
448 exit(1);
449 }
450
451 TStopwatch timer;
452 timer.Start();
453 auto o2env = getenv("O2_ROOT");
454 if (!o2env) {
455 LOG(fatal) << "O2_ROOT environment not defined";
456 }
457 std::string rootpath(o2env);
458 std::string installpath = rootpath + "/bin";
459
460 // copy topology file to working dir and update ports
461 std::stringstream configss;
462 configss << rootpath << "/share/config/o2simtopology_template.json";
463 auto localconfig = std::string("o2simtopology_") + std::to_string(getpid()) + std::string(".json");
464
465 // need to add pid to channel urls to allow simultaneous deploys!
466 // we simply insert the PID into the topology template
467 std::ifstream in(configss.str());
468 std::ofstream out(localconfig);
469 std::string wordToReplace("#PID#");
470 std::string wordToReplaceWith = std::to_string(getpid());
471 std::string line;
472 size_t len = wordToReplace.length();
473 while (std::getline(in, line)) {
// NOTE(review): only the first "#PID#" on each line is replaced; a template line containing
// it twice would keep the second occurrence.
474 size_t pos = line.find(wordToReplace);
475 if (pos != std::string::npos) {
476 line.replace(pos, len, wordToReplaceWith);
477 }
478 out << line << '\n';
479 }
480 in.close();
481 out.close();
482
483 // create a channel for outside event notifications --> factor out into common function
484 // auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
485 auto externalpublishchannel = o2::simpubsub::createPUBChannel(o2::simpubsub::getPublishAddress("o2sim-notifications"));
486
487 // check initial arguments and complete
// the C strings in finalArgs are heap-allocated by checkArgs and are never freed here
488 auto finalArgs = checkArgs(argc, argv);
489 if (finalArgs.size() == 0) {
490 return 1;
491 }
492
493 auto& conf = o2::conf::SimConfig::Instance();
494#ifdef SIM_RUN5
495 conf.setRun5();
496#endif
497 if (!conf.resetFromArguments(finalArgs.size(), &finalArgs[0])) {
498 return 1;
499 }
500 // in case of zero events asked (only setup geometry etc) we just call the non-distributed version
501 // (otherwise we would need to add more synchronization between the actors)
502 if (conf.getNEvents() <= 0 && !conf.asService()) {
503 LOG(info) << "No events to be simulated; Switching to non-distributed mode";
504 const int Nargs = finalArgs.size() + 1;
505#ifdef SIM_RUN5
506 std::string name("o2-sim-serial-run5");
507#else
508 std::string name("o2-sim-serial");
509#endif
// NOTE(review): an array sized by a runtime value is a variable-length array, a GCC/Clang
// extension rather than standard C++; a std::vector<const char*> would be portable.
510 const char* arguments[Nargs];
511 arguments[0] = name.c_str();
512 for (int i = 1; i < finalArgs.size(); ++i) {
513 arguments[i] = finalArgs[i];
514 }
515 arguments[finalArgs.size()] = nullptr;
516 std::string path = installpath + "/" + name;
// execv only returns on failure
517 auto r = execv(path.c_str(), (char* const*)arguments);
518 if (r != 0) {
519 perror(nullptr);
520 }
521 return r;
522 }
523
524 gAskedEvents = conf.getNEvents();
525 if (conf.asService()) {
527 // launchWorkerListenerThread();
528 }
529
530 // we create the global shared mem pool; just enough to serve
531 // n simulation workers
532 int nworkers = conf.getNSimWorkers();
533 setenv("ALICE_NSIMWORKERS", std::to_string(nworkers).c_str(), 1);
534 LOG(info) << "Running with " << nworkers << " sim workers ";
535
537
538 // we can try to disable it here
539 if (getenv("ALICE_NOSIMSHM")) {
541 }
542
// pipe through which the server reports back to the driver (read end monitored below)
543 int pipe_serverdriver_fd[2];
544 if (pipe(pipe_serverdriver_fd) != 0) {
545 perror("problem in creating pipe");
546 }
547
548 // the server
549 int pid = fork();
550 if (pid == 0) {
// NOTE(review): opened without O_TRUNC, so a longer log from an earlier run with the same
// prefix keeps stale trailing content; the returned fd is also not checked for -1.
551 int fd = open(getServerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
// the write-end fd number is handed to the server through the environment
552 setenv("ALICE_O2SIMSERVERTODRIVER_PIPE", std::to_string(pipe_serverdriver_fd[1]).c_str(), 1);
553
554 dup2(fd, 1); // make stdout go to file
555 dup2(fd, 2); // make stderr go to file - you may choose to not do this
556 // or perhaps send stderr to another file
557 close(pipe_serverdriver_fd[0]);
558 close(fd); // fd no longer needed - the dup'ed handles are sufficient
559
560 const std::string name("o2-sim-primary-server-device-runner");
561 const std::string path = installpath + "/" + name;
562 const std::string config = localconfig;
563
564 // copy all arguments into a common vector
// layout: fixed options in [0, addNArgs), then finalArgs[1..], last slot = terminating nullptr
565#ifdef SIM_RUN5
566 const int addNArgs = 12;
567#else
568 const int addNArgs = 11;
569#endif
570 const int Nargs = finalArgs.size() + addNArgs;
// NOTE(review): variable-length array (non-standard C++), see above.
571 const char* arguments[Nargs];
572 arguments[0] = name.c_str();
573 arguments[1] = "--control";
574 arguments[2] = "static";
575 arguments[3] = "--id";
576 arguments[4] = "primary-server";
577 arguments[5] = "--mq-config";
578 arguments[6] = config.c_str();
579 arguments[7] = "--severity";
580 arguments[8] = "debug";
581 arguments[9] = "--color";
582 arguments[10] = "false"; // switch off colored output
583#ifdef SIM_RUN5
584 arguments[11] = "--isRun5";
585#endif
586 for (int i = 1; i < finalArgs.size(); ++i) {
587 arguments[addNArgs - 1 + i] = finalArgs[i];
588 }
589 arguments[Nargs - 1] = nullptr;
590 for (int i = 0; i < Nargs; ++i) {
591 if (arguments[i]) {
592 std::cerr << arguments[i] << "\n";
593 }
594 }
595 std::cerr << "$$$$\n";
596 auto r = execv(path.c_str(), (char* const*)arguments);
// NOTE(review): execv only returns on failure, so this message is printed only when the
// server binary could NOT be started.
597 LOG(info) << "Starting the server"
598 << "\n";
599 if (r != 0) {
600 perror(nullptr);
601 }
// NOTE(review): returning from main in the forked child runs exit handlers/static destructors
// inherited from the parent; _exit() would be safer.
602 return r;
603 } else {
604 gChildProcesses.push_back(pid);
605 setpgid(pid, pid);
606 close(pipe_serverdriver_fd[1]);
607 std::cout << "Spawning particle server on PID " << pid << "; Redirect output to " << getServerLogName() << "\n";
608
609 // A simple callback for distributed primary-chunk "events"
// The value -111 is treated as the "server initialized" marker and is not logged.
610 auto distributionCallback = [&conf, &externalpublishchannel](std::vector<int> const& v) {
611 std::stringstream str;
612 if (v.back() == -111) {
613 // message that server is initialized
615 return false; // silent
616 } else {
617 str << "EVENT " << v.back() << " DISTRIBUTED";
618 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "INFO", str.str()));
619 return true;
620 }
621 };
622 launchThreadMonitoringEvents(pipe_serverdriver_fd[0], "DISTRIBUTING EVENT : ", gDistributedEvents, distributionCallback);
623 }
624
625 // we wait until the particle server is initialized before constructing the worker
626 // since the worker needs an operating server to initialize
// polls once per second; gChildProcesses.back() is the server pid at this point
627 while (!gPrimServerIsInitialized) {
628 int status;
629 auto result = waitpid(gChildProcesses.back(), &status, WNOHANG);
630 if (result != 0) {
631 break; // exit this busy loop if the server process exited for some reason
632 }
633 sleep(1); // otherwise wait until server is initialized
634 }
635
// NOTE: ALICE_SIMFORKINTERNAL is set unconditionally at the top of main and nothing visible
// unsets it, so in practice nworkers becomes 1 here.
636 auto internalfork = getenv("ALICE_SIMFORKINTERNAL");
637 if (internalfork) {
638 // forking will be done internally to profit from copy-on-write
639 nworkers = 1;
640 }
641 for (int id = 0; id < nworkers; ++id) {
642 // the workers
643 std::stringstream workerlogss;
644 workerlogss << getWorkerLogName() << id;
645
646 // the workers
647 std::stringstream workerss;
648 workerss << "worker" << id;
649
650 pid = fork();
651 if (pid == 0) {
652 int fd = open(workerlogss.str().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
653 dup2(fd, 1); // make stdout go to file
654 dup2(fd, 2); // make stderr go to file - you may choose to not do this
655 // or perhaps send stderr to another file
656 close(fd); // fd no longer needed - the dup'ed handles are sufficient
657
658 const std::string name("o2-sim-device-runner");
659 const std::string path = installpath + "/" + name;
660
661 execl(path.c_str(), name.c_str(), "--control", "static", "--id", workerss.str().c_str(), "--config-key",
662 "worker", "--mq-config", localconfig.c_str(), "--severity", "info", (char*)nullptr);
// NOTE(review): only reached if execl failed; returning 0 makes a missing/broken worker binary
// look like a clean exit. Consider perror + _exit(1).
663 return 0;
664 } else {
665 gChildProcesses.push_back(pid);
666 setpgid(pid, pid); // the worker processes will form their own group
667 std::cout << "Spawning sim worker " << id << " on PID " << pid
668 << "; Redirect output to " << workerlogss.str() << "\n";
669 }
670 }
671
672 // the hit merger
673 int pipe_mergerdriver_fd[2];
674 if (pipe(pipe_mergerdriver_fd) != 0) {
675 perror("problem in creating pipe");
676 }
677
678 pid = fork();
679
// shared between main and the finish-callback thread (hence atomic)
680 std::atomic<bool> shutdown_initiated = false;
681 if (pid == 0) {
682 int fd = open(getMergerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
683 dup2(fd, 1); // make stdout go to file
684 dup2(fd, 2); // make stderr go to file - you may choose to not do this
685 // or perhaps send stderr to another file
686 close(fd); // fd no longer needed - the dup'ed handles are sufficient
687 close(pipe_mergerdriver_fd[0]);
688 setenv("ALICE_O2SIMMERGERTODRIVER_PIPE", std::to_string(pipe_mergerdriver_fd[1]).c_str(), 1);
689 const std::string name("o2-sim-hit-merger-runner");
690 const std::string path = installpath + "/" + name;
691 execl(path.c_str(), name.c_str(), "--control", "static", "--catch-signals", "0", "--id", "hitmerger", "--mq-config", localconfig.c_str(), "--color", "false",
692 (char*)nullptr);
// NOTE(review): only reached if execl failed; returning 0 hides the failure (see workers above).
693 return 0;
694 } else {
695 std::cout << "Spawning hit merger on PID " << pid << "; Redirect output to " << getMergerLogName() << "\n";
696 setpgid(pid, pid);
697 gChildProcesses.push_back(pid);
698 close(pipe_mergerdriver_fd[1]);
699
700 // A simple callback that determines if the simulation is complete and triggers
701 // a shutdown of all child processes. This appears to be more robust than leaving
702 // that decision upon the children (sometimes there are problems with that).
// NOTE(review): the detached monitoring thread keeps references to the locals
// shutdown_initiated/conf/externalpublishchannel of main.
703 auto finishCallback = [&shutdown_initiated, &conf, &externalpublishchannel](std::vector<int> const& v) {
704 std::stringstream str;
705 str << "EVENT " << v.back() << " FINISHED " << gAskedEvents << " " << v.size();
706 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "INFO", str.str()));
// NOTE(review): compares the int gAskedEvents with the size_t v.size().
707 if (gAskedEvents == v.size()) {
708 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "STATE", "DONE"));
709 if (!conf.asService()) {
710 LOG(info) << "SIMULATION IS DONE. INITIATING SHUTDOWN.";
711 if (!shutdown_initiated) {
712 shutdown_initiated = true;
713 for (auto p : gChildProcesses) {
714 if (killpg(p, 0) == 0) {
715 killpg(p, SIGTERM);
716 }
717 }
718 }
719 } else {
720 LOG(info) << "SIMULATION DONE. STAYING AS DAEMON.";
721 }
722 }
723 return true;
724 };
725
726 launchThreadMonitoringEvents(pipe_mergerdriver_fd[0], "EVENT FINISHED : ", gFinishedEvents, finishCallback);
727 }
728
729 // wait on merger (which when exiting completes the workflow)
730 auto mergerpid = gChildProcesses.back();
731
732 int status, cpid;
733 // wait just blocks and waits until any child returns; but we make sure to wait until merger is here
734 bool errored = false;
// NOTE(review): if wait() ever returns -1 (e.g. no child left), this loop never terminates.
// The merger's own exit status is not inspected here; only checkresult() below may catch a bad merger exit.
735 while ((cpid = wait(&status)) != mergerpid) {
736 if (WEXITSTATUS(status) || WIFSIGNALED(status)) {
737 if (!shutdown_initiated) {
738 LOG(info) << "Process " << cpid << " EXITED WITH CODE " << WEXITSTATUS(status) << " SIGNALED "
739 << WIFSIGNALED(status) << " SIGNAL " << WTERMSIG(status);
740
741 // we bring down all processes if one of them had problems or got a termination signal
742 // if (WTERMSIG(status) == SIGABRT || WTERMSIG(status) == SIGSEGV || WTERMSIG(status) == SIGBUS || WTERMSIG(status) == SIGTERM) {
743 LOG(info) << "Problem detected (or child received termination signal) ... shutting down whole system ";
744 for (auto p : gChildProcesses) {
745 LOG(info) << "TERMINATING " << p;
746 if (killpg(p, 0) == 0) {
747 killpg(p, SIGTERM); // <--- makes sure to shutdown "unknown" child pids via the group property
748 }
749 }
750 LOG(error) << "SHUTTING DOWN DUE TO SIGNALED EXIT IN COMPONENT " << cpid;
751 o2::simpubsub::publishMessage(externalpublishchannel, o2::simpubsub::simStatusString("O2SIM", "STATE", "FAILURE"));
752 errored = true;
753 }
754 }
755 }
756 // This marks the actual end of the computation (since results are available)
757 LOG(info) << "Merger process " << mergerpid << " returned";
758 LOG(info) << "Simulation process took " << timer.RealTime() << " s";
759
760 if (!errored && !shutdown_initiated) {
761 shutdown_initiated = true;
762 // ordinary shutdown of the rest
763 for (auto p : gChildProcesses) {
764 if (p != mergerpid) {
765 LOG(info) << "SHUTTING DOWN CHILD PROCESS (normal thread)" << p;
766 if (killpg(p, 0) == 0) {
767 killpg(p, SIGTERM);
768 }
769 }
770 }
771 }
772
773 // Final shutdown section. Here we definitely wait on all children
774 // otherwise this breaks accounting in the /usr/bin/time command. But we install
775 // an asynchronous timeout thread which triggers an emergency kill after some seconds in order to not block.
// reap children until wait() reports -1 (no children left)
777 while ((cpid = wait(&status))) {
778 if (cpid == -1) {
779 break;
780 }
781 }
782
783 LOG(debug) << "ShmManager operation " << o2::utils::ShmManager::Instance().isOperational() << "\n";
784
785 // do a quick check to see if simulation produced something reasonable
786 // (mainly useful for continuous integration / automated testing suite)
787 auto returncode = errored ? 1 : checkresult();
788 if (returncode == 0) {
789 LOG(info) << "SIMULATION RETURNED SUCCESFULLY";
790 }
791 cleanup();
792 return returncode;
793}
int32_t i
Definition of the Names Generator class.
uint16_t pos
Definition RawData.h:3
uint16_t pid
Definition RawData.h:2
std::ostringstream debug
static std::string getMCKinematicsFileName(const std::string_view prefix=STANDARDSIMPREFIX)
Definition NameConf.h:46
static BasicCCDBManager & instance()
static SimConfig make()
Definition SimConfig.h:118
static SimConfig & Instance()
Definition SimConfig.h:111
static ShmManager & Instance()
Definition ShmManager.h:61
bool isOperational() const
Definition ShmManager.h:97
bool createGlobalSegment(int nsubsegments=1)
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLuint id
Definition glcorearb.h:650
bpo::variables_map arguments
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
DeliveryType read(const std::string &str)
@ Completed
The channel was signaled it will not receive any data.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
std::string getPublishAddress(std::string const &base, int pid=getpid())
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
fair::mq::Channel createPUBChannel(std::string const &address, std::string const &type="pub")
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
std::string getBuildInfo()
get information about build platform (for example OS and alidist release when used)
std::string gitRevision()
get O2 git commit used to build this
std::string fullVersion()
get full version information (official O2 release and git commit)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
bool isBusy()
void empty(int)
std::string getServerLogName()
std::vector< int > gFinishedEvents
std::vector< int > gChildProcesses
void cleanup()
int gAskedEvents
int checkresult()
void remove_tmp_files()
void launchWorkerListenerThread()
std::atomic< bool > gPrimServerIsInitialized
std::string getInternalControlAddress()
std::vector< int > gDistributedEvents
void launchShutdownThread()
void launchThreadMonitoringEvents(int pipefd, std::string text, std::vector< int > &eventcontainer, std::function< bool(std::vector< int > const &)> callback=[](std::vector< int > const &) { return true;})
std::string getMergerLogName()
void sighandler(int sig)
std::string getWorkerLogName()
std::vector< char * > checkArgs(int argc, char *argv[])
void launchControlThread()
std::string getControlAddress()
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
#define main
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153