Skip to content

Commit 7d9df5c

Browse files
committed
consolidate writer configuration between injection and adjusting the topology
1 parent ba0b518 commit 7d9df5c

File tree

3 files changed

+40
-63
lines changed

3 files changed

+40
-63
lines changed

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -691,33 +691,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
691691
}
692692
}
693693

694-
// replace writer as some outputs may have become dangling and some are now consumed
695-
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
696-
697-
// create DataOutputDescriptor
698-
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
699-
700-
// select outputs of type AOD which need to be saved
701-
// ATTENTION: if there are dangling outputs the getGlobalAODSink
702-
// has to be created in any case!
703-
dec.outputsInputsAOD.clear();
704-
705-
for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
706-
if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
707-
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
708-
if (!ds.empty() || isDangling[ii]) {
709-
dec.outputsInputsAOD.emplace_back(outputsInputs[ii]);
710-
}
711-
}
712-
}
694+
WorkflowHelpers::injectAODWriter(workflow, ctx);
713695

714-
// file sink for any AOD output
715-
if (!dec.outputsInputsAOD.empty()) {
716-
// add TFNumber and TFFilename as input to the writer
717-
dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
718-
dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
719-
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
720-
}
721696
// Move the dummy sink at the end, if needed
722697
for (size_t i = 0; i < workflow.size(); ++i) {
723698
if (workflow[i].name == "internal-dpl-injected-dummy-sink") {

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -532,43 +532,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
532532
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
533533
extraSpecs.clear();
534534

535-
/// Analyze all ouputs
536-
auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
537-
dec.isDangling = isDanglingTmp;
538-
dec.outputsInputs = outputsInputsTmp;
539-
540-
// create DataOutputDescriptor
541-
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
542-
543-
// select outputs of type AOD which need to be saved
544-
// ATTENTION: if there are dangling outputs the getGlobalAODSink
545-
// has to be created in any case!
546-
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
547-
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
548-
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
549-
if (ds.size() > 0 || dec.isDangling[ii]) {
550-
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
551-
}
552-
}
553-
}
554-
555-
// file sink for any AOD output
556-
if (dec.outputsInputsAOD.size() > 0) {
557-
// add TFNumber and TFFilename as input to the writer
558-
dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
559-
dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
560-
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
561-
extraSpecs.push_back(fileSink);
562-
563-
auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec& spec) -> bool {
564-
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
565-
});
566-
size_t ii = std::distance(dec.outputsInputs.begin(), it);
567-
dec.isDangling[ii] = false;
568-
}
569-
570-
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
571-
extraSpecs.clear();
535+
injectAODWriter(workflow, ctx);
572536

573537
// Select dangling outputs which are not of type AOD
574538
std::vector<InputSpec> redirectedOutputsInputs;
@@ -712,6 +676,41 @@ void WorkflowHelpers::adjustTopology(WorkflowSpec& workflow, ConfigContext const
712676
}
713677
}
714678

679+
void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx)
680+
{
681+
auto& dec = ctx.services().get<DanglingEdgesContext>();
682+
/// Analyze all ouputs
683+
std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
684+
685+
// create DataOutputDescriptor
686+
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
687+
688+
// select outputs of type AOD which need to be saved
689+
dec.outputsInputsAOD.clear();
690+
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
691+
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
692+
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
693+
if (ds.size() > 0 || dec.isDangling[ii]) {
694+
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
695+
}
696+
}
697+
}
698+
699+
// file sink for any AOD output
700+
if (dec.outputsInputsAOD.size() > 0) {
701+
// add TFNumber and TFFilename as input to the writer
702+
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
703+
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
704+
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
705+
workflow.push_back(fileSink);
706+
707+
auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
708+
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
709+
});
710+
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
711+
}
712+
}
713+
715714
void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
716715
std::vector<DeviceConnectionEdge>& logicalEdges,
717716
std::vector<OutputSpec>& outputs,

Framework/Core/src/WorkflowHelpers.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ struct WorkflowHelpers {
182182
// @a ctx the context for the configuration phase
183183
static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx);
184184

185+
// Function to correctly add AOD writer
186+
static void injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx);
187+
185188
// Final adjustments to @a workflow after service devices have been injected.
186189
static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx);
187190

0 commit comments

Comments
 (0)