Project
Loading...
Searching...
No Matches
runDataProcessing.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <memory>
12#define BOOST_BIND_GLOBAL_PLACEHOLDERS
13#include <stdexcept>
37#include "DeviceStateHelpers.h"
40#include "Framework/DebugGUI.h"
43#include "Framework/Logger.h"
47#include "Framework/Signpost.h"
68#include "DriverServerContext.h"
69#include "HTTPParser.h"
70#include "DPLWebSocket.h"
71#include "ArrowSupport.h"
73
76#include "DDSConfigHelpers.h"
77#include "O2ControlHelpers.h"
78#include "DeviceSpecHelpers.h"
79#include "GraphvizHelpers.h"
80#include "MermaidHelpers.h"
81#include "PropertyTreeHelpers.h"
84
85#include <Configuration/ConfigurationInterface.h>
86#include <Configuration/ConfigurationFactory.h>
87#include <Monitoring/MonitoringFactory.h>
89
90#include <fairmq/Device.h>
91#include <fairmq/DeviceRunner.h>
92#include <fairmq/shmem/Monitor.h>
93#include <fairmq/ProgOptions.h>
94
95#include <boost/program_options.hpp>
96#include <boost/program_options/options_description.hpp>
97#include <boost/program_options/variables_map.hpp>
98#include <boost/exception/diagnostic_information.hpp>
99#include <boost/property_tree/json_parser.hpp>
100
101#include <uv.h>
102#include <TEnv.h>
103#include <TSystem.h>
104
105#include <cinttypes>
106#include <cstdint>
107#include <cstdio>
108#include <cstdlib>
109#include <cstring>
110#include <csignal>
111#include <iostream>
112#include <map>
113#include <regex>
114#include <set>
115#include <string>
116#include <type_traits>
117#include <tuple>
118#include <chrono>
119#include <utility>
120#include <numeric>
121#include <functional>
122
123#include <fcntl.h>
124#include <netinet/ip.h>
125#include <sys/resource.h>
126#include <sys/select.h>
127#include <sys/socket.h>
128#include <sys/stat.h>
129#include <sys/time.h>
130#include <sys/types.h>
131#include <sys/un.h>
132#include <sys/wait.h>
133#include <unistd.h>
134#include <execinfo.h>
135#include <cfenv>
136#if defined(__linux__) && __has_include(<sched.h>)
137#include <sched.h>
138#elif __has_include(<linux/getcpu.h>)
139#include <linux/getcpu.h>
140#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
141#include <cpuid.h>
142#define CPUID(INFO, LEAF, SUBLEAF) __cpuid_count(LEAF, SUBLEAF, INFO[0], INFO[1], INFO[2], INFO[3])
143#define GETCPU(CPU) \
144 { \
145 uint32_t CPUInfo[4]; \
146 CPUID(CPUInfo, 1, 0); \
147 /* CPUInfo[1] is EBX, bits 24-31 are APIC ID */ \
148 if ((CPUInfo[3] & (1 << 9)) == 0) { \
149 CPU = -1; /* no APIC on chip */ \
150 } else { \
151 CPU = (unsigned)CPUInfo[1] >> 24; \
152 } \
153 if (CPU < 0) \
154 CPU = 0; \
155 }
156#endif
157
158using namespace o2::monitoring;
159using namespace o2::configuration;
160
161using namespace o2::framework;
162namespace bpo = boost::program_options;
163using DataProcessorInfos = std::vector<DataProcessorInfo>;
164using DeviceExecutions = std::vector<DeviceExecution>;
165using DeviceSpecs = std::vector<DeviceSpec>;
166using DeviceInfos = std::vector<DeviceInfo>;
167using DataProcessingStatesInfos = std::vector<DataProcessingStates>;
168using DeviceControls = std::vector<DeviceControl>;
169using DataProcessorSpecs = std::vector<DataProcessorSpec>;
170
171std::vector<DeviceMetricsInfo> gDeviceMetricsInfos;
172
// FIXME: we should probably find a better place for this.
// These are the device options added by the framework, but they can be
// overridden in the config spec.
176bpo::options_description gHiddenDeviceOptions("Hidden child options");
177
180
181void doBoostException(boost::exception& e, const char*);
183void doUnknownException(std::string const& s, char const*);
184
/// Find the value given on the command line for the `--id` option.
///
/// @param argc number of entries in @a argv
/// @param argv the command line arguments
/// @return the argument which follows the first `--id` that actually has a
///         successor, or nullptr if no such argument exists.
char* getIdString(int argc, char** argv)
{
  int position = 0;
  // Only arguments which are followed by another one can carry a value.
  while (position + 1 < argc) {
    if (strcmp(argv[position], "--id") == 0) {
      return argv[position + 1];
    }
    ++position;
  }
  return nullptr;
}
194
195int callMain(int argc, char** argv, int (*mainNoCatch)(int, char**))
196{
197 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
198 int result = 1;
199 if (noCatch) {
200 try {
201 result = mainNoCatch(argc, argv);
203 doDPLException(ref, argv[0]);
204 throw;
205 }
206 } else {
207 try {
208 // The 0 here is an int, therefore having the template matching in the
209 // SFINAE expression above fit better the version which invokes user code over
210 // the default one.
211 // The default policy is a catch all pub/sub setup to be consistent with the past.
212 result = mainNoCatch(argc, argv);
213 } catch (boost::exception& e) {
214 doBoostException(e, argv[0]);
215 throw;
216 } catch (std::exception const& error) {
217 doUnknownException(error.what(), argv[0]);
218 throw;
220 doDPLException(ref, argv[0]);
221 throw;
222 } catch (...) {
223 doUnknownException("", argv[0]);
224 throw;
225 }
226 }
227 return result;
228}
229
230// Read from a given fd and print it.
231// return true if we can still read from it,
232// return false if we need to close the input pipe.
233//
234// FIXME: We should really print full lines.
235void getChildData(int infd, DeviceInfo& outinfo)
236{
237 char buffer[1024 * 16];
238 int bytes_read;
239 // NOTE: do not quite understand read ends up blocking if I read more than
240 // once. Oh well... Good enough for now.
241 int64_t total_bytes_read = 0;
242 int64_t count = 0;
243 bool once = false;
244 while (true) {
245 bytes_read = read(infd, buffer, 1024 * 16);
246 if (bytes_read == 0) {
247 return;
248 }
249 if (!once) {
250 once = true;
251 }
252 if (bytes_read < 0) {
253 return;
254 }
255 assert(bytes_read > 0);
256 outinfo.unprinted.append(buffer, bytes_read);
257 count++;
258 }
259}
260
264bool checkIfCanExit(std::vector<DeviceInfo> const& infos)
265{
266 if (infos.empty()) {
267 return false;
268 }
269 for (auto& info : infos) {
270 if (info.readyToQuit == false) {
271 return false;
272 }
273 }
274 return true;
275}
276
277// Kill all the active children. Exit code
278// is != 0 if any of the children had an error.
279void killChildren(std::vector<DeviceInfo>& infos, int sig)
280{
281 for (auto& info : infos) {
282 if (info.active == true) {
283 kill(info.pid, sig);
284 }
285 }
286}
287
289bool areAllChildrenGone(std::vector<DeviceInfo>& infos)
290{
291 for (auto& info : infos) {
292 if ((info.pid != 0) && info.active) {
293 return false;
294 }
295 }
296 return true;
297}
298
300namespace
301{
302int calculateExitCode(DriverInfo& driverInfo, DeviceSpecs& deviceSpecs, DeviceInfos& infos)
303{
304 std::regex regexp(R"(^\[([\d+:]*)\]\[\w+\] )");
305 if (!driverInfo.lastError.empty()) {
306 LOGP(error, "SEVERE: DPL driver encountered an error while running.\n{}",
307 driverInfo.lastError);
308 return 1;
309 }
310 for (size_t di = 0; di < deviceSpecs.size(); ++di) {
311 auto& info = infos[di];
312 auto& spec = deviceSpecs[di];
313 if (info.maxLogLevel >= driverInfo.minFailureLevel) {
314 LOGP(error, "SEVERE: Device {} ({}) had at least one message above severity {}: {}",
315 spec.name,
316 info.pid,
317 (int)info.minFailureLevel,
318 std::regex_replace(info.firstSevereError, regexp, ""));
319 return 1;
320 }
321 if (info.exitStatus != 0) {
322 LOGP(error, "SEVERE: Device {} ({}) returned with {}",
323 spec.name,
324 info.pid,
325 info.exitStatus);
326 return info.exitStatus;
327 }
328 }
329 return 0;
330}
331} // namespace
332
/// Create a pipe, terminating everything if that is not possible.
///
/// On success `pipes[0]` is the read end and `pipes[1]` the write end, as
/// filled by pipe(2). On failure a diagnostic is printed on stderr and
/// SIGKILL is sent to the process group whose id is the pid of the calling
/// process.
///
/// @param pipes array of (at least) two ints which receives the descriptors
void createPipes(int* pipes)
{
  if (pipe(pipes) != -1) {
    return;
  }
  // Save errno immediately: the stream operations below are allowed to
  // modify it.
  int const pipeError = errno;
  std::cerr << "Unable to create PIPE: ";
  switch (pipeError) {
    case EFAULT:
      assert(false && "EFAULT while creating pipe");
      break;
    case EMFILE:
      std::cerr << "Too many active descriptors";
      break;
    case ENFILE:
      std::cerr << "System file table is full";
      break;
    default:
      std::cerr << "Unknown PIPE";
  }
  // Terminate the line for every kind of error, not only the unknown ones.
  std::cerr << std::endl;
  // Kill immediately both the parent and all the children
  kill(-1 * getpid(), SIGKILL);
}
356
// Flags set from the signal handlers. The handlers themselves do no work
// besides recording that a signal arrived: all the actual processing is
// left to the state machine which inspects these flags.
volatile sig_atomic_t graceful_exit = false;
volatile sig_atomic_t forceful_exit = false;
volatile sig_atomic_t sigchld_requested = false;
volatile sig_atomic_t double_sigint = false;

// Handler for interruption requests.
// The first invocation asks for a graceful exit, any further invocation
// escalates to a forceful one.
static void handle_sigint(int)
{
  if (graceful_exit != false) {
    forceful_exit = true;
    // Remember that the forceful exit comes from a repeated signal, so that
    // no extra message gets printed for it. When forceful_exit is set by
    // the timer instead, an error message is reported for each child which
    // did not exit gracefully.
    double_sigint = true;
    return;
  }
  graceful_exit = true;
}
380
382void cleanupSHM(std::string const& uniqueWorkflowId)
383{
384 using namespace fair::mq::shmem;
385 fair::mq::shmem::Monitor::Cleanup(SessionId{"dpl_" + uniqueWorkflowId}, false);
386}
387
388static void handle_sigchld(int) { sigchld_requested = true; }
389
391 std::string const&,
392 DeviceSpec const& spec,
395 DeviceInfos& deviceInfos,
396 DataProcessingStatesInfos& allStates)
397{
398 LOG(info) << "Starting " << spec.id << " as remote device";
399 DeviceInfo info{
400 .pid = 0,
401 .historyPos = 0,
402 .historySize = 1000,
403 .maxLogLevel = LogParsingHelpers::LogLevel::Debug,
404 .active = true,
405 .readyToQuit = false,
406 .inputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_timeslice", 0, 0, {}},
407 .outputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_output", 0, 0, {}},
408 .lastSignal = uv_hrtime() - 10000000};
409
410 deviceInfos.emplace_back(info);
411 timespec now;
412 clock_gettime(CLOCK_REALTIME, &now);
413 uint64_t offset = now.tv_sec * 1000 - uv_now(loop);
414 allStates.emplace_back(TimingHelpers::defaultRealtimeBaseConfigurator(offset, loop),
416 // Let's add also metrics information for the given device
418}
419
425
426void log_callback(uv_poll_t* handle, int status, int events)
427{
428 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
429 auto* logContext = reinterpret_cast<DeviceLogContext*>(handle->data);
430 std::vector<DeviceInfo>* infos = logContext->serverContext->infos;
431 DeviceInfo& info = infos->at(logContext->index);
432
433 if (status < 0) {
434 info.active = false;
435 }
436 if (events & UV_READABLE) {
437 getChildData(logContext->fd, info);
438 }
439 if (events & UV_DISCONNECT) {
440 info.active = false;
441 }
442 O2_SIGNPOST_EVENT_EMIT(driver, sid, "loop", "log_callback invoked by poller for device %{xcode:pid}d which is %{public}s%{public}s",
443 info.pid, info.active ? "active" : "inactive",
444 info.active ? " and still has data to read." : ".");
445 if (info.active == false) {
446 uv_poll_stop(handle);
447 }
448 uv_async_send(logContext->serverContext->asyncLogProcessing);
449}
450
452{
453 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
454 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "close_websocket");
455 delete (WSDPLHandler*)handle->data;
456}
457
458void websocket_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
459{
460 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, stream->loop);
461 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
462 auto* handler = (WSDPLHandler*)stream->data;
463 if (nread == 0) {
464 return;
465 }
466 if (nread == UV_EOF) {
467 if (buf->base) {
468 free(buf->base);
469 }
470 uv_read_stop(stream);
472 return;
473 }
474 if (nread < 0) {
475 // FIXME: should I close?
476 LOG(error) << "websocket_callback: Error while reading from websocket";
477 if (buf->base) {
478 free(buf->base);
479 }
480 uv_read_stop(stream);
482 return;
483 }
484 try {
485 LOG(debug3) << "Parsing request with " << handler << " with " << nread << " bytes";
486 parse_http_request(buf->base, nread, handler);
487 if (buf->base) {
488 free(buf->base);
489 }
490 } catch (WSError& e) {
491 LOG(error) << "Error while parsing request: " << e.message;
492 handler->error(e.code, e.message.c_str());
493 free(buf->base);
494 }
495}
496
497static void my_alloc_cb(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
498{
499 buf->base = (char*)malloc(suggested_size);
500 buf->len = suggested_size;
501}
502
504void ws_connect_callback(uv_stream_t* server, int status)
505{
506 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, server->loop);
507 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
508 auto* serverContext = reinterpret_cast<DriverServerContext*>(server->data);
509 if (status < 0) {
510 LOGF(error, "New connection error %s\n", uv_strerror(status));
511 // error!
512 return;
513 }
514
515 auto* client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
516 uv_tcp_init(serverContext->loop, client);
517 if (uv_accept(server, (uv_stream_t*)client) == 0) {
518 client->data = new WSDPLHandler((uv_stream_t*)client, serverContext);
519 uv_read_start((uv_stream_t*)client, (uv_alloc_cb)my_alloc_cb, websocket_callback);
520 } else {
521 uv_close((uv_handle_t*)client, nullptr);
522 }
523}
524
526 std::string configuration;
527 int fd;
528};
529
530void stream_config(uv_work_t* req)
531{
532 auto* context = (StreamConfigContext*)req->data;
533 size_t result = write(context->fd, context->configuration.data(), context->configuration.size());
534 if (result != context->configuration.size()) {
535 LOG(error) << "Unable to pass configuration to children";
536 }
537 {
538 auto error = fsync(context->fd);
539 switch (error) {
540 case EBADF:
541 LOGP(error, "EBADF while flushing child stdin");
542 break;
543 case EINVAL:
544 LOGP(error, "EINVAL while flushing child stdin");
545 break;
546 case EINTR:
547 LOGP(error, "EINTR while flushing child stdin");
548 break;
549 case EIO:
550 LOGP(error, "EIO while flushing child stdin");
551 break;
552 default:;
553 }
554 }
555 {
556 auto error = close(context->fd); // Not allowing further communication...
557 switch (error) {
558 case EBADF:
559 LOGP(error, "EBADF while closing child stdin");
560 break;
561 case EINTR:
562 LOGP(error, "EINTR while closing child stdin");
563 break;
564 case EIO:
565 LOGP(error, "EIO while closing child stdin");
566 break;
567 default:;
568 }
569 }
570}
571
/// Lightweight reference to a device, by position.
/// The index is used in this file to address the parallel per-device
/// containers (specs, executions, child file descriptors).
struct DeviceRef {
  int index; ///< position of the device in the per-device containers
};
575
579};
580
582{
583 struct sigaction sa_handle_int;
584 sa_handle_int.sa_handler = handle_sigint;
585 sigemptyset(&sa_handle_int.sa_mask);
586 sa_handle_int.sa_flags = SA_RESTART;
587 if (sigaction(SIGINT, &sa_handle_int, nullptr) == -1) {
588 perror("Unable to install signal handler");
589 exit(1);
590 }
591 struct sigaction sa_handle_term;
592 sa_handle_term.sa_handler = handle_sigint;
593 sigemptyset(&sa_handle_term.sa_mask);
594 sa_handle_term.sa_flags = SA_RESTART;
595 if (sigaction(SIGTERM, &sa_handle_int, nullptr) == -1) {
596 perror("Unable to install signal handler");
597 exit(1);
598 }
599}
600
602 std::string const& forwardedStdin,
603 std::vector<DeviceStdioContext>& childFds,
604 std::vector<uv_poll_t*>& handles)
605{
606 for (size_t i = 0; i < childFds.size(); ++i) {
607 auto& childstdin = childFds[i].childstdin;
608 auto& childstdout = childFds[i].childstdout;
609
610 auto* req = (uv_work_t*)malloc(sizeof(uv_work_t));
611 req->data = new StreamConfigContext{forwardedStdin, childstdin[1]};
612 uv_queue_work(serverContext->loop, req, stream_config, nullptr);
613
614 // Setting them to non-blocking to avoid haing the driver hang when
615 // reading from child.
616 int resultCode = fcntl(childstdout[0], F_SETFL, O_NONBLOCK);
617 if (resultCode == -1) {
618 LOGP(error, "Error while setting the socket to non-blocking: {}", strerror(errno));
619 }
620
622 auto addPoller = [&handles, &serverContext](int index, int fd) {
623 auto* context = new DeviceLogContext{};
624 context->index = index;
625 context->fd = fd;
626 context->serverContext = serverContext;
627 handles.push_back((uv_poll_t*)malloc(sizeof(uv_poll_t)));
628 auto handle = handles.back();
629 handle->data = context;
630 uv_poll_init(serverContext->loop, handle, fd);
631 uv_poll_start(handle, UV_READABLE, log_callback);
632 };
633
634 addPoller(i, childstdout[0]);
635 }
636}
637
638void handle_crash(int sig)
639{
640 // dump demangled stack trace
641 void* array[1024];
642 int size = backtrace(array, 1024);
643
644 {
645 char buffer[1024];
646 char const* msg = "*** Program crashed (%s)\nBacktrace by DPL:\n";
647 snprintf(buffer, 1024, msg, strsignal(sig));
648 if (sig == SIGFPE) {
649 if (std::fetestexcept(FE_DIVBYZERO)) {
650 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - DIVISION BY ZERO");
651 } else if (std::fetestexcept(FE_INVALID)) {
652 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - INVALID RESULT");
653 } else {
654 snprintf(buffer, 1024, msg, "FLOATING POINT EXCEPTION - UNKNOWN REASON");
655 }
656 }
657 auto retVal = write(STDERR_FILENO, buffer, strlen(buffer));
658 (void)retVal;
659 }
661 {
662 char const* msg = "Backtrace complete.\n";
663 int len = strlen(msg); /* the byte length of the string */
664
665 auto retVal = write(STDERR_FILENO, msg, len);
666 (void)retVal;
667 fsync(STDERR_FILENO);
668 }
669 _exit(1);
670}
671
676 std::vector<DeviceSpec> const& specs,
677 DriverInfo& driverInfo,
678 std::vector<DeviceControl>&,
679 std::vector<DeviceExecution>& executions,
680 std::vector<DeviceInfo>& deviceInfos,
681 std::vector<DataProcessingStates>& allStates,
682 ServiceRegistryRef serviceRegistry,
683 boost::program_options::variables_map& varmap,
684 std::vector<DeviceStdioContext>& childFds,
685 unsigned parentCPU,
686 unsigned parentNode)
687{
688 // FIXME: this might not work when more than one DPL driver on the same
689 // machine. Hopefully we do not care.
690 // Not how the first port is actually used to broadcast clients.
691 auto& spec = specs[ref.index];
692 auto& execution = executions[ref.index];
693
694 for (auto& service : spec.services) {
695 if (service.preFork != nullptr) {
696 service.preFork(serviceRegistry, DeviceConfig{varmap});
697 }
698 }
699 // If we have a framework id, it means we have already been respawned
700 // and that we are in a child. If not, we need to fork and re-exec, adding
701 // the framework-id as one of the options.
702 pid_t id = 0;
703 id = fork();
704 // We are the child: prepare options and reexec.
705 if (id == 0) {
706 // We allow being debugged and do not terminate on SIGTRAP
707 signal(SIGTRAP, SIG_IGN);
708 // We immediately ignore SIGUSR1 and SIGUSR2 so that we do not
709 // get killed by the parent trying to force stepping children.
710 // We will re-enable them later on, when it is actually safe to
711 // do so.
712 signal(SIGUSR1, SIG_IGN);
713 signal(SIGUSR2, SIG_IGN);
714
715 // This is the child.
716 // For stdout / stderr, we close the read part of the pipe, the
717 // old descriptor, and then replace it with the write part of the pipe.
718 // For stdin, we close the write part of the pipe, the old descriptor,
719 // and then we replace it with the read part of the pipe.
720 // We also close all the filedescriptors for our sibilings.
721 struct rlimit rlim;
722 getrlimit(RLIMIT_NOFILE, &rlim);
723 // We close all FD, but the one which are actually
724 // used to communicate with the driver. This is a bad
725 // idea in the first place, because rlim_cur could be huge
726 // FIXME: I should understand which one is really to be closed and use
727 // CLOEXEC on it.
728 int rlim_cur = std::min((int)rlim.rlim_cur, 10000);
729 for (int i = 0; i < rlim_cur; ++i) {
730 if (childFds[ref.index].childstdin[0] == i) {
731 continue;
732 }
733 if (childFds[ref.index].childstdout[1] == i) {
734 continue;
735 }
736 close(i);
737 }
738 dup2(childFds[ref.index].childstdin[0], STDIN_FILENO);
739 dup2(childFds[ref.index].childstdout[1], STDOUT_FILENO);
740 dup2(childFds[ref.index].childstdout[1], STDERR_FILENO);
741
742 for (auto& service : spec.services) {
743 if (service.postForkChild != nullptr) {
744 service.postForkChild(serviceRegistry);
745 }
746 }
747 for (auto& env : execution.environ) {
748 putenv(strdup(DeviceSpecHelpers::reworkTimeslicePlaceholder(env, spec).data()));
749 }
750 execvp(execution.args[0], execution.args.data());
751 } else {
752 O2_SIGNPOST_ID_GENERATE(sid, driver);
753 O2_SIGNPOST_EVENT_EMIT(driver, sid, "spawnDevice", "New child at %{pid}d", id);
754 }
755 close(childFds[ref.index].childstdin[0]);
756 close(childFds[ref.index].childstdout[1]);
757 if (varmap.count("post-fork-command")) {
758 auto templateCmd = varmap["post-fork-command"];
759 auto cmd = fmt::format(fmt::runtime(templateCmd.as<std::string>()),
760 fmt::arg("pid", id),
761 fmt::arg("id", spec.id),
762 fmt::arg("cpu", parentCPU),
763 fmt::arg("node", parentNode),
764 fmt::arg("name", spec.name),
765 fmt::arg("timeslice0", spec.inputTimesliceId),
766 fmt::arg("timeslice1", spec.inputTimesliceId + 1),
767 fmt::arg("rank0", spec.rank),
768 fmt::arg("maxRank0", spec.nSlots));
769 int err = system(cmd.c_str());
770 if (err) {
771 LOG(error) << "Post fork command `" << cmd << "` returned with status " << err;
772 }
773 LOG(debug) << "Successfully executed `" << cmd;
774 }
775 // This is the parent. We close the write end of
776 // the child pipe and and keep track of the fd so
777 // that we can later select on it.
778 for (auto& service : spec.services) {
779 if (service.postForkParent != nullptr) {
780 service.postForkParent(serviceRegistry);
781 }
782 }
783
784 LOG(info) << "Starting " << spec.id << " on pid " << id;
785 deviceInfos.push_back({.pid = id,
786 .historyPos = 0,
787 .historySize = 1000,
788 .maxLogLevel = LogParsingHelpers::LogLevel::Debug,
789 .minFailureLevel = driverInfo.minFailureLevel,
790 .active = true,
791 .readyToQuit = false,
792 .inputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_timeslice", 0, 0, {}},
793 .outputChannelMetricsViewIndex = Metric2DViewIndex{"oldest_possible_output", 0, 0, {}},
794 .lastSignal = uv_hrtime() - 10000000});
795 // create the offset using uv_hrtime
796 timespec now;
797 clock_gettime(CLOCK_REALTIME, &now);
798 uint64_t offset = now.tv_sec * 1000 - uv_now(loop);
799 allStates.emplace_back(
802
803 allStates.back().registerState(DataProcessingStates::StateSpec{
804 .name = "data_queries",
805 .stateId = (short)ProcessingStateId::DATA_QUERIES,
806 .sendInitialValue = true,
807 });
808 allStates.back().registerState(DataProcessingStates::StateSpec{
809 .name = "output_matchers",
810 .stateId = (short)ProcessingStateId::OUTPUT_MATCHERS,
811 .sendInitialValue = true,
812 });
813
814 for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
815 allStates.back().registerState(DataProcessingStates::StateSpec{
816 .name = fmt::format("matcher_variables/{}", i),
817 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
818 .minPublishInterval = 200, // if we publish too often we flood the GUI and we are not able to read it in any case
819 .sendInitialValue = true,
820 });
821 }
822
823 for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
824 allStates.back().registerState(DataProcessingStates::StateSpec{
825 .name = fmt::format("data_relayer/{}", i),
826 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + i),
827 .minPublishInterval = 200, // if we publish too often we flood the GUI and we are not able to read it in any case
828 .sendInitialValue = true,
829 });
830 }
831
832 // Let's add also metrics information for the given device
834}
835
837 DriverInfo& driverInfo,
838 DeviceInfos& infos,
839 DeviceSpecs const& specs,
840 DeviceControls& controls)
841{
842 // Display part. All you need to display should actually be in
843 // `infos`.
844 // TODO: split at \n
845 // TODO: update this only once per 1/60 of a second or
846 // things like this.
847 // TODO: have multiple display modes
848 // TODO: graphical view of the processing?
849 assert(infos.size() == controls.size());
850 ParsedMetricMatch metricMatch;
851
852 int processed = 0;
853 for (size_t di = 0, de = infos.size(); di < de; ++di) {
854 DeviceInfo& info = infos[di];
855 DeviceControl& control = controls[di];
856 assert(specs.size() == infos.size());
857 DeviceSpec const& spec = specs[di];
858
859 if (info.unprinted.empty()) {
860 continue;
861 }
862 processed++;
863
864 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, &info);
865 O2_SIGNPOST_START(driver, sid, "bytes_processed", "bytes processed by %{xcode:pid}d", info.pid);
866
867 std::string_view s = info.unprinted;
868 size_t pos = 0;
869 info.history.resize(info.historySize);
870 info.historyLevel.resize(info.historySize);
871
872 while ((pos = s.find("\n")) != std::string::npos) {
873 std::string_view token{s.substr(0, pos)};
874 auto logLevel = LogParsingHelpers::parseTokenLevel(token);
875
876 // Check if the token is a metric from SimpleMetricsService
877 // if yes, we do not print it out and simply store it to be displayed
878 // in the GUI.
879 // Then we check if it is part of our Poor man control system
880 // if yes, we execute the associated command.
881 if (!control.quiet && (token.find(control.logFilter) != std::string::npos) && logLevel >= info.logLevel) {
882 assert(info.historyPos >= 0);
883 assert(info.historyPos < info.history.size());
884 info.history[info.historyPos] = token;
885 info.historyLevel[info.historyPos] = logLevel;
886 info.historyPos = (info.historyPos + 1) % info.history.size();
887 fmt::print("[{}:{}]: {}\n", info.pid, spec.id, token);
888 }
889 // We keep track of the maximum log error a
890 // device has seen.
891 bool maxLogLevelIncreased = false;
892 if (logLevel > info.maxLogLevel && logLevel > LogParsingHelpers::LogLevel::Info &&
893 logLevel != LogParsingHelpers::LogLevel::Unknown) {
894 info.maxLogLevel = logLevel;
895 maxLogLevelIncreased = true;
896 }
897 if (logLevel >= driverInfo.minFailureLevel) {
898 info.lastError = token;
899 if (info.firstSevereError.empty() || maxLogLevelIncreased) {
900 info.firstSevereError = token;
901 }
902 }
903 // +1 is to skip the \n
904 s.remove_prefix(pos + 1);
905 }
906 size_t oldSize = info.unprinted.size();
907 info.unprinted = std::string(s);
908 int64_t bytesProcessed = oldSize - info.unprinted.size();
909 O2_SIGNPOST_END(driver, sid, "bytes_processed", "bytes processed by %{xcode:network-size-in-bytes}" PRIi64, bytesProcessed);
910 }
911 if (processed == 0) {
912 O2_SIGNPOST_ID_FROM_POINTER(lid, driver, loop);
913 O2_SIGNPOST_EVENT_EMIT(driver, lid, "mainloop", "processChildrenOutput invoked for nothing!");
914 }
915}
916
917// Process all the sigchld which are pending
// @return whether or not a given child exited with an error condition.
920{
921 bool hasError = false;
922 while (true) {
923 int status;
924 pid_t pid = waitpid((pid_t)(-1), &status, WNOHANG);
925 if (pid > 0) {
926 // Normal exit
927 int es = WEXITSTATUS(status);
928 if (WIFEXITED(status) == false || es != 0) {
929 // Look for the name associated to the pid in the infos
930 std::string id = "unknown";
931 assert(specs.size() == infos.size());
932 for (size_t ii = 0; ii < infos.size(); ++ii) {
933 if (infos[ii].pid == pid) {
934 id = specs[ii].id;
935 }
936 }
937 // No need to print anything if the user
938 // force quitted doing a double Ctrl-C.
939 if (double_sigint) {
940 } else if (forceful_exit) {
941 LOGP(error, "pid {} ({}) was forcefully terminated after being requested to quit", pid, id);
942 } else {
943 if (WIFSIGNALED(status)) {
944 int exitSignal = WTERMSIG(status);
945 es = exitSignal + 128;
946 LOGP(error, "Workflow crashed - PID {} ({}) was killed abnormally with {} and exited code was set to {}.", pid, id, strsignal(exitSignal), es);
947 } else {
948 es = 128;
949 LOGP(error, "Workflow crashed - PID {} ({}) did not exit correctly however it's not clear why. Exit code forced to {}.", pid, id, es);
950 }
951 }
952 hasError |= true;
953 }
954 for (auto& info : infos) {
955 if (info.pid == pid) {
956 info.active = false;
957 info.exitStatus = es;
958 }
959 }
960 continue;
961 } else {
962 break;
963 }
964 }
965 return hasError;
966}
967
968void doDPLException(RuntimeErrorRef& e, char const* processName)
969{
970 auto& err = o2::framework::error_from_ref(e);
971 if (err.maxBacktrace != 0) {
972 LOGP(fatal,
973 "Unhandled o2::framework::runtime_error reached the top of main of {}, device shutting down."
974 " Reason: {}",
975 processName, err.what);
976 LOGP(error, "Backtrace follow:");
977 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
978 } else {
979 LOGP(fatal,
980 "Unhandled o2::framework::runtime_error reached the top of main of {}, device shutting down."
981 " Reason: {}",
982 processName, err.what);
983 LOGP(error, "Recompile with DPL_ENABLE_BACKTRACE=1 to get more information.");
984 }
985}
986
987void doUnknownException(std::string const& s, char const* processName)
988{
989 if (s.empty()) {
990 LOGP(fatal, "unknown error while setting up workflow in {}.", processName);
991 } else {
992 LOGP(fatal, "error while setting up workflow in {}: {}", processName, s);
993 }
994}
995
996[[maybe_unused]] AlgorithmSpec dryRun(DeviceSpec const& spec)
997{
999 [&routes = spec.outputs](DataAllocator& outputs) {
1000 LOG(info) << "Dry run enforced. Creating dummy messages to simulate computation happended";
1001 for (auto& route : routes) {
1002 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
1003 outputs.make<int>(Output{concrete.origin, concrete.description, concrete.subSpec}, 2);
1004 }
1005 })};
1006}
1007
1009{
1010 // LOG(info) << "Process " << getpid() << " is exiting.";
1011}
1012
// Run a single device inside the current (child) process.
//
// A fair::mq::DeviceRunner is created from argc / argv and two hooks are
// installed on it: one declaring the extra command line options understood by
// every device, one creating the DataProcessingDevice together with the
// services it needs. The runner is then executed, the pre-exit callbacks of
// the DataProcessorContext are invoked and the value returned by
// runner.Run() is handed back to the caller.
//
// NOTE(review): this listing skips a few lines of the original file: `ref`
// and `deploymentMode` are used below, but their declarations are not
// visible here. Confirm against the full source before editing.
1013int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
1014 RunningWorkflowInfo const& runningWorkflow,
1016 DriverConfig const& driverConfig,
1017 ProcessingPolicies processingPolicies,
1018 std::string const& defaultDriverClient,
1019 uv_loop_t* loop)
1020{
1021 fair::Logger::SetConsoleColor(false);
// Fatal log messages are turned into exceptions.
1022 fair::Logger::OnFatal([]() { throw runtime_error("Fatal error"); });
1023 DeviceSpec const& spec = runningWorkflow.devices[ref.index];
1024 LOG(info) << "Spawing new device " << spec.id << " in process with pid " << getpid();
1025
1026 fair::mq::DeviceRunner runner{argc, argv};
1027
1028 // Populate options from the command line. Notice that only the options
1029 // declared in the workflow definition are allowed.
1030 runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
1031 std::string defaultExitTransitionTimeout = "0";
1032 std::string defaultDataProcessingTimeout = "0";
1033 std::string defaultInfologgerMode = "";
// The defaults below depend on the deployment mode: both online modes use a
// 20 s exit transition timeout, OnlineDDS additionally selects infoLoggerD.
1035 if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
1036 defaultExitTransitionTimeout = "20";
1037 defaultInfologgerMode = "infoLoggerD";
1038 } else if (deploymentMode == o2::framework::DeploymentMode::OnlineECS) {
1039 defaultExitTransitionTimeout = "20";
1040 }
1041 boost::program_options::options_description optsDesc;
// The DPL_SIGNPOSTS environment variable, when set, is the default for --signposts.
1043 char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
1044 optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
1045 ("dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value("0"), "minimum flushing interval for online metrics (in s)") //
1046 ("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
1047 ("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
1048 ("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
1049 ("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
1050 ("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
1051 ("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
1052 ("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
1053 ("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
1054 ("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
1055 ("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
1056 r.fConfig.AddToCmdLineOptions(optsDesc, true);
1057 });
1058
1059 // This is to control lifetime. All these services get destroyed
1060 // when the runner is done.
1061 std::unique_ptr<SimpleRawDeviceService> simpleRawDeviceService;
1062 std::unique_ptr<DeviceState> deviceState;
1063 std::unique_ptr<ComputingQuotaEvaluator> quotaEvaluator;
1064 std::unique_ptr<FairMQDeviceProxy> deviceProxy;
1065 std::unique_ptr<DeviceContext> deviceContext;
1066
// Hook registered below for fair::mq::hooks::InstantiateDevice: it creates the
// services owned by the unique_ptr above, registers them in the service
// registry and finally creates the DataProcessingDevice itself.
1067 auto afterConfigParsingCallback = [&simpleRawDeviceService,
1068 &runningWorkflow,
1069 ref,
1070 &spec,
1071 &quotaEvaluator,
1072 &serviceRegistry,
1073 &deviceState,
1074 &deviceProxy,
1075 &processingPolicies,
1076 &deviceContext,
1077 &driverConfig,
1078 &loop](fair::mq::DeviceRunner& r) {
1079 ServiceRegistryRef serviceRef = {serviceRegistry};
// The raw device service is created without a device: the device pointer is
// filled in via setDevice() once the DataProcessingDevice exists.
1080 simpleRawDeviceService = std::make_unique<SimpleRawDeviceService>(nullptr, spec);
1081 serviceRef.registerService(ServiceRegistryHelpers::handleForService<RawDeviceService>(simpleRawDeviceService.get()));
1082
1083 deviceState = std::make_unique<DeviceState>();
1084 deviceState->loop = loop;
1085 deviceState->tracingFlags = DeviceStateHelpers::parseTracingFlags(r.fConfig.GetPropertyAsString("dpl-tracing-flags"));
1086 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(deviceState.get()));
1087
1088 quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
1089 serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));
1090
1091 deviceContext = std::make_unique<DeviceContext>();
1092 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
1093 serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
1094 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
1095 serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
1096
1097 auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
1098
// Ownership of the device is handed to the runner; the raw device service
// only keeps a non-owning pointer to it.
1099 serviceRef.get<RawDeviceService>().setDevice(device.get());
1100 r.fDevice = std::move(device);
1101 fair::Logger::SetConsoleColor(false);
1102
// Declare every service listed in the device specification.
1104 for (auto& service : spec.services) {
1105 LOG(debug) << "Declaring service " << service.name;
1106 serviceRegistry.declareService(service, *deviceState.get(), r.fConfig);
1107 }
1108 if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(spec.resourceMonitoringInterval)) {
1109 serviceRef.get<Monitoring>().enableProcessMonitoring(spec.resourceMonitoringInterval, {PmMeasurement::Cpu, PmMeasurement::Mem, PmMeasurement::Smaps});
1110 }
1111 };
1112
1113 runner.AddHook<fair::mq::hooks::InstantiateDevice>(afterConfigParsingCallback);
1114
// Run the device; once the runner returns, invoke the pre-exit callbacks
// before handing its result back to the caller.
1115 auto result = runner.Run();
1116 ServiceRegistryRef serviceRef = {serviceRegistry};
1117 auto& context = serviceRef.get<DataProcessorContext>();
1118 DataProcessorContext::preExitCallbacks(context.preExitHandles, serviceRef);
1119 return result;
1120}
1121
// NOTE(review): the opening line of this aggregate is not visible in this
// listing; presumably it is the WorkflowInfo type whose `executable`, `args`
// and `options` members are read via `workflowInfo` further down - confirm
// against the full source.
// Name of the executable.
1123 std::string executable;
// Arguments, as a list of strings.
1124 std::vector<std::string> args;
// Options, as a list of ConfigParamSpec.
1125 std::vector<ConfigParamSpec> options;
1126};
1127
1128void gui_callback(uv_timer_s* ctx)
1129{
1130 auto* gui = reinterpret_cast<GuiCallbackContext*>(ctx->data);
1131 if (gui->plugin == nullptr) {
1132 // The gui is not there. Why are we here?
1133 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
1134 O2_SIGNPOST_EVENT_EMIT_ERROR(driver, sid, "gui", "GUI timer callback invoked without a GUI plugin.");
1135 uv_timer_stop(ctx);
1136 return;
1137 }
1138 *gui->guiTimerExpired = true;
1139 static int counter = 0;
1140 if ((counter++ % 6000) == 0) {
1141 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
1142 O2_SIGNPOST_EVENT_EMIT(driver, sid, "gui", "The GUI callback got called %d times.", counter);
1143 *gui->guiTimerExpired = false;
1144 }
1145 // One interval per GUI invocation, using the loop as anchor.
1146 O2_SIGNPOST_ID_FROM_POINTER(sid, gui, ctx->loop);
1147 O2_SIGNPOST_START(gui, sid, "gui", "gui_callback");
1148
1149 // New version which allows deferred closure of windows
1150 if (gui->plugin->supportsDeferredClose()) {
1151 // For now, there is nothing for which we want to defer the close
1152 // so if the flag is set, we simply exit
1153 if (*(gui->guiQuitRequested)) {
1154 O2_SIGNPOST_END(gui, sid, "gui", "Quit requested by the GUI.");
1155 return;
1156 }
1157 void* draw_data = nullptr;
1158 uint64_t frameStart = uv_hrtime();
1159 uint64_t frameLatency = frameStart - gui->frameLast;
1160
1161 // if less than 15ms have passed reuse old frame
1162 if (frameLatency / 1000000 <= 15) {
1163 draw_data = gui->lastFrame;
1164 O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
1165 return;
1166 }
1167 // The result of the pollGUIPreRender is used to determine if we
1168 // should quit the GUI, however, the rendering is started in any
1169 // case, so we should complete it.
1170 if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) {
1171 *(gui->guiQuitRequested) = true;
1172 }
1173 draw_data = gui->plugin->pollGUIRender(gui->callback);
1174 gui->plugin->pollGUIPostRender(gui->window, draw_data);
1175
1176 uint64_t frameEnd = uv_hrtime();
1177 *(gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
1178 *(gui->frameLatency) = frameLatency / 1000000.f;
1179 gui->frameLast = frameStart;
1180 } else {
1181 void* draw_data = nullptr;
1182
1183 uint64_t frameStart = uv_hrtime();
1184 uint64_t frameLatency = frameStart - gui->frameLast;
1185
1186 // if less than 15ms have passed reuse old frame
1187 if (frameLatency / 1000000 > 15) {
1188 if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) {
1189 *(gui->guiQuitRequested) = true;
1190 O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
1191 return;
1192 }
1193 draw_data = gui->plugin->pollGUIRender(gui->callback);
1194 gui->plugin->pollGUIPostRender(gui->window, draw_data);
1195 } else {
1196 draw_data = gui->lastFrame;
1197 }
1198
1199 if (frameLatency / 1000000 > 15) {
1200 uint64_t frameEnd = uv_hrtime();
1201 *(gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
1202 *(gui->frameLatency) = frameLatency / 1000000.f;
1203 gui->frameLast = frameStart;
1204 }
1205 }
1206 O2_SIGNPOST_END(gui, sid, "gui", "Gui redrawn.");
1207}
1208
1210void single_step_callback(uv_timer_s* ctx)
1211{
1212 auto* infos = reinterpret_cast<DeviceInfos*>(ctx->data);
1213 killChildren(*infos, SIGUSR1);
1214}
1215
1216void force_exit_callback(uv_timer_s* ctx)
1217{
1218 auto* infos = reinterpret_cast<DeviceInfos*>(ctx->data);
1219 killChildren(*infos, SIGKILL);
1220}
1221
1222std::vector<std::regex> getDumpableMetrics()
1223{
1224 auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames();
1225 auto dumpableMetrics = std::vector<std::regex>{};
1226 for (const auto& metric : performanceMetrics) {
1227 dumpableMetrics.emplace_back(metric);
1228 }
1229 dumpableMetrics.emplace_back("^arrow-bytes-delta$");
1230 dumpableMetrics.emplace_back("^aod-bytes-read-uncompressed$");
1231 dumpableMetrics.emplace_back("^aod-bytes-read-compressed$");
1232 dumpableMetrics.emplace_back("^aod-file-read-info$");
1233 dumpableMetrics.emplace_back("^table-bytes-.*");
1234 dumpableMetrics.emplace_back("^total-timeframes.*");
1235 dumpableMetrics.emplace_back("^device_state.*");
1236 dumpableMetrics.emplace_back("^total_wall_time_ms$");
1237 return dumpableMetrics;
1238}
1239
// NOTE(review): the signature line of this function is not visible in this
// listing; the body reads `handle->data`, so it takes a handle whose `data`
// member points to a DriverServerContext.
1241{
1242 auto* context = (DriverServerContext*)handle->data;
1243
// Function-local static: the list of dumpable metrics is built once, on the
// first invocation, and reused afterwards.
1244 static auto performanceMetrics = getDumpableMetrics();
// NOTE(review): the beginning of the call these arguments belong to is not
// visible in this listing.
1246 context->driver->metrics, *(context->specs), performanceMetrics);
1247}
1248
1249void dumpRunSummary(DriverServerContext& context, DriverInfo const& driverInfo, DeviceInfos const& infos, DeviceSpecs const& specs)
1250{
1251 if (infos.empty()) {
1252 return;
1253 }
1254 LOGP(info, "## Processes completed. Run summary:");
1255 LOGP(info, "### Devices started: {}", infos.size());
1256 for (size_t di = 0; di < infos.size(); ++di) {
1257 auto& info = infos[di];
1258 auto& spec = specs[di];
1259 if (info.exitStatus) {
1260 LOGP(error, " - Device {}: pid {} (exit {})", spec.name, info.pid, info.exitStatus);
1261 } else {
1262 LOGP(info, " - Device {}: pid {} (exit {})", spec.name, info.pid, info.exitStatus);
1263 }
1264 if (info.exitStatus != 0 && info.firstSevereError.empty() == false) {
1265 LOGP(info, " - First error: {}", info.firstSevereError);
1266 }
1267 if (info.exitStatus != 0 && info.lastError != info.firstSevereError) {
1268 LOGP(info, " - Last error: {}", info.lastError);
1269 }
1270 }
1271 for (auto& summary : *context.summaryCallbacks) {
1272 summary(ServiceMetricsInfo{*context.metrics, *context.specs, *context.infos, context.driver->metrics, driverInfo});
1273 }
1274}
1275
1276auto bindGUIPort = [](DriverInfo& driverInfo, DriverServerContext& serverContext, std::string frameworkId) {
1277 uv_tcp_init(serverContext.loop, &serverContext.serverHandle);
1278
1279 driverInfo.port = 8080 + (getpid() % 30000);
1280
1281 if (getenv("DPL_REMOTE_GUI_PORT")) {
1282 try {
1283 driverInfo.port = stoi(std::string(getenv("DPL_REMOTE_GUI_PORT")));
1284 } catch (std::invalid_argument) {
1285 LOG(error) << "DPL_REMOTE_GUI_PORT not a valid integer";
1286 } catch (std::out_of_range) {
1287 LOG(error) << "DPL_REMOTE_GUI_PORT out of range (integer)";
1288 }
1289 if (driverInfo.port < 1024 || driverInfo.port > 65535) {
1290 LOG(error) << "DPL_REMOTE_GUI_PORT out of range (1024-65535)";
1291 }
1292 }
1293
1294 int result = 0;
1295 struct sockaddr_in* serverAddr = nullptr;
1296
1297 // Do not offer websocket endpoint for devices
1298 // FIXME: this was blocking david's workflows. For now
1299 // there is no point in any case to have devices
1300 // offering a web based API, but it might make sense in
1301 // the future to inspect them via some web based interface.
1302 if (serverContext.isDriver) {
1303 do {
1304 free(serverAddr);
1305 if (driverInfo.port > 64000) {
1306 throw runtime_error_f("Unable to find a free port for the driver. Last attempt returned %d", result);
1307 }
1308 serverAddr = (sockaddr_in*)malloc(sizeof(sockaddr_in));
1309 uv_ip4_addr("0.0.0.0", driverInfo.port, serverAddr);
1310 auto bindResult = uv_tcp_bind(&serverContext.serverHandle, (const struct sockaddr*)serverAddr, 0);
1311 if (bindResult != 0) {
1312 driverInfo.port++;
1313 usleep(1000);
1314 continue;
1315 }
1316 result = uv_listen((uv_stream_t*)&serverContext.serverHandle, 100, ws_connect_callback);
1317 if (result != 0) {
1318 driverInfo.port++;
1319 usleep(1000);
1320 continue;
1321 }
1322 } while (result != 0);
1323 } else if (getenv("DPL_DEVICE_REMOTE_GUI") && !serverContext.isDriver) {
1324 do {
1325 free(serverAddr);
1326 if (driverInfo.port > 64000) {
1327 throw runtime_error_f("Unable to find a free port for the driver. Last attempt returned %d", result);
1328 }
1329 serverAddr = (sockaddr_in*)malloc(sizeof(sockaddr_in));
1330 uv_ip4_addr("0.0.0.0", driverInfo.port, serverAddr);
1331 auto bindResult = uv_tcp_bind(&serverContext.serverHandle, (const struct sockaddr*)serverAddr, 0);
1332 if (bindResult != 0) {
1333 driverInfo.port++;
1334 usleep(1000);
1335 continue;
1336 }
1337 result = uv_listen((uv_stream_t*)&serverContext.serverHandle, 100, ws_connect_callback);
1338 if (result != 0) {
1339 driverInfo.port++;
1340 usleep(1000);
1341 continue;
1342 }
1343 LOG(info) << "Device GUI port: " << driverInfo.port << " " << frameworkId;
1344 } while (result != 0);
1345 }
1346};
1347
1348// This is the handler for the parent inner loop.
1350 WorkflowInfo const& workflowInfo,
1351 DataProcessorInfos const& previousDataProcessorInfos,
1352 CommandInfo const& commandInfo,
1353 DriverControl& driverControl,
1354 DriverInfo& driverInfo,
1355 DriverConfig& driverConfig,
1356 std::vector<DeviceMetricsInfo>& metricsInfos,
1357 std::vector<ConfigParamSpec> const& detectedParams,
1358 boost::program_options::variables_map& varmap,
1359 std::vector<ServiceSpec>& driverServices,
1360 std::string frameworkId)
1361{
1362 RunningWorkflowInfo runningWorkflow{
1363 .uniqueWorkflowId = driverInfo.uniqueWorkflowId,
1364 .shmSegmentId = (int16_t)atoi(varmap["shm-segment-id"].as<std::string>().c_str())};
1365 DeviceInfos infos;
1366 DeviceControls controls;
1367 DataProcessingStatesInfos allStates;
1368 auto* devicesManager = new DevicesManager{.controls = controls, .infos = infos, .specs = runningWorkflow.devices, .messages = {}};
1369 DeviceExecutions deviceExecutions;
1370 DataProcessorInfos dataProcessorInfos = previousDataProcessorInfos;
1371
1372 std::vector<uv_poll_t*> pollHandles;
1373 std::vector<DeviceStdioContext> childFds;
1374
1375 std::vector<ComputingResource> resources;
1376
1377 if (driverInfo.resources != "") {
1378 resources = ComputingResourceHelpers::parseResources(driverInfo.resources);
1379 } else {
1381 }
1382
1383 auto resourceManager = std::make_unique<SimpleResourceManager>(resources);
1384
1385 DebugGUI* debugGUI = nullptr;
1386 void* window = nullptr;
1387 decltype(debugGUI->getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl)) debugGUICallback;
1388
1389 // An empty frameworkId means this is the driver, so we initialise the GUI
1390 auto initDebugGUI = []() -> DebugGUI* {
1391 uv_lib_t supportLib;
1392 int result = 0;
1393#ifdef __APPLE__
1394 result = uv_dlopen("libO2FrameworkGUISupport.dylib", &supportLib);
1395#else
1396 result = uv_dlopen("libO2FrameworkGUISupport.so", &supportLib);
1397#endif
1398 if (result == -1) {
1399 LOG(error) << uv_dlerror(&supportLib);
1400 return nullptr;
1401 }
1402 DPLPluginHandle* (*dpl_plugin_callback)(DPLPluginHandle*);
1403
1404 result = uv_dlsym(&supportLib, "dpl_plugin_callback", (void**)&dpl_plugin_callback);
1405 if (result == -1) {
1406 LOG(error) << uv_dlerror(&supportLib);
1407 return nullptr;
1408 }
1409 DPLPluginHandle* pluginInstance = dpl_plugin_callback(nullptr);
1410 return PluginManager::getByName<DebugGUI>(pluginInstance, "ImGUIDebugGUI");
1411 };
1412
1413 // We initialise this in the driver, because different drivers might have
1414 // different versions of the service
1415 ServiceRegistry serviceRegistry;
1416
1417 if ((driverConfig.batch == false || getenv("DPL_DRIVER_REMOTE_GUI") != nullptr) && frameworkId.empty()) {
1418 debugGUI = initDebugGUI();
1419 if (debugGUI) {
1420 if (driverConfig.batch == false) {
1421 window = debugGUI->initGUI("O2 Framework debug GUI", serviceRegistry);
1422 } else {
1423 window = debugGUI->initGUI(nullptr, serviceRegistry);
1424 }
1425 }
1426 } else if (getenv("DPL_DEVICE_REMOTE_GUI") && !frameworkId.empty()) {
1427 debugGUI = initDebugGUI();
1428 // We never run the GUI on desktop for devices. All
1429 // you can do is to connect to the remote version.
1430 // this is done to avoid having a proliferation of
1431 // GUIs popping up when the variable is set globally.
1432 // FIXME: maybe this is not what we want, but it should
1433 // be ok for now.
1434 if (debugGUI) {
1435 window = debugGUI->initGUI(nullptr, serviceRegistry);
1436 }
1437 }
1438 if (driverConfig.batch == false && window == nullptr && frameworkId.empty()) {
1439 LOG(warn) << "Could not create GUI. Switching to batch mode. Do you have GLFW on your system?";
1440 driverConfig.batch = true;
1441 if (varmap["error-policy"].defaulted()) {
1442 driverInfo.processingPolicies.error = TerminationPolicy::QUIT;
1443 }
1444 }
1445 bool guiQuitRequested = false;
1446 bool hasError = false;
1447
1448 // FIXME: I should really have some way of exiting the
1449 // parent..
1450 DriverState current;
1451 DriverState previous;
1452
1453 uv_loop_t* loop = uv_loop_new();
1454
1455 uv_timer_t* gui_timer = nullptr;
1456
1457 if (!driverConfig.batch) {
1458 gui_timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
1459 uv_timer_init(loop, gui_timer);
1460 }
1461
1462 std::vector<ServiceMetricHandling> metricProcessingCallbacks;
1463 std::vector<ServiceSummaryHandling> summaryCallbacks;
1464 std::vector<ServicePreSchedule> preScheduleCallbacks;
1465 std::vector<ServicePostSchedule> postScheduleCallbacks;
1466 std::vector<ServiceDriverInit> driverInitCallbacks;
1467 for (auto& service : driverServices) {
1468 if (service.driverStartup == nullptr) {
1469 continue;
1470 }
1471 service.driverStartup(serviceRegistry, DeviceConfig{varmap});
1472 }
1473
1474 ServiceRegistryRef ref{serviceRegistry};
1475 ref.registerService(ServiceRegistryHelpers::handleForService<DevicesManager>(devicesManager));
1476
1477 bool guiTimerExpired = false;
1478 GuiCallbackContext guiContext;
1479 guiContext.plugin = debugGUI;
1480 guiContext.frameLast = uv_hrtime();
1481 guiContext.frameLatency = &driverInfo.frameLatency;
1482 guiContext.frameCost = &driverInfo.frameCost;
1483 guiContext.guiQuitRequested = &guiQuitRequested;
1484 guiContext.guiTimerExpired = &guiTimerExpired;
1485
1486 // This is to make sure we can process metrics, commands, configuration
1487 // changes coming from websocket (or even via any standard uv_stream_t, I guess).
1488 DriverServerContext serverContext{
1489 .registry = {serviceRegistry},
1490 .loop = loop,
1491 .controls = &controls,
1492 .infos = &infos,
1493 .states = &allStates,
1494 .specs = &runningWorkflow.devices,
1495 .metrics = &metricsInfos,
1496 .metricProcessingCallbacks = &metricProcessingCallbacks,
1497 .summaryCallbacks = &summaryCallbacks,
1498 .driver = &driverInfo,
1499 .gui = &guiContext,
1500 .isDriver = frameworkId.empty(),
1501 };
1502
1503 serverContext.serverHandle.data = &serverContext;
1504
1505 uv_timer_t force_step_timer;
1506 uv_timer_init(loop, &force_step_timer);
1507 uv_timer_t force_exit_timer;
1508 uv_timer_init(loop, &force_exit_timer);
1509
1510 bool guiDeployedOnce = false;
1511 bool once = false;
1512
1513 uv_timer_t metricDumpTimer;
1514 metricDumpTimer.data = &serverContext;
1515 bool allChildrenGone = false;
1516 guiContext.allChildrenGone = &allChildrenGone;
1517 O2_SIGNPOST_ID_FROM_POINTER(sid, driver, loop);
1518 O2_SIGNPOST_START(driver, sid, "driver", "Starting driver loop");
1519
1520 // Async callback to process the output of the children, if needed.
1521 serverContext.asyncLogProcessing = (uv_async_t*)malloc(sizeof(uv_async_t));
1522 serverContext.asyncLogProcessing->data = &serverContext;
1523 uv_async_init(loop, serverContext.asyncLogProcessing, [](uv_async_t* handle) {
1524 auto* context = (DriverServerContext*)handle->data;
1525 processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls);
1526 });
1527
1528 while (true) {
1529 // If control forced some transition on us, we push it to the queue.
1530 if (driverControl.forcedTransitions.empty() == false) {
1531 for (auto transition : driverControl.forcedTransitions) {
1532 driverInfo.states.push_back(transition);
1533 }
1534 driverControl.forcedTransitions.resize(0);
1535 }
1536 // In case a timeout was requested, we check if we are running
1537 // for more than the timeout duration and exit in case that's the case.
1538 {
1539 auto currentTime = uv_hrtime();
1540 uint64_t diff = (currentTime - driverInfo.startTime) / 1000000000LL;
1541 if ((graceful_exit == false) && (driverInfo.timeout > 0) && (diff > driverInfo.timeout)) {
1542 LOG(info) << "Timout ellapsed. Requesting to quit.";
1543 graceful_exit = true;
1544 }
1545 }
1546 // Move to exit loop if sigint was sent we execute this only once.
1547 if (graceful_exit == true && driverInfo.sigintRequested == false) {
1548 driverInfo.sigintRequested = true;
1549 driverInfo.states.resize(0);
1550 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1551 }
1552 // If one of the children dies and sigint was not requested
1553 // we should decide what to do.
1554 if (sigchld_requested == true && driverInfo.sigchldRequested == false) {
1555 driverInfo.sigchldRequested = true;
1556 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
1557 }
1558 if (driverInfo.states.empty() == false) {
1559 previous = current;
1560 current = driverInfo.states.back();
1561 } else {
1562 current = DriverState::UNKNOWN;
1563 }
1564 driverInfo.states.pop_back();
1565 switch (current) {
1566 case DriverState::BIND_GUI_PORT:
1567 bindGUIPort(driverInfo, serverContext, frameworkId);
1568 break;
1569 case DriverState::INIT:
1570 LOGP(info, "Initialising O2 Data Processing Layer. Driver PID: {}.", getpid());
1571 LOGP(info, "Driver listening on port: {}", driverInfo.port);
1572
1573 // Install signal handler for quitting children.
1574 driverInfo.sa_handle_child.sa_handler = &handle_sigchld;
1575 sigemptyset(&driverInfo.sa_handle_child.sa_mask);
1576 driverInfo.sa_handle_child.sa_flags = SA_RESTART | SA_NOCLDSTOP;
1577 if (sigaction(SIGCHLD, &driverInfo.sa_handle_child, nullptr) == -1) {
1578 perror(nullptr);
1579 exit(1);
1580 }
1581
1584 if (driverInfo.noSHMCleanup) {
1585 LOGP(warning, "Not cleaning up shared memory.");
1586 } else {
1587 cleanupSHM(driverInfo.uniqueWorkflowId);
1588 }
1593 for (auto& callback : driverInitCallbacks) {
1594 callback(serviceRegistry, {varmap});
1595 }
1596 driverInfo.states.push_back(DriverState::RUNNING);
1597 // driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
1598 LOG(info) << "O2 Data Processing Layer initialised. We brake for nobody.";
1599#ifdef NDEBUG
1600 LOGF(info, "Optimised build. O2DEBUG / LOG(debug) / LOGF(debug) / assert statement will not be shown.");
1601#endif
1602 break;
1603 case DriverState::IMPORT_CURRENT_WORKFLOW:
1604 // This state is needed to fill the metadata structure
1605 // which contains how to run the current workflow
1606 dataProcessorInfos = previousDataProcessorInfos;
1607 for (auto const& device : runningWorkflow.devices) {
1608 auto exists = std::find_if(dataProcessorInfos.begin(),
1609 dataProcessorInfos.end(),
1610 [id = device.id](DataProcessorInfo const& info) -> bool { return info.name == id; });
1611 if (exists != dataProcessorInfos.end()) {
1612 continue;
1613 }
1614 std::vector<std::string> channels;
1615 for (auto channel : device.inputChannels) {
1616 channels.push_back(channel.name);
1617 }
1618 for (auto channel : device.outputChannels) {
1619 channels.push_back(channel.name);
1620 }
1621 dataProcessorInfos.push_back(
1623 device.id,
1624 workflowInfo.executable,
1625 workflowInfo.args,
1626 workflowInfo.options,
1627 channels});
1628 }
1629 break;
1630 case DriverState::MATERIALISE_WORKFLOW:
1631 try {
1632 auto workflowState = WorkflowHelpers::verifyWorkflow(workflow);
1633 if (driverConfig.batch == true && varmap["dds"].as<std::string>().empty() && !varmap["dump-workflow"].as<bool>() && workflowState == WorkflowParsingState::Empty) {
1634 LOGP(error, "Empty workflow provided while running in batch mode.");
1635 return 1;
1636 }
1637
1640 auto altered_workflow = workflow;
1641
1642 auto confNameFromParam = [](std::string const& paramName) {
1643 std::regex name_regex(R"(^control:([\w-]+)\/(\w+))");
1644 auto match = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 0);
1645 if (match == std::sregex_token_iterator()) {
1646 throw runtime_error_f("Malformed process control spec: %s", paramName.c_str());
1647 }
1648 std::string task = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 1)->str();
1649 std::string conf = std::sregex_token_iterator(paramName.begin(), paramName.end(), name_regex, 2)->str();
1650 return std::pair{task, conf};
1651 };
1652 bool altered = false;
1653 for (auto& device : altered_workflow) {
1654 // ignore internal devices
1655 if (device.name.find("internal") != std::string::npos) {
1656 continue;
1657 }
1658 // ignore devices with no inputs
1659 if (device.inputs.empty() == true) {
1660 continue;
1661 }
1662 // ignore devices with no metadata in inputs
1663 auto hasMetadata = std::any_of(device.inputs.begin(), device.inputs.end(), [](InputSpec const& spec) {
1664 return spec.metadata.empty() == false;
1665 });
1666 if (!hasMetadata) {
1667 continue;
1668 }
1669 // ignore devices with no control options
1670 auto hasControls = std::any_of(device.inputs.begin(), device.inputs.end(), [](InputSpec const& spec) {
1671 return std::any_of(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& param) {
1672 return param.type == VariantType::Bool && param.name.find("control:") != std::string::npos;
1673 });
1674 });
1675 if (!hasControls) {
1676 continue;
1677 }
1678
1679 LOGP(debug, "Adjusting device {}", device.name.c_str());
1680
1681 auto configStore = DeviceConfigurationHelpers::getConfiguration(serviceRegistry, device.name.c_str(), device.options);
1682 if (configStore != nullptr) {
1683 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1684 for (auto& input : device.inputs) {
1685 for (auto& param : input.metadata) {
1686 if (param.type == VariantType::Bool && param.name.find("control:") != std::string::npos) {
1687 if (param.name != "control:default" && param.name != "control:spawn" && param.name != "control:build") {
1688 auto confName = confNameFromParam(param.name).second;
1689 param.defaultValue = reg->get<bool>(confName.c_str());
1690 }
1691 }
1692 }
1693 }
1694 }
1696 LOGP(debug, "Original inputs: ");
1697 for (auto& input : device.inputs) {
1698 LOGP(debug, "-> {}", input.binding);
1699 }
1700 auto end = device.inputs.end();
1701 auto new_end = std::remove_if(device.inputs.begin(), device.inputs.end(), [](InputSpec& input) {
1702 auto requested = false;
1703 auto hasControls = false;
1704 for (auto& param : input.metadata) {
1705 if (param.type != VariantType::Bool) {
1706 continue;
1707 }
1708 if (param.name.find("control:") != std::string::npos) {
1709 hasControls = true;
1710 if (param.defaultValue.get<bool>() == true) {
1711 requested = true;
1712 break;
1713 }
1714 }
1715 }
1716 if (hasControls) {
1717 return !requested;
1718 }
1719 return false;
1720 });
1721 device.inputs.erase(new_end, end);
1722 LOGP(debug, "Adjusted inputs: ");
1723 for (auto& input : device.inputs) {
1724 LOGP(debug, "-> {}", input.binding);
1725 }
1726 altered = true;
1727 }
1728 WorkflowHelpers::adjustTopology(altered_workflow, *driverInfo.configContext);
1729 if (altered) {
1730 WorkflowSpecNode node{altered_workflow};
1731 for (auto& service : driverServices) {
1732 if (service.adjustTopology == nullptr) {
1733 continue;
1734 }
1735 service.adjustTopology(node, *driverInfo.configContext);
1736 }
1737 }
1738
1739 // These allow services customization via an environment variable
1740 OverrideServiceSpecs overrides = ServiceSpecHelpers::parseOverrides(getenv("DPL_OVERRIDE_SERVICES"));
1741 DeviceSpecHelpers::validate(altered_workflow);
1743 driverInfo.channelPolicies,
1744 driverInfo.completionPolicies,
1745 driverInfo.dispatchPolicies,
1746 driverInfo.resourcePolicies,
1747 driverInfo.callbacksPolicies,
1748 driverInfo.sendingPolicies,
1749 driverInfo.forwardingPolicies,
1750 runningWorkflow.devices,
1751 *resourceManager,
1752 driverInfo.uniqueWorkflowId,
1753 *driverInfo.configContext,
1754 !varmap["no-IPC"].as<bool>(),
1755 driverInfo.resourcesMonitoringInterval,
1756 varmap["channel-prefix"].as<std::string>(),
1757 overrides);
1758 metricProcessingCallbacks.clear();
1759 std::vector<std::string> matchingServices;
1760
1761 // FIXME: once moving to C++20, we can use templated lambdas.
1762 matchingServices.clear();
1763 for (auto& device : runningWorkflow.devices) {
1764 for (auto& service : device.services) {
1765 // If a service with the same name is already registered, skip it
1766 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1767 continue;
1768 }
1769 if (service.metricHandling) {
1770 metricProcessingCallbacks.push_back(service.metricHandling);
1771 matchingServices.push_back(service.name);
1772 }
1773 }
1774 }
1775
1776 // FIXME: once moving to C++20, we can use templated lambdas.
1777 matchingServices.clear();
1778 for (auto& device : runningWorkflow.devices) {
1779 for (auto& service : device.services) {
1780 // If a service with the same name is already registered, skip it
1781 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1782 continue;
1783 }
1784 if (service.summaryHandling) {
1785 summaryCallbacks.push_back(service.summaryHandling);
1786 matchingServices.push_back(service.name);
1787 }
1788 }
1789 }
1790
1791 preScheduleCallbacks.clear();
1792 matchingServices.clear();
1793 for (auto& device : runningWorkflow.devices) {
1794 for (auto& service : device.services) {
1795 // If a service with the same name is already registered, skip it
1796 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1797 continue;
1798 }
1799 if (service.preSchedule) {
1800 preScheduleCallbacks.push_back(service.preSchedule);
1801 }
1802 }
1803 }
1804 postScheduleCallbacks.clear();
1805 matchingServices.clear();
1806 for (auto& device : runningWorkflow.devices) {
1807 for (auto& service : device.services) {
1808 // If a service with the same name is already registered, skip it
1809 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1810 continue;
1811 }
1812 if (service.postSchedule) {
1813 postScheduleCallbacks.push_back(service.postSchedule);
1814 }
1815 }
1816 }
1817 driverInitCallbacks.clear();
1818 matchingServices.clear();
1819 for (auto& device : runningWorkflow.devices) {
1820 for (auto& service : device.services) {
1821 // If a service with the same name is already registered, skip it
1822 if (std::find(matchingServices.begin(), matchingServices.end(), service.name) != matchingServices.end()) {
1823 continue;
1824 }
1825 if (service.driverInit) {
1826 driverInitCallbacks.push_back(service.driverInit);
1827 }
1828 }
1829 }
1830
1831 // This should expand nodes so that we can build a consistent DAG.
1832
1833 // This updates the options in the runningWorkflow.devices
1834 for (auto& device : runningWorkflow.devices) {
1835 // ignore internal devices
1836 if (device.name.find("internal") != std::string::npos) {
1837 continue;
1838 }
1839 auto configStore = DeviceConfigurationHelpers::getConfiguration(serviceRegistry, device.name.c_str(), device.options);
1840 if (configStore != nullptr) {
1841 auto reg = std::make_unique<ConfigParamRegistry>(std::move(configStore));
1842 for (auto& option : device.options) {
1843 const char* name = option.name.c_str();
1844 switch (option.type) {
1845 case VariantType::Int:
1846 option.defaultValue = reg->get<int32_t>(name);
1847 break;
1848 case VariantType::Int8:
1849 option.defaultValue = reg->get<int8_t>(name);
1850 break;
1851 case VariantType::Int16:
1852 option.defaultValue = reg->get<int16_t>(name);
1853 break;
1854 case VariantType::UInt8:
1855 option.defaultValue = reg->get<uint8_t>(name);
1856 break;
1857 case VariantType::UInt16:
1858 option.defaultValue = reg->get<uint16_t>(name);
1859 break;
1860 case VariantType::UInt32:
1861 option.defaultValue = reg->get<uint32_t>(name);
1862 break;
1863 case VariantType::UInt64:
1864 option.defaultValue = reg->get<uint64_t>(name);
1865 break;
1866 case VariantType::Int64:
1867 option.defaultValue = reg->get<int64_t>(name);
1868 break;
1869 case VariantType::Float:
1870 option.defaultValue = reg->get<float>(name);
1871 break;
1872 case VariantType::Double:
1873 option.defaultValue = reg->get<double>(name);
1874 break;
1875 case VariantType::String:
1876 option.defaultValue = reg->get<std::string>(name);
1877 break;
1878 case VariantType::Bool:
1879 option.defaultValue = reg->get<bool>(name);
1880 break;
1881 case VariantType::ArrayInt:
1882 option.defaultValue = reg->get<std::vector<int>>(name);
1883 break;
1884 case VariantType::ArrayFloat:
1885 option.defaultValue = reg->get<std::vector<float>>(name);
1886 break;
1887 case VariantType::ArrayDouble:
1888 option.defaultValue = reg->get<std::vector<double>>(name);
1889 break;
1890 case VariantType::ArrayString:
1891 option.defaultValue = reg->get<std::vector<std::string>>(name);
1892 break;
1893 case VariantType::Array2DInt:
1894 option.defaultValue = reg->get<Array2D<int>>(name);
1895 break;
1896 case VariantType::Array2DFloat:
1897 option.defaultValue = reg->get<Array2D<float>>(name);
1898 break;
1899 case VariantType::Array2DDouble:
1900 option.defaultValue = reg->get<Array2D<double>>(name);
1901 break;
1902 case VariantType::LabeledArrayInt:
1903 option.defaultValue = reg->get<LabeledArray<int>>(name);
1904 break;
1905 case VariantType::LabeledArrayFloat:
1906 option.defaultValue = reg->get<LabeledArray<float>>(name);
1907 break;
1908 case VariantType::LabeledArrayDouble:
1909 option.defaultValue = reg->get<LabeledArray<double>>(name);
1910 break;
1911 case VariantType::LabeledArrayString:
1912 option.defaultValue = reg->get<LabeledArray<std::string>>(name);
1913 break;
1914 default:
1915 break;
1916 }
1917 }
1918 }
1919 }
1920 } catch (std::runtime_error& e) {
1921 LOGP(error, "invalid workflow in {}: {}", driverInfo.argv[0], e.what());
1922 return 1;
1925#ifdef DPL_ENABLE_BACKTRACE
1926 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1927#endif
1928 LOGP(error, "invalid workflow in {}: {}", driverInfo.argv[0], err.what);
1929 return 1;
1930 } catch (...) {
1931 LOGP(error, "invalid workflow in {}: Unknown error while materialising workflow", driverInfo.argv[0]);
1932 return 1;
1933 }
1934 break;
1935 case DriverState::DO_CHILD:
1936 // We do not start the process if by default we are stopped.
1937 if (driverControl.defaultStopped) {
1938 kill(getpid(), SIGSTOP);
1939 }
1940 for (size_t di = 0; di < runningWorkflow.devices.size(); di++) {
1942 if (runningWorkflow.devices[di].id == frameworkId) {
1943 return doChild(driverInfo.argc, driverInfo.argv,
1944 serviceRegistry,
1945 runningWorkflow, ref,
1946 driverConfig,
1947 driverInfo.processingPolicies,
1948 driverInfo.defaultDriverClient,
1949 loop);
1950 }
1951 }
1952 {
1953 std::ostringstream ss;
1954 for (auto& processor : workflow) {
1955 ss << " - " << processor.name << "\n";
1956 }
1957 for (auto& spec : runningWorkflow.devices) {
1958 ss << " - " << spec.name << "(" << spec.id << ")"
1959 << "\n";
1960 }
1961 driverInfo.lastError = fmt::format(
1962 "Unable to find component with id {}."
1963 " Available options:\n{}",
1964 frameworkId, ss.str());
1965 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
1966 }
1967 break;
1968 case DriverState::REDEPLOY_GUI:
1969 // The callback for the GUI needs to be recalculated every time
1970 // the deployed configuration changes, e.g. a new device
1971 // has been added to the topology.
1972 // We need to recreate the GUI callback every time we reschedule
1973 // because getGUIDebugger actually recreates the GUI state.
1974 // Notice also that we need the actual gui_timer only for the
1975 // case the GUI runs in interactive mode, however we deploy the
1976 // GUI in both interactive and non-interactive mode, if the
1977 // DPL_DRIVER_REMOTE_GUI environment variable is set.
1978 if (!driverConfig.batch || getenv("DPL_DRIVER_REMOTE_GUI")) {
1979 if (gui_timer) {
1980 uv_timer_stop(gui_timer);
1981 }
1982
1983 auto callback = debugGUI->getGUIDebugger(infos, runningWorkflow.devices, allStates, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl);
1984 guiContext.callback = [&serviceRegistry, &driverServices, &debugGUI, &infos, &runningWorkflow, &dataProcessorInfos, &metricsInfos, &driverInfo, &controls, &driverControl, callback]() {
1985 callback();
1986 for (auto& service : driverServices) {
1987 if (service.postRenderGUI) {
1988 service.postRenderGUI(serviceRegistry);
1989 }
1990 }
1991 };
1992 guiContext.window = window;
1993
1994 if (gui_timer) {
1995 gui_timer->data = &guiContext;
1996 uv_timer_start(gui_timer, gui_callback, 0, 20);
1997 }
1998 guiDeployedOnce = true;
1999 }
2000 break;
2001 case DriverState::MERGE_CONFIGS: {
2002 try {
2003 controls.resize(runningWorkflow.devices.size());
2006 if (varmap.count("dpl-tracing-flags")) {
2007 for (auto& control : controls) {
2008 auto tracingFlags = DeviceStateHelpers::parseTracingFlags(varmap["dpl-tracing-flags"].as<std::string>());
2009 control.tracingFlags = tracingFlags;
2010 }
2011 }
2012 deviceExecutions.resize(runningWorkflow.devices.size());
2013
2014 // Options which should be uniform across all
2015 // the subworkflow invocations.
2016 const auto uniformOptions = {
2017 "--aod-file",
2018 "--aod-memory-rate-limit",
2019 "--aod-writer-json",
2020 "--aod-writer-ntfmerge",
2021 "--aod-writer-resdir",
2022 "--aod-writer-resfile",
2023 "--aod-writer-resmode",
2024 "--aod-writer-maxfilesize",
2025 "--aod-writer-keep",
2026 "--aod-max-io-rate",
2027 "--aod-parent-access-level",
2028 "--aod-parent-base-path-replacement",
2029 "--driver-client-backend",
2030 "--fairmq-ipc-prefix",
2031 "--readers",
2032 "--resources-monitoring",
2033 "--resources-monitoring-dump-interval",
2034 "--time-limit",
2035 };
2036
2037 for (auto& option : uniformOptions) {
2038 DeviceSpecHelpers::reworkHomogeneousOption(dataProcessorInfos, option, nullptr);
2039 }
2040
2041 DeviceSpecHelpers::reworkShmSegmentSize(dataProcessorInfos);
2043 driverControl.defaultStopped,
2044 driverInfo.processingPolicies.termination == TerminationPolicy::WAIT,
2045 driverInfo.port,
2046 driverConfig,
2047 dataProcessorInfos,
2048 runningWorkflow.devices,
2049 deviceExecutions,
2050 controls,
2051 detectedParams,
2052 driverInfo.uniqueWorkflowId);
2055 LOGP(error, "unable to merge configurations in {}: {}", driverInfo.argv[0], err.what);
2056#ifdef DPL_ENABLE_BACKTRACE
2057 std::cerr << "\nStacktrace follows:\n\n";
2058 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
2059#endif
2060 return 1;
2061 }
2062 } break;
2063 case DriverState::SCHEDULE: {
2064 // FIXME: for the moment modifying the topology means we rebuild completely
2065 // all the devices and we restart them. This is also what DDS does at
2066 // a larger scale. In principle one could try to do a delta and only
2067 // restart the data processors which need to be restarted.
2068 LOG(info) << "Redeployment of configuration asked.";
2069 std::ostringstream forwardedStdin;
2070 WorkflowSerializationHelpers::dump(forwardedStdin, workflow, dataProcessorInfos, commandInfo);
2071 infos.reserve(runningWorkflow.devices.size());
2072
2073 // This is guaranteed to be a single CPU.
2074 unsigned parentCPU = -1;
2075 unsigned parentNode = -1;
2076#if defined(__linux__) && __has_include(<sched.h>)
2077 parentCPU = sched_getcpu();
2078#elif __has_include(<linux/getcpu.h>)
2079 getcpu(&parentCPU, &parentNode, nullptr);
2080#elif __has_include(<cpuid.h>) && (__x86_64__ || __i386__)
2081 // FIXME: this is a last resort as it is apparently buggy
2082 // on some Intel CPUs.
2083 GETCPU(parentCPU);
2084#endif
2085 for (auto& callback : preScheduleCallbacks) {
2086 callback(serviceRegistry, {varmap});
2087 }
2088 childFds.resize(runningWorkflow.devices.size());
2089 for (int di = 0; di < (int)runningWorkflow.devices.size(); ++di) {
2090 auto& context = childFds[di];
2091 createPipes(context.childstdin);
2092 createPipes(context.childstdout);
2093 if (driverInfo.mode == DriverMode::EMBEDDED || runningWorkflow.devices[di].resource.hostname != driverInfo.deployHostname) {
2094 spawnRemoteDevice(loop, forwardedStdin.str(),
2095 runningWorkflow.devices[di], controls[di], deviceExecutions[di], infos, allStates);
2096 } else {
2097 DeviceRef ref{di};
2098 spawnDevice(loop,
2099 ref,
2100 runningWorkflow.devices, driverInfo,
2101 controls, deviceExecutions, infos,
2102 allStates,
2103 serviceRegistry, varmap,
2104 childFds, parentCPU, parentNode);
2105 }
2106 }
2107 handleSignals();
2108 handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles);
2109 for (auto& callback : postScheduleCallbacks) {
2110 callback(serviceRegistry, {varmap});
2111 }
2112 assert(infos.empty() == false);
2113
2114 // In case resource monitoring is requested, we dump metrics to disk
2115 // every resourcesMonitoringDumpInterval seconds (the timer below takes milliseconds).
2116 if (driverInfo.resourcesMonitoringDumpInterval && ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
2117 uv_timer_init(loop, &metricDumpTimer);
2118 uv_timer_start(&metricDumpTimer, dumpMetricsCallback,
2119 driverInfo.resourcesMonitoringDumpInterval * 1000,
2120 driverInfo.resourcesMonitoringDumpInterval * 1000);
2121 }
2123 for (const auto& processorInfo : dataProcessorInfos) {
2124 const auto& cmdLineArgs = processorInfo.cmdLineArgs;
2125 if (std::find(cmdLineArgs.begin(), cmdLineArgs.end(), "--severity") != cmdLineArgs.end()) {
2126 for (size_t counter = 0; const auto& spec : runningWorkflow.devices) {
2127 if (spec.name.compare(processorInfo.name) == 0) {
2128 auto& info = infos[counter];
2129 const auto logLevelIt = std::find(cmdLineArgs.begin(), cmdLineArgs.end(), "--severity") + 1;
2130 if ((*logLevelIt).compare("debug") == 0) {
2131 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2132 } else if ((*logLevelIt).compare("detail") == 0) {
2133 info.logLevel = LogParsingHelpers::LogLevel::Debug;
2134 } else if ((*logLevelIt).compare("info") == 0) {
2135 info.logLevel = LogParsingHelpers::LogLevel::Info;
2136 } else if ((*logLevelIt).compare("warning") == 0) {
2137 info.logLevel = LogParsingHelpers::LogLevel::Warning;
2138 } else if ((*logLevelIt).compare("error") == 0) {
2139 info.logLevel = LogParsingHelpers::LogLevel::Error;
2140 } else if ((*logLevelIt).compare("important") == 0) {
2141 info.logLevel = LogParsingHelpers::LogLevel::Info;
2142 } else if ((*logLevelIt).compare("alarm") == 0) {
2143 info.logLevel = LogParsingHelpers::LogLevel::Alarm;
2144 } else if ((*logLevelIt).compare("critical") == 0) {
2145 info.logLevel = LogParsingHelpers::LogLevel::Critical;
2146 } else if ((*logLevelIt).compare("fatal") == 0) {
2147 info.logLevel = LogParsingHelpers::LogLevel::Fatal;
2148 }
2149 break;
2150 }
2151 ++counter;
2152 }
2153 }
2154 }
2155 LOG(info) << "Redeployment of configuration done.";
2156 } break;
2157 case DriverState::RUNNING:
2158 // Run any pending libUV event loop, block if
2159 // any, so that we do not consume CPU time when the driver is
2160 // idle.
2161 devicesManager->flush();
2162 // We print the event loop for the gui only once every
2163 // 6000 iterations (i.e. ~2 minutes). To avoid spamming, while still
2164 // being able to see the event loop in case of a deadlock / systematic failure.
2165 if (guiTimerExpired == false) {
2166 O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "Entering event loop with %{public}s", once ? "UV_RUN_ONCE" : "UV_RUN_NOWAIT");
2167 }
2168 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2169 once = true;
2170 // Calculate what we should do next and eventually
2171 // show the GUI
2172 if (guiQuitRequested ||
2173 (driverInfo.processingPolicies.termination == TerminationPolicy::QUIT && (checkIfCanExit(infos) == true))) {
2174 // Something requested to quit. This can be a user
2175 // interaction with the GUI or (if --completion-policy=quit)
2176 // it could mean that the workflow does not have anything else to do.
2177 // Let's update the GUI one more time and then EXIT.
2178 LOG(info) << "Quitting";
2179 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2180 } else if (infos.size() != runningWorkflow.devices.size()) {
2181 // If the number of devices is different from
2182 // the DeviceInfos it means the specification
2183 // does not match what is running, so we need to do
2184 // further scheduling.
2185 driverInfo.states.push_back(DriverState::RUNNING);
2186 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2187 driverInfo.states.push_back(DriverState::SCHEDULE);
2188 driverInfo.states.push_back(DriverState::MERGE_CONFIGS);
2189 } else if (runningWorkflow.devices.empty() && driverConfig.batch == true) {
2190 LOG(info) << "No device resulting from the workflow. Quitting.";
2191 // If there are no deviceSpecs, we exit.
2192 driverInfo.states.push_back(DriverState::EXIT);
2193 } else if (runningWorkflow.devices.empty() && driverConfig.batch == false && !guiDeployedOnce) {
2194 // In case of an empty workflow, we need to deploy the GUI at least once.
2195 driverInfo.states.push_back(DriverState::RUNNING);
2196 driverInfo.states.push_back(DriverState::REDEPLOY_GUI);
2197 } else {
2198 driverInfo.states.push_back(DriverState::RUNNING);
2199 }
2200 break;
2201 case DriverState::QUIT_REQUESTED:
2202 LOG(info) << "QUIT_REQUESTED";
2203 guiQuitRequested = true;
2204 // We send SIGCONT to make sure stopped children are resumed
2205 killChildren(infos, SIGCONT);
2206 // We send SIGTERM to make sure we do the STOP transition in FairMQ
2207 killChildren(infos, SIGTERM);
2208 // We have a timer to send SIGUSR1 to make sure we advance all devices
2209 // in a timely manner.
2210 force_step_timer.data = &infos;
2211 uv_timer_start(&force_step_timer, single_step_callback, 0, 300);
2212 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2213 break;
2214 case DriverState::HANDLE_CHILDREN: {
2215 // Run any pending libUV event loop, block if
2216 // any, so that we do not consume CPU time when the driver is
2217 // idle.
2218 uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
2219 once = true;
2220 // I allow queueing of more sigchld only when
2221 // I process the previous call
2222 if (forceful_exit == true) {
2223 static bool forcefulExitMessage = true;
2224 if (forcefulExitMessage) {
2225 LOG(info) << "Forceful exit requested.";
2226 forcefulExitMessage = false;
2227 }
2228 killChildren(infos, SIGCONT);
2229 killChildren(infos, SIGKILL);
2230 }
2231 sigchld_requested = false;
2232 driverInfo.sigchldRequested = false;
2233 processChildrenOutput(loop, driverInfo, infos, runningWorkflow.devices, controls);
2234 hasError = processSigChild(infos, runningWorkflow.devices);
2235 allChildrenGone = areAllChildrenGone(infos);
2236 bool canExit = checkIfCanExit(infos);
2237 bool supposedToQuit = (guiQuitRequested || canExit || graceful_exit);
2238
2239 if (allChildrenGone && (supposedToQuit || driverInfo.processingPolicies.termination == TerminationPolicy::QUIT)) {
2240 // We move to the exit, regardless of where we were
2241 driverInfo.states.resize(0);
2242 driverInfo.states.push_back(DriverState::EXIT);
2243 } else if (hasError && driverInfo.processingPolicies.error == TerminationPolicy::QUIT && !supposedToQuit) {
2244 graceful_exit = 1;
2245 force_exit_timer.data = &infos;
2246 static bool forceful_timer_started = false;
2247 if (forceful_timer_started == false) {
2248 forceful_timer_started = true;
2249 uv_timer_start(&force_exit_timer, force_exit_callback, 15000, 3000);
2250 }
2251 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2252 } else if (allChildrenGone == false && supposedToQuit) {
2253 driverInfo.states.push_back(DriverState::HANDLE_CHILDREN);
2254 } else {
2255 }
2256 } break;
2257 case DriverState::EXIT: {
2258 if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
2259 if (driverInfo.resourcesMonitoringDumpInterval) {
2260 uv_timer_stop(&metricDumpTimer);
2261 }
2262 LOG(info) << "Dumping performance metrics to performanceMetrics.json file";
2263 dumpMetricsCallback(&metricDumpTimer);
2264 }
2265 dumpRunSummary(serverContext, driverInfo, infos, runningWorkflow.devices);
2266 // This is a clean exit. Before we do so, if required,
2267 // we dump the configuration of all the devices so that
2268 // we can reuse it. Notice we do not dump anything if
2269 // the workflow was not really run.
2270 // NOTE: is this really what we want? should we run
2271 // SCHEDULE and dump the full configuration as well?
2272 if (infos.empty()) {
2273 return 0;
2274 }
2275 boost::property_tree::ptree finalConfig;
2276 assert(infos.size() == runningWorkflow.devices.size());
2277 for (size_t di = 0; di < infos.size(); ++di) {
2278 auto& info = infos[di];
2279 auto& spec = runningWorkflow.devices[di];
2280 finalConfig.put_child(spec.name, info.currentConfig);
2281 }
2282 LOG(info) << "Dumping used configuration in dpl-config.json";
2283
2284 std::ofstream outDPLConfigFile("dpl-config.json", std::ios::out);
2285 if (outDPLConfigFile.is_open()) {
2286 boost::property_tree::write_json(outDPLConfigFile, finalConfig);
2287 } else {
2288 LOGP(warning, "Could not write out final configuration file. Read only run folder?");
2289 }
2290 if (driverInfo.noSHMCleanup) {
2291 LOGP(warning, "Not cleaning up shared memory.");
2292 } else {
2293 cleanupSHM(driverInfo.uniqueWorkflowId);
2294 }
2295 return calculateExitCode(driverInfo, runningWorkflow.devices, infos);
2296 }
2297 case DriverState::PERFORM_CALLBACKS:
2298 for (auto& callback : driverControl.callbacks) {
2299 callback(workflow, runningWorkflow.devices, deviceExecutions, dataProcessorInfos, commandInfo);
2300 }
2301 driverControl.callbacks.clear();
2302 break;
2303 default:
2304 LOG(error) << "Driver transitioned in an unknown state("
2305 << "current: " << (int)current
2306 << ", previous: " << (int)previous
2307 << "). Shutting down.";
2308 driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
2309 }
2310 }
2311 O2_SIGNPOST_END(driver, sid, "driver", "End driver loop");
2312}
2313
2314// Print help
2315void printHelp(bpo::variables_map const& varmap,
2316 bpo::options_description const& executorOptions,
2317 std::vector<DataProcessorSpec> const& physicalWorkflow,
2318 std::vector<ConfigParamSpec> const& currentWorkflowOptions)
2319{
2320 auto mode = varmap["help"].as<std::string>();
2321 bpo::options_description helpOptions;
2322 if (mode == "full" || mode == "short" || mode == "executor") {
2323 helpOptions.add(executorOptions);
2324 }
2325 // this time no veto is applied, so all the options are added for printout
2326 if (mode == "executor") {
2327 // nothing more
2328 } else if (mode == "workflow") {
2329 // executor options and workflow options, skip the actual workflow
2330 o2::framework::WorkflowSpec emptyWorkflow;
2331 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(emptyWorkflow, currentWorkflowOptions));
2332 } else if (mode == "full" || mode == "short") {
2333 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions,
2334 bpo::options_description(),
2335 mode));
2336 } else {
2337 helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, {},
2338 bpo::options_description(),
2339 mode));
2340 }
2341 if (helpOptions.options().size() == 0) {
2342 // the specified argument is invalid, add at leat the executor options
2343 mode += " is an invalid argument, please use correct argument for";
2344 helpOptions.add(executorOptions);
2345 }
2346 std::cout << "ALICE O2 DPL workflow driver" //
2347 << " (" << mode << " help)" << std::endl //
2348 << helpOptions << std::endl; //
2349}
2350
// Helper to find out if stdout is actually attached to a pipe.
// Returns false when stdout cannot be inspected at all (e.g. the descriptor
// is closed), rather than looking at an uninitialised stat buffer.
bool isOutputToPipe()
{
  struct stat s;
  if (fstat(STDOUT_FILENO, &s) != 0) {
    return false;
  }
  // S_ISFIFO compares the file type field instead of testing a single bit.
  return S_ISFIFO(s.st_mode);
}
2358
// NOTE(review): the declaration line of this function is missing from this
// listing; only the body is visible.
// Reports whether stdin looks like a source of configuration: false when
// stdin cannot be statted, true when its mode has the FIFO or the regular
// file bit set.
2360{
2361 struct stat s;
2362 int r = fstat(STDIN_FILENO, &s);
2363 // If stdin cannot be statted, we assume the shell is some sort of
2364 // non-interactive container thing
2365 if (r < 0) {
2366 return false;
2367 }
2368 // If stdin is a pipe or a file, we try to fetch configuration from there
// NOTE(review): these are single-bit tests, not S_ISFIFO()/S_ISREG() type
// comparisons. On platforms where another file type shares a bit with
// S_IFREG (believed to be the case for sockets on Linux) this is true for
// that type as well -- confirm this is intended.
2369 return ((s.st_mode & S_IFIFO) != 0 || (s.st_mode & S_IFREG) != 0);
2370}
2371
// NOTE(review): the declaration line of this function is missing from this
// listing; the body uses two names from it, `ctx` and `workflow`.
// Reads the "clone" option as a comma separated list of
// <template-processor>:<clone-name> entries and, for every processor in
// `workflow` whose name equals the template, appends a copy carrying the
// clone name.
2373{
2374 struct CloningSpec {
2375 std::string templateMatcher;
2376 std::string cloneName;
2377 };
2378 auto s = ctx.options().get<std::string>("clone");
2379 std::vector<CloningSpec> specs;
2380 std::string delimiter = ",";
2381
// Consume `s` one comma separated token at a time until nothing is left.
2382 while (s.empty() == false) {
2383 auto newPos = s.find(delimiter);
2384 auto token = s.substr(0, newPos);
2385 auto split = token.find(":");
2386 if (split == std::string::npos) {
2387 throw std::runtime_error("bad clone definition. Syntax <template-processor>:<clone-name>");
2388 }
2389 auto key = token.substr(0, split);
2390 token.erase(0, split + 1);
2391 size_t error;
2392 std::string value = "";
// A purely numeric clone name N is expanded to "<template>_c<N>"; a name
// that does not start with a number makes std::stoll throw
// std::invalid_argument and is then used verbatim. A name that starts with
// digits but has trailing characters is rejected with a runtime_error.
// NOTE(review): std::out_of_range from std::stoll is not handled here.
2393 try {
2394 auto numValue = std::stoll(token, &error, 10);
2395 if (token[error] != '\0') {
2396 throw std::runtime_error("bad name for clone:" + token);
2397 }
2398 value = key + "_c" + std::to_string(numValue);
2399 } catch (std::invalid_argument& e) {
2400 value = token;
2401 }
2402 specs.push_back({key, value});
// Drop the token and its delimiter; with no delimiter left (npos) this
// erases the whole remainder and ends the loop.
2403 s.erase(0, newPos + (newPos == std::string::npos ? 0 : 1));
2404 }
// NOTE(review): the loop above only ends once `s` is empty, so this
// condition cannot hold here; the message also mentions "pipeline" rather
// than "clone".
2405 if (s.empty() == false && specs.empty() == true) {
2406 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2407 }
2408
// Clones are collected separately and appended afterwards, so `workflow`
// is not modified while it is being iterated.
2409 std::vector<DataProcessorSpec> extraSpecs;
2410 for (auto& spec : specs) {
2411 for (auto& processor : workflow) {
2412 if (processor.name == spec.templateMatcher) {
2413 auto clone = processor;
2414 clone.name = spec.cloneName;
2415 extraSpecs.push_back(clone);
2416 }
2417 }
2418 }
2419 workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
2420}
2421
// NOTE(review): the declaration line of this function is missing from this
// listing; the body uses two names from it, `ctx` and `workflow`.
// Reads the "pipeline" option as a comma separated list of
// <processor>:<pipeline> entries and stores the integer as
// maxInputTimeslices on every processor in `workflow` with that name.
2423{
2424 struct PipelineSpec {
2425 std::string matcher;
2426 int64_t pipeline;
2427 };
2428 auto s = ctx.options().get<std::string>("pipeline");
2429 std::vector<PipelineSpec> specs;
2430 std::string delimiter = ",";
2431
// Consume `s` one comma separated token at a time until nothing is left.
2432 while (s.empty() == false) {
2433 auto newPos = s.find(delimiter);
2434 auto token = s.substr(0, newPos);
2435 auto split = token.find(":");
2436 if (split == std::string::npos) {
2437 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2438 }
2439 auto key = token.substr(0, split);
2440 token.erase(0, split + 1);
2441 size_t error;
// NOTE(review): std::stoll itself throws (std::invalid_argument /
// std::out_of_range) when the text does not start with a representable
// number; nothing in this body catches that. The check below only covers
// trailing characters after a valid number.
2442 auto value = std::stoll(token, &error, 10);
2443 if (token[error] != '\0') {
2444 throw std::runtime_error("Bad pipeline definition. Expecting integer");
2445 }
2446 specs.push_back({key, value});
// Drop the token and its delimiter; with no delimiter left (npos) this
// erases the whole remainder and ends the loop.
2447 s.erase(0, newPos + (newPos == std::string::npos ? 0 : 1));
2448 }
// NOTE(review): the loop above only ends once `s` is empty, so this
// condition cannot hold here.
2449 if (s.empty() == false && specs.empty() == true) {
2450 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
2451 }
2452
2453 for (auto& spec : specs) {
2454 for (auto& processor : workflow) {
2455 if (processor.name == spec.matcher) {
2456 processor.maxInputTimeslices = spec.pipeline;
2457 }
2458 }
2459 }
2460}
2461
// NOTE(review): the declaration line of this function is missing from this
// listing; the body uses two names from it, `ctx` and `workflow`.
// Reads the "labels" option, a comma separated list of
// <processor>:<label>[:<label>...] entries, and adds each label that is not
// already present to every processor in `workflow` with that name.
// An empty option value is a no-op.
2463{
2464 struct LabelsSpec {
// View into labelsString below, which stays alive until this body returns.
2465 std::string_view matcher;
2466 std::vector<std::string> labels;
2467 };
2468 std::vector<LabelsSpec> specs;
2469
2470 auto labelsString = ctx.options().get<std::string>("labels");
2471 if (labelsString.empty()) {
2472 return;
2473 }
2474 std::string_view sv{labelsString};
2475
2476 size_t specStart = 0;
2477 size_t specEnd = 0;
2478 constexpr char specDelim = ',';
2479 constexpr char labelDelim = ':';
// Outer loop: one iteration per comma separated entry. An empty entry
// (leading, doubled or trailing comma) is rejected.
2480 do {
2481 specEnd = sv.find(specDelim, specStart);
2482 auto token = sv.substr(specStart, specEnd == std::string_view::npos ? std::string_view::npos : specEnd - specStart);
2483 if (token.empty()) {
2484 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2485 }
2486
// The entry needs a non-empty processor name followed by ':'.
2487 size_t labelDelimPos = token.find(labelDelim);
2488 if (labelDelimPos == 0 || labelDelimPos == std::string_view::npos) {
2489 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2490 }
2491 LabelsSpec spec{.matcher = token.substr(0, labelDelimPos), .labels = {}};
2492
// Inner loop: one iteration per ':' separated label; empty labels are
// rejected.
2493 size_t labelEnd = labelDelimPos + 1;
2494 do {
2495 size_t labelStart = labelDelimPos + 1;
2496 labelEnd = token.find(labelDelim, labelStart);
2497 auto label = labelEnd == std::string_view::npos ? token.substr(labelStart) : token.substr(labelStart, labelEnd - labelStart);
2498 if (label.empty()) {
2499 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2500 }
2501 spec.labels.emplace_back(label);
2502 labelDelimPos = labelEnd;
2503 } while (labelEnd != std::string_view::npos);
2504
2505 specs.push_back(spec);
// When specEnd is npos this wraps around to 0, but the loop condition
// below ends the iteration before specStart is used again.
2506 specStart = specEnd + 1;
2507 } while (specEnd != std::string_view::npos);
2508
// NOTE(review): at this point labelsString is non-empty and the loop above
// either pushed at least one spec or threw, so this condition cannot hold.
2509 if (labelsString.empty() == false && specs.empty() == true) {
2510 throw std::runtime_error("bad labels definition. Syntax <processor>:<label>[:<label>][,<processor>:<label>[:<label>]");
2511 }
2512
2513 for (auto& spec : specs) {
2514 for (auto& processor : workflow) {
2515 if (processor.name == spec.matcher) {
2516 for (const auto& label : spec.labels) {
// Only add labels the processor does not carry yet.
2517 if (std::find_if(processor.labels.begin(), processor.labels.end(),
2518 [label](const auto& procLabel) { return procLabel.value == label; }) == processor.labels.end()) {
2519 processor.labels.push_back({label});
2520 }
2521 }
2522 }
2523 }
2524 }
2525}
2526
// Fill `control` from the parsed command line.
//
// Sets the default quiet / stopped flags and the initial control state
// (STEP when "single-step" is set, PLAY otherwise), then picks exactly one
// mode, tested in this order: "graphviz", "dds", "o2-control" / "mermaid",
// "id" (child device), "dump-workflow" (or implicit dump when stdout is a
// pipe and "run" is false), and finally the default driver main loop.
// Each mode assigns control.forcedTransitions and, for the dump modes,
// a single callback in control.callbacks.
//
// NOTE(review): the forcedTransitions lists read back-to-front
// (MATERIALISE_WORKFLOW is always last, EXIT / DO_CHILD / INIT first), which
// suggests they are consumed from the back -- confirm against the consumer.
//
// varmap: parsed command line options.
// driverInfo: only driverInfo.mode is read here (captured by the DDS callback).
// control: the object being initialised.
2528void initialiseDriverControl(bpo::variables_map const& varmap,
2529 DriverInfo& driverInfo,
2530 DriverControl& control)
2531{
2532 // Control is initialised outside the main loop because
2533 // command line options are really affecting control.
2534 control.defaultQuiet = varmap["quiet"].as<bool>();
2535 control.defaultStopped = varmap["stop"].as<bool>();
2536
2537 if (varmap["single-step"].as<bool>()) {
2538 control.state = DriverControlState::STEP;
2539 } else {
2540 control.state = DriverControlState::PLAY;
2541 }
2542
2543 if (varmap["graphviz"].as<bool>()) {
2544 // Dump a graphviz representation of what I will do.
// NOTE(review): in this listing one lambda parameter and the statement
// performing the dump are missing, so the callback body appears empty.
2545 control.callbacks = {[](WorkflowSpec const&,
2546 DeviceSpecs const& specs,
2547 DeviceExecutions const&,
2549 CommandInfo const&) {
2551 }};
2552 control.forcedTransitions = {
2553 DriverState::EXIT, //
2554 DriverState::PERFORM_CALLBACKS, //
2555 DriverState::MERGE_CONFIGS, //
2556 DriverState::IMPORT_CURRENT_WORKFLOW, //
2557 DriverState::MATERIALISE_WORKFLOW //
2558 };
2559 } else if (!varmap["dds"].as<std::string>().empty()) {
2560 // Dump a DDS representation of what I will do.
2561 // Notice that compared to DDS we need to schedule things,
2562 // because DDS needs to be able to have actual Executions in
2563 // order to provide a correct configuration.
// The callback captures copies of the file name, the suffix option value
// and the driver mode; a file name of "-" means write to std::cout.
2564 control.callbacks = {[filename = varmap["dds"].as<std::string>(),
2565 workflowSuffix = varmap["dds-workflow-suffix"],
2566 driverMode = driverInfo.mode](WorkflowSpec const& workflow,
2567 DeviceSpecs const& specs,
2568 DeviceExecutions const& executions,
2569 DataProcessorInfos& dataProcessorInfos,
2570 CommandInfo const& commandInfo) {
2571 if (filename == "-") {
2572 DDSConfigHelpers::dumpDeviceSpec2DDS(std::cout, driverMode, workflowSuffix.as<std::string>(), workflow, dataProcessorInfos, specs, executions, commandInfo);
2573 } else {
2574 std::ofstream out(filename);
2575 DDSConfigHelpers::dumpDeviceSpec2DDS(out, driverMode, workflowSuffix.as<std::string>(), workflow, dataProcessorInfos, specs, executions, commandInfo);
2576 }
2577 }};
2578 control.forcedTransitions = {
2579 DriverState::EXIT, //
2580 DriverState::PERFORM_CALLBACKS, //
2581 DriverState::MERGE_CONFIGS, //
2582 DriverState::IMPORT_CURRENT_WORKFLOW, //
2583 DriverState::MATERIALISE_WORKFLOW //
2584 };
2585 } else if (!varmap["o2-control"].as<std::string>().empty() or !varmap["mermaid"].as<std::string>().empty()) {
2586 // Dump the workflow in o2-control and/or mermaid format
// NOTE(review): in this listing one lambda parameter and both statements
// that write the mermaid output are missing, so the two mermaid branches
// below appear empty.
2587 control.callbacks = {[filename = varmap["mermaid"].as<std::string>(),
2588 workflowName = varmap["o2-control"].as<std::string>()](WorkflowSpec const&,
2589 DeviceSpecs const& specs,
2590 DeviceExecutions const& executions,
2592 CommandInfo const& commandInfo) {
2593 if (!workflowName.empty()) {
2594 dumpDeviceSpec2O2Control(workflowName, specs, executions, commandInfo);
2595 }
2596 if (!filename.empty()) {
2597 if (filename == "-") {
2599 } else {
2600 std::ofstream output(filename);
2602 }
2603 }
2604 }};
2605 control.forcedTransitions = {
2606 DriverState::EXIT, //
2607 DriverState::PERFORM_CALLBACKS, //
2608 DriverState::MERGE_CONFIGS, //
2609 DriverState::IMPORT_CURRENT_WORKFLOW, //
2610 DriverState::MATERIALISE_WORKFLOW //
2611 };
2612
2613 } else if (varmap.count("id")) {
2614 // Add our own stacktrace dumping
// O2_NO_CATCHALL_EXCEPTIONS set to anything but "0": crash handlers are not
// installed, ROOT's stacktrace is disabled and the core limit is logged.
2615 if (getenv("O2_NO_CATCHALL_EXCEPTIONS") != nullptr && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0") != 0) {
2616 LOGP(info, "Not instrumenting crash signals because O2_NO_CATCHALL_EXCEPTIONS is set");
2617 gEnv->SetValue("Root.Stacktrace", "no");
2618 gSystem->ResetSignal(kSigSegmentationViolation, kTRUE);
2619 rlimit limit;
2620 if (getrlimit(RLIMIT_CORE, &limit) == 0) {
2621 LOGP(info, "Core limit: {} {}", limit.rlim_cur, limit.rlim_max);
2622 }
2623 }
// Otherwise (variable unset or "0"), handle_crash is installed for the
// fatal signals, but only when "stacktrace-on-signal" is "simple".
2624 if (varmap["stacktrace-on-signal"].as<std::string>() == "simple" && (getenv("O2_NO_CATCHALL_EXCEPTIONS") == nullptr || strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0") == 0)) {
2625 LOGP(info, "Instrumenting crash signals");
2626 signal(SIGSEGV, handle_crash);
2627 signal(SIGABRT, handle_crash);
2628 signal(SIGBUS, handle_crash);
2629 signal(SIGILL, handle_crash);
2630 signal(SIGFPE, handle_crash);
2631 }
2632 // FIXME: for the time being each child needs to recalculate the workflow,
2633 // so that it can understand what it needs to do. This is obviously
2634 // a bad idea. In the future we should have the client be pushed
2635 // its own configuration by the driver.
2636 control.forcedTransitions = {
2637 DriverState::DO_CHILD, //
2638 DriverState::BIND_GUI_PORT, //
2639 DriverState::MERGE_CONFIGS, //
2640 DriverState::IMPORT_CURRENT_WORKFLOW, //
2641 DriverState::MATERIALISE_WORKFLOW //
2642 };
// NOTE(review): the `varmap.count("id") == 0` test below is always true
// here, because the previous branch already handled a non-zero count.
2643 } else if ((varmap["dump-workflow"].as<bool>() == true) || (varmap["run"].as<bool>() == false && varmap.count("id") == 0 && isOutputToPipe())) {
2644 control.callbacks = {[filename = varmap["dump-workflow-file"].as<std::string>()](WorkflowSpec const& workflow,
2645 DeviceSpecs const&,
2646 DeviceExecutions const&,
2647 DataProcessorInfos& dataProcessorInfos,
2648 CommandInfo const& commandInfo) {
2649 if (filename == "-") {
2650 WorkflowSerializationHelpers::dump(std::cout, workflow, dataProcessorInfos, commandInfo);
2651 // FIXME: this is to avoid trailing garbage..
// Terminates the process right after dumping to stdout, so the remaining
// forced transitions are never reached in this case.
2652 exit(0);
2653 } else {
2654 std::ofstream output(filename);
2655 WorkflowSerializationHelpers::dump(output, workflow, dataProcessorInfos, commandInfo);
2656 }
2657 }};
2658 control.forcedTransitions = {
2659 DriverState::EXIT, //
2660 DriverState::PERFORM_CALLBACKS, //
2661 DriverState::MERGE_CONFIGS, //
2662 DriverState::IMPORT_CURRENT_WORKFLOW, //
2663 DriverState::MATERIALISE_WORKFLOW //
2664 };
2665 } else {
2666 // By default we simply start the main loop of the driver.
2667 control.forcedTransitions = {
2668 DriverState::INIT, //
2669 DriverState::BIND_GUI_PORT, //
2670 DriverState::IMPORT_CURRENT_WORKFLOW, //
2671 DriverState::MATERIALISE_WORKFLOW //
2672 };
2673 }
2674}
2675
2677void conflicting_options(const boost::program_options::variables_map& vm,
2678 const std::string& opt1, const std::string& opt2)
2679{
2680 if (vm.count(opt1) && !vm[opt1].defaulted() &&
2681 vm.count(opt2) && !vm[opt2].defaulted()) {
2682 throw std::logic_error(std::string("Conflicting options '") +
2683 opt1 + "' and '" + opt2 + "'.");
2684 }
2685}
2686
/// Reorder @a v in place so that, on return, element i holds the value that
/// was originally stored at position indices[i].
///
/// @param v        the vector to permute.
/// @param indices  a permutation of [0, indices.size()); it is consumed by
///                 the algorithm and is the identity permutation on return.
///
/// The permutation is applied cycle by cycle using swaps, so no temporary
/// copy of @a v is needed. Entries of @a indices must be valid positions
/// in both vectors; no bounds checking is performed.
template <typename T>
void apply_permutation(
  std::vector<T>& v,
  std::vector<int>& indices)
{
  using std::swap; // to permit Koenig lookup
  int const count = static_cast<int>(indices.size());
  for (int i = 0; i < count; i++) {
    // Walk the cycle starting at i. Every visited slot is marked as done
    // by making it a fixed point of the permutation.
    auto current = i;
    while (i != indices[current]) {
      auto next = indices[current];
      swap(v[current], v[next]);
      indices[current] = current;
      current = next;
    }
    indices[current] = current;
  }
}
2704
2705// Check if the workflow is resiliant to failures
2706void checkNonResiliency(std::vector<DataProcessorSpec> const& specs,
2707 std::vector<std::pair<int, int>> const& edges)
2708{
2709 auto checkExpendable = [](DataProcessorLabel const& label) {
2710 return label.value == "expendable";
2711 };
2712 auto checkResilient = [](DataProcessorLabel const& label) {
2713 return label.value == "resilient" || label.value == "expendable";
2714 };
2715
2716 for (auto& edge : edges) {
2717 auto& src = specs[edge.first];
2718 auto& dst = specs[edge.second];
2719 if (std::none_of(src.labels.begin(), src.labels.end(), checkExpendable)) {
2720 continue;
2721 }
2722 if (std::any_of(dst.labels.begin(), dst.labels.end(), checkResilient)) {
2723 continue;
2724 }
2725 throw std::runtime_error("Workflow is not resiliant to failures. Processor " + dst.name + " gets inputs from expendable devices, but is not marked as expendable or resilient itself.");
2726 }
2727}
2728
2729std::string debugTopoInfo(std::vector<DataProcessorSpec> const& specs,
2730 std::vector<TopoIndexInfo> const& infos,
2731 std::vector<std::pair<int, int>> const& edges)
2732{
2733 std::ostringstream out;
2734
2735 out << "\nTopological info:\n";
2736 for (auto& ti : infos) {
2737 out << specs[ti.index].name << " (index: " << ti.index << ", layer: " << ti.layer << ")\n";
2738 out << " Inputs:\n";
2739 for (auto& ii : specs[ti.index].inputs) {
2740 out << " - " << DataSpecUtils::describe(ii) << "\n";
2741 }
2742 out << "\n Outputs:\n";
2743 for (auto& ii : specs[ti.index].outputs) {
2744 out << " - " << DataSpecUtils::describe(ii) << "\n";
2745 }
2746 }
2747 out << "\nEdges values:\n";
2748 for (auto& e : edges) {
2749 out << specs[e.second].name << " depends on " << specs[e.first].name << "\n";
2750 }
2751 for (auto& d : specs) {
2752 out << "- " << d.name << std::endl;
2753 }
2755 return out.str();
2756}
2757
2758void enableSignposts(std::string const& signpostsToEnable)
2759{
2760 static pid_t pid = getpid();
2761 if (signpostsToEnable.empty() == true) {
2762 auto printAllSignposts = [](char const* name, void* l, void* context) {
2763 auto* log = (_o2_log_t*)l;
2764 LOGP(detail, "Signpost stream {} disabled. Enable it with o2-log -p {} -a {}", name, pid, (void*)&log->stacktrace);
2765 return true;
2766 };
2767 o2_walk_logs(printAllSignposts, nullptr);
2768 return;
2769 }
2770 auto matchingLogEnabler = [](char const* name, void* l, void* context) {
2771 auto* log = (_o2_log_t*)l;
2772 auto* selectedName = (char const*)context;
2773 std::string prefix = "ch.cern.aliceo2.";
2774 auto* last = strchr(selectedName, ':');
2775 int maxDepth = 1;
2776 if (last) {
2777 char* err;
2778 maxDepth = strtol(last + 1, &err, 10);
2779 if (*(last + 1) == '\0' || *err != '\0') {
2780 maxDepth = 1;
2781 }
2782 }
2783
2784 auto fullName = prefix + std::string{selectedName, last ? last - selectedName : strlen(selectedName)};
2785 if (fullName == name) {
2786 LOGP(info, "Enabling signposts for stream \"{}\" with depth {}.", fullName, maxDepth);
2787 _o2_log_set_stacktrace(log, maxDepth);
2788 return false;
2789 } else {
2790 LOGP(info, "Signpost stream \"{}\" disabled. Enable it with o2-log -p {} -a {}", name, pid, (void*)&log->stacktrace);
2791 }
2792 return true;
2793 };
2794 // Split signpostsToEnable by comma using strtok_r
2795 char* saveptr;
2796 char* src = const_cast<char*>(signpostsToEnable.data());
2797 auto* token = strtok_r(src, ",", &saveptr);
2798 while (token) {
2799 o2_walk_logs(matchingLogEnabler, token);
2800 token = strtok_r(nullptr, ",", &saveptr);
2801 }
2802}
2803
2804void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow)
2805{
2806 overrideCloning(ctx, workflow);
2807 overridePipeline(ctx, workflow);
2808 overrideLabels(ctx, workflow);
2809}
2810
2811o2::framework::ConfigContext createConfigContext(std::unique_ptr<ConfigParamRegistry>& workflowOptionsRegistry,
2812 o2::framework::ServiceRegistry& configRegistry,
2813 std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
2814 std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv)
2815{
2816 std::vector<std::unique_ptr<o2::framework::ParamRetriever>> retrievers;
2817 std::unique_ptr<o2::framework::ParamRetriever> retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)};
2818 retrievers.emplace_back(std::move(retriever));
2819 auto workflowOptionsStore = std::make_unique<o2::framework::ConfigParamStore>(workflowOptions, std::move(retrievers));
2820 workflowOptionsStore->preload();
2821 workflowOptionsStore->activate();
2822 workflowOptionsRegistry = std::make_unique<ConfigParamRegistry>(std::move(workflowOptionsStore));
2823 extraOptions = o2::framework::ConfigParamDiscovery::discover(*workflowOptionsRegistry, argc, argv);
2824 for (auto& extra : extraOptions) {
2825 workflowOptions.push_back(extra);
2826 }
2827
2828 return o2::framework::ConfigContext(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv);
2829}
2830
2831std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
2832{
2833 return std::make_unique<o2::framework::ServiceRegistry>();
2834}
2835
2836 // This is a toy executor for the workflow spec
2837 // What it needs to do is:
2838 //
2839 // - Print the properties of each DataProcessorSpec
2840 // - Fork one process per DataProcessorSpec
2841 // - Parent -> wait for all the children to complete (eventually
2842 // killing them all on ctrl-c).
2843 // - Child, pick the data-processor ID and start a O2DataProcessorDevice for
2844 // each DataProcessorSpec
//
// Summary of what the body below does, in order:
// 1. optionally enables signposts from the DPL_DRIVER_SIGNPOSTS environment variable;
// 2. declares the driver ("executor") and hidden device options;
// 3. builds `physicalWorkflow` from `workflow`, merging in a workflow imported
//    from stdin when one is piped in, and injecting the service devices;
// 4. sorts the processors topologically (throwing std::runtime_error on
//    self / circular dependencies or when the sort fails);
// 5. parses the command line (exit(1) on parse errors, exit(0) after --help,
//    std::logic_error from conflicting_options on incompatible options);
// 6. fills DriverInfo / DriverConfig / DriverControl and hands over to
//    runStateMachine, whose return value is returned.
//
// Parameters, as used in this function:
// - argc / argv: the command line; argv[0] and the remaining arguments are
//   recorded in `currentWorkflow`, and the whole line is parsed with boost
//   program_options.
// - workflow: the processors defined by this executable (copied into
//   `physicalWorkflow`).
// - channelPolicies ... sendingPolicies: stored into `driverInfo`.
// - currentWorkflowOptions: used for `currentWorkflow`, the option
//   descriptions and the help.
// - detectedParams: passed through to runStateMachine.
// - configContext: passed to the service / topology injection and stored
//   (by address) in `driverInfo`.
2845 int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
2846 std::vector<ChannelConfigurationPolicy> const& channelPolicies,
2847 std::vector<CompletionPolicy> const& completionPolicies,
2848 std::vector<DispatchPolicy> const& dispatchPolicies,
2849 std::vector<ResourcePolicy> const& resourcePolicies,
2850 std::vector<CallbacksPolicy> const& callbacksPolicies,
2851 std::vector<SendingPolicy> const& sendingPolicies,
2852 std::vector<ConfigParamSpec> const& currentWorkflowOptions,
2853 std::vector<ConfigParamSpec> const& detectedParams,
2854 o2::framework::ConfigContext& configContext)
2855 {
2856 // Peek very early in the driver options and look for
2857 // signposts, so the we can enable it without going through the whole dance
2858 if (getenv("DPL_DRIVER_SIGNPOSTS")) {
2859 enableSignposts(getenv("DPL_DRIVER_SIGNPOSTS"));
2860 }
2861
2862 std::vector<std::string> currentArgs;
2863 std::vector<PluginInfo> plugins;
2864 std::vector<ForwardingPolicy> forwardingPolicies = ForwardingPolicy::createDefaultPolicies();
2865
// Record every argument except argv[0]; together with argv[0] and the
// workflow options they describe the current workflow.
2866 for (int ai = 1; ai < argc; ++ai) {
2867 currentArgs.emplace_back(argv[ai]);
2868 }
2869
2870 WorkflowInfo currentWorkflow{
2871 argv[0],
2872 currentArgs,
2873 currentWorkflowOptions};
2874
2875 ProcessingPolicies processingPolicies;
2876 enum LogParsingHelpers::LogLevel minFailureLevel;
2877 bpo::options_description executorOptions("Executor options");
2878 const char* helpDescription = "print help: short, full, executor, or processor name";
2879 enum DriverMode driverMode;
// Driver level options. Some of them write directly into the locals declared
// above (processingPolicies, minFailureLevel, driverMode).
// NOTE(review): "dds" and "dds-workflow-suffix" both declare the short name
// 'D', and "port-range", "dump-workflow" and "o2-control" use multi-character
// short names — confirm this is intended.
2880 executorOptions.add_options() //
2881 ("help,h", bpo::value<std::string>()->implicit_value("short"), helpDescription) // //
2882 ("quiet,q", bpo::value<bool>()->zero_tokens()->default_value(false), "quiet operation") // //
2883 ("stop,s", bpo::value<bool>()->zero_tokens()->default_value(false), "stop before device start") // //
2884 ("single-step", bpo::value<bool>()->zero_tokens()->default_value(false), "start in single step mode") // //
2885 ("batch,b", bpo::value<std::vector<std::string>>()->zero_tokens()->composing(), "batch processing mode") // //
2886 ("no-batch", bpo::value<bool>()->zero_tokens(), "force gui processing mode") // //
2887 ("no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // //
2888 ("hostname", bpo::value<std::string>()->default_value("localhost"), "hostname to deploy") // //
2889 ("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
2890 ("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
2891 ("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
2892 ("completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.termination)->default_value(TerminationPolicy::QUIT), // //
2893 "what to do when processing is finished: quit, wait") // //
2894 ("error-policy", bpo::value<TerminationPolicy>(&processingPolicies.error)->default_value(TerminationPolicy::QUIT), // //
2895 "what to do when a device has an error: quit, wait") // //
2896 ("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
2897 "minimum message level which will be considered as fatal and exit with 1") // //
2898 ("graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(false), "produce graphviz output") // //
2899 ("mermaid", bpo::value<std::string>()->default_value(""), "produce graph output in mermaid format in file under specified name or on stdout if argument is \"-\"") // //
2900 ("timeout,t", bpo::value<uint64_t>()->default_value(0), "forced exit timeout (in seconds)") // //
2901 ("dds,D", bpo::value<std::string>()->default_value(""), "create DDS configuration") // //
2902 ("dds-workflow-suffix,D", bpo::value<std::string>()->default_value(""), "suffix for DDS names") // //
2903 ("dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(false), "dump workflow as JSON") // //
2904 ("dump-workflow-file", bpo::value<std::string>()->default_value("-"), "file to which do the dump") // //
2905 ("driver-mode", bpo::value<DriverMode>(&driverMode)->default_value(DriverMode::STANDALONE), R"(how to run the driver. default: "standalone". Valid: "embedded")") // //
2906 ("run", bpo::value<bool>()->zero_tokens()->default_value(false), "run workflow merged so far. It implies --batch. Use --no-batch to see the GUI") // //
2907 ("no-IPC", bpo::value<bool>()->zero_tokens()->default_value(false), "disable IPC topology optimization") // //
2908 ("o2-control,o2", bpo::value<std::string>()->default_value(""), "dump O2 Control workflow configuration under the specified name") //
2909 ("resources-monitoring", bpo::value<unsigned short>()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds") //
2910 ("resources-monitoring-dump-interval", bpo::value<unsigned short>()->default_value(0), "dump monitoring information to disk every provided seconds"); //
2911 // some of the options must be forwarded by default to the device
2912 executorOptions.add(DeviceSpecHelpers::getForwardedDeviceOptions());
2913
// Options which are accepted by the parser but kept out of `visibleOptions`.
2914 gHiddenDeviceOptions.add_options() //
2915 ("id,i", bpo::value<std::string>(), "device id for child spawning") //
2916 ("channel-config", bpo::value<std::vector<std::string>>(), "channel configuration") //
2917 ("control", "control plugin") //
2918 ("log-color", "logging color scheme")("color", "logging color scheme");
2919
2920 bpo::options_description visibleOptions;
2921 visibleOptions.add(executorOptions);
2922
2923 auto physicalWorkflow = workflow;
2924 std::map<std::string, size_t> rankIndex;
2925 // We remove the duplicates because for the moment child get themself twice:
2926 // once from the actual definition in the child, a second time from the
2927 // configuration they get passed by their parents.
2928 // Notice that we do not know in which order we will get the workflows, so
2929 // while we keep the order of DataProcessors we reshuffle them based on
2930 // some hopefully unique hash.
2931 size_t workflowHashA = 0;
2932 std::hash<std::string> hash_fn;
2933
// workflowHashA is the sum of the name hashes of all the processors defined
// by this executable; each of them gets that same value in rankIndex.
2934 for (auto& dp : workflow) {
2935 workflowHashA += hash_fn(dp.name);
2936 }
2937
2938 for (auto& dp : workflow) {
2939 rankIndex.insert(std::make_pair(dp.name, workflowHashA));
2940 }
2941
2942 std::vector<DataProcessorInfo> dataProcessorInfos;
2943 CommandInfo commandInfo{};
2944
// When stdin is not a terminal and isInputConfig() holds, a serialized
// workflow is read from stdin; the process exits with 1 if the import fails.
2945 if (isatty(STDIN_FILENO) == false && isInputConfig()) {
2946 std::vector<DataProcessorSpec> importedWorkflow;
2947 bool previousWorked = WorkflowSerializationHelpers::import(std::cin, importedWorkflow, dataProcessorInfos, commandInfo);
2948 if (previousWorked == false) {
2949 exit(1);
2950 }
2951
2952 size_t workflowHashB = 0;
2953 for (auto& dp : importedWorkflow) {
2954 workflowHashB += hash_fn(dp.name);
2955 }
2956
2957 // FIXME: Streamline...
2958 // We remove the duplicates because for the moment child get themself twice:
2959 // once from the actual definition in the child, a second time from the
2960 // configuration they get passed by their parents.
2961 for (auto& dp : importedWorkflow) {
2962 auto found = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(),
2963 [&name = dp.name](DataProcessorSpec const& spec) { return spec.name == name; });
2964 if (found == physicalWorkflow.end()) {
2965 physicalWorkflow.push_back(dp);
2966 rankIndex.insert(std::make_pair(dp.name, workflowHashB));
2967 }
2968 }
2969 }
2970
// For processors labelled "expendable" or "non-critical", every output with
// Timeframe lifetime is changed to Sporadic.
2975 for (auto& dp : physicalWorkflow) {
2976 auto isExpendable = [](DataProcessorLabel const& label) { return label.value == "expendable" || label.value == "non-critical"; };
2977 if (std::find_if(dp.labels.begin(), dp.labels.end(), isExpendable) != dp.labels.end()) {
2978 for (auto& output : dp.outputs) {
2979 if (output.lifetime == Lifetime::Timeframe) {
2980 output.lifetime = Lifetime::Sporadic;
2981 }
2982 }
2983 }
2984 }
2985
// NOTE(review): `driverServices` is used below but its declaration is not
// visible in this listing (original line 2988 appears to be missing, and it
// presumably consumes `driverServicesOverride`) — confirm against the
// original file.
2987 OverrideServiceSpecs driverServicesOverride = ServiceSpecHelpers::parseOverrides(getenv("DPL_DRIVER_OVERRIDE_SERVICES"));
2989 // We insert the hash for the internal devices.
2990 WorkflowHelpers::injectServiceDevices(physicalWorkflow, configContext);
// The arrow backend service is only added when the workflow contains the
// internal AOD reader.
2991 auto reader = std::find_if(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec& spec) { return spec.name == "internal-dpl-aod-reader"; });
2992 if (reader != physicalWorkflow.end()) {
2993 driverServices.push_back(ArrowSupport::arrowBackendSpec());
2994 }
// Give every driver service which provides an injectTopology callback the
// chance to modify the workflow.
2995 for (auto& service : driverServices) {
2996 if (service.injectTopology == nullptr) {
2997 continue;
2998 }
2999 WorkflowSpecNode node{physicalWorkflow};
3000 service.injectTopology(node, configContext);
3001 }
// Processors whose name starts with "internal-" all share the same rank.
3002 for (auto& dp : physicalWorkflow) {
3003 if (dp.name.rfind("internal-", 0) == 0) {
3004 rankIndex.insert(std::make_pair(dp.name, hash_fn("internal")));
3005 }
3006 }
3007
3008 // We sort dataprocessors and Inputs / outputs by name, so that the edges are
3009 // always in the same order.
3010 std::stable_sort(physicalWorkflow.begin(), physicalWorkflow.end(), [](DataProcessorSpec const& a, DataProcessorSpec const& b) {
3011 return a.name < b.name;
3012 });
3013
3014 for (auto& dp : physicalWorkflow) {
3015 std::stable_sort(dp.inputs.begin(), dp.inputs.end(),
3016 [](InputSpec const& a, InputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3017 std::stable_sort(dp.outputs.begin(), dp.outputs.end(),
3018 [](OutputSpec const& a, OutputSpec const& b) { return DataSpecUtils::describe(a) < DataSpecUtils::describe(b); });
3019 }
3020
3021 std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
3022 std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
3023 dependencyCheckers.reserve(physicalWorkflow.size());
3024
// Pick, for each processor, the checker of the first topology policy whose
// matcher accepts it. The assert below requires that every processor is
// matched by at least one policy.
3025 for (auto& spec : physicalWorkflow) {
3026 for (auto& policy : topologyPolicies) {
3027 if (policy.matcher(spec)) {
3028 dependencyCheckers.push_back(policy.checkDependency);
3029 break;
3030 }
3031 }
3032 }
3033 assert(dependencyCheckers.size() == physicalWorkflow.size());
3034 // check if DataProcessorSpec at i depends on j
3035 auto checkDependencies = [&workflow = physicalWorkflow,
3036 &dependencyCheckers](int i, int j) {
3037 TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
3038 return checker(workflow[i], workflow[j]);
3039 };
3040
3041 // Create a list of all the edges, so that we can do a topological sort
3042 // before we create the graph.
3043 std::vector<std::pair<int, int>> edges;
3044
// Nothing to sort with zero or one processor. An edge (first, second) means
// that `second` depends on `first`.
// NOTE(review): because the outer loop stops at size() - 1, the last
// processor is never tested for a dependency on itself — confirm intended.
3045 if (physicalWorkflow.size() > 1) {
3046 for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
3047 for (size_t j = i; j < physicalWorkflow.size(); ++j) {
3048 if (i == j && checkDependencies(i, j)) {
3049 throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
3050 }
3051 bool both = false;
3052 if (checkDependencies(i, j)) {
3053 edges.emplace_back(j, i);
3054 both = true;
3055 }
3056 if (checkDependencies(j, i)) {
3057 edges.emplace_back(i, j);
// A dependency in both directions is a cycle of length two: report the
// inputs and outputs of both processors in the error.
3058 if (both) {
3059 std::ostringstream str;
3060 for (auto x : {i, j}) {
3061 str << physicalWorkflow[x].name << ":\n";
3062 str << "inputs:\n";
3063 for (auto& input : physicalWorkflow[x].inputs) {
3064 str << "- " << input << "\n";
3065 }
3066 str << "outputs:\n";
3067 for (auto& output : physicalWorkflow[x].outputs) {
3068 str << "- " << output << "\n";
3069 }
3070 }
3071 throw std::runtime_error(physicalWorkflow[i].name + " has circular dependency with " + physicalWorkflow[j].name + ":\n" + str.str());
3072 }
3073 }
3074 }
3075 }
3076
// The edges are handed over as two strided int arrays (the `first` and the
// `second` members of the pairs).
// NOTE(review): `edges[0]` is evaluated even when `edges` is empty (no
// dependencies found), which is undefined behaviour for std::vector —
// consider guarding or using edges.data().
3077 auto topoInfos = WorkflowHelpers::topologicalSort(physicalWorkflow.size(), &edges[0].first, &edges[0].second, sizeof(std::pair<int, int>), edges.size());
3078 if (topoInfos.size() != physicalWorkflow.size()) {
3079 // Check missing resilincy of one of the tasks
3080 checkNonResiliency(physicalWorkflow, edges);
3081 throw std::runtime_error("Unable to do topological sort of the resulting workflow. Do you have loops?\n" + debugTopoInfo(physicalWorkflow, topoInfos, edges));
3082 }
3083 // Sort by layer and then by name, to ensure stability.
// NOTE(review): `-outputs.size()` negates what is presumably an unsigned
// value, so the ordering relies on wrap-around (more outputs first, but a
// processor with zero outputs compares lowest) — confirm intended.
3084 std::stable_sort(topoInfos.begin(), topoInfos.end(), [&workflow = physicalWorkflow](TopoIndexInfo const& a, TopoIndexInfo const& b) {
3085 auto aRank = std::make_tuple(a.layer, -workflow.at(a.index).outputs.size(), workflow.at(a.index).name);
3086 auto bRank = std::make_tuple(b.layer, -workflow.at(b.index).outputs.size(), workflow.at(b.index).name);
3087 return aRank < bRank;
3088 });
3089 // Reverse index and apply the result
// dataProcessorOrder maps an original index to its sorted position;
// newLocations inverts that again (sorted position -> original index), which
// is the form apply_permutation expects.
3090 std::vector<int> dataProcessorOrder;
3091 dataProcessorOrder.resize(topoInfos.size());
3092 for (size_t i = 0; i < topoInfos.size(); ++i) {
3093 dataProcessorOrder[topoInfos[i].index] = i;
3094 }
3095 std::vector<int> newLocations;
3096 newLocations.resize(dataProcessorOrder.size());
3097 for (size_t i = 0; i < dataProcessorOrder.size(); ++i) {
3098 newLocations[dataProcessorOrder[i]] = i;
3099 }
3100 apply_permutation(physicalWorkflow, newLocations);
3101 }
3102
3103 // Use the hidden options as veto, all config specs matching a definition
3104 // in the hidden options are skipped in order to avoid duplicate definitions
3105 // in the main parser. Note: all config specs are forwarded to devices
3106 visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, currentWorkflowOptions, gHiddenDeviceOptions));
3107
3108 bpo::options_description od;
3109 od.add(visibleOptions);
3110 od.add(gHiddenDeviceOptions);
3111
3112 // FIXME: decide about the policy for handling unrecognized arguments
3113 // command_line_parser with option allow_unregistered() can be used
3114 using namespace bpo::command_line_style;
3115 auto style = (allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
3116 bpo::variables_map varmap;
// Any parsing problem is logged and terminates the process with exit code 1.
3117 try {
3118 bpo::store(
3119 bpo::command_line_parser(argc, argv)
3120 .options(od)
3121 .style(style)
3122 .run(),
3123 varmap);
3124 } catch (std::exception const& e) {
3125 LOGP(error, "error parsing options of {}: {}", argv[0], e.what());
3126 exit(1);
3127 }
// Mutually exclusive options: each call throws std::logic_error when both
// options were given explicitly.
3128 conflicting_options(varmap, "dds", "o2-control");
3129 conflicting_options(varmap, "dds", "dump-workflow");
3130 conflicting_options(varmap, "dds", "run");
3131 conflicting_options(varmap, "dds", "graphviz");
3132 conflicting_options(varmap, "o2-control", "dump-workflow");
3133 conflicting_options(varmap, "o2-control", "run");
3134 conflicting_options(varmap, "o2-control", "graphviz");
3135 conflicting_options(varmap, "run", "dump-workflow");
3136 conflicting_options(varmap, "run", "graphviz");
3137 conflicting_options(varmap, "run", "mermaid");
3138 conflicting_options(varmap, "dump-workflow", "graphviz");
3139 conflicting_options(varmap, "no-batch", "batch");
3140
3141 if (varmap.count("help")) {
3142 printHelp(varmap, executorOptions, physicalWorkflow, currentWorkflowOptions);
3143 exit(0);
3144 }
// Apply the requested console severity right away; an unknown level is an
// error and terminates the process with exit code 1.
3148 if (varmap.count("severity")) {
3149 auto logLevel = varmap["severity"].as<std::string>();
3150 if (logLevel == "debug") {
3151 fair::Logger::SetConsoleSeverity(fair::Severity::debug);
3152 } else if (logLevel == "detail") {
3153 fair::Logger::SetConsoleSeverity(fair::Severity::detail);
3154 } else if (logLevel == "info") {
3155 fair::Logger::SetConsoleSeverity(fair::Severity::info);
3156 } else if (logLevel == "warning") {
3157 fair::Logger::SetConsoleSeverity(fair::Severity::warning);
3158 } else if (logLevel == "error") {
3159 fair::Logger::SetConsoleSeverity(fair::Severity::error);
3160 } else if (logLevel == "important") {
3161 fair::Logger::SetConsoleSeverity(fair::Severity::important);
3162 } else if (logLevel == "alarm") {
3163 fair::Logger::SetConsoleSeverity(fair::Severity::alarm);
3164 } else if (logLevel == "critical") {
3165 fair::Logger::SetConsoleSeverity(fair::Severity::critical);
3166 } else if (logLevel == "fatal") {
3167 fair::Logger::SetConsoleSeverity(fair::Severity::fatal);
3168 } else {
3169 LOGP(error, "Invalid log level '{}'", logLevel);
3170 exit(1);
3171 }
3172 }
3173
// NOTE(review): "signposts" is not declared among the options above;
// presumably it comes from the forwarded device options — confirm, since
// as<std::string>() on a missing option would throw.
3174 enableSignposts(varmap["signposts"].as<std::string>());
3175
// Batch mode: --no-batch always wins; without --batch the default is batch
// exactly when stdout is not a terminal; any --batch means batch.
3176 auto evaluateBatchOption = [&varmap]() -> bool {
3177 if (varmap.count("no-batch") > 0) {
3178 return false;
3179 }
3180 if (varmap.count("batch") == 0) {
3181 // default value
3182 return isatty(fileno(stdout)) == 0;
3183 }
3184 // FIXME: should actually use the last value, but for some reason the
3185 // values are not filled into the vector, even if specifying `-b true`
3186 // need to find out why the boost program options example is not working
3187 // in our case. Might depend on the parser options
3188 // auto value = varmap["batch"].as<std::vector<std::string>>();
3189 return true;
3190 };
3191 DriverInfo driverInfo{
3192 .sendingPolicies = sendingPolicies,
3193 .forwardingPolicies = forwardingPolicies,
3194 .callbacksPolicies = callbacksPolicies};
3195 driverInfo.states.reserve(10);
3196 driverInfo.sigintRequested = false;
3197 driverInfo.sigchldRequested = false;
3198 driverInfo.channelPolicies = channelPolicies;
3199 driverInfo.completionPolicies = completionPolicies;
3200 driverInfo.dispatchPolicies = dispatchPolicies;
3201 driverInfo.resourcePolicies = resourcePolicies;
3202 driverInfo.argc = argc;
3203 driverInfo.argv = argv;
3204 driverInfo.noSHMCleanup = varmap["no-cleanup"].as<bool>();
3205 driverInfo.processingPolicies.termination = varmap["completion-policy"].as<TerminationPolicy>();
3206 driverInfo.processingPolicies.earlyForward = varmap["early-forward-policy"].as<EarlyForwardPolicy>();
3207 driverInfo.mode = varmap["driver-mode"].as<DriverMode>();
3208
// The driver has a GUI when not in batch mode, or whenever the
// DPL_DRIVER_REMOTE_GUI environment variable is set.
3209 auto batch = evaluateBatchOption();
3210 DriverConfig driverConfig{
3211 .batch = batch,
3212 .driverHasGUI = (batch == false) || getenv("DPL_DRIVER_REMOTE_GUI") != nullptr,
3213 };
3214
// When --error-policy was not given explicitly and we are not in batch mode,
// the error policy is WAIT instead of the option default.
3215 if (varmap["error-policy"].defaulted() && driverConfig.batch == false) {
3216 driverInfo.processingPolicies.error = TerminationPolicy::WAIT;
3217 } else {
3218 driverInfo.processingPolicies.error = varmap["error-policy"].as<TerminationPolicy>();
3219 }
3220 driverInfo.minFailureLevel = varmap["min-failure-level"].as<LogParsingHelpers::LogLevel>();
3221 driverInfo.startTime = uv_hrtime();
3222 driverInfo.startTimeMsFromEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
3223 std::chrono::system_clock::now().time_since_epoch())
3224 .count();
3225 driverInfo.timeout = varmap["timeout"].as<uint64_t>();
3226 driverInfo.deployHostname = varmap["hostname"].as<std::string>();
3227 driverInfo.resources = varmap["resources"].as<std::string>();
3228 driverInfo.resourcesMonitoringInterval = varmap["resources-monitoring"].as<unsigned short>();
3229 driverInfo.resourcesMonitoringDumpInterval = varmap["resources-monitoring-dump-interval"].as<unsigned short>();
3230
3231 // FIXME: should use the whole dataProcessorInfos, actually...
3232 driverInfo.processorInfo = dataProcessorInfos;
3233 driverInfo.configContext = &configContext;
3234
3235 DriverControl driverControl;
3236 initialiseDriverControl(varmap, driverInfo, driverControl);
3237
3238 commandInfo.merge(CommandInfo(argc, argv));
3239
3240 std::string frameworkId;
3241 // If the id is set, this means this is a device,
3242 // otherwise this is the driver.
3243 if (varmap.count("id")) {
3244 // The framework id does not want to know anything about DDS template expansion
3245 // so we simply drop it. Notice that the "id" Property is still the same as the
3246 // original --id option.
3247 frameworkId = std::regex_replace(varmap["id"].as<std::string>(), std::regex{"_dds.*"}, "");
// A device uses the pid of its parent as workflow id, so that it matches the
// id computed by the driver in the branch below (assuming the driver is the
// direct parent).
3248 driverInfo.uniqueWorkflowId = fmt::format("{}", getppid());
3249 driverInfo.defaultDriverClient = "stdout://";
3250 } else {
3251 driverInfo.uniqueWorkflowId = fmt::format("{}", getpid());
3252 driverInfo.defaultDriverClient = "ws://";
3253 }
// NOTE(review): original line 3261 is missing from this listing; presumably
// it passes the device metrics infos between `driverConfig` and
// `detectedParams` — confirm against the original file.
3254 return runStateMachine(physicalWorkflow,
3255 currentWorkflow,
3256 dataProcessorInfos,
3257 commandInfo,
3258 driverControl,
3259 driverInfo,
3260 driverConfig,
3262 detectedParams,
3263 varmap,
3264 driverServices,
3265 frameworkId);
3266 }
3267
3268void doBoostException(boost::exception&, char const* processName)
3269{
3270 LOGP(error, "error while setting up workflow in {}: {}",
3271 processName, boost::current_exception_diagnostic_information(true));
3272}
3273#pragma GCC diagnostic push
struct uv_timer_s uv_timer_t
struct uv_async_s uv_async_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
int32_t i
int32_t retVal
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
o2::phos::PHOSEnergySlot es
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
uint16_t pid
Definition RawData.h:2
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:536
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:571
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
ConfigParamRegistry & options() const
T get(uint32_t y, uint32_t x) const
Definition Array2D.h:199
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLint GLenum GLint x
Definition glcorearb.h:403
GLenum mode
Definition glcorearb.h:266
GLenum src
Definition glcorearb.h:1767
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum GLenum dst
Definition glcorearb.h:1767
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum const void * indices
Definition glcorearb.h:400
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean r
Definition glcorearb.h:1233
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
GLenum GLfloat param
Definition glcorearb.h:271
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
GLuint id
Definition glcorearb.h:650
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ServiceSpec > ServiceSpecs
RuntimeErrorRef runtime_error(const char *)
EarlyForwardPolicy
When to enable the early forwarding optimization:
std::vector< OverrideServiceSpec > OverrideServiceSpecs
void parse_http_request(char *start, size_t size, HTTPParser *parser)
RuntimeError & error_from_ref(RuntimeErrorRef)
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
void dumpDeviceSpec2O2Control(std::string workflowName, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
Dumps the AliECS compatible workflow and task templates for a DPL workflow.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
void empty(int)
std::vector< std::string > split(const std::string &str, char delimiter=',')
int runStateMachine(DataProcessorSpecs const &workflow, WorkflowInfo const &workflowInfo, DataProcessorInfos const &previousDataProcessorInfos, CommandInfo const &commandInfo, DriverControl &driverControl, DriverInfo &driverInfo, DriverConfig &driverConfig, std::vector< DeviceMetricsInfo > &metricsInfos, std::vector< ConfigParamSpec > const &detectedParams, boost::program_options::variables_map &varmap, std::vector< ServiceSpec > &driverServices, std::string frameworkId)
AlgorithmSpec dryRun(DeviceSpec const &spec)
auto bindGUIPort
void getChildData(int infd, DeviceInfo &outinfo)
void overrideLabels(ConfigContext &ctx, WorkflowSpec &workflow)
void apply_permutation(std::vector< T > &v, std::vector< int > &indices)
int doMain(int argc, char **argv, o2::framework::WorkflowSpec const &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ConfigParamSpec > const &currentWorkflowOptions, std::vector< ConfigParamSpec > const &detectedParams, o2::framework::ConfigContext &configContext)
void overridePipeline(ConfigContext &ctx, WorkflowSpec &workflow)
void enableSignposts(std::string const &signpostsToEnable)
void spawnDevice(uv_loop_t *loop, DeviceRef ref, std::vector< DeviceSpec > const &specs, DriverInfo &driverInfo, std::vector< DeviceControl > &, std::vector< DeviceExecution > &executions, std::vector< DeviceInfo > &deviceInfos, std::vector< DataProcessingStates > &allStates, ServiceRegistryRef serviceRegistry, boost::program_options::variables_map &varmap, std::vector< DeviceStdioContext > &childFds, unsigned parentCPU, unsigned parentNode)
void killChildren(std::vector< DeviceInfo > &infos, int sig)
void ws_connect_callback(uv_stream_t *server, int status)
A callback for the rest engine.
std::vector< DataProcessingStates > DataProcessingStatesInfos
void createPipes(int *pipes)
void doDPLException(o2::framework::RuntimeErrorRef &ref, char const *)
std::vector< DeviceExecution > DeviceExecutions
void overrideAll(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
std::vector< DeviceMetricsInfo > gDeviceMetricsInfos
void force_exit_callback(uv_timer_s *ctx)
std::string debugTopoInfo(std::vector< DataProcessorSpec > const &specs, std::vector< TopoIndexInfo > const &infos, std::vector< std::pair< int, int > > const &edges)
void overrideCloning(ConfigContext &ctx, WorkflowSpec &workflow)
void doBoostException(boost::exception &e, const char *)
bool processSigChild(DeviceInfos &infos, DeviceSpecs &specs)
void checkNonResiliency(std::vector< DataProcessorSpec > const &specs, std::vector< std::pair< int, int > > const &edges)
std::vector< std::regex > getDumpableMetrics()
void stream_config(uv_work_t *req)
std::vector< DataProcessorSpec > DataProcessorSpecs
void dumpRunSummary(DriverServerContext &context, DriverInfo const &driverInfo, DeviceInfos const &infos, DeviceSpecs const &specs)
void conflicting_options(const boost::program_options::variables_map &vm, const std::string &opt1, const std::string &opt2)
Helper to detect conflicting options.
int doChild(int argc, char **argv, ServiceRegistry &serviceRegistry, RunningWorkflowInfo const &runningWorkflow, RunningDeviceRef ref, DriverConfig const &driverConfig, ProcessingPolicies processingPolicies, std::string const &defaultDriverClient, uv_loop_t *loop)
void doDefaultWorkflowTerminationHook()
bool checkIfCanExit(std::vector< DeviceInfo > const &infos)
volatile sig_atomic_t sigchld_requested
bool isOutputToPipe()
void handleSignals()
std::vector< DeviceSpec > DeviceSpecs
std::vector< DataProcessorInfo > DataProcessorInfos
volatile sig_atomic_t forceful_exit
bool areAllChildrenGone(std::vector< DeviceInfo > &infos)
Check the state of the children.
std::vector< DeviceControl > DeviceControls
volatile sig_atomic_t double_sigint
void close_websocket(uv_handle_t *handle)
void handleChildrenStdio(DriverServerContext *serverContext, std::string const &forwardedStdin, std::vector< DeviceStdioContext > &childFds, std::vector< uv_poll_t * > &handles)
char * getIdString(int argc, char **argv)
bool isInputConfig()
std::unique_ptr< o2::framework::ServiceRegistry > createRegistry()
void log_callback(uv_poll_t *handle, int status, int events)
void processChildrenOutput(uv_loop_t *loop, DriverInfo &driverInfo, DeviceInfos &infos, DeviceSpecs const &specs, DeviceControls &controls)
volatile sig_atomic_t graceful_exit
void single_step_callback(uv_timer_s *ctx)
Force single stepping of the children.
bpo::options_description gHiddenDeviceOptions("Hidden child options")
void doUnknownException(std::string const &s, char const *)
o2::framework::ConfigContext createConfigContext(std::unique_ptr< ConfigParamRegistry > &workflowOptionsRegistry, o2::framework::ServiceRegistry &configRegistry, std::vector< o2::framework::ConfigParamSpec > &workflowOptions, std::vector< o2::framework::ConfigParamSpec > &extraOptions, int argc, char **argv)
int callMain(int argc, char **argv, int(*mainNoCatch)(int, char **))
void handle_crash(int sig)
void dumpMetricsCallback(uv_timer_t *handle)
void cleanupSHM(std::string const &uniqueWorkflowId)
Helper to invoke shared memory cleanup.
void initialiseDriverControl(bpo::variables_map const &varmap, DriverInfo &driverInfo, DriverControl &control)
Helper function to initialise the controller from the command line options.
void gui_callback(uv_timer_s *ctx)
std::vector< DeviceInfo > DeviceInfos
void printHelp(bpo::variables_map const &varmap, bpo::options_description const &executorOptions, std::vector< DataProcessorSpec > const &physicalWorkflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions)
void spawnRemoteDevice(uv_loop_t *loop, std::string const &, DeviceSpec const &spec, DeviceControl &, DeviceExecution &, DeviceInfos &deviceInfos, DataProcessingStatesInfos &allStates)
void websocket_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
int mainNoCatch(int argc, char **argv)
DriverServerContext * serverContext
std::vector< ConfigParamSpec > options
std::vector< std::string > args
static ServiceSpec arrowBackendSpec()
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static std::vector< ServiceSpec > defaultServices()
static std::vector< ComputingResource > parseResources(std::string const &resourceString)
static std::vector< ConfigParamSpec > discover(ConfigParamRegistry &, int, char **)
static boost::program_options::options_description prepareOptionDescriptions(ContainerType const &workflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions, options_description vetos=options_description(), std::string mode="full")
populate boost program options for a complete workflow
static void populateBoostProgramOptions(options_description &options, const std::vector< ConfigParamSpec > &specs, options_description vetos=options_description())
static void dumpDeviceSpec2DDS(std::ostream &out, DriverMode mode, std::string const &workflowSuffix, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
static void preExitCallbacks(std::vector< ServiceExitHandle >, ServiceRegistryRef)
Invoke callback to be executed on exit, in reverse order.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
Plugin interface for DPL GUIs.
Definition DebugGUI.h:30
virtual void * initGUI(char const *windowTitle, ServiceRegistry &registry)=0
virtual std::function< void(void)> getGUIDebugger(std::vector< o2::framework::DeviceInfo > const &infos, std::vector< o2::framework::DeviceSpec > const &devices, std::vector< o2::framework::DataProcessingStates > const &allStates, std::vector< o2::framework::DataProcessorInfo > const &metadata, std::vector< o2::framework::DeviceMetricsInfo > const &metricsInfos, o2::framework::DriverInfo const &driverInfo, std::vector< o2::framework::DeviceControl > &controls, o2::framework::DriverControl &driverControl)=0
static DeploymentMode deploymentMode()
static unsigned int pipelineLength()
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
char logFilter[MAX_USER_FILTER_SIZE]
Lines in the log should match this to be displayed.
bool quiet
Whether we should be capturing device output.
std::vector< std::string > history
Definition DeviceInfo.h:56
bool active
Whether the device is active (running) or not.
Definition DeviceInfo.h:65
size_t historySize
The size of the history circular buffer.
Definition DeviceInfo.h:45
std::vector< LogParsingHelpers::LogLevel > historyLevel
Definition DeviceInfo.h:59
pid_t pid
The pid of the process associated with this device.
Definition DeviceInfo.h:36
std::string unprinted
An unterminated string which is not ready to be printed yet.
Definition DeviceInfo.h:63
LogParsingHelpers::LogLevel logLevel
The minimum log level for log messages sent/displayed by this device.
Definition DeviceInfo.h:49
LogParsingHelpers::LogLevel maxLogLevel
The maximum log level ever seen by this device.
Definition DeviceInfo.h:47
size_t historyPos
The position inside the history circular buffer of this device.
Definition DeviceInfo.h:43
std::string firstSevereError
Definition DeviceInfo.h:60
static void validate(WorkflowSpec const &workflow)
static boost::program_options::options_description getForwardedDeviceOptions()
define the options which are forwarded to every child
static std::string reworkTimeslicePlaceholder(std::string const &str, DeviceSpec const &spec)
static void prepareArguments(bool defaultQuiet, bool defaultStopped, bool intereactive, unsigned short driverPort, DriverConfig const &driverConfig, std::vector< DataProcessorInfo > const &processorInfos, std::vector< DeviceSpec > const &deviceSpecs, std::vector< DeviceExecution > &deviceExecutions, std::vector< DeviceControl > &deviceControls, std::vector< ConfigParamSpec > const &detectedOptions, std::string const &uniqueWorkflowId)
static void reworkShmSegmentSize(std::vector< DataProcessorInfo > &infos)
static void reworkHomogeneousOption(std::vector< DataProcessorInfo > &infos, char const *name, char const *defaultValue)
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
std::string id
The id of the device, including time-pipelining and suffix.
Definition DeviceSpec.h:52
static int parseTracingFlags(std::string const &events)
std::vector< DeviceControl > & controls
bool batch
Whether the driver was started in batch mode or not.
std::vector< DriverState > forcedTransitions
std::vector< Callback > callbacks
DriverControlState state
Current state of the state machine player.
std::vector< DeviceSpec > * specs
std::vector< ServiceSummaryHandling > * summaryCallbacks
std::vector< DeviceMetricsInfo > * metrics
std::vector< DeviceInfo > * infos
static std::vector< ForwardingPolicy > createDefaultPolicies()
static void dumpDeviceSpec2Graphviz(std::ostream &, const Devices &specs)
Helper to dump a set of devices as a graphviz file.
static void dumpDataProcessorSpec2Graphviz(std::ostream &, const WorkflowSpec &specs, std::vector< std::pair< int, int > > const &edges={})
Helper to dump a workflow as a graphviz file.
std::function< void(void)> callback
LogLevel
Possible log levels for device log entries.
static LogLevel parseTokenLevel(std::string_view const s)
static void dumpDeviceSpec2Mermaid(std::ostream &, const Devices &specs)
Helper to dump a set of devices as a mermaid file.
Temporary struct to hold a metric after it has been parsed.
static bool dumpMetricsToJSON(std::vector< DeviceMetricsInfo > const &metrics, DeviceMetricsInfo const &driverMetrics, std::vector< DeviceSpec > const &specs, std::vector< std::regex > const &metricsToDump) noexcept
static bool isResourcesMonitoringEnabled(unsigned short interval) noexcept
Information about the running workflow.
void declareService(ServiceSpec const &spec, DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt=ServiceRegistry::globalDeviceSalt())
static OverrideServiceSpecs parseOverrides(char const *overrideString)
static ServiceSpecs filterDisabled(ServiceSpecs originals, OverrideServiceSpecs const &overrides)
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
Helper struct to keep track of the results of the topological sort.
static std::vector< TopologyPolicy > createDefaultPolicies()
std::function< bool(DataProcessorSpec const &dependent, DataProcessorSpec const &ascendant)> DependencyChecker
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)
static bool import(std::istream &s, std::vector< DataProcessorSpec > &workflow, std::vector< DataProcessorInfo > &metadata, CommandInfo &command)
uint16_t de
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153