Skip to content

Commit a2e8327

Browse files
committed
Pauses ingestion during retention cleanup
Ensures data ingestion is paused during retention cleanup to prevent conflicts and data corruption. Sets a flag to prevent more data ingestion while partitions are being dropped to avoid race conditions.
1 parent 4f62e3e commit a2e8327

1 file changed

Lines changed: 16 additions & 6 deletions

File tree

src/ServiceControl.Audit.Persistence.Sql.SqlServer/Infrastructure/SqlServerPartitionManager.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
namespace ServiceControl.Audit.Persistence.Sql.SqlServer.Infrastructure;
22

33
using Microsoft.EntityFrameworkCore;
4+
using ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
45
using ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
56
using ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
67

78
// Partition/table names cannot be parameterized in SQL; all values come from internal constants and date formatting
89
#pragma warning disable EF1002, EF1003
9-
public class SqlServerPartitionManager : IPartitionManager
10+
public class SqlServerPartitionManager(MinimumRequiredStorageState storageState) : IPartitionManager
1011
{
1112
const string PartitionFunctionName = "pf_CreatedOn";
1213
const string PartitionSchemeName = "ps_CreatedOn";
@@ -54,14 +55,23 @@ public async Task DropPartition(AuditDbContextBase dbContext, DateTime partition
5455
// target partition is scanned.
5556
dbContext.Database.SetCommandTimeout(TimeSpan.FromMinutes(5));
5657

57-
foreach (var table in PartitionedTables)
58+
storageState.CanIngestMore = false;
59+
try
5860
{
61+
foreach (var table in PartitionedTables)
62+
{
63+
await dbContext.Database.ExecuteSqlRawAsync(
64+
"DELETE FROM " + table + " WHERE CreatedOn >= '" + hourStr + "' AND CreatedOn < '" + nextHourStr + "'", ct);
65+
}
66+
5967
await dbContext.Database.ExecuteSqlRawAsync(
60-
"DELETE FROM " + table + " WHERE CreatedOn >= '" + hourStr + "' AND CreatedOn < '" + nextHourStr + "'", ct);
61-
}
68+
"ALTER PARTITION FUNCTION " + PartitionFunctionName + "() MERGE RANGE ('" + hourStr + "')", ct);
6269

63-
await dbContext.Database.ExecuteSqlRawAsync(
64-
"ALTER PARTITION FUNCTION " + PartitionFunctionName + "() MERGE RANGE ('" + hourStr + "')", ct);
70+
}
71+
finally
72+
{
73+
storageState.CanIngestMore = true;
74+
}
6575
}
6676

6777
public async Task<List<DateTime>> GetExpiredPartitions(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken ct)

0 commit comments

Comments
 (0)