diff --git a/.gitignore b/.gitignore index 4d374fb0d..f647a6846 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,5 @@ src/Exceptionless\.Web/ClientApp.angular/dist/ *.DotSettings coverage/ +nul +tmpclaude* diff --git a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs index fade8ec24..c968f0262 100644 --- a/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs +++ b/src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs @@ -57,10 +57,9 @@ protected override async Task RunInternalAsync(JobContext context) var cacheKeysToRemove = new List(results.Documents.Count * 2); var existingSessionHeartbeatIds = new HashSet(StringComparer.OrdinalIgnoreCase); - foreach (var sessionStart in results.Documents) + foreach (var (sessionStart, heartbeatResult) in await GetHeartbeatsBatchAsync(results.Documents)) { var lastActivityUtc = sessionStart.Date.UtcDateTime.AddSeconds((double)sessionStart.Value.GetValueOrDefault()); - var heartbeatResult = await GetHeartbeatAsync(sessionStart); bool closeDuplicate = heartbeatResult?.CacheKey is not null && existingSessionHeartbeatIds.Contains(heartbeatResult.CacheKey); if (heartbeatResult?.CacheKey is not null && !closeDuplicate) @@ -112,33 +111,85 @@ protected override async Task RunInternalAsync(JobContext context) return JobResult.Success; } - private async Task GetHeartbeatAsync(PersistentEvent sessionStart) + private async Task<(PersistentEvent Session, HeartbeatResult? Heartbeat)[]> GetHeartbeatsBatchAsync(IReadOnlyCollection sessionCollection) { - string? sessionId = sessionStart.GetSessionId(); - if (!String.IsNullOrWhiteSpace(sessionId)) + var sessions = sessionCollection.ToList(); + var allHeartbeatKeys = new HashSet(StringComparer.OrdinalIgnoreCase); + var sessionKeyMap = new (string? SessionIdKey, string? UserIdentityKey)[sessions.Count]; + + for (int i = 0; i < sessions.Count; i++) { - var result = await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{sessionId.ToSHA1()}"); - if (result is not null) - return result; + var session = sessions[i]; + string? sessionIdKey = null; + string? userIdentityKey = null; + + string? sessionId = session.GetSessionId(); + if (!String.IsNullOrWhiteSpace(sessionId)) + { + sessionIdKey = $"Project:{session.ProjectId}:heartbeat:{sessionId.ToSHA1()}"; + allHeartbeatKeys.Add(sessionIdKey); + } + + var user = session.GetUserIdentity(_jsonOptions); + if (!String.IsNullOrWhiteSpace(user?.Identity)) + { + userIdentityKey = $"Project:{session.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"; + allHeartbeatKeys.Add(userIdentityKey); + } + + sessionKeyMap[i] = (sessionIdKey, userIdentityKey); } - var user = sessionStart.GetUserIdentity(_jsonOptions); - if (String.IsNullOrWhiteSpace(user?.Identity)) - return null; + if (allHeartbeatKeys.Count == 0) + return sessions.Select(s => (s, (HeartbeatResult?)null)).ToArray(); - return await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{user.Identity.ToSHA1()}"); - } + var heartbeatValues = await _cache.GetAllAsync(allHeartbeatKeys); - private async Task GetLastHeartbeatActivityUtcAsync(string cacheKey) - { - var cacheValue = await _cache.GetAsync(cacheKey); - if (cacheValue.HasValue) + var closeKeys = new HashSet(StringComparer.OrdinalIgnoreCase); + var resolved = new (DateTime ActivityUtc, string CacheKey)?[sessions.Count]; + + for (int i = 0; i < sessionKeyMap.Length; i++) + { + var (sessionIdKey, userIdentityKey) = sessionKeyMap[i]; + string? matchedKey = null; + DateTime activityUtc = default; + + if (sessionIdKey is not null && heartbeatValues.TryGetValue(sessionIdKey, out var sidVal) && sidVal.HasValue) + { + matchedKey = sessionIdKey; + activityUtc = sidVal.Value; + } + else if (userIdentityKey is not null && heartbeatValues.TryGetValue(userIdentityKey, out var uidVal) && uidVal.HasValue) + { + matchedKey = userIdentityKey; + activityUtc = uidVal.Value; + } + + if (matchedKey is not null) + { + resolved[i] = (activityUtc, matchedKey); + closeKeys.Add($"{matchedKey}-close"); + } + } + + IDictionary> closeValues = closeKeys.Count > 0 + ? await _cache.GetAllAsync(closeKeys) + : new Dictionary>(); + + var results = new (PersistentEvent Session, HeartbeatResult? Heartbeat)[sessions.Count]; + for (int i = 0; i < sessions.Count; i++) { - bool close = await _cache.GetAsync($"{cacheKey}-close", false); - return new HeartbeatResult { ActivityUtc = cacheValue.Value, Close = close, CacheKey = cacheKey }; + if (resolved[i] is not { } r) + { + results[i] = (sessions[i], null); + continue; + } + + bool close = closeValues.TryGetValue($"{r.CacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value; + results[i] = (sessions[i], new HeartbeatResult { ActivityUtc = r.ActivityUtc, Close = close, CacheKey = r.CacheKey }); } - return null; + return results; } public TimeSpan DefaultInactivePeriod { get; set; } = TimeSpan.FromMinutes(5); diff --git a/src/Exceptionless.Core/Services/MessageService.cs b/src/Exceptionless.Core/Services/MessageService.cs index 2592bee42..94b535aa1 100644 --- a/src/Exceptionless.Core/Services/MessageService.cs +++ b/src/Exceptionless.Core/Services/MessageService.cs @@ -3,6 +3,7 @@ using Exceptionless.Core.Repositories; using Exceptionless.Core.Utility; using Foundatio.Extensions.Hosting.Startup; +using Foundatio.Repositories.Elasticsearch; using Foundatio.Repositories.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -11,16 +12,37 @@ namespace Exceptionless.Core.Services; public class MessageService : IDisposable, IStartupAction { + private readonly IOrganizationRepository _organizationRepository; + private readonly IProjectRepository _projectRepository; + private readonly IUserRepository _userRepository; private readonly IStackRepository _stackRepository; private readonly IEventRepository _eventRepository; + private readonly ITokenRepository _tokenRepository; + private readonly IWebHookRepository _webHookRepository; private readonly IConnectionMapping _connectionMapping; private readonly AppOptions _options; private readonly ILogger _logger; + private readonly List _disposeActions = []; - public MessageService(IStackRepository stackRepository, IEventRepository eventRepository, IConnectionMapping connectionMapping, AppOptions options, ILoggerFactory loggerFactory) + public MessageService( + IOrganizationRepository organizationRepository, + IProjectRepository projectRepository, + IUserRepository userRepository, + IStackRepository stackRepository, + IEventRepository eventRepository, + ITokenRepository tokenRepository, + IWebHookRepository webHookRepository, + IConnectionMapping connectionMapping, + AppOptions options, + ILoggerFactory loggerFactory) { + _organizationRepository = organizationRepository; + _projectRepository = projectRepository; + _userRepository = userRepository; _stackRepository = stackRepository; _eventRepository = eventRepository; + _tokenRepository = tokenRepository; + _webHookRepository = webHookRepository; _connectionMapping = connectionMapping; _options = options; _logger = loggerFactory.CreateLogger() ?? NullLogger.Instance; @@ -31,26 +53,34 @@ public Task RunAsync(CancellationToken shutdownToken = default) if (!_options.EnableRepositoryNotifications) return Task.CompletedTask; - if (_stackRepository is StackRepository sr) - sr.BeforePublishEntityChanged.AddHandler(BeforePublishStackEntityChanged); - if (_eventRepository is EventRepository er) - er.BeforePublishEntityChanged.AddHandler(BeforePublishEventEntityChanged); + RegisterHandler(_organizationRepository); + RegisterHandler(_userRepository); + RegisterHandler(_projectRepository); + RegisterHandler(_stackRepository); + RegisterHandler(_eventRepository); + RegisterHandler(_tokenRepository); + RegisterHandler(_webHookRepository); return Task.CompletedTask; } - private async Task BeforePublishStackEntityChanged(object sender, BeforePublishEntityChangedEventArgs args) + private void RegisterHandler(object repository) where T : class, IIdentity, new() { - args.Cancel = await GetNumberOfListeners(args.Message) == 0; - if (args.Cancel) - _logger.LogTrace("Cancelled Stack Entity Changed Message: {@Message}", args.Message); + if (repository is not ElasticRepositoryBase repo) + return; + + Func, Task> handler = OnBeforePublishEntityChangedAsync; + repo.BeforePublishEntityChanged.AddHandler(handler); + _disposeActions.Add(() => repo.BeforePublishEntityChanged.RemoveHandler(handler)); } - private async Task BeforePublishEventEntityChanged(object sender, BeforePublishEntityChangedEventArgs args) + private async Task OnBeforePublishEntityChangedAsync(object sender, BeforePublishEntityChangedEventArgs args) + where T : class, IIdentity, new() { - args.Cancel = await GetNumberOfListeners(args.Message) == 0; + var listenerCount = await GetNumberOfListeners(args.Message); + args.Cancel = listenerCount == 0; if (args.Cancel) - _logger.LogTrace("Cancelled Persistent Event Entity Changed Message: {@Message}", args.Message); + _logger.LogTrace("Cancelled {EntityType} Entity Changed Message: {@Message}", typeof(T).Name, args.Message); } private Task GetNumberOfListeners(EntityChanged message) @@ -64,9 +94,8 @@ private Task GetNumberOfListeners(EntityChanged message) public void Dispose() { - if (_stackRepository is StackRepository sr) - sr.BeforePublishEntityChanged.RemoveHandler(BeforePublishStackEntityChanged); - if (_eventRepository is EventRepository er) - er.BeforePublishEntityChanged.RemoveHandler(BeforePublishEventEntityChanged); + foreach (var disposeAction in _disposeActions) + disposeAction(); + _disposeActions.Clear(); } }