@@ -59,63 +59,56 @@ class UserKeyClass:
 
 
 def test_kafka_consumer_with_json(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization without output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # without any additional output serialization
     schema_config = SchemaConfig(value_schema_type="JSON")
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value["name"]
-        result_data["age"] = record.value["age"]
-        return {"processed": True}
+        # Return the deserialized JSON value for verification
+        return event.record.value
 
-    # Call the handler
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "dict"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The JSON should be correctly deserialized into a Python dictionary
+    # with the expected field values
+    assert result["name"] == "John Doe"
+    assert result["age"] == 30
 
 
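For reference, the `kafka_event_with_json_data` fixture these tests consume is defined outside this hunk. The following is a minimal sketch of what such a fixture presumably looks like, following the AWS MSK event shape; the exact contents are an assumption inferred from the assertions in this file:

# Hypothetical sketch of the kafka_event_with_json_data fixture; the real
# fixture lives elsewhere in the test module, so the values here are
# assumptions inferred from the assertions in this file.
import base64
import json

import pytest


@pytest.fixture
def kafka_event_with_json_data():
    # JSON payload matching the name/age assertions in the tests above
    payload = {"name": "John Doe", "age": 30}
    encoded_value = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
    return {
        "eventSource": "aws:kafka",
        "records": {
            "my-topic-1": [
                {
                    "topic": "my-topic-1",
                    "partition": 0,
                    "offset": 15,
                    "timestamp": 1545084650987,
                    "timestampType": "CREATE_TIME",
                    "key": None,
                    "value": encoded_value,
                    # "headerValue" as a list of byte values, per the MSK event format
                    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
                },
            ],
        },
    }
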
 def test_kafka_consumer_with_json_and_dataclass(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a UserValueDataClass instance
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueDataClass instance
+        value: UserValueDataClass = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a dataclass
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueDataClass"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueDataClass instance
+    # with the correct property values from the original JSON
+    assert isinstance(result, UserValueDataClass)
+    assert result.name == "John Doe"
+    assert result.age == 30
 
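`UserValueDataClass` (like the `UserKeyClass` named in the first hunk header) is defined above this hunk. A minimal sketch of the shape these tests assume; the field list is inferred from the assertions, so treat it as an assumption:

# Hypothetical sketch of UserValueDataClass; the real definition sits above
# this hunk. Fields are inferred from the name/age assertions in these tests.
from dataclasses import dataclass


@dataclass
class UserValueDataClass:
    name: str
    age: int
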
 
 def test_kafka_consumer_with_invalid_json_data(kafka_event_with_json_data, lambda_context):
-    """Test error handling when JSON data is invalid."""
-
-    # Create invalid JSON data
+    # GIVEN
+    # A Kafka event with raw string data that is not valid base64-encoded JSON
     invalid_data = "invalid json data"
     kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
     kafka_event_with_json_data["records"]["my-topic-1"][0]["value"] = invalid_data
@@ -124,33 +117,34 @@ def test_kafka_consumer_with_invalid_json_data(kafka_event_with_json_data, lambda_context):
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # This should never be reached if deserializer fails
-        record = next(event.records)
-        assert record.value
-        return {"processed": True}
+        return event.record.value
 
-    # This should raise a deserialization error
+    # WHEN/THEN
+    # The handler should fail to process the invalid JSON data
+    # and raise a specific deserialization error
     with pytest.raises(KafkaConsumerDeserializationError) as excinfo:
         handler(kafka_event_with_json_data, lambda_context)
 
+    # Ensure the error contains useful diagnostic information
     assert "Error trying to deserialize json data" in str(excinfo.value)
 
 
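As an aside, the raises-then-assert pair above can be collapsed into a single step with pytest's `match` argument, which searches `str(excinfo.value)` as a regular expression (plain substrings like this one work unchanged):

# Equivalent single-step assertion using pytest.raises(match=...)
with pytest.raises(KafkaConsumerDeserializationError, match="Error trying to deserialize json data"):
    handler(kafka_event_with_json_data, lambda_context)
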
-# Tests for Complex Types with Pydantic TypeAdapter
-def test_kafka_consumer_with_multiple_records(lambda_context):
-    """Test processing multiple records in a single event."""
-
-    # Create data for multiple records
+def test_kafka_consumer_with_multiple_records_json(lambda_context):
+    # GIVEN
+    # Three different user records to process
+    # First user: John Doe, age 30
     data1 = {"name": "John Doe", "age": 30}
+    # Second user: Jane Smith, age 25
     data2 = {"name": "Jane Smith", "age": 25}
+    # Third user: Bob Johnson, age 40
     data3 = {"name": "Bob Johnson", "age": 40}
 
-    # Encode the data
+    # Base64-encoded JSON data for each record
     encoded1 = base64.b64encode(json.dumps(data1).encode("utf-8")).decode("utf-8")
     encoded2 = base64.b64encode(json.dumps(data2).encode("utf-8")).decode("utf-8")
     encoded3 = base64.b64encode(json.dumps(data3).encode("utf-8")).decode("utf-8")
 
-    # Create a kafka event with multiple records
+    # A Kafka event containing multiple records across different offsets
     multi_record_event = {
         "eventSource": "aws:kafka",
         "records": {
@@ -189,105 +183,149 @@ def test_kafka_consumer_with_multiple_records(lambda_context):
         },
     }
 
-    # Create schema config
-    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
-
-    # Create list to store processed records
+    # A list to capture processed record details
     processed_records = []
 
+    # A Kafka consumer configured to deserialize JSON and convert to dataclass instances
+    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
+
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Process all records
+        # Process each record and collect its properties
        for record in event.records:
             processed_records.append({"name": record.value.name, "age": record.value.age})
         return {"processed": len(processed_records)}
 
-    # Call the handler
+    # WHEN
+    # The handler processes the Kafka event containing multiple JSON records
     result = handler(multi_record_event, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The handler should successfully process all three records
+    # and return the correct count
     assert result == {"processed": 3}
     assert len(processed_records) == 3
+
+    # All three users should be correctly deserialized into dataclass instances
+    # and their properties should be accessible
     assert any(r["name"] == "John Doe" and r["age"] == 30 for r in processed_records)
     assert any(r["name"] == "Jane Smith" and r["age"] == 25 for r in processed_records)
     assert any(r["name"] == "Bob Johnson" and r["age"] == 40 for r in processed_records)
 
 
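The per-record dictionaries inside `multi_record_event` fall in the lines elided by the hunk boundary above and are not reproduced here. As a hedged illustration only, a helper along these lines would produce entries of the shape the assertions require; field names follow the AWS MSK event format, and the actual test contents may differ:

# Hypothetical helper for one MSK-style record entry; the actual records are
# elided by the hunk boundary above and may differ in detail.
def make_record(topic: str, partition: int, offset: int, encoded_value: str) -> dict:
    return {
        "topic": topic,
        "partition": partition,
        "offset": offset,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": None,
        "value": encoded_value,
        "headers": [],
    }

Three such entries, one per encoded payload, under `records["my-topic-1"]` would give the handler the three records the assertions expect.
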
 def test_kafka_consumer_default_deserializer_value(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
+    # GIVEN
+    # A simple string message encoded in base64
+    raw_data = b"data"
+    base64_data = base64.b64encode(raw_data).decode("utf-8")
 
-    base64_data = base64.b64encode(b"data")
-    kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["value"] = base64_data
+    # A Kafka event with the base64-encoded data as value
+    basic_kafka_event = deepcopy(kafka_event_with_json_data)
+    basic_kafka_event["records"]["my-topic-1"][0]["value"] = base64_data
 
+    # A Kafka consumer with no schema configuration specified
+    # which should default to base64 decoding only
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's value
         record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
+        # Should receive UTF-8 decoded data with no further processing
         return record.value
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with default deserializer
+    result = handler(basic_kafka_event, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The result should be the UTF-8 decoded string from the base64 data
+    # with no additional deserialization applied
     assert result == "data"
+    assert isinstance(result, str)
 
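The default behavior pinned down here should be equivalent to plain base64 decoding followed by UTF-8 decoding, using only the standard library and the same `base64_data` built above:

# Standard-library equivalent of the default (no SchemaConfig) value path
decoded = base64.b64decode(base64_data).decode("utf-8")
assert decoded == "data"
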
 
 def test_kafka_consumer_default_deserializer_key(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
+    # GIVEN
+    # A simple string message encoded in base64 for the key
+    raw_key_data = b"data"
+    base64_key = base64.b64encode(raw_key_data).decode("utf-8")
 
-    base64_data = base64.b64encode(b"data")
-    kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = base64_data
+    # A Kafka event with the base64-encoded data as key
+    kafka_event_with_key = deepcopy(kafka_event_with_json_data)
+    kafka_event_with_key["records"]["my-topic-1"][0]["key"] = base64_key
 
+    # A Kafka consumer with no schema configuration specified
+    # which should default to base64 decoding only
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's key
         record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
+        # Should receive UTF-8 decoded key with no further processing
         return record.key
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with default key deserializer
+    result = handler(kafka_event_with_key, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The key should be the UTF-8 decoded string from the base64 data
+    # with no additional deserialization or transformation applied
     assert result == "data"
+    assert isinstance(result, str)
 
 
 def test_kafka_consumer_default_deserializer_key_is_none(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
-
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = None
+    # GIVEN
+    # A Kafka event with a null key in the record
+    kafka_event_with_null_key = deepcopy(kafka_event_with_json_data)
+    kafka_event_with_null_key["records"]["my-topic-1"][0]["key"] = None
 
+    # A Kafka consumer with no schema configuration specified
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's key which should be None
         record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
         return record.key
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with a null key
+    result = handler(kafka_event_with_null_key, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The key should be preserved as None without any attempt at deserialization
     assert result is None
 
 
 def test_kafka_consumer_metadata_fields(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
-
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = None
+    # GIVEN
+    # A Kafka event with specific metadata we want to verify is preserved
+    kafka_event = deepcopy(kafka_event_with_json_data)
+    kafka_event["records"]["my-topic-1"][0]["key"] = None
 
+    # A Kafka consumer with no schema configuration
+    # that returns the full record object for inspection
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
         return event.record
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event and returns the record object
+    result = handler(kafka_event, lambda_context)
+
+    # THEN
+    # The record should preserve all original metadata fields
 
-    # Verify the results
-    assert result.original_value == kafka_event_with_json_data["records"]["my-topic-1"][0]["value"]
-    assert result.original_key == kafka_event_with_json_data["records"]["my-topic-1"][0]["key"]
-    assert result.original_headers == kafka_event_with_json_data["records"]["my-topic-1"][0]["headers"]
+    # Original encoded values should be preserved
+    assert result.original_value == kafka_event["records"]["my-topic-1"][0]["value"]
+    assert result.original_key == kafka_event["records"]["my-topic-1"][0]["key"]
+
+    # Original headers array should be preserved
+    assert result.original_headers == kafka_event["records"]["my-topic-1"][0]["headers"]
+
+    # Headers should be parsed into a dictionary for easy access
     assert result.headers == {"headerKey": b"headerValue"}
+
+    # Additional metadata fields should match the original event
+    assert result.topic == kafka_event["records"]["my-topic-1"][0]["topic"]
+    assert result.partition == kafka_event["records"]["my-topic-1"][0]["partition"]
+    assert result.offset == kafka_event["records"]["my-topic-1"][0]["offset"]
+    assert result.timestamp == kafka_event["records"]["my-topic-1"][0]["timestamp"]
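Finally, the `lambda_context` fixture is also defined outside this hunk. A minimal stand-in that mirrors the common attributes of the AWS Lambda context object; the real fixture may differ, so this is an assumption:

# Hypothetical lambda_context fixture; the real one is defined elsewhere in
# the test suite. Attribute names mirror the AWS Lambda context object.
from dataclasses import dataclass

import pytest


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test-func"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:123456789012:function:test-func"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    return LambdaContext()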