@@ -54,38 +54,33 @@ class UserKeyModel(BaseModel):
 
 
 def test_kafka_consumer_with_json_value_and_pydantic(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a Pydantic model instance
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueModel instance
+        value: UserValueModel = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a Pydantic model
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueModel"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueModel instance with the correct properties
+    assert isinstance(result, UserValueModel)
+    assert result.name == "John Doe"
+    assert result.age == 30
 
 
 def test_kafka_consumer_with_json_value_and_union_tag(kafka_event_with_json_data, lambda_context):
     """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
 
-    # Create dict to capture results
-    result_data = {}
-
     class UserValueModel(BaseModel):
         name: Literal["John Doe"]
         age: int
@@ -96,67 +91,73 @@ class UserValueModel2(BaseModel):
 
     UnionModel = Annotated[Union[UserValueModel, UserValueModel2], Field(discriminator="name")]
 
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a Pydantic model instance with Union Tags
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UnionModel)
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueModel instance
+        value: UserValueModel = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a Pydantic model
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueModel"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueModel instance with the correct properties
+    assert isinstance(result, UserValueModel)
+    assert result.name == "John Doe"
+    assert result.age == 30
 
 
 def test_kafka_consumer_with_json_key_and_pydantic(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
-    schema_config = SchemaConfig(key_schema_type="JSON", key_output_serializer=UserKeyModel)
+    # GIVEN
+    # A Kafka consumer configured to deserialize only the key using JSON
+    # and convert it to a Pydantic UserKeyModel instance
+    schema_config = SchemaConfig(
+        key_schema_type="JSON",
+        key_output_serializer=UserKeyModel,
+    )
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.key).__name__
-        result_data["user_id"] = record.key.user_id
-        return {"processed": True}
+        # Extract the deserialized key to verify
+        key: UserKeyModel = event.record.key
+        return key
 
-    # Call the handler
+    # WHEN
+    # The handler processes a Kafka event, deserializing only the key portion as JSON
+    # while leaving the value in its original format
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserKeyModel"
-    assert result_data["user_id"] == "123"
+    # THEN
+    # The key should be properly deserialized from JSON and converted to a UserKeyModel
+    # with the expected user_id value
+    assert isinstance(result, UserKeyModel)
+    assert result.user_id == "123"
 
 
-# Tests for Complex Types with Pydantic TypeAdapter
 def test_kafka_consumer_with_multiple_records(lambda_context):
-    """Test processing multiple records in a single event."""
-
-    # Create data for multiple records
+    # GIVEN
+    # Three different user records to process
+    # First user: John Doe, age 30
     data1 = {"name": "John Doe", "age": 30}
+    # Second user: Jane Smith, age 25
     data2 = {"name": "Jane Smith", "age": 25}
+    # Third user: Bob Johnson, age 40
     data3 = {"name": "Bob Johnson", "age": 40}
 
-    # Encode the data
+    # Base64-encoded JSON data for each record
     encoded1 = base64.b64encode(json.dumps(data1).encode("utf-8")).decode("utf-8")
     encoded2 = base64.b64encode(json.dumps(data2).encode("utf-8")).decode("utf-8")
     encoded3 = base64.b64encode(json.dumps(data3).encode("utf-8")).decode("utf-8")
 
-    # Create a kafka event with multiple records
+    # A Kafka event containing multiple records across different offsets
     multi_record_event = {
         "eventSource": "aws:kafka",
         "records": {
@@ -195,25 +196,31 @@ def test_kafka_consumer_with_multiple_records(lambda_context):
         },
     }
 
-    # Create schema config
-    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)
-
-    # Create list to store processed records
+    # A list to capture processed record details
     processed_records = []
 
+    # A Kafka consumer configured to deserialize JSON and convert to Pydantic models
+    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)
+
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Process all records
+        # Process each record and collect its properties
         for record in event.records:
             processed_records.append({"name": record.value.name, "age": record.value.age})
         return {"processed": len(processed_records)}
 
-    # Call the handler
+    # WHEN
+    # The handler processes the Kafka event containing multiple JSON records
     result = handler(multi_record_event, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The handler should successfully process all three records
+    # and return the correct count
     assert result == {"processed": 3}
     assert len(processed_records) == 3
+
+    # All three users should be correctly deserialized and processed
+    # regardless of their order in the event
     assert any(r["name"] == "John Doe" and r["age"] == 30 for r in processed_records)
     assert any(r["name"] == "Jane Smith" and r["age"] == 25 for r in processed_records)
     assert any(r["name"] == "Bob Johnson" and r["age"] == 40 for r in processed_records)
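
For reference, the tests in this diff rely on a module-level UserValueModel and UserKeyModel plus a kafka_event_with_json_data fixture defined earlier in the file. A minimal sketch of what those definitions might look like follows; the field types and the single-record event shape are inferred from the assertions and from the multi_record_event structure above, not copied from the real test module.

# Hypothetical sketch: shapes inferred from the tests above, not the actual fixtures.
import base64
import json

import pytest
from pydantic import BaseModel


class UserValueModel(BaseModel):
    name: str
    age: int


class UserKeyModel(BaseModel):
    user_id: str


@pytest.fixture
def kafka_event_with_json_data():
    # Key and value payloads matching the assertions in the tests above
    key = base64.b64encode(json.dumps({"user_id": "123"}).encode("utf-8")).decode("utf-8")
    value = base64.b64encode(json.dumps({"name": "John Doe", "age": 30}).encode("utf-8")).decode("utf-8")
    # Single-record event in the same aws:kafka shape as multi_record_event
    return {
        "eventSource": "aws:kafka",
        "records": {
            "my-topic-0": [
                {
                    "topic": "my-topic",
                    "partition": 0,
                    "offset": 0,
                    "timestamp": 1545084650987,
                    "timestampType": "CREATE_TIME",
                    "key": key,
                    "value": value,
                    "headers": [],
                },
            ],
        },
    }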