Project
Loading...
Searching...
No Matches
O2PrimaryServerDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
16
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
22#include <DetectorsBase/Stack.h>
25#include <TMessage.h>
26#include <TClass.h>
31#include <SimConfig/SimConfig.h>
35#include <Field/MagneticField.h>
36#include <TGeoGlobalMagField.h>
37#include <typeinfo>
38#include <thread>
39#include <TROOT.h>
40#include <TStopwatch.h>
41#include <fstream>
42#include <iostream>
43#include <atomic>
44#include "PrimaryServerState.h"
46#include <chrono>
48#include <TRandom3.h>
49
50namespace o2
51{
52namespace devices
53{
54
56{
57 public:
60 {
61 mUseFixedChunkSeed = getenv("ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
62 if (mUseFixedChunkSeed) {
63 mFixedChunkSeed = atol(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
64 }
65 }
66
69 {
70 try {
71 if (mGeneratorThread.joinable()) {
72 mGeneratorThread.join();
73 }
74 if (mControlThread.joinable()) {
75 mControlThread.join();
76 }
77 } catch (...) {
78 }
79 }
80
81 protected:
83 {
84 TStopwatch timer;
85 timer.Start();
86 const auto& conf = mSimConfig;
88 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
89 ccdbmgr.setTimestamp(conf.getTimestamp());
90
91 // set the global information about the number of events to be generated
92 unsigned int nTotalEvents = conf.getNEvents();
94
95 // init magnetic field as it might be needed by the generator
96 if (TGeoGlobalMagField::Instance()->GetField() == nullptr) {
97 TGeoGlobalMagField::Instance()->SetField(o2::base::SimFieldUtils::createMagField());
98 TGeoGlobalMagField::Instance()->Lock();
99 }
100
101 // look for a cached instance of Pythia8 or of an external generator in order to avoid
102 // (long) initialization times.
103 // This is evidently a bit weak, as generators might need reconfiguration (to be treated later).
104 // For now, we'd like to allow for fast switches between say a pythia8 instance and reading from kinematics
105 // to continue an already started simulation.
106 //
107 // Not using cached instances for external kinematics since these might change input filenames etc.
108 // and are in any case quickly setup.
109 mPrimGen = nullptr;
110 if (conf.getGenerator().compare("extkin") != 0 || conf.getGenerator().compare("extkinO2") != 0) {
111 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
112 if (iter != mPrimGeneratorCache.end()) {
113 mPrimGen = iter->second.get();
114 LOG(info) << "Found cached generator for " << conf.getGenerator();
115 }
116 }
117
118 if (mPrimGen == nullptr) {
119 mPrimGen = new o2::eventgen::PrimaryGenerator;
121
122 // setup vertexing
123 auto vtxMode = conf.getVertexMode();
125 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
126 mPrimGen->setVertexMode(vtxMode);
127 } else if (vtxMode == VertexMode::kCCDB) {
128 // we need to fetch the CCDB object
129 mPrimGen->setVertexMode(vtxMode, ccdbmgr.getForTimeStamp<o2::dataformats::MeanVertexObject>("GLO/Calib/MeanVertex", conf.getTimestamp()));
130 } else if (vtxMode == VertexMode::kCollCxt) {
131 // The vertex will be injected from the outside via setExternalVertex
132 } else {
133 LOG(fatal) << "Unsupported vertex mode";
134 }
135
136 auto embedinto_filename = conf.getEmbedIntoFileName();
137 if (!embedinto_filename.empty()) {
138 mPrimGen->embedInto(embedinto_filename);
139 }
140
141 mPrimGen->Init();
142
143 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
144 ptr_wrapper.reset(mPrimGen);
145 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
146 }
147 mPrimGen->SetEvent(&mEventHeader);
148
149 // A good moment to couple to collision context
150 auto collContextFileName_PrefixPair = mSimConfig.getCollContextFilenameAndEventPrefix();
151 auto collContextFileName = collContextFileName_PrefixPair.first;
152 if (collContextFileName.size() > 0) {
153 LOG(info) << "Simulation has collission context";
154 mCollissionContext = o2::steer::DigitizationContext::loadFromFile(collContextFileName);
155 if (mCollissionContext) {
156 const auto& vertices = mCollissionContext->getInteractionVertices();
157 LOG(info) << "We found " << vertices.size() << " vertices included ";
158
159 // initialize the eventID to collID mapping
160 const auto source = mCollissionContext->findSimPrefix(collContextFileName_PrefixPair.second);
161 if (source == -1) {
162 LOG(fatal) << "Wrong simulation prefix";
163 }
164 mEventID_to_CollID.clear();
165 mEventID_to_CollID = mCollissionContext->getCollisionIndicesForSource(source);
166 }
167 }
168
169 LOG(info) << "Generator initialization took " << timer.CpuTime() << "s";
170 if (mMaxEvents > 0) {
171 generateEvent(); // generate a first event
172 }
173 }
174
175 // function generating one event
176 void generateEvent(/*bool changeState = false*/)
177 {
178 bool changeState = true; // false;
179 LOG(info) << "Event generation started ";
180 if (changeState) {
182 }
183 TStopwatch timer;
184 timer.Start();
185 try {
186 bool valid = false;
187 int retry_counter = 0;
188 const int MAX_RETRY = 100;
189 do {
190 mStack->Reset();
191 const auto& conf = mSimConfig;
192 // check whether the vertex comes from the collision context
193 if (mCollissionContext && conf.getVertexMode() == o2::conf::VertexMode::kCollCxt) {
194 const auto& vertices = mCollissionContext->getInteractionVertices();
195 if (vertices.size() > 0) {
196 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
197 auto& vertex = vertices.at(collisionindex);
198 LOG(info) << "Setting vertex " << vertex << " for event " << mEventCounter << " for prefix " << mSimConfig.getOutPrefix() << " from CollContext";
199 mPrimGen->setExternalVertexForNextEvent(vertex.X(), vertex.Y(), vertex.Z());
200 }
201 }
202 mPrimGen->GenerateEvent(mStack);
203 if (mStack->getPrimaries().size() > 0) {
204 valid = true;
205 } else {
206 retry_counter++;
207 if (retry_counter > MAX_RETRY) {
208 LOG(warn) << "Not able to generate a non-empty event in " << MAX_RETRY << " trials";
209 // empty event is sent out
210 valid = true;
211 }
212 }
213 } while (!valid);
214 } catch (std::exception const& e) {
215 LOG(error) << " Exception occurred during event gen " << e.what();
216 }
217 timer.Stop();
218 LOG(info) << "Event generation took " << timer.CpuTime() << "s"
219 << " and produced " << mStack->getPrimaries().size() << " primaries ";
220 if (changeState) {
222 }
223 }
224
225 // launches a thread that listens for status requests from outside asynchronously
227 {
228 static std::vector<std::thread> threads;
229 LOG(info) << "LAUNCHING STATUS THREAD";
230 auto lambda = [this]() {
231 while (mState != O2PrimaryServerState::Stopped) {
232 auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
233 if (!channel.IsValid()) {
234 LOG(error) << "channel primserv-info not valid";
235 }
236 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(-1));
237 int timeout = 100; // 100ms --> so as not to block and allow for proper termination of this thread
238 if (channel.Receive(request, timeout) > 0) {
239 LOG(info) << "INFO REQUEST RECEIVED";
240 if (*(int*)(request->GetData()) == (int)O2PrimaryServerInfoRequest::Status) {
241 LOG(info) << "Received status request";
242 // request needs to be a simple enum of type O2PrimaryServerInfoRequest
243 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)mState.load()));
244 if (channel.Send(reply) > 0) {
245 LOG(info) << "Send status successful";
246 }
247 } else if (*(int*)request->GetData() == (int)O2PrimaryServerInfoRequest::Config) {
248 HandleConfigRequest(channel);
249 } else {
250 LOG(fatal) << "UNKNOWN REQUEST";
251 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage(404));
252 channel.Send(reply);
253 }
254 }
255 }
256 mInfoThreadStopped = true;
257 };
258 threads.push_back(std::thread(lambda));
259 threads.back().detach();
260 }
261
262 void InitTask() final
263 {
264 // fatal without core dump
265 fair::Logger::OnFatal([] { throw fair::FatalException("Fatal error occured. Exiting without core dump..."); });
266
267 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), "SERVER : INITIALIZING");
268
270 LOG(info) << "Init Server device ";
271
272 // init sim config
273 auto& vm = GetConfig()->GetVarMap();
274 auto& conf = o2::conf::SimConfig::Instance();
275 if (vm.count("isRun5")) {
276 conf.setRun5();
277 }
278 conf.resetFromParsedMap(vm);
279
280 // update the parameters from an INI/JSON file, if given (overrides code-based version)
282 // update the parameters from stuff given at command line (overrides file-based version)
283 o2::conf::ConfigurableParam::updateFromString(conf.getKeyValueString());
284
285 // customize the level of log output
286 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
287 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
288
289 // from now on mSimConfig should be used within this process
290 mSimConfig = conf;
291
292 mStack = new o2::data::Stack();
293 mStack->setExternalMode(true);
294
295 // MC ENGINE
296 LOG(info) << "ENGINE SET TO " << vm["mcEngine"].as<std::string>();
297 // CHUNK SIZE
298 mChunkGranularity = vm["chunkSize"].as<unsigned int>();
299 LOG(info) << "CHUNK SIZE SET TO " << mChunkGranularity;
300
301 // initial seed --> we should store this somewhere
302 mInitialSeed = vm["seed"].as<ULong_t>();
303 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
304 mSeedGenerator.SetSeed(mInitialSeed);
305 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
306
307 mMaxEvents = conf.getNEvents();
308
309 // need to make ROOT thread-safe since we use ROOT services in all places
310 ROOT::EnableThreadSafety();
311
313
314 // launch initialization of particle generator asynchronously
315 // so that we reach the RUNNING state of the server quickly
316 // and do not block here
317 mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
318 if (mGeneratorThread.joinable()) {
319 try {
320 mGeneratorThread.join();
321 } catch (std::exception const& e) {
322 LOG(warn) << "Exception during thread join ..ignoring";
323 }
324 }
325
326 // init pipe
327 auto pipeenv = getenv("ALICE_O2SIMSERVERTODRIVER_PIPE");
328 if (pipeenv) {
329 mPipeToDriver = atoi(pipeenv);
330 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
331 } else {
332 LOG(info) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
333 }
334
335 mAsService = vm["asservice"].as<bool>();
336 if (mAsService) {
337 mControlChannel = fair::mq::Channel{"o2sim-control", "sub", fTransportFactory};
338 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
339 if (!controlsocketname) {
340 LOG(fatal) << "Internal error: Socketname for control input missing";
341 }
342 mControlChannel.Connect(std::string(controlsocketname));
343 mControlChannel.Validate();
344 }
345
346 if (mMaxEvents <= 0) {
347 if (mAsService) {
349 }
350 } else {
352 }
353
354 // feedback to driver that we are done initializing
355 if (mPipeToDriver != -1) {
356 int message = -111; // special code meaning end of initialization
357 if (write(mPipeToDriver, &message, sizeof(int))) {
358 }
359 }
360 }
361
362 // function for intermediate/on-the-fly reinitializations
363 bool ReInit(o2::conf::SimReconfigData const& reconfig)
364 {
365 LOG(info) << "ReInit Server device ";
366
367 if (reconfig.stop) {
368 return false;
369 }
370
371 // mSimConfig.getConfigData().mKeyValueTokens=reconfig.keyValueTokens;
372 // Think about this:
373 // update the parameters from an INI/JSON file, if given (overrides code-based version)
375 // update the parameters from stuff given at command line (overrides file-based version)
377
378 // initial seed --> we should store this somewhere
379 mInitialSeed = reconfig.startSeed;
380 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
381 mSeedGenerator.SetSeed(mInitialSeed);
382 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
383
384 mMaxEvents = reconfig.nEvents;
385
386 // updating the simconfig member with new information especially concerning the generators
387 // TODO: put this into utility function?
388 mSimConfig.getConfigData().mGenerator = reconfig.generator;
389 mSimConfig.getConfigData().mTrigger = reconfig.trigger;
390 mSimConfig.getConfigData().mExtKinFileName = reconfig.extKinfileName;
391
392 mEventCounter = 0;
393 mPartCounter = 0;
394 mNeedNewEvent = true;
395 // reinit generator and start generation of a new event
396 if (mGeneratorThread.joinable()) {
397 try {
398 mGeneratorThread.join();
399 } catch (std::exception const& e) {
400 LOG(warn) << "Exception during thread join ..ignoring";
401 }
402 }
403 // mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
405
406 return true;
407 }
408
409 // method reacting to requests to get the simulation configuration
410 bool HandleConfigRequest(fair::mq::Channel& channel)
411 {
412 LOG(info) << "Received config request";
413 // just sending the simulation configuration to anyone that wants it
414 const auto& confdata = mSimConfig.getConfigData();
415
416 TMessage* tmsg = new TMessage(kMESS_OBJECT);
417 tmsg->WriteObjectAny((void*)&confdata, TClass::GetClass(typeid(confdata)));
418
419 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
420
421 std::unique_ptr<fair::mq::Message> message(
422 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
423
424 // send answer
425 if (channel.Send(message) > 0) {
426 LOG(info) << "config reply send ";
427 return true;
428 }
429 return true;
430 }
431
432 bool ConditionalRun() override
433 {
434 // we might come here in IDLE mode
435 if (mState.load() == O2PrimaryServerState::Idle) {
436 if (mWaitingControlInput.load() == 0) {
437 if (mControlThread.joinable()) {
438 mControlThread.join();
439 }
440 mControlThread = std::thread(&O2PrimaryServerDevice::waitForControlInput, this);
441 }
442 }
443
444 auto& channel = GetChannels().at("primary-get").at(0);
445 PrimaryChunkRequest requestpayload;
446 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
447 auto bytes = channel.Receive(request);
448 if (bytes < 0) {
449 LOG(error) << "Some error/interrupt occurred on socket during receive";
450 if (NewStatePending()) { // new state is typically pending if (term) signal was received
451 WaitForNextState();
452 // ask ourselves for termination of this loop
454 }
455 return false;
456 }
457
458 TStopwatch timer;
459 timer.Start();
460 auto& r = *((PrimaryChunkRequest*)(request->GetData()));
461 LOG(debug) << "PARTICLE REQUEST IN STATE " << PrimStateToString[(int)mState.load()] << " from " << r.workerid << ":" << r.requestid;
462
463 auto prestate = mState.load();
464 auto more = HandleRequest(request, 0, channel);
465 if (!more) {
466 if (mAsService) {
469 }
470 } else {
472 }
473 }
474 timer.Stop();
475 auto time = timer.CpuTime();
476 LOG(debug) << "COND-RUN TOOK " << time << " s";
477 return mState != O2PrimaryServerState::Stopped;
478 }
479
480 void PostRun() override
481 {
482 while (!mInfoThreadStopped) {
483 LOG(info) << "Waiting info thread";
484 using namespace std::chrono_literals;
485 std::this_thread::sleep_for(100ms);
486 }
487 }
488
489 bool HandleRequest(fair::mq::MessagePtr& request, int /*index*/, fair::mq::Channel& channel)
490 {
491 // LOG(debug) << "GOT A REQUEST WITH SIZE " << request->GetSize();
492 // std::string requeststring(static_cast<char*>(request->GetData()), request->GetSize());
493 // LOG(info) << "NORMAL REQUEST STRING " << requeststring;
494 bool workavailable = true;
495 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
496 workavailable = false;
497 }
499 // send a zero answer
500 workavailable = false;
501 }
502
503 PrimaryChunkAnswer header{mState, workavailable};
504 fair::mq::Parts reply;
505 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
506 reply.AddPart(std::move(headermsg));
507
508 LOG(debug) << "Received request for work " << mEventCounter << " " << mMaxEvents << " " << mNeedNewEvent << " available " << workavailable;
509 if (workavailable) {
510
511 if (mNeedNewEvent) {
512 // we need a newly generated event now
513 if (mGeneratorThread.joinable()) {
514 try {
515 mGeneratorThread.join();
516 } catch (std::exception const& e) {
517 LOG(warn) << "Exception during thread join ..ignoring";
518 }
519 }
520 // also if we are still in event waiting stage (doing some busy sleep)
521 while (mState.load() == O2PrimaryServerState::WaitingEvent) {
522 LOG(info) << "Waiting for event generation do become fully available";
523 usleep(100);
524 }
525 mNeedNewEvent = false;
526 mPartCounter = 0;
527 mEventCounter++;
528 }
529
530 auto& prims = mStack->getPrimaries();
531 auto numberofparts = (int)std::ceil(prims.size() / (1. * mChunkGranularity));
532 // number of parts should be at least 1 (even if empty)
533 numberofparts = std::max(1, numberofparts);
534
535 LOG(debug) << "Have " << prims.size() << " " << numberofparts;
536
539 i.eventID = workavailable ? mEventCounter : -1;
540 i.maxEvents = mMaxEvents;
541 i.part = mPartCounter + 1;
542 i.nparts = numberofparts;
543 // assign a deterministic (yet collision free seed) to process this particle chunk in Geant
544 // limit range to uint32_t since internal limit of TRandom (despite API suggesting otherwise)
545 const uint64_t drawnSeed = (uint64_t)(static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
546 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
547 i.index = m.mParticles.size();
548 i.mMCEventHeader = mEventHeader;
549 m.mSubEventInfo = i;
550
551 int endindex = prims.size() - mPartCounter * mChunkGranularity;
552 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
553 LOG(debug) << "indices " << startindex << " " << endindex;
554
555 if (startindex < 0) {
556 startindex = 0;
557 }
558 if (endindex < 0) {
559 endindex = 0;
560 }
561
562 for (int index = startindex; index < endindex; ++index) {
563 m.mParticles.emplace_back(prims[index]);
564 }
565
566 LOG(info) << "Sending " << m.mParticles.size() << " particles";
567 LOG(info) << "treating ev " << mEventCounter << " part " << i.part << " out of " << i.nparts;
568
569 // feedback to driver if new event started
570 if (mPipeToDriver != -1 && i.part == 1 && workavailable) {
571 if (write(mPipeToDriver, &mEventCounter, sizeof(mEventCounter))) {
572 }
573 }
574
575 mPartCounter++;
576 if (mPartCounter == numberofparts) {
577 mNeedNewEvent = true;
578 // start generation of a new event
579 if (mEventCounter < mMaxEvents) {
580 mGeneratorThread = std::thread(&O2PrimaryServerDevice::generateEvent, this);
581 }
582 }
583
584 TMessage* tmsg = new TMessage(kMESS_OBJECT);
585 tmsg->WriteObjectAny((void*)&m, TClass::GetClass("o2::data::PrimaryChunk"));
586
587 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
588
589 std::unique_ptr<fair::mq::Message> message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
590
591 reply.AddPart(std::move(message));
592 }
593
594 // send answer
595 TStopwatch timer;
596 timer.Start();
597 auto code = Send(reply, "primary-get", 0, 5000); // we introduce timeout in order not to block other requests
598 timer.Stop();
599 auto time = timer.CpuTime();
600 if (code > 0) {
601 LOG(debug) << "Reply send in " << time << "s";
602 return workavailable;
603 } else {
604 LOG(warn) << "Sending process had problems. Return code : " << code << " time " << time << "s";
605 }
606 return false; // -> error should not get here
607 }
608
610 {
611 LOG(info) << message << " CHANGING STATE TO " << PrimStateToString[(int)to];
612 mState = to;
613 }
614
616 {
617 mWaitingControlInput.store(1);
618 if (mState.load() != O2PrimaryServerState::Idle) {
619 mWaitingControlInput.store(0);
620 return;
621 }
622
623 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), o2::simpubsub::simStatusString("PRIMSERVER", "STATUS", "AWAITING INPUT"));
624 // this means we are idling
625
626 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
627
628 bool ok = false;
629
630 LOG(info) << "WAITING FOR CONTROL INPUT";
631 if (mControlChannel.Receive(reply) > 0) {
633 auto data = reply->GetData();
634 auto size = reply->GetSize();
635
636 std::string command(reinterpret_cast<char const*>(data), size);
637 LOG(info) << "message: " << command;
638
640 o2::conf::parseSimReconfigFromString(command, reconfig);
641 LOG(info) << "Processing " << reconfig.nEvents << " new events";
642 try {
643 LOG(info) << "REINIT START";
644 ok = ReInit(reconfig);
645 LOG(info) << "REINIT DONE";
646 } catch (std::exception e) {
647 LOG(info) << "Exception during reinit";
648 }
649 } else {
650 LOG(info) << "NOTHING RECEIVED";
651 }
652 if (ok) {
653 // stateTransition(O2PrimaryServerState::ReadyToServe, "CONTROL"); --> SHOULD BE DONE FROM EVENT GENERATOR (which get's however called only when mEvents>0)
654 } else {
656 }
657 mWaitingControlInput.store(0);
658 }
659
660 private:
661 o2::conf::SimConfig mSimConfig = o2::conf::SimConfig::Instance(); // local sim config object
662 o2::eventgen::PrimaryGenerator* mPrimGen = nullptr; // the current primary generator
664 o2::data::Stack* mStack = nullptr; // the stack which is filled (pointer since constructor to be called only init method)
665 int mChunkGranularity = 500; // how many primaries to send to a worker
666 int mPartCounter = 0;
667 bool mNeedNewEvent = true;
668 int mMaxEvents = 2;
669 ULong_t mInitialSeed = 0;
670 bool mUseFixedChunkSeed = false;
671 ULong_t mFixedChunkSeed = 0;
672 int mPipeToDriver = -1; // handle for direct piper to driver (to communicate meta info)
673 int mEventCounter = 0;
674
675 std::thread mGeneratorThread;
676 // or to generate events
677 std::thread mControlThread;
678
679 // Keeps various generators instantiated in memory
680 // useful when running simulation as a service (when generators
681 // change between batches). Also takes care of resource management of Primary generators via unique ptr
682 // TODO: some care needs to be taken (or the user warned) that the caching is based on generator name
683 // and that parameter-based reconfiguration is not yet implemented (for which we would need to hash all
684 // configuration parameters as well)
685 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
686
687 std::atomic<O2PrimaryServerState> mState{O2PrimaryServerState::Initializing};
688 std::atomic<int> mWaitingControlInput{0};
689 std::atomic<bool> mInfoThreadStopped{false};
690
691 bool mAsService = false;
692
693 // a dedicated (on-the-fly) channel for control messages
694 fair::mq::Channel mControlChannel;
695
696 // some information specific to use case when we have a collision context
697 o2::steer::DigitizationContext* mCollissionContext = nullptr;
698 std::unordered_map<int, int> mEventID_to_CollID;
699
700 TRandom3 mSeedGenerator;
701};
702
703} // namespace devices
704} // namespace o2
705
706#endif
Definition of the Stack class.
uint64_t vertex
Definition RawEventData.h:9
int16_t time
Definition RawEventData.h:4
int32_t i
Definition of the MagF class.
Methods to create simulation mag field.
std::ostringstream debug
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
Definition SimConfig.h:133
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
Definition SimConfig.h:111
std::string getOutPrefix() const
Definition SimConfig.h:163
void setExternalMode(bool m)
Definition Stack.h:205
void Reset() override
Resets arrays and stack and deletes particles and tracks.
Definition Stack.cxx:588
const std::vector< TParticle > & getPrimaries() const
Definition Stack.h:187
bool ReInit(o2::conf::SimReconfigData const &reconfig)
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
static void setTotalNEvents(unsigned int &n)
Definition Generator.h:89
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
Definition RngHelper.h:37
const GLfloat * m
Definition glcorearb.h:4066
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
Definition SimConfig.h:58
std::string mGenerator
Definition SimConfig.h:55
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"