diff --git a/Common/Errors/WebsocketError.cs b/Common/Errors/WebsocketError.cs index 01378979..a494fb31 100644 --- a/Common/Errors/WebsocketError.cs +++ b/Common/Errors/WebsocketError.cs @@ -10,4 +10,7 @@ public static class WebsocketError public static OpenShockProblem WebsocketHubFirmwareVersionInvalid => new("Websocket.Hub.FirmwareVersionInvalid", "Supplied firmware version header is not valid semver", HttpStatusCode.BadRequest); public static OpenShockProblem WebsocketLiveControlHubIdInvalid => new("Websocket.LiveControl.HubIdInvalid", "Hub (device) id was missing or invalid", HttpStatusCode.BadRequest); public static OpenShockProblem WebsocketLiveControlHubNotFound => new("Websocket.LiveControl.HubNotFound", "Hub was not found or you are missing access", HttpStatusCode.NotFound); + + public static OpenShockProblem WebsocketLiveControlHubLifetimeBusy => new("Websocket.LiveControl.HubLifetimeBusy", "Hub Lifetime is currently busy, try again please", HttpStatusCode.PreconditionFailed); + public static OpenShockProblem WebsocketHubLifetimeBusy => new("Websocket.Hub.LifetimeBusy", "Hub Lifetime is currently busy, try again soon please", HttpStatusCode.PreconditionFailed); } \ No newline at end of file diff --git a/Common/Websocket/SimpleWebsocketCollection.cs b/Common/Websocket/SimpleWebsocketCollection.cs deleted file mode 100644 index 47aa3793..00000000 --- a/Common/Websocket/SimpleWebsocketCollection.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System.Collections.Concurrent; - -namespace OpenShock.Common.Websocket; - -public sealed class SimpleWebsocketCollection where T : class, IWebsocketController -{ - private readonly ConcurrentDictionary> _websockets = new(); - - public void RegisterConnection(T controller) - { - var list = _websockets.GetOrAdd(controller.Id, [controller]); - - lock (list) - { - if (!list.Contains(controller)) list.Add(controller); - } - } - - public bool UnregisterConnection(T controller) - { - var key = controller.Id; - if (!_websockets.TryGetValue(key, out var list)) return false; - - lock (list) - { - if (!list.Remove(controller)) return false; - if (list.Count == 0) - { - _websockets.TryRemove(key, out _); - } - } - - return true; - } - - public bool IsConnected(Guid id) => _websockets.ContainsKey(id); - - public T[] GetConnections(Guid id) - { - if (!_websockets.TryGetValue(id, out var list)) return []; - - lock (list) - { - return list.ToArray(); - } - } - - public uint Count => (uint)_websockets.Sum(kvp => { lock (kvp.Value) { return kvp.Value.Count; } }); -} \ No newline at end of file diff --git a/Common/Websocket/WebsockBaseController.cs b/Common/Websocket/WebsockBaseController.cs index e4813116..050cd748 100644 --- a/Common/Websocket/WebsockBaseController.cs +++ b/Common/Websocket/WebsockBaseController.cs @@ -17,7 +17,8 @@ namespace OpenShock.Common.Websocket; /// Base for json serialized websocket controller, you can override the SendMessageMethod to implement a different serializer /// /// -public abstract class WebsocketBaseController : OpenShockControllerBase, IAsyncDisposable, IDisposable, IWebsocketController where T : class +public abstract class WebsocketBaseController : OpenShockControllerBase, IAsyncDisposable, IDisposable, + IWebsocketController where T : class { /// public abstract Guid Id { get; } @@ -36,6 +37,7 @@ public abstract class WebsocketBaseController : OpenShockControllerBase, IAsy /// When passing a cancellation token, pass this Linked token, it is a Link from ApplicationStopping and Close. /// protected readonly CancellationTokenSource LinkedSource; + protected readonly CancellationToken LinkedToken; /// @@ -62,7 +64,7 @@ public WebsocketBaseController(ILogger> logger, IHost /// [NonAction] - public ValueTask QueueMessage(T data) => _channel.Writer.WriteAsync(data); + public ValueTask QueueMessage(T data) => _channel.Writer.WriteAsync(data, LinkedToken); private bool _disposed; @@ -73,28 +75,28 @@ public virtual void Dispose() // ReSharper disable once MethodSupportsCancellation DisposeAsync().AsTask().Wait(); } - + /// [NonAction] public virtual async ValueTask DisposeAsync() { - if(_disposed) return; + if (_disposed) return; _disposed = true; - + Logger.LogTrace("Disposing websocket controller.."); - + await DisposeControllerAsync(); await UnregisterConnection(); - + _channel.Writer.Complete(); await Close.CancelAsync(); WebSocket?.Dispose(); LinkedSource.Dispose(); - + GC.SuppressFinalize(this); Logger.LogTrace("Disposed websocket controller"); } - + /// /// Dispose function for any inheriting controller /// @@ -115,7 +117,9 @@ public async Task Get() HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; var response = WebsocketError.NonWebsocketRequest; response.AddContext(HttpContext); - await HttpContext.Response.WriteAsJsonAsync(response, jsonOptions.Value.SerializerOptions, contentType: MediaTypeNames.Application.ProblemJson); + // ReSharper disable once MethodSupportsCancellation + await HttpContext.Response.WriteAsJsonAsync(response, jsonOptions.Value.SerializerOptions, + contentType: MediaTypeNames.Application.ProblemJson); await Close.CancelAsync(); return; } @@ -127,35 +131,32 @@ public async Task Get() var response = connectionPrecondition.AsT1.Value; HttpContext.Response.StatusCode = response.Status ?? StatusCodes.Status400BadRequest; response.AddContext(HttpContext); - await HttpContext.Response.WriteAsJsonAsync(response, jsonOptions.Value.SerializerOptions, contentType: MediaTypeNames.Application.ProblemJson); - + // ReSharper disable once MethodSupportsCancellation + await HttpContext.Response.WriteAsJsonAsync(response, jsonOptions.Value.SerializerOptions, + contentType: MediaTypeNames.Application.ProblemJson); + await Close.CancelAsync(); return; } Logger.LogInformation("Opening websocket connection"); - - WebSocket?.Dispose(); // This should never happen, but just in case + WebSocket?.Dispose(); // This should never happen, suppresses warning WebSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); - if (await TryRegisterConnection()) - { #pragma warning disable CS4014 - OsTask.Run(MessageLoop); + OsTask.Run(MessageLoop); #pragma warning restore CS4014 - await SendInitialData(); - - await Logic(); - - - if(_disposed) return; - - await UnregisterConnection(); - } + await SendInitialData(); + await Logic(); - await Close.CancelAsync(); + + if (_disposed) return; + + await UnregisterConnection(); + + await DisposeAsync(); } #region Send Loop @@ -191,7 +192,6 @@ private async Task MessageLoop() protected virtual Task SendWebSocketMessage(T message, WebSocket websocket, CancellationToken cancellationToken) => JsonWebSocketUtils.SendFullMessage(message, websocket, cancellationToken); - #endregion /// @@ -200,7 +200,7 @@ protected virtual Task SendWebSocketMessage(T message, WebSocket websocket, Canc /// [NonAction] protected abstract Task Logic(); - + /// /// Send initial data to the client /// @@ -208,12 +208,6 @@ protected virtual Task SendWebSocketMessage(T message, WebSocket websocket, Canc [NonAction] protected virtual Task SendInitialData() => Task.CompletedTask; - /// - /// Action when the websocket connection is created - /// - [NonAction] - protected virtual Task TryRegisterConnection() => Task.FromResult(true); - /// /// Action when the websocket connection is finished or disposed /// diff --git a/Common/Websocket/WebsocketCollection.cs b/Common/Websocket/WebsocketCollection.cs deleted file mode 100644 index c2842558..00000000 --- a/Common/Websocket/WebsocketCollection.cs +++ /dev/null @@ -1,75 +0,0 @@ -using System.Collections.Concurrent; - -namespace OpenShock.Common.Websocket; - -public sealed class WebsocketCollection where T : class -{ - private readonly ConcurrentDictionary>> _websockets = new(); - - public void RegisterConnection(IWebsocketController controller) - { - var list = _websockets.GetOrAdd(controller.Id, - new List> { controller }); - lock (list) - { - if (!list.Contains(controller)) list.Add(controller); - } - } - - public void UnregisterConnection(IWebsocketController controller) - { - var key = controller.Id; - if (!_websockets.TryGetValue(key, out var list)) return; - - lock (list) - { - list.Remove(controller); - if (list.Count <= 0) _websockets.TryRemove(key, out _); - } - } - - public bool IsConnected(Guid id) => _websockets.ContainsKey(id); - - public IList> GetConnections(Guid id) - { - if (_websockets.TryGetValue(id, out var list)) - return list; - return Array.Empty>(); - } - - public async ValueTask SendMessageTo(Guid id, T msg) - { - var list = GetConnections(id); - - // ReSharper disable once ForCanBeConvertedToForeach - for (var i = 0; i < list.Count; i++) - { - var conn = list[i]; - await conn.QueueMessage(msg); - } - } - - public Task SendMessageTo(T msg, params Guid[] id) => SendMessageTo(id, msg); - - public Task SendMessageTo(IEnumerable id, T msg) - { - var tasks = id.Select(x => SendMessageTo(x, msg).AsTask()); - return Task.WhenAll(tasks); - } - - public async ValueTask SendMessageToAll(T msg) - { - // Im cloning a moment-in-time snapshot on purpose here, so we dont miss any connections. - // This is fine since this is not regularly called, and does not need to be realtime. - foreach (var (_, list) in _websockets.ToArray()) - foreach (var websocketController in list) - await websocketController.QueueMessage(msg); - } - - public IWebsocketController[] GetConnectedById(IEnumerable ids) - { - return ids.SelectMany(GetConnections).ToArray(); - } - - public uint Count => (uint)_websockets.Sum(x => x.Value.Count); -} \ No newline at end of file diff --git a/LiveControlGateway/Controllers/HubControllerBase.cs b/LiveControlGateway/Controllers/HubControllerBase.cs index 16e7c322..97ba5c86 100644 --- a/LiveControlGateway/Controllers/HubControllerBase.cs +++ b/LiveControlGateway/Controllers/HubControllerBase.cs @@ -45,7 +45,14 @@ public abstract class HubControllerBase : FlatbuffersWebsocketBaseCon /// Service provider /// protected readonly IServiceProvider ServiceProvider; - + + private HubLifetime? _hubLifetime = null; + + /// + /// Hub lifetime + /// + /// + protected HubLifetime HubLifetime => _hubLifetime ?? throw new InvalidOperationException("Hub lifetime is null but was tried to access"); private readonly LcgOptions _options; private readonly HubLifetimeManager _hubLifetimeManager; @@ -113,7 +120,7 @@ ILogger> logger private SemVersion? _firmwareVersion; /// - protected override Task>> ConnectionPrecondition() + protected override async Task>> ConnectionPrecondition() { _connected = DateTimeOffset.UtcNow; @@ -125,24 +132,29 @@ protected override Task>> ConnectionPreco else { var err = new Error(WebsocketError.WebsocketHubFirmwareVersionInvalid); - return Task.FromResult(OneOf>.FromT1(err)); + return err; } _userAgent = HttpContext.Request.Headers.UserAgent.ToString().Truncate(256); + var hubLifetimeResult = await _hubLifetimeManager.TryAddDeviceConnection(5, this, LinkedToken); - return Task.FromResult(OneOf>.FromT0(new Success())); + if (hubLifetimeResult.IsT1) + { + Logger.LogWarning("Hub lifetime busy, closing connection"); + return new Error(ExceptionError.Exception); + } + + if (hubLifetimeResult.IsT2) + { + Logger.LogError("Hub lifetime error, closing connection"); + return new Error(); + } + + _hubLifetime = hubLifetimeResult.AsT0; + + return new Success(); } - - /// - /// Register to the hub lifetime manager - /// - /// - protected override async Task TryRegisterConnection() - { - return await _hubLifetimeManager.TryAddDeviceConnection(5, this, LinkedToken); - } - private bool _unregistered; /// @@ -177,7 +189,7 @@ protected async Task SelfOnline(DateTimeOffset bootedAt, ushort? latency = null, // Reset the keep alive timeout _keepAliveTimeoutTimer.Interval = Duration.DeviceKeepAliveTimeout.TotalMilliseconds; - var result = await _hubLifetimeManager.DeviceOnline(CurrentHub.Id, new SelfOnlineData() + await HubLifetime.Online(CurrentHub.Id, new SelfOnlineData() { Owner = CurrentHub.Owner, Gateway = _options.Fqdn, @@ -188,15 +200,6 @@ protected async Task SelfOnline(DateTimeOffset bootedAt, ushort? latency = null, LatencyMs = latency, Rssi = rssi }); - - if (result.IsT1) - { - Logger.LogError("Error while updating hub online status [{HubId}], we dont exist in the managers list", CurrentHub.Id); - await Close.CancelAsync(); - if (WebSocket != null) - await WebSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Hub not found in manager", - CancellationToken.None); - } } /// diff --git a/LiveControlGateway/Controllers/LiveControlController.cs b/LiveControlGateway/Controllers/LiveControlController.cs index 79002e38..82071444 100644 --- a/LiveControlGateway/Controllers/LiveControlController.cs +++ b/LiveControlGateway/Controllers/LiveControlController.cs @@ -22,7 +22,6 @@ using OpenShock.Common.Websocket; using OpenShock.LiveControlGateway.LifetimeManager; using OpenShock.LiveControlGateway.Models; -using OpenShock.LiveControlGateway.Websocket; using Timer = System.Timers.Timer; namespace OpenShock.LiveControlGateway.Controllers; @@ -37,6 +36,7 @@ namespace OpenShock.LiveControlGateway.Controllers; public sealed class LiveControlController : WebsocketBaseController>, IActionFilter { private readonly OpenShockContext _db; + private readonly ILogger _logger; private readonly HubLifetimeManager _hubLifetimeManager; private static readonly TimeSpan PingInterval = TimeSpan.FromSeconds(5); @@ -52,12 +52,14 @@ public sealed class LiveControlController : WebsocketBaseController _sharedShockers = new(); private byte _tps = 10; private long _pingTimestamp = Stopwatch.GetTimestamp(); private ushort _latencyMs = 0; + private HubLifetime? _hubLifetime = null; + private HubLifetime HubLifetime => _hubLifetime ?? throw new InvalidOperationException("Hub lifetime is null but was accessed"); /// /// Connection Id for this connection, unique and random per connection @@ -80,22 +82,25 @@ public LiveControlController( HubLifetimeManager hubLifetimeManager) : base(logger, lifetime) { _db = db; + _logger = logger; _hubLifetimeManager = hubLifetimeManager; _pingTimer.Elapsed += (_, _) => OsTask.Run(SendPing); } - /// - protected override Task TryRegisterConnection() - { - WebsocketManager.LiveControlUsers.RegisterConnection(this); - return Task.FromResult(true); - } + + private bool _unregistered; /// - protected override Task UnregisterConnection() + protected override async Task UnregisterConnection() { - WebsocketManager.LiveControlUsers.UnregisterConnection(this); - return Task.CompletedTask; + if(_unregistered) return; + _unregistered = true; + + if(_hubLifetime == null) return; + if (!await _hubLifetime.RemoveLiveControlClient(this)) + { + _logger.LogError("Failed to remove live control client from hub lifetime {HubId} {CurrentUserId}", HubId, _currentUser.Id); + } } /// @@ -150,10 +155,10 @@ public async Task UpdatePermissions(OpenShockContext db) return new OneOf.Types.Error(WebsocketError.WebsocketLiveControlHubIdInvalid); } - _hubId = id; + HubId = id; var hubExistsAndYouHaveAccess = await _db.Devices.AnyAsync(x => - x.Id == _hubId && (x.Owner == _currentUser.Id || x.Shockers.Any(y => y.ShockerShares.Any( + x.Id == HubId && (x.Owner == _currentUser.Id || x.Shockers.Any(y => y.ShockerShares.Any( z => z.SharedWith == _currentUser.Id && z.PermLive)))); if (!hubExistsAndYouHaveAccess) @@ -161,7 +166,7 @@ public async Task UpdatePermissions(OpenShockContext db) return new OneOf.Types.Error(WebsocketError.WebsocketLiveControlHubNotFound); } - _device = await _db.Devices.FirstOrDefaultAsync(x => x.Id == _hubId); + _device = await _db.Devices.FirstOrDefaultAsync(x => x.Id == HubId); await UpdatePermissions(_db); @@ -176,6 +181,22 @@ public async Task UpdatePermissions(OpenShockContext db) } } } + + var hubLifetimeResult = await _hubLifetimeManager.AddLiveControlConnection(this); + + if (hubLifetimeResult.IsT1) + { + _logger.LogDebug("No such hub with id [{HubId}] connected", HubId); + } + + if (hubLifetimeResult.IsT2) + { + _logger.LogDebug("Hub is busy, cannot connect [{HubId}]", HubId); + return new OneOf.Types.Error(WebsocketError.WebsocketLiveControlHubLifetimeBusy); + } + + _hubLifetime = hubLifetimeResult.AsT0; + return new Success(); } @@ -201,7 +222,7 @@ public void OnActionExecuted(ActionExecutedContext context) } /// - public override Guid Id => _hubId ?? throw new Exception("Hub id is null"); + public override Guid Id => HubId ?? throw new Exception("Hub id is null"); /// protected override async Task SendInitialData() @@ -214,29 +235,19 @@ await QueueMessage(new LiveControlResponse Client = _tps } }); - await UpdateConnectedState(_hubLifetimeManager.IsConnected(Id), true); + await UpdateConnectedState(true); } - - private bool _lastIsConnected; - - /// - /// Update the connected state of the hub if different from what was last sent - /// - /// - /// + [NonAction] - public async Task UpdateConnectedState(bool isConnected, bool force = false) + private async Task UpdateConnectedState(bool isConnected) { - if (_lastIsConnected == isConnected && !force) return; - Logger.LogTrace("Sending connection update for hub [{HubId}] [{State}]", Id, isConnected); - - _lastIsConnected = isConnected; + try { await QueueMessage(new LiveControlResponse { - ResponseType = _lastIsConnected + ResponseType = isConnected ? LiveResponseType.DeviceConnected : LiveResponseType.DeviceNotConnected, }); @@ -481,22 +492,13 @@ await QueueMessage(new LiveControlResponse() // Clamp to limits var intensity = Math.Clamp(frame.Intensity, HardLimits.MinControlIntensity, perms.Intensity ?? HardLimits.MaxControlIntensity); - var result = _hubLifetimeManager.ReceiveFrame(Id, frame.Shocker, frame.Type, intensity, _tps); + var result = HubLifetime.ReceiveFrame(frame.Shocker, frame.Type, intensity, _tps); if (result.IsT0) { Logger.LogTrace("Successfully received frame"); } if (result.IsT1) - { - await QueueMessage(new LiveControlResponse - { - ResponseType = LiveResponseType.DeviceNotConnected - }); - return; - } - - if (result.IsT2) { await QueueMessage(new LiveControlResponse { @@ -505,12 +507,12 @@ await QueueMessage(new LiveControlResponse return; } - if (result.IsT3) + if (result.IsT2) { await QueueMessage(new LiveControlResponse { ResponseType = LiveResponseType.ShockerExclusive, - Data = result.AsT3.Until + Data = result.AsT2.Until }); return; } @@ -565,11 +567,12 @@ await QueueMessage(new LiveControlResponse } /// - public override ValueTask DisposeControllerAsync() + public override async ValueTask DisposeControllerAsync() { Logger.LogTrace("Disposing controller timer"); _pingTimer.Dispose(); - return base.DisposeControllerAsync(); + await UpdateConnectedState(false); + await base.DisposeControllerAsync(); } } diff --git a/LiveControlGateway/LifetimeManager/HubLifetime.cs b/LiveControlGateway/LifetimeManager/HubLifetime.cs index f1363cf6..85f46177 100644 --- a/LiveControlGateway/LifetimeManager/HubLifetime.cs +++ b/LiveControlGateway/LifetimeManager/HubLifetime.cs @@ -1,10 +1,11 @@ -using System.ComponentModel.DataAnnotations; +using System.Collections.Immutable; +using System.ComponentModel.DataAnnotations; using System.Diagnostics; using Microsoft.EntityFrameworkCore; using OneOf; using OneOf.Types; -using OpenShock.Common; using OpenShock.Common.Constants; +using OpenShock.Common.Extensions; using OpenShock.Common.Models; using OpenShock.Common.OpenShockDb; using OpenShock.Common.Redis; @@ -12,7 +13,6 @@ using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using OpenShock.LiveControlGateway.Controllers; -using OpenShock.LiveControlGateway.Websocket; using OpenShock.Serialization.Gateway; using OpenShock.Serialization.Types; using Redis.OM.Contracts; @@ -26,18 +26,18 @@ namespace OpenShock.LiveControlGateway.LifetimeManager; /// public sealed class HubLifetime : IAsyncDisposable { - - public enum HubLifetimeState - { - Idle, - SettingUp, - Swapping, - Removing - } - private volatile HubLifetimeState _state = HubLifetimeState.SettingUp; + + /// + /// Current state of the lifetime + /// + public HubLifetimeState State => _state; + + /// + /// The current Hub Controller + /// public IHubController HubController { get; private set; } - + private readonly TimeSpan _waitBetweenTicks; private readonly ushort _commandDuration; @@ -51,6 +51,9 @@ public enum HubLifetimeState private readonly ILogger _logger; + private ImmutableArray _liveControlClients = ImmutableArray.Empty; + private readonly SemaphoreSlim _liveControlClientsLock = new(1); + /// /// DI Constructor /// @@ -78,6 +81,63 @@ public HubLifetime([Range(1, 10)] byte tps, IHubController hubController, _commandDuration = (ushort)(_waitBetweenTicks.TotalMilliseconds * 2.5); } + /// + /// Add a live control client to the lifetime + /// + /// + /// + public async Task AddLiveControlClient(LiveControlController controller) + { + using (await _liveControlClientsLock.LockAsyncScoped()) + { + if (_liveControlClients.Contains(controller)) + { + _logger.LogWarning("Client already registered, not sure how this happened, probably a bug"); + return null; + } + + _liveControlClients = _liveControlClients.Add(controller); + } + + return this; + } + + /// + /// Remove a live control client from the lifetime + /// + /// + /// + public async Task RemoveLiveControlClient(LiveControlController controller) + { + using (await _liveControlClientsLock.LockAsyncScoped()) + { + if (!_liveControlClients.Contains(controller)) return false; + _liveControlClients = _liveControlClients.Remove(controller); + } + + return true; + } + + private async Task DisposeLiveControlClients() + { + foreach (var client in _liveControlClients) + { + try + { + await client.DisposeAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Error disposing live control client"); + } + } + + using (await _liveControlClientsLock.LockAsyncScoped()) + { + _liveControlClients = _liveControlClients.Clear(); + } + } + /// /// Call on creation to setup shockers for the first time /// @@ -94,15 +154,16 @@ public async Task InitAsync(CancellationToken cancellationToken) _logger.LogError(e, "Error initializing OpenShock Hub lifetime"); return false; } - + #pragma warning disable CS4014 OsTask.Run(UpdateLoop); #pragma warning restore CS4014 - + _state = HubLifetimeState.Idle; // We are fully setup, we can go back to idle state + return true; } - + /// /// Swap to a new underlying controller /// @@ -208,8 +269,8 @@ public async Task UpdateDevice() { await using var db = await _dbContextFactory.CreateDbContextAsync(_cancellationSource.Token); await UpdateShockers(db, _cancellationSource.Token); - - foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(HubController.Id)) + + foreach (var websocketController in _liveControlClients) await websocketController.UpdatePermissions(db); } @@ -219,7 +280,7 @@ public async Task UpdateDevice() private async Task UpdateShockers(OpenShockContext db, CancellationToken cancellationToken) { _logger.LogDebug("Updating shockers for device [{DeviceId}]", HubController.Id); - + _shockerStates = await db.Shockers.Where(x => x.Device == HubController.Id).Select(x => new ShockerState() { Id = x.Id, @@ -324,7 +385,7 @@ await deviceOnline.InsertAsync(new DeviceOnline Rssi = data.Rssi, }, Duration.DeviceKeepAliveTimeout); - + await _redisPubService.SendDeviceOnlineStatus(device); return new Success(); } @@ -336,7 +397,7 @@ await deviceOnline.InsertAsync(new DeviceOnline online.Rssi = data.Rssi; var sendOnlineStatusUpdate = false; - + if (online.FirmwareVersion != data.FirmwareVersion || online.Gateway != data.Gateway || online.ConnectedAt != data.ConnectedAt || @@ -346,12 +407,12 @@ await deviceOnline.InsertAsync(new DeviceOnline online.FirmwareVersion = data.FirmwareVersion!; online.ConnectedAt = data.ConnectedAt; online.UserAgent = data.UserAgent; - + sendOnlineStatusUpdate = true; } await deviceOnline.UpdateAsync(online, Duration.DeviceKeepAliveTimeout); - + if (sendOnlineStatusUpdate) { await _redisPubService.SendDeviceOnlineStatus(device); @@ -362,14 +423,15 @@ await deviceOnline.InsertAsync(new DeviceOnline } private bool _disposed = false; - + /// public async ValueTask DisposeAsync() { - if(_disposed) return; + if (_disposed) return; _disposed = true; await _cancellationSource.CancelAsync(); + await DisposeLiveControlClients(); } } @@ -414,42 +476,42 @@ public SelfOnlineData( LatencyMs = latencyMs; Rssi = rssi; } - + /// /// The owner of the device /// public required Guid Owner { get; init; } - + /// /// Our gateway fqdn /// public required string Gateway { get; init; } - + /// /// Firmware version sent by the hub /// public required SemVersion FirmwareVersion { get; init; } - + /// /// When the websocket connected /// public required DateTimeOffset ConnectedAt { get; init; } - + /// /// Raw useragent /// public string? UserAgent { get; init; } = null; - + /// /// Hub uptime /// public DateTimeOffset BootedAt { get; init; } - + /// /// Measured latency /// public ushort? LatencyMs { get; init; } = null; - + /// /// Wifi rssi /// diff --git a/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs b/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs index 6332cb7a..8bef8474 100644 --- a/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs +++ b/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs @@ -1,14 +1,10 @@ -using System.Collections.Concurrent; -using Microsoft.EntityFrameworkCore; -using OneOf; +using Microsoft.EntityFrameworkCore; using OneOf.Types; using OpenShock.Common.Extensions; -using OpenShock.Common.Models; using OpenShock.Common.OpenShockDb; using OpenShock.Common.Redis.PubSub; using OpenShock.Common.Services.RedisPubSub; using OpenShock.LiveControlGateway.Controllers; -using OpenShock.LiveControlGateway.Websocket; using Redis.OM.Contracts; using Semver; @@ -50,6 +46,11 @@ ILoggerFactory loggerFactory _logger = _loggerFactory.CreateLogger(); } + /// + /// When the hub lifetime is busy, we cannot add a new device connection + /// + public readonly struct Busy; + /// /// Add device to lifetime manager, called on successful connect of device /// @@ -57,7 +58,7 @@ ILoggerFactory loggerFactory /// /// /// - public async Task TryAddDeviceConnection(byte tps, IHubController hubController, + public async Task> TryAddDeviceConnection(byte tps, IHubController hubController, CancellationToken cancellationToken) { _logger.LogDebug("Adding hub lifetime [{HubId}]", hubController.Id); @@ -71,8 +72,7 @@ public async Task TryAddDeviceConnection(byte tps, IHubController hubContr // There already is a hub lifetime, lets swap! if (!hubLifetime.TryMarkSwapping()) { - return - false; // Tell the controller that we are busy right now TODO: Tell the connecting client why it failed + return new Busy(); } isSwapping = true; @@ -97,15 +97,13 @@ public async Task TryAddDeviceConnection(byte tps, IHubController hubContr if (!await hubLifetime.InitAsync(cancellationToken)) { // If we fail to initialize, the hub must be removed - await RemoveDeviceConnection(hubController, false); // Here be dragons? - return false; + await RemoveDeviceConnection(hubController); // Here be dragons? + _logger.LogError("Failed to initialize hub lifetime [{HubId}]", hubController.Id); + return new Error(); } - - foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubLifetime.HubController.Id)) - await websocketController.UpdateConnectedState(true); } - return true; + return hubLifetime; } private HubLifetime CreateNewLifetime(byte tps, IHubController hubController) @@ -128,8 +126,7 @@ private HubLifetime CreateNewLifetime(byte tps, IHubController hubController) /// this is the actual end of life of the hub /// /// - /// - public async Task RemoveDeviceConnection(IHubController hubController, bool notifyLiveControlClients = true) + public async Task RemoveDeviceConnection(IHubController hubController) { _logger.LogDebug("Removing hub lifetime [{HubId}]", hubController.Id); HubLifetime? hubLifetime; @@ -151,12 +148,6 @@ public async Task RemoveDeviceConnection(IHubController hubController, bool noti return; } } - - if (notifyLiveControlClients) - { - foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubController.Id)) - await websocketController.UpdateConnectedState(false); - } await hubLifetime.DisposeAsync(); @@ -177,30 +168,36 @@ public async Task RemoveDeviceConnection(IHubController hubController, bool noti public bool IsConnected(Guid device) => _lifetimes.ContainsKey(device); /// - /// Receive a control frame by a client, this implies that limits and permissions have been checked before + /// Register live control connection to hub lifetime, null if hub not found /// - /// - /// - /// - /// - /// + /// /// - public OneOf ReceiveFrame(Guid device, Guid shocker, - ControlType type, byte intensity, byte tps) + /// + public async Task> AddLiveControlConnection(LiveControlController liveControlController) { - if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); - var receiveFrameAction = deviceLifetime.ReceiveFrame(shocker, type, intensity, tps); - if (receiveFrameAction.IsT0) return new Success(); - if (receiveFrameAction.IsT1) return new ShockerNotFound(); - return receiveFrameAction.AsT2; + if(liveControlController.HubId == null) throw new ArgumentNullException(nameof(liveControlController), "HubId is null"); + + using (await _lifetimesLock.LockAsyncScoped()) + { + if (!_lifetimes.TryGetValue(liveControlController.HubId!.Value, out var hubLifetime)) return new NotFound(); + + if (hubLifetime.State == HubLifetimeState.Removing) + { + _logger.LogDebug("Hub lifetime [{HubId}] is removing, cannot add live control connection", liveControlController.HubId); + return new Busy(); + } + + await hubLifetime.AddLiveControlClient(liveControlController); + return hubLifetime; + } } - - /// + + /// /// Update device data from the database /// /// /// - public async Task> UpdateDevice(Guid device) + public async Task> UpdateDevice(Guid device) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); await deviceLifetime.UpdateDevice(); @@ -213,7 +210,7 @@ public async Task> UpdateDevice(Guid device) /// /// /// - public async Task> Control(Guid device, + public async Task> Control(Guid device, IReadOnlyList shocks) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); @@ -227,7 +224,7 @@ public async Task> Control(Guid device, /// /// /// - public async Task> ControlCaptive(Guid device, bool enabled) + public async Task> ControlCaptive(Guid device, bool enabled) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); await deviceLifetime.ControlCaptive(enabled); @@ -240,7 +237,7 @@ public async Task> ControlCaptive(Guid device, bo /// /// /// - public async Task> OtaInstall(Guid device, SemVersion version) + public async Task> OtaInstall(Guid device, SemVersion version) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); await deviceLifetime.OtaInstall(version); @@ -252,7 +249,7 @@ public async Task> OtaInstall(Guid device, SemVer /// /// /// - public async Task> DeviceOnline(Guid device, SelfOnlineData data) + public async Task> DeviceOnline(Guid device, SelfOnlineData data) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); await deviceLifetime.Online(device, data); @@ -265,11 +262,6 @@ public async Task> DeviceOnline(Guid device, Self /// public readonly struct DeviceNotFound; -/// -/// OneOf -/// -public readonly struct ShockerNotFound; - /// /// OneOf /// diff --git a/LiveControlGateway/LifetimeManager/HubLifetimeState.cs b/LiveControlGateway/LifetimeManager/HubLifetimeState.cs new file mode 100644 index 00000000..ca17c7a9 --- /dev/null +++ b/LiveControlGateway/LifetimeManager/HubLifetimeState.cs @@ -0,0 +1,27 @@ +namespace OpenShock.LiveControlGateway.LifetimeManager; + +/// +/// State of a hub lifetime +/// +public enum HubLifetimeState +{ + /// + /// Normal operation + /// + Idle, + + /// + /// Initial state + /// + SettingUp, + + /// + /// Swapping to a new hub controller + /// + Swapping, + + /// + /// Hub controller is disconnecting, shutting down the lifetime + /// + Removing +} \ No newline at end of file diff --git a/LiveControlGateway/Websocket/WebsocketManager.cs b/LiveControlGateway/Websocket/WebsocketManager.cs deleted file mode 100644 index f2063b3d..00000000 --- a/LiveControlGateway/Websocket/WebsocketManager.cs +++ /dev/null @@ -1,18 +0,0 @@ -using OpenShock.Common.Models.WebSocket; -using OpenShock.Common.Models.WebSocket.LCG; -using OpenShock.Common.Websocket; -using OpenShock.LiveControlGateway.Controllers; -using OpenShock.Serialization; - -namespace OpenShock.LiveControlGateway.Websocket; - -/// -/// Websocket connection manager -/// -public static class WebsocketManager -{ - /// - /// Live control users - /// - public static readonly SimpleWebsocketCollection> LiveControlUsers = new(); -} \ No newline at end of file