Skip to content

Commit d67b00d

Browse files
knopers8ktf
authored andcommitted
Fix forwarding inputs to consumers which are time-pipelined
1 parent d9e8683 commit d67b00d

File tree

3 files changed

+6
-3
lines changed

3 files changed

+6
-3
lines changed

Framework/Core/include/Framework/ForwardRoute.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ namespace framework
2323
/// the InputSpec @a matcher matches an input which should also go to
2424
/// @a channel
2525
struct ForwardRoute {
26+
size_t timeslice;
27+
size_t maxTimeslices;
2628
InputSpec matcher;
2729
std::string channel;
2830
};

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,8 @@ bool DataProcessingDevice::tryDispatchComputation()
389389
LOG(DEBUG) << dh->dataOrigin.str;
390390
LOG(DEBUG) << dh->dataDescription.str;
391391
LOG(DEBUG) << dh->subSpecification;
392-
if (DataSpecUtils::match(forward.matcher, dh->dataOrigin,
393-
dh->dataDescription,
394-
dh->subSpecification)) {
392+
if (DataSpecUtils::match(forward.matcher, dh->dataOrigin, dh->dataDescription, dh->subSpecification)
393+
&& (dph->startTime % forward.maxTimeslices) == forward.timeslice) {
395394

396395
if (header.get() == nullptr) {
397396
LOG(ERROR) << "Missing header!";

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
255255
ForwardRoute route;
256256
route.matcher = workflow[edge.consumer].inputs[edge.consumerInputIndex];
257257
route.channel = channel.name;
258+
route.timeslice = edge.timeIndex;
259+
route.maxTimeslices = consumer.maxInputTimeslices;
258260
device.forwards.emplace_back(route);
259261
}
260262
};

0 commit comments

Comments
 (0)