@@ -20,6 +20,8 @@ namespace ServiceControl.Audit.Persistence.Tests.MongoDB.Shared
2020
2121 /// <summary>
2222 /// Base class for body storage tests that can run against different MongoDB-compatible products.
23+ /// Bodies are stored asynchronously in the messageBodies collection via a background writer.
24+ /// Tests must wait for the background writer to flush before asserting.
2325 /// </summary>
2426 public abstract class BodyStorageTestsBase
2527 {
@@ -94,11 +96,10 @@ public async Task Should_store_and_retrieve_text_body()
9496 var bodyContent = "{ \" message\" : \" Hello, World!\" }" ;
9597 var message = CreateProcessedMessage ( messageId , "application/json" ) ;
9698
97- // Ingest message with body via unit of work
9899 await IngestMessage ( factory , message , Encoding . UTF8 . GetBytes ( bodyContent ) ) . ConfigureAwait ( false ) ;
99100
100- // Retrieve body via IBodyStorage
101- var result = await bodyStorage . TryFetch ( messageId , CancellationToken . None ) . ConfigureAwait ( false ) ;
101+ // Wait for the background body writer to flush to messageBodies collection
102+ var result = await WaitForBodyAsync ( bodyStorage , messageId ) . ConfigureAwait ( false ) ;
102103
103104 Assert . Multiple ( ( ) =>
104105 {
@@ -123,11 +124,10 @@ public async Task Should_store_and_retrieve_binary_body()
123124 var binaryContent = new byte [ ] { 0x00 , 0x01 , 0x02 , 0xFF , 0xFE , 0xFD } ;
124125 var message = CreateProcessedMessage ( messageId , "application/octet-stream" ) ;
125126
126- // Ingest message with binary body
127127 await IngestMessage ( factory , message , binaryContent ) . ConfigureAwait ( false ) ;
128128
129- // Binary bodies should be retrievable
130- var result = await bodyStorage . TryFetch ( messageId , CancellationToken . None ) . ConfigureAwait ( false ) ;
129+ // Wait for the background body writer to flush
130+ var result = await WaitForBodyAsync ( bodyStorage , messageId ) . ConfigureAwait ( false ) ;
131131
132132 Assert . Multiple ( ( ) =>
133133 {
@@ -141,11 +141,11 @@ public async Task Should_store_and_retrieve_binary_body()
141141 await result . Stream . CopyToAsync ( memoryStream ) . ConfigureAwait ( false ) ;
142142 Assert . That ( memoryStream . ToArray ( ) , Is . EqualTo ( binaryContent ) , "Binary content should match" ) ;
143143
144- // Verify it's stored in binaryBody field, not body field
145- var collection = database . GetCollection < BsonDocument > ( CollectionNames . ProcessedMessages ) ;
144+ // Verify it's stored as binaryBody ( not textBody) in messageBodies collection
145+ var collection = database . GetCollection < BsonDocument > ( CollectionNames . MessageBodies ) ;
146146 var doc = await collection . Find ( Builders < BsonDocument > . Filter . Eq ( "_id" , messageId ) ) . FirstOrDefaultAsync ( ) . ConfigureAwait ( false ) ;
147- Assert . That ( doc , Is . Not . Null , "Message should be stored " ) ;
148- Assert . That ( doc . Contains ( "body" ) && doc [ "body" ] != BsonNull . Value , Is . False , "Text body field should be null for binary content" ) ;
147+ Assert . That ( doc , Is . Not . Null , "Body document should exist in messageBodies collection " ) ;
148+ Assert . That ( doc . Contains ( "textBody" ) , Is . False , "Text body field should not exist for binary content" ) ;
149149 Assert . That ( doc . Contains ( "binaryBody" ) && doc [ "binaryBody" ] != BsonNull . Value , Is . True , "Binary body field should have content" ) ;
150150 }
151151
@@ -160,23 +160,23 @@ public async Task Should_return_no_result_for_nonexistent_body()
160160 }
161161
162162 [ Test ]
163- public async Task Should_store_body_inline_in_processed_message ( )
163+ public async Task Should_store_body_in_message_bodies_collection ( )
164164 {
165165 var factory = host . Services . GetRequiredService < IAuditIngestionUnitOfWorkFactory > ( ) ;
166166
167- var messageId = "inline-storage -test" ;
168- var bodyContent = "{ \" test\" : \" inline body storage \" }" ;
167+ var messageId = "separate-collection -test" ;
168+ var bodyContent = "{ \" test\" : \" body in separate collection \" }" ;
169169 var message = CreateProcessedMessage ( messageId , "application/json" ) ;
170170
171171 await IngestMessage ( factory , message , Encoding . UTF8 . GetBytes ( bodyContent ) ) . ConfigureAwait ( false ) ;
172172
173- // Verify body is stored inline in ProcessedMessages collection
174- var collection = database . GetCollection < BsonDocument > ( CollectionNames . ProcessedMessages ) ;
175- var doc = await collection . Find ( Builders < BsonDocument > . Filter . Eq ( "_id" , messageId ) ) . FirstOrDefaultAsync ( ) . ConfigureAwait ( false ) ;
173+ // Wait for the background body writer to flush to messageBodies collection
174+ var collection = database . GetCollection < BsonDocument > ( CollectionNames . MessageBodies ) ;
175+ var doc = await WaitForDocumentAsync ( collection , messageId ) . ConfigureAwait ( false ) ;
176176
177- Assert . That ( doc , Is . Not . Null , "Message should be stored " ) ;
178- Assert . That ( doc . Contains ( "body" ) , Is . True , "Document should have body field " ) ;
179- Assert . That ( doc [ "body " ] . AsString , Is . EqualTo ( bodyContent ) , "Body should be stored as plain UTF-8 text " ) ;
177+ Assert . That ( doc , Is . Not . Null , "Body document should exist in messageBodies collection " ) ;
178+ Assert . That ( doc [ "textBody" ] . AsString , Is . EqualTo ( bodyContent ) , "Body should be stored as UTF-8 text in messageBodies " ) ;
179+ Assert . That ( doc [ "contentType " ] . AsString , Is . EqualTo ( "application/json" ) , "Content type should be stored" ) ;
180180 }
181181
182182 [ Test ]
@@ -201,8 +201,6 @@ public async Task Should_not_store_body_when_body_storage_type_is_none()
201201
202202 try
203203 {
204- var clientProvider = testHost . Services . GetRequiredService < IMongoClientProvider > ( ) ;
205- var testDatabase = clientProvider . Database ;
206204 var factory = testHost . Services . GetRequiredService < IAuditIngestionUnitOfWorkFactory > ( ) ;
207205 var bodyStorage = testHost . Services . GetRequiredService < IBodyStorage > ( ) ;
208206
@@ -216,12 +214,6 @@ public async Task Should_not_store_body_when_body_storage_type_is_none()
216214 // Assert - TryFetch should return no result
217215 var result = await bodyStorage . TryFetch ( messageId , CancellationToken . None ) . ConfigureAwait ( false ) ;
218216 Assert . That ( result . HasResult , Is . False , "Body should not be retrievable when BodyStorageType is None" ) ;
219-
220- // Assert - Message should be stored but without body
221- var collection = testDatabase . GetCollection < BsonDocument > ( CollectionNames . ProcessedMessages ) ;
222- var doc = await collection . Find ( Builders < BsonDocument > . Filter . Eq ( "_id" , messageId ) ) . FirstOrDefaultAsync ( ) . ConfigureAwait ( false ) ;
223- Assert . That ( doc , Is . Not . Null , "Message should be stored" ) ;
224- Assert . That ( doc . Contains ( "body" ) && doc [ "body" ] != BsonNull . Value , Is . False , "Body field should be null when BodyStorageType is None" ) ;
225217 }
226218 finally
227219 {
@@ -234,7 +226,7 @@ public async Task Should_not_store_body_when_body_storage_type_is_none()
234226 [ Test ]
235227 public async Task Should_not_store_body_when_body_exceeds_max_size ( )
236228 {
237- // Arrange - Create a host with a small max body size
229+ // Arrange - Create a separate host with a small max body size
238230 var testDatabaseName = $ "test_maxsize_{ Guid . NewGuid ( ) : N} ";
239231 var connectionString = Environment . BuildConnectionString ( testDatabaseName ) ;
240232
@@ -253,8 +245,6 @@ public async Task Should_not_store_body_when_body_exceeds_max_size()
253245
254246 try
255247 {
256- var clientProvider = testHost . Services . GetRequiredService < IMongoClientProvider > ( ) ;
257- var testDatabase = clientProvider . Database ;
258248 var factory = testHost . Services . GetRequiredService < IAuditIngestionUnitOfWorkFactory > ( ) ;
259249 var bodyStorage = testHost . Services . GetRequiredService < IBodyStorage > ( ) ;
260250
@@ -265,13 +255,6 @@ public async Task Should_not_store_body_when_body_exceeds_max_size()
265255 // Act - Ingest a message with a body that exceeds max size
266256 await IngestMessage ( factory , message , body ) . ConfigureAwait ( false ) ;
267257
268- // Assert - Message should be stored, but body should NOT (too large)
269- var collection = testDatabase . GetCollection < BsonDocument > ( CollectionNames . ProcessedMessages ) ;
270- var doc = await collection . Find ( Builders < BsonDocument > . Filter . Eq ( "_id" , messageId ) ) . FirstOrDefaultAsync ( ) . ConfigureAwait ( false ) ;
271-
272- Assert . That ( doc , Is . Not . Null , "Message should be stored" ) ;
273- Assert . That ( doc . Contains ( "body" ) && doc [ "body" ] != BsonNull . Value , Is . False , "Body should NOT be stored when it exceeds max size" ) ;
274-
275258 // Assert - TryFetch should return no result
276259 var result = await bodyStorage . TryFetch ( messageId , CancellationToken . None ) . ConfigureAwait ( false ) ;
277260 Assert . That ( result . HasResult , Is . False , "Body should not be retrievable when it exceeds max size" ) ;
@@ -297,6 +280,50 @@ static async Task IngestMessage(IAuditIngestionUnitOfWorkFactory factory, Proces
297280 }
298281 }
299282
283+ /// <summary>
284+ /// Polls IBodyStorage.TryFetch until the body appears or timeout is reached.
285+ /// Bodies are written asynchronously by a background writer, so they may not be
286+ /// immediately available after ingestion.
287+ /// </summary>
288+ static async Task < StreamResult > WaitForBodyAsync ( IBodyStorage bodyStorage , string bodyId , int timeoutMs = 5000 )
289+ {
290+ var deadline = DateTime . UtcNow . AddMilliseconds ( timeoutMs ) ;
291+ StreamResult result ;
292+ do
293+ {
294+ result = await bodyStorage . TryFetch ( bodyId , CancellationToken . None ) . ConfigureAwait ( false ) ;
295+ if ( result . HasResult )
296+ {
297+ return result ;
298+ }
299+
300+ await Task . Delay ( 100 ) . ConfigureAwait ( false ) ;
301+ } while ( DateTime . UtcNow < deadline ) ;
302+
303+ return result ;
304+ }
305+
306+ /// <summary>
307+ /// Polls a MongoDB collection until a document with the given ID appears or timeout is reached.
308+ /// </summary>
309+ static async Task < BsonDocument > WaitForDocumentAsync ( IMongoCollection < BsonDocument > collection , string id , int timeoutMs = 5000 )
310+ {
311+ var deadline = DateTime . UtcNow . AddMilliseconds ( timeoutMs ) ;
312+ BsonDocument doc ;
313+ do
314+ {
315+ doc = await collection . Find ( Builders < BsonDocument > . Filter . Eq ( "_id" , id ) ) . FirstOrDefaultAsync ( ) . ConfigureAwait ( false ) ;
316+ if ( doc != null )
317+ {
318+ return doc ;
319+ }
320+
321+ await Task . Delay ( 100 ) . ConfigureAwait ( false ) ;
322+ } while ( DateTime . UtcNow < deadline ) ;
323+
324+ return doc ;
325+ }
326+
300327 static ProcessedMessage CreateProcessedMessage ( string messageId , string contentType = "text/plain" )
301328 {
302329 var headers = new Dictionary < string , string >
0 commit comments