@@ -100,42 +100,28 @@ class UserKeyClass:
100100
101101
102102def test_kafka_consumer_with_avro (kafka_event_with_avro_data , avro_value_schema , lambda_context ):
103- """Test Kafka consumer with Avro deserialization without output serialization."""
104-
105- # Create dict to capture results
106- result_data = {}
107-
103+ # GIVEN A Kafka consumer configured with Avro schema deserialization
108104 schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_value_schema )
109105
110106 @kafka_consumer (schema_config = schema_config )
111107 def handler (event : ConsumerRecords , context ):
112- # Capture the results to verify
113- record = next (event .records )
114- result_data ["value_type" ] = type (record .value ).__name__
115- result_data ["name" ] = record .value ["name" ]
116- result_data ["age" ] = record .value ["age" ]
117- return {"processed" : True }
108+ return event .record .value
118109
119- # Call the handler
110+ # WHEN The handler processes the Kafka event containing Avro-encoded data
120111 result = handler (kafka_event_with_avro_data , lambda_context )
121112
122- # Verify the results
123- assert result == {"processed" : True }
124- assert result_data ["value_type" ] == "dict"
125- assert result_data ["name" ] == "John Doe"
126- assert result_data ["age" ] == 30
113+ # THEN The Avro data should be correctly deserialized into a Python dictionary
114+ assert result ["name" ] == "John Doe"
115+ assert result ["age" ] == 30
127116
128117
129118def test_kafka_consumer_with_avro_and_dataclass (
130119 kafka_event_with_avro_data ,
131120 avro_value_schema ,
132121 lambda_context ,
133122):
134- """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
135-
136- # Create dict to capture results
137- result_data = {}
138-
123+ # GIVEN A Kafka consumer configured with Avro schema deserialization
124+ # and a dataclass for output serialization
139125 schema_config = SchemaConfig (
140126 value_schema_type = "AVRO" ,
141127 value_schema = avro_value_schema ,
@@ -145,35 +131,33 @@ def test_kafka_consumer_with_avro_and_dataclass(
145131 @kafka_consumer (schema_config = schema_config )
146132 def handler (event : ConsumerRecords , context ):
147133 # Capture the results to verify
148- record = next (event .records )
149- result_data ["value_type" ] = type (record .value ).__name__
150- result_data ["name" ] = record .value .name
151- result_data ["age" ] = record .value .age
152- return {"processed" : True }
134+ value : UserValueDataClass = event .record .value
135+ return value
153136
154- # Call the handler
137+ # WHEN The handler processes the Kafka event containing Avro-encoded data
138+ # and serializes the output as a UserValueDataClass instance
155139 result = handler (kafka_event_with_avro_data , lambda_context )
156140
157- # Verify the results
158- assert result == { "processed" : True }
159- assert result_data [ "value_type" ] == "UserValueDataClass "
160- assert result_data [ "name" ] == "John Doe"
161- assert result_data [ "age" ] == 30
141+ # THEN The Avro data should be correctly deserialized and converted to a dataclass instance
142+ # with the expected property values
143+ assert result . name == "John Doe "
144+ assert result . age == 30
145+ assert isinstance ( result , UserValueDataClass )
162146
163147
164- def test_kafka_consumer_with_avro_and_custom_object (
148+ def test_kafka_consumer_with_avro_and_custom_function (
165149 kafka_event_with_avro_data ,
166150 avro_value_schema ,
167151 lambda_context ,
168152):
169- """Test Kafka consumer with Avro deserialization and custom object serialization."""
170-
153+ # GIVEN A custom serialization function that removes the age field from the dictionary
171154 def dict_output (data : dict ) -> dict :
155+ # removing age key
156+ del data ["age" ]
172157 return data
173158
174- # Create dict to capture results
175- result_data = {}
176-
159+ # A Kafka consumer configured with Avro schema deserialization
160+ # and a custom function for output transformation
177161 schema_config = SchemaConfig (
178162 value_schema_type = "AVRO" ,
179163 value_schema = avro_value_schema ,
@@ -183,23 +167,20 @@ def dict_output(data: dict) -> dict:
183167 @kafka_consumer (schema_config = schema_config )
184168 def handler (event : ConsumerRecords , context ):
185169 # Capture the results to verify
186- record = next (event .records )
187- result_data ["name" ] = record .value .get ("name" )
188- result_data ["age" ] = record .value .get ("age" )
189- return {"processed" : True }
170+ return event .record .value
190171
191- # Call the handler
172+ # WHEN The handler processes the Kafka event containing Avro-encoded data
173+ # and applies the custom transformation function to the output
192174 result = handler (kafka_event_with_avro_data , lambda_context )
193175
194- # Verify the results
195- assert result == { "processed" : True }
196- assert result_data ["name" ] == "John Doe"
197- assert result_data [ "age" ] == 30
176+ # THEN The Avro data should be correctly deserialized and transformed
177+ # with the name field intact but the age field removed
178+ assert result ["name" ] == "John Doe"
179+ assert "age" not in result
198180
199181
200182def test_kafka_consumer_with_invalid_avro_data (kafka_event_with_avro_data , lambda_context , avro_value_schema ):
201- """Test error handling when Avro data is invalid."""
202- # Create invalid avro data
183+ # GIVEN A Kafka event with deliberately corrupted Avro data
203184 invalid_data = base64 .b64encode (b"invalid avro data" ).decode ("utf-8" )
204185 kafka_event_with_avro_data_temp = deepcopy (kafka_event_with_avro_data )
205186 kafka_event_with_avro_data_temp ["records" ]["my-topic-1" ][0 ]["value" ] = invalid_data
@@ -209,11 +190,11 @@ def test_kafka_consumer_with_invalid_avro_data(kafka_event_with_avro_data, lambd
209190 @kafka_consumer (schema_config = schema_config )
210191 def lambda_handler (event : ConsumerRecords , context ):
211192 # This should never be reached if deserializer fails
212- record = next (event .records )
213- assert record .value
214- return {"processed" : True }
193+ return event .record .value
215194
216- # This should raise a deserialization error
195+ # WHEN/THEN
196+ # The handler should fail to process the invalid Avro data
197+ # and raise a specific deserialization error
217198 with pytest .raises (KafkaConsumerDeserializationError ) as excinfo :
218199 lambda_handler (kafka_event_with_avro_data_temp , lambda_context )
219200
@@ -223,8 +204,8 @@ def lambda_handler(event: ConsumerRecords, context):
223204
224205
225206def test_kafka_consumer_with_invalid_avro_schema (kafka_event_with_avro_data , lambda_context ):
226- """Test error handling when Avro data is invalid."""
227-
207+ # GIVEN
208+ # An intentionally malformed Avro schema with syntax errors
228209 avro_schema = """
229210 {
230211 "type": "record",
@@ -234,16 +215,17 @@ def test_kafka_consumer_with_invalid_avro_schema(kafka_event_with_avro_data, lam
234215 }
235216 """
236217
218+ # A Kafka consumer configured with the invalid schema
237219 schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_schema )
238220
239221 @kafka_consumer (schema_config = schema_config )
240222 def lambda_handler (event : ConsumerRecords , context ):
241223 # This should never be reached if deserializer fails
242- record = next (event .records )
243- assert record .value
244- return {"processed" : True }
224+ return event .record .value
245225
246- # This should raise a deserialization error
226+ # WHEN/THEN
227+ # The handler should fail during initialization when it tries to parse the schema
228+ # and raise a specific schema parser error
247229 with pytest .raises (KafkaConsumerAvroSchemaParserError ) as excinfo :
248230 lambda_handler (kafka_event_with_avro_data , lambda_context )
249231
@@ -260,10 +242,10 @@ def test_kafka_consumer_with_key_deserialization(
260242):
261243 """Test deserializing both key and value with different schemas and serializers."""
262244
263- # Create dict to capture results
264245 key_value_result = {}
265246
266- # Create schema config with both key and value
247+ # GIVEN A Kafka consumer configured with Avro schemas for both key and value
248+ # with different output serializers for each
267249 schema_config = SchemaConfig (
268250 value_schema_type = "AVRO" ,
269251 value_schema = avro_value_schema ,
@@ -283,27 +265,47 @@ def lambda_handler(event: ConsumerRecords, context):
283265 key_value_result ["value_age" ] = record .value .age
284266 return {"processed" : True }
285267
286- # Call the handler
268+ # WHEN
269+ # The handler processes the Kafka event, deserializing both key and value
287270 result = lambda_handler (kafka_event_with_avro_data , lambda_context )
288271
289- # Verify the results
272+ # THEN
273+ # The handler should return success and the captured properties should match expectations
290274 assert result == {"processed" : True }
275+
276+ # Key should be correctly deserialized into a UserKeyClass instance
291277 assert key_value_result ["key_type" ] == "UserKeyClass"
292278 assert key_value_result ["key_id" ] == "user-123"
279+
280+ # Value should be correctly deserialized into a UserValueDataClass instance
293281 assert key_value_result ["value_type" ] == "UserValueDataClass"
294282 assert key_value_result ["value_name" ] == "John Doe"
295283 assert key_value_result ["value_age" ] == 30
296284
297285
298286def test_kafka_consumer_without_avro_value_schema ():
299- """Test error handling when Avro data is invalid."""
287+ # GIVEN
288+ # A scenario where AVRO schema type is specified for value
289+ # but no actual schema is provided
300290
301- with pytest .raises (KafkaConsumerMissingSchemaError ):
291+ # WHEN/THEN
292+ # SchemaConfig initialization should fail with an appropriate error
293+ with pytest .raises (KafkaConsumerMissingSchemaError ) as excinfo :
302294 SchemaConfig (value_schema_type = "AVRO" , value_schema = None )
303295
296+ # Verify the error message mentions 'value_schema'
297+ assert "value_schema" in str (excinfo .value )
298+
304299
305300def test_kafka_consumer_without_avro_key_schema ():
306- """Test error handling when Avro data is invalid."""
301+ # GIVEN
302+ # A scenario where AVRO schema type is specified for key
303+ # but no actual schema is provided
307304
308- with pytest .raises (KafkaConsumerMissingSchemaError ):
305+ # WHEN/THEN
306+ # SchemaConfig initialization should fail with an appropriate error
307+ with pytest .raises (KafkaConsumerMissingSchemaError ) as excinfo :
309308 SchemaConfig (key_schema_type = "AVRO" , key_schema = None )
309+
310+ # Verify the error message mentions 'key_schema'
311+ assert "key_schema" in str (excinfo .value )
0 commit comments