Project
Loading...
Searching...
No Matches
runDataProcessing.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <memory>
14#define BOOST_BIND_GLOBAL_PLACEHOLDERS
15#include <stdexcept>
39#include "DeviceStateHelpers.h"
42#include "Framework/DebugGUI.h"
45#include "Framework/Logger.h"
49#include "Framework/Signpost.h"
70#include "DriverServerContext.h"
72#include "HTTPParser.h"
73#include "DPLWebSocket.h"
74#include "ArrowSupport.h"
76
79#include "DDSConfigHelpers.h"
80#include "O2ControlHelpers.h"
81#include "DeviceSpecHelpers.h"
82#include "GraphvizHelpers.h"
83#include "MermaidHelpers.h"
84#include "PropertyTreeHelpers.h"
87
88#include <Configuration/ConfigurationInterface.h>
89#include <Configuration/ConfigurationFactory.h>
90#include <Monitoring/MonitoringFactory.h>
92
93#include <fairmq/Device.h>
94#include <fairmq/DeviceRunner.h>
95#include <fairmq/shmem/Monitor.h>
96#include <fairmq/ProgOptions.h>
97
98#include <boost/program_options.hpp>
99#include <boost/program_options/options_description.hpp>
100#include <boost/program_options/variables_map.hpp>
101#include <boost/exception/diagnostic_information.hpp>
102#include <boost/property_tree/json_parser.hpp>
103
104#include <uv.h>
105#include <TEnv.h>
106#include <TSystem.h>
107
108#include <cinttypes>
109#include <cstdint>
110#include <cstdio>
111#include <cstdlib>
112#include <cstring>
113#include <csignal>
114#include <iostream>
115#include <map>
116#include <regex>
117#include <set>
118#include <string>
119#include <type_traits>
120#include <tuple>
121#include <chrono>
122#include <utility>
123#include <numeric>
124#include <functional>
125
126#include <fcntl.h>
127#include <netinet/ip.h>
128#include <sys/resource.h>
129#include <sys/select.h>
130#include <sys/socket.h>
131#include <sys/stat.h>
132#include <sys/time.h>
133#include <sys/types.h>
134#include <sys/un.h>
135#include <sys/wait.h>
136#include <unistd.h>
137#include <execinfo.h>
138#include <cfenv>
139#if defined(__linux__) && __has_include(<sched.h>)
140#include <sched.h>
141#elif __has_include(<linux/getcpu.h>)
142#include <linux/getcpu.h>
143#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
144#include <cpuid.h>
145#define CPUID(INFO, LEAF, SUBLEAF) __cpuid_count(LEAF, SUBLEAF, INFO[0], INFO[1], INFO[2], INFO[3])
146#define GETCPU(CPU) \
147 { \
148 uint32_t CPUInfo[4]; \
149 CPUID(CPUInfo, 1, 0); \
150 /* CPUInfo[1] is EBX, bits 24-31 are APIC ID */ \
151 if ((CPUInfo[3] & (1 << 9)) == 0) { \
152 CPU = -1; /* no APIC on chip */ \
153 } else { \
154 CPU = (unsigned)CPUInfo[1] >> 24; \
155 } \
156 if (CPU < 0) \
157 CPU = 0; \
158 }
159#endif
160
161using namespace o2::monitoring;
162using namespace o2::configuration;
163
164using namespace o2::framework;
165namespace bpo = boost::program_options;
166using DataProcessorInfos = std::vector<DataProcessorInfo>;
167using DeviceExecutions = std::vector<DeviceExecution>;
168using DeviceSpecs = std::vector<DeviceSpec>;
169using DeviceInfos = std::vector<DeviceInfo>;
170using DataProcessingStatesInfos = std::vector<DataProcessingStates>;
171using DeviceControls = std::vector<DeviceControl>;
172using DataProcessorSpecs = std::vector<DataProcessorSpec>;
173
174std::vector<DeviceMetricsInfo> gDeviceMetricsInfos;
175
176// FIXME: probably find a better place
177// these are the device options added by the framework, but they can be
178// overloaded in the config spec
179bpo::options_description gHiddenDeviceOptions("Hidden child options");
180
183
184void doBoostException(boost::exception& e, const char*);
186void doUnknownException(std::string const& s, char const*);
187
/// Look up the value given to the `--id` command-line option.
///
/// @param argc number of entries in @a argv
/// @param argv command-line arguments
/// @return pointer to the argument which follows the first `--id`, or nullptr
///         when `--id` is absent or is the very last argument (no value).
char* getIdString(int argc, char** argv)
{
  // Stop one short of the end: a trailing "--id" has no value to hand back.
  for (int pos = 0; pos + 1 < argc; ++pos) {
    if (strcmp(argv[pos], "--id") == 0) {
      return argv[pos + 1];
    }
  }
  return nullptr;
}
197
198int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**))
199{
200 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
201 int result = 1;
202 if (noCatch) {
203 try {
204 result = mainNoCatch(argc, argv);
206 doDPLException(ref, argv[0]);
207 throw;
208 }
209 } else {
210 try {
211 // The 0 here is an int, therefore having the template matching in the
212 // SFINAE expression above fit better the version which invokes user code over
213 // the default one.
214 // The default policy is a catch all pub/sub setup to be consistent with the past.
215 result = mainNoCatch(argc, argv);
216 } catch (boost::exception& e) {
217 doBoostException(e, argv[0]);
218 throw;
219 } catch (std::exception const& error) {
220 doUnknownException(error.what(), argv[0]);
221 throw;
223 doDPLException(ref, argv[0]);
224 throw;
225 } catch (...) {
226 doUnknownException("", argv[0]);
227 throw;
228 }
229 }
230 return result;
231}
232
233// Read from a given fd and print it.
234// return true if we can still read from it,
235// return false if we need to close the input pipe.
236//
237// FIXME: We should really print full lines.
238void getChildData(int infd, DeviceInfo& outinfo)
239{
240 char buffer[1024 * 16];
241 int bytes_read;
242 // NOTE: do not quite understand read ends up blocking if I read more than
243 // once. Oh well... Good enough for now.
244 int64_t total_bytes_read = 0;
245 int64_t count = 0;
246 bool once = false;
247 while (true) {
248 bytes_read = read(infd, buffer, 1024 * 16);
249 if (bytes_read == 0) {
250 return;
251 }
252 if (!once) {
253 once = true;
254 }
255 if (bytes_read < 0) {
256 return;
257 }
258 assert(bytes_read > 0);
259 outinfo.unprinted.append(buffer, bytes_read);
260 count++;
261 }
262}
263
267bool checkIfCanExit(std::vector<DeviceInfo> const& infos)
268{
269 if (infos.empty()) {
270 return false;
271 }
272 for (auto& info : infos) {
273 if (info.readyToQuit == false) {
274 return false;
275 }
276 }
277 return true;
278}
279
280// Kill all the active children. Exit code
281// is != 0 if any of the children had an error.
282void killChildren(std::vector<DeviceInfo>& infos, int sig)
283{
284 for (auto& info : infos) {
285 if (info.active == true) {
286 kill(info.pid, sig);
287 }
288 }
289}
290
292bool areAllChildrenGone(std::vector<DeviceInfo>& infos)
293{
294 for (auto& info : infos) {
295 if ((info.pid != 0) && info.active) {
296 return false;
297 }
298 }
299 return true;
300}
301
303namespace
304{
305int calculateExitCode(DriverInfo& driverInfo, DeviceSpecs& deviceSpecs, DeviceInfos& infos)
306{
307 std::regex regexp(R"(^\[([\d+:]*)\]\[\w+\] )");
308 if (!driverInfo.lastError.empty()) {
309 LOGP(error, "SEVERE: DPL driver encountered an error while running.\n{}",
310 driverInfo.lastError);
311 return 1;
312 }
313 for (size_t di = 0; di < deviceSpecs.size(); ++di) {
314 auto& info = infos[di];
315 auto& spec = deviceSpecs[di];
316 if (info.maxLogLevel >= driverInfo.minFailureLevel) {
317 LOGP(error, "SEVERE: Device {} ({}) had at least one message above severity {}: {}",
318 spec.name,
319 info.pid,
320 (int)info.minFailureLevel,
321 std::regex_replace(info.firstSevereError, regexp, ""));
322 return 1;
323 }
324 if (info.exitStatus != 0) {
325 LOGP(error, "SEVERE: Device {} ({}) returned with {}",
326 spec.name,
327 info.pid,
328 info.exitStatus);
329 return info.exitStatus;
330 }
331 }
332 return 0;
333}
334} // namespace
335
// Create a pipe into @a pipes (pipes[0] is the read end, pipes[1] the write end).
//
// Failing to create a pipe is treated as unrecoverable. After printing the
// reason, SIGKILL is sent to the process group whose id equals the driver pid.
// NOTE(review): this only reaches the children if the driver leads its
// process group — confirm.
void createPipes(int* pipes)
{
  if (pipe(pipes) != -1) {
    return;
  }
  // Save errno immediately: writing to std::cerr may overwrite it.
  int const savedErrno = errno;
  std::cerr << "Unable to create PIPE: ";
  switch (savedErrno) {
    case EFAULT:
      std::cerr << "Invalid pipe descriptor array\n";
      assert(false && "EFAULT while creating pipe");
      break;
    case EMFILE:
      std::cerr << "Too many active descriptors\n";
      break;
    case ENFILE:
      std::cerr << "System file table is full\n";
      break;
    default:
      std::cerr << "Unknown PIPE error: " << strerror(savedErrno) << "\n";
      break;
  }
  // Kill immediately both the parent and all the children
  kill(-1 * getpid(), SIGKILL);
}
359
// Flags raised from signal context. The handlers only record that a signal
// arrived; all the actual processing is done by the driver state machine.
volatile sig_atomic_t graceful_exit = false;     // first interrupt received
volatile sig_atomic_t forceful_exit = false;     // escalation: second interrupt (or a timer)
volatile sig_atomic_t sigchld_requested = false; // set by handle_sigchld
volatile sig_atomic_t double_sigint = false;     // forceful exit caused by a repeated interrupt

// Interrupt handler. The first invocation requests a graceful shutdown, and
// any later invocation escalates to a forceful one.
static void handle_sigint(int)
{
  if (!graceful_exit) {
    graceful_exit = true;
    return;
  }
  forceful_exit = true;
  // Remember that the escalation came from a repeated interrupt, so that no
  // extra message gets printed for it. When forceful_exit is set by the timer
  // instead, an error is reported for each child which did not exit gracefully.
  double_sigint = true;
}
383
385void cleanupSHM(std::string const& uniqueWorkflowId)
386{
387 using namespace fair::mq::shmem;
388 fair::mq::shmem::Monitor::Cleanup(SessionId{"dpl_" + uniqueWorkflowId}, false);
389}
390
391static void handle_sigchld(int) { sigchld_requested = true; }
392
394 std::string const&,
395 DeviceSpec const& spec,
398 DeviceInfos& deviceInfos,
399 DataProcessingStatesInfos& allStates)
400{
401 LOG(info) << "Starting " << spec.id << " as remote device";
402 DeviceInfo info{
403 .pid = 0,
404 .historyPos = 0,
405 .historySize = 1000,
406 .maxLogLevel = LogParsingHelpers::LogLevel::Debug,
407 .active = true,
408 .readyToQuit = false,
409 .inputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_timeslice", 0, 0, {}},
410 .outputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_output", 0, 0, {}},
411 .lastSignal = uv_hrtime() - 10000000};
412
413 deviceInfos.emplace_back(info);
414 timespec now;
415 clock_gettime(CLOCK_REALTIME, &now);
416 uint64_t offset = now.tv_sec * 1000 - uv_now(loop);
417 allStates.emplace_back(TimingHelpers::defaultRealtimeBaseConfigurator(offset, loop),
419 // Let's add also metrics information for the given device
421}
422
428
429void log_callback(uv_poll_t* handle, int status, int events)
430{
431 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
432 auto* logContext = reinterpret_cast<DeviceLogContext*>(handle->data);
433 std::vector<DeviceInfo>* infos = logContext->serverContext->infos;
434 DeviceInfo& info = infos->at(logContext->index);
435
436 if (status < 0) {
437 info.active = false;
438 }
439 if (events & UV_READABLE) {
440 getChildData(logContext->fd, info);
441 }
442 if (events & UV_DISCONNECT) {
443 info.active = false;
444 }
445 O2_SIGNPOST_EVENT_EMIT(driver, sid, "loop", "log_callback invoked by poller for device %{xcode:pid}d which is %{public}s%{public}s",
446 info.pid, info.active ? "active" : "inactive",
447 info.active ? " and still has data to read." : ".");
448 if (info.active == false) {
449 uv_poll_stop(handle);
450 }
451 uv_async_send(logContext->serverContext->asyncLogProcessing);
452}
453
455{
456 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
457 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "close_websocket");
458 delete (WSDPLHandler*)handle->data;
459}
460
// libuv read callback for a client connection of the driver websocket server
// (installed by ws_connect_callback through uv_read_start, with my_alloc_cb as
// the allocator).
//
// The received bytes are parsed by parse_http_request, using the WSDPLHandler
// stored in stream->data, and the malloc'ed buffer is freed afterwards. On a
// WSError the handler is asked to report the error code and message.
//
// NOTE(review): when nread == 0 the function returns without freeing
// buf->base. libuv documents nread == 0 as "nothing read" (EAGAIN), with the
// buffer still owned by the user, so this looks like a leak of the buffer
// allocated by my_alloc_cb — confirm and free it in that branch.
461void websocket_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
462{
463 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, stream->loop);
464 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
465 auto* handler = (WSDPLHandler*)stream->data;
466 if (nread == 0) {
467 return;
468 }
// The peer closed the connection: release the buffer and stop reading.
469 if (nread == UV_EOF) {
470 if (buf->base) {
471 free(buf->base);
472 }
473 uv_read_stop(stream);
475 return;
476 }
// Any other negative value is a libuv error code.
477 if (nread < 0) {
478 // FIXME: should I close?
479 LOG(error) << "websocket_callback: Error while reading from websocket";
480 if (buf->base) {
481 free(buf->base);
482 }
483 uv_read_stop(stream);
485 return;
486 }
// Only WSError is caught here; any other exception escapes and leaves the
// buffer unfreed.
487 try {
488 LOG(debug3) << "Parsing request with " << handler << " with " << nread << " bytes";
489 parse_http_request(buf->base, nread, handler);
490 if (buf->base) {
491 free(buf->base);
492 }
493 } catch (WSError& e) {
494 LOG(error) << "Error while parsing request: " << e.message;
495 handler->error(e.code, e.message.c_str());
496 free(buf->base);
497 }
498}
499
500static void my_alloc_cb(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
501{
502 buf->base = (char*)malloc(suggested_size);
503 buf->len = suggested_size;
504}
505
507void ws_connect_callback(uv_stream_t* server, int status)
508{
509 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, server->loop);
510 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
511 auto* serverContext = reinterpret_cast<DriverServerContext*>(server->data);
512 if (status < 0) {
513 LOGF(error, "New connection error %s\n", uv_strerror(status));
514 // error!
515 return;
516 }
517
518 auto* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
519 uv_tcp_init(serverContext->loop, client);
520 if (uv_accept(server, (uv_stream_t*)client) == 0) {
521 client->data = new WSDPLHandler((uv_stream_t*)client, serverContext);
522 uv_read_start((uv_stream_t*)client, (uv_alloc_cb)my_alloc_cb, websocket_callback);
523 } else {
524 uv_close((uv_handle_t*)client, nullptr);
525 }
526}
527
529 std::string configuration;
530 int fd;
531};
532
533void stream_config(uv_work_t* req)
534{
535 auto* context = (StreamConfigContext*)req->data;
536 size_t result = write(context->fd, context->configuration.data(), context->configuration.size());
537 if (result != context->configuration.size()) {
538 LOG(error) << "Unable to pass configuration to children";
539 }
540 {
541 auto error = fsync(context->fd);
542 switch (error) {
543 case EBADF:
544 LOGP(error, "EBADF while flushing child stdin");
545 break;
546 case EINVAL:
547 LOGP(error, "EINVAL while flushing child stdin");
548 break;
549 case EINTR:
550 LOGP(error, "EINTR while flushing child stdin");
551 break;
552 case EIO:
553 LOGP(error, "EIO while flushing child stdin");
554 break;
555 default:;
556 }
557 }
558 {
559 auto error = close(context->fd); // Not allowing further communication...
560 switch (error) {
561 case EBADF:
562 LOGP(error, "EBADF while closing child stdin");
563 break;
564 case EINTR:
565 LOGP(error, "EINTR while closing child stdin");
566 break;
567 case EIO:
568 LOGP(error, "EIO while closing child stdin");
569 break;
570 default:;
571 }
572 }
573}
574
/// Lightweight handle naming a device by its position in the per-device
/// containers (device specs, executions, infos, ...).
struct DeviceRef {
  int index; ///< position of the device in those containers
};
578
582};
583
585{
586 struct sigaction sa_handle_int;
587 sa_handle_int.sa_handler = handle_sigint;
588 sigemptyset(&sa_handle_int.sa_mask);
589 sa_handle_int.sa_flags = SA_RESTART;
590 if (sigaction(SIGINT, &sa_handle_int, nullptr) == -1) {
591 perror("Unable to install signal handler");
592 exit(1);
593 }
594 struct sigaction sa_handle_term;
595 sa_handle_term.sa_handler = handle_sigint;
596 sigemptyset(&sa_handle_term.sa_mask);
597 sa_handle_term.sa_flags = SA_RESTART;
598 if (sigaction(SIGTERM, &sa_handle_int, nullptr) == -1) {
599 perror("Unable to install signal handler");
600 exit(1);
601 }
602}
603
605 std::string const& forwardedStdin,
606 std::vector<DeviceStdioContext>& childFds,
607 std::vector<uv_poll_t*>& handles)
608{
609 for (size_t i = 0; i < childFds.size(); ++i) {
610 auto& childstdin = childFds[i].childstdin;
611 auto& childstdout = childFds[i].childstdout;
612
613 auto* req = (uv_work_t*)malloc(sizeof(uv_work_t));
614 req->data = new StreamConfigContext{forwardedStdin, childstdin[1]};
615 uv_queue_work(serverContext->loop, req, stream_config, nullptr);
616
617 // Setting them to non-blocking to avoid haing the driver hang when
618 // reading from child.
619 int resultCode = fcntl(childstdout[0], F_SETFL, O_NONBLOCK);
620 if (resultCode == -1) {
621 LOGP(error, "Error while setting the socket to non-blocking: {}", strerror(errno));
622 }
623
625 auto addPoller = [&handles, &serverContext](int index, int fd) {
626 auto* context = new DeviceLogContext{};
627 context->index = index;
628 context->fd = fd;
629 context->serverContext = serverContext;
630 handles.push_back((uv_poll_t*)malloc(sizeof(uv_poll_t)));
631 auto handle = handles.back();
632 handle->data = context;
633 uv_poll_init(serverContext->loop, handle, fd);
634 uv_poll_start(handle, UV_READABLE, log_callback);
635 };
636
637 addPoller(i, childstdout[0]);
638 }
639}
640
// Crash signal handler. It writes "*** Program crashed (<reason>)" to stderr,
// then the backtrace, then "Backtrace complete.", and finally terminates the
// process with _exit(1) (skipping atexit handlers and static destructors).
//
// For SIGFPE, the reason is derived from the floating point exception flags
// (division by zero / invalid result / unknown). This is only meaningful if
// the faulting operation actually raised those flags.
//
// NOTE(review): snprintf(), strsignal() and strlen-based formatting are not on
// the POSIX async-signal-safe list, and backtrace() may allocate on its first
// call. Consider precomputing the messages if crashes inside malloc are a
// concern.
641void handle_crash(int sig)
642{
643 // dump demangled stack trace
// Capture up to 1024 return addresses of the current call stack.
644 void* array[1024];
645 int size = backtrace(array, 1024);
646
647 {
648 char buffer[1024];
649 char const* msg = "*** Program crashed (%s)\nBacktrace by DPL:\n";
650 snprintf(buffer, 1024, msg, strsignal(sig));
651 if (sig == SIGFPE) {
652 if (std::fetestexcept(FE_DIVBYZERO)) {
653 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - DIVISION BY ZERO");
654 } else if (std::fetestexcept(FE_INVALID)) {
655 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - INVALID RESULT");
656 } else {
657 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - UNKNOWN REASON");
658 }
659 }
// Use write(2) directly on the stderr descriptor rather than buffered streams.
660 auto retVal = write(STDERR_FILENO, buffer, strlen(buffer));
661 (void)retVal;
662 }
664 {
665 char const* msg = "Backtrace complete.\n";
666 int len = strlen(msg); /* the byte length of the string */
667
668 auto retVal = write(STDERR_FILENO, msg, len);
669 (void)retVal;
670 fsync(STDERR_FILENO);
671 }
672 _exit(1);
673}
674
679 std::vector<DeviceSpec> const& specs,
680 DriverInfo& driverInfo,
681 std::vector<DeviceControl>&,
682 std::vector<DeviceExecution>& executions,
683 std::vector<DeviceInfo>& deviceInfos,
684 std::vector<DataProcessingStates>& allStates,
685 ServiceRegistryRef serviceRegistry,
686 boost::program_options::variables_map& varmap,
687 std::vector<DeviceStdioContext>& childFds,
688 unsigned parentCPU,
689 unsigned parentNode)
690{
691 // FIXME: this might not work when more than one DPL driver on the same
692 // machine. Hopefully we do not care.
693 // Not how the first port is actually used to broadcast clients.
694 auto& spec = specs[ref.index];
695 auto& execution = executions[ref.index];
696
697 for (auto& service : spec.services) {
698 if (service.preFork != nullptr) {
699 service.preFork(serviceRegistry, DeviceConfig{varmap});
700 }
701 }
702 // If we have a framework id, it means we have already been respawned
703 // and that we are in a child. If not, we need to fork and re-exec, adding
704 // the framework-id as one of the options.
705 pid_t id = 0;
706 id = fork();
707 // We are the child: prepare options and reexec.
708 if (id == 0) {
709 // We allow being debugged and do not terminate on SIGTRAP
710 signal(SIGTRAP, SIG_IGN);
711 // We immediately ignore SIGUSR1 and SIGUSR2 so that we do not
712 // get killed by the parent trying to force stepping children.
713 // We will re-enable them later on, when it is actually safe to
714 // do so.
715 signal(SIGUSR1, SIG_IGN);
716 signal(SIGUSR2, SIG_IGN);
717
718 // This is the child.
719 // For stdout / stderr, we close the read part of the pipe, the
720 // old descriptor, and then replace it with the write part of the pipe.
721 // For stdin, we close the write part of the pipe, the old descriptor,
722 // and then we replace it with the read part of the pipe.
723 // We also close all the filedescriptors for our sibilings.
724 struct rlimit rlim;
725 getrlimit(RLIMIT_NOFILE, &rlim);
726 // We close all FD, but the one which are actually
727 // used to communicate with the driver. This is a bad
728 // idea in the first place, because rlim_cur could be huge
729 // FIXME: I should understand which one is really to be closed and use
730 // CLOEXEC on it.
731 int rlim_cur = std::min((int)rlim.rlim_cur, 10000);
732 for (int i = 0; i < rlim_cur; ++i) {
733 if (childFds[ref.index].childstdin[0] == i) {
734 continue;
735 }
736 if (childFds[ref.index].childstdout[1] == i) {
737 continue;
738 }
739 close(i);
740 }
741 dup2(childFds[ref.index].childstdin[0], STDIN_FILENO);
742 dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO);
743 dup2(childFds[ref.index].childstdout[1], STDERR_FILENO);
744
745 for (auto& service : spec.services) {
746 if (service.postForkChild != nullptr) {
747 service.postForkChild(serviceRegistry);
748 }
749 }
750 for (auto& env : execution.environ) {
751 putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data()));
752 }
753 int err = execvp(execution.args[0], execution.args.data());
754 if (err) {
755 perror("Unable to start child process");
756 exit(1);
757 }
758 } else {
759 O2_SIGNPOST_ID_GENERATE(sid, driver);
760 O2_SIGNPOST_EVENT_EMIT(driver, sid, "spawnDevice", "New child at %{pid}d", id);
761 }
762 close(childFds[ref.index].childstdin[0]);
763 close(childFds[ref.index].childstdout[1]);
764 if (varmap.count("post-fork-command")) {
765 auto templateCmd = varmap["post-fork-command"];
766 auto cmd = fmt::format(fmt::runtime(templateCmd.as<std::string>()),
767 fmt::arg("pid", id),
768 fmt::arg("id", spec.id),
769 fmt::arg("cpu", parentCPU),
770 fmt::arg("node", parentNode),
771 fmt::arg("name", spec.name),
772 fmt::arg("timeslice0", spec.inputTimesliceId),
773 fmt::arg("timeslice1", spec.inputTimesliceId + 1),
774 fmt::arg("rank0", spec.rank),
775 fmt::arg("maxRank0", spec.nSlots));
776 int err = system(cmd.c_str());
777 if (err) {
778 LOG(error) << "Post fork command `" << cmd << "` returned with status " << err;
779 }
780 LOG(debug) << "Successfully executed `" << cmd;
781 }
782 // This is the parent. We close the write end of
783 // the child pipe and and keep track of the fd so
784 // that we can later select on it.
785 for (auto& service : spec.services) {
786 if (service.postForkParent != nullptr) {
787 service.postForkParent(serviceRegistry);
788 }
789 }
790
791 LOG(info) << "Starting " << spec.id << " on pid " << id;
792 deviceInfos.push_back({.pid = id,
793 .historyPos = 0,
794 .historySize = 1000,
795 .maxLogLevel = LogParsingHelpers::LogLevel::Debug,
796 .minFailureLevel = driverInfo.minFailureLevel,
797 .active = true,
798 .readyToQuit = false,
799 .inputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_timeslice", 0, 0, {}},
800 .outputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_output", 0, 0, {}},
801 .lastSignal = uv_hrtime() - 10000000});
802 // create the offset using uv_hrtime
803 timespec now;
804 clock_gettime(CLOCK_REALTIME, &now);
805 uint64_t offset = now.tv_sec * 1000 - uv_now(loop);
806 allStates.emplace_back(
809
810 allStates.back().registerState(DataProcessingStates::StateSpec{
811 .name = "data_queries",
812 .stateId = (short)ProcessingStateId::DATA_QUERIES,
813 .sendInitialValue = true,
814 });
815 allStates.back().registerState(DataProcessingStates::StateSpec{
816 .name = "output_matchers",
817 .stateId = (short)ProcessingStateId::OUTPUT_MATCHERS,
818 .sendInitialValue = true,
819 });
820
821 unsigned int pipelineLength = DefaultsHelpers::pipelineLength(DeviceConfig{varmap});
822 for (size_t i = 0; i < pipelineLength; ++i) {
823 allStates.back().registerState(DataProcessingStates::StateSpec{
824 .name = fmt::format("matcher_variables/{}", i),
825 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
826 .minPublishInterval = 200, // if we publish too often we flood the GUI and we are not able to read it in any case
827 .sendInitialValue = true,
828 });
829 }
830
831 for (size_t i = 0; i < pipelineLength; ++i) {
832 allStates.back().registerState(DataProcessingStates::StateSpec{
833 .name = fmt::format("data_relayer/{}", i),
834 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + i),
835 .minPublishInterval = 200, // if we publish too often we flood the GUI and we are not able to read it in any case
836 .sendInitialValue = true,
837 });
838 }
839
840 // Let's add also metrics information for the given device
842}
843
845 DriverInfo& driverInfo,
846 DeviceInfos& infos,
847 DeviceSpecs const& specs,
848 DeviceControls& controls)
849{
850 // Display part. All you need to display should actually be in
851 // `infos`.
852 // TODO: split at \n
853 // TODO: update this only once per 1/60 of a second or
854 // things like this.
855 // TODO: have multiple display modes
856 // TODO: graphical view of the processing?
857 assert(infos.size() == controls.size());
858 ParsedMetricMatch metricMatch;
859
860 int processed = 0;
861 for (size_t di = 0, de = infos.size(); di < de; ++di) {
862 DeviceInfo& info = infos[di];
863 DeviceControl& control = controls[di];
864 assert(specs.size() == infos.size());
865 DeviceSpec const& spec = specs[di];
866
867 if (info.unprinted.empty()) {
868 continue;
869 }
870 processed++;
871
872 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, &info);
873 O2_SIGNPOST_START(driver, sid, "bytes_processed", "bytes processed by %{xcode:pid}d", info.pid);
874
875 std::string_view s = info.unprinted;
876 size_t pos = 0;
877 info.history.resize(info.historySize);
878 info.historyLevel.resize(info.historySize);
879
880 while ((pos = s.find("\n")) != std::string::npos) {
881 std::string_view token{s.substr(0, pos)};
882 auto logLevel = LogParsingHelpers::parseTokenLevel(token);
883
884 // Check if the token is a metric from SimpleMetricsService
885 // if yes, we do not print it out and simply store it to be displayed
886 // in the GUI.
887 // Then we check if it is part of our Poor man control system
888 // if yes, we execute the associated command.
889 if (!control.quiet && (token.find(control.logFilter) != std::string::npos) && logLevel >= info.logLevel) {
890 assert(info.historyPos >= 0);
891 assert(info.historyPos < info.history.size());
892 info.history[info.historyPos] = token;
893 info.historyLevel[info.historyPos] = logLevel;
894 info.historyPos = (info.historyPos + 1) % info.history.size();
895 info.logSeq++;
896 fmt::print("[{}:{}]: {}\n", info.pid, spec.id, token);
897 }
898 // We keep track of the maximum log error a
899 // device has seen.
900 bool maxLogLevelIncreased = false;
901 if (logLevel > info.maxLogLevel && logLevel > LogParsingHelpers::LogLevel::Info &&
902 logLevel != LogParsingHelpers::LogLevel::Unknown) {
903 info.maxLogLevel = logLevel;
904 maxLogLevelIncreased = true;
905 }
906 if (logLevel >= driverInfo.minFailureLevel) {
907 info.lastError = token;
908 if (info.firstSevereError.empty() || maxLogLevelIncreased) {
909 info.firstSevereError = token;
910 }
911 }
912 // +1 is to skip the \n
913 s.remove_prefix(pos + 1);
914 }
915 size_t oldSize = info.unprinted.size();
916 info.unprinted = std::string(s);
917 int64_t bytesProcessed = oldSize - info.unprinted.size();
918 O2_SIGNPOST_END(driver, sid, "bytes_processed", "bytes processed by %{xcode:network-size-in-bytes}" PRIi64, bytesProcessed);
919 }
920 if (processed == 0) {
921 O2_SIGNPOST_ID_FROM_POINTER(lid, driver, loop);
922 O2_SIGNPOST_EVENT_EMIT(driver, lid, "mainloop", "processChildrenOutput invoked for nothing!");
923 }
924}
925
926// Process all the sigchld which are pending
927// @return whether or not any of the reaped children exited with an error condition.
929{
930 bool hasError = false;
931 while (true) {
932 int status;
933 pid_t pid = waitpid((pid_t)(-1), &status, WNOHANG);
934 if (pid > 0) {
935 // Normal exit
936 int es = WEXITSTATUS(status);
937 if (WIFEXITED(status) == false || es != 0) {
938 // Look for the name associated to the pid in the infos
939 std::string id = "unknown";
940 assert(specs.size() == infos.size());
941 for (size_t ii = 0; ii < infos.size(); ++ii) {
942 if (infos[ii].pid == pid) {
943 id = specs[ii].id;
944 }
945 }
946 // No need to print anything if the user
947 // force quitted doing a double Ctrl-C.
948 if (double_sigint) {
949 } else if (forceful_exit) {
950 LOGP(error, "pid {} ({}) was forcefully terminated after being requested to quit", pid, id);
951 } else {
952 if (WIFSIGNALED(status)) {
953 int exitSignal = WTERMSIG(status);
954 es = exitSignal + 128;
955 LOGP(error, "Workflow crashed - PID {} ({}) was killed abnormally with {} and exited code was set to {}.", pid, id, strsignal(exitSignal), es);
956 } else {
957 es = 128;
958 LOGP(error, "Workflow crashed - PID {} ({}) did not exit correctly however it's not clear why. Exit code forced to {}.", pid, id, es);
959 }
960 }
961 hasError |= true;
962 }
963 for (auto& info : infos) {
964 if (info.pid == pid) {
965 info.active = false;
966 info.exitStatus = es;
967 }
968 }
969 continue;
970 } else {
971 break;
972 }
973 }
974 return hasError;
975}
976
977void doDPLException(RuntimeErrorRef& e, char const* processName)
978{
979 auto& err = o2::framework::error_from_ref(e);
980 if (err.maxBacktrace != 0) {
981 LOGP(fatal,
982 "Unhandled o2::framework::runtime_error reached the top of main of {}, device shutting down."
983 " Reason: {}",
984 processName, err.what);
985 LOGP(error, "Backtrace follow:");
986 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
987 } else {
988 LOGP(fatal,
989 "Unhandled o2::framework::runtime_error reached the top of main of {}, device shutting down."
990 " Reason: {}",
991 processName, err.what);
992 LOGP(error, "Recompile with DPL_ENABLE_BACKTRACE=1 to get more information.");
993 }
994}
995
996void doUnknownException(std::string const& s, char const* processName)
997{
998 if (s.empty()) {
999 LOGP(fatal, "unknown error while setting up workflow in {}.", processName);
1000 } else {
1001 LOGP(fatal, "error while setting up workflow in {}: {}", processName, s);
1002 }
1003}
1004
// Build a stand-in algorithm for @p spec used when a dry run is enforced.
// Instead of running user code, it logs a message and, for every output route
// of the device, makes an `int` message (make<int>(..., 2)) on the concrete
// origin/description/subSpec of that route.
//
// NOTE(review): the lambda captures spec.outputs by reference, so the caller's
// DeviceSpec must outlive the returned AlgorithmSpec — confirm at call sites.
// NOTE(review): asConcreteDataMatcher presumably requires every output matcher
// to be fully concrete (no wildcards) — confirm.
1005[[maybe_unused]] AlgorithmSpec dryRun(DeviceSpec const& spec)
1006{
1008 [&routes = spec.outputs](DataAllocator& outputs) {
1009 LOG(info) << "Dry run enforced. Creating dummy messages to simulate computation happended";
1010 for (auto& route : routes) {
1011 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
1012 outputs.make<int>(Output{concrete.origin, concrete.description, concrete.subSpec}, 2);
1013 }
1014 })};
1015}
1016
1018{
1019 // LOG(info) << "Process " << getpid() << " is exiting.";
1020}
1021
// Body of a spawned child process. Builds a fair::mq::DeviceRunner for the
// DeviceSpec found at runningWorkflow.devices[ref.index], registers the DPL
// per-device services in serviceRegistry when the runner asks for the device
// to be instantiated, runs the device and finally invokes the
// DataProcessorContext pre-exit callbacks.
// Returns the value returned by fair::mq::DeviceRunner::Run().
1022int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
1023 DanglingEdgesContext& danglingEdgesContext,
1024 RunningWorkflowInfo const& runningWorkflow,
1026 DriverConfig const& driverConfig,
1027 ProcessingPolicies processingPolicies,
1028 std::string const& defaultDriverClient,
1029 uv_loop_t* loop)
1030{
1031 fair::Logger::SetConsoleColor(false);
// Turn FairLogger fatal messages into C++ exceptions.
1032 fair::Logger::OnFatal([]() { throw runtime_error("Fatal error"); });
1033 DeviceSpec const& spec = runningWorkflow.devices[ref.index];
1034 LOG(info) << "Spawning new device " << spec.id << " in process with pid " << getpid();
1035
1036 fair::mq::DeviceRunner runner{argc, argv};
1037
1038 // Populate options from the command line. Notice that only the options
1039 // declared in the workflow definition are allowed.
1040 runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
// Defaults are "0" / empty; online deployments (DDS or ECS) override the
// exit-transition and data-processing timeouts, DDS also the InfoLogger mode.
1041 std::string defaultExitTransitionTimeout = "0";
1042 std::string defaultDataProcessingTimeout = "0";
1043 std::string defaultInfologgerMode = "";
1045 if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
1046 defaultExitTransitionTimeout = "40";
1047 defaultDataProcessingTimeout = "20";
1048 defaultInfologgerMode = "infoLoggerD";
1049 } else if (deploymentMode == o2::framework::DeploymentMode::OnlineECS) {
1050 defaultExitTransitionTimeout = "40";
1051 defaultDataProcessingTimeout = "20";
1052 }
1053 boost::program_options::options_description optsDesc;
// The DPL_SIGNPOSTS environment variable, when set, is the default for --signposts.
1055 char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
1056 optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
1057 ("dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value("0"), "minimum flushing interval for online metrics (in s)") //
1058 ("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
1059 ("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
1060 ("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
1061 ("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
1062 ("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
1063 ("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
1064 ("error-on-exit-transition-timeout", bpo::value<bool>()->zero_tokens()->default_value(false), "print error instead of warning when exit transition timer expires") //
1065 ("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
1066 ("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in flight at the same moment (0 disables)") //
1067 ("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
1068 ("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override") //
1069 ("log-timestamp-us", bpo::value<bool>()->zero_tokens()->default_value(false), "enable microsecond timestamps in log messages");
1070 r.fConfig.AddToCmdLineOptions(optsDesc, true);
1071 });
1072
1073 // This is to control lifetime. All these services get destroyed
1074 // when the runner is done.
1075 std::unique_ptr<SimpleRawDeviceService> simpleRawDeviceService;
1076 std::unique_ptr<DeviceState> deviceState;
1077 std::unique_ptr<ComputingQuotaEvaluator> quotaEvaluator;
1078 std::unique_ptr<FairMQDeviceProxy> deviceProxy;
1079 std::unique_ptr<DeviceContext> deviceContext;
1080
// Installed as the runner's InstantiateDevice hook: creates and registers the
// per-device services, then hands a DataProcessingDevice over to the runner
// (r.fDevice), which takes ownership of it.
1081 auto afterConfigParsingCallback = [&simpleRawDeviceService,
1082 &runningWorkflow,
1083 ref,
1084 &spec,
1085 &quotaEvaluator,
1086 &serviceRegistry,
1087 &danglingEdgesContext,
1088 &deviceState,
1089 &deviceProxy,
1090 &processingPolicies,
1091 &deviceContext,
1092 &driverConfig,
1093 &loop](fair::mq::DeviceRunner& r) {
1094 ServiceRegistryRef serviceRef = {serviceRegistry};
1095 simpleRawDeviceService = std::make_unique<SimpleRawDeviceService>(nullptr, spec);
1096 serviceRef.registerService(ServiceRegistryHelpers::handleForService<RawDeviceService>(simpleRawDeviceService.get()));
1097
1098 deviceState = std::make_unique<DeviceState>();
1099 deviceState->loop = loop;
1100 deviceState->tracingFlags = DeviceStateHelpers::parseTracingFlags(r.fConfig.GetPropertyAsString("dpl-tracing-flags"));
1101 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(deviceState.get()));
1102
1103 quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
1104 serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
1105
1106 deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
1107 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
1108 serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
1109 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
1110 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
1111 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(&danglingEdgesContext));
1112
1113 auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);
1114
1115 serviceRef.get<RawDeviceService>().setDevice(device.get());
1116 r.fDevice = std::move(device);
1117 fair::Logger::SetConsoleColor(false);
// With --log-timestamp-us, switch to a custom verbosity that prints
// microsecond timestamps together with the severity.
1118 if (r.fConfig.GetProperty<bool>("log-timestamp-us")) {
1119 fair::Logger::DefineVerbosity(fair::Verbosity::user1,
1120 fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us,
1121 fair::VerbositySpec::Info::severity));
1122 fair::Logger::SetVerbosity(fair::Verbosity::user1);
1123 }
1124
// Declare every service requested by the DeviceSpec.
1126 for (auto& service : spec.services) {
1127 LOG(debug) << "Declaring service " << service.name;
1128 serviceRegistry.declareService(service, *deviceState.get(), r.fConfig);
1129 }
1130 if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(spec.resourceMonitoringInterval)) {
1131 serviceRef.get<Monitoring>().enableProcessMonitoring(spec.resourceMonitoringInterval, {PmMeasurement::Cpu, PmMeasurement::Mem, PmMeasurement::Smaps});
1132 }
1133 };
1134
1135 runner.AddHook<fair::mq::hooks::InstantiateDevice>(afterConfigParsingCallback);
1136
1137 auto result = runner.Run();
// Once the runner returns, run the pre-exit callbacks registered in the
// DataProcessorContext, then propagate the runner's result.
1138 ServiceRegistryRef serviceRef = {serviceRegistry};
1139 auto& context = serviceRef.get<DataProcessorContext>();
1140 DataProcessorContext::preExitCallbacks(context.preExitHandles, serviceRef);
1141 return result;
1142}
1143
1145 std::string executable;
1146 std::vector<std::string> args;
1147 std::vector<ConfigParamSpec> options;
1148};
1149
1150void gui_callback(uv_timer_s* ctx)
1151{
1152 auto* gui = reinterpret_cast<GuiCallbackContext*>(ctx->data);
1153 if (gui->plugin == nullptr) {
1154 // The gui is not there. Why are we here?
1155 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
1156 O2_SIGNPOST_EVENT_EMIT_ERROR(driver, sid, "gui", "GUI timer callback invoked without a GUI plugin.");
1157 uv_timer_stop(ctx);
1158 return;
1159 }
1160 *gui->guiTimerExpired = true;
1161 static int counter = 0;
1162 if ((counter++ % 6000) == 0) {
1163 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
1164 O2_SIGNPOST_EVENT_EMIT(driver, sid, "gui", "The GUI callback got called %d times.", counter);
1165 *gui->guiTimerExpired = false;
1166 }
1167 // One interval per GUI invocation, using the loop as anchor.
1168 O2_SIGNPOST_ID_FROM_POINTER(sid, gui, ctx->loop);
1169 O2_SIGNPOST_START(gui, sid, "gui", "gui_callback");
1170
1171 // New version which allows deferred closure of windows
1172 if (gui->plugin->supportsDeferredClose()) {
1173 // For now, there is nothing for which we want to defer the close
1174 // so if the flag is set, we simply exit
1175 if (*(gui->guiQuitRequested)) {
1176 O2_SIGNPOST_END(gui, sid, "gui", "Quit requested by the GUI.");
1177 return;
1178 }
1179 void* draw_data = nullptr;
1180 uint64_t frameStart = uv_hrtime();
1181 uint64_t frameLatency = frameStart - gui->frameLast;
1182
1183 // if less than 15ms have passed reuse old frame
1184 if (frameLatency / 1000000 <= 15) {
1185 draw_data = gui->lastFrame;
1186 O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
1187 return;
1188 }
1189 // The result of the pollGUIPreRender is used to determine if we
1190 // should quit the GUI, however, the rendering is started in any
1191 // case, so we should complete it.
1192 if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) {
1193 *(gui->guiQuitRequested) = true;
1194 }
1195 draw_data = gui->plugin->pollGUIRender(gui->callback);
1196 gui->plugin->pollGUIPostRender(gui->window, draw_data);
1197
1198 uint64_t frameEnd = uv_hrtime();
1199 *(gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
1200 *(gui->frameLatency) = frameLatency / 1000000.f;
1201 gui->frameLast = frameStart;
1202 } else {
1203 void* draw_data = nullptr;
1204
1205 uint64_t frameStart = uv_hrtime();
1206 uint64_t frameLatency = frameStart - gui->frameLast;
1207
1208 // if less than 15ms have passed reuse old frame
1209 if (frameLatency / 1000000 > 15) {
1210 if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) {
1211 *(gui->guiQuitRequested) = true;
1212 O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
1213 return;
1214 }
1215 draw_data = gui->plugin->pollGUIRender(gui->callback);
1216 gui->plugin->pollGUIPostRender(gui->window, draw_data);
1217 } else {
1218 draw_data = gui->lastFrame;
1219 }
1220
1221 if (frameLatency / 1000000 > 15) {
1222 uint64_t frameEnd = uv_hrtime();
1223 *(gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
1224 *(gui->frameLatency) = frameLatency / 1000000.f;
1225 gui->frameLast = frameStart;
1226 }
1227 }
1228 O2_SIGNPOST_END(gui, sid, "gui", "Gui redrawn.");
1229}
1230
1232void single_step_callback(uv_timer_s* ctx)
1233{
1234 auto* infos = reinterpret_cast<DeviceInfos*>(ctx->data);
1235 killChildren(*infos, SIGUSR1);
1236}
1237
1238void force_exit_callback(uv_timer_s* ctx)
1239{
1240 auto* infos = reinterpret_cast<DeviceInfos*>(ctx->data);
1241 killChildren(*infos, SIGKILL);
1242}
1243
1244std::vector<std::regex> getDumpableMetrics()
1245{
1246 auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames();
1247 auto dumpableMetrics = std::vector<std::regex>{};
1248 for (const auto& metric : performanceMetrics) {
1249 dumpableMetrics.emplace_back(metric);
1250 }
1251 dumpableMetrics.emplace_back("^arrow-bytes-delta$");
1252 dumpableMetrics.emplace_back("^aod-bytes-read-uncompressed$");
1253 dumpableMetrics.emplace_back("^aod-bytes-read-compressed$");
1254 dumpableMetrics.emplace_back("^aod-file-read-info$");
1255 dumpableMetrics.emplace_back("^aod-largest-object-written$");
1256 dumpableMetrics.emplace_back("^table-bytes-.*");
1257 dumpableMetrics.emplace_back("^total-timeframes.*");
1258 dumpableMetrics.emplace_back("^device_state.*");
1259 dumpableMetrics.emplace_back("^total_wall_time_ms$");
1260 dumpableMetrics.emplace_back("^ccdb-.*$");
1261 return dumpableMetrics;
1262}
1263
1265{
1266 auto* context = (DriverServerContext*)handle->data;
1267
1268 static auto performanceMetrics = getDumpableMetrics();
1269 std::ofstream file(context->driver->resourcesMonitoringFilename, std::ios::out);
1271 context->driver->metrics, *(context->specs), performanceMetrics,
1272 file);
1273}
1274
1275void dumpRunSummary(DriverServerContext& context, DriverInfo const& driverInfo, DeviceInfos const& infos, DeviceSpecs const& specs)
1276{
1277 if (infos.empty()) {
1278 return;
1279 }
1280 LOGP(info, "## Processes completed. Run summary:");
1281 LOGP(info, "### Devices started: {}", infos.size());
1282 for (size_t di = 0; di < infos.size(); ++di) {
1283 auto& info = infos[di];
1284 auto& spec = specs[di];
1285 if (info.exitStatus) {
1286 LOGP(error, " - Device {}: pid {} (exit {})", spec.name, info.pid, info.exitStatus);
1287 } else {
1288 LOGP(info, " - Device {}: pid {} (exit {})", spec.name, info.pid, info.exitStatus);
1289 }
1290 if (info.exitStatus != 0 && info.firstSevereError.empty() == false) {
1291 LOGP(info, " - First error: {}", info.firstSevereError);
1292 }
1293 if (info.exitStatus != 0 && info.lastError != info.firstSevereError) {
1294 LOGP(info, " - Last error: {}", info.lastError);
1295 }
1296 }
1297 for (auto& summary : *context.summaryCallbacks) {
1298 summary(ServiceMetricsInfo{*context.metrics, *context.specs, *context.infos, context.driver->metrics, driverInfo});
1299 }
1300}
1301
1302auto bindGUIPort = [](DriverInfo& driverInfo, DriverServerContext& serverContext, std::string frameworkId) {
1303 uv_tcp_init(serverContext.loop, &serverContext.serverHandle);
1304
1305 driverInfo.port = 8080 + (getpid() % 30000);
1306
1307 if (getenv("DPL_REMOTE_GUI_PORT")) {
1308 try {
1309 driverInfo.port = stoi(std::string(getenv("DPL_REMOTE_GUI_PORT")));
1310 } catch (std::invalid_argument) {
1311 LOG(error) << "DPL_REMOTE_GUI_PORT not a valid integer";
1312 } catch (std::out_of_range) {
1313 LOG(error) << "DPL_REMOTE_GUI_PORT out of range (integer)";
1314 }
1315 if (driverInfo.port < 1024 || driverInfo.port > 65535) {
1316 LOG(error) << "DPL_REMOTE_GUI_PORT out of range (1024-65535)";
1317 }
1318 }
1319
1320 int result = 0;
1321 struct sockaddr_in* serverAddr = nullptr;
1322
1323 // Do not offer websocket endpoint for devices
1324 // FIXME: this was blocking david's workflows. For now
1325 // there is no point in any case to have devices
1326 // offering a web based API, but it might make sense in
1327 // the future to inspect them via some web based interface.
1328 if (serverContext.isDriver) {
1329 do {
1330 free(serverAddr);
1331 if (driverInfo.port > 64000) {
1332 throw runtime_error_f("Unable to find a free port for the driver. Last attempt returned %d", result);
1333 }
1334 serverAddr = (sockaddr_in*)malloc(sizeof(sockaddr_in));
1335 uv_ip4_addr("0.0.0.0", driverInfo.port, serverAddr);
1336 auto bindResult = uv_tcp_bind(&serverContext.serverHandle, (const struct sockaddr*)serverAddr, 0);
1337 if (bindResult != 0) {
1338 driverInfo.port++;
1339 usleep(1000);
1340 continue;
1341 }
1342 result = uv_listen((uv_stream_t*)&serverContext.serverHandle, 100, ws_connect_callback);
1343 if (result != 0) {
1344 driverInfo.port++;
1345 usleep(1000);
1346 continue;
1347 }
1348 } while (result != 0);
1349 } else if (getenv("DPL_DEVICE_REMOTE_GUI") && !serverContext.isDriver) {
1350 do {
1351 free(serverAddr);
1352 if (driverInfo.port > 64000) {
1353 throw runtime_error_f("Unable to find a free port for the driver. Last attempt returned %d", result);
1354 }
1355 serverAddr = (sockaddr_in*)malloc(sizeof(sockaddr_in));
1356 uv_ip4_addr("0.0.0.0", driverInfo.port, serverAddr);
1357 auto bindResult = uv_tcp_bind(&serverContext.serverHandle, (const struct sockaddr*)serverAddr, 0);
1358 if (bindResult != 0) {
1359 driverInfo.port++;
1360 usleep(1000);
1361 continue;
1362 }
1363 result = uv_listen((uv_stream_t*)&serverContext.serverHandle, 100, ws_connect_callback);
1364 if (result != 0) {
1365 driverInfo.port++;
1366 usleep(1000);
1367 continue;
1368 }
1369 LOG(info) << "Device GUI port: " << driverInfo.port << " " << frameworkId;
1370 } while (result != 0);
1371 }
1372};
1373
1374// This is the handler for the parent inner loop.
1376 WorkflowInfo const& workflowInfo,
1377 DataProcessorInfos const& previousDataProcessorInfos,
1378 CommandInfo const& commandInfo,
1379 DriverControl& driverControl,
1380 DriverInfo& driverInfo,
1381 DriverConfig& driverConfig,
1382 std::vector<DeviceMetricsInfo>& metricsInfos,
1383 std::vector<ConfigParamSpec> const& detectedParams,
1384 boost::program_options::variables_map& varmap,
1385 std::vector<ServiceSpec>& driverServices,
1386 std::string frameworkId)
1387{
1388 RunningWorkflowInfo runningWorkflow{
1389 .uniqueWorkflowId = driverInfo.uniqueWorkflowId,
1390 .shmSegmentId = (int16_t)atoi(varmap["shm-segment-id"].as<std::string>().c_str())};
1391 DeviceInfos infos;
1392 DeviceControls controls;
1393 DataProcessingStatesInfos allStates;
1394 auto* devicesManager = new DevicesManager{.controls = controls, .infos = infos, .specs = runningWorkflow.devices, .messages = {}};
1395 DeviceExecutions deviceExecutions;
1396 DataProcessorInfos dataProcessorInfos = previousDataProcessorInfos;
1397
1398 std::vector<uv_poll_t*> pollHandles;
1399 std::vector<DeviceStdioContext> childFds;
1400
1401 std::vector<ComputingResource> resources;
1402
1403 if (driverInfo.resources != "") {
1404 resources = ComputingResourceHelpers::parseResources(driverInfo.resources);
1405 } else {
1407 }
1408
1409 auto resourceManager = std::make_unique<SimpleResourceManager>(resources);
1410
1411 DebugGUI* debugGUI = nullptr;
1412 void* window = nullptr;
1413 decltype(debugGUI->getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl)) debugGUICallback;
1414
1415 // An empty frameworkId means this is the driver, so we initialise the GUI
1416 auto initDebugGUI = []() -> DebugGUI* {
1417 uv_lib_t supportLib;
1418 int result = 0;
1419#ifdef __APPLE__
1420 result = uv_dlopen("libO2FrameworkGUISupport.dylib", &supportLib);
1421#else
1422 result = uv_dlopen("libO2FrameworkGUISupport.so", &supportLib);
1423#endif
1424 if (result == -1) {
1425 LOG(error) << uv_dlerror(&supportLib);
1426 return nullptr;
1427 }
1428 DPLPluginHandle* (*dpl_plugin_callback)(DPLPluginHandle*);
1429
1430 result = uv_dlsym(&supportLib, "dpl_plugin_callback", (void**)&dpl_plugin_callback);
1431 if (result == -1) {
1432 LOG(error) << uv_dlerror(&supportLib);
1433 return nullptr;
1434 }
1435 DPLPluginHandle* pluginInstance = dpl_plugin_callback(nullptr);
1436 return PluginManager::getByName<DebugGUI>(pluginInstance, "ImGUIDebugGUI");
1437 };
1438
1439 // We initialise this in the driver, because different drivers might have
1440 // different versions of the service
1441 ServiceRegistry serviceRegistry;
1443
1444 if ((driverConfig.batch == false || getenv("DPL_DRIVER_REMOTE_GUI") != nullptr) && frameworkId.empty()) {
1445 debugGUI = initDebugGUI();
1446 if (debugGUI) {
1447 if (driverConfig.batch == false) {
1448 window = debugGUI->initGUI("O2 Framework debug GUI", serviceRegistry);
1449 } else {
1450 window = debugGUI->initGUI(nullptr, serviceRegistry);
1451 }
1452 }
1453 } else if (getenv("DPL_DEVICE_REMOTE_GUI") && !frameworkId.empty()) {
1454 debugGUI = initDebugGUI();
1455 // We never run the GUI on desktop for devices. All
1456 // you can do is to connect to the remote version.
1457 // this is done to avoid having a proliferation of
1458 // GUIs popping up when the variable is set globally.
1459 // FIXME: maybe this is not what we want, but it should
1460 // be ok for now.
1461 if (debugGUI) {
1462 window = debugGUI->initGUI(nullptr, serviceRegistry);
1463 }
1464 }
1465 if (driverConfig.batch == false && window == nullptr && frameworkId.empty()) {
1466 LOG(warn) << "Could not create GUI. Switching to batch mode. Do you have GLFW on your system?";
1467 driverConfig.batch = true;
1468 if (varmap["error-policy"].defaulted()) {
1469 driverInfo.processingPolicies.error = TerminationPolicy::QUIT;
1470 }
1471 }
1472 bool guiQuitRequested = false;
1473 bool hasError = false;
1474
1475 // FIXME: I should really have some way of exiting the
1476 // parent..
1477 DriverState current;
1478 DriverState previous;
1479
1480 uv_loop_t* loop = uv_loop_new();
1481
1482 uv_timer_t* gui_timer = nullptr;
1483
1484 if (!driverConfig.batch) {
1485 gui_timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
1486 uv_timer_init(loop, gui_timer);
1487 }
1488
1489 std::vector<ServiceMetricHandling> metricProcessingCallbacks;
1490 std::vector<ServiceSummaryHandling> summaryCallbacks;
1491 std::vector<ServicePreSchedule> preScheduleCallbacks;
1492 std::vector<ServicePostSchedule> postScheduleCallbacks;
1493 std::vector<ServiceDriverInit> driverInitCallbacks;
1494 for (auto& service : driverServices) {
1495 if (service.driverStartup == nullptr) {
1496 continue;
1497 }
1498 service.driverStartup(serviceRegistry, DeviceConfig{varmap});
1499 }
1500
1501 ServiceRegistryRef ref{serviceRegistry};
1502 ref.registerService(ServiceRegistryHelpers::handleForService<DevicesManager>(devicesManager));
1503
1504 bool guiTimerExpired = false;
1505 GuiCallbackContext guiContext;
1506 guiContext.plugin = debugGUI;
1507 guiContext.frameLast = uv_hrtime();
1508 guiContext.frameLatency = &driverInfo.frameLatency;
1509 guiContext.frameCost = &driverInfo.frameCost;
1510 guiContext.guiQuitRequested = &guiQuitRequested;
1511 guiContext.guiTimerExpired = &guiTimerExpired;
1512
1513 // This is to make sure we can process metrics, commands, configuration
1514 // changes coming from websocket (or even via any standard uv_stream_t, I guess).
1515 DriverServerContext serverContext{
1516 .registry = {serviceRegistry},
1517 .loop = loop,
1518 .controls = &controls,
1519 .infos = &infos,
1520 .states = &allStates,
1521 .specs = &runningWorkflow.devices,
1522 .metrics = &metricsInfos,
1523 .metricProcessingCallbacks = &metricProcessingCallbacks,
1524 .summaryCallbacks = &summaryCallbacks,
1525 .driver = &driverInfo,
1526 .gui = &guiContext,
1527 .isDriver = frameworkId.empty(),
1528 };
1529
1530 serverContext.serverHandle.data = &serverContext;
1531
1532 uv_timer_t force_step_timer;
1533 uv_timer_init(loop, &force_step_timer);
1534 uv_timer_t force_exit_timer;
1535 uv_timer_init(loop, &force_exit_timer);
1536
1537 bool guiDeployedOnce = false;
1538 bool once = false;
1539
1540 uv_timer_t metricDumpTimer;
1541 metricDumpTimer.data = &serverContext;
1542 bool allChildrenGone = false;
1543 guiContext.allChildrenGone = &allChildrenGone;
1544 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, loop);
1545 O2_SIGNPOST_START(driver, sid, "driver", "Starting driver loop");
1546
1547 // Async callback to process the output of the children, if needed.
1548 serverContext.asyncLogProcessing = (uv_async_t*)malloc(sizeof(uv_async_t));
1549 serverContext.asyncLogProcessing->data = &serverContext;
1550 uv_async_init(loop, serverContext.asyncLogProcessing, [](uv_async_t* handle) {
1551 auto* context = (DriverServerContext*)handle->data;
1552 processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls);
1553 for (auto* statusHandler : context->statusHandlers) {
1554 for (size_t di = 0; di < context->infos->size(); ++di) {
1555 statusHandler->sendNewLogs(di);
1556 }
1557 }
1558 });
1559
1560 while (true) {
1561 // If control forced some transition on us, we push it to the queue.
1562 if (driverControl.forcedTransitions.empty() == false) {
1563 for (auto transition : driverControl.forcedTransitions) {
1564 driverInfo.states.push_back(transition);
1565 }
1566 driverControl.forcedTransitions.resize(0);
1567 }
1568 // In case a timeout was requested, we check if we are running
1569 // for more than the timeout duration and exit in case that's the case.
1570 {
1571 auto currentTime = uv_hrtime();
1572 uint64_t diff = (currentTime - driverInfo.startTime) / 1000000000LL;
1573 if ((graceful_exit == false) && (driverInfo.timeout > 0) && (diff > driverInfo.timeout)) {
1574 LOG(info) << "Timout ellapsed. Requesting to quit.";
1575 graceful_exit = true;
1576 }
1577 }
1578 // Move to exit loop if sigint was sent we execute this only once.
1579 if (graceful_exit == true && driverInfo.sigintRequested == false) {
1580 driverInfo.sigintRequested = true;
1581 driverInfo.states.resize(0);
1582 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1583 }
1584 // If one of the children dies and sigint was not requested
1585 // we should decide what to do.
1586 if (sigchld_requested == true && driverInfo.sigchldRequested == false) {
1587 driverInfo.sigchldRequested = true;
1588 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
1589 }
1590 if (driverInfo.states.empty() == false) {
1591 previous = current;
1592 current = driverInfo.states.back();
1593 } else {
1594 current = DriverState::UNKNOWN;
1595 }
1596 driverInfo.states.pop_back();
1597 switch (current) {
1598 case DriverState::BIND_GUI_PORT:
1599 bindGUIPort(driverInfo, serverContext, frameworkId);
1600 break;
1601 case DriverState::INIT:
1602 LOGP(info, "Initialising O2 Data Processing Layer. Driver PID: {}.", getpid());
1603 LOGP(info, "Driver listening on port: {}", driverInfo.port);
1604
1605 // Install signal handler for quitting children.
1606 driverInfo.sa_handle_child.sa_handler = &handle_sigchld;
1607 sigemptyset(&driverInfo.sa_handle_child.sa_mask);
1608 driverInfo.sa_handle_child.sa_flags = SA_RESTART | SA_NOCLDSTOP;
1609 if (sigaction(SIGCHLD, &driverInfo.sa_handle_child, nullptr) == -1) {
1610 perror(nullptr);
1611 exit(1);
1612 }
1613
1616 if (driverInfo.noSHMCleanup) {
1617 LOGP(warning, "Not cleaning up shared memory.");
1618 } else {
1619 cleanupSHM(driverInfo.uniqueWorkflowId);
1620 }
1625 for (auto& callback : driverInitCallbacks) {
1626 callback(serviceRegistry, {varmap});
1627 }
1628 driverInfo.states.push_back(DriverState::RUNNING);
1629 // driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
1630 LOG(info) << "O2 Data Processing Layer initialised. We brake for nobody.";
1631#ifdef NDEBUG
1632 LOGF(info, "Optimised build. O2DEBUG / LOG(debug) / LOGF(debug) / assert statement will not be shown.");
1633#endif
1634 break;
1635 case DriverState::IMPORT_CURRENT_WORKFLOW:
1636 // This state is needed to fill the metadata structure
1637 // which contains how to run the current workflow
1638 dataProcessorInfos = previousDataProcessorInfos;
1639 for (auto const& device : runningWorkflow.devices) {
1640 auto exists = std::find_if(dataProcessorInfos.begin(),
1641 dataProcessorInfos.end(),
1642 [id = device.id](DataProcessorInfo const& info) -> bool { return info.name == id; });
1643 if (exists != dataProcessorInfos.end()) {
1644 continue;
1645 }
1646 std::vector<std::string> channels;
1647 for (auto channel : device.inputChannels) {
1648 channels.push_back(channel.name);
1649 }
1650 for (auto channel : device.outputChannels) {
1651 channels.push_back(channel.name);
1652 }
1653 dataProcessorInfos.push_back(
1655 device.id,
1656 workflowInfo.executable,
1657 workflowInfo.args,
1658 workflowInfo.options,
1659 channels});
1660 }
1661 break;
1662 case DriverState::MATERIALISE_WORKFLOW:
1663 try {
1664 auto workflowState = WorkflowHelpers::verifyWorkflow(workflow);
1665 if (driverConfig.batch == true && varmap["dds"].as<std::string>().empty() && !varmap["dump-workflow"].as<bool>() && workflowState == WorkflowParsingState::Empty) {
1666 LOGP(error, "Empty workflow provided while running in batch mode.");
1667 return 1;
1668 }
1669
1672 auto altered_workflow = workflow;
1673
1674 auto confNameFromParam = [](std::string const& paramName) {
1675 std::regex name_regex(R"(^control:([\w-]+)\/(\w+))");
1676 auto match = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 0);
1677 if (match == std::sregex_token_iterator()) {
1678 throw runtime_error_f("Malformed process control spec: %s", paramName.c_str());
1679 }
1680 std::string task = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 1)->str();
1681 std::string conf = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 2)->str();
1682 return std::pair{task, conf};
1683 };
1684 bool altered = false;
1685 for (auto& device : altered_workflow) {
1686 // ignore internal devices
1687 if (device.name.find("internal") != std::string::npos) {
1688 continue;
1689 }
1690 // ignore devices with no inputs
1691 if (device.inputs.empty() == true) {
1692 continue;
1693 }
1694 // ignore devices with no metadata in inputs
1695 auto hasMetadata = std::ranges::any_of(device.inputs, [](InputSpec const& spec) {
1696 return spec.metadata.empty() == false;
1697 });
1698 if (!hasMetadata) {
1699 continue;
1700 }
1701 // ignore devices with no control options
1702 auto hasControls = std::ranges::any_of(device.inputs, [](InputSpec const& spec) {
1703 return std::ranges::any_of(spec.metadata, [](ConfigParamSpec const& param) {
1704 return param.type == VariantType::Bool && param.name.find("control:") != std::string::npos;
1705 });
1706 });
1707 if (!hasControls) {
1708 continue;
1709 }
1710
1711 LOGP(debug, "Adjusting device {}", device.name.c_str());
1712
1713 auto configStore = DeviceConfigurationHelpers::getConfiguration(serviceRegistry, device.name.c_str(), device.options);
1714 if (configStore != nullptr) {
1715 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1716 for (auto& input : device.inputs) {
1717 for (auto& param : input.metadata) {
1718 if (param.type == VariantType::Bool && param.name.find("control:") != std::string::npos) {
1719 if (param.name != "control:default" && param.name != "control:spawn" && param.name != "control:build" && param.name != "control:define") {
1720 auto confName = confNameFromParam(param.name).second;
1721 param.defaultValue = reg->get<bool>(confName.c_str());
1722 }
1723 }
1724 }
1725 }
1726 }
1728 LOGP(debug, "Original inputs: ");
1729 for (auto& input : device.inputs) {
1730 LOGP(debug, "-> {}", input.binding);
1731 }
1732 auto end = device.inputs.end();
1733 auto new_end = std::remove_if(device.inputs.begin(), device.inputs.end(), [](InputSpec& input) {
1734 auto requested = false;
1735 auto hasControls = false;
1736 for (auto& param : input.metadata) {
1737 if (param.type != VariantType::Bool) {
1738 continue;
1739 }
1740 if (param.name.find("control:") != std::string::npos) {
1741 hasControls = true;
1742 if (param.defaultValue.get<bool>() == true) {
1743 requested = true;
1744 break;
1745 }
1746 }
1747 }
1748 if (hasControls) {
1749 return !requested;
1750 }
1751 return false;
1752 });
1753 device.inputs.erase(new_end, end);
1754 LOGP(debug, "Adjusted inputs: ");
1755 for (auto& input : device.inputs) {
1756 LOGP(debug, "-> {}", input.binding);
1757 }
1758 altered = true;
1759 }
1760 WorkflowHelpers::adjustTopology(altered_workflow, *driverInfo.configContext);
1761 if (altered) {
1762 WorkflowSpecNode node{altered_workflow};
1763 for (auto& service : driverServices) {
1764 if (service.adjustTopology == nullptr) {
1765 continue;
1766 }
1767 service.adjustTopology(node, *driverInfo.configContext);
1768 }
1769 }
1770
1771 // These allow services customization via an environment variable
1772 OverrideServiceSpecs overrides = ServiceSpecHelpers::parseOverrides(getenv("DPL_OVERRIDE_SERVICES"));
1773 DeviceSpecHelpers::validate(altered_workflow);
1775 driverInfo.channelPolicies,
1776 driverInfo.completionPolicies,
1777 driverInfo.dispatchPolicies,
1778 driverInfo.resourcePolicies,
1779 driverInfo.callbacksPolicies,
1780 driverInfo.sendingPolicies,
1781 driverInfo.forwardingPolicies,
1782 runningWorkflow.devices,
1783 *resourceManager,
1784 driverInfo.uniqueWorkflowId,
1785 *driverInfo.configContext,
1786 !varmap["no-IPC"].as<bool>(),
1787 driverInfo.resourcesMonitoringInterval,
1788 varmap["channel-prefix"].as<std::string>(),
1789 overrides);
1790 metricProcessingCallbacks.clear();
1791 std::vector<std::string> matchingServices;
1792
1793 // FIXME: once moving to C++20, we can use templated lambdas.
1794 matchingServices.clear();
1795 for (auto& device : runningWorkflow.devices) {
1796 for (auto& service : device.services) {
1797 // If a service with the same name is already registered, skip it
1798 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1799 continue;
1800 }
1801 if (service.metricHandling) {
1802 metricProcessingCallbacks.push_back(service.metricHandling);
1803 matchingServices.push_back(service.name);
1804 }
1805 }
1806 }
1807
1808 // FIXME: once moving to C++20, we can use templated lambdas.
1809 matchingServices.clear();
1810 for (auto& device : runningWorkflow.devices) {
1811 for (auto& service : device.services) {
1812 // If a service with the same name is already registered, skip it
1813 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1814 continue;
1815 }
1816 if (service.summaryHandling) {
1817 summaryCallbacks.push_back(service.summaryHandling);
1818 matchingServices.push_back(service.name);
1819 }
1820 }
1821 }
1822
1823 preScheduleCallbacks.clear();
1824 matchingServices.clear();
1825 for (auto& device : runningWorkflow.devices) {
1826 for (auto& service : device.services) {
1827 // If a service with the same name is already registered, skip it
1828 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1829 continue;
1830 }
1831 if (service.preSchedule) {
1832 preScheduleCallbacks.push_back(service.preSchedule);
1833 }
1834 }
1835 }
1836 postScheduleCallbacks.clear();
1837 matchingServices.clear();
1838 for (auto& device : runningWorkflow.devices) {
1839 for (auto& service : device.services) {
1840 // If a service with the same name is already registered, skip it
1841 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1842 continue;
1843 }
1844 if (service.postSchedule) {
1845 postScheduleCallbacks.push_back(service.postSchedule);
1846 }
1847 }
1848 }
1849 driverInitCallbacks.clear();
1850 matchingServices.clear();
1851 for (auto& device : runningWorkflow.devices) {
1852 for (auto& service : device.services) {
1853 // If a service with the same name is already registered, skip it
1854 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1855 continue;
1856 }
1857 if (service.driverInit) {
1858 driverInitCallbacks.push_back(service.driverInit);
1859 }
1860 }
1861 }
1862
1863 // This should expand nodes so that we can build a consistent DAG.
1864
1865 // This updates the options in the runningWorkflow.devices
1866 for (auto& device : runningWorkflow.devices) {
1867 // ignore internal devices
1868 if (device.name.find("internal") != std::string::npos) {
1869 continue;
1870 }
1871 auto configStore = DeviceConfigurationHelpers::getConfiguration(serviceRegistry, device.name.c_str(), device.options);
1872 if (configStore != nullptr) {
1873 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1874 for (auto& option : device.options) {
1875 const char* name = option.name.c_str();
1876 switch (option.type) {
1877 case VariantType::Int:
1878 option.defaultValue = reg->get<int32_t>(name);
1879 break;
1880 case VariantType::Int8:
1881 option.defaultValue = reg->get<int8_t>(name);
1882 break;
1883 case VariantType::Int16:
1884 option.defaultValue = reg->get<int16_t>(name);
1885 break;
1886 case VariantType::UInt8:
1887 option.defaultValue = reg->get<uint8_t>(name);
1888 break;
1889 case VariantType::UInt16:
1890 option.defaultValue = reg->get<uint16_t>(name);
1891 break;
1892 case VariantType::UInt32:
1893 option.defaultValue = reg->get<uint32_t>(name);
1894 break;
1895 case VariantType::UInt64:
1896 option.defaultValue = reg->get<uint64_t>(name);
1897 break;
1898 case VariantType::Int64:
1899 option.defaultValue = reg->get<int64_t>(name);
1900 break;
1901 case VariantType::Float:
1902 option.defaultValue = reg->get<float>(name);
1903 break;
1904 case VariantType::Double:
1905 option.defaultValue = reg->get<double>(name);
1906 break;
1907 case VariantType::String:
1908 option.defaultValue = reg->get<std::string>(name);
1909 break;
1910 case VariantType::Bool:
1911 option.defaultValue = reg->get<bool>(name);
1912 break;
1913 case VariantType::ArrayInt:
1914 option.defaultValue = reg->get<std::vector<int>>(name);
1915 break;
1916 case VariantType::ArrayFloat:
1917 option.defaultValue = reg->get<std::vector<float>>(name);
1918 break;
1919 case VariantType::ArrayDouble:
1920 option.defaultValue = reg->get<std::vector<double>>(name);
1921 break;
1922 case VariantType::ArrayString:
1923 option.defaultValue = reg->get<std::vector<std::string>>(name);
1924 break;
1925 case VariantType::Array2DInt:
1926 option.defaultValue = reg->get<Array2D<int>>(name);
1927 break;
1928 case VariantType::Array2DFloat:
1929 option.defaultValue = reg->get<Array2D<float>>(name);
1930 break;
1931 case VariantType::Array2DDouble:
1932 option.defaultValue = reg->get<Array2D<double>>(name);
1933 break;
1934 case VariantType::LabeledArrayInt:
1935 option.defaultValue = reg->get<LabeledArray<int>>(name);
1936 break;
1937 case VariantType::LabeledArrayFloat:
1938 option.defaultValue = reg->get<LabeledArray<float>>(name);
1939 break;
1940 case VariantType::LabeledArrayDouble:
1941 option.defaultValue = reg->get<LabeledArray<double>>(name);
1942 break;
1943 case VariantType::LabeledArrayString:
1944 option.defaultValue = reg->get<LabeledArray<std::string>>(name);
1945 break;
1946 default:
1947 break;
1948 }
1949 }
1950 }
1951 }
1952 } catch (std::runtime_error& e) {
1953 LOGP(error, "invalid workflow in {}: {}", driverInfo.argv[0], e.what());
1954 return 1;
1957#ifdef DPL_ENABLE_BACKTRACE
1958 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1959#endif
1960 LOGP(error, "invalid workflow in {}: {}", driverInfo.argv[0], err.what);
1961 return 1;
1962 } catch (...) {
1963 LOGP(error, "invalid workflow in {}: Unknown error while materialising workflow", driverInfo.argv[0]);
1964 return 1;
1965 }
1966 break;
1967 case DriverState::DO_CHILD:
1968 // We do not start the process if by default we are stopped.
1969 if (driverControl.defaultStopped) {
1970 kill(getpid(), SIGSTOP);
1971 }
1972 for (size_t di = 0; di < runningWorkflow.devices.size(); di++) {
1974 if (runningWorkflow.devices[di].id == frameworkId) {
1975 return doChild(driverInfo.argc, driverInfo.argv,
1976 serviceRegistry,
1977 driverInfo.configContext->services().get<DanglingEdgesContext>(),
1978 runningWorkflow, ref,
1979 driverConfig,
1980 driverInfo.processingPolicies,
1981 driverInfo.defaultDriverClient,
1982 loop);
1983 }
1984 }
1985 {
1986 std::ostringstream ss;
1987 for (auto& processor : workflow) {
1988 ss << " - " << processor.name << "\n";
1989 }
1990 for (auto& spec : runningWorkflow.devices) {
1991 ss << " - " << spec.name << "(" << spec.id << ")"
1992 << "\n";
1993 }
1994 driverInfo.lastError = fmt::format(
1995 "Unable to find component with id {}."
1996 " Available options:\n{}",
1997 frameworkId, ss.str());
1998 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1999 }
2000 break;
2001 case DriverState::REDEPLOY_GUI:
2002 // The callback for the GUI needs to be recalculated every time
2003 // the deployed configuration changes, e.g. a new device
2004 // has been added to the topology.
2005 // We need to recreate the GUI callback every time we reschedule
2006 // because getGUIDebugger actually recreates the GUI state.
2007 // Notice also that we need the actual gui_timer only for the
2008 // case the GUI runs in interactive mode, however we deploy the
2009 // GUI in both interactive and non-interactive mode, if the
2010 // DPL_DRIVER_REMOTE_GUI environment variable is set.
2011 if (!driverConfig.batch || getenv("DPL_DRIVER_REMOTE_GUI")) {
2012 if (gui_timer) {
2013 uv_timer_stop(gui_timer);
2014 }
2015
2016 auto callback = debugGUI->getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl);
2017 guiContext.callback = [&serviceRegistry, &driverServices, &debugGUI, &infos, &runningWorkflow, &dataProcessorInfos, &metricsInfos, &driverInfo, &controls, &driverControl, callback]() {
2018 callback();
2019 for (auto& service : driverServices) {
2020 if (service.postRenderGUI) {
2021 service.postRenderGUI(serviceRegistry);
2022 }
2023 }
2024 };
2025 guiContext.window = window;
2026
2027 if (gui_timer) {
2028 gui_timer->data = &guiContext;
2029 uv_timer_start(gui_timer, gui_callback, 0, 20);
2030 }
2031 guiDeployedOnce = true;
2032 }
2033 break;
2034 case DriverState::MERGE_CONFIGS: {
2035 try {
2036 controls.resize(runningWorkflow.devices.size());
2039 if (varmap.count("dpl-tracing-flags")) {
2040 for (auto& control : controls) {
2041 auto tracingFlags = DeviceStateHelpers::parseTracingFlags(varmap["dpl-tracing-flags"].as<std::string>());
2042 control.tracingFlags = tracingFlags;
2043 }
2044 }
2045 deviceExecutions.resize(runningWorkflow.devices.size());
2046
2047 // Options which should be uniform across all
2048 // the subworkflow invokations.
2049 const auto uniformOptions = {
2050 "--aod-file",
2051 "--aod-memory-rate-limit",
2052 "--aod-writer-json",
2053 "--aod-writer-ntfmerge",
2054 "--aod-writer-resdir",
2055 "--aod-writer-resfile",
2056 "--aod-writer-resmode",
2057 "--aod-writer-maxfilesize",
2058 "--aod-writer-keep",
2059 "--aod-max-io-rate",
2060 "--aod-parent-access-level",
2061 "--aod-parent-base-path-replacement",
2062 "--aod-origin-level-mapping",
2063 "--driver-client-backend",
2064 "--fairmq-ipc-prefix",
2065 "--readers",
2066 "--ccdb-fetchers",
2067 "--resources-monitoring",
2068 "--resources-monitoring-file",
2069 "--resources-monitoring-dump-interval",
2070 "--time-limit",
2071 };
2072
2073 for (auto& option : uniformOptions) {
2074 DeviceSpecHelpers::reworkHomogeneousOption(dataProcessorInfos, option, nullptr);
2075 }
2076
2077 DeviceSpecHelpers::reworkShmSegmentSize(dataProcessorInfos);
2078 DeviceSpecHelpers::prepareArguments(driverControl.defaultQuiet,
2079 driverControl.defaultStopped,
2080 driverInfo.processingPolicies.termination == TerminationPolicy::WAIT,
2081 driverInfo.port,
2082 driverConfig,
2083 dataProcessorInfos,
2084 runningWorkflow.devices,
2085 deviceExecutions,
2086 controls,
2087 detectedParams,
2088 driverInfo.uniqueWorkflowId);
2091 LOGP(error, "unable to merge configurations in {}: {}", driverInfo.argv[0], err.what);
2092#ifdef DPL_ENABLE_BACKTRACE
2093 std::cerr << "\nStacktrace follows:\n\n";
2094 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
2095#endif
2096 return 1;
2097 }
2098 } break;
2099 case DriverState::SCHEDULE: {
2100 // FIXME: for the moment modifying the topology means we rebuild completely
2101 // all the devices and we restart them. This is also what DDS does at
2102 // a larger scale. In principle one could try to do a delta and only
2103 // restart the data processors which need to be restarted.
2104 LOG(info) << "Redeployment of configuration asked.";
2105 std::ostringstream forwardedStdin;
2106 WorkflowSerializationHelpers::dump(forwardedStdin, workflow, dataProcessorInfos, commandInfo);
2107 infos.reserve(runningWorkflow.devices.size());
2108
2109 // This is guaranteed to be a single CPU.
2110 unsigned parentCPU = -1;
2111 unsigned parentNode = -1;
2112#if defined(__linux__) && __has_include(<sched.h>)
2113 parentCPU = sched_getcpu();
2114#elif __has_include(<linux/getcpu.h>)
2115 getcpu(&parentCPU, &parentNode, nullptr);
2116#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
2117 // FIXME: this is a last resort as it is apparently buggy
2118 // on some Intel CPUs.
2119 GETCPU(parentCPU);
2120#endif
2121 for (auto& callback : preScheduleCallbacks) {
2122 callback(serviceRegistry, {varmap});
2123 }
2124 childFds.resize(runningWorkflow.devices.size());
2125 for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) {
2126 auto& context = childFds[di];
2127 createPipes(context.childstdin);
2128 createPipes(context.childstdout);
2129 if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[di].resource.hostname != driverInfo.deployHostname) {
2130 spawnRemoteDevice(loop, forwardedStdin.str(),
2131 runningWorkflow.devices[di], controls[di], deviceExecutions[di], infos, allStates);
2132 } else {
2133 DeviceRef ref{di};
2134 spawnDevice(loop,
2135 ref,
2136 runningWorkflow.devices, driverInfo,
2137 controls, deviceExecutions, infos,
2138 allStates,
2139 serviceRegistry, varmap,
2140 childFds, parentCPU, parentNode);
2141 }
2142 }
2143 handleSignals();
2144 handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles);
2145 for (auto& callback : postScheduleCallbacks) {
2146 callback(serviceRegistry, {varmap});
2147 }
2148 assert(infos.empty() == false);
2149
2150 // In case resource monitoring is requested, we dump metrics to disk
2151 // every 3 minutes.
2152 if (driverInfo.resourcesMonitoringDumpInterval && ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
2153 uv_timer_init(loop, &metricDumpTimer);
2154 uv_timer_start(&metricDumpTimer, dumpMetricsCallback,
2155 driverInfo.resourcesMonitoringDumpInterval * 1000,
2156 driverInfo.resourcesMonitoringDumpInterval * 1000);
2157 }
2159 for (const auto& processorInfo : dataProcessorInfos) {
2160 const auto& cmdLineArgs = processorInfo.cmdLineArgs;
2161 if (std::find(cmdLineArgs.begin(), cmdLineArgs.end(), "--severity") != cmdLineArgs.end()) {
2162 for (size_t counter = 0; const auto& spec : runningWorkflow.devices) {
2163 if (spec.name.compare(processorInfo.name) == 0) {
2164 auto& info = infos[counter];
2165 const auto logLevelIt = std::find(cmdLineArgs.begin(), cmdLineArgs.end(), "--severity") + 1;
2166 if ((*logLevelIt).compare("debug") == 0) {
2167 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2168 } else if ((*logLevelIt).compare("detail") == 0) {
2169 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2170 } else if ((*logLevelIt).compare("info") == 0) {
2171 info.logLevel = LogParsingHelpers::LogLevel::Info;
2172 } else if ((*logLevelIt).compare("warning") == 0) {
2173 info.logLevel = LogParsingHelpers::LogLevel::Warning;
2174 } else if ((*logLevelIt).compare("error") == 0) {
2175 info.logLevel = LogParsingHelpers::LogLevel::Error;
2176 } else if ((*logLevelIt).compare("important") == 0) {
2177 info.logLevel = LogParsingHelpers::LogLevel::Info;
2178 } else if ((*logLevelIt).compare("alarm") == 0) {
2179 info.logLevel = LogParsingHelpers::LogLevel::Alarm;
2180 } else if ((*logLevelIt).compare("critical") == 0) {
2181 info.logLevel = LogParsingHelpers::LogLevel::Critical;
2182 } else if ((*logLevelIt).compare("fatal") == 0) {
2183 info.logLevel = LogParsingHelpers::LogLevel::Fatal;
2184 }
2185 break;
2186 }
2187 ++counter;
2188 }
2189 }
2190 }
2191 LOG(info) << "Redeployment of configuration done.";
2192 } break;
2193 case DriverState::RUNNING:
2194 // Run any pending libUV event loop, block if
2195 // any, so that we do not consume CPU time when the driver is
2196 // idle.
2197 devicesManager->flush();
2198 // We print the event loop for the gui only once every
2199 // 6000 iterations (i.e. ~2 minutes). To avoid spamming, while still
2200 // being able to see the event loop in case of a deadlock / systematic failure.
2201 if (guiTimerExpired == false) {
2202 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "Entering event loop with %{public}s", once ? "UV_RUN_ONCE" : "UV_RUN_NOWAIT");
2203 }
2204 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2205 once = true;
2206 // Calculate what we should do next and eventually
2207 // show the GUI
2208 if (guiQuitRequested ||
2209 (driverInfo.processingPolicies.termination == TerminationPolicy::QUIT && (checkIfCanExit(infos) == true))) {
2210 // Something requested to quit. This can be a user
2211 // interaction with the GUI or (if --completion-policy=quit)
2212 // it could mean that the workflow does not have anything else to do.
2213 // Let's update the GUI one more time and then EXIT.
2214 LOG(info) << "Quitting";
2215 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2216 } else if (infos.size() != runningWorkflow.devices.size()) {
2217 // If the number of devices is different from
2218 // the DeviceInfos it means the speicification
2219 // does not match what is running, so we need to do
2220 // further scheduling.
2221 driverInfo.states.push_back(DriverState::RUNNING);
2222 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2223 driverInfo.states.push_back(DriverState::SCHEDULE);
2224 driverInfo.states.push_back(DriverState::MERGE_CONFIGS);
2225 } else if (runningWorkflow.devices.empty() && driverConfig.batch == true) {
2226 LOG(info) << "No device resulting from the workflow. Quitting.";
2227 // If there are no deviceSpecs, we exit.
2228 driverInfo.states.push_back(DriverState::EXIT);
2229 } else if (runningWorkflow.devices.empty() && driverConfig.batch == false && !guiDeployedOnce) {
2230 // In case of an empty workflow, we need to deploy the GUI at least once.
2231 driverInfo.states.push_back(DriverState::RUNNING);
2232 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2233 } else {
2234 driverInfo.states.push_back(DriverState::RUNNING);
2235 }
2236 break;
2237 case DriverState::QUIT_REQUESTED: {
2238 std::time_t result = std::time(nullptr);
2239 char buffer[32];
2240 std::strncpy(buffer, std::ctime(&result), 26);
2241 O2_SIGNPOST_EVENT_EMIT_INFO(driver, sid, "mainloop", "Quit requested at %{public}s", buffer);
2242 guiQuitRequested = true;
2243 // We send SIGCONT to make sure stopped children are resumed
2244 killChildren(infos, SIGCONT);
2245 // We send SIGTERM to make sure we do the STOP transition in FairMQ
2246 killChildren(infos, SIGTERM);
2247 // We have a timer to send SIGUSR1 to make sure we advance all devices
2248 // in a timely manner.
2249 force_step_timer.data = &infos;
2250 uv_timer_start(&force_step_timer, single_step_callback, 0, 300);
2251 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2252 break;
2253 }
2254 case DriverState::HANDLE_CHILDREN: {
2255 // Run any pending libUV event loop, block if
2256 // any, so that we do not consume CPU time when the driver is
2257 // idle.
2258 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2259 once = true;
2260 // I allow queueing of more sigchld only when
2261 // I process the previous call
2262 if (forceful_exit == true) {
2263 static bool forcefulExitMessage = true;
2264 if (forcefulExitMessage) {
2265 LOG(info) << "Forceful exit requested.";
2266 forcefulExitMessage = false;
2267 }
2268 killChildren(infos, SIGCONT);
2269 killChildren(infos, SIGKILL);
2270 }
2271 sigchld_requested = false;
2272 driverInfo.sigchldRequested = false;
2273 processChildrenOutput(loop, driverInfo, infos, runningWorkflow.devices, controls);
2274 hasError = processSigChild(infos, runningWorkflow.devices);
2275 allChildrenGone = areAllChildrenGone(infos);
2276 bool canExit = checkIfCanExit(infos);
2277 bool supposedToQuit = (guiQuitRequested || canExit || graceful_exit);
2278
2279 if (allChildrenGone && (supposedToQuit || driverInfo.processingPolicies.termination == TerminationPolicy::QUIT)) {
2280 // We move to the exit, regardless of where we were
2281 driverInfo.states.resize(0);
2282 driverInfo.states.push_back(DriverState::EXIT);
2283 } else if (hasError && driverInfo.processingPolicies.error == TerminationPolicy::QUIT && !supposedToQuit) {
2284 graceful_exit = 1;
2285 force_exit_timer.data = &infos;
2286 static bool forceful_timer_started = false;
2287 if (forceful_timer_started == false) {
2288 forceful_timer_started = true;
2289 uv_timer_start(&force_exit_timer, force_exit_callback, 15000, 3000);
2290 }
2291 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2292 } else if (allChildrenGone == false && supposedToQuit) {
2293 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2294 } else {
2295 }
2296 } break;
2297 case DriverState::EXIT: {
2298 if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
2299 if (driverInfo.resourcesMonitoringDumpInterval) {
2300 uv_timer_stop(&metricDumpTimer);
2301 }
2302 LOGP(info, "Dumping performance metrics to {}.json file", driverInfo.resourcesMonitoringFilename);
2303 dumpMetricsCallback(&metricDumpTimer);
2304 }
2305 dumpRunSummary(serverContext, driverInfo, infos, runningWorkflow.devices);
2306 // This is a clean exit. Before we do so, if required,
2307 // we dump the configuration of all the devices so that
2308 // we can reuse it. Notice we do not dump anything if
2309 // the workflow was not really run.
2310 // NOTE: is this really what we want? should we run
2311 // SCHEDULE and dump the full configuration as well?
2312 if (infos.empty()) {
2313 return 0;
2314 }
2315 boost::property_tree::ptree finalConfig;
2316 assert(infos.size() == runningWorkflow.devices.size());
2317 for (size_t di = 0; di < infos.size(); ++di) {
2318 auto& info = infos[di];
2319 auto& spec = runningWorkflow.devices[di];
2320 finalConfig.put_child(spec.name, info.currentConfig);
2321 }
2322 LOG(info) << "Dumping used configuration in dpl-config.json";
2323
2324 std::ofstream outDPLConfigFile("dpl-config.json", std::ios::out);
2325 if (outDPLConfigFile.is_open()) {
2326 boost::property_tree::write_json(outDPLConfigFile, finalConfig);
2327 } else {
2328 LOGP(warning, "Could not write out final configuration file. Read only run folder?");
2329 }
2330 if (driverInfo.noSHMCleanup) {
2331 LOGP(warning, "Not cleaning up shared memory.");
2332 } else {
2333 cleanupSHM(driverInfo.uniqueWorkflowId);
2334 }
2335 return calculateExitCode(driverInfo, runningWorkflow.devices, infos);
2336 }
2337 case DriverState::PERFORM_CALLBACKS:
2338 for (auto& callback : driverControl.callbacks) {
2339 callback(workflow, runningWorkflow.devices, deviceExecutions, dataProcessorInfos, commandInfo);
2340 }
2341 driverControl.callbacks.clear();
2342 break;
2343 default:
2344 LOG(error) << "Driver transitioned in an unknown state("
2345 << "current: " << (int)current
2346 << ", previous: " << (int)previous
2347 << "). Shutting down.";
2348 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2349 }
2350 }
2351 O2_SIGNPOST_END(driver, sid, "driver", "End driver loop");
2352}
2353
2354// Print help
2355void printHelp(bpo::variables_map const& varmap,
2356 bpo::options_description const& executorOptions,
2357 std::vector<DataProcessorSpec> const& physicalWorkflow,
2358 std::vector<ConfigParamSpec> const& currentWorkflowOptions)
2359{
2360 auto mode = varmap["help"].as<std::string>();
2361 bpo::options_description helpOptions;
2362 if (mode == "full" || mode == "short" || mode == "executor") {
2363 helpOptions.add(executorOptions);
2364 }
2365 // this time no veto is applied, so all the options are added for printout
2366 if (mode == "executor") {
2367 // nothing more
2368 } else if (mode == "workflow") {
2369 // executor options and workflow options, skip the actual workflow
2370 o2::framework::WorkflowSpec emptyWorkflow;
2371 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(emptyWorkflow, currentWorkflowOptions));
2372 } else if (mode == "full" || mode == "short") {
2373 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions,
2374 bpo::options_description(),
2375 mode));
2376 } else {
2377 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, {},
2378 bpo::options_description(),
2379 mode));
2380 }
2381 if (helpOptions.options().size() == 0) {
2382 // the specified argument is invalid, add at leat the executor options
2383 mode += " is an invalid argument, please use correct argument for";
2384 helpOptions.add(executorOptions);
2385 }
2386 std::cout << "ALICE O2 DPL workflow driver" //
2387 << " (" << mode << " help)" << std::endl //
2388 << helpOptions << std::endl; //
2389}
2390
2391// Helper to find out if stdout is actually attached to a pipe.
2393{
2394 struct stat s;
2395 fstat(STDOUT_FILENO, &s);
2396 return ((s.st_mode & S_IFIFO) != 0);
2397}
2398
2400{
2401 struct stat s;
2402 int r = fstat(STDIN_FILENO, &s);
2403 // If stdin cannot be statted, we assume the shell is some sort of
2404 // non-interactive container thing
2405 if (r < 0) {
2406 return false;
2407 }
2408 // If stdin is a pipe or a file, we try to fetch configuration from there
2409 return ((s.st_mode & S_IFIFO) != 0 || (s.st_mode & S_IFREG) != 0);
2410}
2411
2413{
2414 struct CloningSpec {
2415 std::string templateMatcher;
2416 std::string cloneName;
2417 };
2418 auto s = ctx.options().get<std::string>("clone");
2419 std::vector<CloningSpec> specs;
2420 std::string delimiter = ",";
2421
2422 while (s.empty() == false) {
2423 auto newPos = s.find(delimiter);
2424 auto token = s.substr(0, newPos);
2425 auto split = token.find(":");
2426 if (split == std::string::npos) {
2427 throw std::runtime_error("bad clone definition. Syntax <template-processor>:<clone-name>");
2428 }
2429 auto key = token.substr(0, split);
2430 token.erase(0, split + 1);
2431 size_t error;
2432 std::string value = "";
2433 try {
2434 auto numValue = std::stoll(token, &error, 10);
2435 if (token[error] != '\0') {
2436 throw std::runtime_error("bad name for clone:" + token);
2437 }
2438 value = key + "_c" + std::to_string(numValue);
2439 } catch (std::invalid_argument& e) {
2440 value = token;
2441 }
2442 specs.push_back({key, value});
2443 s.erase(0, newPos + (newPos == std::string::npos ? 0 : 1));
2444 }
2445 if (s.empty() == false && specs.empty() == true) {
2446 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2447 }
2448
2449 std::vector<DataProcessorSpec> extraSpecs;
2450 for (auto& spec : specs) {
2451 for (auto& processor : workflow) {
2452 if (processor.name == spec.templateMatcher) {
2453 auto clone = processor;
2454 clone.name = spec.cloneName;
2455 extraSpecs.push_back(clone);
2456 }
2457 }
2458 }
2459 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
2460}
2461
2463{
2464 struct PipelineSpec {
2465 std::string matcher;
2466 int64_t pipeline;
2467 };
2468 auto s = ctx.options().get<std::string>("pipeline");
2469 std::vector<PipelineSpec> specs;
2470 std::string delimiter = ",";
2471
2472 while (s.empty() == false) {
2473 auto newPos = s.find(delimiter);
2474 auto token = s.substr(0, newPos);
2475 auto split = token.find(":");
2476 if (split == std::string::npos) {
2477 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2478 }
2479 auto key = token.substr(0, split);
2480 token.erase(0, split + 1);
2481 size_t error;
2482 auto value = std::stoll(token, &error, 10);
2483 if (token[error] != '\0') {
2484 throw std::runtime_error("Bad pipeline definition. Expecting integer");
2485 }
2486 specs.push_back({key, value});
2487 s.erase(0, newPos + (newPos == std::string::npos ? 0 : 1));
2488 }
2489 if (s.empty() == false && specs.empty() == true) {
2490 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2491 }
2492
2493 for (auto& spec : specs) {
2494 for (auto& processor : workflow) {
2495 if (processor.name == spec.matcher) {
2496 processor.maxInputTimeslices = spec.pipeline;
2497 }
2498 }
2499 }
2500}
2501
2503{
2504 struct LabelsSpec {
2505 std::string_view matcher;
2506 std::vector<std::string> labels;
2507 };
2508 std::vector<LabelsSpec> specs;
2509
2510 auto labelsString = ctx.options().get<std::string>("labels");
2511 if (labelsString.empty()) {
2512 return;
2513 }
2514 std::string_view sv{labelsString};
2515
2516 size_t specStart = 0;
2517 size_t specEnd = 0;
2518 constexpr char specDelim = ',';
2519 constexpr char labelDelim = ':';
2520 do {
2521 specEnd = sv.find(specDelim, specStart);
2522 auto token = sv.substr(specStart, specEnd == std::string_view::npos ? std::string_view::npos : specEnd - specStart);
2523 if (token.empty()) {
2524 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2525 }
2526
2527 size_t labelDelimPos = token.find(labelDelim);
2528 if (labelDelimPos == 0 || labelDelimPos == std::string_view::npos) {
2529 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2530 }
2531 LabelsSpec spec{.matcher = token.substr(0, labelDelimPos), .labels = {}};
2532
2533 size_t labelEnd = labelDelimPos + 1;
2534 do {
2535 size_t labelStart = labelDelimPos + 1;
2536 labelEnd = token.find(labelDelim, labelStart);
2537 auto label = labelEnd == std::string_view::npos ? token.substr(labelStart) : token.substr(labelStart, labelEnd - labelStart);
2538 if (label.empty()) {
2539 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2540 }
2541 spec.labels.emplace_back(label);
2542 labelDelimPos = labelEnd;
2543 } while (labelEnd != std::string_view::npos);
2544
2545 specs.push_back(spec);
2546 specStart = specEnd + 1;
2547 } while (specEnd != std::string_view::npos);
2548
2549 if (labelsString.empty() == false && specs.empty() == true) {
2550 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2551 }
2552
2553 for (auto& spec : specs) {
2554 for (auto& processor : workflow) {
2555 if (processor.name == spec.matcher) {
2556 for (const auto& label : spec.labels) {
2557 if (std::find_if(processor.labels.begin(), processor.labels.end(),
2558 [label](const auto& procLabel) { return procLabel.value == label; }) == processor.labels.end()) {
2559 processor.labels.push_back({label});
2560 }
2561 }
2562 }
2563 }
2564 }
2565}
2566
2568void initialiseDriverControl(bpo::variables_map const& varmap,
2569 DriverInfo& driverInfo,
2570 DriverControl& control)
2571{
2572 // Control is initialised outside the main loop because
2573 // command line options are really affecting control.
2574 control.defaultQuiet = varmap["quiet"].as<bool>();
2575 control.defaultStopped = varmap["stop"].as<bool>();
2576
2577 if (varmap["single-step"].as<bool>()) {
2578 control.state = DriverControlState::STEP;
2579 } else {
2580 control.state = DriverControlState::PLAY;
2581 }
2582
2583 if (varmap["graphviz"].as<bool>()) {
2584 // Dump a graphviz representation of what I will do.
2585 control.callbacks = {[](WorkflowSpec const&,
2586 DeviceSpecs const& specs,
2587 DeviceExecutions const&,
2589 CommandInfo const&) {
2591 }};
2592 control.forcedTransitions = {
2593 DriverState::EXIT, //
2594 DriverState::PERFORM_CALLBACKS, //
2595 DriverState::MERGE_CONFIGS, //
2596 DriverState::IMPORT_CURRENT_WORKFLOW, //
2597 DriverState::MATERIALISE_WORKFLOW //
2598 };
2599 } else if (!varmap["dds"].as<std::string>().empty()) {
2600 // Dump a DDS representation of what I will do.
2601 // Notice that compared to DDS we need to schedule things,
2602 // because DDS needs to be able to have actual Executions in
2603 // order to provide a correct configuration.
2604 control.callbacks = {[filename = varmap["dds"].as<std::string>(),
2605 workflowSuffix = varmap["dds-workflow-suffix"],
2606 driverMode = driverInfo.mode](WorkflowSpec const& workflow,
2607 DeviceSpecs const& specs,
2608 DeviceExecutions const& executions,
2609 DataProcessorInfos& dataProcessorInfos,
2610 CommandInfo const& commandInfo) {
2611 if (filename == "-") {
2612 DDSConfigHelpers::dumpDeviceSpec2DDS(std::cout, driverMode, workflowSuffix.as<std::string>(), workflow, dataProcessorInfos, specs, executions, commandInfo);
2613 } else {
2614 std::ofstream out(filename);
2615 DDSConfigHelpers::dumpDeviceSpec2DDS(out, driverMode, workflowSuffix.as<std::string>(), workflow, dataProcessorInfos, specs, executions, commandInfo);
2616 }
2617 }};
2618 control.forcedTransitions = {
2619 DriverState::EXIT, //
2620 DriverState::PERFORM_CALLBACKS, //
2621 DriverState::MERGE_CONFIGS, //
2622 DriverState::IMPORT_CURRENT_WORKFLOW, //
2623 DriverState::MATERIALISE_WORKFLOW //
2624 };
2625 } else if (!varmap["o2-control"].as<std::string>().empty() or !varmap["mermaid"].as<std::string>().empty()) {
2626 // Dump the workflow in o2-control and/or mermaid format
2627 control.callbacks = {[filename = varmap["mermaid"].as<std::string>(),
2628 workflowName = varmap["o2-control"].as<std::string>()](WorkflowSpec const&,
2629 DeviceSpecs const& specs,
2630 DeviceExecutions const& executions,
2632 CommandInfo const& commandInfo) {
2633 if (!workflowName.empty()) {
2634 dumpDeviceSpec2O2Control(workflowName, specs, executions, commandInfo);
2635 }
2636 if (!filename.empty()) {
2637 if (filename == "-") {
2639 } else {
2640 std::ofstream output(filename);
2642 }
2643 }
2644 }};
2645 control.forcedTransitions = {
2646 DriverState::EXIT, //
2647 DriverState::PERFORM_CALLBACKS, //
2648 DriverState::MERGE_CONFIGS, //
2649 DriverState::IMPORT_CURRENT_WORKFLOW, //
2650 DriverState::MATERIALISE_WORKFLOW //
2651 };
2652
2653 } else if (varmap.count("id")) {
2654 // Add our own stacktrace dumping
2655 if (getenv("O2_NO_CATCHALL_EXCEPTIONS") != nullptr && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0") != 0) {
2656 LOGP(info, "Not instrumenting crash signals because O2_NO_CATCHALL_EXCEPTIONS is set");
2657 gEnv->SetValue("Root.Stacktrace", "no");
2658 gSystem->ResetSignal(kSigSegmentationViolation, kTRUE);
2659 rlimit limit;
2660 if (getrlimit(RLIMIT_CORE, &limit) == 0) {
2661 LOGP(info, "Core limit: {} {}", limit.rlim_cur, limit.rlim_max);
2662 }
2663 }
2664 if (varmap["stacktrace-on-signal"].as<std::string>() == "simple" && (getenv("O2_NO_CATCHALL_EXCEPTIONS") == nullptr || strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0") == 0)) {
2665 LOGP(info, "Instrumenting crash signals");
2666 signal(SIGSEGV, handle_crash);
2667 signal(SIGABRT, handle_crash);
2668 signal(SIGBUS, handle_crash);
2669 signal(SIGILL, handle_crash);
2670 signal(SIGFPE, handle_crash);
2671 }
2672 // FIXME: for the time being each child needs to recalculate the workflow,
2673 // so that it can understand what it needs to do. This is obviously
2674 // a bad idea. In the future we should have the client be pushed
2675 // it's own configuration by the driver.
2676 control.forcedTransitions = {
2677 DriverState::DO_CHILD, //
2678 DriverState::BIND_GUI_PORT, //
2679 DriverState::MERGE_CONFIGS, //
2680 DriverState::IMPORT_CURRENT_WORKFLOW, //
2681 DriverState::MATERIALISE_WORKFLOW //
2682 };
2683 } else if ((varmap["dump-workflow"].as<bool>() == true) || (varmap["run"].as<bool>() == false && varmap.count("id") == 0 && isOutputToPipe())) {
2684 control.callbacks = {[filename = varmap["dump-workflow-file"].as<std::string>()](WorkflowSpec const& workflow,
2685 DeviceSpecs const&,
2686 DeviceExecutions const&,
2687 DataProcessorInfos& dataProcessorInfos,
2688 CommandInfo const& commandInfo) {
2689 if (filename == "-") {
2690 WorkflowSerializationHelpers::dump(std::cout, workflow, dataProcessorInfos, commandInfo);
2691 // FIXME: this is to avoid trailing garbage..
2692 exit(0);
2693 } else {
2694 std::ofstream output(filename);
2695 WorkflowSerializationHelpers::dump(output, workflow, dataProcessorInfos, commandInfo);
2696 }
2697 }};
2698 control.forcedTransitions = {
2699 DriverState::EXIT, //
2700 DriverState::PERFORM_CALLBACKS, //
2701 DriverState::MERGE_CONFIGS, //
2702 DriverState::IMPORT_CURRENT_WORKFLOW, //
2703 DriverState::MATERIALISE_WORKFLOW //
2704 };
2705 } else {
2706 // By default we simply start the main loop of the driver.
2707 control.forcedTransitions = {
2708 DriverState::INIT, //
2709 DriverState::BIND_GUI_PORT, //
2710 DriverState::IMPORT_CURRENT_WORKFLOW, //
2711 DriverState::MATERIALISE_WORKFLOW //
2712 };
2713 }
2714}
2715
2717void conflicting_options(const boost::program_options::variables_map& vm,
2718 const std::string& opt1, const std::string& opt2)
2719{
2720 if (vm.count(opt1) && !vm[opt1].defaulted() &&
2721 vm.count(opt2) && !vm[opt2].defaulted()) {
2722 throw std::logic_error(std::string("Conflicting options '") +
2723 opt1 + "' and '" + opt2 + "'.");
2724 }
2725}
2726
/// Reorders @p v in place so that, afterwards, v[k] holds the element that
/// was previously stored at v[indices[k]].
///
/// The permutation is applied one cycle at a time, by swapping elements along
/// each cycle, so no second copy of @p v is needed.
///
/// @param v        the elements to reorder
/// @param indices  source position for each destination slot; it must have the
///                 same size as @p v. It is consumed: on return indices[k] == k.
template <typename T>
void apply_permutation(
  std::vector<T>& v,
  std::vector<int>& indices)
{
  using std::swap; // let argument-dependent lookup pick a type-specific swap
  const int total = static_cast<int>(indices.size());
  for (int start = 0; start < total; ++start) {
    int pos = start;
    // Walk the cycle containing `start` until it closes back on itself.
    for (int source = indices[pos]; source != start; source = indices[pos]) {
      swap(v[pos], v[source]);
      indices[pos] = pos;
      pos = source;
    }
    indices[pos] = pos;
  }
}
2744
2745// Check if the workflow is resiliant to failures
2746void checkNonResiliency(std::vector<DataProcessorSpec> const& specs,
2747 std::vector<std::pair<int, int>> const& edges)
2748{
2749 auto checkExpendable = [](DataProcessorLabel const& label) {
2750 return label.value == "expendable";
2751 };
2752 auto checkResilient = [](DataProcessorLabel const& label) {
2753 return label.value == "resilient" || label.value == "expendable";
2754 };
2755
2756 for (auto& edge : edges) {
2757 auto& src = specs[edge.first];
2758 auto& dst = specs[edge.second];
2759 if (std::none_of(src.labels.begin(), src.labels.end(), checkExpendable)) {
2760 continue;
2761 }
2762 if (std::any_of(dst.labels.begin(), dst.labels.end(), checkResilient)) {
2763 continue;
2764 }
2765 throw std::runtime_error("Workflow is not resiliant to failures. Processor " + dst.name + " gets inputs from expendable devices, but is not marked as expendable or resilient itself.");
2766 }
2767}
2768
2769std::string debugTopoInfo(std::vector<DataProcessorSpec> const& specs,
2770 std::vector<TopoIndexInfo> const& infos,
2771 std::vector<std::pair<int, int>> const& edges)
2772{
2773 std::ostringstream out;
2774
2775 out << "\nTopological info:\n";
2776 for (auto& ti : infos) {
2777 out << specs[ti.index].name << " (index: " << ti.index << ", layer: " << ti.layer << ")\n";
2778 out << " Inputs:\n";
2779 for (auto& ii : specs[ti.index].inputs) {
2780 out << " - " << DataSpecUtils::describe(ii) << "\n";
2781 }
2782 out << "\n Outputs:\n";
2783 for (auto& ii : specs[ti.index].outputs) {
2784 out << " - " << DataSpecUtils::describe(ii) << "\n";
2785 }
2786 }
2787 out << "\nEdges values:\n";
2788 for (auto& e : edges) {
2789 out << specs[e.second].name << " depends on " << specs[e.first].name << "\n";
2790 }
2791 for (auto& d : specs) {
2792 out << "- " << d.name << std::endl;
2793 }
2795 return out.str();
2796}
2797
2798void enableSignposts(std::string const& signpostsToEnable)
2799{
2800 static pid_t pid = getpid();
2801 if (signpostsToEnable.empty() == true) {
2802 auto printAllSignposts = [](char const* name, void* l, void* context) {
2803 auto* log = (_o2_log_t*)l;
2804 LOGP(detail, "Signpost stream {} disabled. Enable it with o2-log -p {} -a {}", name, pid, (void*)&log->stacktrace);
2805 return true;
2806 };
2807 o2_walk_logs(printAllSignposts, nullptr);
2808 return;
2809 }
2810 auto matchingLogEnabler = [](char const* name, void* l, void* context) {
2811 auto* log = (_o2_log_t*)l;
2812 auto* selectedName = (char const*)context;
2813 std::string prefix = "ch.cern.aliceo2.";
2814 auto* last = strchr(selectedName, ':');
2815 int maxDepth = 1;
2816 if (last) {
2817 char* err;
2818 maxDepth = strtol(last + 1, &err, 10);
2819 if (*(last + 1) == '\0' || *err != '\0') {
2820 maxDepth = 1;
2821 }
2822 }
2823
2824 auto fullName = prefix + std::string{selectedName, last ? last - selectedName : strlen(selectedName)};
2825 if (fullName == name) {
2826 LOGP(info, "Enabling signposts for stream \"{}\" with depth {}.", fullName, maxDepth);
2827 _o2_log_set_stacktrace(log, maxDepth);
2828 return false;
2829 } else {
2830 LOGP(info, "Signpost stream \"{}\" disabled. Enable it with o2-log -p {} -a {}", name, pid, (void*)&log->stacktrace);
2831 }
2832 return true;
2833 };
2834 // Split signpostsToEnable by comma using strtok_r
2835 char* saveptr;
2836 char* src = const_cast<char*>(signpostsToEnable.data());
2837 auto* token = strtok_r(src, ",", &saveptr);
2838 while (token) {
2839 o2_walk_logs(matchingLogEnabler, token);
2840 token = strtok_r(nullptr, ",", &saveptr);
2841 }
2842}
2843
2844void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow)
2845{
2846 overrideCloning(ctx, workflow);
2847 overridePipeline(ctx, workflow);
2848 overrideLabels(ctx, workflow);
2849}
2850
2851o2::framework::ConfigContext createConfigContext(std::unique_ptr<ConfigParamRegistry>& workflowOptionsRegistry,
2852 o2::framework::ServiceRegistry& configRegistry,
2853 std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
2854 std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv)
2855{
2856 std::vector<std::unique_ptr<o2::framework::ParamRetriever>> retrievers;
2857 std::unique_ptr<o2::framework::ParamRetriever> retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)};
2858 retrievers.emplace_back(std::move(retriever));
2859 auto workflowOptionsStore = std::make_unique<o2::framework::ConfigParamStore>(workflowOptions, std::move(retrievers));
2860 workflowOptionsStore->preload();
2861 workflowOptionsStore->activate();
2862 workflowOptionsRegistry = std::make_unique<ConfigParamRegistry>(std::move(workflowOptionsStore));
2863 extraOptions = o2::framework::ConfigParamDiscovery::discover(*workflowOptionsRegistry, argc, argv);
2864 for (auto& extra : extraOptions) {
2865 workflowOptions.push_back(extra);
2866 }
2867
2868 return o2::framework::ConfigContext(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv);
2869}
2870
2871std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
2872{
2873 return std::make_unique<o2::framework::ServiceRegistry>();
2874}
2875
2876// This is a toy executor for the workflow spec
2877// What it needs to do is:
2878//
2879// - Print the properties of each DataProcessorSpec
2880// - Fork one process per DataProcessorSpec
2881// - Parent -> wait for all the children to complete (eventually
2882// killing them all on ctrl-c).
2883// - Child, pick the data-processor ID and start a O2DataProcessorDevice for
2884// each DataProcessorSpec
2885int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
2886 std::vector<ChannelConfigurationPolicy> const& channelPolicies,
2887 std::vector<CompletionPolicy> const& completionPolicies,
2888 std::vector<DispatchPolicy> const& dispatchPolicies,
2889 std::vector<ResourcePolicy> const& resourcePolicies,
2890 std::vector<CallbacksPolicy> const& callbacksPolicies,
2891 std::vector<SendingPolicy> const& sendingPolicies,
2892 std::vector<ConfigParamSpec> const& currentWorkflowOptions,
2893 std::vector<ConfigParamSpec> const& detectedParams,
2894 o2::framework::ConfigContext& configContext)
2895{
2896 // Peek very early in the driver options and look for
2897 // signposts, so the we can enable it without going through the whole dance
2898 if (getenv("DPL_DRIVER_SIGNPOSTS")) {
2899 enableSignposts(getenv("DPL_DRIVER_SIGNPOSTS"));
2900 }
2901
2902 std::vector<std::string> currentArgs;
2903 std::vector<PluginInfo> plugins;
2904 std::vector<ForwardingPolicy> forwardingPolicies = ForwardingPolicy::createDefaultPolicies();
2905
2906 for (int ai = 1; ai < argc; ++ai) {
2907 currentArgs.emplace_back(argv[ai]);
2908 }
2909
2910 WorkflowInfo currentWorkflow{
2911 argv[0],
2912 currentArgs,
2913 currentWorkflowOptions};
2914
2915 ProcessingPolicies processingPolicies;
2916 enum LogParsingHelpers::LogLevel minFailureLevel;
2917 bpo::options_description executorOptions("Executor options");
2918 const char* helpDescription = "print help: short, full, executor, or processor name";
2919 enum DriverMode driverMode;
2920 executorOptions.add_options() //
2921 ("help,h", bpo::value<std::string>()->implicit_value("short"), helpDescription) // //
2922 ("quiet,q", bpo::value<bool>()->zero_tokens()->default_value(false), "quiet operation") // //
2923 ("stop,s", bpo::value<bool>()->zero_tokens()->default_value(false), "stop before device start") // //
2924 ("single-step", bpo::value<bool>()->zero_tokens()->default_value(false), "start in single step mode") // //
2925 ("batch,b", bpo::value<std::vector<std::string>>()->zero_tokens()->composing(), "batch processing mode") // //
2926 ("no-batch", bpo::value<bool>()->zero_tokens(), "force gui processing mode") // //
2927 ("no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // //
2928 ("hostname", bpo::value<std::string>()->default_value("localhost"), "hostname to deploy") // //
2929 ("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
2930 ("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
2931 ("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
2932 ("completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.termination)->default_value(TerminationPolicy::QUIT), // //
2933 "what to do when processing is finished: quit, wait") // //
2934 ("error-policy", bpo::value<TerminationPolicy>(&processingPolicies.error)->default_value(TerminationPolicy::QUIT), // //
2935 "what to do when a device has an error: quit, wait") // //
2936 ("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
2937 "minimum message level which will be considered as fatal and exit with 1") // //
2938 ("graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(false), "produce graphviz output") // //
2939 ("mermaid", bpo::value<std::string>()->default_value(""), "produce graph output in mermaid format in file under specified name or on stdout if argument is \"-\"") // //
2940 ("timeout,t", bpo::value<uint64_t>()->default_value(0), "forced exit timeout (in seconds)") // //
2941 ("dds,D", bpo::value<std::string>()->default_value(""), "create DDS configuration") // //
2942 ("dds-workflow-suffix,D", bpo::value<std::string>()->default_value(""), "suffix for DDS names") // //
2943 ("dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(false), "dump workflow as JSON") // //
2944 ("dump-workflow-file", bpo::value<std::string>()->default_value("-"), "file to which do the dump") // //
2945 ("driver-mode", bpo::value<DriverMode>(&driverMode)->default_value(DriverMode::STANDALONE), R"(how to run the driver. default: "standalone". Valid: "embedded")") // //
2946 ("run", bpo::value<bool>()->zero_tokens()->default_value(false), "run workflow merged so far. It implies --batch. Use --no-batch to see the GUI") // //
2947 ("no-IPC", bpo::value<bool>()->zero_tokens()->default_value(false), "disable IPC topology optimization") // //
2948 ("o2-control,o2", bpo::value<std::string>()->default_value(""), "dump O2 Control workflow configuration under the specified name") //
2949 ("resources-monitoring", bpo::value<unsigned short>()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds") //
2950 ("resources-monitoring-file", bpo::value<std::string>()->default_value("performanceMetrics.json"), "file where to dump the metrics") //
2951 ("resources-monitoring-dump-interval", bpo::value<unsigned short>()->default_value(0), "dump monitoring information to disk every provided seconds"); //
2952 // some of the options must be forwarded by default to the device
2953 executorOptions.add(DeviceSpecHelpers::getForwardedDeviceOptions());
2954
2955 gHiddenDeviceOptions.add_options() //
2956 ("id,i", bpo::value<std::string>(), "device id for child spawning") //
2957 ("channel-config", bpo::value<std::vector<std::string>>(), "channel configuration") //
2958 ("control", "control plugin") //
2959 ("log-color", "logging color scheme")("color", "logging color scheme");
2960
2961 bpo::options_description visibleOptions;
2962 visibleOptions.add(executorOptions);
2963
2964 auto physicalWorkflow = workflow;
2965 std::map<std::string, size_t> rankIndex;
2966 // We remove the duplicates because for the moment child get themself twice:
2967 // once from the actual definition in the child, a second time from the
2968 // configuration they get passed by their parents.
2969 // Notice that we do not know in which order we will get the workflows, so
2970 // while we keep the order of DataProcessors we reshuffle them based on
2971 // some hopefully unique hash.
2972 size_t workflowHashA = 0;
2973 std::hash<std::string> hash_fn;
2974
2975 for (auto& dp : workflow) {
2976 workflowHashA += hash_fn(dp.name);
2977 }
2978
2979 for (auto& dp : workflow) {
2980 rankIndex.insert(std::make_pair(dp.name, workflowHashA));
2981 }
2982
2983 std::vector<DataProcessorInfo> dataProcessorInfos;
2984 CommandInfo commandInfo{};
2985
2986 if (isatty(STDIN_FILENO) == false && isInputConfig()) {
2987 std::vector<DataProcessorSpec> importedWorkflow;
2988 bool previousWorked = WorkflowSerializationHelpers::import(std::cin, importedWorkflow, dataProcessorInfos, commandInfo);
2989 if (previousWorked == false) {
2990 exit(1);
2991 }
2992
2993 size_t workflowHashB = 0;
2994 for (auto& dp : importedWorkflow) {
2995 workflowHashB += hash_fn(dp.name);
2996 }
2997
2998 // FIXME: Streamline...
2999 // We remove the duplicates because for the moment child get themself twice:
3000 // once from the actual definition in the child, a second time from the
3001 // configuration they get passed by their parents.
3002 for (auto& dp : importedWorkflow) {
3003 auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(),
3004 [&name = dp.name](DataProcessorSpec const& spec) { return spec.name == name; });
3005 if (found == physicalWorkflow.end()) {
3006 physicalWorkflow.push_back(dp);
3007 rankIndex.insert(std::make_pair(dp.name, workflowHashB));
3008 }
3009 }
3010 }
3011
3016 for (auto& dp : physicalWorkflow) {
3017 auto isExpendable = [](DataProcessorLabel const& label) { return label.value == "expendable" || label.value == "non-critical"; };
3018 if (std::find_if(dp.labels.begin(), dp.labels.end(), isExpendable) != dp.labels.end()) {
3019 for (auto& output : dp.outputs) {
3020 if (output.lifetime == Lifetime::Timeframe) {
3021 output.lifetime = Lifetime::Sporadic;
3022 }
3023 }
3024 }
3025 }
3026
3028 OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES"));
3030 // We insert the hash for the internal devices.
3031 WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext);
3032 auto& dec = configContext.services().get<DanglingEdgesContext>();
3033 if (!(dec.requestedAODs.empty() && dec.requestedDYNs.empty() && dec.requestedIDXs.empty() && dec.requestedTIMs.empty())) {
3034 driverServices.push_back(ArrowSupport::arrowBackendSpec());
3035 }
3036 for (auto& service : driverServices) {
3037 if (service.injectTopology == nullptr) {
3038 continue;
3039 }
3040 WorkflowSpecNode node{physicalWorkflow};
3041 service.injectTopology(node, configContext);
3042 }
3043 for (auto& dp : physicalWorkflow) {
3044 if (dp.name.rfind("internal-", 0) == 0) {
3045 rankIndex.insert(std::make_pair(dp.name, hash_fn("internal")));
3046 }
3047 }
3048
3049 // We sort dataprocessors and Inputs / outputs by name, so that the edges are
3050 // always in the same order.
3051 std::stable_sort(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec const& a, DataProcessorSpec const& b) {
3052 return a.name < b.name;
3053 });
3054
3055 for (auto& dp : physicalWorkflow) {
3056 std::stable_sort(dp.inputs.begin(), dp.inputs.end(),
3057 [](InputSpec const& a, InputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3058 std::stable_sort(dp.outputs.begin(), dp.outputs.end(),
3059 [](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3060 }
3061
3062 // Create a list of all the edges, so that we can do a topological sort
3063 // before we create the graph.
3064 std::vector<std::pair<int, int>> edges;
3065
3066 if (physicalWorkflow.size() > 1) {
3067 edges = TopologyPolicyHelpers::buildEdges(physicalWorkflow);
3068
3069 auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair<int, int>), edges.size());
3070 if (topoInfos.size() != physicalWorkflow.size()) {
3071 // Check missing resilincy of one of the tasks
3072 checkNonResiliency(physicalWorkflow, edges);
3073 throw std::runtime_error("Unable to do topological sort of the resulting workflow. Do you have loops?\n" + debugTopoInfo(physicalWorkflow, topoInfos, edges));
3074 }
3075 // Sort by layer and then by name, to ensure stability.
3076 std::stable_sort(topoInfos.begin(), topoInfos.end(), [&workflow = physicalWorkflow](TopoIndexInfo const& a, TopoIndexInfo const& b) {
3077 auto aRank = std::make_tuple(a.layer, -workflow.at(a.index).outputs.size(), workflow.at(a.index).name);
3078 auto bRank = std::make_tuple(b.layer, -workflow.at(b.index).outputs.size(), workflow.at(b.index).name);
3079 return aRank < bRank;
3080 });
3081 // Reverse index and apply the result
3082 std::vector<int> dataProcessorOrder;
3083 dataProcessorOrder.resize(topoInfos.size());
3084 for (size_t i = 0; i < topoInfos.size(); ++i) {
3085 dataProcessorOrder[topoInfos[i].index] = i;
3086 }
3087 std::vector<int> newLocations;
3088 newLocations.resize(dataProcessorOrder.size());
3089 for (size_t i = 0; i < dataProcessorOrder.size(); ++i) {
3090 newLocations[dataProcessorOrder[i]] = i;
3091 }
3092 apply_permutation(physicalWorkflow, newLocations);
3093 }
3094
3095 // Use the hidden options as veto, all config specs matching a definition
3096 // in the hidden options are skipped in order to avoid duplicate definitions
3097 // in the main parser. Note: all config specs are forwarded to devices
3098 visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions));
3099
3100 bpo::options_description od;
3101 od.add(visibleOptions);
3102 od.add(gHiddenDeviceOptions);
3103
3104 // FIXME: decide about the policy for handling unrecognized arguments
3105 // command_line_parser with option allow_unregistered() can be used
3106 using namespace bpo::command_line_style;
3107 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
3108 bpo::variables_map varmap;
3109 try {
3110 bpo::store(
3111 bpo::command_line_parser(argc, argv)
3112 .options(od)
3113 .style(style)
3114 .run(),
3115 varmap);
3116 } catch (std::exception const& e) {
3117 LOGP(error, "error parsing options of {}: {}", argv[0], e.what());
3118 exit(1);
3119 }
3120 conflicting_options(varmap, "dds", "o2-control");
3121 conflicting_options(varmap, "dds", "dump-workflow");
3122 conflicting_options(varmap, "dds", "run");
3123 conflicting_options(varmap, "dds", "graphviz");
3124 conflicting_options(varmap, "o2-control", "dump-workflow");
3125 conflicting_options(varmap, "o2-control", "run");
3126 conflicting_options(varmap, "o2-control", "graphviz");
3127 conflicting_options(varmap, "run", "dump-workflow");
3128 conflicting_options(varmap, "run", "graphviz");
3129 conflicting_options(varmap, "run", "mermaid");
3130 conflicting_options(varmap, "dump-workflow", "graphviz");
3131 conflicting_options(varmap, "no-batch", "batch");
3132
3133 if (varmap.count("help")) {
3134 printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions);
3135 exit(0);
3136 }
3140 if (varmap.count("severity")) {
3141 auto logLevel = varmap["severity"].as<std::string>();
3142 if (logLevel == "debug") {
3143 fair::Logger::SetConsoleSeverity(fair::Severity::debug);
3144 } else if (logLevel == "detail") {
3145 fair::Logger::SetConsoleSeverity(fair::Severity::detail);
3146 } else if (logLevel == "info") {
3147 fair::Logger::SetConsoleSeverity(fair::Severity::info);
3148 } else if (logLevel == "warning") {
3149 fair::Logger::SetConsoleSeverity(fair::Severity::warning);
3150 } else if (logLevel == "error") {
3151 fair::Logger::SetConsoleSeverity(fair::Severity::error);
3152 } else if (logLevel == "important") {
3153 fair::Logger::SetConsoleSeverity(fair::Severity::important);
3154 } else if (logLevel == "alarm") {
3155 fair::Logger::SetConsoleSeverity(fair::Severity::alarm);
3156 } else if (logLevel == "critical") {
3157 fair::Logger::SetConsoleSeverity(fair::Severity::critical);
3158 } else if (logLevel == "fatal") {
3159 fair::Logger::SetConsoleSeverity(fair::Severity::fatal);
3160 } else {
3161 LOGP(error, "Invalid log level '{}'", logLevel);
3162 exit(1);
3163 }
3164 }
3165
3166 if (varmap["log-timestamp-us"].as<bool>()) {
3167 fair::Logger::DefineVerbosity(fair::Verbosity::user1,
3168 fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us,
3169 fair::VerbositySpec::Info::severity));
3170 fair::Logger::SetVerbosity(fair::Verbosity::user1);
3171 }
3172
3173 enableSignposts(varmap["signposts"].as<std::string>());
3174
3175 auto evaluateBatchOption = [&varmap]() -> bool {
3176 if (varmap.count("no-batch") > 0) {
3177 return false;
3178 }
3179 if (varmap.count("batch") == 0) {
3180 // default value
3181 return isatty(fileno(stdout)) == 0;
3182 }
3183 // FIXME: should actually use the last value, but for some reason the
3184 // values are not filled into the vector, even if specifying `-b true`
3185 // need to find out why the boost program options example is not working
3186 // in our case. Might depend on the parser options
3187 // auto value = varmap["batch"].as<std::vector<std::string>>();
3188 return true;
3189 };
3190 DriverInfo driverInfo{
3191 .sendingPolicies = sendingPolicies,
3192 .forwardingPolicies = forwardingPolicies,
3193 .callbacksPolicies = callbacksPolicies};
3194 driverInfo.states.reserve(10);
3195 driverInfo.sigintRequested = false;
3196 driverInfo.sigchldRequested = false;
3197 driverInfo.channelPolicies = channelPolicies;
3198 driverInfo.completionPolicies = completionPolicies;
3199 driverInfo.dispatchPolicies = dispatchPolicies;
3200 driverInfo.resourcePolicies = resourcePolicies;
3201 driverInfo.argc = argc;
3202 driverInfo.argv = argv;
3203 driverInfo.noSHMCleanup = varmap["no-cleanup"].as<bool>();
3204 driverInfo.processingPolicies.termination = varmap["completion-policy"].as<TerminationPolicy>();
3205 driverInfo.processingPolicies.earlyForward = varmap["early-forward-policy"].as<EarlyForwardPolicy>();
3206 driverInfo.mode = varmap["driver-mode"].as<DriverMode>();
3207
3208 auto batch = evaluateBatchOption();
3209 DriverConfig driverConfig{
3210 .batch = batch,
3211 .driverHasGUI = (batch == false) || getenv("DPL_DRIVER_REMOTE_GUI") != nullptr,
3212 };
3213
3214 if (varmap["error-policy"].defaulted() && driverConfig.batch == false) {
3215 driverInfo.processingPolicies.error = TerminationPolicy::WAIT;
3216 } else {
3217 driverInfo.processingPolicies.error = varmap["error-policy"].as<TerminationPolicy>();
3218 }
3219 driverInfo.minFailureLevel = varmap["min-failure-level"].as<LogParsingHelpers::LogLevel>();
3220 driverInfo.startTime = uv_hrtime();
3221 driverInfo.startTimeMsFromEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
3222 std::chrono::system_clock::now().time_since_epoch())
3223 .count();
3224 driverInfo.timeout = varmap["timeout"].as<uint64_t>();
3225 driverInfo.deployHostname = varmap["hostname"].as<std::string>();
3226 driverInfo.resources = varmap["resources"].as<std::string>();
3227 driverInfo.resourcesMonitoringInterval = varmap["resources-monitoring"].as<unsigned short>();
3228 driverInfo.resourcesMonitoringFilename = varmap["resources-monitoring-file"].as<std::string>();
3229 driverInfo.resourcesMonitoringDumpInterval = varmap["resources-monitoring-dump-interval"].as<unsigned short>();
3230
3231 // FIXME: should use the whole dataProcessorInfos, actually...
3232 driverInfo.processorInfo = dataProcessorInfos;
3233 driverInfo.configContext = &configContext;
3234
3235 DriverControl driverControl;
3236 initialiseDriverControl(varmap, driverInfo, driverControl);
3237
3238 commandInfo.merge(CommandInfo(argc, argv));
3239
3240 std::string frameworkId;
3241 // If the id is set, this means this is a device,
3242 // otherwise this is the driver.
3243 if (varmap.count("id")) {
3244 // The framework id does not want to know anything about DDS template expansion
3245 // so we simply drop it. Notice that the "id" Property is still the same as the
3246 // original --id option.
3247 frameworkId = std::regex_replace(varmap["id"].as<std::string>(), std::regex{"_dds.*"}, "");
3248 driverInfo.uniqueWorkflowId = fmt::format("{}", getppid());
3249 driverInfo.defaultDriverClient = "stdout://";
3250 } else {
3251 driverInfo.uniqueWorkflowId = fmt::format("{}", getpid());
3252 driverInfo.defaultDriverClient = "ws://";
3253 }
3254 return runStateMachine(physicalWorkflow,
3255 currentWorkflow,
3256 dataProcessorInfos,
3257 commandInfo,
3258 driverControl,
3259 driverInfo,
3260 driverConfig,
3262 detectedParams,
3263 varmap,
3264 driverServices,
3265 frameworkId);
3266}
3267
3268void doBoostException(boost::exception&, char const* processName)
3269{
3270 LOGP(error, "error while setting up workflow in {}: {}",
3271 processName, boost::current_exception_diagnostic_information(true));
3272}
3273#pragma GCC diagnostic push
std::vector< std::string > labels
struct uv_timer_s uv_timer_t
struct uv_async_s uv_async_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
std::vector< OutputRoute > routes
std::ostringstream debug
std::unique_ptr< expressions::Node > node
int32_t i
int32_t retVal
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
o2::phos::PHOSEnergySlot es
uint16_t pos
Definition RawData.h:3
uint16_t pid
Definition RawData.h:2
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:554
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:532
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
o2::monitoring::Monitoring Monitoring
StringRef key
ServiceRegistryRef services() const
T get(uint32_t y, uint32_t x) const
Definition Array2D.h:199
static ServiceRegistryRef * globalDeviceRef(ServiceRegistryRef *ref=nullptr)
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLenum mode
Definition glcorearb.h:266
GLenum src
Definition glcorearb.h:1767
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum GLenum dst
Definition glcorearb.h:1767
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum const void * indices
Definition glcorearb.h:400
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean r
Definition glcorearb.h:1233
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLfloat param
Definition glcorearb.h:271
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
uint8_t itsSharedClusterMap uint8_t
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ServiceSpec > ServiceSpecs
RuntimeErrorRef runtime_error(const char *)
EarlyForwardPolicy
When to enable the early forwarding optimization:
std::vector< OverrideServiceSpec > OverrideServiceSpecs
void parse_http_request(char *start, size_t size, HTTPParser *parser)
RuntimeError & error_from_ref(RuntimeErrorRef)
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
void dumpDeviceSpec2O2Control(std::string workflowName, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
Dumps the AliECS compatible workflow and task templates for a DPL workflow.
if(!okForPhiMin(phi0, phi1))
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
void empty(int)
std::vector< std::string > split(const std::string &str, char delimiter=',')
int runStateMachine(DataProcessorSpecs const &workflow, WorkflowInfo const &workflowInfo, DataProcessorInfos const &previousDataProcessorInfos, CommandInfo const &commandInfo, DriverControl &driverControl, DriverInfo &driverInfo, DriverConfig &driverConfig, std::vector< DeviceMetricsInfo > &metricsInfos, std::vector< ConfigParamSpec > const &detectedParams, boost::program_options::variables_map &varmap, std::vector< ServiceSpec > &driverServices, std::string frameworkId)
AlgorithmSpec dryRun(DeviceSpec const &spec)
auto bindGUIPort
void getChildData(int infd, DeviceInfo &outinfo)
void overrideLabels(ConfigContext &ctx, WorkflowSpec &workflow)
void apply_permutation(std::vector< T > &v, std::vector< int > &indices)
int doMain(int argc, char **argv, o2::framework::WorkflowSpec const &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ConfigParamSpec > const &currentWorkflowOptions, std::vector< ConfigParamSpec > const &detectedParams, o2::framework::ConfigContext &configContext)
void overridePipeline(ConfigContext &ctx, WorkflowSpec &workflow)
void enableSignposts(std::string const &signpostsToEnable)
void spawnDevice(uv_loop_t *loop, DeviceRef ref, std::vector< DeviceSpec > const &specs, DriverInfo &driverInfo, std::vector< DeviceControl > &, std::vector< DeviceExecution > &executions, std::vector< DeviceInfo > &deviceInfos, std::vector< DataProcessingStates > &allStates, ServiceRegistryRef serviceRegistry, boost::program_options::variables_map &varmap, std::vector< DeviceStdioContext > &childFds, unsigned parentCPU, unsigned parentNode)
void killChildren(std::vector< DeviceInfo > &infos, int sig)
void ws_connect_callback(uv_stream_t *server, int status)
A callback for the rest engine.
std::vector< DataProcessingStates > DataProcessingStatesInfos
void createPipes(int *pipes)
void doDPLException(o2::framework::RuntimeErrorRef &ref, char const *)
std::vector< DeviceExecution > DeviceExecutions
void overrideAll(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
std::vector< DeviceMetricsInfo > gDeviceMetricsInfos
void force_exit_callback(uv_timer_s *ctx)
std::string debugTopoInfo(std::vector< DataProcessorSpec > const &specs, std::vector< TopoIndexInfo > const &infos, std::vector< std::pair< int, int > > const &edges)
void overrideCloning(ConfigContext &ctx, WorkflowSpec &workflow)
void doBoostException(boost::exception &e, const char *)
bool processSigChild(DeviceInfos &infos, DeviceSpecs &specs)
void checkNonResiliency(std::vector< DataProcessorSpec > const &specs, std::vector< std::pair< int, int > > const &edges)
std::vector< std::regex > getDumpableMetrics()
void stream_config(uv_work_t *req)
std::vector< DataProcessorSpec > DataProcessorSpecs
void dumpRunSummary(DriverServerContext &context, DriverInfo const &driverInfo, DeviceInfos const &infos, DeviceSpecs const &specs)
void conflicting_options(const boost::program_options::variables_map &vm, const std::string &opt1, const std::string &opt2)
Helper to detect conflicting options.
void doDefaultWorkflowTerminationHook()
bool checkIfCanExit(std::vector< DeviceInfo > const &infos)
volatile sig_atomic_t sigchld_requested
bool isOutputToPipe()
void handleSignals()
std::vector< DeviceSpec > DeviceSpecs
std::vector< DataProcessorInfo > DataProcessorInfos
volatile sig_atomic_t forceful_exit
bool areAllChildrenGone(std::vector< DeviceInfo > &infos)
Check the state of the children.
std::vector< DeviceControl > DeviceControls
volatile sig_atomic_t double_sigint
void close_websocket(uv_handle_t *handle)
void handleChildrenStdio(DriverServerContext *serverContext, std::string const &forwardedStdin, std::vector< DeviceStdioContext > &childFds, std::vector< uv_poll_t * > &handles)
char * getIdString(int argc, char **argv)
bool isInputConfig()
std::unique_ptr< o2::framework::ServiceRegistry > createRegistry()
void log_callback(uv_poll_t *handle, int status, int events)
void processChildrenOutput(uv_loop_t *loop, DriverInfo &driverInfo, DeviceInfos &infos, DeviceSpecs const &specs, DeviceControls &controls)
volatile sig_atomic_t graceful_exit
void single_step_callback(uv_timer_s *ctx)
Force single stepping of the children.
bpo::options_description gHiddenDeviceOptions("Hidden child options")
void doUnknownException(std::string const &s, char const *)
int doChild(int argc, char **argv, ServiceRegistry &serviceRegistry, DanglingEdgesContext &danglingEdgesContext, RunningWorkflowInfo const &runningWorkflow, RunningDeviceRef ref, DriverConfig const &driverConfig, ProcessingPolicies processingPolicies, std::string const &defaultDriverClient, uv_loop_t *loop)
o2::framework::ConfigContext createConfigContext(std::unique_ptr< ConfigParamRegistry > &workflowOptionsRegistry, o2::framework::ServiceRegistry &configRegistry, std::vector< o2::framework::ConfigParamSpec > &workflowOptions, std::vector< o2::framework::ConfigParamSpec > &extraOptions, int argc, char **argv)
int callMain(int argc, char **argv, int(*mainNoCatch)(int, char **))
void handle_crash(int sig)
void dumpMetricsCallback(uv_timer_t *handle)
void cleanupSHM(std::string const &uniqueWorkflowId)
Helper to invoke shared memory cleanup.
void initialiseDriverControl(bpo::variables_map const &varmap, DriverInfo &driverInfo, DriverControl &control)
Helper function to initialise the controller from the command line options.
void gui_callback(uv_timer_s *ctx)
std::vector< DeviceInfo > DeviceInfos
void printHelp(bpo::variables_map const &varmap, bpo::options_description const &executorOptions, std::vector< DataProcessorSpec > const &physicalWorkflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions)
void spawnRemoteDevice(uv_loop_t *loop, std::string const &, DeviceSpec const &spec, DeviceControl &, DeviceExecution &, DeviceInfos &deviceInfos, DataProcessingStatesInfos &allStates)
void websocket_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
int mainNoCatch(int argc, char **argv)
DriverServerContext * serverContext
std::vector< ConfigParamSpec > options
std::vector< std::string > args
static ServiceSpec arrowBackendSpec()
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static std::vector< ServiceSpec > defaultServices()
static std::vector< ComputingResource > parseResources(std::string const &resourceString)
static std::vector< ConfigParamSpec > discover(ConfigParamRegistry &, int, char **)
static boost::program_options::options_description prepareOptionDescriptions(ContainerType const &workflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions, options_description vetos=options_description(), std::string mode="full")
populate boost program options for a complete workflow
static void populateBoostProgramOptions(options_description &options, const std::vector< ConfigParamSpec > &specs, options_description vetos=options_description())
static void dumpDeviceSpec2DDS(std::ostream &out, DriverMode mode, std::string const &workflowSuffix, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
static void preExitCallbacks(std::vector< ServiceExitHandle >, ServiceRegistryRef)
Invoke callback to be executed on exit, in reverse order.
std::string executable
The executable name of the program which holds the DataProcessorSpec.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
Plugin interface for DPL GUIs.
Definition DebugGUI.h:30
virtual void * initGUI(char const *windowTitle, ServiceRegistry &registry)=0
virtual std::function< void(void)> getGUIDebugger(std::vector< o2::framework::DeviceInfo > const &infos, std::vector< o2::framework::DeviceSpec > const &devices, std::vector< o2::framework::DataProcessingStates > const &allStates, std::vector< o2::framework::DataProcessorInfo > const &metadata, std::vector< o2::framework::DeviceMetricsInfo > const &metricsInfos, o2::framework::DriverInfo const &driverInfo, std::vector< o2::framework::DeviceControl > &controls, o2::framework::DriverControl &driverControl)=0
static DeploymentMode deploymentMode()
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
ProcessingPolicies & processingPolicies
char logFilter[MAX_USER_FILTER_SIZE]
Lines in the log should match this to be displayed.
bool quiet
Whether we should be capturing device output.
std::string unprinted
An unterminated string which is not ready to be printed yet.
Definition DeviceInfo.h:63
static void validate(WorkflowSpec const &workflow)
static boost::program_options::options_description getForwardedDeviceOptions()
define the options which are forwarded to every child
static std::string reworkTimeslicePlaceholder(std::string const &str, DeviceSpec const &spec)
static void prepareArguments(bool defaultQuiet, bool defaultStopped, bool intereactive, unsigned short driverPort, DriverConfig const &driverConfig, std::vector< DataProcessorInfo > const &processorInfos, std::vector< DeviceSpec > const &deviceSpecs, std::vector< DeviceExecution > &deviceExecutions, std::vector< DeviceControl > &deviceControls, std::vector< ConfigParamSpec > const &detectedOptions, std::string const &uniqueWorkflowId)
static void reworkShmSegmentSize(std::vector< DataProcessorInfo > &infos)
static void reworkHomogeneousOption(std::vector< DataProcessorInfo > &infos, char const *name, char const *defaultValue)
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
std::string id
The id of the device, including time-pipelining and suffix.
Definition DeviceSpec.h:52
static int parseTracingFlags(std::string const &events)
std::vector< DeviceControl > & controls
bool batch
Whether the driver was started in batch mode or not.
std::vector< DriverState > forcedTransitions
std::vector< Callback > callbacks
DriverControlState state
Current state of the state machine player.
std::vector< DeviceSpec > * specs
std::vector< ServiceSummaryHandling > * summaryCallbacks
std::vector< DeviceMetricsInfo > * metrics
std::vector< DeviceInfo > * infos
static std::vector< ForwardingPolicy > createDefaultPolicies()
static void dumpDeviceSpec2Graphviz(std::ostream &, const Devices &specs)
Helper to dump a set of devices as a graphviz file.
static void dumpDataProcessorSpec2Graphviz(std::ostream &, const WorkflowSpec &specs, std::vector< std::pair< int, int > > const &edges={})
Helper to dump a workflow as a graphviz file.
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
LogLevel
Possible log levels for device log entries.
static LogLevel parseTokenLevel(std::string_view const s)
static void dumpDeviceSpec2Mermaid(std::ostream &, const Devices &specs)
Helper to dump a set of devices as a mermaid file.
Temporary struct to hold a metric after it has been parsed.
static bool dumpMetricsToJSON(std::vector< DeviceMetricsInfo > const &metrics, DeviceMetricsInfo const &driverMetrics, std::vector< DeviceSpec > const &specs, std::vector< std::regex > const &metricsToDump, std::ostream &out) noexcept
static bool isResourcesMonitoringEnabled(unsigned short interval) noexcept
Information about the running workflow.
void declareService(ServiceSpec const &spec, DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt=ServiceRegistry::globalDeviceSalt())
static OverrideServiceSpecs parseOverrides(char const *overrideString)
static ServiceSpecs filterDisabled(ServiceSpecs originals, OverrideServiceSpecs const &overrides)
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
Helper struct to keep track of the results of the topological sort.
static auto buildEdges(WorkflowSpec &physicalWorkflow) -> std::vector< std::pair< int, int > >
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)
static bool import(std::istream &s, std::vector< DataProcessorSpec > &workflow, std::vector< DataProcessorInfo > &metadata, CommandInfo &command)
uint16_t de
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels
uint64_t const void const *restrict const msg
Definition x9.h:153