// Writer object that the processing code configures (setBranchName/init),
// invokes on the inputs, and finally closes.
337 std::shared_ptr<WriterType> writer;
// One pair per branch index: `.first` is used as the name of the option from
// which the branch name is read; `.second` is passed along when that option is
// declared (presumably its default value -- TODO confirm).
339 std::vector<std::pair<std::string, std::string>> branchNameOptions;
// Filled from the "autosave" option. The autosave condition is only evaluated
// for values > 0, so the default of -1 never satisfies it.
343 int nEventsAutoSave = -1;
// Bindings of the inputs that are still active; entries are erased by the
// termination condition checks.
345 std::unordered_set<std::string> activeInputs;
// Collect the state needed by the callbacks in one shared object, so that the
// lambdas can keep it alive by holding a copy of the shared pointer.
357 auto processAttributes = std::make_shared<ProcessAttributes>();
// The writer pointer and the branch name options are copied from the members ...
358 processAttributes->writer = mWriter;
359 processAttributes->branchNameOptions = mBranchNameOptions;
// ... whereas the termination condition and the preprocessor are moved, which
// leaves the corresponding members in a moved-from state afterwards.
360 processAttributes->terminationCondition = std::move(mTerminationCondition);
361 processAttributes->preprocessor = std::move(mPreprocessor);
// Every configured input starts out as active, identified by its binding.
365 for (
auto const& input : mInputs) {
366 processAttributes->activeInputs.emplace(input.binding);
// Number of branches, compared against the number of active inputs later on.
368 processAttributes->nofBranches = mNofBranches;
// Read the configuration from the options of the init context `ic`.
372 auto& branchNameOptions = processAttributes->branchNameOptions;
373 auto filename = ic.options().get<std::string>(
"outfile");
374 auto treename = ic.options().get<std::string>(
"treename");
375 auto treetitle = ic.options().get<std::string>(
"treetitle");
376 auto outdir = ic.options().get<std::string>(
"output-dir");
377 processAttributes->nEvents = ic.options().get<
int>(
"nevents");
// With a positive event limit, a mismatch between the number of active inputs
// and the number of branches triggers the warning below.
378 if (processAttributes->nEvents > 0 && processAttributes->activeInputs.size() != processAttributes->nofBranches) {
379 LOG(warning) <<
"the n inputs serve in total m branches with n != m, this means that there will be data for\n"
380 <<
"different branches on the same input. Be aware that the --nevents option might lead to incomplete\n"
381 <<
"data in the output file as the number of processed input sets is counted";
383 processAttributes->nEventsAutoSave = ic.options().get<
int>(
"autosave");
// Translate the "terminate" option string into a policy value. A name that is
// not in TerminationPolicyMap makes at() throw std::out_of_range, which is
// converted into std::invalid_argument carrying the offending option value.
385 processAttributes->terminationPolicy =
TerminationPolicyMap.at(ic.options().get<std::string>(
"terminate"));
386 }
catch (std::out_of_range&) {
387 throw std::invalid_argument(std::string(
"invalid termination policy: ") + ic.options().get<std::string>(
"terminate"));
// Both the output file name and the tree name must be non-empty.
389 if (
filename.empty() || treename.empty()) {
390 throw std::invalid_argument(
"output file name and tree name are mandatory options");
// Apply the configurable branch names: for each entry the option named by
// `.first` is read and its value is set as the name of the branch with the
// same index.
392 for (
size_t branchIndex = 0; branchIndex < branchNameOptions.size(); branchIndex++) {
// Entries with an empty option name are special-cased
// (presumably skipped -- TODO confirm).
394 if (branchNameOptions[branchIndex].
first.empty()) {
397 auto branchName = ic.options().get<std::string>(branchNameOptions[branchIndex].first.c_str());
398 processAttributes->writer->setBranchName(branchIndex, branchName.c_str());
// An output directory is only considered when it is non-empty and not the
// literal "none"; a missing trailing '/' is detected here.
400 if (!outdir.empty() && outdir !=
"none") {
401 if (outdir.back() !=
'/') {
// Initialize the writer with file name, tree name and tree title.
406 processAttributes->writer->init(
filename.c_str(), treename.c_str(), treetitle.c_str());
// Callback that closes the writer. It captures the shared pointer by value and
// therefore keeps the ProcessAttributes object alive for as long as the
// callback itself exists.
408 auto finishWriting = [processAttributes]() {
409 processAttributes->writer->close();
// Register the callback with the CallbackService under the Stop id.
412 ic.services().get<
CallbackService>().set<CallbackService::Id::Stop>(finishWriting);
// Short reference aliases to the members of *processAttributes; changes made
// through them update the shared state.
415 auto& writer = processAttributes->writer;
416 auto& terminationPolicy = processAttributes->terminationPolicy;
417 auto& terminationCondition = processAttributes->terminationCondition;
418 auto& preprocessor = processAttributes->preprocessor;
419 auto& activeInputs = processAttributes->activeInputs;
420 auto&
counter = processAttributes->counter;
421 auto&
nEvents = processAttributes->nEvents;
422 auto& nEventsAutoSave = processAttributes->nEventsAutoSave;
// Special handling when the writer has already been closed.
423 if (writer->isClosed()) {
// Helper deciding whether the current inputs are to be processed. Both
// captures are references to members of *processAttributes.
431 auto checkProcessing = [&terminationCondition, &activeInputs](
auto const& inputs) {
// Processing is enabled unless the check below switches it off.
432 bool doProcessing =
true;
// Only applies when the termination condition holds the CheckProcessing
// alternative of its `check` variant.
433 if (std::holds_alternative<TerminationCondition::CheckProcessing>(terminationCondition.check)) {
434 auto&
check = std::get<TerminationCondition::CheckProcessing>(terminationCondition.check);
435 for (
auto const&
ref : inputs) {
// Look up the input by its binding among the still active inputs.
436 auto iter = activeInputs.find(
ref.spec->binding);
440 doProcessing =
false;
// An input that is still active and flagged `ready` is removed from the set
// of active inputs.
442 if (iter != activeInputs.end() && ready) {
444 activeInputs.erase(iter);
// Helper reporting whether no active input is left. Both captures are
// references to members of *processAttributes.
453 auto checkReady = [&terminationCondition, &activeInputs](
auto const& inputs) {
// Only applies when the termination condition holds the CheckReady
// alternative of its `check` variant.
454 if (std::holds_alternative<TerminationCondition::CheckReady>(terminationCondition.check)) {
455 auto&
check = std::get<TerminationCondition::CheckReady>(terminationCondition.check);
456 for (
auto const&
ref : inputs) {
457 auto iter = activeInputs.find(
ref.spec->binding);
// A still active input for which the user check returns true is removed
// from the set of active inputs.
458 if (iter != activeInputs.end() &&
check(
ref)) {
460 activeInputs.erase(iter);
// True once the set of active inputs is empty.
464 return activeInputs.size() == 0;
// Invoke the writer on the inputs only if the processing check allows it.
470 if (checkProcessing(pc.inputs())) {
471 (*writer)(pc.inputs());
478 }
// Autosave branch: taken when nEventsAutoSave is positive and the counter is a
// non-zero multiple of it.
else if (nEventsAutoSave > 0 &&
counter && (
counter % nEventsAutoSave) == 0) {
// Hand the processing function back to the caller.
483 return processingFct;
// Declarations of the integer options "nevents" and "autosave" and the string
// option "terminate", each with a member-provided value and a help text.
492 {
"nevents",
VariantType::Int, mDefaultNofEvents, {
"Number of events to execute"}},
493 {
"autosave",
VariantType::Int, mDefaultAutoSave, {
"Autosave after number of events"}},
494 {
"terminate",
VariantType::String, mDefaultTerminationPolicy.c_str(), {
"Terminate the 'process' or 'workflow'"}},
// Per-branch handling of the configurable branch name options.
496 for (
size_t branchIndex = 0; branchIndex < mBranchNameOptions.size(); branchIndex++) {
// Entries with an empty option name are special-cased
// (presumably skipped -- TODO confirm).
498 if (mBranchNameOptions[branchIndex].
first.empty()) {
// `.second` of the pair is passed with the help text "configurable branch name"
// (presumably as the option's default value -- TODO confirm).
503 mBranchNameOptions[branchIndex].second.c_str(),
504 {
"configurable branch name"}
// Append all entries of mInputs to the end of mInputRoutes.
508 mInputRoutes.insert(mInputRoutes.end(), mInputs.begin(), mInputs.end());
511 mProcessName.c_str(),