Skip to content

Commit a46da16

Browse files
[feat] Support messages with generic types (#149)
### Motivation Pulsar C++ client doesn't support schema yet. It only supports configuring `SchemaInfo` when creating producers or consumers. The main reason is that C++'s templates are processed at the time of compilation. Templatizing `Producer`, `Consumer` or `Message` could expose all internal code. Currently, users might write the following code for serialization and deserialization: ```c++ producer.send(MessageBuilder().setContent(encode(value)).build()); ``` ```c++ Message msg; consumer.receive(msg); auto value = decode(msg.getData(), msg.getSize()); ``` However, the `encode` and `decode` functions are just possible solutions from users, they might use some other interfaces like a class with two virtual methods. There is no way to provide a common interface for serialization and deserialization. ### Modifications Add a `TypedMessageBuilder<T>` class template that accepts an encoder function and a validation function. The validation function is used when users want to simulate the Java client's `AUTO_PRODUCE` schema. Define a full specialization for `std::string` template argument to avoid encoding. Since it inherits the `MessageBuilder`, it's compatible with the current code style: ```c++ // It should be noted you have to call `setValue` before methods in base class auto msg = TypedMessageBuilder<T>(encoder).setValue(value).setPartitionKey(key).build(); ``` Add a `TypedMessage<T>` class that only adds a decoder to the `Message` instance and can be converted from the `Message` directly: ```c++ auto typedMsg = TypedMessage<T>(msg, decoder); std::cout << typedMsg.getValue() << std::endl; // decode the bytes std::cout << typedMsg.getMessageId() << std::endl; // call methods from Message ``` For convenience, the following APIs are added to `Consumer`: ```c++ template <typename T> Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder); template <typename T> Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder); template <typename T> void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback, typename TypedMessage<T>::Decoder decoder) ``` The `ConsumerConfiguration` can configure a listener that accepts a `TypedMessage<T>` now: ```c++ template <typename T> ConsumerConfiguration& setTypedMessageListener( std::function<void(Consumer&, const TypedMessage<T>&)> listener, typename TypedMessage<T>::Decoder decoder); ``` Since it calls the original listener and `Consumer` is only forward declared in `ConsumerConfiguration.h`, the 1st argument is changed from `Consumer` to `Consumer&`. It's an API change but it's backward compatible because `std::function<void(Consumer, ...)>` can be cast to `std::function<Consumer&, ...)>` implicitly. Based on these API changes, we can write a separated header-only C++ library as the schema extension.
1 parent 8990b93 commit a46da16

File tree

10 files changed

+321
-5
lines changed

10 files changed

+321
-5
lines changed

include/pulsar/Consumer.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <pulsar/BrokerConsumerStats.h>
2323
#include <pulsar/ConsumerConfiguration.h>
24+
#include <pulsar/TypedMessage.h>
2425
#include <pulsar/defines.h>
2526

2627
#include <iostream>
@@ -91,6 +92,14 @@ class PULSAR_PUBLIC Consumer {
9192
*/
9293
Result receive(Message& msg);
9394

95+
template <typename T>
96+
Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder) {
97+
Message rawMsg;
98+
auto result = receive(rawMsg);
99+
msg = TypedMessage<T>{rawMsg, decoder};
100+
return result;
101+
}
102+
94103
/**
95104
*
96105
* @param msg a non-const reference where the received message will be copied
@@ -101,6 +110,14 @@ class PULSAR_PUBLIC Consumer {
101110
*/
102111
Result receive(Message& msg, int timeoutMs);
103112

113+
template <typename T>
114+
Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder) {
115+
Message rawMsg;
116+
auto result = receive(rawMsg, timeoutMs);
117+
msg = TypedMessage<T>{rawMsg, decoder};
118+
return result;
119+
}
120+
104121
/**
105122
* Receive a single message
106123
* <p>
@@ -114,6 +131,14 @@ class PULSAR_PUBLIC Consumer {
114131
*/
115132
void receiveAsync(ReceiveCallback callback);
116133

134+
template <typename T>
135+
void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
136+
typename TypedMessage<T>::Decoder decoder) {
137+
receiveAsync([callback, decoder](Result result, const Message& msg) {
138+
callback(result, TypedMessage<T>{msg, decoder});
139+
});
140+
}
141+
117142
/**
118143
* Batch receiving messages.
119144
*

include/pulsar/ConsumerConfiguration.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <pulsar/Message.h>
2929
#include <pulsar/Result.h>
3030
#include <pulsar/Schema.h>
31+
#include <pulsar/TypedMessage.h>
3132
#include <pulsar/defines.h>
3233

3334
#include <functional>
@@ -48,7 +49,7 @@ typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
4849
typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
4950

5051
/// Callback definition for MessageListener
51-
typedef std::function<void(Consumer consumer, const Message& msg)> MessageListener;
52+
typedef std::function<void(Consumer& consumer, const Message& msg)> MessageListener;
5253

5354
typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
5455

@@ -126,6 +127,15 @@ class PULSAR_PUBLIC ConsumerConfiguration {
126127
*/
127128
ConsumerConfiguration& setMessageListener(MessageListener messageListener);
128129

130+
template <typename T>
131+
ConsumerConfiguration& setTypedMessageListener(
132+
std::function<void(Consumer&, const TypedMessage<T>&)> listener,
133+
typename TypedMessage<T>::Decoder decoder) {
134+
return setMessageListener([listener, decoder](Consumer& consumer, const Message& msg) {
135+
listener(consumer, TypedMessage<T>{msg, decoder});
136+
});
137+
}
138+
129139
/**
130140
* @return the message listener
131141
*/

include/pulsar/Message.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class PULSAR_PUBLIC Message {
183183

184184
bool operator==(const Message& msg) const;
185185

186-
private:
186+
protected:
187187
typedef std::shared_ptr<MessageImpl> MessageImplPtr;
188188
MessageImplPtr impl_;
189189

include/pulsar/MessageBuilder.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,11 @@ class PULSAR_PUBLIC MessageBuilder {
162162
*/
163163
MessageBuilder& create();
164164

165+
protected:
166+
const char* data() const;
167+
std::size_t size() const;
168+
165169
private:
166-
MessageBuilder(const MessageBuilder&);
167170
void checkMetadata();
168171
static std::shared_ptr<MessageImpl> createMessageImpl();
169172
Message::MessageImplPtr impl_;

include/pulsar/TypedMessage.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Message.h>
22+
23+
#include <functional>
24+
25+
namespace pulsar {
26+
27+
template <typename T>
28+
class TypedMessage : public Message {
29+
public:
30+
using Decoder = std::function<T(const char*, std::size_t)>;
31+
32+
TypedMessage() = default;
33+
34+
TypedMessage(
35+
const Message& message, Decoder decoder = [](const char*, std::size_t) { return T{}; })
36+
: Message(message), decoder_(decoder) {}
37+
38+
T getValue() const { return decoder_(static_cast<const char*>(getData()), getLength()); }
39+
40+
TypedMessage& setDecoder(Decoder decoder) {
41+
decoder_ = decoder;
42+
return *this;
43+
}
44+
45+
private:
46+
Decoder decoder_;
47+
};
48+
49+
} // namespace pulsar
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/MessageBuilder.h>
22+
23+
#include <functional>
24+
25+
namespace pulsar {
26+
27+
template <typename T>
28+
class TypedMessageBuilder : public MessageBuilder {
29+
public:
30+
using Encoder = std::function<std::string(const T&)>;
31+
using Validator = std::function<void(const char* data, size_t)>;
32+
33+
TypedMessageBuilder(
34+
Encoder encoder, Validator validator = [](const char*, std::size_t) {})
35+
: encoder_(encoder), validator_(validator) {}
36+
37+
TypedMessageBuilder& setValue(const T& value) {
38+
setContent(encoder_(value));
39+
if (validator_) {
40+
validator_(data(), size());
41+
}
42+
return *this;
43+
}
44+
45+
private:
46+
const Encoder encoder_;
47+
const Validator validator_;
48+
};
49+
50+
template <>
51+
class TypedMessageBuilder<std::string> : public MessageBuilder {
52+
public:
53+
// The validator should throw an exception to indicate the message is corrupted.
54+
using Validator = std::function<void(const char* data, size_t)>;
55+
56+
TypedMessageBuilder(Validator validator = nullptr) : validator_(validator) {}
57+
58+
TypedMessageBuilder& setValue(const std::string& value) {
59+
if (validator_) {
60+
validator_(value.data(), value.size());
61+
}
62+
setContent(value);
63+
return *this;
64+
}
65+
66+
TypedMessageBuilder& setValue(std::string&& value) {
67+
if (validator_) {
68+
validator_(value.data(), value.size());
69+
}
70+
setContent(std::move(value));
71+
return *this;
72+
}
73+
74+
private:
75+
Validator validator_;
76+
};
77+
using BytesMessageBuilder = TypedMessageBuilder<std::string>;
78+
79+
} // namespace pulsar

lib/ConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,8 @@ void ConsumerImpl::internalListener() {
773773
try {
774774
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
775775
lastDequedMessageId_ = msg.getMessageId();
776-
messageListener_(Consumer(get_shared_this_ptr()), msg);
776+
Consumer consumer{get_shared_this_ptr()};
777+
messageListener_(consumer, msg);
777778
} catch (const std::exception& e) {
778779
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
779780
}

lib/MessageBuilder.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
#include <assert.h>
1920
#include <pulsar/MessageBuilder.h>
2021

2122
#include <memory>
@@ -155,4 +156,15 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
155156
r.Swap(impl_->metadata.mutable_replicate_to());
156157
return *this;
157158
}
159+
160+
const char* MessageBuilder::data() const {
161+
assert(impl_->payload.data());
162+
return impl_->payload.data();
163+
}
164+
165+
size_t MessageBuilder::size() const {
166+
assert(impl_->payload.data());
167+
return impl_->payload.readableBytes();
168+
}
169+
158170
} // namespace pulsar

lib/MultiTopicsConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
539539
Message m;
540540
incomingMessages_.pop(m);
541541
try {
542-
messageListener_(Consumer(get_shared_this_ptr()), m);
542+
Consumer self{get_shared_this_ptr()};
543+
messageListener_(self, m);
543544
messageProcessed(m);
544545
} catch (const std::exception& e) {
545546
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());

0 commit comments

Comments
 (0)