Skip to content

Commit 91b092b

Browse files
authored
chore: simplified interface to read/write CE using AMQP bindings. (#262)
Signed-off-by: Tudor Plugaru <plugaru.tudor@protonmail.com>
1 parent 7001341 commit 91b092b

File tree

1 file changed

+115
-0
lines changed
  • src/cloudevents/core/bindings

1 file changed

+115
-0
lines changed

src/cloudevents/core/bindings/amqp.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
from cloudevents.core.base import BaseCloudEvent, EventFactory
2222
from cloudevents.core.formats.base import Format
23+
from cloudevents.core.formats.json import JSONFormat
24+
from cloudevents.core.v1.event import CloudEvent
2325

2426
# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes
2527
# The underscore variant is preferred for JMS 2.0 compatibility
@@ -324,3 +326,116 @@ def from_amqp(
324326
return from_structured(message, event_format, event_factory)
325327

326328
return from_binary(message, event_format, event_factory)
329+
330+
331+
def to_binary_event(
332+
event: BaseCloudEvent,
333+
event_format: Format | None = None,
334+
) -> AMQPMessage:
335+
"""
336+
Convenience wrapper for to_binary with JSON format as default.
337+
338+
Example:
339+
>>> from cloudevents.core.v1.event import CloudEvent
340+
>>> from cloudevents.core.bindings import amqp
341+
>>>
342+
>>> event = CloudEvent(
343+
... attributes={"type": "com.example.test", "source": "/test"},
344+
... data={"message": "Hello"}
345+
... )
346+
>>> message = amqp.to_binary_event(event)
347+
348+
:param event: The CloudEvent to convert
349+
:param event_format: Format implementation (defaults to JSONFormat)
350+
:return: AMQPMessage with CloudEvent attributes as application properties
351+
"""
352+
if event_format is None:
353+
event_format = JSONFormat()
354+
return to_binary(event, event_format)
355+
356+
357+
def from_binary_event(
358+
message: AMQPMessage,
359+
event_format: Format | None = None,
360+
) -> CloudEvent:
361+
"""
362+
Convenience wrapper for from_binary with JSON format and CloudEvent as defaults.
363+
364+
Example:
365+
>>> from cloudevents.core.bindings import amqp
366+
>>> event = amqp.from_binary_event(message)
367+
368+
:param message: AMQPMessage to parse
369+
:param event_format: Format implementation (defaults to JSONFormat)
370+
:return: CloudEvent instance
371+
"""
372+
if event_format is None:
373+
event_format = JSONFormat()
374+
return from_binary(message, event_format, CloudEvent)
375+
376+
377+
def to_structured_event(
378+
event: BaseCloudEvent,
379+
event_format: Format | None = None,
380+
) -> AMQPMessage:
381+
"""
382+
Convenience wrapper for to_structured with JSON format as default.
383+
384+
Example:
385+
>>> from cloudevents.core.v1.event import CloudEvent
386+
>>> from cloudevents.core.bindings import amqp
387+
>>>
388+
>>> event = CloudEvent(
389+
... attributes={"type": "com.example.test", "source": "/test"},
390+
... data={"message": "Hello"}
391+
... )
392+
>>> message = amqp.to_structured_event(event)
393+
394+
:param event: The CloudEvent to convert
395+
:param event_format: Format implementation (defaults to JSONFormat)
396+
:return: AMQPMessage with structured content in application-data
397+
"""
398+
if event_format is None:
399+
event_format = JSONFormat()
400+
return to_structured(event, event_format)
401+
402+
403+
def from_structured_event(
404+
message: AMQPMessage,
405+
event_format: Format | None = None,
406+
) -> CloudEvent:
407+
"""
408+
Convenience wrapper for from_structured with JSON format and CloudEvent as defaults.
409+
410+
Example:
411+
>>> from cloudevents.core.bindings import amqp
412+
>>> event = amqp.from_structured_event(message)
413+
414+
:param message: AMQPMessage to parse
415+
:param event_format: Format implementation (defaults to JSONFormat)
416+
:return: CloudEvent instance
417+
"""
418+
if event_format is None:
419+
event_format = JSONFormat()
420+
return from_structured(message, event_format, CloudEvent)
421+
422+
423+
def from_amqp_event(
424+
message: AMQPMessage,
425+
event_format: Format | None = None,
426+
) -> CloudEvent:
427+
"""
428+
Convenience wrapper for from_amqp with JSON format and CloudEvent as defaults.
429+
Auto-detects binary or structured mode.
430+
431+
Example:
432+
>>> from cloudevents.core.bindings import amqp
433+
>>> event = amqp.from_amqp_event(message)
434+
435+
:param message: AMQPMessage to parse
436+
:param event_format: Format implementation (defaults to JSONFormat)
437+
:return: CloudEvent instance
438+
"""
439+
if event_format is None:
440+
event_format = JSONFormat()
441+
return from_amqp(message, event_format, CloudEvent)

0 commit comments

Comments
 (0)