Skip to content

Commit 46a21bb

Browse files
ktfdavidrohr
authored andcommitted
DPL: keep dropping data while in ready, unless a new state was requested
For some reason, before we were dropping data only for 5 seconds. To be seen if this was needed to prevent some timeout by ECS / ODC. This has now changed to keep dropping data until the "cleanup" property changes or until there is a NewStatePending().
1 parent 2347696 commit 46a21bb

File tree

4 files changed

+17
-43
lines changed

4 files changed

+17
-43
lines changed

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ class DataProcessingDevice : public fair::mq::Device
119119
/// Handle to wake up the main loop from other threads
120120
/// e.g. when FairMQ notifies some callback in an asynchronous way
121121
uv_async_t* mAwakeHandle = nullptr;
122-
int64_t mCleanupCount = -1;
123122
};
124123

125124
} // namespace o2::framework

Framework/Core/include/Framework/DeviceState.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <string>
2020
#include <map>
2121
#include <utility>
22+
#include <atomic>
2223

2324
typedef struct uv_loop_s uv_loop_t;
2425
typedef struct uv_timer_s uv_timer_t;
@@ -59,6 +60,7 @@ struct DeviceState {
5960
std::vector<InputChannelInfo> inputChannelInfos;
6061
StreamingState streaming = StreamingState::Streaming;
6162
bool quitRequested = false;
63+
std::atomic<int64_t> cleanupCount = -1;
6264

6365
/// ComputingQuotaOffers which have not yet been
6466
/// evaluated by the ComputingQuotaEvaluator

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,16 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
136136
mServiceRegistry{registry},
137137
mProcessingPolicies{policies}
138138
{
139-
GetConfig()->Subscribe<std::string>("dpl", [&cleanupCount = mCleanupCount, &registry = mServiceRegistry](const std::string& key, std::string value) {
139+
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
140140
if (key == "cleanup") {
141+
auto ref = ServiceRegistryRef{registry, ServiceRegistry::globalDeviceSalt()};
142+
auto& deviceState = ref.get<DeviceState>();
143+
int64_t cleanupCount = deviceState.cleanupCount.load();
141144
int64_t newCleanupCount = std::stoll(value);
142145
if (newCleanupCount <= cleanupCount) {
143146
return;
144147
}
145-
cleanupCount = newCleanupCount;
146-
auto ref = ServiceRegistryRef{registry, ServiceRegistry::globalDeviceSalt()};
147-
auto& deviceState = ref.get<DeviceState>();
148+
deviceState.cleanupCount.store(newCleanupCount);
148149
for (auto& info : deviceState.inputChannelInfos) {
149150
fair::mq::Parts parts;
150151
while (info.channel->Receive(parts, 0)) {

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -687,53 +687,25 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
687687
if (fair::mq::State{state} != fair::mq::State::Ready) {
688688
return;
689689
}
690-
// We keep track of whether or not all channels have seen a new state.
691-
std::vector<bool> lastNewStatePending(deviceState.inputChannelInfos.size(), false);
692690
uv_update_time(deviceState.loop);
693-
auto start = uv_now(deviceState.loop);
694-
695-
// Continue iterating until all channels have seen a new state.
696-
while (std::all_of(lastNewStatePending.begin(), lastNewStatePending.end(), [](bool b) { return b; }) != true) {
697-
if (uv_now(deviceState.loop) - start > 5000) {
698-
LOGP(info, "Timeout while draining messages, going to next state anyway.");
699-
break;
700-
}
691+
bool doDrain = true;
692+
// Cleanup count is set by the cleanup property of the device.
693+
// It is incremented every time the device is cleaned up.
694+
// We use it to detect when the device is cleaned up.
695+
int64_t cleanupCount = deviceState.cleanupCount.load();
696+
697+
// Continue iterating we saw the cleanup property being reset or
698+
// the device state changing.
699+
while (doDrain) {
700+
doDrain = device->NewStatePending() || deviceState.cleanupCount == cleanupCount;
701701
fair::mq::Parts parts;
702702
for (size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
703703
auto& info = deviceState.inputChannelInfos[ci];
704704
// We only care about rawfmq channels.
705705
if (info.channelType != ChannelAccountingType::RAWFMQ) {
706-
lastNewStatePending[ci] = true;
707-
continue;
708-
}
709-
// This means we have not set things up yet. I.e. the first iteration from
710-
// ready to run has not happened yet.
711-
if (info.channel == nullptr) {
712-
lastNewStatePending[ci] = true;
713706
continue;
714707
}
715708
info.channel->Receive(parts, 10);
716-
// Handle both cases of state changes:
717-
//
718-
// - The state has been changed from the outside and FairMQ knows about it.
719-
// - The state has been changed from the GUI, and deviceState.nextFairMQState knows about it.
720-
//
721-
// This latter case is probably better handled from DPL itself, after all it's fair to
722-
// assume we need to switch state as soon as the GUI notifies us.
723-
// For now we keep it here to avoid side effects.
724-
lastNewStatePending[ci] = device->NewStatePending() || (deviceState.nextFairMQState.empty() == false);
725-
if (parts.Size() == 0) {
726-
continue;
727-
}
728-
if (!lastNewStatePending[ci]) {
729-
LOGP(warn, "Unexpected {} message on channel {} while in Ready state. Dropping.", parts.Size(), info.channel->GetName());
730-
} else if (lastNewStatePending[ci]) {
731-
LOGP(detail, "Some {} parts were received on channel {} while switching away from Ready. Keeping.", parts.Size(), info.channel->GetName());
732-
for (int pi = 0; pi < parts.Size(); ++pi) {
733-
info.parts.fParts.emplace_back(std::move(parts.At(pi)));
734-
}
735-
info.readPolled = true;
736-
}
737709
}
738710
// Keep state transitions going also when running with the standalone GUI.
739711
uv_run(deviceState.loop, UV_RUN_NOWAIT);

0 commit comments

Comments
 (0)