117 using namespace fair::mq::hooks;
120 fair::mq::DeviceRunner runner{argc, argv};
122 runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner&
r) {
123 boost::program_options::options_description customOptions(
"Custom options");
125 r.fConfig.AddToCmdLineOptions(customOptions);
128 runner.AddHook<InstantiateDevice>([](DeviceRunner&
r) {
129 r.fDevice = std::unique_ptr<fair::mq::Device>{
getDevice(
r.fConfig)};
133 }
catch (std::exception& e) {
134 LOG(error) <<
"Unhandled exception reached the top of main: " << e.what()
135 <<
", application will now exit";
138 LOG(error) <<
"Non-exception instance being thrown. Please make sure you use std::runtime_exception() instead. "
139 <<
"Application will now exit.";
// Creates the three messaging channels one simulation worker talks over and
// returns them, together with the worker id, packed into a KernelSetup.
//
// NOTE(review): this listing has gaps (the embedded source numbers jump, e.g.
// 153, 158, 162 and 166-171 are absent), so the opening brace and the origin
// of `sim` used in the return statement are not visible here.
//
// transport          name passed to TransportFactory::CreateTransportFactory
// primaddress        endpoint the "primary-get" channel connects to
// primstatusaddress  endpoint the "o2sim-primserv-info" channel connects to
// mergeraddress      endpoint the "simdata" channel connects to
// workerID           forwarded unchanged into the returned KernelSetup
152KernelSetup initSim(std::string transport, std::string primaddress, std::string primstatusaddress, std::string mergeraddress,
int workerID)
// A single factory instance is shared by all three channels created below.
154 auto factory = fair::mq::TransportFactory::CreateTransportFactory(transport);
// "req" channel named "primary-get", connected to primaddress.
// NOTE(review): allocated with a raw `new`; who deletes it after it is handed
// to KernelSetup cannot be told from this excerpt - confirm ownership.
155 auto primchannel =
new fair::mq::Channel{
"primary-get",
"req", factory};
// Whatever Connect()/Validate() return is discarded in the visible code.
156 primchannel->Connect(primaddress);
157 primchannel->Validate();
// Second "req" channel, named "o2sim-primserv-info", connected to primstatusaddress.
159 auto prim_status_channel =
new fair::mq::Channel{
"o2sim-primserv-info",
"req", factory};
160 prim_status_channel->Connect(primstatusaddress);
161 prim_status_channel->Validate();
// "push" channel named "simdata", connected to mergeraddress.
163 auto datachannel =
new fair::mq::Channel{
"simdata",
"push", factory};
164 datachannel->Connect(mergeraddress);
165 datachannel->Validate();
// Argument order here is (sim, primary, data, status) - the status channel
// comes after the data channel, unlike the creation order above.
172 return KernelSetup{sim, primchannel, datachannel, prim_status_channel, workerID};
// Fragment of a routine whose header lies outside this excerpt. The visible
// statements set up a "sub" channel named "o2sim-control", try to receive one
// message on it and log the outcome via doLogInfo(workerID, ...).
//
// `initialized`, `channel` and `factory` are declared static, so they keep
// their state for the lifetime of the process rather than per invocation.
// NOTE(review): the code that tests/sets `initialized` (embedded lines
// 226-228, 233-235) is not part of this excerpt.
224 static bool initialized =
false;
225 static fair::mq::Channel channel;
// The transport is hard-coded to "zeromq" here.
229 static auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
230 channel = fair::mq::Channel{
"o2sim-control",
"sub", factory};
// The endpoint is taken from the environment variable ALICE_O2SIMCONTROL.
// NOTE(review): getenv() yields a null pointer when the variable is unset and
// the result is passed to std::string on the very next source line without a
// check; constructing std::string from a null pointer is undefined behaviour.
231 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
232 channel.Connect(std::string(controlsocketname));
// Message object that Receive() below fills in.
236 std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
238 doLogInfo(workerID,
"Listening for master control input");
// Only a positive return value from Receive() is treated as "got a message".
239 if (channel.Receive(reply) > 0) {
240 auto data = reply->GetData();
241 auto size = reply->GetSize();
// The payload is copied using the explicit size, so it does not have to be
// NUL-terminated.
243 std::string command(
reinterpret_cast<char const*
>(
data),
size);
244 doLogInfo(workerID,
"Received control message: " + command);
// NOTE(review): the comparison of `command` that leads to the two log calls
// below (embedded lines 245-248, 250-253) is not visible in this excerpt.
249 doLogInfo(workerID,
"Stop asked, shutting down");
254 doLogInfo(workerID,
"No control input received ");
// Program entry point. The visible statements, in order:
//   1. install a handler for a fixed list of signals,
//   2. configure logging and parse the command line,
//   3. read the FairMQ JSON config named by --mq-config and extract the
//      primary-server and hitmerger addresses from it,
//   4. initialise the simulation, then fork worker processes and set up the
//      per-worker channels via initSim().
//
// NOTE(review): this listing has many gaps (the embedded source numbers jump),
// so braces, several guards and some assignments are not visible. Comments
// below only describe what the visible lines do; anything depending on a
// missing line is marked as an assumption.
259int main(
int argc,
char* argv[])
// Zero-initialised sigaction with an empty mask and SA_SIGINFO set. The line
// that assigns the handler itself (embedded line 264) is not in this excerpt.
261 struct sigaction act;
262 memset(&act, 0,
sizeof act);
263 sigemptyset(&act.sa_mask);
265 act.sa_flags = SA_SIGINFO;
// Signals the same `act` is installed for.
267 std::vector<int> handledsignals = {SIGTERM, SIGINT, SIGQUIT, SIGSEGV, SIGBUS, SIGFPE, SIGABRT};
269 for (
auto s : handledsignals) {
// sigaction() returns non-zero on failure; that case is logged.
270 if (sigaction(s, &act,
nullptr)) {
271 LOG(error) <<
"Could not install signal handler for " << s;
// Registers a callback with fair::Logger::OnFatal that throws
// fair::FatalException.
278 fair::Logger::OnFatal([] {
throw fair::FatalException(
"Fatal error occured. Exiting without core dump..."); });
280 FairLogger::GetLogger()->SetLogVerbosityLevel(
"MEDIUM");
// Command-line options. Only "mq-config" is declared with a value type
// (std::string); "control", "config-key" and "severity" are registered with a
// description only.
283 bpo::options_description desc{
"Options"};
286 (
"control",
"control type")
288 (
"config-key",
"config key")
289 (
"mq-config",bpo::value<std::string>(),
"path to FairMQ config")
290 (
"severity",
"log severity");
292 bpo::variables_map vm;
293 bpo::store(parse_command_line(argc, argv, desc), vm);
// FMQconfig stays empty unless --mq-config was given.
296 std::string FMQconfig;
297 if (vm.count(
"mq-config")) {
298 FMQconfig = vm[
"mq-config"].as<std::string>();
// NOTE(review): how `internalfork` is used is not visible in this excerpt.
301 auto internalfork = getenv(
"ALICE_SIMFORKINTERNAL");
// Parent process id, captured by value in collectAndPubThreadFunction below.
303 int driverPID = getppid();
// A missing config path is a hard error.
306 if (FMQconfig.empty()) {
307 throw std::runtime_error(
"This should never be called without FairMQ config.");
// NOTE(review): fopen() returns a null pointer if the file cannot be opened,
// and `fp` is handed to rapidjson::FileReadStream three source lines later
// with no check in between. A matching fclose() is not visible in this
// excerpt either - confirm against the full file.
310 FILE* fp = fopen(FMQconfig.c_str(),
"r");
// Read buffer of (max unsigned short - 1) bytes with automatic storage.
311 constexpr unsigned short usmax = std::numeric_limits<unsigned short>::max() - 1;
312 char readBuffer[usmax];
313 rapidjson::FileReadStream is(fp, readBuffer,
sizeof(readBuffer));
// NOTE(review): the call that parses `is` into `d` (embedded lines 315-318) is
// not part of this excerpt.
314 rapidjson::Document d;
// Addresses to be filled in from the JSON document.
319 std::string serveraddress;
320 std::string mergeraddress;
321 std::string serverstatus_address;
// Walk the members of the top-level "fairMQOptions" object.
// NOTE(review): the structural checks here are assert()s, which are compiled
// out when NDEBUG is defined, so a malformed config is not rejected by them
// in such builds.
324 auto& options = d[
"fairMQOptions"];
325 assert(options.IsObject());
326 for (
auto option = options.MemberBegin(); option != options.MemberEnd(); ++option) {
// `s` is declared on a line not shown; it is reused as scratch for each name.
327 s = option->name.GetString();
328 if (s ==
"devices") {
329 assert(option->value.IsArray());
330 auto devices = option->value.GetArray();
331 for (
auto& device : devices) {
332 s = device[
"id"].GetString();
// Device "primary-server": the first socket address of channels[0] and of
// channels[1] is read, i.e. the channel order in the config file matters.
333 if (s ==
"primary-server") {
334 auto channels = device[
"channels"].GetArray();
335 auto sockets = (
channels[0])[
"sockets"].GetArray();
336 auto address = (sockets[0])[
"address"].GetString();
// NOTE(review): embedded line 337 is missing; presumably it stores the
// channels[0] address in `serveraddress` - confirm.
338 sockets = (
channels[1])[
"sockets"].GetArray();
339 address = (sockets[0])[
"address"].GetString();
// The channels[1] address becomes the primary-server status address.
340 serverstatus_address =
address;
// Device "hitmerger": look for the channel named "simdata" and read its first
// socket address. The loop header over `channels` and the assignment of the
// address (presumably to `mergeraddress`) are on lines not shown.
342 if (s ==
"hitmerger") {
343 auto channels = device[
"channels"].GetArray();
345 s = channel[
"name"].GetString();
346 if (s ==
"simdata") {
347 auto sockets = channel[
"sockets"].GetArray();
348 auto address = (sockets[0])[
"address"].GetString();
357 LOG(info) <<
"Parsed primary server address " << serveraddress;
358 LOG(info) <<
"Parsed primary server status address " << serverstatus_address;
359 LOG(info) <<
"Parsed merger address " << mergeraddress;
// Only serveraddress and mergeraddress are required; an empty
// serverstatus_address does not trigger this error.
360 if (serveraddress.empty() || mergeraddress.empty()) {
361 throw std::runtime_error(
"Could not determine server or merger URLs.");
// `simrun` is passed to initializeSim() together with the status address; a
// false return is logged as an error.
369 std::unique_ptr<FairRunSim> simrun;
371 if (!
initializeSim(
"zeromq", serverstatus_address, simrun)) {
372 LOG(error) <<
"Could not initialize simulation";
// Default worker count: half of the reported hardware threads, at least 1.
377 unsigned int nworkers = std::max(1u, std::thread::hardware_concurrency() / 2);
// ALICE_NSIMWORKERS overrides the default.
// NOTE(review): the guard on `f` (embedded line 379) is not visible. std::stoi
// throws on non-numeric text, and a negative value would wrap when cast to
// unsigned int.
378 auto f = getenv(
"ALICE_NSIMWORKERS");
380 nworkers =
static_cast<unsigned int>(std::stoi(
f));
382 LOG(info) <<
"Running with " << nworkers <<
" sim workers ";
// One iteration per worker.
387 for (
auto i = 0u;
i < nworkers; ++
i) {
// The last iteration does not call fork(); `pid` is set to 0 directly, the
// same value fork() returns in a child process.
389 auto pid = (
i == nworkers - 1) ? 0 : fork();
// Thread body: captures driverPID by copy and pubchannel by reference, and
// receives messages on `collectorchannel`, turning each payload into a
// std::string. The declarations of both channels are on lines not shown.
395 auto collectAndPubThreadFunction = [driverPID, &pubchannel]() {
397 std::unique_ptr<fair::mq::Message>
msg(collectorchannel.NewMessage());
400 if (collectorchannel.Receive(
msg) > 0) {
403 std::string text(
reinterpret_cast<char const*
>(
data),
size);
// Only in the last iteration is the collector thread started.
// NOTE(review): the thread is detached while its lambda holds `pubchannel` by
// reference, so `pubchannel` must outlive the thread - confirm its lifetime.
409 if (
i == nworkers - 1) {
410 std::vector<std::thread> threads;
411 threads.push_back(std::thread(collectAndPubThreadFunction));
412 threads.back().detach();
// Per-worker channel setup; the loop index doubles as the worker id.
422 auto kernelSetup =
initSim(
"zeromq", serveraddress, serverstatus_address, mergeraddress,
i);
// Builds the text "WORKER<i>"; its use is on lines not shown.
424 std::stringstream worker;
425 worker <<
"WORKER" <<
i;
// NOTE(review): `conf` is declared outside the visible lines.
434 if (conf.asService()) {
435 LOG(info) <<
"IN SERVICE MODE WAITING";
442 LOG(info) <<
"FINISHING";
// NOTE(review): POSIX wait() returns a child pid or -1, never 0, so this
// condition is always true; the loop has to be left from inside its body,
// which is not part of this excerpt.
457 while ((cpid = wait(&status))) {
KernelSetup initSim(std::string transport, std::string primaddress, std::string primstatusaddress, std::string mergeraddress, int workerID)