95 if (conf.forwardKine()) {
96 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
97 auto forwardchannel = fair::mq::Channel{
"kineforward",
"pair", factory};
99 forwardchannel.Bind(
address.c_str());
100 forwardchannel.Validate();
101 fair::mq::Parts parts;
102 fair::mq::MessagePtr payload(forwardchannel.NewMessage());
105 auto channelAlloc = o2::pmr::getTransportAllocator(forwardchannel.Transport());
107 parts.AddPart(std::move(header));
108 parts.AddPart(std::move(payload));
109 int timeoutinMS = 1000;
110 if (forwardchannel.Send(parts, timeoutinMS) > 0) {
111 LOGP(info,
"SENDING END-OF-STREAM TO PROXY AT {}",
address.c_str());
113 LOGP(warn,
"SENDING END-OF-STREAM TIMED OUT; PEER PROBABLY NO LONGER CONNECTED");
121 if (getenv(
"ALICE_O2SIM_DUMPLOG")) {
122 std::cerr <<
"------------- START OF EVENTSERVER LOG ----------" << std::endl;
123 std::stringstream catcommand1;
125 if (system(catcommand1.str().c_str()) != 0) {
126 LOG(warn) <<
"error executing system call";
129 std::cerr <<
"------------- START OF SIM WORKER(S) LOG --------" << std::endl;
130 std::stringstream catcommand2;
132 if (system(catcommand2.str().c_str()) != 0) {
133 LOG(warn) <<
"error executing system call";
136 std::cerr <<
"------------- START OF MERGER LOG ---------------" << std::endl;
137 std::stringstream catcommand3;
139 if (system(catcommand3.str().c_str()) != 0) {
140 LOG(warn) <<
"error executing system call";
// Fragment: control endpoint setup. Logs `controladdress`, exports
// `internalcontroladdress` through the ALICE_O2SIMCONTROL environment variable
// and starts a detached thread that receives requests on a "rep" channel,
// replies to them, and sends the request on through a "pub" channel.
// NOTE(review): the enclosing function header and several interior lines are
// not present in this view (the embedded source numbering jumps, e.g. 217 -> 220).
// Function-local static container holding the std::thread object created below.
217 static std::vector<std::thread> threads;
220 LOG(info) <<
"Control address is: " << controladdress;
// Overwrites any existing value (third argument is 1).
221 setenv(
"ALICE_O2SIMCONTROL", internalcontroladdress.c_str(), 1);
// Thread body; both addresses are captured by value.
223 auto lambda = [controladdress, internalcontroladdress]() {
// One zeromq transport factory, passed to both channels below.
224 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// "pub" channel bound to internalcontroladdress.
227 auto internalchannel = fair::mq::Channel{
"o2sim-internal",
"pub", factory};
228 internalchannel.Bind(internalcontroladdress);
229 internalchannel.Validate();
// Message obtained from the pub channel; no use of it is visible in this view.
230 std::unique_ptr<fair::mq::Message>
message(internalchannel.NewMessage());
// "rep" channel bound to controladdress; requests are received on it below.
233 auto outsidechannel = fair::mq::Channel{
"o2sim-control",
"rep", factory};
234 outsidechannel.Bind(controladdress);
235 outsidechannel.Validate();
236 std::unique_ptr<fair::mq::Message> request(outsidechannel.NewMessage());
// Updated from reconfig.stop further down.
// NOTE(review): presumably the loop condition — the loop statement itself is not visible here.
238 bool keepgoing =
true;
// The rep channel is initialised, bound and validated again before receiving.
// NOTE(review): presumably repeated at the top of every loop iteration — confirm.
240 outsidechannel.Init();
241 outsidechannel.Bind(controladdress);
242 outsidechannel.Validate();
// A positive return value from Receive is treated as "request received".
243 if (outsidechannel.Receive(request) > 0) {
// Build a string from the payload pointer (GetData) and length (GetSize).
244 std::string command(
reinterpret_cast<char const*
>(request->GetData()), request->GetSize());
245 LOG(info) <<
"Control message: " << command <<
" received ";
// Reply with `code`, which is declared on a line not shown in this view.
249 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
250 outsidechannel.Send(reply);
// Path for a request that could not be parsed; a reply carrying `code` is sent here too.
257 LOG(warn) <<
"CONTROL REQUEST COULD NOT BE PARSED";
260 std::unique_ptr<fair::mq::Message> reply(outsidechannel.NewSimpleMessage(code));
261 outsidechannel.Send(reply);
// Send the received request message out on the pub channel.
268 internalchannel.Send(request);
// `reconfig` is declared on a line not shown; a set `stop` member clears keepgoing.
269 keepgoing = !reconfig.
stop;
// Start the thread and detach it; the std::thread object stays in the static vector.
275 threads.push_back(std::thread(lambda));
276 threads.back().detach();
// Fragment: starts a detached thread that connects a "sub" channel to a
// per-process ipc address and logs messages received on it.
// NOTE(review): the enclosing function header, the opening of `lambda` and the
// declaration of `address` are not present in this view (numbering jumps
// 283 -> 285, 288 -> 290); presumably the statements from 285 to 298 form the
// lambda body — confirm against the full file.
// Function-local static container holding the std::thread object created below.
283 static std::vector<std::thread> threads;
285 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// Subscriber channel; it connects (rather than binds) to the address built below.
287 auto listenchannel = fair::mq::Channel{
"channel0",
"sub", factory};
288 listenchannel.Init();
// The address embeds the pid of the current process (getpid()).
290 address <<
"ipc:///tmp/o2sim-worker-notifications-" << getpid();
291 listenchannel.Connect(
address.str());
292 listenchannel.Validate();
293 std::unique_ptr<fair::mq::Message>
message(listenchannel.NewMessage());
// A positive return value from Receive is treated as "message received".
// NOTE(review): any surrounding loop is not visible in this view.
296 if (listenchannel.Receive(
message) > 0) {
// Build a string from the payload pointer (GetData) and length (GetSize).
297 std::string
msg(
reinterpret_cast<char const*
>(
message->GetData()),
message->GetSize());
298 LOG(info) <<
"Worker message: " <<
msg;
// Start the thread and detach it; the std::thread object stays in the static vector.
302 threads.push_back(std::thread(lambda));
303 threads.back().detach();
// Fragment: parameter list and body of a helper that starts a detached thread
// which read()s a counter from a pipe, appends it to a container and logs it
// when a caller-supplied predicate accepts the container.
// NOTE(review): the function name/return type and several interior lines are
// not present in this view (numbering jumps 310 -> 312, 313 -> 316, 319 -> 324).
//   pipefd         - file descriptor handed to read()
//   text           - prefix written in front of the counter in the log line
//   eventcontainer - counters are appended to it; captured BY REFERENCE by the thread
//   callback       - called with eventcontainer after an append; the default always returns true
309 int pipefd, std::string text, std::vector<int>& eventcontainer,
310 std::function<
bool(std::vector<int>
const&)> callback = [](std::vector<int>
const&) {
return true; })
// Function-local static container holding the std::thread object created below.
312 static std::vector<std::thread> threads;
// NOTE(review): eventcontainer is captured by reference and the thread is detached
// below, so the referenced vector presumably has to outlive the thread, and any
// concurrent access by the caller is unsynchronised in the visible code — confirm.
313 auto lambda = [pipefd, text, callback, &eventcontainer]() {
// `eventcounter` is declared on a line not shown; read() is asked for sizeof(eventcounter) bytes.
316 ssize_t
count =
read(pipefd, &eventcounter,
sizeof(eventcounter));
// Logged on a read failure path; the guarding condition is not visible (presumably count == -1).
318 LOG(info) <<
"ERROR READING";
// EINTR is tested separately; what happens in that case is on lines not shown.
319 if (errno == EINTR) {
// count == 0 branch (read() returns 0 at end-of-file); its body is not visible here.
324 }
else if (
count == 0) {
// Record the counter, then let the callback decide whether to log it.
327 eventcontainer.push_back(eventcounter);
328 if (callback(eventcontainer)) {
329 LOG(info) << text.c_str() << eventcounter;
// Start the thread and detach it; the std::thread object stays in the static vector.
334 threads.push_back(std::thread(lambda));
335 threads.back().detach();
// Fragment: validates the command-line arguments through `conf`, optionally
// fixes the timestamp from the run number, and rebuilds a char* argument vector.
// NOTE(review): the enclosing function header, the declarations of `conf` and
// `ccdbmgr`, and the return statement are not present in this view (numbering
// jumps 395 -> 399, 406 -> 411, 416 -> 419, 426 -> 433).
395 std::vector<std::string> modifiedArgs;
// Only when the configuration accepts the arguments are they copied one by one.
399 if (conf.resetFromArguments(argc, argv)) {
400 for (
int i = 0;
i < argc; ++
i) {
401 modifiedArgs.push_back(argv[
i]);
// -1 is compared against as the "no run number" value.
406 if (conf.getRunNumber() != -1) {
// `soreor` is used as a pair: .first is logged as the start and .second as the end (see line 419).
411 auto soreor = ccdbmgr.getRunDuration(conf.getRunNumber());
412 auto timestamp = conf.getTimestamp();
// The timestamp is replaced by the start of the run; the condition guarding this is not visible here.
414 timestamp = soreor.first;
415 LOG(info) <<
"Fixing timestamp to " << timestamp <<
" based on run number";
// Appends the option name; presumably the value is appended on the following line, which is not shown.
416 modifiedArgs.push_back(
"--timestamp");
// Fatal log for a timestamp that conflicts with the run; the guarding condition is not visible here.
419 LOG(fatal) <<
"The given timestamp " << timestamp <<
" is incompatible with the given run number " << conf.getRunNumber() <<
" starting at " << soreor.first <<
" and ending at " << soreor.second;
// One char* slot per argument, initially null.
423 std::vector<char*>
final(modifiedArgs.size(),
nullptr);
// NOTE(review): `int i` is compared against the unsigned modifiedArgs.size().
424 for (
int i = 0;
i < modifiedArgs.size(); ++
i) {
// Each argument is copied into its own new[] buffer (size + 1 for the terminator).
// NOTE(review): no matching delete[] is visible in this view — presumably the
// buffers are meant to live for the rest of the process; confirm.
425 final[
i] =
new char[modifiedArgs[
i].size() + 1];
426 strcpy(
final[
i], modifiedArgs[
i].c_str());
433int main(
int argc,
char* argv[])
441 setenv(
"ALICE_SIMFORKINTERNAL",
"ON", 1);
445 auto o2env = getenv(
"O2_ROOT");
447 LOG(fatal) <<
"O2_ROOT environment not defined";
449 std::string rootpath(o2env);
450 std::string installpath = rootpath +
"/bin";
453 std::stringstream configss;
454 configss << rootpath <<
"/share/config/o2simtopology_template.json";
455 auto localconfig = std::string(
"o2simtopology_") +
std::to_string(getpid()) + std::string(
".json");
459 std::ifstream in(configss.str());
460 std::ofstream out(localconfig);
461 std::string wordToReplace(
"#PID#");
464 size_t len = wordToReplace.length();
465 while (std::getline(in, line)) {
466 size_t pos = line.find(wordToReplace);
467 if (
pos != std::string::npos) {
468 line.replace(
pos,
len, wordToReplaceWith);
481 if (finalArgs.size() == 0) {
489 if (!conf.resetFromArguments(finalArgs.size(), &finalArgs[0])) {
494 if (conf.getNEvents() <= 0 && !conf.asService()) {
495 LOG(info) <<
"No events to be simulated; Switching to non-distributed mode";
496 const int Nargs = finalArgs.size() + 1;
498 std::string
name(
"o2-sim-serial-run5");
500 std::string
name(
"o2-sim-serial");
504 for (
int i = 1;
i < finalArgs.size(); ++
i) {
508 std::string
path = installpath +
"/" +
name;
517 if (conf.asService()) {
524 int nworkers = conf.getNSimWorkers();
526 LOG(info) <<
"Running with " << nworkers <<
" sim workers ";
531 if (getenv(
"ALICE_NOSIMSHM")) {
535 int pipe_serverdriver_fd[2];
536 if (pipe(pipe_serverdriver_fd) != 0) {
537 perror(
"problem in creating pipe");
543 int fd = open(
getServerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
544 setenv(
"ALICE_O2SIMSERVERTODRIVER_PIPE",
std::to_string(pipe_serverdriver_fd[1]).c_str(), 1);
549 close(pipe_serverdriver_fd[0]);
552 const std::string
name(
"o2-sim-primary-server-device-runner");
553 const std::string
path = installpath +
"/" +
name;
554 const std::string config = localconfig;
558 const int addNArgs = 12;
560 const int addNArgs = 11;
562 const int Nargs = finalArgs.size() + addNArgs;
578 for (
int i = 1;
i < finalArgs.size(); ++
i) {
582 for (
int i = 0;
i < Nargs; ++
i) {
587 std::cerr <<
"$$$$\n";
589 LOG(info) <<
"Starting the server"
598 close(pipe_serverdriver_fd[1]);
599 std::cout <<
"Spawning particle server on PID " <<
pid <<
"; Redirect output to " <<
getServerLogName() <<
"\n";
602 auto distributionCallback = [&conf, &externalpublishchannel](std::vector<int>
const&
v) {
603 std::stringstream
str;
604 if (
v.back() == -111) {
609 str <<
"EVENT " <<
v.back() <<
" DISTRIBUTED";
628 auto internalfork = getenv(
"ALICE_SIMFORKINTERNAL");
633 for (
int id = 0;
id < nworkers; ++
id) {
635 std::stringstream workerlogss;
639 std::stringstream workerss;
640 workerss <<
"worker" <<
id;
644 int fd = open(workerlogss.str().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
650 const std::string
name(
"o2-sim-device-runner");
651 const std::string
path = installpath +
"/" +
name;
653 execl(
path.c_str(),
name.c_str(),
"--control",
"static",
"--id", workerss.str().c_str(),
"--config-key",
654 "worker",
"--mq-config", localconfig.c_str(),
"--severity",
"info", (
char*)
nullptr);
659 std::cout <<
"Spawning sim worker " <<
id <<
" on PID " <<
pid
660 <<
"; Redirect output to " << workerlogss.str() <<
"\n";
665 int pipe_mergerdriver_fd[2];
666 if (pipe(pipe_mergerdriver_fd) != 0) {
667 perror(
"problem in creating pipe");
672 std::atomic<bool> shutdown_initiated =
false;
674 int fd = open(
getMergerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
679 close(pipe_mergerdriver_fd[0]);
680 setenv(
"ALICE_O2SIMMERGERTODRIVER_PIPE",
std::to_string(pipe_mergerdriver_fd[1]).c_str(), 1);
681 const std::string
name(
"o2-sim-hit-merger-runner");
682 const std::string
path = installpath +
"/" +
name;
683 execl(
path.c_str(),
name.c_str(),
"--control",
"static",
"--catch-signals",
"0",
"--id",
"hitmerger",
"--mq-config", localconfig.c_str(),
"--color",
"false",
687 std::cout <<
"Spawning hit merger on PID " <<
pid <<
"; Redirect output to " <<
getMergerLogName() <<
"\n";
690 close(pipe_mergerdriver_fd[1]);
695 auto finishCallback = [&shutdown_initiated, &conf, &externalpublishchannel](std::vector<int>
const&
v) {
696 std::stringstream
str;
701 if (!conf.asService()) {
702 LOG(info) <<
"SIMULATION IS DONE. INITIATING SHUTDOWN.";
703 if (!shutdown_initiated) {
704 shutdown_initiated =
true;
710 LOG(info) <<
"SIMULATION DONE. STAYING AS DAEMON.";
724 bool errored =
false;
725 while ((cpid = wait(&status)) != mergerpid) {
726 if (WEXITSTATUS(status) || WIFSIGNALED(status)) {
727 if (!shutdown_initiated) {
728 LOG(info) <<
"Process " << cpid <<
" EXITED WITH CODE " << WEXITSTATUS(status) <<
" SIGNALED "
729 << WIFSIGNALED(status) <<
" SIGNAL " << WTERMSIG(status);
733 LOG(info) <<
"Problem detected (or child received termination signal) ... shutting down whole system ";
735 LOG(info) <<
"TERMINATING " << p;
738 LOG(error) <<
"SHUTTING DOWN DUE TO SIGNALED EXIT IN COMPONENT " << cpid;
745 LOG(info) <<
"Merger process " << mergerpid <<
" returned";
746 LOG(info) <<
"Simulation process took " << timer.RealTime() <<
" s";
748 if (!errored && !shutdown_initiated) {
749 shutdown_initiated =
true;
752 if (p != mergerpid) {
753 LOG(info) <<
"SHUTTING DOWN CHILD PROCESS (normal thread)" << p;
763 while ((cpid = wait(&status))) {
774 if (returncode == 0) {
775 LOG(info) <<
"SIMULATION RETURNED SUCCESFULLY";