Project
Loading...
Searching...
No Matches
O2PrimaryServerDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef O2_DEVICES_PRIMSERVDEVICE_H_
15#define O2_DEVICES_PRIMSERVDEVICE_H_
16
17#include <fairmq/Device.h>
18#include <fairmq/TransportFactory.h>
19#include <FairPrimaryGenerator.h>
21#include <fairmq/Message.h>
22#include <DetectorsBase/Stack.h>
25#include <TMessage.h>
26#include <TClass.h>
31#include <SimConfig/SimConfig.h>
35#include <Field/MagneticField.h>
36#include <TGeoGlobalMagField.h>
37#include <typeinfo>
38#include <thread>
39#include <TROOT.h>
40#include <TStopwatch.h>
41#include <fstream>
42#include <iostream>
43#include <atomic>
44#include "PrimaryServerState.h"
46#include <chrono>
48#include <TRandom3.h>
49#include <regex>
50
51namespace o2
52{
53namespace devices
54{
55
57{
58 public:
61 {
62 mUseFixedChunkSeed = getenv("ALICEO2_O2SIM_SUBEVENTSEED") && atoi(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
63 if (mUseFixedChunkSeed) {
64 mFixedChunkSeed = atol(getenv("ALICEO2_O2SIM_SUBEVENTSEED"));
65 }
66 }
67
70 {
71 try {
72 if (mGeneratorThread.joinable()) {
73 mGeneratorThread.join();
74 }
75 if (mControlThread.joinable()) {
76 mControlThread.join();
77 }
78 } catch (...) {
79 }
80 }
81
82 protected:
84 {
85 TStopwatch timer;
86 timer.Start();
87 const auto& conf = mSimConfig;
89 ccdbmgr.setURL(conf.getConfigData().mCCDBUrl);
90 ccdbmgr.setTimestamp(conf.getTimestamp());
91
92 // set the global information about the number of events to be generated
93 unsigned int nTotalEvents = conf.getNEvents();
95
96 // init magnetic field as it might be needed by the generator
97 if (TGeoGlobalMagField::Instance()->GetField() == nullptr) {
98 TGeoGlobalMagField::Instance()->SetField(o2::base::SimFieldUtils::createMagField());
99 TGeoGlobalMagField::Instance()->Lock();
100 }
101
102 // look if we find a cached instances of Pythia8 or external generators in order to avoid
103 // (long) initialization times.
104 // This is evidently a bit weak, as generators might need reconfiguration (to be treated later).
105 // For now, we'd like to allow for fast switches between say a pythia8 instance and reading from kinematics
106 // to continue an already started simulation.
107 //
108 // Not using cached instances for external kinematics since these might change input filenames etc.
109 // and are in any case quickly setup.
110 mPrimGen = nullptr;
111 if (conf.getGenerator().compare("extkin") != 0 || conf.getGenerator().compare("extkinO2") != 0) {
112 auto iter = mPrimGeneratorCache.find(conf.getGenerator());
113 if (iter != mPrimGeneratorCache.end()) {
114 mPrimGen = iter->second.get();
115 LOG(info) << "Found cached generator for " << conf.getGenerator();
116 }
117 }
118
119 if (mPrimGen == nullptr) {
120 mPrimGen = new o2::eventgen::PrimaryGenerator;
122
123 // setup vertexing
124 auto vtxMode = conf.getVertexMode();
126 if (vtxMode == VertexMode::kNoVertex || vtxMode == VertexMode::kDiamondParam) {
127 mPrimGen->setVertexMode(vtxMode);
128 } else if (vtxMode == VertexMode::kCCDB) {
129 // we need to fetch the CCDB object
130 mPrimGen->setVertexMode(vtxMode, ccdbmgr.getForTimeStamp<o2::dataformats::MeanVertexObject>("GLO/Calib/MeanVertex", conf.getTimestamp()));
131 } else if (vtxMode == VertexMode::kCollCxt) {
132 // The vertex will be injected from the outside via setExternalVertex
133 } else {
134 LOG(fatal) << "Unsupported vertex mode";
135 }
136
137 auto embedinto_filename = conf.getEmbedIntoFileName();
138 if (!embedinto_filename.empty()) {
139 // determine the sim prefix from the embedding filename
140 // the filename should be an MCHeader file ... so it should match SOME_PATH/prefix_MCHeader.root
141 std::regex re(R"((.*/)?([^/]+)_MCHeader\.root$)");
142 std::smatch match;
143
144 if (std::regex_search(embedinto_filename, match, re)) {
145 std::cout << "Extracted embedding prefix : " << match[2] << '\n';
146 mEmbeddIntoPrefix = match[2];
147 } else {
148 LOG(fatal) << "Embedding asked but no suitable embedding prefix extractable from " << embedinto_filename;
149 }
150 mPrimGen->embedInto(embedinto_filename);
151 }
152
153 mPrimGen->Init();
154
155 std::unique_ptr<o2::eventgen::PrimaryGenerator> ptr_wrapper;
156 ptr_wrapper.reset(mPrimGen);
157 mPrimGeneratorCache[conf.getGenerator()] = std::move(ptr_wrapper);
158 }
159 mPrimGen->SetEvent(&mEventHeader);
160
161 // A good moment to couple to collision context
162 auto collContextFileName_PrefixPair = mSimConfig.getCollContextFilenameAndEventPrefix();
163 auto collContextFileName = collContextFileName_PrefixPair.first;
164 if (collContextFileName.size() > 0) {
165 LOG(info) << "Simulation has collission context";
166 mCollissionContext = o2::steer::DigitizationContext::loadFromFile(collContextFileName);
167 if (mCollissionContext) {
168 const auto& vertices = mCollissionContext->getInteractionVertices();
169 LOG(info) << "We found " << vertices.size() << " vertices included ";
170
171 // initialize the eventID to collID mapping
172 const auto source = mCollissionContext->findSimPrefix(collContextFileName_PrefixPair.second);
173 if (source == -1) {
174 LOG(fatal) << "Wrong simulation prefix";
175 }
176 mEventID_to_CollID.clear();
177 mEventID_to_CollID = mCollissionContext->getCollisionIndicesForSource(source);
178 }
179 }
180
181 LOG(info) << "Generator initialization took " << timer.CpuTime() << "s";
182 if (mMaxEvents > 0) {
183 generateEvent(); // generate a first event
184 }
185 }
186
187 // function generating one event
188 void generateEvent(/*bool changeState = false*/)
189 {
190 bool changeState = true; // false;
191 LOG(info) << "Event generation started ";
192 if (changeState) {
194 }
195 TStopwatch timer;
196 timer.Start();
197 try {
198 bool valid = false;
199 int retry_counter = 0;
200 const int MAX_RETRY = 100;
201 do {
202 mStack->Reset();
203 const auto& conf = mSimConfig;
204 // see if we the vertex comes from the collision context
205 if (mCollissionContext && conf.getVertexMode() == o2::conf::VertexMode::kCollCxt) {
206 const auto& vertices = mCollissionContext->getInteractionVertices();
207 if (vertices.size() > 0) {
208 auto collisionindex = mEventID_to_CollID.at(mEventCounter);
209 auto& vertex = vertices.at(collisionindex);
210 LOG(info) << "Setting vertex " << vertex << " for event " << mEventCounter << " for prefix " << mSimConfig.getOutPrefix() << " from CollContext";
211 mPrimGen->setExternalVertexForNextEvent(vertex.X(), vertex.Y(), vertex.Z());
212
213 // set correct embedding index for PrimaryGenerator ... based on collision context for embedding
214 auto& collisionParts = mCollissionContext->getEventParts()[collisionindex];
215 int background_index = -1; // -1 means no embedding taking place for this signal
216
217 // find the part that corresponds to the event embeded into
218 for (auto& part : collisionParts) {
219 if (mCollissionContext->getSimPrefixes()[part.sourceID] == mEmbeddIntoPrefix) {
220 background_index = part.entryID;
221 LOG(info) << "Setting embedding index to " << background_index;
222 }
223 }
224 mPrimGen->setEmbedIndex(background_index);
225 }
226 }
227 mPrimGen->GenerateEvent(mStack);
228 if (mStack->getPrimaries().size() > 0) {
229 valid = true;
230 } else {
231 retry_counter++;
232 if (retry_counter > MAX_RETRY) {
233 LOG(warn) << "Not able to generate a non-empty event in " << MAX_RETRY << " trials";
234 // empty event is sent out
235 valid = true;
236 }
237 }
238 } while (!valid);
239 } catch (std::exception const& e) {
240 LOG(error) << " Exception occurred during event gen " << e.what();
241 }
242 timer.Stop();
243 LOG(info) << "Event generation took " << timer.CpuTime() << "s"
244 << " and produced " << mStack->getPrimaries().size() << " primaries ";
245 if (changeState) {
247 }
248 }
249
250 // launches a thread that listens for status requests from outside asynchronously
252 {
253 static std::vector<std::thread> threads;
254 LOG(info) << "LAUNCHING STATUS THREAD";
255 auto lambda = [this]() {
256 while (mState != O2PrimaryServerState::Stopped) {
257 auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
258 if (!channel.IsValid()) {
259 LOG(error) << "channel primserv-info not valid";
260 }
261 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(-1));
262 int timeout = 100; // 100ms --> so as not to block and allow for proper termination of this thread
263 if (channel.Receive(request, timeout) > 0) {
264 LOG(info) << "INFO REQUEST RECEIVED";
265 if (*(int*)(request->GetData()) == (int)O2PrimaryServerInfoRequest::Status) {
266 LOG(info) << "Received status request";
267 // request needs to be a simple enum of type O2PrimaryServerInfoRequest
268 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)mState.load()));
269 if (channel.Send(reply) > 0) {
270 LOG(info) << "Send status successful";
271 }
272 } else if (*(int*)request->GetData() == (int)O2PrimaryServerInfoRequest::Config) {
273 HandleConfigRequest(channel);
274 } else {
275 LOG(fatal) << "UNKNOWN REQUEST";
276 std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage(404));
277 channel.Send(reply);
278 }
279 }
280 }
281 mInfoThreadStopped = true;
282 };
283 threads.push_back(std::thread(lambda));
284 threads.back().detach();
285 }
286
287 void InitTask() final
288 {
289 // fatal without core dump
290 fair::Logger::OnFatal([] { throw fair::FatalException("Fatal error occured. Exiting without core dump..."); });
291
292 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), "SERVER : INITIALIZING");
293
295 LOG(info) << "Init Server device ";
296
297 // init sim config
298 auto& vm = GetConfig()->GetVarMap();
299 auto& conf = o2::conf::SimConfig::Instance();
300 if (vm.count("isRun5")) {
301 conf.setRun5();
302 }
303 conf.resetFromParsedMap(vm);
304
305 // update the parameters from an INI/JSON file, if given (overrides code-based version)
307 // update the parameters from stuff given at command line (overrides file-based version)
308 o2::conf::ConfigurableParam::updateFromString(conf.getKeyValueString());
309
310 // customize the level of log output
311 FairLogger::GetLogger()->SetLogScreenLevel(conf.getLogSeverity().c_str());
312 FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
313
314 // from now on mSimConfig should be used within this process
315 mSimConfig = conf;
316
317 mStack = new o2::data::Stack();
318 mStack->setExternalMode(true);
319
320 // MC ENGINE
321 LOG(info) << "ENGINE SET TO " << vm["mcEngine"].as<std::string>();
322 // CHUNK SIZE
323 mChunkGranularity = vm["chunkSize"].as<unsigned int>();
324 LOG(info) << "CHUNK SIZE SET TO " << mChunkGranularity;
325
326 // initial initial seed --> we should store this somewhere
327 mInitialSeed = vm["seed"].as<ULong_t>();
328 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
329 mSeedGenerator.SetSeed(mInitialSeed);
330 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
331
332 mMaxEvents = conf.getNEvents();
333
334 // need to make ROOT thread-safe since we use ROOT services in all places
335 ROOT::EnableThreadSafety();
336
338
339 // launch initialization of particle generator asynchronously
340 // so that we reach the RUNNING state of the server quickly
341 // and do not block here
342 mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
343 if (mGeneratorThread.joinable()) {
344 try {
345 mGeneratorThread.join();
346 } catch (std::exception const& e) {
347 LOG(warn) << "Exception during thread join ..ignoring";
348 }
349 }
350
351 // init pipe
352 auto pipeenv = getenv("ALICE_O2SIMSERVERTODRIVER_PIPE");
353 if (pipeenv) {
354 mPipeToDriver = atoi(pipeenv);
355 LOG(info) << "ASSIGNED PIPE HANDLE " << mPipeToDriver;
356 } else {
357 LOG(info) << "DID NOT FIND ENVIRONMENT VARIABLE TO INIT PIPE";
358 }
359
360 mAsService = vm["asservice"].as<bool>();
361 if (mAsService) {
362 mControlChannel = fair::mq::Channel{"o2sim-control", "sub", fTransportFactory};
363 auto controlsocketname = getenv("ALICE_O2SIMCONTROL");
364 if (!controlsocketname) {
365 LOG(fatal) << "Internal error: Socketname for control input missing";
366 }
367 mControlChannel.Connect(std::string(controlsocketname));
368 mControlChannel.Validate();
369 }
370
371 if (mMaxEvents <= 0) {
372 if (mAsService) {
374 }
375 } else {
377 }
378
379 // feedback to driver that we are done initializing
380 if (mPipeToDriver != -1) {
381 int message = -111; // special code meaning end of initialization
382 if (write(mPipeToDriver, &message, sizeof(int))) {
383 }
384 }
385 }
386
387 // function for intermediate/on-the-fly reinitializations
388 bool ReInit(o2::conf::SimReconfigData const& reconfig)
389 {
390 LOG(info) << "ReInit Server device ";
391
392 if (reconfig.stop) {
393 return false;
394 }
395
396 // mSimConfig.getConfigData().mKeyValueTokens=reconfig.keyValueTokens;
397 // Think about this:
398 // update the parameters from an INI/JSON file, if given (overrides code-based version)
400 // update the parameters from stuff given at command line (overrides file-based version)
402
403 // initial initial seed --> we should store this somewhere
404 mInitialSeed = reconfig.startSeed;
405 mInitialSeed = o2::utils::RngHelper::setGRandomSeed(mInitialSeed);
406 mSeedGenerator.SetSeed(mInitialSeed);
407 LOG(info) << "RNG INITIAL SEED " << mInitialSeed;
408
409 mMaxEvents = reconfig.nEvents;
410
411 // updating the simconfig member with new information especially concerning the generators
412 // TODO: put this into utility function?
413 mSimConfig.getConfigData().mGenerator = reconfig.generator;
414 mSimConfig.getConfigData().mTrigger = reconfig.trigger;
415 mSimConfig.getConfigData().mExtKinFileName = reconfig.extKinfileName;
416
417 mEventCounter = 0;
418 mPartCounter = 0;
419 mNeedNewEvent = true;
420 // reinit generator and start generation of a new event
421 if (mGeneratorThread.joinable()) {
422 try {
423 mGeneratorThread.join();
424 } catch (std::exception const& e) {
425 LOG(warn) << "Exception during thread join ..ignoring";
426 }
427 }
428 // mGeneratorThread = std::thread(&O2PrimaryServerDevice::initGenerator, this);
430
431 return true;
432 }
433
434 // method reacting to requests to get the simulation configuration
435 bool HandleConfigRequest(fair::mq::Channel& channel)
436 {
437 LOG(info) << "Received config request";
438 // just sending the simulation configuration to anyone that wants it
439 const auto& confdata = mSimConfig.getConfigData();
440
441 TMessage* tmsg = new TMessage(kMESS_OBJECT);
442 tmsg->WriteObjectAny((void*)&confdata, TClass::GetClass(typeid(confdata)));
443
444 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
445
446 std::unique_ptr<fair::mq::Message> message(
447 fTransportFactory->CreateMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
448
449 // send answer
450 if (channel.Send(message) > 0) {
451 LOG(info) << "config reply send ";
452 return true;
453 }
454 return true;
455 }
456
457 bool ConditionalRun() override
458 {
459 // we might come here in IDLE mode
460 if (mState.load() == O2PrimaryServerState::Idle) {
461 if (mWaitingControlInput.load() == 0) {
462 if (mControlThread.joinable()) {
463 mControlThread.join();
464 }
465 mControlThread = std::thread(&O2PrimaryServerDevice::waitForControlInput, this);
466 }
467 }
468
469 auto& channel = GetChannels().at("primary-get").at(0);
470 PrimaryChunkRequest requestpayload;
471 std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(requestpayload));
472 auto bytes = channel.Receive(request);
473 if (bytes < 0) {
474 LOG(error) << "Some error/interrupt occurred on socket during receive";
475 if (NewStatePending()) { // new state is typically pending if (term) signal was received
476 WaitForNextState();
477 // ask ourselves for termination of this loop
479 }
480 return false;
481 }
482
483 TStopwatch timer;
484 timer.Start();
485 auto& r = *((PrimaryChunkRequest*)(request->GetData()));
486 LOG(debug) << "PARTICLE REQUEST IN STATE " << PrimStateToString[(int)mState.load()] << " from " << r.workerid << ":" << r.requestid;
487
488 auto prestate = mState.load();
489 auto more = HandleRequest(request, 0, channel);
490 if (!more) {
491 if (mAsService) {
494 }
495 } else {
497 }
498 }
499 timer.Stop();
500 auto time = timer.CpuTime();
501 LOG(debug) << "COND-RUN TOOK " << time << " s";
502 return mState != O2PrimaryServerState::Stopped;
503 }
504
505 void PostRun() override
506 {
507 while (!mInfoThreadStopped) {
508 LOG(info) << "Waiting info thread";
509 using namespace std::chrono_literals;
510 std::this_thread::sleep_for(100ms);
511 }
512 }
513
514 bool HandleRequest(fair::mq::MessagePtr& request, int /*index*/, fair::mq::Channel& channel)
515 {
516 // LOG(debug) << "GOT A REQUEST WITH SIZE " << request->GetSize();
517 // std::string requeststring(static_cast<char*>(request->GetData()), request->GetSize());
518 // LOG(info) << "NORMAL REQUEST STRING " << requeststring;
519 bool workavailable = true;
520 if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
521 workavailable = false;
522 }
524 // send a zero answer
525 workavailable = false;
526 }
527
528 PrimaryChunkAnswer header{mState, workavailable};
529 fair::mq::Parts reply;
530 std::unique_ptr<fair::mq::Message> headermsg(channel.NewSimpleMessage(header));
531 reply.AddPart(std::move(headermsg));
532
533 LOG(debug) << "Received request for work " << mEventCounter << " " << mMaxEvents << " " << mNeedNewEvent << " available " << workavailable;
534 if (workavailable) {
535
536 if (mNeedNewEvent) {
537 // we need a newly generated event now
538 if (mGeneratorThread.joinable()) {
539 try {
540 mGeneratorThread.join();
541 } catch (std::exception const& e) {
542 LOG(warn) << "Exception during thread join ..ignoring";
543 }
544 }
545 // also if we are still in event waiting stage (doing some busy sleep)
546 while (mState.load() == O2PrimaryServerState::WaitingEvent) {
547 LOG(info) << "Waiting for event generation do become fully available";
548 usleep(100);
549 }
550 mNeedNewEvent = false;
551 mPartCounter = 0;
552 mEventCounter++;
553 }
554
555 auto& prims = mStack->getPrimaries();
556 auto numberofparts = (int)std::ceil(prims.size() / (1. * mChunkGranularity));
557 // number of parts should be at least 1 (even if empty)
558 numberofparts = std::max(1, numberofparts);
559
560 LOG(debug) << "Have " << prims.size() << " " << numberofparts;
561
564 i.eventID = workavailable ? mEventCounter : -1;
565 i.maxEvents = mMaxEvents;
566 i.part = mPartCounter + 1;
567 i.nparts = numberofparts;
568 // assign a deterministic (yet collision free seed) to process this particle chunk in Geant
569 // limit range to uint32_t since internal limit of TRandom (despite API suggesting otherwise)
570 const uint64_t drawnSeed = (uint64_t)(static_cast<double>(std::numeric_limits<uint32_t>::max()) * mSeedGenerator.Rndm());
571 i.seed = mUseFixedChunkSeed ? mFixedChunkSeed : drawnSeed;
572 i.index = m.mParticles.size();
573 i.mMCEventHeader = mEventHeader;
574 m.mSubEventInfo = i;
575
576 int endindex = prims.size() - mPartCounter * mChunkGranularity;
577 int startindex = prims.size() - (mPartCounter + 1) * mChunkGranularity;
578 LOG(debug) << "indices " << startindex << " " << endindex;
579
580 if (startindex < 0) {
581 startindex = 0;
582 }
583 if (endindex < 0) {
584 endindex = 0;
585 }
586
587 for (int index = startindex; index < endindex; ++index) {
588 m.mParticles.emplace_back(prims[index]);
589 }
590
591 LOG(info) << "Sending " << m.mParticles.size() << " particles";
592 LOG(info) << "treating ev " << mEventCounter << " part " << i.part << " out of " << i.nparts;
593
594 // feedback to driver if new event started
595 if (mPipeToDriver != -1 && i.part == 1 && workavailable) {
596 if (write(mPipeToDriver, &mEventCounter, sizeof(mEventCounter))) {
597 }
598 }
599
600 mPartCounter++;
601 if (mPartCounter == numberofparts) {
602 mNeedNewEvent = true;
603 // start generation of a new event
604 if (mEventCounter < mMaxEvents) {
605 mGeneratorThread = std::thread(&O2PrimaryServerDevice::generateEvent, this);
606 }
607 }
608
609 TMessage* tmsg = new TMessage(kMESS_OBJECT);
610 tmsg->WriteObjectAny((void*)&m, TClass::GetClass("o2::data::PrimaryChunk"));
611
612 auto free_tmessage = [](void* data, void* hint) { delete static_cast<TMessage*>(hint); };
613
614 std::unique_ptr<fair::mq::Message> message(channel.NewMessage(tmsg->Buffer(), tmsg->BufferSize(), free_tmessage, tmsg));
615
616 reply.AddPart(std::move(message));
617 }
618
619 // send answer
620 TStopwatch timer;
621 timer.Start();
622 auto code = Send(reply, "primary-get", 0, 5000); // we introduce timeout in order not to block other requests
623 timer.Stop();
624 auto time = timer.CpuTime();
625 if (code > 0) {
626 LOG(debug) << "Reply send in " << time << "s";
627 return workavailable;
628 } else {
629 LOG(warn) << "Sending process had problems. Return code : " << code << " time " << time << "s";
630 }
631 return false; // -> error should not get here
632 }
633
635 {
636 LOG(info) << message << " CHANGING STATE TO " << PrimStateToString[(int)to];
637 mState = to;
638 }
639
641 {
642 mWaitingControlInput.store(1);
643 if (mState.load() != O2PrimaryServerState::Idle) {
644 mWaitingControlInput.store(0);
645 return;
646 }
647
648 o2::simpubsub::publishMessage(GetChannels()["primary-notifications"].at(0), o2::simpubsub::simStatusString("PRIMSERVER", "STATUS", "AWAITING INPUT"));
649 // this means we are idling
650
651 std::unique_ptr<fair::mq::Message> reply(mControlChannel.NewMessage());
652
653 bool ok = false;
654
655 LOG(info) << "WAITING FOR CONTROL INPUT";
656 if (mControlChannel.Receive(reply) > 0) {
658 auto data = reply->GetData();
659 auto size = reply->GetSize();
660
661 std::string command(reinterpret_cast<char const*>(data), size);
662 LOG(info) << "message: " << command;
663
665 o2::conf::parseSimReconfigFromString(command, reconfig);
666 LOG(info) << "Processing " << reconfig.nEvents << " new events";
667 try {
668 LOG(info) << "REINIT START";
669 ok = ReInit(reconfig);
670 LOG(info) << "REINIT DONE";
671 } catch (std::exception e) {
672 LOG(info) << "Exception during reinit";
673 }
674 } else {
675 LOG(info) << "NOTHING RECEIVED";
676 }
677 if (ok) {
678 // stateTransition(O2PrimaryServerState::ReadyToServe, "CONTROL"); --> SHOULD BE DONE FROM EVENT GENERATOR (which get's however called only when mEvents>0)
679 } else {
681 }
682 mWaitingControlInput.store(0);
683 }
684
685 private:
686 o2::conf::SimConfig mSimConfig = o2::conf::SimConfig::Instance(); // local sim config object
687 o2::eventgen::PrimaryGenerator* mPrimGen = nullptr; // the current primary generator
689 o2::data::Stack* mStack = nullptr; // the stack which is filled (pointer since constructor to be called only init method)
690 int mChunkGranularity = 500; // how many primaries to send to a worker
691 int mPartCounter = 0;
692 bool mNeedNewEvent = true;
693 int mMaxEvents = 2;
694 ULong_t mInitialSeed = 0;
695 bool mUseFixedChunkSeed = false;
696 ULong_t mFixedChunkSeed = 0;
697 int mPipeToDriver = -1; // handle for direct piper to driver (to communicate meta info)
698 int mEventCounter = 0;
699
700 std::thread mGeneratorThread;
701 // or to generate events
702 std::thread mControlThread;
703
704 // Keeps various generators instantiated in memory
705 // useful when running simulation as a service (when generators
706 // change between batches). Also takes care of resource management of Primary generators via unique ptr
707 // TODO: some care needs to be taken (or the user warned) that the caching is based on generator name
708 // and that parameter-based reconfiguration is not yet implemented (for which we would need to hash all
709 // configuration parameters as well)
710 std::map<std::string, std::unique_ptr<o2::eventgen::PrimaryGenerator>> mPrimGeneratorCache;
711
712 std::atomic<O2PrimaryServerState> mState{O2PrimaryServerState::Initializing};
713 std::atomic<int> mWaitingControlInput{0};
714 std::atomic<bool> mInfoThreadStopped{false};
715
716 bool mAsService = false;
717
718 // a dedicate (on-the-fly channel) for control messages
719 fair::mq::Channel mControlChannel;
720
721 // some information specific to use case when we have a collision context
722 o2::steer::DigitizationContext* mCollissionContext = nullptr;
723 std::unordered_map<int, int> mEventID_to_CollID;
724 std::string mEmbeddIntoPrefix;
725
726 TRandom3 mSeedGenerator;
727};
728
729} // namespace devices
730} // namespace o2
731
732#endif
Definition of the Stack class.
uint64_t vertex
Definition RawEventData.h:9
int16_t time
Definition RawEventData.h:4
int32_t i
Definition of the MagF class.
Methods to create simulation mag field.
std::ostringstream debug
static FairField *const createMagField()
static BasicCCDBManager & instance()
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
SimConfigData const & getConfigData() const
Definition SimConfig.h:133
std::pair< std::string, std::string > getCollContextFilenameAndEventPrefix() const
static SimConfig & Instance()
Definition SimConfig.h:111
std::string getOutPrefix() const
Definition SimConfig.h:163
void setExternalMode(bool m)
Definition Stack.h:205
void Reset() override
Resets arrays and stack and deletes particles and tracks.
Definition Stack.cxx:588
const std::vector< TParticle > & getPrimaries() const
Definition Stack.h:187
bool ReInit(o2::conf::SimReconfigData const &reconfig)
void stateTransition(O2PrimaryServerState to, const char *message)
~O2PrimaryServerDevice() final
Default destructor.
bool HandleConfigRequest(fair::mq::Channel &channel)
bool HandleRequest(fair::mq::MessagePtr &request, int, fair::mq::Channel &channel)
static void setTotalNEvents(unsigned int &n)
Definition Generator.h:93
Bool_t GenerateEvent(FairGenericStack *pStack) override
void setVertexMode(o2::conf::VertexMode const &mode, o2::dataformats::MeanVertexObject const *obj=nullptr)
void setEmbedIndex(int idx)
sets the embedding index
void setExternalVertexForNextEvent(double x, double y, double z)
std::unordered_map< int, int > getCollisionIndicesForSource(int source) const
std::vector< math_utils::Point3D< float > > const & getInteractionVertices() const
int findSimPrefix(std::string const &prefix) const
std::vector< std::vector< o2::steer::EventPart > > & getEventParts(bool withQED=false)
std::vector< std::string > const & getSimPrefixes() const
static DigitizationContext * loadFromFile(std::string_view filename="")
static ULong_t setGRandomSeed(ULong_t seed=0)
Definition RngHelper.h:37
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
bool parseSimReconfigFromString(std::string const &argumentstring, SimReconfigData &config)
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
O2PrimaryServerState
enum to represent state of the O2Sim event/primary server
std::string mExtKinFileName
Definition SimConfig.h:58
std::string mGenerator
Definition SimConfig.h:55
TODO: Make this a base class of SimConfigData?
Definition SimConfig.h:203
static void setPrimaryGenerator(o2::conf::SimConfig const &, FairPrimaryGenerator *)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"