From 8ab732834c75a24d5bb915b99e8c6f0c6d2c0887 Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Wed, 18 Feb 2026 23:28:16 -0600 Subject: [PATCH 1/4] Optimize the close inactive sessions job --- .../Jobs/CloseInactiveSessionsJob.cs | 83 ++++++++++++++----- 1 file changed, 64 insertions(+), 19 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs index fade8ec24..d1e70e151 100644 --- a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs +++ b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs @@ -57,10 +57,12 @@ protected override async Task RunInternalAsync(JobContext context) var cacheKeysToRemove = new List(results.Documents.Count * 2); var existingSessionHeartbeatIds = new HashSet(StringComparer.OrdinalIgnoreCase); + var heartbeats = await GetHeartbeatsBatchAsync(results.Documents); + foreach (var sessionStart in results.Documents) { var lastActivityUtc = sessionStart.Date.UtcDateTime.AddSeconds((double)sessionStart.Value.GetValueOrDefault()); - var heartbeatResult = await GetHeartbeatAsync(sessionStart); + heartbeats.TryGetValue(sessionStart, out var heartbeatResult); bool closeDuplicate = heartbeatResult?.CacheKey is not null && existingSessionHeartbeatIds.Contains(heartbeatResult.CacheKey); if (heartbeatResult?.CacheKey is not null && !closeDuplicate) @@ -112,33 +114,76 @@ protected override async Task RunInternalAsync(JobContext context) return JobResult.Success; } - private async Task GetHeartbeatAsync(PersistentEvent sessionStart) + private async Task> GetHeartbeatsBatchAsync(IReadOnlyCollection sessions) { - string? sessionId = sessionStart.GetSessionId(); - if (!String.IsNullOrWhiteSpace(sessionId)) + var allHeartbeatKeys = new HashSet(StringComparer.OrdinalIgnoreCase); + var sessionKeyMap = new List<(PersistentEvent Session, string? SessionIdKey, string? UserIdentityKey)>(sessions.Count); + + foreach (var sessionStart in sessions) { - var result = await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{sessionId.ToSHA1()}"); - if (result is not null) - return result; + string? sessionIdKey = null; + string? userIdentityKey = null; + + string? sessionId = sessionStart.GetSessionId(); + if (!String.IsNullOrWhiteSpace(sessionId)) + { + sessionIdKey = $"Project:{sessionStart.ProjectId}:heartbeat:{sessionId.ToSHA1()}"; + allHeartbeatKeys.Add(sessionIdKey); + } + + var user = sessionStart.GetUserIdentity(_jsonOptions); + if (!String.IsNullOrWhiteSpace(user?.Identity)) + { + userIdentityKey = $"Project:{sessionStart.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"; + allHeartbeatKeys.Add(userIdentityKey); + } + + sessionKeyMap.Add((sessionStart, sessionIdKey, userIdentityKey)); } - var user = sessionStart.GetUserIdentity(_jsonOptions); - if (String.IsNullOrWhiteSpace(user?.Identity)) - return null; + if (allHeartbeatKeys.Count == 0) + return new Dictionary(); - return await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"); - } + var heartbeatValues = await _cache.GetAllAsync(allHeartbeatKeys); - private async Task GetLastHeartbeatActivityUtcAsync(string cacheKey) - { - var cacheValue = await _cache.GetAsync(cacheKey); - if (cacheValue.HasValue) + var closeKeys = new HashSet(StringComparer.OrdinalIgnoreCase); + var resolved = new Dictionary(sessions.Count); + + foreach (var (session, sessionIdKey, userIdentityKey) in sessionKeyMap) + { + string? matchedKey = null; + DateTime activityUtc = default; + + if (sessionIdKey is not null && heartbeatValues.TryGetValue(sessionIdKey, out var sidVal) && sidVal.HasValue) + { + matchedKey = sessionIdKey; + activityUtc = sidVal.Value; + } + else if (userIdentityKey is not null && heartbeatValues.TryGetValue(userIdentityKey, out var uidVal) && uidVal.HasValue) + { + matchedKey = userIdentityKey; + activityUtc = uidVal.Value; + } + + if (matchedKey is not null) + { + resolved[session] = (activityUtc, matchedKey); + closeKeys.Add($"{matchedKey}-close"); + } + } + + IDictionary> closeValues = closeKeys.Count > 0 + ? await _cache.GetAllAsync(closeKeys) + : new Dictionary>(); + + var results = new Dictionary(resolved.Count); + foreach (var (session, (activityUtc, cacheKey)) in resolved) { - bool close = await _cache.GetAsync($"{cacheKey}-close", false); - return new HeartbeatResult { ActivityUtc = cacheValue.Value, Close = close, CacheKey = cacheKey }; + bool close = closeValues.TryGetValue($"{cacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value; + results[session] = new HeartbeatResult { ActivityUtc = activityUtc, Close = close, CacheKey = cacheKey }; } - return null; + return results; } public TimeSpan DefaultInactivePeriod { get; set; } = TimeSpan.FromMinutes(5); From bf12cccb0ef549f120caa4aa61a485ad9baaf0fc Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Wed, 18 Feb 2026 23:34:39 -0600 Subject: [PATCH 2/4] Feedback --- .../Jobs/CloseInactiveSessionsJob.cs | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs index d1e70e151..d832131e5 100644 --- a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs +++ b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs @@ -57,12 +57,14 @@ protected override async Task RunInternalAsync(JobContext context) var cacheKeysToRemove = new List(results.Documents.Count * 2); var existingSessionHeartbeatIds = new HashSet(StringComparer.OrdinalIgnoreCase); - var heartbeats = await GetHeartbeatsBatchAsync(results.Documents); + var documents = results.Documents as IReadOnlyList ?? [.. results.Documents]; + var heartbeats = await GetHeartbeatsBatchAsync(documents); - foreach (var sessionStart in results.Documents) + for (int i = 0; i < documents.Count; i++) { + var sessionStart = documents[i]; var lastActivityUtc = sessionStart.Date.UtcDateTime.AddSeconds((double)sessionStart.Value.GetValueOrDefault()); - heartbeats.TryGetValue(sessionStart, out var heartbeatResult); + var heartbeatResult = heartbeats[i]; bool closeDuplicate = heartbeatResult?.CacheKey is not null && existingSessionHeartbeatIds.Contains(heartbeatResult.CacheKey); if (heartbeatResult?.CacheKey is not null && !closeDuplicate) @@ -114,43 +116,45 @@ protected override async Task RunInternalAsync(JobContext context) return JobResult.Success; } - private async Task> GetHeartbeatsBatchAsync(IReadOnlyCollection sessions) + private async Task GetHeartbeatsBatchAsync(IReadOnlyList sessions) { var allHeartbeatKeys = new HashSet(StringComparer.OrdinalIgnoreCase); - var sessionKeyMap = new List<(PersistentEvent Session, string? SessionIdKey, string? UserIdentityKey)>(sessions.Count); + var sessionKeyMap = new (string? SessionIdKey, string? UserIdentityKey)[sessions.Count]; - foreach (var sessionStart in sessions) + for (int i = 0; i < sessions.Count; i++) { + var session = sessions[i]; string? sessionIdKey = null; string? userIdentityKey = null; - string? sessionId = sessionStart.GetSessionId(); + string? sessionId = session.GetSessionId(); if (!String.IsNullOrWhiteSpace(sessionId)) { - sessionIdKey = $"Project:{sessionStart.ProjectId}:heartbeat:{sessionId.ToSHA1()}"; + sessionIdKey = $"Project:{session.ProjectId}:heartbeat:{sessionId.ToSHA1()}"; allHeartbeatKeys.Add(sessionIdKey); } - var user = sessionStart.GetUserIdentity(_jsonOptions); + var user = session.GetUserIdentity(_jsonOptions); if (!String.IsNullOrWhiteSpace(user?.Identity)) { - userIdentityKey = $"Project:{sessionStart.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"; + userIdentityKey = $"Project:{session.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"; allHeartbeatKeys.Add(userIdentityKey); } - sessionKeyMap.Add((sessionStart, sessionIdKey, userIdentityKey)); + sessionKeyMap[i] = (sessionIdKey, userIdentityKey); } if (allHeartbeatKeys.Count == 0) - return new Dictionary(); + return new HeartbeatResult?[sessions.Count]; var heartbeatValues = await _cache.GetAllAsync(allHeartbeatKeys); var closeKeys = new HashSet(StringComparer.OrdinalIgnoreCase); - var resolved = new Dictionary(sessions.Count); + var resolved = new (DateTime ActivityUtc, string CacheKey)?[sessions.Count]; - foreach (var (session, sessionIdKey, userIdentityKey) in sessionKeyMap) + for (int i = 0; i < sessionKeyMap.Length; i++) { + var (sessionIdKey, userIdentityKey) = sessionKeyMap[i]; string? matchedKey = null; DateTime activityUtc = default; @@ -167,7 +171,7 @@ private async Task> GetHeartbeatsBa if (matchedKey is not null) { - resolved[session] = (activityUtc, matchedKey); + resolved[i] = (activityUtc, matchedKey); closeKeys.Add($"{matchedKey}-close"); } } @@ -176,11 +180,14 @@ private async Task> GetHeartbeatsBa ? await _cache.GetAllAsync(closeKeys) : new Dictionary>(); - var results = new Dictionary(resolved.Count); - foreach (var (session, (activityUtc, cacheKey)) in resolved) + var results = new HeartbeatResult?[sessions.Count]; + for (int i = 0; i < resolved.Length; i++) { - bool close = closeValues.TryGetValue($"{cacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value; - results[session] = new HeartbeatResult { ActivityUtc = activityUtc, Close = close, CacheKey = cacheKey }; + if (resolved[i] is not { } r) + continue; + + bool close = closeValues.TryGetValue($"{r.CacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value; + results[i] = new HeartbeatResult { ActivityUtc = r.ActivityUtc, Close = close, CacheKey = r.CacheKey }; } return results; From b197c3233860b9751350e9eca9907cfac15495b2 Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Thu, 19 Feb 2026 11:31:34 -0600 Subject: [PATCH 3/4] refactor: return tuples from GetHeartbeatsBatchAsync for cleaner foreach loop Address PR feedback: - Remove weird IReadOnlyList cast at call site - GetHeartbeatsBatchAsync now returns (PersistentEvent, HeartbeatResult?) tuples - Caller uses simple foreach instead of index-based for loop - Accept IReadOnlyCollection to avoid cast at call site --- .../Jobs/CloseInactiveSessionsJob.cs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs index d832131e5..c968f0262 100644 --- a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs +++ b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs @@ -57,14 +57,9 @@ protected override async Task RunInternalAsync(JobContext context) var cacheKeysToRemove = new List(results.Documents.Count * 2); var existingSessionHeartbeatIds = new HashSet(StringComparer.OrdinalIgnoreCase); - var documents = results.Documents as IReadOnlyList ?? [.. results.Documents]; - var heartbeats = await GetHeartbeatsBatchAsync(documents); - - for (int i = 0; i < documents.Count; i++) + foreach (var (sessionStart, heartbeatResult) in await GetHeartbeatsBatchAsync(results.Documents)) { - var sessionStart = documents[i]; var lastActivityUtc = sessionStart.Date.UtcDateTime.AddSeconds((double)sessionStart.Value.GetValueOrDefault()); - var heartbeatResult = heartbeats[i]; bool closeDuplicate = heartbeatResult?.CacheKey is not null && existingSessionHeartbeatIds.Contains(heartbeatResult.CacheKey); if (heartbeatResult?.CacheKey is not null && !closeDuplicate) @@ -116,8 +111,9 @@ protected override async Task RunInternalAsync(JobContext context) return JobResult.Success; } - private async Task GetHeartbeatsBatchAsync(IReadOnlyList sessions) + private async Task<(PersistentEvent Session, HeartbeatResult? Heartbeat)[]> GetHeartbeatsBatchAsync(IReadOnlyCollection sessionCollection) { + var sessions = sessionCollection.ToList(); var allHeartbeatKeys = new HashSet(StringComparer.OrdinalIgnoreCase); var sessionKeyMap = new (string? SessionIdKey, string? UserIdentityKey)[sessions.Count]; @@ -145,7 +141,7 @@ protected override async Task RunInternalAsync(JobContext context) } if (allHeartbeatKeys.Count == 0) - return new HeartbeatResult?[sessions.Count]; + return sessions.Select(s => (s, (HeartbeatResult?)null)).ToArray(); var heartbeatValues = await _cache.GetAllAsync(allHeartbeatKeys); @@ -180,14 +176,17 @@ protected override async Task RunInternalAsync(JobContext context) ? await _cache.GetAllAsync(closeKeys) : new Dictionary>(); - var results = new HeartbeatResult?[sessions.Count]; - for (int i = 0; i < resolved.Length; i++) + var results = new (PersistentEvent Session, HeartbeatResult? Heartbeat)[sessions.Count]; + for (int i = 0; i < sessions.Count; i++) { if (resolved[i] is not { } r) + { + results[i] = (sessions[i], null); continue; + } bool close = closeValues.TryGetValue($"{r.CacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value; - results[i] = new HeartbeatResult { ActivityUtc = r.ActivityUtc, Close = close, CacheKey = r.CacheKey }; + results[i] = (sessions[i], new HeartbeatResult { ActivityUtc = r.ActivityUtc, Close = close, CacheKey = r.CacheKey }); } return results; From e921ed6ded982e3b90b36fe189f03a31d55963c7 Mon Sep 17 00:00:00 2001 From: "Eric J. Smith" Date: Thu, 19 Feb 2026 11:33:48 -0600 Subject: [PATCH 4/4] Filter messages that nobody is listening for --- .gitignore | 2 + .../Services/MessageService.cs | 61 ++++++++++++++----- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 4d374fb0d..f647a6846 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,5 @@ src/Exceptionless\.Web/ClientApp.angular/dist/ *.DotSettings coverage/ +nul +tmpclaude* diff --git a/src/Exceptionless.Core/Services/MessageService.cs b/src/Exceptionless.Core/Services/MessageService.cs index 2592bee42..94b535aa1 100644 --- a/src/Exceptionless.Core/Services/MessageService.cs +++ b/src/Exceptionless.Core/Services/MessageService.cs @@ -3,6 +3,7 @@ using Exceptionless.Core.Repositories; using Exceptionless.Core.Utility; using Foundatio.Extensions.Hosting.Startup; +using Foundatio.Repositories.Elasticsearch; using Foundatio.Repositories.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -11,16 +12,37 @@ namespace Exceptionless.Core.Services; public class MessageService : IDisposable, IStartupAction { + private readonly IOrganizationRepository _organizationRepository; + private readonly IProjectRepository _projectRepository; + private readonly IUserRepository _userRepository; private readonly IStackRepository _stackRepository; private readonly IEventRepository _eventRepository; + private readonly ITokenRepository _tokenRepository; + private readonly IWebHookRepository _webHookRepository; private readonly IConnectionMapping _connectionMapping; private readonly AppOptions _options; private readonly ILogger _logger; + private readonly List _disposeActions = []; - public MessageService(IStackRepository stackRepository, IEventRepository eventRepository, IConnectionMapping connectionMapping, AppOptions options, ILoggerFactory loggerFactory) + public MessageService( + IOrganizationRepository organizationRepository, + IProjectRepository projectRepository, + IUserRepository userRepository, + IStackRepository stackRepository, + IEventRepository eventRepository, + ITokenRepository tokenRepository, + IWebHookRepository webHookRepository, + IConnectionMapping connectionMapping, + AppOptions options, + ILoggerFactory loggerFactory) { + _organizationRepository = organizationRepository; + _projectRepository = projectRepository; + _userRepository = userRepository; _stackRepository = stackRepository; _eventRepository = eventRepository; + _tokenRepository = tokenRepository; + _webHookRepository = webHookRepository; _connectionMapping = connectionMapping; _options = options; _logger = loggerFactory.CreateLogger() ?? NullLogger.Instance; @@ -31,26 +53,34 @@ public Task RunAsync(CancellationToken shutdownToken = default) if (!_options.EnableRepositoryNotifications) return Task.CompletedTask; - if (_stackRepository is StackRepository sr) - sr.BeforePublishEntityChanged.AddHandler(BeforePublishStackEntityChanged); - if (_eventRepository is EventRepository er) - er.BeforePublishEntityChanged.AddHandler(BeforePublishEventEntityChanged); + RegisterHandler(_organizationRepository); + RegisterHandler(_userRepository); + RegisterHandler(_projectRepository); + RegisterHandler(_stackRepository); + RegisterHandler(_eventRepository); + RegisterHandler(_tokenRepository); + RegisterHandler(_webHookRepository); return Task.CompletedTask; } - private async Task BeforePublishStackEntityChanged(object sender, BeforePublishEntityChangedEventArgs args) + private void RegisterHandler(object repository) where T : class, IIdentity, new() { - args.Cancel = await GetNumberOfListeners(args.Message) == 0; - if (args.Cancel) - _logger.LogTrace("Cancelled Stack Entity Changed Message: {@Message}", args.Message); + if (repository is not ElasticRepositoryBase repo) + return; + + Func, Task> handler = OnBeforePublishEntityChangedAsync; + repo.BeforePublishEntityChanged.AddHandler(handler); + _disposeActions.Add(() => repo.BeforePublishEntityChanged.RemoveHandler(handler)); } - private async Task BeforePublishEventEntityChanged(object sender, BeforePublishEntityChangedEventArgs args) + private async Task OnBeforePublishEntityChangedAsync(object sender, BeforePublishEntityChangedEventArgs args) + where T : class, IIdentity, new() { - args.Cancel = await GetNumberOfListeners(args.Message) == 0; + var listenerCount = await GetNumberOfListeners(args.Message); + args.Cancel = listenerCount == 0; if (args.Cancel) - _logger.LogTrace("Cancelled Persistent Event Entity Changed Message: {@Message}", args.Message); + _logger.LogTrace("Cancelled {EntityType} Entity Changed Message: {@Message}", typeof(T).Name, args.Message); } private Task GetNumberOfListeners(EntityChanged message) @@ -64,9 +94,8 @@ private Task GetNumberOfListeners(EntityChanged message) public void Dispose() { - if (_stackRepository is StackRepository sr) - sr.BeforePublishEntityChanged.RemoveHandler(BeforePublishStackEntityChanged); - if (_eventRepository is EventRepository er) - er.BeforePublishEntityChanged.RemoveHandler(BeforePublishEventEntityChanged); + foreach (var disposeAction in _disposeActions) + disposeAction(); + _disposeActions.Clear(); } }