Skip to content

Commit b30765e

Browse files
authored
Enhance message class to include serialisation support and rich comparison (#2153)
1 parent 9907f09 commit b30765e

File tree

3 files changed

+502
-124
lines changed

3 files changed

+502
-124
lines changed

src/confluent_kafka/src/Consumer.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ Consumer_commit(Handle *self, PyObject *args, PyObject *kwargs) {
462462
&offsets, &async_o, &async_o))
463463
return NULL;
464464

465-
msg = msg == Py_None ? NULL : msg;
465+
msg = msg == Py_None ? NULL : msg;
466466
offsets = offsets == Py_None ? NULL : offsets;
467467

468468
if (msg && offsets) {
@@ -605,7 +605,7 @@ Consumer_store_offsets(Handle *self, PyObject *args, PyObject *kwargs) {
605605
&offsets))
606606
return NULL;
607607

608-
msg = msg == Py_None ? NULL : msg;
608+
msg = msg == Py_None ? NULL : msg;
609609
offsets = offsets == Py_None ? NULL : offsets;
610610

611611
if (msg && offsets) {

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 233 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,43 @@ static PyObject *KafkaError_txn_requires_abort(KafkaError *self,
121121
return ret;
122122
}
123123

124+
/**
 * @brief __reduce__ implementation so KafkaError supports the pickle
 *        protocol.
 *
 * Returns a 2-tuple (KafkaError class, constructor args) where the args
 * tuple is (code, reason-or-None, fatal, retriable, txn_requires_abort),
 * matching KafkaError_init0's expected arguments.
 *
 * Fix over previous revision: the results of cfl_PyObject_lookup() and
 * the reason-string allocation are now NULL-checked, so a failed lookup
 * no longer reaches Py_BuildValue("(OO)", NULL, ...) followed by
 * Py_DECREF(NULL).
 *
 * @returns New reference, or NULL on error with an exception set
 *          (NOTE(review): assumes cfl_PyObject_lookup sets an exception
 *          on failure — confirm in cfl helpers).
 */
static PyObject *KafkaError_reduce(KafkaError *self,
                                   PyObject *Py_UNUSED(ignored)) {
        PyObject *KafkaError_type;
        PyObject *args;
        PyObject *result;

        KafkaError_type =
            cfl_PyObject_lookup("confluent_kafka.cimpl", "KafkaError");
        if (!KafkaError_type)
                return NULL;

        /* Build args tuple: (error, reason, fatal, retriable,
         * txn_requires_abort) */
        if (self->str) {
                PyObject *reason = cfl_PyUnistr(_FromString(self->str));
                if (!reason) {
                        Py_DECREF(KafkaError_type);
                        return NULL;
                }
                args = Py_BuildValue("(iOiii)", self->code, reason,
                                     self->fatal, self->retriable,
                                     self->txn_requires_abort);
                Py_DECREF(reason);
        } else {
                /* "z" maps a NULL C string to Python None. */
                args = Py_BuildValue("(iziii)", self->code, NULL, self->fatal,
                                     self->retriable,
                                     self->txn_requires_abort);
        }
        if (!args) {
                Py_DECREF(KafkaError_type);
                return NULL;
        }

        result = Py_BuildValue("(OO)", KafkaError_type, args);

        Py_DECREF(KafkaError_type);
        Py_DECREF(args);
        return result;
}
124161

125162
static PyMethodDef KafkaError_methods[] = {
126163
{"code", (PyCFunction)KafkaError_code, METH_NOARGS,
@@ -162,7 +199,8 @@ static PyMethodDef KafkaError_methods[] = {
162199
"producer API.\n"
163200
" :rtype: bool\n"
164201
"\n"},
165-
202+
{"__reduce__", (PyCFunction)KafkaError_reduce, METH_NOARGS,
203+
" Function for serializing KafkaError using the pickle protocol."},
166204
{NULL}};
167205

168206

@@ -287,24 +325,25 @@ KafkaError_init0(PyObject *selfobj, PyObject *args, PyObject *kwargs) {
287325
}
288326

289327
static PyTypeObject KafkaErrorType = {
290-
PyVarObject_HEAD_INIT(NULL, 0) "cimpl.KafkaError", /*tp_name*/
291-
sizeof(KafkaError), /*tp_basicsize*/
292-
0, /*tp_itemsize*/
293-
(destructor)KafkaError_dealloc, /*tp_dealloc*/
294-
0, /*tp_print*/
295-
0, /*tp_getattr*/
296-
0, /*tp_setattr*/
297-
0, /*tp_compare*/
298-
(reprfunc)KafkaError_str0, /*tp_repr*/
299-
0, /*tp_as_number*/
300-
0, /*tp_as_sequence*/
301-
0, /*tp_as_mapping*/
302-
(hashfunc)KafkaError_hash, /*tp_hash */
303-
0, /*tp_call*/
304-
0, /*tp_str*/
305-
PyObject_GenericGetAttr, /*tp_getattro*/
306-
0, /*tp_setattro*/
307-
0, /*tp_as_buffer*/
328+
PyVarObject_HEAD_INIT(NULL,
329+
0) "confluent_kafka.cimpl.KafkaError", /*tp_name*/
330+
sizeof(KafkaError), /*tp_basicsize*/
331+
0, /*tp_itemsize*/
332+
(destructor)KafkaError_dealloc, /*tp_dealloc*/
333+
0, /*tp_print*/
334+
0, /*tp_getattr*/
335+
0, /*tp_setattr*/
336+
0, /*tp_compare*/
337+
(reprfunc)KafkaError_str0, /*tp_repr*/
338+
0, /*tp_as_number*/
339+
0, /*tp_as_sequence*/
340+
0, /*tp_as_mapping*/
341+
(hashfunc)KafkaError_hash, /*tp_hash */
342+
0, /*tp_call*/
343+
0, /*tp_str*/
344+
PyObject_GenericGetAttr, /*tp_getattro*/
345+
0, /*tp_setattro*/
346+
0, /*tp_as_buffer*/
308347
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_BASE_EXC_SUBCLASS |
309348
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
310349
"Kafka error and event object\n"
@@ -540,6 +579,61 @@ static PyObject *Message_set_key(Message *self, PyObject *new_key) {
540579
Py_RETURN_NONE;
541580
}
542581

582+
/**
 * @brief Implements Message.set_topic(): replace the topic field.
 *
 * Fix over previous revision: take the new reference *before* dropping
 * the old one. The old DECREF-then-INCREF order could destroy the
 * object when new_topic is the very object already stored in
 * self->topic and that field held the last reference.
 *
 * @param new_topic New topic value (METH_O argument, borrowed; a new
 *                  reference is taken here).
 * @returns None.
 */
static PyObject *Message_set_topic(Message *self, PyObject *new_topic) {
        Py_INCREF(new_topic);
        Py_XDECREF(self->topic);  /* XDECREF: field may be NULL */
        self->topic = new_topic;

        Py_RETURN_NONE;
}
590+
591+
/**
 * @brief Implements Message.set_error(): replace the error field.
 *
 * Fix over previous revision: take the new reference *before* dropping
 * the old one. The old DECREF-then-INCREF order could destroy the
 * object when new_error is the very object already stored in
 * self->error and that field held the last reference.
 *
 * @param new_error New error value (METH_O argument, borrowed; a new
 *                  reference is taken here).
 * @returns None.
 */
static PyObject *Message_set_error(Message *self, PyObject *new_error) {
        Py_INCREF(new_error);
        Py_XDECREF(self->error);  /* XDECREF: field may be NULL */
        self->error = new_error;

        Py_RETURN_NONE;
}
599+
600+
/**
 * @brief __reduce__ implementation so Message supports the pickle
 *        protocol.
 *
 * Returns (Message class, (topic, partition, offset, key, value,
 * headers, error, timestamp, latency, leader_epoch)) so unpickling
 * calls Message(...) with the same constructor arguments.
 *
 * Fixes over previous revision:
 *  - Message_timestamp() returns a *new* reference but was passed to
 *    Py_BuildValue with "O" (which only borrows) and never released,
 *    leaking a tuple on every pickle. It is now held in a local and
 *    DECREF'd afterwards, like latency_obj.
 *  - The cfl_PyObject_lookup() result is NULL-checked before use.
 *
 * NOTE(review): the "N" arguments (topic/key/value/headers/error) can
 * leak if Py_BuildValue fails partway — a known Py_BuildValue caveat,
 * unchanged here.
 *
 * @returns New reference, or NULL on error.
 */
static PyObject *Message_reduce(Message *self, PyObject *Py_UNUSED(ignored)) {
        PyObject *Message_type;
        PyObject *timestamp_obj;
        PyObject *latency_obj;
        PyObject *result;

#ifdef RD_KAFKA_V_HEADERS
        /* Lazily convert the librdkafka header list so the headers are
         * part of the pickled state (mirrors Message_headers()). */
        if (!self->headers && self->c_headers) {
                self->headers = c_headers_to_py(self->c_headers);
                rd_kafka_headers_destroy(self->c_headers);
                self->c_headers = NULL;
        }
#endif

        Message_type = cfl_PyObject_lookup("confluent_kafka.cimpl", "Message");
        if (!Message_type)
                return NULL;

        /* Message_timestamp() returns a new reference; keep it so it can
         * be released after Py_BuildValue ("O" only borrows). */
        timestamp_obj = Message_timestamp(self, NULL);

        if (self->latency >= 0) {
                /* latency is stored in microseconds; expose seconds. */
                latency_obj =
                    PyFloat_FromDouble((double)self->latency / 1000000.0);
        } else {
                /* Return -1.0 for negative latency to match Message_init
                 * default */
                latency_obj = PyFloat_FromDouble(-1.0);
        }

        result = Py_BuildValue(
            "O(NiLNNNNOOi)", Message_type, Message_topic(self, NULL),
            self->partition, self->offset, Message_key(self, NULL),
            Message_value(self, NULL), Message_headers(self, NULL),
            Message_error(self, NULL), timestamp_obj, latency_obj,
            self->leader_epoch);
        Py_XDECREF(timestamp_obj);
        Py_XDECREF(latency_obj);

        Py_DECREF(Message_type);
        return result;
}
636+
543637
static PyMethodDef Message_methods[] = {
544638
{"error", (PyCFunction)Message_error, METH_NOARGS,
545639
" The message object is also used to propagate errors and events, "
@@ -634,6 +728,22 @@ static PyMethodDef Message_methods[] = {
634728
" :returns: None.\n"
635729
" :rtype: None\n"
636730
"\n"},
731+
{"set_topic", (PyCFunction)Message_set_topic, METH_O,
732+
" Set the field 'Message.topic' with new value.\n"
733+
"\n"
734+
" :param object value: Message.topic.\n"
735+
" :returns: None.\n"
736+
" :rtype: None\n"
737+
"\n"},
738+
{"set_error", (PyCFunction)Message_set_error, METH_O,
739+
" Set the field 'Message.error' with new value.\n"
740+
"\n"
741+
" :param object value: Message.error.\n"
742+
" :returns: None.\n"
743+
" :rtype: None\n"
744+
"\n"},
745+
{"__reduce__", (PyCFunction)Message_reduce, METH_NOARGS,
746+
" Function for serializing Message using the pickle protocol."},
637747
{NULL}};
638748

639749
static int Message_clear(Message *self) {
@@ -783,33 +893,99 @@ static int Message_traverse(Message *self, visitproc visit, void *arg) {
783893
return 0;
784894
}
785895

896+
/**
 * @brief Rich comparison (== and !=) for Message objects.
 *
 * Two Messages compare equal when topic, value, key, headers, error,
 * partition, offset, leader_epoch and timestamp are all equal; latency
 * is deliberately excluded (a float, not significant for identity).
 * Ordering comparisons return NotImplemented.
 *
 * Fix over previous revision: the final "all fields matched" result now
 * honours op — previously the function unconditionally returned True,
 * so `m1 != m2` returned True for two *equal* (non-identical) messages.
 *
 * NOTE(review): the PyObject_RichCompareBool calls assume the object
 * fields (topic/value/key/headers/error) are never NULL — i.e. that
 * Message_init / Message_new0 always populate them (e.g. with Py_None).
 * Confirm against the constructors.
 */
static PyObject *Message_richcompare(PyObject *self, PyObject *other, int op) {
        /* Only equality / inequality are supported. */
        if (op != Py_EQ && op != Py_NE) {
                Py_INCREF(Py_NotImplemented);
                return Py_NotImplemented;
        }

        /* Identity short-circuit: an object always equals itself. */
        if (self == other) {
                if (op == Py_EQ)
                        Py_RETURN_TRUE;
                else
                        Py_RETURN_FALSE;
        }

        /* A Message never equals an object of another type. */
        if (!PyObject_TypeCheck(other, &MessageType)) {
                if (op == Py_EQ)
                        Py_RETURN_FALSE;
                else
                        Py_RETURN_TRUE;
        }

        Message *msg_self = (Message *)self;
        Message *msg_other = (Message *)other;

        int result;

        /* Compare two PyObject* fields with Python equality; returns from
         * the enclosing function on first difference or comparison error. */
#define _LOCAL_COMPARE(left, right)                                    \
        do {                                                           \
                result = PyObject_RichCompareBool(left, right, Py_EQ); \
                if (result < 0)                                        \
                        return NULL;                                   \
                if (result == 0) {                                     \
                        if (op == Py_EQ)                               \
                                Py_RETURN_FALSE;                       \
                        else                                           \
                                Py_RETURN_TRUE;                        \
                }                                                      \
        } while (0)
        _LOCAL_COMPARE(msg_self->topic, msg_other->topic);
        _LOCAL_COMPARE(msg_self->value, msg_other->value);
        _LOCAL_COMPARE(msg_self->key, msg_other->key);
        _LOCAL_COMPARE(msg_self->headers, msg_other->headers);
        _LOCAL_COMPARE(msg_self->error, msg_other->error);
#undef _LOCAL_COMPARE

        /* Compare two plain scalar fields with C equality. */
#define _LOCAL_COMPARE(left, right)                                    \
        do {                                                           \
                if (left != right) {                                   \
                        if (op == Py_EQ)                               \
                                Py_RETURN_FALSE;                       \
                        else                                           \
                                Py_RETURN_TRUE;                        \
                }                                                      \
        } while (0)
        _LOCAL_COMPARE(msg_self->partition, msg_other->partition);
        _LOCAL_COMPARE(msg_self->offset, msg_other->offset);
        _LOCAL_COMPARE(msg_self->leader_epoch, msg_other->leader_epoch);
        _LOCAL_COMPARE(msg_self->timestamp, msg_other->timestamp);
        /* latency is skipped, it is a float and not that significant. */
#undef _LOCAL_COMPARE

        /* All fields matched: equal. */
        if (op == Py_EQ)
                Py_RETURN_TRUE;
        else
                Py_RETURN_FALSE;
}
958+
786959
/**
 * @brief len(msg): size of the message value (payload) in bytes.
 *
 * Messages carrying no payload — value field unset (NULL) or set to
 * None — report length 0 instead of raising.
 */
static Py_ssize_t Message__len__(Message *self) {
        if (!self->value || self->value == Py_None)
                return 0;

        return PyObject_Length(self->value);
}
789964

790965
static PySequenceMethods Message_seq_methods = {
791966
(lenfunc)Message__len__ /* sq_length */
792967
};
793968

969+
794970
PyTypeObject MessageType = {
795-
PyVarObject_HEAD_INIT(NULL, 0) "cimpl.Message", /*tp_name*/
796-
sizeof(Message), /*tp_basicsize*/
797-
0, /*tp_itemsize*/
798-
(destructor)Message_dealloc, /*tp_dealloc*/
799-
0, /*tp_print*/
800-
0, /*tp_getattr*/
801-
0, /*tp_setattr*/
802-
0, /*tp_compare*/
803-
0, /*tp_repr*/
804-
0, /*tp_as_number*/
805-
&Message_seq_methods, /*tp_as_sequence*/
806-
0, /*tp_as_mapping*/
807-
0, /*tp_hash */
808-
0, /*tp_call*/
809-
0, /*tp_str*/
810-
PyObject_GenericGetAttr, /*tp_getattro*/
811-
0, /*tp_setattro*/
812-
0, /*tp_as_buffer*/
971+
PyVarObject_HEAD_INIT(NULL, 0) "confluent_kafka.cimpl.Message", /*tp_name*/
972+
sizeof(Message), /*tp_basicsize*/
973+
0, /*tp_itemsize*/
974+
(destructor)Message_dealloc, /*tp_dealloc*/
975+
0, /*tp_print*/
976+
0, /*tp_getattr*/
977+
0, /*tp_setattr*/
978+
0, /*tp_compare*/
979+
0, /*tp_repr*/
980+
0, /*tp_as_number*/
981+
&Message_seq_methods, /*tp_as_sequence*/
982+
0, /*tp_as_mapping*/
983+
0, /*tp_hash */
984+
0, /*tp_call*/
985+
0, /*tp_str*/
986+
PyObject_GenericGetAttr, /*tp_getattro*/
987+
0, /*tp_setattro*/
988+
0, /*tp_as_buffer*/
813989
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
814990
"The Message object represents either a single consumed or "
815991
"produced message, or an event (:py:func:`error()` is not None).\n"
@@ -850,24 +1026,24 @@ PyTypeObject MessageType = {
8501026
"\n"
8511027
" :returns: Message value (payload) size in bytes\n"
8521028
" :rtype: int\n"
853-
"\n", /*tp_doc*/
854-
(traverseproc)Message_traverse, /* tp_traverse */
855-
(inquiry)Message_clear, /* tp_clear */
856-
0, /* tp_richcompare */
857-
0, /* tp_weaklistoffset */
858-
0, /* tp_iter */
859-
0, /* tp_iternext */
860-
Message_methods, /* tp_methods */
861-
0, /* tp_members */
862-
0, /* tp_getset */
863-
0, /* tp_base */
864-
0, /* tp_dict */
865-
0, /* tp_descr_get */
866-
0, /* tp_descr_set */
867-
0, /* tp_dictoffset */
868-
Message_init, /* tp_init */
869-
0, /* tp_alloc */
870-
Message_new /* tp_new */
1029+
"\n", /*tp_doc*/
1030+
(traverseproc)Message_traverse, /* tp_traverse */
1031+
(inquiry)Message_clear, /* tp_clear */
1032+
(richcmpfunc)Message_richcompare, /* tp_richcompare */
1033+
0, /* tp_weaklistoffset */
1034+
0, /* tp_iter */
1035+
0, /* tp_iternext */
1036+
Message_methods, /* tp_methods */
1037+
0, /* tp_members */
1038+
0, /* tp_getset */
1039+
0, /* tp_base */
1040+
0, /* tp_dict */
1041+
0, /* tp_descr_get */
1042+
0, /* tp_descr_set */
1043+
0, /* tp_dictoffset */
1044+
Message_init, /* tp_init */
1045+
0, /* tp_alloc */
1046+
Message_new /* tp_new */
8711047
};
8721048

8731049
/**
@@ -911,8 +1087,6 @@ PyObject *Message_new0(const Handle *handle, const rd_kafka_message_t *rkm) {
9111087
return (PyObject *)self;
9121088
}
9131089

914-
915-
9161090
/****************************************************************************
9171091
*
9181092
*
@@ -1066,7 +1240,6 @@ PyTypeObject UuidType = {
10661240
};
10671241

10681242

1069-
10701243
/****************************************************************************
10711244
*
10721245
*
@@ -1226,7 +1399,7 @@ static PyObject *TopicPartition_str0(TopicPartition *self) {
12261399
}
12271400

12281401
ret = cfl_PyUnistr(
1229-
_FromFormat("TopicPartition{topic=%s,partition=%d"
1402+
_FromFormat("TopicPartition{topic=%s,partition=%" CFL_PRId32
12301403
",offset=%s,leader_epoch=%s,error=%s}",
12311404
self->topic, self->partition, offset_str,
12321405
leader_epoch_str, c_errstr ? c_errstr : "None"));

0 commit comments

Comments (0)