14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
36#include <TGeoGlobalMagField.h>
40#include <TStopwatch.h>
61 mUseFixedChunkSeed = getenv(
"ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
62 if (mUseFixedChunkSeed) {
63 mFixedChunkSeed = atol(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
71 if (mGeneratorThread.joinable()) {
72 mGeneratorThread.join();
74 if (mControlThread.joinable()) {
75 mControlThread.join();
86 const auto& conf = mSimConfig;
88 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
89 ccdbmgr.setTimestamp(conf.getTimestamp());
92 unsigned int nTotalEvents = conf.getNEvents();
96 if (TGeoGlobalMagField::Instance()->GetField() ==
nullptr) {
98 TGeoGlobalMagField::Instance()->Lock();
110 if (conf.getGenerator().compare(
"extkin") != 0 || conf.getGenerator().compare(
"extkinO2") != 0) {
111 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
112 if (iter != mPrimGeneratorCache.end()) {
113 mPrimGen = iter->second.get();
114 LOG(info) <<
"Found cached generator for " << conf.getGenerator();
118 if (mPrimGen ==
nullptr) {
123 auto vtxMode = conf.getVertexMode();
125 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
127 }
else if (vtxMode == VertexMode::kCCDB) {
130 }
else if (vtxMode == VertexMode::kCollCxt) {
133 LOG(fatal) <<
"Unsupported vertex mode";
136 auto embedinto_filename = conf.getEmbedIntoFileName();
137 if (!embedinto_filename.empty()) {
143 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
144 ptr_wrapper.reset(mPrimGen);
145 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
147 mPrimGen->SetEvent(&mEventHeader);
151 auto collContextFileName = collContextFileName_PrefixPair.first;
152 if (collContextFileName.size() > 0) {
153 LOG(info) <<
"Simulation has collission context";
155 if (mCollissionContext) {
157 LOG(info) <<
"We found " << vertices.size() <<
" vertices included ";
160 const auto source = mCollissionContext->
findSimPrefix(collContextFileName_PrefixPair.second);
162 LOG(fatal) <<
"Wrong simulation prefix";
164 mEventID_to_CollID.clear();
169 LOG(info) <<
"Generator initialization took " << timer.CpuTime() <<
"s";
170 if (mMaxEvents > 0) {
178 bool changeState =
true;
179 LOG(info) <<
"Event generation started ";
187 int retry_counter = 0;
188 const int MAX_RETRY = 100;
191 const auto& conf = mSimConfig;
195 if (vertices.size() > 0) {
196 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
197 auto&
vertex = vertices.at(collisionindex);
198 LOG(info) <<
"Setting vertex " <<
vertex <<
" for event " << mEventCounter <<
" for prefix " << mSimConfig.
getOutPrefix() <<
" from CollContext";
207 if (retry_counter > MAX_RETRY) {
208 LOG(warn) <<
"Not able to generate a non-empty event in " << MAX_RETRY <<
" trials";
214 }
catch (std::exception
const& e) {
215 LOG(error) <<
" Exception occurred during event gen " << e.what();
218 LOG(info) <<
"Event generation took " << timer.CpuTime() <<
"s"
219 <<
" and produced " << mStack->
getPrimaries().size() <<
" primaries ";
228 static std::vector<std::thread> threads;
229 LOG(info) <<
"LAUNCHING STATUS THREAD";
230 auto lambda = [
this]() {
232 auto& channel = GetChannels().at(
"o2sim-primserv-info").at(0);
233 if (!channel.IsValid()) {
234 LOG(error) <<
"channel primserv-info not valid";
236 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(-1));
238 if (channel.Receive(request,
timeout) > 0) {
239 LOG(info) <<
"INFO REQUEST RECEIVED";
241 LOG(info) <<
"Received status request";
243 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((
int)mState.load()));
244 if (channel.Send(reply) > 0) {
245 LOG(info) <<
"Send status successful";
250 LOG(fatal) <<
"UNKNOWN REQUEST";
251 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage(404));
256 mInfoThreadStopped =
true;
258 threads.push_back(std::thread(lambda));
259 threads.back().detach();
265 fair::Logger::OnFatal([] {
throw fair::FatalException(
"Fatal error occured. Exiting without core dump..."); });
270 LOG(info) <<
"Init Server device ";
273 auto& vm = GetConfig()->GetVarMap();
275 if (vm.count(
"isRun5")) {
278 conf.resetFromParsedMap(vm);
286 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
287 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
296 LOG(info) <<
"ENGINE SET TO " << vm[
"mcEngine"].as<std::string>();
298 mChunkGranularity = vm[
"chunkSize"].as<
unsigned int>();
299 LOG(info) <<
"CHUNK SIZE SET TO " << mChunkGranularity;
302 mInitialSeed = vm[
"seed"].as<ULong_t>();
304 mSeedGenerator.SetSeed(mInitialSeed);
305 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
307 mMaxEvents = conf.getNEvents();
310 ROOT::EnableThreadSafety();
318 if (mGeneratorThread.joinable()) {
320 mGeneratorThread.join();
321 }
catch (std::exception
const& e) {
322 LOG(warn) <<
"Exception during thread join ..ignoring";
327 auto pipeenv = getenv(
"ALICE_O2SIMSERVERTODRIVER_PIPE");
329 mPipeToDriver = atoi(pipeenv);
330 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
332 LOG(info) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
335 mAsService = vm[
"asservice"].as<
bool>();
337 mControlChannel = fair::mq::Channel{
"o2sim-control",
"sub", fTransportFactory};
338 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
339 if (!controlsocketname) {
340 LOG(fatal) <<
"Internal error: Socketname for control input missing";
342 mControlChannel.Connect(std::string(controlsocketname));
343 mControlChannel.Validate();
346 if (mMaxEvents <= 0) {
355 if (mPipeToDriver != -1) {
357 if (write(mPipeToDriver, &
message,
sizeof(
int))) {
365 LOG(info) <<
"ReInit Server device ";
381 mSeedGenerator.SetSeed(mInitialSeed);
382 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
394 mNeedNewEvent =
true;
396 if (mGeneratorThread.joinable()) {
398 mGeneratorThread.join();
399 }
catch (std::exception
const& e) {
400 LOG(warn) <<
"Exception during thread join ..ignoring";
412 LOG(info) <<
"Received config request";
417 tmsg->WriteObjectAny((
void*)&confdata, TClass::GetClass(
typeid(confdata)));
419 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
421 std::unique_ptr<fair::mq::Message>
message(
422 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
425 if (channel.Send(
message) > 0) {
426 LOG(info) <<
"config reply send ";
436 if (mWaitingControlInput.load() == 0) {
437 if (mControlThread.joinable()) {
438 mControlThread.join();
444 auto& channel = GetChannels().at(
"primary-get").at(0);
446 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
447 auto bytes = channel.Receive(request);
449 LOG(error) <<
"Some error/interrupt occurred on socket during receive";
450 if (NewStatePending()) {
461 LOG(
debug) <<
"PARTICLE REQUEST IN STATE " << PrimStateToString[(
int)mState.load()] <<
" from " <<
r.workerid <<
":" <<
r.requestid;
463 auto prestate = mState.load();
475 auto time = timer.CpuTime();
482 while (!mInfoThreadStopped) {
483 LOG(info) <<
"Waiting info thread";
484 using namespace std::chrono_literals;
485 std::this_thread::sleep_for(100ms);
489 bool HandleRequest(fair::mq::MessagePtr& request,
int , fair::mq::Channel& channel)
494 bool workavailable =
true;
495 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
496 workavailable =
false;
500 workavailable =
false;
504 fair::mq::Parts reply;
505 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
506 reply.AddPart(std::move(headermsg));
508 LOG(
debug) <<
"Received request for work " << mEventCounter <<
" " << mMaxEvents <<
" " << mNeedNewEvent <<
" available " << workavailable;
513 if (mGeneratorThread.joinable()) {
515 mGeneratorThread.join();
516 }
catch (std::exception
const& e) {
517 LOG(warn) <<
"Exception during thread join ..ignoring";
522 LOG(info) <<
"Waiting for event generation do become fully available";
525 mNeedNewEvent =
false;
531 auto numberofparts = (
int)std::ceil(prims.size() / (1. * mChunkGranularity));
533 numberofparts = std::max(1, numberofparts);
535 LOG(
debug) <<
"Have " << prims.size() <<
" " << numberofparts;
539 i.
eventID = workavailable ? mEventCounter : -1;
540 i.maxEvents = mMaxEvents;
541 i.part = mPartCounter + 1;
542 i.nparts = numberofparts;
545 const uint64_t drawnSeed = (uint64_t)(
static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
546 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
547 i.index =
m.mParticles.size();
548 i.mMCEventHeader = mEventHeader;
551 int endindex = prims.size() - mPartCounter * mChunkGranularity;
552 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
553 LOG(
debug) <<
"indices " << startindex <<
" " << endindex;
555 if (startindex < 0) {
563 m.mParticles.emplace_back(prims[
index]);
566 LOG(info) <<
"Sending " <<
m.mParticles.size() <<
" particles";
567 LOG(info) <<
"treating ev " << mEventCounter <<
" part " <<
i.part <<
" out of " <<
i.nparts;
570 if (mPipeToDriver != -1 &&
i.part == 1 && workavailable) {
571 if (write(mPipeToDriver, &mEventCounter,
sizeof(mEventCounter))) {
576 if (mPartCounter == numberofparts) {
577 mNeedNewEvent =
true;
579 if (mEventCounter < mMaxEvents) {
585 tmsg->WriteObjectAny((
void*)&
m, TClass::GetClass(
"o2::data::PrimaryChunk"));
587 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
589 std::unique_ptr<fair::mq::Message>
message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
591 reply.AddPart(std::move(
message));
597 auto code = Send(reply,
"primary-get", 0, 5000);
599 auto time = timer.CpuTime();
602 return workavailable;
604 LOG(warn) <<
"Sending process had problems. Return code : " << code <<
" time " <<
time <<
"s";
611 LOG(info) <<
message <<
" CHANGING STATE TO " << PrimStateToString[(
int)to];
617 mWaitingControlInput.store(1);
619 mWaitingControlInput.store(0);
626 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
630 LOG(info) <<
"WAITING FOR CONTROL INPUT";
631 if (mControlChannel.Receive(reply) > 0) {
633 auto data = reply->GetData();
634 auto size = reply->GetSize();
636 std::string command(
reinterpret_cast<char const*
>(
data),
size);
637 LOG(info) <<
"message: " << command;
641 LOG(info) <<
"Processing " << reconfig.
nEvents <<
" new events";
643 LOG(info) <<
"REINIT START";
645 LOG(info) <<
"REINIT DONE";
646 }
catch (std::exception e) {
647 LOG(info) <<
"Exception during reinit";
650 LOG(info) <<
"NOTHING RECEIVED";
657 mWaitingControlInput.store(0);
665 int mChunkGranularity = 500;
666 int mPartCounter = 0;
667 bool mNeedNewEvent =
true;
669 ULong_t mInitialSeed = 0;
670 bool mUseFixedChunkSeed =
false;
671 ULong_t mFixedChunkSeed = 0;
672 int mPipeToDriver = -1;
673 int mEventCounter = 0;
675 std::thread mGeneratorThread;
677 std::thread mControlThread;
685 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
688 std::atomic<int> mWaitingControlInput{0};
689 std::atomic<bool> mInfoThreadStopped{
false};
691 bool mAsService =
false;
694 fair::mq::Channel mControlChannel;
698 std::unordered_map<int, int> mEventID_to_CollID;
700 TRandom3 mSeedGenerator;
Definition of the Stack class.
Definition of the MagF class.
Methods to create simulation mag field.
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const ¶msList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
std::string getOutPrefix() const
void setExternalMode(bool m)
void Reset() override
Resets arrays and stack and deletes particles and tracks.
const std::vector< TParticle > & getPrimaries() const
bool ReInit(o2::conf::SimReconfigData const &reconfig)
bool ConditionalRun() override
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
void waitForControlInput()
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
O2PrimaryServerDevice()
constructor
static void setTotalNEvents(unsigned int &n)
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
Bool_t embedInto(TString fname)
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
GLsizei GLsizei GLchar * source
GLuint GLsizei const GLchar * message
GLbitfield GLuint64 timeout
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
TODO: Make this a base class of SimConfigData?
std::string extKinfileName
std::string keyValueTokens
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"