Project
Loading...
Searching...
No Matches
o2DeadlockReproducer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <fairmq/Device.h>
13#include <fairmq/runDevice.h>
14
15#include <string>
16
17namespace bpo = boost::program_options;
18
19void addCustomOptions(bpo::options_description& options)
20{
21}
22
25 {
26 auto stateWatcher = [this](fair::mq::State newState) {
27 static bool first = true;
28 LOG(info) << "State changed to " << newState;
29 if (newState != fair::mq::State::Ready) {
30 LOG(info) << "Not ready state, ignoring";
31 return;
32 }
33 if (first) {
34 first = false;
35 return;
36 }
37 fair::mq::Parts parts;
38 LOG(info) << "Draining messages" << std::endl;
39 auto& channels = this->GetChannels();
40 LOG(info) << "Number of channels: " << channels.size();
41 if (channels.size() == 0) {
42 LOG(info) << "No messages to drain";
43 return;
44 }
45 auto& channel = channels.at("data")[0];
46 while (this->NewStatePending() == false) {
47 channel.Receive(parts, 10);
48 if (parts.Size() != 0) {
49 LOG(info) << "Draining" << parts.Size() << "messages";
50 }
51 }
52 LOG(info) << "Done draining";
53 };
54 this->SubscribeToStateChange("99-drain", stateWatcher);
55 }
56
57 void Run() override
58 {
59 LOG(info) << "Simply exit";
60 }
61};
62
63std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
64{
65 return std::make_unique<Deadlocked>();
66}
o2::devices::O2SimDevice * getDevice()
void addCustomOptions(bpo::options_description &options)
void Run() override
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels