Project
Loading...
Searching...
No Matches
O2PrimaryServerDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
16
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
22#include <DetectorsBase/Stack.h>
25#include <TMessage.h>
26#include <TClass.h>
31#include <SimConfig/SimConfig.h>
35#include <Field/MagneticField.h>
36#include <TGeoGlobalMagField.h>
37#include <typeinfo>
38#include <thread>
39#include <TROOT.h>
40#include <TStopwatch.h>
41#include <fstream>
42#include <iostream>
43#include <atomic>
44#include "PrimaryServerState.h"
46#include <chrono>
48#include <TRandom3.h>
49#include <regex>
50
51namespace o2
52{
53namespace devices
54{
55
57{
58 public:
61 {
62 mUseFixedChunkSeed = getenv("ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
63 if (mUseFixedChunkSeed) {
64 mFixedChunkSeed = atol(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
65 }
66 }
67
70 {
71 try {
72 if (mGeneratorThread.joinable()) {
73 mGeneratorThread.join();
74 }
75 if (mControlThread.joinable()) {
76 mControlThread.join();
77 }
78 } catch (...) {
79 }
80 }
81
82 protected:
84 {
85 TStopwatch timer;
86 timer.Start();
87 const auto& conf = mSimConfig;
89 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
90 ccdbmgr.setTimestamp(conf.getTimestamp());
91
92 // set the global information about the number of events to be generated
93 unsigned int nTotalEvents = conf.getNEvents();
95
96 // init magnetic field as it might be needed by the generator
97 if (TGeoGlobalMagField::Instance()->GetField() == nullptr) {
98 TGeoGlobalMagField::Instance()->SetField(o2::base::SimFieldUtils::createMagField());
99 TGeoGlobalMagField::Instance()->Lock();
100 }
101
102 // look if we find a cached instances of Pythia8 or external generators in order to avoid
103 // (long) initialization times.
104 // This is evidently a bit weak, as generators might need reconfiguration (to be treated later).
105 // For now, we'd like to allow for fast switches between say a pythia8 instance and reading from kinematics
106 // to continue an already started simulation.
107 //
108 // Not using cached instances for external kinematics since these might change input filenames etc.
109 // and are in any case quickly setup.
110 mPrimGen = nullptr;
111 if (conf.getGenerator().compare("extkin") != 0 || conf.getGenerator().compare("extkinO2") != 0) {
112 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
113 if (iter != mPrimGeneratorCache.end()) {
114 mPrimGen = iter->second.get();
115 LOG(info) << "Found cached generator for " << conf.getGenerator();
116 }
117 }
118
119 if (mPrimGen == nullptr) {
120 mPrimGen = new o2::eventgen::PrimaryGenerator;
122
123 // setup vertexing
124 auto vtxMode = conf.getVertexMode();
126 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
127 mPrimGen->setVertexMode(vtxMode);
128 } else if (vtxMode == VertexMode::kCCDB) {
129 // we need to fetch the CCDB object
130 mPrimGen->setVertexMode(vtxMode, ccdbmgr.getForTimeStamp<o2::dataformats::MeanVertexObject>("GLO/Calib/MeanVertex", conf.getTimestamp()));
131 } else if (vtxMode == VertexMode::kCollCxt) {
132 // The vertex will be injected from the outside via setExternalVertex
133 } else {
134 LOG(fatal) << "Unsupported vertex mode";
135 }
136
137 auto embedinto_filename = conf.getEmbedIntoFileName();
138 if (!embedinto_filename.empty()) {
139 // determine the sim prefix from the embedding filename
140 // the filename should be an MCHeader file ... so it should match SOME_PATH/prefix_MCHeader.root
141 std::regex re(R"((.*/)?([^/]+)_MCHeader\.root$)");
142 std::smatch match;
143
144 if (std::regex_search(embedinto_filename, match, re)) {
145 std::cout << "Extracted embedding prefix : " << match[2] << '\n';
146 mEmbeddIntoPrefix = match[2];
147 } else {
148 LOG(fatal) << "Embedding asked but no suitable embedding prefix extractable from " << embedinto_filename;
149 }
150 mPrimGen->embedInto(embedinto_filename);
151 }
152
153 mPrimGen->Init();
154
155 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
156 ptr_wrapper.reset(mPrimGen);
157 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
158 }
159 mPrimGen->SetEvent(&mEventHeader);
160
161 // A good moment to couple to collision context
162 auto collContextFileName_PrefixPair = mSimConfig.getCollContextFilenameAndEventPrefix();
163 auto collContextFileName = collContextFileName_PrefixPair.first;
164 if (collContextFileName.size() > 0) {
165 LOG(info) << "Simulation has collission context";
166 mCollissionContext = o2::steer::DigitizationContext::loadFromFile(collContextFileName);
167 if (mCollissionContext) {
168 const auto& vertices = mCollissionContext->getInteractionVertices();
169 LOG(info) << "We found " << vertices.size() << " vertices included ";
170
171 // initialize the eventID to collID mapping
172 const auto source = mCollissionContext->findSimPrefix(collContextFileName_PrefixPair.second);
173 if (source == -1) {
174 LOG(fatal) << "Wrong simulation prefix";
175 }
176 mEventID_to_CollID.clear();
177 mEventID_to_CollID = mCollissionContext->getCollisionIndicesForSource(source);
178 }
179 }
180
181 LOG(info) << "Generator initialization took " << timer.CpuTime() << "s";
182 if (mMaxEvents > 0) {
183 generateEvent(); // generate a first event
184 }
185 }
186
187 // function generating one event
188 void generateEvent(/*bool changeState = false*/)
189 {
190 bool changeState = true; // false;
191 LOG(info) << "Event generation started ";
192 if (changeState) {
194 }
195 TStopwatch timer;
196 timer.Start();
197 try {
198 bool valid = false;
199 int retry_counter = 0;
200 const int MAX_RETRY = 100;
201 do {
202 mStack->Reset();
203 const auto& conf = mSimConfig;
204 // see if we the vertex comes from the collision context
205 if (mCollissionContext && conf.getVertexMode() == o2::conf::VertexMode::kCollCxt) {
206 const auto& vertices = mCollissionContext->getInteractionVertices();
207 if (vertices.size() > 0) {
208 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
209 auto& vertex = vertices.at(collisionindex);
210 LOG(info) << "Setting vertex " << vertex << " for event " << mEventCounter << " for prefix " << mSimConfig.getOutPrefix() << " from CollContext";
211 mPrimGen->setExternalVertexForNextEvent(vertex.X(), vertex.Y(), vertex.Z());
212
213 // set correct embedding index for PrimaryGenerator ... based on collision context for embedding
214 auto& collisionParts = mCollissionContext->getEventParts()[collisionindex];
215 int background_index = -1; // -1 means no embedding taking place for this signal
216
217 // find the part that corresponds to the event embeded into
218 for (auto& part : collisionParts) {
219 if (mCollissionContext->getSimPrefixes()[part.sourceID] == mEmbeddIntoPrefix) {
220 background_index = part.entryID;
221 LOG(info) << "Setting embedding index to " << background_index;
222 }
223 }
224 mPrimGen->setEmbedIndex(background_index);
225 }
226 }
227 mPrimGen->GenerateEvent(mStack);
228 if (mStack->getPrimaries().size() > 0) {
229 valid = true;
230 } else {
231 retry_counter++;
232 if (retry_counter > MAX_RETRY) {
233 LOG(warn) << "Not able to generate a non-empty event in " << MAX_RETRY << " trials";
234 // empty event is sent out
235 valid = true;
236 }
237 }
238 } while (!valid);
239 } catch (std::exception const& e) {
240 LOG(error) << " Exception occurred during event gen " << e.what();
241 }
242 timer.Stop();
243 LOG(info) << "Event generation took " << timer.CpuTime() << "s"
244 << " and produced " << mStack->getPrimaries().size() << " primaries ";
245 if (changeState) {
247 }
248 }
249
250 // launches a thread that listens for status/config/shutdown requests from outside asynchronously
252 {
253 static std::vector<std::thread> threads;
254 auto sendErrorReply = [](fair::mq::Channel& channel) {
255 LOG(error) << "UNKNOWN REQUEST";
256 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)(404)));
257 channel.Send(reply);
258 };
259
260 LOG(info) << "LAUNCHING STATUS THREAD";
261 auto lambda = [this, sendErrorReply]() {
262 bool canShutdown{false};
263 // Exit only when both: serving stopped and allowed from outside.
264 while (!(mState == O2PrimaryServerState::Stopped && canShutdown)) {
265 auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
266 if (!channel.IsValid()) {
267 LOG(error) << "channel primserv-info not valid";
268 }
269 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)(-1)));
270 int timeout = 100; // 100ms --> so as not to block and allow for proper termination of this thread
271 if (channel.Receive(request, timeout) > 0) {
272 int request_payload; // we expect an (int) ~ to type O2PrimaryServerInfoRequest
273 if (request->GetSize() != sizeof(request_payload)) {
274 LOG(error) << "Obtained request with unexpected payload size";
275 sendErrorReply(channel); // ALWAYS reply
276 }
277
278 memcpy(&request_payload, request->GetData(), sizeof(request_payload));
279
280 if (request_payload == (int)O2PrimaryServerInfoRequest::Status) {
281 LOG(info) << "Received status request";
282 // request needs to be a simple enum of type O2PrimaryServerInfoRequest
283 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)mState.load()));
284 if (channel.Send(reply) > 0) {
285 LOG(info) << "Send status successful";
286 }
287 } else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) {
288 HandleConfigRequest(channel);
289 } else if (request_payload == (int)O2PrimaryServerInfoRequest::AllowShutdown) {
290 LOG(info) << "Got info that we may shutdown";
291 std::unique_ptr<fair::mq::Message> ack(channel.NewSimpleMessage(200));
292 channel.Send(ack);
293 canShutdown = true;
294 } else {
295 sendErrorReply(channel);
296 }
297 }
298 }
299 mInfoThreadStopped = true;
300 };
301 threads.push_back(std::thread(lambda));
302 threads.back().detach();
303 }
304
305 void InitTask() final
306 {
307 // fatal without core dump
308 fair::Logger::OnFatal([] { throw fair::FatalException("Fatal error occured. Exiting without core dump..."); });
309
310 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), "SERVER : INITIALIZING");
311
313 LOG(info) << "Init Server device ";
314
315 // init sim config
316 auto& vm = GetConfig()->GetVarMap();
317 auto& conf = o2::conf::SimConfig::Instance();
318 if (vm.count("isRun5")) {
319 conf.setRun5();
320 }
321 conf.resetFromParsedMap(vm);
322
323 // update the parameters from an INI/JSON file, if given (overrides code-based version)
325 // update the parameters from stuff given at command line (overrides file-based version)
326 o2::conf::ConfigurableParam::updateFromString(conf.getKeyValueString());
327
328 // customize the level of log output
329 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
330 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
331
332 // from now on mSimConfig should be used within this process
333 mSimConfig = conf;
334
335 mStack = new o2::data::Stack();
336 mStack->setExternalMode(true);
337
338 // MC ENGINE
339 LOG(info) << "ENGINE SET TO " << vm["mcEngine"].as<std::string>();
340 // CHUNK SIZE
341 mChunkGranularity = vm["chunkSize"].as<unsigned int>();
342 LOG(info) << "CHUNK SIZE SET TO " << mChunkGranularity;
343
344 // initial initial seed --> we should store this somewhere
345 mInitialSeed = vm["seed"].as<ULong_t>();
346 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
347 mSeedGenerator.SetSeed(mInitialSeed);
348 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
349
350 mMaxEvents = conf.getNEvents();
351
352 // need to make ROOT thread-safe since we use ROOT services in all places
353 ROOT::EnableThreadSafety();
354
356
357 // launch initialization of particle generator asynchronously
358 // so that we reach the RUNNING state of the server quickly
359 // and do not block here
360 mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
361 if (mGeneratorThread.joinable()) {
362 try {
363 mGeneratorThread.join();
364 } catch (std::exception const& e) {
365 LOG(warn) << "Exception during thread join ..ignoring";
366 }
367 }
368
369 // init pipe
370 auto pipeenv = getenv("ALICE_O2SIMSERVERTODRIVER_PIPE");
371 if (pipeenv) {
372 mPipeToDriver = atoi(pipeenv);
373 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
374 } else {
375 LOG(info) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
376 }
377
378 mAsService = vm["asservice"].as<bool>();
379 if (mAsService) {
380 mControlChannel = fair::mq::Channel{"o2sim-control", "sub", fTransportFactory};
381 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
382 if (!controlsocketname) {
383 LOG(fatal) << "Internal error: Socketname for control input missing";
384 }
385 mControlChannel.Connect(std::string(controlsocketname));
386 mControlChannel.Validate();
387 }
388
389 if (mMaxEvents <= 0) {
390 if (mAsService) {
392 }
393 } else {
395 }
396
397 // feedback to driver that we are done initializing
398 if (mPipeToDriver != -1) {
399 int message = -111; // special code meaning end of initialization
400 if (write(mPipeToDriver, &message, sizeof(int))) {
401 }
402 }
403 }
404
405 // function for intermediate/on-the-fly reinitializations
406 bool ReInit(o2::conf::SimReconfigData const& reconfig)
407 {
408 LOG(info) << "ReInit Server device ";
409
410 if (reconfig.stop) {
411 return false;
412 }
413
414 // mSimConfig.getConfigData().mKeyValueTokens=reconfig.keyValueTokens;
415 // Think about this:
416 // update the parameters from an INI/JSON file, if given (overrides code-based version)
418 // update the parameters from stuff given at command line (overrides file-based version)
420
421 // initial initial seed --> we should store this somewhere
422 mInitialSeed = reconfig.startSeed;
423 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
424 mSeedGenerator.SetSeed(mInitialSeed);
425 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
426
427 mMaxEvents = reconfig.nEvents;
428
429 // updating the simconfig member with new information especially concerning the generators
430 // TODO: put this into utility function?
431 mSimConfig.getConfigData().mGenerator = reconfig.generator;
432 mSimConfig.getConfigData().mTrigger = reconfig.trigger;
433 mSimConfig.getConfigData().mExtKinFileName = reconfig.extKinfileName;
434
435 mEventCounter = 0;
436 mPartCounter = 0;
437 mNeedNewEvent = true;
438 // reinit generator and start generation of a new event
439 if (mGeneratorThread.joinable()) {
440 try {
441 mGeneratorThread.join();
442 } catch (std::exception const& e) {
443 LOG(warn) << "Exception during thread join ..ignoring";
444 }
445 }
446 // mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
448
449 return true;
450 }
451
452 // method reacting to requests to get the simulation configuration
453 bool HandleConfigRequest(fair::mq::Channel& channel)
454 {
455 LOG(info) << "Received config request";
456 // just sending the simulation configuration to anyone that wants it
457 const auto& confdata = mSimConfig.getConfigData();
458
459 TMessage* tmsg = new TMessage(kMESS_OBJECT);
460 tmsg->WriteObjectAny((void*)&confdata, TClass::GetClass(typeid(confdata)));
461
462 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
463
464 std::unique_ptr<fair::mq::Message> message(
465 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
466
467 // send answer
468 if (channel.Send(message) > 0) {
469 LOG(info) << "config reply send ";
470 return true;
471 } else {
472 LOG(error) << "Failure sending config reply ";
473 }
474 return true;
475 }
476
477 bool ConditionalRun() override
478 {
479 // we might come here in IDLE mode
480 if (mState.load() == O2PrimaryServerState::Idle) {
481 if (mWaitingControlInput.load() == 0) {
482 if (mControlThread.joinable()) {
483 mControlThread.join();
484 }
485 mControlThread = std::thread(&O2PrimaryServerDevice::waitForControlInput, this);
486 }
487 }
488
489 auto& channel = GetChannels().at("primary-get").at(0);
490 PrimaryChunkRequest requestpayload;
491 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
492 auto bytes = channel.Receive(request);
493 if (bytes < 0) {
494 LOG(error) << "Some error/interrupt occurred on socket during receive";
495 if (NewStatePending()) { // new state is typically pending if (term) signal was received
496 WaitForNextState();
497 // ask ourselves for termination of this loop
499 }
500 return false;
501 }
502
503 TStopwatch timer;
504 timer.Start();
505 auto& r = *((PrimaryChunkRequest*)(request->GetData()));
506 LOG(debug) << "PARTICLE REQUEST IN STATE " << PrimStateToString[(int)mState.load()] << " from " << r.workerid << ":" << r.requestid;
507
508 auto prestate = mState.load();
509 auto more = HandleRequest(request, 0, channel);
510 if (!more) {
511 if (mAsService) {
514 }
515 } else {
517 }
518 }
519 timer.Stop();
520 auto time = timer.CpuTime();
521 LOG(debug) << "COND-RUN TOOK " << time << " s";
522 return mState != O2PrimaryServerState::Stopped;
523 }
524
525 void PostRun() override
526 {
527 // We shouldn't shut down immediately when all events have been served
528 // Instead we also need to wait until the info thread running some communication server
529 // with other processes is finished.
530 while (!mInfoThreadStopped) {
531 LOG(info) << "Waiting info thread";
532 using namespace std::chrono_literals;
533 std::this_thread::sleep_for(1000ms);
534 }
535 }
536
537 bool HandleRequest(fair::mq::MessagePtr& request, int /*index*/, fair::mq::Channel& channel)
538 {
539 // LOG(debug) << "GOT A REQUEST WITH SIZE " << request->GetSize();
540 // std::string requeststring(static_cast<char*>(request->GetData()), request->GetSize());
541 // LOG(info) << "NORMAL REQUEST STRING " << requeststring;
542 bool workavailable = true;
543 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
544 workavailable = false;
545 }
546 if (!(mState.load() == O2PrimaryServerState::ReadyToServe || mState.load() == O2PrimaryServerState::WaitingEvent)) {
547 // send a zero answer
548 workavailable = false;
549 }
550
551 PrimaryChunkAnswer header{mState, workavailable};
552 fair::mq::Parts reply;
553 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
554 reply.AddPart(std::move(headermsg));
555
556 LOG(debug) << "Received request for work " << mEventCounter << " " << mMaxEvents << " " << mNeedNewEvent << " available " << workavailable;
557 if (workavailable) {
558
559 if (mNeedNewEvent) {
560 // we need a newly generated event now
561 if (mGeneratorThread.joinable()) {
562 try {
563 mGeneratorThread.join();
564 } catch (std::exception const& e) {
565 LOG(warn) << "Exception during thread join ..ignoring";
566 }
567 }
568 // also if we are still in event waiting stage (doing some busy sleep)
569 while (mState.load() == O2PrimaryServerState::WaitingEvent) {
570 LOG(info) << "Waiting for event generation do become fully available";
571 usleep(100);
572 }
573 mNeedNewEvent = false;
574 mPartCounter = 0;
575 mEventCounter++;
576 }
577
578 auto& prims = mStack->getPrimaries();
579 auto numberofparts = (int)std::ceil(prims.size() / (1. * mChunkGranularity));
580 // number of parts should be at least 1 (even if empty)
581 numberofparts = std::max(1, numberofparts);
582
583 LOG(debug) << "Have " << prims.size() << " " << numberofparts;
584
587 i.eventID = workavailable ? mEventCounter : -1;
588 i.maxEvents = mMaxEvents;
589 i.part = mPartCounter + 1;
590 i.nparts = numberofparts;
591 // assign a deterministic (yet collision free seed) to process this particle chunk in Geant
592 // limit range to uint32_t since internal limit of TRandom (despite API suggesting otherwise)
593 const uint64_t drawnSeed = (uint64_t)(static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
594 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
595 i.index = m.mParticles.size();
596 i.mMCEventHeader = mEventHeader;
597 m.mSubEventInfo = i;
598
599 int endindex = prims.size() - mPartCounter * mChunkGranularity;
600 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
601 LOG(debug) << "indices " << startindex << " " << endindex;
602
603 if (startindex < 0) {
604 startindex = 0;
605 }
606 if (endindex < 0) {
607 endindex = 0;
608 }
609
610 for (int index = startindex; index < endindex; ++index) {
611 m.mParticles.emplace_back(prims[index]);
612 }
613
614 LOG(info) << "Sending " << m.mParticles.size() << " particles";
615 LOG(info) << "treating ev " << mEventCounter << " part " << i.part << " out of " << i.nparts;
616
617 // feedback to driver if new event started
618 if (mPipeToDriver != -1 && i.part == 1 && workavailable) {
619 if (write(mPipeToDriver, &mEventCounter, sizeof(mEventCounter))) {
620 }
621 }
622
623 mPartCounter++;
624 if (mPartCounter == numberofparts) {
625 mNeedNewEvent = true;
626 // start generation of a new event
627 if (mEventCounter < mMaxEvents) {
628 mGeneratorThread = std::thread(&O2PrimaryServerDevice::generateEvent, this);
629 }
630 }
631
632 TMessage* tmsg = new TMessage(kMESS_OBJECT);
633 tmsg->WriteObjectAny((void*)&m, TClass::GetClass("o2::data::PrimaryChunk"));
634
635 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
636
637 std::unique_ptr<fair::mq::Message> message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
638
639 reply.AddPart(std::move(message));
640 }
641
642 // send answer
643 TStopwatch timer;
644 timer.Start();
645 auto code = Send(reply, "primary-get", 0, 5000); // we introduce timeout in order not to block other requests
646 timer.Stop();
647 auto time = timer.CpuTime();
648 if (code > 0) {
649 LOG(debug) << "Reply send in " << time << "s";
650 return workavailable;
651 } else {
652 LOG(warn) << "Sending process had problems. Return code : " << code << " time " << time << "s";
653 }
654 return false; // -> error should not get here
655 }
656
658 {
659 LOG(info) << message << " CHANGING STATE TO " << PrimStateToString[(int)to];
660 mState = to;
661 }
662
664 {
665 mWaitingControlInput.store(1);
666 if (mState.load() != O2PrimaryServerState::Idle) {
667 mWaitingControlInput.store(0);
668 return;
669 }
670
671 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), o2::simpubsub::simStatusString("PRIMSERVER", "STATUS", "AWAITING INPUT"));
672 // this means we are idling
673
674 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
675
676 bool ok = false;
677
678 LOG(info) << "WAITING FOR CONTROL INPUT";
679 if (mControlChannel.Receive(reply) > 0) {
681 auto data = reply->GetData();
682 auto size = reply->GetSize();
683
684 std::string command(reinterpret_cast<char const*>(data), size);
685 LOG(info) << "message: " << command;
686
688 o2::conf::parseSimReconfigFromString(command, reconfig);
689 LOG(info) << "Processing " << reconfig.nEvents << " new events";
690 try {
691 LOG(info) << "REINIT START";
692 ok = ReInit(reconfig);
693 LOG(info) << "REINIT DONE";
694 } catch (std::exception e) {
695 LOG(info) << "Exception during reinit";
696 }
697 } else {
698 LOG(info) << "NOTHING RECEIVED";
699 }
700 if (ok) {
701 // stateTransition(O2PrimaryServerState::ReadyToServe, "CONTROL"); --> SHOULD BE DONE FROM EVENT GENERATOR (which get's however called only when mEvents>0)
702 } else {
704 }
705 mWaitingControlInput.store(0);
706 }
707
708 private:
709 o2::conf::SimConfig mSimConfig = o2::conf::SimConfig::Instance(); // local sim config object
710 o2::eventgen::PrimaryGenerator* mPrimGen = nullptr; // the current primary generator
712 o2::data::Stack* mStack = nullptr; // the stack which is filled (pointer since constructor to be called only init method)
713 int mChunkGranularity = 500; // how many primaries to send to a worker
714 int mPartCounter = 0;
715 bool mNeedNewEvent = true;
716 int mMaxEvents = 2;
717 ULong_t mInitialSeed = 0;
718 bool mUseFixedChunkSeed = false;
719 ULong_t mFixedChunkSeed = 0;
720 int mPipeToDriver = -1; // handle for direct piper to driver (to communicate meta info)
721 int mEventCounter = 0;
722
723 std::thread mGeneratorThread;
724 // or to generate events
725 std::thread mControlThread;
726
727 // Keeps various generators instantiated in memory
728 // useful when running simulation as a service (when generators
729 // change between batches). Also takes care of resource management of Primary generators via unique ptr
730 // TODO: some care needs to be taken (or the user warned) that the caching is based on generator name
731 // and that parameter-based reconfiguration is not yet implemented (for which we would need to hash all
732 // configuration parameters as well)
733 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
734
735 std::atomic<O2PrimaryServerState> mState{O2PrimaryServerState::Initializing};
736 std::atomic<int> mWaitingControlInput{0};
737 std::atomic<bool> mInfoThreadStopped{false};
738
739 bool mAsService = false;
740
741 // a dedicate (on-the-fly channel) for control messages
742 fair::mq::Channel mControlChannel;
743
744 // some information specific to use case when we have a collision context
745 o2::steer::DigitizationContext* mCollissionContext = nullptr;
746 std::unordered_map<int, int> mEventID_to_CollID;
747 std::string mEmbeddIntoPrefix;
748
749 TRandom3 mSeedGenerator;
750};
751
752} // namespace devices
753} // namespace o2
754
755#endif
Definition of the Stack class.
std::ostringstream debug
uint64_t vertex
Definition RawEventData.h:9
int16_t time
Definition RawEventData.h:4
int32_t i
Definition of the MagF class.
Methods to create simulation mag field.
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
Definition SimConfig.h:133
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
Definition SimConfig.h:111
std::string getOutPrefix() const
Definition SimConfig.h:163
void setExternalMode(bool m)
Definition Stack.h:205
void Reset() override
Resets arrays and stack and deletes particles and tracks.
Definition Stack.cxx:588
const std::vector< TParticle > & getPrimaries() const
Definition Stack.h:187
bool ReInit(o2::conf::SimReconfigData const &reconfig)
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
static void setTotalNEvents(unsigned int &n)
Definition Generator.h:93
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
void setEmbedIndex(int idx)
sets the embedding index
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
std::vector< std::vector< o2::steer::EventPart > > & getEventParts(bool withQED=false)
std::vector< std::string > const & getSimPrefixes() const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
Definition RngHelper.h:37
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
Definition SimConfig.h:58
std::string mGenerator
Definition SimConfig.h:55
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"