14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
36#include <TGeoGlobalMagField.h>
40#include <TStopwatch.h>
62 mUseFixedChunkSeed = getenv(
"ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
63 if (mUseFixedChunkSeed) {
64 mFixedChunkSeed = atol(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
72 if (mGeneratorThread.joinable()) {
73 mGeneratorThread.join();
75 if (mControlThread.joinable()) {
76 mControlThread.join();
87 const auto& conf = mSimConfig;
89 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
90 ccdbmgr.setTimestamp(conf.getTimestamp());
93 unsigned int nTotalEvents = conf.getNEvents();
97 if (TGeoGlobalMagField::Instance()->GetField() ==
nullptr) {
99 TGeoGlobalMagField::Instance()->Lock();
111 if (conf.getGenerator().compare(
"extkin") != 0 || conf.getGenerator().compare(
"extkinO2") != 0) {
112 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
113 if (iter != mPrimGeneratorCache.end()) {
114 mPrimGen = iter->second.get();
115 LOG(info) <<
"Found cached generator for " << conf.getGenerator();
119 if (mPrimGen ==
nullptr) {
124 auto vtxMode = conf.getVertexMode();
126 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
128 }
else if (vtxMode == VertexMode::kCCDB) {
131 }
else if (vtxMode == VertexMode::kCollCxt) {
134 LOG(fatal) <<
"Unsupported vertex mode";
137 auto embedinto_filename = conf.getEmbedIntoFileName();
138 if (!embedinto_filename.empty()) {
141 std::regex re(R
"((.*/)?([^/]+)_MCHeader\.root$)");
144 if (std::regex_search(embedinto_filename,
match, re)) {
145 std::cout <<
"Extracted embedding prefix : " <<
match[2] <<
'\n';
146 mEmbeddIntoPrefix =
match[2];
148 LOG(fatal) <<
"Embedding asked but no suitable embedding prefix extractable from " << embedinto_filename;
155 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
156 ptr_wrapper.reset(mPrimGen);
157 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
159 mPrimGen->SetEvent(&mEventHeader);
163 auto collContextFileName = collContextFileName_PrefixPair.first;
164 if (collContextFileName.size() > 0) {
165 LOG(info) <<
"Simulation has collission context";
167 if (mCollissionContext) {
169 LOG(info) <<
"We found " << vertices.size() <<
" vertices included ";
172 const auto source = mCollissionContext->
findSimPrefix(collContextFileName_PrefixPair.second);
174 LOG(fatal) <<
"Wrong simulation prefix";
176 mEventID_to_CollID.clear();
181 LOG(info) <<
"Generator initialization took " << timer.CpuTime() <<
"s";
182 if (mMaxEvents > 0) {
190 bool changeState =
true;
191 LOG(info) <<
"Event generation started ";
199 int retry_counter = 0;
200 const int MAX_RETRY = 100;
203 const auto& conf = mSimConfig;
207 if (vertices.size() > 0) {
208 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
209 auto&
vertex = vertices.at(collisionindex);
210 LOG(info) <<
"Setting vertex " <<
vertex <<
" for event " << mEventCounter <<
" for prefix " << mSimConfig.
getOutPrefix() <<
" from CollContext";
214 auto& collisionParts = mCollissionContext->
getEventParts()[collisionindex];
215 int background_index = -1;
218 for (
auto& part : collisionParts) {
219 if (mCollissionContext->
getSimPrefixes()[part.sourceID] == mEmbeddIntoPrefix) {
220 background_index = part.entryID;
221 LOG(info) <<
"Setting embedding index to " << background_index;
232 if (retry_counter > MAX_RETRY) {
233 LOG(warn) <<
"Not able to generate a non-empty event in " << MAX_RETRY <<
" trials";
239 }
catch (std::exception
const& e) {
240 LOG(error) <<
" Exception occurred during event gen " << e.what();
243 LOG(info) <<
"Event generation took " << timer.CpuTime() <<
"s"
244 <<
" and produced " << mStack->
getPrimaries().size() <<
" primaries ";
253 static std::vector<std::thread> threads;
254 LOG(info) <<
"LAUNCHING STATUS THREAD";
255 auto lambda = [
this]() {
257 auto& channel = GetChannels().at(
"o2sim-primserv-info").at(0);
258 if (!channel.IsValid()) {
259 LOG(error) <<
"channel primserv-info not valid";
261 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(-1));
263 if (channel.Receive(request,
timeout) > 0) {
264 LOG(info) <<
"INFO REQUEST RECEIVED";
266 LOG(info) <<
"Received status request";
268 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((
int)mState.load()));
269 if (channel.Send(reply) > 0) {
270 LOG(info) <<
"Send status successful";
275 LOG(fatal) <<
"UNKNOWN REQUEST";
276 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage(404));
281 mInfoThreadStopped =
true;
283 threads.push_back(std::thread(lambda));
284 threads.back().detach();
290 fair::Logger::OnFatal([] {
throw fair::FatalException(
"Fatal error occured. Exiting without core dump..."); });
295 LOG(info) <<
"Init Server device ";
298 auto& vm = GetConfig()->GetVarMap();
300 if (vm.count(
"isRun5")) {
303 conf.resetFromParsedMap(vm);
311 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
312 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
321 LOG(info) <<
"ENGINE SET TO " << vm[
"mcEngine"].as<std::string>();
323 mChunkGranularity = vm[
"chunkSize"].as<
unsigned int>();
324 LOG(info) <<
"CHUNK SIZE SET TO " << mChunkGranularity;
327 mInitialSeed = vm[
"seed"].as<ULong_t>();
329 mSeedGenerator.SetSeed(mInitialSeed);
330 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
332 mMaxEvents = conf.getNEvents();
335 ROOT::EnableThreadSafety();
343 if (mGeneratorThread.joinable()) {
345 mGeneratorThread.join();
346 }
catch (std::exception
const& e) {
347 LOG(warn) <<
"Exception during thread join ..ignoring";
352 auto pipeenv = getenv(
"ALICE_O2SIMSERVERTODRIVER_PIPE");
354 mPipeToDriver = atoi(pipeenv);
355 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
357 LOG(info) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
360 mAsService = vm[
"asservice"].as<
bool>();
362 mControlChannel = fair::mq::Channel{
"o2sim-control",
"sub", fTransportFactory};
363 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
364 if (!controlsocketname) {
365 LOG(fatal) <<
"Internal error: Socketname for control input missing";
367 mControlChannel.Connect(std::string(controlsocketname));
368 mControlChannel.Validate();
371 if (mMaxEvents <= 0) {
380 if (mPipeToDriver != -1) {
382 if (write(mPipeToDriver, &
message,
sizeof(
int))) {
390 LOG(info) <<
"ReInit Server device ";
406 mSeedGenerator.SetSeed(mInitialSeed);
407 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
419 mNeedNewEvent =
true;
421 if (mGeneratorThread.joinable()) {
423 mGeneratorThread.join();
424 }
catch (std::exception
const& e) {
425 LOG(warn) <<
"Exception during thread join ..ignoring";
437 LOG(info) <<
"Received config request";
442 tmsg->WriteObjectAny((
void*)&confdata, TClass::GetClass(
typeid(confdata)));
444 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
446 std::unique_ptr<fair::mq::Message>
message(
447 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
450 if (channel.Send(
message) > 0) {
451 LOG(info) <<
"config reply send ";
461 if (mWaitingControlInput.load() == 0) {
462 if (mControlThread.joinable()) {
463 mControlThread.join();
469 auto& channel = GetChannels().at(
"primary-get").at(0);
471 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
472 auto bytes = channel.Receive(request);
474 LOG(error) <<
"Some error/interrupt occurred on socket during receive";
475 if (NewStatePending()) {
486 LOG(
debug) <<
"PARTICLE REQUEST IN STATE " << PrimStateToString[(
int)mState.load()] <<
" from " <<
r.workerid <<
":" <<
r.requestid;
488 auto prestate = mState.load();
500 auto time = timer.CpuTime();
507 while (!mInfoThreadStopped) {
508 LOG(info) <<
"Waiting info thread";
509 using namespace std::chrono_literals;
510 std::this_thread::sleep_for(100ms);
514 bool HandleRequest(fair::mq::MessagePtr& request,
int , fair::mq::Channel& channel)
519 bool workavailable =
true;
520 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
521 workavailable =
false;
525 workavailable =
false;
529 fair::mq::Parts reply;
530 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
531 reply.AddPart(std::move(headermsg));
533 LOG(
debug) <<
"Received request for work " << mEventCounter <<
" " << mMaxEvents <<
" " << mNeedNewEvent <<
" available " << workavailable;
538 if (mGeneratorThread.joinable()) {
540 mGeneratorThread.join();
541 }
catch (std::exception
const& e) {
542 LOG(warn) <<
"Exception during thread join ..ignoring";
547 LOG(info) <<
"Waiting for event generation do become fully available";
550 mNeedNewEvent =
false;
556 auto numberofparts = (
int)std::ceil(prims.size() / (1. * mChunkGranularity));
558 numberofparts = std::max(1, numberofparts);
560 LOG(
debug) <<
"Have " << prims.size() <<
" " << numberofparts;
564 i.
eventID = workavailable ? mEventCounter : -1;
565 i.maxEvents = mMaxEvents;
566 i.part = mPartCounter + 1;
567 i.nparts = numberofparts;
570 const uint64_t drawnSeed = (uint64_t)(
static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
571 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
572 i.index =
m.mParticles.size();
573 i.mMCEventHeader = mEventHeader;
576 int endindex = prims.size() - mPartCounter * mChunkGranularity;
577 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
578 LOG(
debug) <<
"indices " << startindex <<
" " << endindex;
580 if (startindex < 0) {
588 m.mParticles.emplace_back(prims[
index]);
591 LOG(info) <<
"Sending " <<
m.mParticles.size() <<
" particles";
592 LOG(info) <<
"treating ev " << mEventCounter <<
" part " <<
i.part <<
" out of " <<
i.nparts;
595 if (mPipeToDriver != -1 &&
i.part == 1 && workavailable) {
596 if (write(mPipeToDriver, &mEventCounter,
sizeof(mEventCounter))) {
601 if (mPartCounter == numberofparts) {
602 mNeedNewEvent =
true;
604 if (mEventCounter < mMaxEvents) {
610 tmsg->WriteObjectAny((
void*)&
m, TClass::GetClass(
"o2::data::PrimaryChunk"));
612 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
614 std::unique_ptr<fair::mq::Message>
message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
616 reply.AddPart(std::move(
message));
622 auto code = Send(reply,
"primary-get", 0, 5000);
624 auto time = timer.CpuTime();
627 return workavailable;
629 LOG(warn) <<
"Sending process had problems. Return code : " << code <<
" time " <<
time <<
"s";
636 LOG(info) <<
message <<
" CHANGING STATE TO " << PrimStateToString[(
int)to];
642 mWaitingControlInput.store(1);
644 mWaitingControlInput.store(0);
651 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
655 LOG(info) <<
"WAITING FOR CONTROL INPUT";
656 if (mControlChannel.Receive(reply) > 0) {
658 auto data = reply->GetData();
659 auto size = reply->GetSize();
661 std::string command(
reinterpret_cast<char const*
>(
data),
size);
662 LOG(info) <<
"message: " << command;
666 LOG(info) <<
"Processing " << reconfig.
nEvents <<
" new events";
668 LOG(info) <<
"REINIT START";
670 LOG(info) <<
"REINIT DONE";
} catch (std::exception const& e) {
  // Catch by const reference, like the other handlers in this file: catching
  // by value copies the exception and slices it down to std::exception, which
  // loses the derived type and its what() text.
  LOG(info) << "Exception during reinit" << ": " << e.what();
675 LOG(info) <<
"NOTHING RECEIVED";
682 mWaitingControlInput.store(0);
// Chunk size used to split the primaries of an event into parts; overwritten
// from the "chunkSize" option during initialisation.
690 int mChunkGranularity = 500;
// Part counter within the current event: the next part sent is numbered
// mPartCounter + 1, and the counter is compared against the number of parts.
691 int mPartCounter = 0;
// Set to true once mPartCounter equals the number of parts of the event;
// reset to false in HandleRequest.
692 bool mNeedNewEvent =
true;
// Initial seed taken from the "seed" option; used to seed mSeedGenerator.
694 ULong_t mInitialSeed = 0;
// True when ALICEO2_O2SIM_SUBEVENTSEED is set to a non-zero integer.
695 bool mUseFixedChunkSeed =
false;
// Value parsed from ALICEO2_O2SIM_SUBEVENTSEED; used as the chunk seed when
// mUseFixedChunkSeed is true.
696 ULong_t mFixedChunkSeed = 0;
// Handle parsed from ALICE_O2SIMSERVERTODRIVER_PIPE; writes to the driver are
// only attempted while this differs from -1.
697 int mPipeToDriver = -1;
// Current event number; compared against mMaxEvents and written to the driver pipe.
698 int mEventCounter = 0;
// Joined (when joinable) at several points in this file.
// NOTE(review): presumably runs the event generation — confirm where it is started.
700 std::thread mGeneratorThread;
// Joined (when joinable), among others after mWaitingControlInput reads 0.
702 std::thread mControlThread;
// Generators owned by the device, keyed by the generator name from the sim config.
710 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
// Toggled with store(1) / store(0); a value of 0 is checked before joining mControlThread.
713 std::atomic<int> mWaitingControlInput{0};
// Set to true by the status-thread code; another loop sleeps in 100ms steps until it is set.
714 std::atomic<bool> mInfoThreadStopped{
false};
// Taken from the "asservice" option.
716 bool mAsService =
false;
// "sub" channel named "o2sim-control", connected to the address in ALICE_O2SIMCONTROL.
719 fair::mq::Channel mControlChannel;
// Looked up with mEventCounter to obtain the collision index into the vertex list.
723 std::unordered_map<int, int> mEventID_to_CollID;
// Second capture group of the "_MCHeader.root" regex applied to the embed-into
// file name; compared against the sim prefixes of the collision parts.
724 std::string mEmbeddIntoPrefix;
// Seeded with mInitialSeed; its Rndm() output is scaled into the drawn chunk seeds.
726 TRandom3 mSeedGenerator;
Definition of the Stack class.
Definition of the MagF class.
Methods to create simulation mag field.
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
std::string getOutPrefix() const
void setExternalMode(bool m)
void Reset() override
Resets arrays and stack and deletes particles and tracks.
const std::vector< TParticle > & getPrimaries() const
bool ReInit(o2::conf::SimReconfigData const &reconfig)
bool ConditionalRun() override
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
void waitForControlInput()
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
O2PrimaryServerDevice()
constructor
static void setTotalNEvents(unsigned int &n)
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
Bool_t embedInto(TString fname)
void setEmbedIndex(int idx)
sets the embedding index
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
std::vector< std::vector< o2::steer::EventPart > > & getEventParts(bool withQED=false)
std::vector< std::string > const & getSimPrefixes() const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
bool match(const std::vector< std::string > &queries, const char *pattern)
GLsizei GLsizei GLchar * source
GLuint GLsizei const GLchar * message
GLbitfield GLuint64 timeout
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
TODO: Make this a base class of SimConfigData?
std::string extKinfileName
std::string keyValueTokens
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"