26 auto stateWatcher = [
this](fair::mq::State newState) {
27 static bool first =
true;
28 LOG(info) <<
"State changed to " << newState;
29 if (newState != fair::mq::State::Ready) {
30 LOG(info) <<
"Not ready state, ignoring";
37 fair::mq::Parts parts;
38 LOG(info) <<
"Draining messages" << std::endl;
39 auto&
channels = this->GetChannels();
40 LOG(info) <<
"Number of channels: " <<
channels.size();
42 LOG(info) <<
"No messages to drain";
45 auto& channel =
channels.at(
"data")[0];
46 while (this->NewStatePending() ==
false) {
47 channel.Receive(parts, 10);
48 if (parts.Size() != 0) {
49 LOG(info) <<
"Draining" << parts.Size() <<
"messages";
52 LOG(info) <<
"Done draining";
54 this->SubscribeToStateChange(
"99-drain", stateWatcher);