14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
36#include <TGeoGlobalMagField.h>
40#include <TStopwatch.h>
62 mUseFixedChunkSeed = getenv(
"ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
63 if (mUseFixedChunkSeed) {
64 mFixedChunkSeed = atol(getenv(
"ALICEO2_O2SIM_SUBEVENTSEED"));
72 if (mGeneratorThread.joinable()) {
73 mGeneratorThread.join();
75 if (mControlThread.joinable()) {
76 mControlThread.join();
87 const auto& conf = mSimConfig;
89 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
90 ccdbmgr.setTimestamp(conf.getTimestamp());
93 unsigned int nTotalEvents = conf.getNEvents();
97 if (TGeoGlobalMagField::Instance()->GetField() ==
nullptr) {
99 TGeoGlobalMagField::Instance()->Lock();
111 if (conf.getGenerator().compare(
"extkin") != 0 || conf.getGenerator().compare(
"extkinO2") != 0) {
112 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
113 if (iter != mPrimGeneratorCache.end()) {
114 mPrimGen = iter->second.get();
115 LOG(info) <<
"Found cached generator for " << conf.getGenerator();
119 if (mPrimGen ==
nullptr) {
124 auto vtxMode = conf.getVertexMode();
126 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
128 }
else if (vtxMode == VertexMode::kCCDB) {
131 }
else if (vtxMode == VertexMode::kCollCxt) {
134 LOG(fatal) <<
"Unsupported vertex mode";
137 auto embedinto_filename = conf.getEmbedIntoFileName();
138 if (!embedinto_filename.empty()) {
141 std::regex re(R
"((.*/)?([^/]+)_MCHeader\.root$)");
144 if (std::regex_search(embedinto_filename,
match, re)) {
145 std::cout <<
"Extracted embedding prefix : " <<
match[2] <<
'\n';
146 mEmbeddIntoPrefix =
match[2];
148 LOG(fatal) <<
"Embedding asked but no suitable embedding prefix extractable from " << embedinto_filename;
155 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
156 ptr_wrapper.reset(mPrimGen);
157 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
159 mPrimGen->SetEvent(&mEventHeader);
163 auto collContextFileName = collContextFileName_PrefixPair.first;
164 if (collContextFileName.size() > 0) {
165 LOG(info) <<
"Simulation has collission context";
167 if (mCollissionContext) {
169 LOG(info) <<
"We found " << vertices.size() <<
" vertices included ";
172 const auto source = mCollissionContext->
findSimPrefix(collContextFileName_PrefixPair.second);
174 LOG(fatal) <<
"Wrong simulation prefix";
176 mEventID_to_CollID.clear();
181 LOG(info) <<
"Generator initialization took " << timer.CpuTime() <<
"s";
182 if (mMaxEvents > 0) {
190 bool changeState =
true;
191 LOG(info) <<
"Event generation started ";
199 int retry_counter = 0;
200 const int MAX_RETRY = 100;
203 const auto& conf = mSimConfig;
207 if (vertices.size() > 0) {
208 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
209 auto&
vertex = vertices.at(collisionindex);
210 LOG(info) <<
"Setting vertex " <<
vertex <<
" for event " << mEventCounter <<
" for prefix " << mSimConfig.
getOutPrefix() <<
" from CollContext";
214 auto& collisionParts = mCollissionContext->
getEventParts()[collisionindex];
215 int background_index = -1;
218 for (
auto& part : collisionParts) {
219 if (mCollissionContext->
getSimPrefixes()[part.sourceID] == mEmbeddIntoPrefix) {
220 background_index = part.entryID;
221 LOG(info) <<
"Setting embedding index to " << background_index;
232 if (retry_counter > MAX_RETRY) {
233 LOG(warn) <<
"Not able to generate a non-empty event in " << MAX_RETRY <<
" trials";
239 }
catch (std::exception
const& e) {
240 LOG(error) <<
" Exception occurred during event gen " << e.what();
243 LOG(info) <<
"Event generation took " << timer.CpuTime() <<
"s"
244 <<
" and produced " << mStack->
getPrimaries().size() <<
" primaries ";
253 static std::vector<std::thread> threads;
254 auto sendErrorReply = [](fair::mq::Channel& channel) {
255 LOG(error) <<
"UNKNOWN REQUEST";
256 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((
int)(404)));
260 LOG(info) <<
"LAUNCHING STATUS THREAD";
261 auto lambda = [
this, sendErrorReply]() {
262 bool canShutdown{
false};
265 auto& channel = GetChannels().at(
"o2sim-primserv-info").at(0);
266 if (!channel.IsValid()) {
267 LOG(error) <<
"channel primserv-info not valid";
269 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((
int)(-1)));
271 if (channel.Receive(request,
timeout) > 0) {
273 if (request->GetSize() !=
sizeof(request_payload)) {
274 LOG(error) <<
"Obtained request with unexpected payload size";
275 sendErrorReply(channel);
278 memcpy(&request_payload, request->GetData(),
sizeof(request_payload));
281 LOG(info) <<
"Received status request";
283 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((
int)mState.load()));
284 if (channel.Send(reply) > 0) {
285 LOG(info) <<
"Send status successful";
290 LOG(info) <<
"Got info that we may shutdown";
291 std::unique_ptr<fair::mq::Message> ack(channel.NewSimpleMessage(200));
295 sendErrorReply(channel);
299 mInfoThreadStopped =
true;
301 threads.push_back(std::thread(lambda));
302 threads.back().detach();
308 fair::Logger::OnFatal([] {
throw fair::FatalException(
"Fatal error occured. Exiting without core dump..."); });
313 LOG(info) <<
"Init Server device ";
316 auto& vm = GetConfig()->GetVarMap();
318 if (vm.count(
"isRun5")) {
321 conf.resetFromParsedMap(vm);
329 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
330 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
339 LOG(info) <<
"ENGINE SET TO " << vm[
"mcEngine"].as<std::string>();
341 mChunkGranularity = vm[
"chunkSize"].as<
unsigned int>();
342 LOG(info) <<
"CHUNK SIZE SET TO " << mChunkGranularity;
345 mInitialSeed = vm[
"seed"].as<ULong_t>();
347 mSeedGenerator.SetSeed(mInitialSeed);
348 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
350 mMaxEvents = conf.getNEvents();
353 ROOT::EnableThreadSafety();
361 if (mGeneratorThread.joinable()) {
363 mGeneratorThread.join();
364 }
catch (std::exception
const& e) {
365 LOG(warn) <<
"Exception during thread join ..ignoring";
370 auto pipeenv = getenv(
"ALICE_O2SIMSERVERTODRIVER_PIPE");
372 mPipeToDriver = atoi(pipeenv);
373 LOG(info) <<
"ASSIGNED PIPE HANDLE " << mPipeToDriver;
375 LOG(info) <<
"DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
378 mAsService = vm[
"asservice"].as<
bool>();
380 mControlChannel = fair::mq::Channel{
"o2sim-control",
"sub", fTransportFactory};
381 auto controlsocketname = getenv(
"ALICE_O2SIMCONTROL");
382 if (!controlsocketname) {
383 LOG(fatal) <<
"Internal error: Socketname for control input missing";
385 mControlChannel.Connect(std::string(controlsocketname));
386 mControlChannel.Validate();
389 if (mMaxEvents <= 0) {
398 if (mPipeToDriver != -1) {
400 if (write(mPipeToDriver, &
message,
sizeof(
int))) {
408 LOG(info) <<
"ReInit Server device ";
424 mSeedGenerator.SetSeed(mInitialSeed);
425 LOG(info) <<
"RNG INITIAL SEED " << mInitialSeed;
437 mNeedNewEvent =
true;
439 if (mGeneratorThread.joinable()) {
441 mGeneratorThread.join();
442 }
catch (std::exception
const& e) {
443 LOG(warn) <<
"Exception during thread join ..ignoring";
455 LOG(info) <<
"Received config request";
460 tmsg->WriteObjectAny((
void*)&confdata, TClass::GetClass(
typeid(confdata)));
462 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
464 std::unique_ptr<fair::mq::Message>
message(
465 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
468 if (channel.Send(
message) > 0) {
469 LOG(info) <<
"config reply send ";
472 LOG(error) <<
"Failure sending config reply ";
481 if (mWaitingControlInput.load() == 0) {
482 if (mControlThread.joinable()) {
483 mControlThread.join();
489 auto& channel = GetChannels().at(
"primary-get").at(0);
491 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
492 auto bytes = channel.Receive(request);
494 LOG(error) <<
"Some error/interrupt occurred on socket during receive";
495 if (NewStatePending()) {
506 LOG(
debug) <<
"PARTICLE REQUEST IN STATE " << PrimStateToString[(
int)mState.load()] <<
" from " <<
r.workerid <<
":" <<
r.requestid;
508 auto prestate = mState.load();
520 auto time = timer.CpuTime();
530 while (!mInfoThreadStopped) {
531 LOG(info) <<
"Waiting info thread";
532 using namespace std::chrono_literals;
533 std::this_thread::sleep_for(1000ms);
537 bool HandleRequest(fair::mq::MessagePtr& request,
int , fair::mq::Channel& channel)
542 bool workavailable =
true;
543 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
544 workavailable =
false;
548 workavailable =
false;
552 fair::mq::Parts reply;
553 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
554 reply.AddPart(std::move(headermsg));
556 LOG(
debug) <<
"Received request for work " << mEventCounter <<
" " << mMaxEvents <<
" " << mNeedNewEvent <<
" available " << workavailable;
561 if (mGeneratorThread.joinable()) {
563 mGeneratorThread.join();
564 }
catch (std::exception
const& e) {
565 LOG(warn) <<
"Exception during thread join ..ignoring";
570 LOG(info) <<
"Waiting for event generation do become fully available";
573 mNeedNewEvent =
false;
579 auto numberofparts = (
int)std::ceil(prims.size() / (1. * mChunkGranularity));
581 numberofparts = std::max(1, numberofparts);
583 LOG(
debug) <<
"Have " << prims.size() <<
" " << numberofparts;
587 i.
eventID = workavailable ? mEventCounter : -1;
588 i.maxEvents = mMaxEvents;
589 i.part = mPartCounter + 1;
590 i.nparts = numberofparts;
593 const uint64_t drawnSeed = (uint64_t)(
static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
594 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
595 i.index =
m.mParticles.size();
596 i.mMCEventHeader = mEventHeader;
599 int endindex = prims.size() - mPartCounter * mChunkGranularity;
600 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
601 LOG(
debug) <<
"indices " << startindex <<
" " << endindex;
603 if (startindex < 0) {
611 m.mParticles.emplace_back(prims[
index]);
614 LOG(info) <<
"Sending " <<
m.mParticles.size() <<
" particles";
615 LOG(info) <<
"treating ev " << mEventCounter <<
" part " <<
i.part <<
" out of " <<
i.nparts;
618 if (mPipeToDriver != -1 &&
i.part == 1 && workavailable) {
619 if (write(mPipeToDriver, &mEventCounter,
sizeof(mEventCounter))) {
624 if (mPartCounter == numberofparts) {
625 mNeedNewEvent =
true;
627 if (mEventCounter < mMaxEvents) {
633 tmsg->WriteObjectAny((
void*)&
m, TClass::GetClass(
"o2::data::PrimaryChunk"));
635 auto free_tmessage = [](
void*
data,
void* hint) {
delete static_cast<TMessage*
>(hint); };
637 std::unique_ptr<fair::mq::Message>
message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
639 reply.AddPart(std::move(
message));
645 auto code = Send(reply,
"primary-get", 0, 5000);
647 auto time = timer.CpuTime();
650 return workavailable;
652 LOG(warn) <<
"Sending process had problems. Return code : " << code <<
" time " <<
time <<
"s";
659 LOG(info) <<
message <<
" CHANGING STATE TO " << PrimStateToString[(
int)to];
665 mWaitingControlInput.store(1);
667 mWaitingControlInput.store(0);
674 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
678 LOG(info) <<
"WAITING FOR CONTROL INPUT";
679 if (mControlChannel.Receive(reply) > 0) {
681 auto data = reply->GetData();
682 auto size = reply->GetSize();
684 std::string command(
reinterpret_cast<char const*
>(
data),
size);
685 LOG(info) <<
"message: " << command;
689 LOG(info) <<
"Processing " << reconfig.
nEvents <<
" new events";
691 LOG(info) <<
"REINIT START";
693 LOG(info) <<
"REINIT DONE";
694 }
catch (std::exception e) {
695 LOG(info) <<
"Exception during reinit";
698 LOG(info) <<
"NOTHING RECEIVED";
705 mWaitingControlInput.store(0);
713 int mChunkGranularity = 500;
714 int mPartCounter = 0;
715 bool mNeedNewEvent =
true;
717 ULong_t mInitialSeed = 0;
718 bool mUseFixedChunkSeed =
false;
719 ULong_t mFixedChunkSeed = 0;
720 int mPipeToDriver = -1;
721 int mEventCounter = 0;
723 std::thread mGeneratorThread;
725 std::thread mControlThread;
733 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
736 std::atomic<int> mWaitingControlInput{0};
737 std::atomic<bool> mInfoThreadStopped{
false};
739 bool mAsService =
false;
742 fair::mq::Channel mControlChannel;
746 std::unordered_map<int, int> mEventID_to_CollID;
747 std::string mEmbeddIntoPrefix;
749 TRandom3 mSeedGenerator;
Definition of the Stack class.
Definition of the MagF class.
Methods to create simulation mag field.
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
std::string getOutPrefix() const
void setExternalMode(bool m)
void Reset() override
Resets arrays and stack and deletes particles and tracks.
const std::vector< TParticle > & getPrimaries() const
bool ReInit(o2::conf::SimReconfigData const &reconfig)
bool ConditionalRun() override
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
void waitForControlInput()
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
O2PrimaryServerDevice()
constructor
static void setTotalNEvents(unsigned int &n)
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
Bool_t embedInto(TString fname)
void setEmbedIndex(int idx)
sets the embedding index
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
std::vector< std::vector< o2::steer::EventPart > > & getEventParts(bool withQED=false)
std::vector< std::string > const & getSimPrefixes() const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
bool match(const std::vector< std::string > &queries, const char *pattern)
GLsizei GLsizei GLchar * source
GLuint GLsizei const GLchar * message
GLbitfield GLuint64 timeout
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
TODO: Make this a base class of SimConfigData?
std::string extKinfileName
std::string keyValueTokens
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"