95 if (conf.forwardKine()) {
96 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
97 auto forwardchannel = fair::mq::Channel{
"kineforward",
"pair", factory};
99 forwardchannel.Bind(
address.c_str());
100 forwardchannel.Validate();
101 fair::mq::Parts parts;
102 fair::mq::MessagePtr payload(forwardchannel.NewMessage());
105 auto channelAlloc = o2::pmr::getTransportAllocator(forwardchannel.Transport());
107 parts.AddPart(std::move(header));
108 parts.AddPart(std::move(payload));
109 int timeoutinMS = 1000;
110 if (forwardchannel.Send(parts, timeoutinMS) > 0) {
111 LOGP(info,
"SENDING END-OF-STREAM TO PROXY AT {}",
address.c_str());
113 LOGP(warn,
"SENDING END-OF-STREAM TIMED OUT; PEER PROBABLY NO LONGER CONNECTED");
121 if (getenv(
"ALICE_O2SIM_DUMPLOG")) {
122 std::cerr <<
"------------- START OF EVENTSERVER LOG ----------" << std::endl;
123 std::stringstream catcommand1;
125 if (system(catcommand1.str().c_str()) != 0) {
126 LOG(warn) <<
"error executing system call";
129 std::cerr <<
"------------- START OF SIM WORKER(S) LOG --------" << std::endl;
130 std::stringstream catcommand2;
132 if (system(catcommand2.str().c_str()) != 0) {
133 LOG(warn) <<
"error executing system call";
136 std::cerr <<
"------------- START OF MERGER LOG ---------------" << std::endl;
137 std::stringstream catcommand3;
139 if (system(catcommand3.str().c_str()) != 0) {
140 LOG(warn) <<
"error executing system call";
217 static std::vector<std::thread> threads;
220 LOG(info) <<
"Control address is: " << controladdress;
221 setenv(
"ALICE_O2SIMCONTROL", internalcontroladdress.c_str(), 1);
223 auto lambda = [controladdress, internalcontroladdress]() {
224 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
227 auto internalchannel = fair::mq::Channel{
"o2sim-internal",
"pub", factory};
228 internalchannel.Bind(internalcontroladdress);
229 internalchannel.Validate();
230 std::unique_ptr<fair::mq::Message>
message(internalchannel.NewMessage());
233 auto outsidechannel = fair::mq::Channel{
"o2sim-control",
"rep", factory};
234 outsidechannel.Bind(controladdress);
235 outsidechannel.Validate();
236 std::unique_ptr<fair::mq::Message> request(outsidechannel.NewMessage());
238 bool keepgoing =
true;
240 outsidechannel.Init();
241 outsidechannel.Bind(controladdress);
242 outsidechannel.Validate();
243 if (outsidechannel.Receive(request) > 0) {
244 std::string command(
reinterpret_cast<char const*
>(request->GetData()), request->GetSize());
245 LOG(info) <<
"Control message: " << command <<
" received ";
249 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
250 outsidechannel.Send(reply);
257 LOG(warn) <<
"CONTROL REQUEST COULD NOT BE PARSED";
260 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
261 outsidechannel.Send(reply);
268 internalchannel.Send(request);
269 keepgoing = !reconfig.
stop;
275 threads.push_back(std::thread(lambda));
276 threads.back().detach();
283 static std::vector<std::thread> threads;
285 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
287 auto listenchannel = fair::mq::Channel{
"channel0",
"sub", factory};
288 listenchannel.Init();
290 address <<
"ipc:///tmp/o2sim-worker-notifications-" << getpid();
291 listenchannel.Connect(
address.str());
292 listenchannel.Validate();
293 std::unique_ptr<fair::mq::Message>
message(listenchannel.NewMessage());
296 if (listenchannel.Receive(
message) > 0) {
297 std::string
msg(
reinterpret_cast<char const*
>(
message->GetData()),
message->GetSize());
298 LOG(info) <<
"Worker message: " <<
msg;
302 threads.push_back(std::thread(lambda));
303 threads.back().detach();
309 int pipefd, std::string text, std::vector<int>& eventcontainer,
310 std::function<
bool(std::vector<int>
const&)> callback = [](std::vector<int>
const&) {
return true; })
312 static std::vector<std::thread> threads;
313 auto lambda = [pipefd, text, callback, &eventcontainer]() {
316 ssize_t
count =
read(pipefd, &eventcounter,
sizeof(eventcounter));
318 LOG(info) <<
"ERROR READING";
319 if (errno == EINTR) {
324 }
else if (
count == 0) {
327 eventcontainer.push_back(eventcounter);
328 if (callback(eventcontainer)) {
329 LOG(info) << text.c_str() << eventcounter;
334 threads.push_back(std::thread(lambda));
335 threads.back().detach();
397 std::vector<std::string> modifiedArgs;
401 if (conf.resetFromArguments(argc, argv)) {
402 for (
int i = 0;
i < argc; ++
i) {
403 modifiedArgs.push_back(argv[
i]);
408 if (conf.getRunNumber() != -1) {
413 auto soreor = ccdbmgr.getRunDuration(conf.getRunNumber());
414 auto timestamp = conf.getTimestamp();
416 timestamp = soreor.first;
417 LOG(info) <<
"Fixing timestamp to " << timestamp <<
" based on run number";
418 modifiedArgs.push_back(
"--timestamp");
421 LOG(fatal) <<
"The given timestamp " << timestamp <<
" is incompatible with the given run number " << conf.getRunNumber() <<
" starting at " << soreor.first <<
" and ending at " << soreor.second;
425 std::vector<char*>
final(modifiedArgs.size(),
nullptr);
426 for (
int i = 0;
i < modifiedArgs.size(); ++
i) {
427 final[
i] =
new char[modifiedArgs[
i].size() + 1];
428 strcpy(
final[
i], modifiedArgs[
i].c_str());
435int main(
int argc,
char* argv[])
443 setenv(
"ALICE_SIMFORKINTERNAL",
"ON", 1);
446 if (setpgid(0, 0) == -1) {
453 auto o2env = getenv(
"O2_ROOT");
455 LOG(fatal) <<
"O2_ROOT environment not defined";
457 std::string rootpath(o2env);
458 std::string installpath = rootpath +
"/bin";
461 std::stringstream configss;
462 configss << rootpath <<
"/share/config/o2simtopology_template.json";
463 auto localconfig = std::string(
"o2simtopology_") +
std::to_string(getpid()) + std::string(
".json");
467 std::ifstream in(configss.str());
468 std::ofstream out(localconfig);
469 std::string wordToReplace(
"#PID#");
472 size_t len = wordToReplace.length();
473 while (std::getline(in, line)) {
474 size_t pos = line.find(wordToReplace);
475 if (
pos != std::string::npos) {
476 line.replace(
pos,
len, wordToReplaceWith);
489 if (finalArgs.size() == 0) {
497 if (!conf.resetFromArguments(finalArgs.size(), &finalArgs[0])) {
502 if (conf.getNEvents() <= 0 && !conf.asService()) {
503 LOG(info) <<
"No events to be simulated; Switching to non-distributed mode";
504 const int Nargs = finalArgs.size() + 1;
506 std::string
name(
"o2-sim-serial-run5");
508 std::string
name(
"o2-sim-serial");
512 for (
int i = 1;
i < finalArgs.size(); ++
i) {
516 std::string
path = installpath +
"/" +
name;
525 if (conf.asService()) {
532 int nworkers = conf.getNSimWorkers();
534 LOG(info) <<
"Running with " << nworkers <<
" sim workers ";
539 if (getenv(
"ALICE_NOSIMSHM")) {
543 int pipe_serverdriver_fd[2];
544 if (pipe(pipe_serverdriver_fd) != 0) {
545 perror(
"problem in creating pipe");
551 int fd = open(
getServerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
552 setenv(
"ALICE_O2SIMSERVERTODRIVER_PIPE",
std::to_string(pipe_serverdriver_fd[1]).c_str(), 1);
557 close(pipe_serverdriver_fd[0]);
560 const std::string
name(
"o2-sim-primary-server-device-runner");
561 const std::string
path = installpath +
"/" +
name;
562 const std::string config = localconfig;
566 const int addNArgs = 12;
568 const int addNArgs = 11;
570 const int Nargs = finalArgs.size() + addNArgs;
586 for (
int i = 1;
i < finalArgs.size(); ++
i) {
590 for (
int i = 0;
i < Nargs; ++
i) {
595 std::cerr <<
"$$$$\n";
597 LOG(info) <<
"Starting the server"
606 close(pipe_serverdriver_fd[1]);
607 std::cout <<
"Spawning particle server on PID " <<
pid <<
"; Redirect output to " <<
getServerLogName() <<
"\n";
610 auto distributionCallback = [&conf, &externalpublishchannel](std::vector<int>
const&
v) {
611 std::stringstream
str;
612 if (
v.back() == -111) {
617 str <<
"EVENT " <<
v.back() <<
" DISTRIBUTED";
636 auto internalfork = getenv(
"ALICE_SIMFORKINTERNAL");
641 for (
int id = 0;
id < nworkers; ++
id) {
643 std::stringstream workerlogss;
647 std::stringstream workerss;
648 workerss <<
"worker" <<
id;
652 int fd = open(workerlogss.str().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
658 const std::string
name(
"o2-sim-device-runner");
659 const std::string
path = installpath +
"/" +
name;
661 execl(
path.c_str(),
name.c_str(),
"--control",
"static",
"--id", workerss.str().c_str(),
"--config-key",
662 "worker",
"--mq-config", localconfig.c_str(),
"--severity",
"info", (
char*)
nullptr);
667 std::cout <<
"Spawning sim worker " <<
id <<
" on PID " <<
pid
668 <<
"; Redirect output to " << workerlogss.str() <<
"\n";
673 int pipe_mergerdriver_fd[2];
674 if (pipe(pipe_mergerdriver_fd) != 0) {
675 perror(
"problem in creating pipe");
680 std::atomic<bool> shutdown_initiated =
false;
682 int fd = open(
getMergerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
687 close(pipe_mergerdriver_fd[0]);
688 setenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE",
std::to_string(pipe_mergerdriver_fd[1]).c_str(), 1);
689 const std::string
name(
"o2-sim-hit-merger-runner");
690 const std::string
path = installpath +
"/" +
name;
691 execl(
path.c_str(),
name.c_str(),
"--control",
"static",
"--catch-signals",
"0",
"--id",
"hitmerger",
"--mq-config", localconfig.c_str(),
"--color",
"false",
695 std::cout <<
"Spawning hit merger on PID " <<
pid <<
"; Redirect output to " <<
getMergerLogName() <<
"\n";
698 close(pipe_mergerdriver_fd[1]);
703 auto finishCallback = [&shutdown_initiated, &conf, &externalpublishchannel](std::vector<int>
const&
v) {
704 std::stringstream
str;
709 if (!conf.asService()) {
710 LOG(info) <<
"SIMULATION IS DONE. INITIATING SHUTDOWN.";
711 if (!shutdown_initiated) {
712 shutdown_initiated =
true;
714 if (killpg(p, 0) == 0) {
720 LOG(info) <<
"SIMULATION DONE. STAYING AS DAEMON.";
734 bool errored =
false;
735 while ((cpid = wait(&status)) != mergerpid) {
736 if (WEXITSTATUS(status) || WIFSIGNALED(status)) {
737 if (!shutdown_initiated) {
738 LOG(info) <<
"Process " << cpid <<
" EXITED WITH CODE " << WEXITSTATUS(status) <<
" SIGNALED "
739 << WIFSIGNALED(status) <<
" SIGNAL " << WTERMSIG(status);
743 LOG(info) <<
"Problem detected (or child received termination signal) ... shutting down whole system ";
745 LOG(info) <<
"TERMINATING " << p;
746 if (killpg(p, 0) == 0) {
750 LOG(error) <<
"SHUTTING DOWN DUE TO SIGNALED EXIT IN COMPONENT " << cpid;
757 LOG(info) <<
"Merger process " << mergerpid <<
" returned";
758 LOG(info) <<
"Simulation process took " << timer.RealTime() <<
" s";
760 if (!errored && !shutdown_initiated) {
761 shutdown_initiated =
true;
764 if (p != mergerpid) {
765 LOG(info) <<
"SHUTTING DOWN CHILD PROCESS (normal thread)" << p;
766 if (killpg(p, 0) == 0) {
777 while ((cpid = wait(&status))) {
788 if (returncode == 0) {
789 LOG(info) <<
"SIMULATION RETURNED SUCCESFULLY";