Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,5 @@ src/Exceptionless\.Web/ClientApp.angular/dist/
*.DotSettings

coverage/
nul
tmpclaude*
91 changes: 71 additions & 20 deletions src/Exceptionless.Core/Jobs/CloseInactiveSessionsJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ protected override async Task<JobResult> RunInternalAsync(JobContext context)
var cacheKeysToRemove = new List<string>(results.Documents.Count * 2);
var existingSessionHeartbeatIds = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

foreach (var sessionStart in results.Documents)
foreach (var (sessionStart, heartbeatResult) in await GetHeartbeatsBatchAsync(results.Documents))
{
var lastActivityUtc = sessionStart.Date.UtcDateTime.AddSeconds((double)sessionStart.Value.GetValueOrDefault());
var heartbeatResult = await GetHeartbeatAsync(sessionStart);

bool closeDuplicate = heartbeatResult?.CacheKey is not null && existingSessionHeartbeatIds.Contains(heartbeatResult.CacheKey);
if (heartbeatResult?.CacheKey is not null && !closeDuplicate)
Expand Down Expand Up @@ -112,33 +111,85 @@ protected override async Task<JobResult> RunInternalAsync(JobContext context)
return JobResult.Success;
}

private async Task<HeartbeatResult?> GetHeartbeatAsync(PersistentEvent sessionStart)
private async Task<(PersistentEvent Session, HeartbeatResult? Heartbeat)[]> GetHeartbeatsBatchAsync(IReadOnlyCollection<PersistentEvent> sessionCollection)
{
string? sessionId = sessionStart.GetSessionId();
if (!String.IsNullOrWhiteSpace(sessionId))
var sessions = sessionCollection.ToList();
var allHeartbeatKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var sessionKeyMap = new (string? SessionIdKey, string? UserIdentityKey)[sessions.Count];

for (int i = 0; i < sessions.Count; i++)
{
var result = await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{sessionId.ToSHA1()}");
if (result is not null)
return result;
var session = sessions[i];
string? sessionIdKey = null;
string? userIdentityKey = null;

string? sessionId = session.GetSessionId();
if (!String.IsNullOrWhiteSpace(sessionId))
{
sessionIdKey = $"Project:{session.ProjectId}:heartbeat:{sessionId.ToSHA1()}";
allHeartbeatKeys.Add(sessionIdKey);
}

var user = session.GetUserIdentity(_jsonOptions);
if (!String.IsNullOrWhiteSpace(user?.Identity))
{
userIdentityKey = $"Project:{session.ProjectId}:heartbeat:{user.Identity.ToSHA1()}";
allHeartbeatKeys.Add(userIdentityKey);
}

sessionKeyMap[i] = (sessionIdKey, userIdentityKey);
}

var user = sessionStart.GetUserIdentity(_jsonOptions);
if (String.IsNullOrWhiteSpace(user?.Identity))
return null;
if (allHeartbeatKeys.Count == 0)
return sessions.Select(s => (s, (HeartbeatResult?)null)).ToArray();

return await GetLastHeartbeatActivityUtcAsync($"Project:{sessionStart.ProjectId}:heartbeat:{user.Identity.ToSHA1()}");
}
var heartbeatValues = await _cache.GetAllAsync<DateTime>(allHeartbeatKeys);

private async Task<HeartbeatResult?> GetLastHeartbeatActivityUtcAsync(string cacheKey)
{
var cacheValue = await _cache.GetAsync<DateTime>(cacheKey);
if (cacheValue.HasValue)
var closeKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var resolved = new (DateTime ActivityUtc, string CacheKey)?[sessions.Count];

for (int i = 0; i < sessionKeyMap.Length; i++)
{
var (sessionIdKey, userIdentityKey) = sessionKeyMap[i];
string? matchedKey = null;
DateTime activityUtc = default;

if (sessionIdKey is not null && heartbeatValues.TryGetValue(sessionIdKey, out var sidVal) && sidVal.HasValue)
{
matchedKey = sessionIdKey;
activityUtc = sidVal.Value;
}
else if (userIdentityKey is not null && heartbeatValues.TryGetValue(userIdentityKey, out var uidVal) && uidVal.HasValue)
{
matchedKey = userIdentityKey;
activityUtc = uidVal.Value;
}

if (matchedKey is not null)
{
resolved[i] = (activityUtc, matchedKey);
closeKeys.Add($"{matchedKey}-close");
}
}

IDictionary<string, CacheValue<bool>> closeValues = closeKeys.Count > 0
? await _cache.GetAllAsync<bool>(closeKeys)
: new Dictionary<string, CacheValue<bool>>();

var results = new (PersistentEvent Session, HeartbeatResult? Heartbeat)[sessions.Count];
for (int i = 0; i < sessions.Count; i++)
{
bool close = await _cache.GetAsync($"{cacheKey}-close", false);
return new HeartbeatResult { ActivityUtc = cacheValue.Value, Close = close, CacheKey = cacheKey };
if (resolved[i] is not { } r)
{
results[i] = (sessions[i], null);
continue;
}

bool close = closeValues.TryGetValue($"{r.CacheKey}-close", out var closeVal) && closeVal.HasValue && closeVal.Value;
results[i] = (sessions[i], new HeartbeatResult { ActivityUtc = r.ActivityUtc, Close = close, CacheKey = r.CacheKey });
}

return null;
return results;
}

public TimeSpan DefaultInactivePeriod { get; set; } = TimeSpan.FromMinutes(5);
Expand Down
61 changes: 45 additions & 16 deletions src/Exceptionless.Core/Services/MessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Exceptionless.Core.Repositories;
using Exceptionless.Core.Utility;
using Foundatio.Extensions.Hosting.Startup;
using Foundatio.Repositories.Elasticsearch;
using Foundatio.Repositories.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -11,16 +12,37 @@ namespace Exceptionless.Core.Services;

public class MessageService : IDisposable, IStartupAction
{
private readonly IOrganizationRepository _organizationRepository;
private readonly IProjectRepository _projectRepository;
private readonly IUserRepository _userRepository;
private readonly IStackRepository _stackRepository;
private readonly IEventRepository _eventRepository;
private readonly ITokenRepository _tokenRepository;
private readonly IWebHookRepository _webHookRepository;
private readonly IConnectionMapping _connectionMapping;
private readonly AppOptions _options;
private readonly ILogger _logger;
private readonly List<Action> _disposeActions = [];

public MessageService(IStackRepository stackRepository, IEventRepository eventRepository, IConnectionMapping connectionMapping, AppOptions options, ILoggerFactory loggerFactory)
public MessageService(
IOrganizationRepository organizationRepository,
IProjectRepository projectRepository,
IUserRepository userRepository,
IStackRepository stackRepository,
IEventRepository eventRepository,
ITokenRepository tokenRepository,
IWebHookRepository webHookRepository,
IConnectionMapping connectionMapping,
AppOptions options,
ILoggerFactory loggerFactory)
Comment on lines +27 to +37
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MessageService.cs refactoring appears to be unrelated to the PR's stated purpose of optimizing CloseInactiveSessionsJob. While the refactoring looks reasonable (consolidating duplicate handler registration code into a generic method), it should ideally be in a separate PR for better change isolation and easier review/rollback. This makes the PR scope broader than described in the title and description.

Copilot uses AI. Check for mistakes.
{
_organizationRepository = organizationRepository;
_projectRepository = projectRepository;
_userRepository = userRepository;
_stackRepository = stackRepository;
_eventRepository = eventRepository;
_tokenRepository = tokenRepository;
_webHookRepository = webHookRepository;
_connectionMapping = connectionMapping;
_options = options;
_logger = loggerFactory.CreateLogger<MessageService>() ?? NullLogger<MessageService>.Instance;
Expand All @@ -31,26 +53,34 @@ public Task RunAsync(CancellationToken shutdownToken = default)
if (!_options.EnableRepositoryNotifications)
return Task.CompletedTask;

if (_stackRepository is StackRepository sr)
sr.BeforePublishEntityChanged.AddHandler(BeforePublishStackEntityChanged);
if (_eventRepository is EventRepository er)
er.BeforePublishEntityChanged.AddHandler(BeforePublishEventEntityChanged);
RegisterHandler<Organization>(_organizationRepository);
RegisterHandler<User>(_userRepository);
RegisterHandler<Project>(_projectRepository);
RegisterHandler<Stack>(_stackRepository);
RegisterHandler<PersistentEvent>(_eventRepository);
RegisterHandler<Token>(_tokenRepository);
RegisterHandler<WebHook>(_webHookRepository);

return Task.CompletedTask;
}

private async Task BeforePublishStackEntityChanged(object sender, BeforePublishEntityChangedEventArgs<Stack> args)
private void RegisterHandler<T>(object repository) where T : class, IIdentity, new()
{
args.Cancel = await GetNumberOfListeners(args.Message) == 0;
if (args.Cancel)
_logger.LogTrace("Cancelled Stack Entity Changed Message: {@Message}", args.Message);
if (repository is not ElasticRepositoryBase<T> repo)
return;

Func<object, BeforePublishEntityChangedEventArgs<T>, Task> handler = OnBeforePublishEntityChangedAsync;
repo.BeforePublishEntityChanged.AddHandler(handler);
_disposeActions.Add(() => repo.BeforePublishEntityChanged.RemoveHandler(handler));
}

private async Task BeforePublishEventEntityChanged(object sender, BeforePublishEntityChangedEventArgs<PersistentEvent> args)
private async Task OnBeforePublishEntityChangedAsync<T>(object sender, BeforePublishEntityChangedEventArgs<T> args)
where T : class, IIdentity, new()
{
args.Cancel = await GetNumberOfListeners(args.Message) == 0;
var listenerCount = await GetNumberOfListeners(args.Message);
args.Cancel = listenerCount == 0;
if (args.Cancel)
_logger.LogTrace("Cancelled Persistent Event Entity Changed Message: {@Message}", args.Message);
_logger.LogTrace("Cancelled {EntityType} Entity Changed Message: {@Message}", typeof(T).Name, args.Message);
}

private Task<int> GetNumberOfListeners(EntityChanged message)
Expand All @@ -64,9 +94,8 @@ private Task<int> GetNumberOfListeners(EntityChanged message)

public void Dispose()
{
if (_stackRepository is StackRepository sr)
sr.BeforePublishEntityChanged.RemoveHandler(BeforePublishStackEntityChanged);
if (_eventRepository is EventRepository er)
er.BeforePublishEntityChanged.RemoveHandler(BeforePublishEventEntityChanged);
foreach (var disposeAction in _disposeActions)
disposeAction();
_disposeActions.Clear();
}
}