Skip to content

Commit 63b1cf2

Browse files
committed
DPL: introduce Lifetime::OutOfBand
This lifetime can be used to trigger processing on messages incoming to an out of band fairmq channel. The idea is that we can use this e.g. for the feedback look required by rate limiting the readout-proxy.
1 parent f3e5554 commit 63b1cf2

File tree

9 files changed

+118
-24
lines changed

9 files changed

+118
-24
lines changed

Framework/Core/include/Framework/DeviceState.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ struct DeviceState {
5555
WS_CLOSING = 256, // Events related to WS shutting down
5656
WS_READING = 512, // Events related to WS shutting down
5757
WS_WRITING = 1024, // Events related to WS shutting down
58-
ASYNC_NOTIFICATION = 2048
58+
ASYNC_NOTIFICATION = 2048,
59+
OOB_ACTIVITY = 4096 // Out of band activity
5960
};
6061

6162
std::vector<InputChannelInfo> inputChannelInfos;
@@ -79,6 +80,8 @@ struct DeviceState {
7980
std::vector<uv_poll_t*> activeOutputPollers;
8081
/// The list of active signal handlers
8182
std::vector<uv_signal_t*> activeSignals;
83+
/// The list for active out-of-bound pollers
84+
std::vector<uv_poll_t*> activeOutOfBandPollers;
8285

8386
uv_async_t* awakeMainThread = nullptr;
8487

Framework/Core/include/Framework/InputRoute.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct DeviceState;
2727
class ConfigParamRegistry;
2828

2929
struct RouteConfigurator {
30-
using CreationConfigurator = std::function<ExpirationHandler::Creator(DeviceState&, ConfigParamRegistry const&)>;
30+
using CreationConfigurator = std::function<ExpirationHandler::Creator(DeviceState&, ServiceRegistry&, ConfigParamRegistry const&)>;
3131
using DanglingConfigurator = std::function<ExpirationHandler::Checker(DeviceState&, ConfigParamRegistry const&)>;
3232
using ExpirationConfigurator = std::function<ExpirationHandler::Handler(DeviceState&, ConfigParamRegistry const&)>;
3333

Framework/Core/include/Framework/Lifetime.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ enum struct Lifetime {
3636
/// This comes handy e.g. to handle Raw Data, since DataDistribution will provide
3737
/// everything in one go so whatever is expected but not there, for whatever reason
3838
/// will be substituted with a dummy entry.
39-
Optional
39+
Optional,
40+
/// An input which is materialised with the contents of some out of band
41+
/// FairMQ channel.
42+
OutOfBand
4043
};
4144

4245
} // namespace o2::framework

Framework/Core/include/Framework/LifetimeHelpers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ struct LifetimeHelpers {
6666
std::string const& overrideTimestamp,
6767
std::string const& sourceChannel);
6868

69+
/// Build a fetcher for an object from an out of band FairMQ channel whenever the record is expired.
70+
/// @a spec is the associated InputSpec
71+
/// @a channelName the channel we should Receive data from
72+
static ExpirationHandler::Handler fetchFromFairMQ(InputSpec const& spec,
73+
std::string const& channelName);
74+
6975
/// Create an entry in the registry for histograms on the first
7076
/// FIXME: actually implement this
7177
/// FIXME: provide a way to customise the histogram from the configuration.

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,22 +289,6 @@ void DataProcessingDevice::Init()
289289

290290
mExpirationHandlers.clear();
291291

292-
auto distinct = DataRelayerHelpers::createDistinctRouteIndex(mSpec.inputs);
293-
int i = 0;
294-
for (auto& di : distinct) {
295-
auto& route = mSpec.inputs[di];
296-
if (route.configurator.has_value() == false) {
297-
i++;
298-
continue;
299-
}
300-
ExpirationHandler handler{
301-
RouteIndex{i++},
302-
route.matcher.lifetime,
303-
route.configurator->creatorConfigurator(mState, *mConfigRegistry),
304-
route.configurator->danglingConfigurator(mState, *mConfigRegistry),
305-
route.configurator->expirationConfigurator(mState, *mConfigRegistry)};
306-
mExpirationHandlers.emplace_back(std::move(handler));
307-
}
308292

309293
if (mInit) {
310294
InitContext initContext{*mConfigRegistry, mServiceRegistry};
@@ -380,6 +364,23 @@ void on_awake_main_thread(uv_async_t* handle)
380364
} // namespace
381365
void DataProcessingDevice::InitTask()
382366
{
367+
auto distinct = DataRelayerHelpers::createDistinctRouteIndex(mSpec.inputs);
368+
int i = 0;
369+
for (auto& di : distinct) {
370+
auto& route = mSpec.inputs[di];
371+
if (route.configurator.has_value() == false) {
372+
i++;
373+
continue;
374+
}
375+
ExpirationHandler handler{
376+
RouteIndex{i++},
377+
route.matcher.lifetime,
378+
route.configurator->creatorConfigurator(mState, mServiceRegistry, *mConfigRegistry),
379+
route.configurator->danglingConfigurator(mState, *mConfigRegistry),
380+
route.configurator->expirationConfigurator(mState, *mConfigRegistry)};
381+
mExpirationHandlers.emplace_back(std::move(handler));
382+
}
383+
383384
if (mState.awakeMainThread == nullptr) {
384385
mState.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
385386
mState.awakeMainThread->data = &mState;

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "Framework/ComputingResource.h"
3434
#include "Framework/Logger.h"
3535
#include "Framework/RuntimeError.h"
36+
#include "Framework/RawDeviceService.h"
3637
#include "ProcessingPoliciesHelpers.h"
3738

3839
#include "WorkflowHelpers.h"
@@ -74,12 +75,12 @@ void signal_callback(uv_signal_t* handle, int)
7475
struct ExpirationHandlerHelpers {
7576
static RouteConfigurator::CreationConfigurator dataDrivenConfigurator()
7677
{
77-
return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::dataDrivenCreation(); };
78+
return [](DeviceState&, ServiceRegistry&, ConfigParamRegistry const&) { return LifetimeHelpers::dataDrivenCreation(); };
7879
}
7980

8081
static RouteConfigurator::CreationConfigurator timeDrivenConfigurator(InputSpec const& matcher)
8182
{
82-
return [matcher](DeviceState& state, ConfigParamRegistry const& options) {
83+
return [matcher](DeviceState& state, ServiceRegistry&, ConfigParamRegistry const& options) {
8384
std::string rateName = std::string{"period-"} + matcher.binding;
8485
auto period = options.get<int>(rateName.c_str());
8586
// We create a timer to wake us up. Notice the actual
@@ -97,7 +98,7 @@ struct ExpirationHandlerHelpers {
9798

9899
static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
99100
{
100-
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState& state, ConfigParamRegistry const& options) {
101+
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState& state, ServiceRegistry&, ConfigParamRegistry const& options) {
101102
std::string startName = std::string{"start-value-"} + matcher.binding;
102103
std::string endName = std::string{"end-value-"} + matcher.binding;
103104
std::string stepName = std::string{"step-value-"} + matcher.binding;
@@ -119,7 +120,7 @@ struct ExpirationHandlerHelpers {
119120

120121
static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
121122
{
122-
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState&, ConfigParamRegistry const& options) {
123+
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState&, ServiceRegistry&, ConfigParamRegistry const& options) {
123124
std::string startName = std::string{"start-value-"} + matcher.binding;
124125
std::string endName = std::string{"end-value-"} + matcher.binding;
125126
std::string stepName = std::string{"step-value-"} + matcher.binding;
@@ -164,6 +165,52 @@ struct ExpirationHandlerHelpers {
164165
};
165166
}
166167

168+
static RouteConfigurator::CreationConfigurator fairmqDrivenConfiguration(InputSpec const& spec, int inputTimeslice, int maxInputTimeslices)
169+
{
170+
return [spec, inputTimeslice, maxInputTimeslices](DeviceState& state, ServiceRegistry& services, ConfigParamRegistry const& options) {
171+
std::string channelNameOption = std::string{"out-of-band-channel-name-"} + spec.binding;
172+
auto channelName = options.get<std::string>(channelNameOption.c_str());
173+
auto device = services.get<RawDeviceService>().device();
174+
auto& channel = device->fChannels[channelName];
175+
176+
// We assume there is always a ZeroMQ socket behind.
177+
int zmq_fd = 0;
178+
size_t zmq_fd_len = sizeof(zmq_fd);
179+
uv_poll_t* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
180+
channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
181+
if (zmq_fd == 0) {
182+
throw runtime_error_f("Cannot get file descriptor for channel %s", channelName.c_str());
183+
}
184+
LOG(debug) << "Polling socket for " << channel[0].GetName();
185+
186+
state.activeOutOfBandPollers.push_back(poller);
187+
188+
// We always create entries whenever we get invoked.
189+
// Notice this works only if we are the only input.
190+
// Otherwise we should check the channel for new data,
191+
// before we create an entry.
192+
return LifetimeHelpers::enumDrivenCreation(0, -1, 1, inputTimeslice, maxInputTimeslices, 1);
193+
};
194+
}
195+
196+
static RouteConfigurator::DanglingConfigurator danglingOutOfBandConfigurator()
197+
{
198+
return [](DeviceState&, ConfigParamRegistry const& options) {
199+
// If the entry is there it means that something awoke
200+
// the loop, so we can materialise it immediately.
201+
return LifetimeHelpers::expireAlways();
202+
};
203+
}
204+
205+
static RouteConfigurator::ExpirationConfigurator expiringOutOfBandConfigurator(InputSpec const& spec)
206+
{
207+
return [spec](DeviceState&, ConfigParamRegistry const& options) {
208+
std::string channelNameOption = std::string{"out-of-band-channel-name-"} + spec.binding;
209+
auto channelName = options.get<std::string>(channelNameOption.c_str());
210+
return LifetimeHelpers::fetchFromFairMQ(spec, channelName);
211+
};
212+
}
213+
167214
static RouteConfigurator::DanglingConfigurator danglingQAConfigurator()
168215
{
169216
// FIXME: this should really be expireAlways. However, since we do not have
@@ -234,7 +281,7 @@ struct ExpirationHandlerHelpers {
234281
/// This behaves as data. I.e. we never create it unless data arrives.
235282
static RouteConfigurator::CreationConfigurator createOptionalConfigurator()
236283
{
237-
return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::dataDrivenCreation(); };
284+
return [](DeviceState&, ServiceRegistry&, ConfigParamRegistry const&) { return LifetimeHelpers::dataDrivenCreation(); };
238285
}
239286

240287
/// This will always exipire an optional record when no data is received.
@@ -663,6 +710,12 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
663710
std::nullopt};
664711

665712
switch (consumer.inputs[edge.consumerInputIndex].lifetime) {
713+
case Lifetime::OutOfBand:
714+
route.configurator = {
715+
ExpirationHandlerHelpers::fairmqDrivenConfiguration(inputSpec, consumerDevice.inputTimesliceId, consumerDevice.maxInputTimeslices),
716+
ExpirationHandlerHelpers::danglingOutOfBandConfigurator(),
717+
ExpirationHandlerHelpers::expiringOutOfBandConfigurator(inputSpec)};
718+
break;
666719
case Lifetime::Condition:
667720
route.configurator = {
668721
ExpirationHandlerHelpers::dataDrivenConfigurator(),

Framework/Core/src/DeviceSpecHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include "Framework/ProcessingPolicies.h"
2828
#include "ResourceManager.h"
2929
#include "WorkflowHelpers.h"
30+
31+
#include <FairMQDevice.h>
3032
#include <boost/program_options.hpp>
3133

3234
#include <vector>

Framework/Core/src/LifetimeHelpers.cxx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,24 @@ ExpirationHandler::Handler
374374
};
375375
}
376376

377+
ExpirationHandler::Handler
378+
LifetimeHelpers::fetchFromFairMQ(InputSpec const& spec,
379+
std::string const& channelName)
380+
{
381+
return [spec, channelName](ServiceRegistry& services, PartRef& ref, data_matcher::VariableContext& variables) -> void {
382+
auto& rawDeviceService = services.get<RawDeviceService>();
383+
auto device = rawDeviceService.device();
384+
385+
// Receive parts and put them in the PartRef
386+
// we know this is not blocking because we were polled
387+
// on the channel.
388+
FairMQParts parts;
389+
device->Receive(parts, channelName, 0);
390+
ref.header = std::move(parts.At(0));
391+
ref.payload = std::move(parts.At(1));
392+
};
393+
}
394+
377395
/// Create an entry in the registry for histograms on the first
378396
/// FIXME: actually implement this
379397
/// FIXME: provide a way to customise the histogram from the configuration.
@@ -498,6 +516,7 @@ std::ostream& operator<<(std::ostream& oss, Lifetime const& val)
498516
STREAM_ENUM(Lifetime::Enumeration)
499517
STREAM_ENUM(Lifetime::Signal)
500518
STREAM_ENUM(Lifetime::Optional)
519+
STREAM_ENUM(Lifetime::OutOfBand)
501520
};
502521
return oss;
503522
}

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
336336
}
337337
requestedCCDBs.emplace_back(input);
338338
} break;
339+
case Lifetime::OutOfBand: {
340+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
341+
auto hasOption = std::any_of(processor.options.begin(), processor.options.end(), [&input](auto const& option) { return (option.name == "out-of-band-channel-name-" + input.binding); });
342+
if (hasOption == false) {
343+
processor.options.push_back(ConfigParamSpec{"out-of-band-channel-name-" + input.binding, VariantType::String, "out-of-band", {"channel to listen for out of band data"}});
344+
}
345+
} break;
339346
case Lifetime::QA:
340347
case Lifetime::Transient:
341348
case Lifetime::Timeframe:

0 commit comments

Comments
 (0)