Skip to content

Commit 898736e

Browse files
matthiasrichterktf
authored andcommitted
DPL: Optimization for associating output messages with actual channels
This is a follow-up to commit f6d950b 1. Introducing a registry of channel names in the MessageContext The unique references of string objects from the registry allow fast access and association of messages to actual FairMQChannels by avoiding string allocation. 2. Optimization to unordered_map Using pointer to channel string as key to avoid allocation of strings. The ContextObjects hold references of the string variables from the registry. The string references from the OutputRoute definition of the device do not fulfill the requirement to refer to one single string object per channel, because the device stores a vector of OutputRoutes.
1 parent 4f6ecd8 commit 898736e

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

Framework/Core/include/Framework/MessageContext.h

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class MessageContext
6060
class ContextObject
6161
{
6262
public:
63-
ContextObject() = default;
63+
ContextObject() = delete;
6464
ContextObject(FairMQMessagePtr&& headerMsg, FairMQMessagePtr&& payloadMsg, const std::string& bindingChannel)
6565
: mParts{}, mChannel{bindingChannel}
6666
{
@@ -114,7 +114,7 @@ class MessageContext
114114

115115
protected:
116116
FairMQParts mParts;
117-
std::string mChannel;
117+
std::string const& mChannel;
118118
};
119119

120120
/// TrivialObject handles a message object
@@ -125,14 +125,14 @@ class MessageContext
125125
TrivialObject() = delete;
126126
/// constructor consuming the header and payload messages for a given channel by move
127127
template <typename ContextType>
128-
TrivialObject(ContextType*, FairMQMessagePtr&& headerMsg, FairMQMessagePtr&& payloadMsg, const std::string& bindingChannel)
129-
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), std::forward<FairMQMessagePtr>(payloadMsg), bindingChannel)
128+
TrivialObject(ContextType* context, FairMQMessagePtr&& headerMsg, FairMQMessagePtr&& payloadMsg, const std::string& bindingChannel)
129+
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), std::forward<FairMQMessagePtr>(payloadMsg), context->getChannelRef(bindingChannel))
130130
{
131131
}
132132
/// constructor taking header message by move and creating the paypload message
133133
template <typename ContextType, typename... Args>
134134
TrivialObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, Args... args)
135-
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), context->createMessage(bindingChannel, index, std::forward<Args>(args)...), bindingChannel)
135+
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), context->createMessage(bindingChannel, index, std::forward<Args>(args)...), context->getChannelRef(bindingChannel))
136136
{
137137
}
138138
~TrivialObject() override = default;
@@ -162,7 +162,7 @@ class MessageContext
162162
/// constructor taking header message by move and creating the paypload message
163163
template <typename ContextType, typename... Args>
164164
ContainerRefObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, Args&&... args)
165-
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), bindingChannel),
165+
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), context->getChannelRef(bindingChannel)),
166166
// the transport factory
167167
mFactory{context->proxy().getTransport(bindingChannel, index)},
168168
// the memory resource takes ownership of the message
@@ -242,7 +242,7 @@ class MessageContext
242242
/// constructor taking header message by move and creating the payload message for the span
243243
template <typename ContextType>
244244
SpanObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, size_t nElements)
245-
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), bindingChannel)
245+
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), context->getChannelRef(bindingChannel))
246246
{
247247
// create the span object for the memory of the payload message
248248
// TODO: we probably also want to check consistency of the header message, i.e. payloadSize member
@@ -284,7 +284,7 @@ class MessageContext
284284
/// constructor taking header message by move and creating the object from variadic argument list
285285
template <typename ContextType, typename... Args>
286286
RootSerializedObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, Args&&... args)
287-
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), bindingChannel)
287+
: ContextObject(std::forward<FairMQMessagePtr>(headerMsg), context->getChannelRef(bindingChannel))
288288
{
289289
mObject = std::make_unique<value_type>(std::forward<Args>(args)...);
290290
mPayloadMsg = context->proxy().createMessage();
@@ -408,17 +408,17 @@ class MessageContext
408408
if (mDispatchControl.dispatch != nullptr) {
409409
// send all scheduled messages if there is no trigger callback or its result is true
410410
if (mDispatchControl.trigger == nullptr || mDispatchControl.trigger(*header)) {
411-
std::unordered_map<std::string, FairMQParts> outputs;
411+
std::unordered_map<std::string const*, FairMQParts> outputs;
412412
for (auto& message : mScheduledMessages) {
413413
FairMQParts parts = std::move(message->finalize());
414414
assert(message->empty());
415415
assert(parts.Size() == 2);
416416
for (auto& part : parts) {
417-
outputs[message->channel()].AddPart(std::move(part));
417+
outputs[&(message->channel())].AddPart(std::move(part));
418418
}
419419
}
420420
for (auto& [channel, parts] : outputs) {
421-
mDispatchControl.dispatch(std::move(parts), channel, DefaultChannelIndex);
421+
mDispatchControl.dispatch(std::move(parts), *channel, DefaultChannelIndex);
422422
}
423423
mScheduledMessages.clear();
424424
}
@@ -452,6 +452,20 @@ class MessageContext
452452
mMessages.clear();
453453
}
454454

455+
/// Get a reference to channel string unique within the context
456+
/// The unique references are stored in context objects instead of allocating string objects.
457+
/// Based on the references, messages going over the same channel are grouped together in a
458+
/// multimessage.
459+
std::string const& getChannelRef(std::string const& channel)
460+
{
461+
auto ref = mChannelRefs.find(channel);
462+
if (ref != mChannelRefs.end()) {
463+
return *(ref->second);
464+
}
465+
mChannelRefs[channel] = std::make_unique<std::string>(channel);
466+
return *(mChannelRefs[channel]);
467+
}
468+
455469
FairMQDeviceProxy& proxy()
456470
{
457471
return mProxy;
@@ -469,6 +483,7 @@ class MessageContext
469483
Messages mMessages;
470484
Messages mScheduledMessages;
471485
DispatchControl mDispatchControl;
486+
std::unordered_map<std::string, std::unique_ptr<std::string>> mChannelRefs;
472487
};
473488
} // namespace framework
474489
} // namespace o2

Framework/Core/src/DataProcessor.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ void DataProcessor::doSend(FairMQDevice& device, FairMQParts&& parts, const char
3838

3939
void DataProcessor::doSend(FairMQDevice& device, MessageContext& context)
4040
{
41-
std::unordered_map<std::string, FairMQParts> outputs;
41+
std::unordered_map<std::string const*, FairMQParts> outputs;
4242
auto contextMessages = context.getMessagesForSending();
4343
for (auto& message : contextMessages) {
4444
// monitoringService.send({ message->parts.Size(), "outputs/total" });
4545
FairMQParts parts = std::move(message->finalize());
4646
assert(message->empty());
4747
assert(parts.Size() == 2);
4848
for (auto& part : parts) {
49-
outputs[message->channel()].AddPart(std::move(part));
49+
outputs[&(message->channel())].AddPart(std::move(part));
5050
}
5151
}
5252
for (auto& [channel, parts] : outputs) {
53-
device.Send(parts, channel, 0);
53+
device.Send(parts, *channel, 0);
5454
}
5555
}
5656

0 commit comments

Comments
 (0)