diff --git a/API/Controller/Shockers/SendControl.cs b/API/Controller/Shockers/SendControl.cs index 35620453..892fc59c 100644 --- a/API/Controller/Shockers/SendControl.cs +++ b/API/Controller/Shockers/SendControl.cs @@ -9,14 +9,11 @@ using OpenShock.Common.Hubs; using OpenShock.Common.Models; using OpenShock.Common.Problems; -using OpenShock.Common.Services.RedisPubSub; namespace OpenShock.API.Controller.Shockers; public sealed partial class ShockerController { - private static readonly IDictionary EmptyDic = new Dictionary(); - /// /// Send a control message to shockers /// @@ -31,7 +28,7 @@ public sealed partial class ShockerController public async Task SendControl( [FromBody] ControlRequest body, [FromServices] IHubContext userHub, - [FromServices] IRedisPubService redisPubService) + [FromServices] IControlSender controlSender) { var sender = new ControlLogSender { @@ -39,11 +36,11 @@ public async Task SendControl( Name = CurrentUser.Name, Image = CurrentUser.GetImageUrl(), ConnectionId = HttpContext.Connection.Id, - AdditionalItems = EmptyDic, + AdditionalItems = [], CustomName = body.CustomName }; - var controlAction = await ControlLogic.ControlByUser(body.Shocks, _db, sender, userHub.Clients, redisPubService); + var controlAction = await controlSender.ControlByUser(body.Shocks, sender, userHub.Clients); return controlAction.Match( success => LegacyEmptyOk("Successfully sent control messages"), notFound => Problem(ShockerControlError.ShockerControlNotFound(notFound.Value)), @@ -65,12 +62,12 @@ public async Task SendControl( public Task SendControl_DEPRECATED( [FromBody] IReadOnlyList body, [FromServices] IHubContext userHub, - [FromServices] IRedisPubService redisPubService) + [FromServices] IControlSender controlSender) { return SendControl(new ControlRequest { Shocks = body, CustomName = null - }, userHub, redisPubService); + }, userHub, controlSender); } } \ No newline at end of file diff --git a/API/Program.cs b/API/Program.cs index b219c674..3684799f 100644 --- a/API/Program.cs +++ b/API/Program.cs @@ -6,10 +6,12 @@ using OpenShock.API.Services.Email; using OpenShock.API.Services.UserService; using OpenShock.Common; +using OpenShock.Common.DeviceControl; using OpenShock.Common.Extensions; using OpenShock.Common.Hubs; using OpenShock.Common.JsonSerialization; using OpenShock.Common.Options; +using OpenShock.Common.Services; using OpenShock.Common.Services.Device; using OpenShock.Common.Services.LCGNodeProvisioner; using OpenShock.Common.Services.Ota; @@ -44,6 +46,7 @@ }); builder.Services.AddScoped(); +builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); diff --git a/API/Realtime/RedisSubscriberService.cs b/API/Realtime/RedisSubscriberService.cs index f150a68d..581117a1 100644 --- a/API/Realtime/RedisSubscriberService.cs +++ b/API/Realtime/RedisSubscriberService.cs @@ -1,4 +1,4 @@ -using System.Text.Json; +using MessagePack; using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using OpenShock.Common.Hubs; @@ -9,7 +9,6 @@ using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using Redis.OM.Contracts; -using Redis.OM.Searching; using StackExchange.Redis; namespace OpenShock.API.Realtime; @@ -23,6 +22,7 @@ public sealed class RedisSubscriberService : IHostedService, IAsyncDisposable private readonly IDbContextFactory _dbContextFactory; private readonly IRedisConnectionProvider _redisConnectionProvider; private readonly ISubscriber _subscriber; + private readonly ILogger _logger; /// /// DI Constructor @@ -31,46 +31,87 @@ public sealed class RedisSubscriberService : IHostedService, IAsyncDisposable /// /// /// + /// public RedisSubscriberService( IConnectionMultiplexer connectionMultiplexer, IHubContext hubContext, IDbContextFactory dbContextFactory, - IRedisConnectionProvider redisConnectionProvider) + IRedisConnectionProvider redisConnectionProvider, + ILogger logger + ) { _hubContext = hubContext; _dbContextFactory = dbContextFactory; _redisConnectionProvider = redisConnectionProvider; _subscriber = connectionMultiplexer.GetSubscriber(); + _logger = logger; } /// public async Task StartAsync(CancellationToken cancellationToken) { - await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, (_, message) => { OsTask.Run(() => HandleKeyExpired(message)); }); - await _subscriber.SubscribeAsync(RedisChannels.DeviceOnlineStatus, (_, message) => { OsTask.Run(() => HandleDeviceOnlineStatus(message)); }); + await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, HandleKeyExpired); + await _subscriber.SubscribeAsync(RedisChannels.DeviceStatus, HandleDeviceStatus); } - private async Task HandleDeviceOnlineStatus(RedisValue message) + private void HandleKeyExpired(RedisChannel _, RedisValue message) { if (!message.HasValue) return; - var data = JsonSerializer.Deserialize(message.ToString()); - if (data is null) return; + if (message.ToString().Split(':', 2) is not [{ } guid, { } name]) return; + + if (!Guid.TryParse(guid, out var id)) return; - await LogicDeviceOnlineStatus(data.Id); + if (typeof(DeviceOnline).FullName == name) + { + OsTask.Run(() => LogicDeviceOnlineStatus(id)); + } } - - private async Task HandleKeyExpired(RedisValue message) + + private void HandleDeviceStatus(RedisChannel _, RedisValue value) { - if (!message.HasValue) return; - var msg = message.ToString().Split(':'); - if (msg.Length < 2) return; + if (!value.HasValue) return; + + DeviceStatus message; + try + { + message = MessagePackSerializer.Deserialize(value); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to deserialize redis message"); + return; + } + + OsTask.Run(() => HandleDeviceStatusMessage(message)); + } + private async Task HandleDeviceStatusMessage(DeviceStatus message) + { + switch (message.Payload) + { + case DeviceBoolStatePayload boolState: + await HandleDeviceBoolState(message.DeviceId, boolState); + break; + default: + _logger.LogError("Got DeviceStatus with unknown payload type: {PayloadType}", message.Payload.GetType().Name); + break; + } - if (!Guid.TryParse(msg[1], out var id)) return; + } - if (typeof(DeviceOnline).FullName == msg[0]) + private async Task HandleDeviceBoolState(Guid deviceId, DeviceBoolStatePayload state) + { + switch (state.Type) { - await LogicDeviceOnlineStatus(id); + case DeviceBoolStateType.Online: + await LogicDeviceOnlineStatus(deviceId); // TODO: Handle device offline messages too + break; + case DeviceBoolStateType.EStopped: + _logger.LogInformation("EStopped state not implemented yet for DeviceId {DeviceId}", deviceId); + break; + default: + _logger.LogError("Unknown DeviceBoolStateType: {StateType}", state.Type); + break; } } @@ -111,15 +152,23 @@ await _hubContext.Clients.Users(userIds).DeviceStatus([ } /// - public Task StopAsync(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { - return Task.CompletedTask; + await _subscriber.UnsubscribeAllAsync(); } /// public async ValueTask DisposeAsync() { - await _subscriber.UnsubscribeAllAsync(); + try + { + await _subscriber.UnsubscribeAllAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during Redis unsubscribe in DisposeAsync"); + } + GC.SuppressFinalize(this); } diff --git a/Common/Common.csproj b/Common/Common.csproj index 734a7b56..7d8aea29 100644 --- a/Common/Common.csproj +++ b/Common/Common.csproj @@ -12,6 +12,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/Common/DeviceControl/ControlLogic.cs b/Common/DeviceControl/ControlLogic.cs deleted file mode 100644 index d2edf724..00000000 --- a/Common/DeviceControl/ControlLogic.cs +++ /dev/null @@ -1,182 +0,0 @@ -using Microsoft.AspNetCore.SignalR; -using Microsoft.EntityFrameworkCore; -using OneOf; -using OneOf.Types; -using OpenShock.Common.Constants; -using OpenShock.Common.Extensions; -using OpenShock.Common.Hubs; -using OpenShock.Common.Models; -using OpenShock.Common.Models.WebSocket.User; -using OpenShock.Common.OpenShockDb; -using OpenShock.Common.Redis.PubSub; -using OpenShock.Common.Services.RedisPubSub; - -namespace OpenShock.Common.DeviceControl; - -public static class ControlLogic -{ - public static async Task> ControlByUser(IReadOnlyList shocks, OpenShockContext db, ControlLogSender sender, - IHubClients hubClients, IRedisPubService redisPubService) - { - var ownShockers = await db.Shockers.Where(x => x.Device.OwnerId == sender.Id).Select(x => - new ControlShockerObj - { - Id = x.Id, - Name = x.Name, - RfId = x.RfId, - Device = x.DeviceId, - Model = x.Model, - Owner = x.Device.OwnerId, - Paused = x.IsPaused, - PermsAndLimits = null - }).ToListAsync(); - - var sharedShockers = await db.UserShares.Where(x => x.SharedWithUserId == sender.Id).Select(x => - new ControlShockerObj - { - Id = x.Shocker.Id, - Name = x.Shocker.Name, - RfId = x.Shocker.RfId, - Device = x.Shocker.DeviceId, - Model = x.Shocker.Model, - Owner = x.Shocker.Device.OwnerId, - Paused = x.Shocker.IsPaused || x.IsPaused, - PermsAndLimits = new SharePermsAndLimits - { - Sound = x.AllowSound, - Vibrate = x.AllowVibrate, - Shock = x.AllowShock, - Duration = x.MaxDuration, - Intensity = x.MaxIntensity - } - }).ToArrayAsync(); - - ownShockers.AddRange(sharedShockers); - - return await ControlInternal(shocks, db, sender, hubClients, ownShockers, redisPubService); - } - - public static async Task> ControlPublicShare(IReadOnlyList shocks, OpenShockContext db, - ControlLogSender sender, - IHubClients hubClients, Guid publicShareId, IRedisPubService redisPubService) - { - var publicShareShockers = await db.PublicShareShockerMappings.Where(x => x.PublicShareId == publicShareId && (x.PublicShare.ExpiresAt > DateTime.UtcNow || x.PublicShare.ExpiresAt == null)) - .Select(x => new ControlShockerObj - { - Id = x.Shocker.Id, - Name = x.Shocker.Name, - RfId = x.Shocker.RfId, - Device = x.Shocker.DeviceId, - Model = x.Shocker.Model, - Owner = x.Shocker.Device.OwnerId, - Paused = x.Shocker.IsPaused || x.IsPaused, - PermsAndLimits = new SharePermsAndLimits - { - Sound = x.AllowSound, - Vibrate = x.AllowVibrate, - Shock = x.AllowShock, - Duration = x.MaxDuration, - Intensity = x.MaxIntensity - } - }).ToArrayAsync(); - - return await ControlInternal(shocks, db, sender, hubClients, publicShareShockers, redisPubService); - } - - private static async Task> ControlInternal(IReadOnlyList shocks, OpenShockContext db, ControlLogSender sender, - IHubClients hubClients, IReadOnlyCollection allowedShockers, IRedisPubService redisPubService) - { - var finalMessages = new Dictionary>(); - var curTime = DateTime.UtcNow; - var distinctShocks = shocks.DistinctBy(x => x.Id); - var logs = new Dictionary>(); - - foreach (var shock in distinctShocks) - { - var shockerInfo = allowedShockers.FirstOrDefault(x => x.Id == shock.Id); - - if (shockerInfo is null) return new ShockerNotFoundOrNoAccess(shock.Id); - - if (shockerInfo.Paused) return new ShockerPaused(shock.Id); - - if (!IsAllowed(shock.Type, shockerInfo.PermsAndLimits)) return new ShockerNoPermission(shock.Id); - var durationMax = shockerInfo.PermsAndLimits?.Duration ?? HardLimits.MaxControlDuration; - var intensityMax = shockerInfo.PermsAndLimits?.Intensity ?? HardLimits.MaxControlIntensity; - - var deviceGroup = finalMessages.GetValueOrAddDefault(shockerInfo.Device, []); - - var intensity = Math.Clamp(shock.Intensity, HardLimits.MinControlIntensity, intensityMax); - var duration = Math.Clamp(shock.Duration, HardLimits.MinControlDuration, durationMax); - - deviceGroup.Add(new ControlMessage.ShockerControlInfo - { - Id = shockerInfo.Id, - RfId = shockerInfo.RfId, - Duration = duration, - Intensity = intensity, - Type = shock.Type, - Model = shockerInfo.Model, - Exclusive = shock.Exclusive - }); - - db.ShockerControlLogs.Add(new ShockerControlLog - { - Id = Guid.CreateVersion7(), - ShockerId = shockerInfo.Id, - ControlledByUserId = sender.Id == Guid.Empty ? null : sender.Id, - Intensity = intensity, - Duration = duration, - Type = shock.Type, - CustomName = sender.CustomName, - CreatedAt = curTime - }); - - var ownerLog = logs.GetValueOrAddDefault(shockerInfo.Owner, []); - - ownerLog.Add(new ControlLog - { - Shocker = new BasicShockerInfo - { - Id = shockerInfo.Id, - Name = shockerInfo.Name - }, - Type = shock.Type, - Duration = duration, - Intensity = intensity, - ExecutedAt = curTime - }); - } - - var redisTask = redisPubService.SendDeviceControl(sender.Id, finalMessages - .ToDictionary(kvp => kvp.Key, IReadOnlyList (kvp) => kvp.Value)); - - var logSends = logs.Select(x => hubClients.User(x.Key.ToString()).Log(sender, x.Value)); - - await Task.WhenAll([ - redisTask, - db.SaveChangesAsync(), - ..logSends - ]); - - return new Success(); - } - - private static bool IsAllowed(ControlType type, SharePermsAndLimits? perms) // TODO: Duplicate logic (LiveControlGateway.csproj -> LiveControlController.cs -> IsAllowed) - { - if (perms is null) return true; - return type switch - { - ControlType.Shock => perms.Shock, - ControlType.Vibrate => perms.Vibrate, - ControlType.Sound => perms.Sound, - ControlType.Stop => perms.Shock || perms.Vibrate || perms.Sound, - _ => false - }; - } -} - -public readonly record struct ShockerNotFoundOrNoAccess(Guid Value); - -public readonly record struct ShockerPaused(Guid Value); - -public readonly record struct ShockerNoPermission(Guid Value); \ No newline at end of file diff --git a/Common/DeviceControl/ControlShockerObj.cs b/Common/DeviceControl/ControlShockerObj.cs index 92df41ab..fd94132f 100644 --- a/Common/DeviceControl/ControlShockerObj.cs +++ b/Common/DeviceControl/ControlShockerObj.cs @@ -4,12 +4,12 @@ namespace OpenShock.Common.DeviceControl; public sealed class ControlShockerObj { - public required Guid Id { get; set; } - public required string Name { get; set; } - public required ushort RfId { get; set; } - public required Guid Device { get; set; } - public required Guid Owner { get; set; } - public required ShockerModelType Model { get; set; } - public required bool Paused { get; set; } - public required SharePermsAndLimits? PermsAndLimits { get; set; } + public required Guid ShockerId { get; init; } + public required string ShockerName { get; init; } + public required ushort ShockerRfId { get; init; } + public required Guid DeviceId { get; init; } + public required ShockerModelType ShockerModel { get; init; } + public required Guid OwnerId { get; init; } + public required bool Paused { get; init; } + public required SharePermsAndLimits? PermsAndLimits { get; init; } } \ No newline at end of file diff --git a/Common/Extensions/DictionaryExtensions.cs b/Common/Extensions/DictionaryExtensions.cs index bb90c300..1fdea473 100644 --- a/Common/Extensions/DictionaryExtensions.cs +++ b/Common/Extensions/DictionaryExtensions.cs @@ -4,16 +4,15 @@ namespace OpenShock.Common.Extensions; public static class DictionaryExtensions { - public static TValue GetValueOrAddDefault(this Dictionary dictionary, TKey key, - TValue defaultValue) where TKey : notnull + public static void AppendValue(this Dictionary> dictionary, TKey key, TValue value) where TKey : notnull { - ref var value = ref CollectionsMarshal.GetValueRefOrAddDefault(dictionary, key, out var exists); - if (exists) + ref var list = ref CollectionsMarshal.GetValueRefOrAddDefault(dictionary, key, out _); + if (list is null) { - return value!; + list = [value]; + return; } - - value = defaultValue; - return value; + + list.Add(value); } } \ No newline at end of file diff --git a/Common/Hubs/PublicShareHub.cs b/Common/Hubs/PublicShareHub.cs index 960b5cea..b33f2ec4 100644 --- a/Common/Hubs/PublicShareHub.cs +++ b/Common/Hubs/PublicShareHub.cs @@ -6,7 +6,6 @@ using OpenShock.Common.Extensions; using OpenShock.Common.Models; using OpenShock.Common.OpenShockDb; -using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Services.Session; using OpenShock.Common.Utils; @@ -14,23 +13,22 @@ namespace OpenShock.Common.Hubs; public sealed class PublicShareHub : Hub { - private readonly ISessionService _sessionService; private readonly OpenShockContext _db; private readonly IHubContext _userHub; - private readonly ILogger _logger; - private readonly IRedisPubService _redisPubService; + private readonly ISessionService _sessionService; + private readonly IControlSender _controlSender; private readonly IUserReferenceService _userReferenceService; + private readonly ILogger _logger; private IReadOnlyList? _tokenPermissions = null; - public PublicShareHub(OpenShockContext db, IHubContext userHub, ILogger logger, - ISessionService sessionService, IRedisPubService redisPubService, IUserReferenceService userReferenceService) + public PublicShareHub(OpenShockContext db, IHubContext userHub, ISessionService sessionService, IControlSender controlSender, IUserReferenceService userReferenceService, ILogger logger) { _db = db; _userHub = userHub; - _logger = logger; - _redisPubService = redisPubService; - _userReferenceService = userReferenceService; _sessionService = sessionService; + _controlSender = controlSender; + _userReferenceService = userReferenceService; + _logger = logger; } public override async Task OnConnectedAsync() @@ -122,8 +120,8 @@ public Task Control(IReadOnlyList shocks) { if (!_tokenPermissions.IsAllowedAllowOnNull(PermissionType.Shockers_Use)) return Task.CompletedTask; - return ControlLogic.ControlPublicShare(shocks, _db, CustomData.CachedControlLogSender, _userHub.Clients, - CustomData.PublicShareId, _redisPubService); + return _controlSender.ControlPublicShare(shocks, CustomData.CachedControlLogSender, _userHub.Clients, + CustomData.PublicShareId); } private CustomDataHolder CustomData => (CustomDataHolder)Context.Items[PublicShareCustomData]!; diff --git a/Common/Hubs/UserHub.cs b/Common/Hubs/UserHub.cs index 69a28bd2..884368c8 100644 --- a/Common/Hubs/UserHub.cs +++ b/Common/Hubs/UserHub.cs @@ -23,16 +23,18 @@ public sealed class UserHub : Hub private readonly OpenShockContext _db; private readonly IRedisConnectionProvider _provider; private readonly IRedisPubService _redisPubService; + private readonly IControlSender _controlSender; private readonly IUserReferenceService _userReferenceService; private IReadOnlyList? _tokenPermissions = null; public UserHub(ILogger logger, OpenShockContext db, IRedisConnectionProvider provider, - IRedisPubService redisPubService, IUserReferenceService userReferenceService) + IRedisPubService redisPubService, IControlSender controlSender, IUserReferenceService userReferenceService) { _logger = logger; _db = db; _provider = provider; _redisPubService = redisPubService; + _controlSender = controlSender; _userReferenceService = userReferenceService; } @@ -89,9 +91,9 @@ public async Task ControlV2(IReadOnlyList shocks, ConnectionId = Context.ConnectionId, AdditionalItems = additionalItems, CustomName = customName - }).SingleAsync(); + }).FirstAsync(); - await ControlLogic.ControlByUser(shocks, _db, sender, Clients, _redisPubService); + await _controlSender.ControlByUser(shocks, sender, Clients); } public async Task CaptivePortal(Guid deviceId, bool enabled) @@ -142,10 +144,6 @@ public async Task Reboot(Guid deviceId) await _redisPubService.SendDeviceReboot(deviceId); } - - private Task GetUser() => GetUser(UserId, _db); - private Guid UserId => _userId ??= Guid.Parse(Context.UserIdentifier!); private Guid? _userId; - private static Task GetUser(Guid userId, OpenShockContext db) => db.Users.SingleAsync(x => x.Id == userId); } \ No newline at end of file diff --git a/Common/Models/ControlLogSender.cs b/Common/Models/ControlLogSender.cs index a3dad69b..0785f7d8 100644 --- a/Common/Models/ControlLogSender.cs +++ b/Common/Models/ControlLogSender.cs @@ -11,5 +11,5 @@ public class ControlLogSenderLight public class ControlLogSender : ControlLogSenderLight { public required string ConnectionId { get; set; } - public required IDictionary AdditionalItems { get; set; } + public required Dictionary AdditionalItems { get; set; } } \ No newline at end of file diff --git a/Common/Models/SharePermsAndLimits.cs b/Common/Models/SharePermsAndLimits.cs index 8657cdd4..948696f2 100644 --- a/Common/Models/SharePermsAndLimits.cs +++ b/Common/Models/SharePermsAndLimits.cs @@ -2,14 +2,10 @@ public class SharePermsAndLimits { - public required bool Sound { get; set; } - public required bool Vibrate { get; set; } - public required bool Shock { get; set; } - public required ushort? Duration { get; set; } - public required byte? Intensity { get; set; } -} - -public sealed class SharePermsAndLimitsLive : SharePermsAndLimits -{ - public required bool Live { get; set; } + public required bool Sound { get; init; } + public required bool Vibrate { get; init; } + public required bool Shock { get; init; } + public required ushort? Duration { get; init; } + public required byte? Intensity { get; init; } + public required bool Live { get; init; } } \ No newline at end of file diff --git a/Common/Redis/PubSub/CaptiveMessage.cs b/Common/Redis/PubSub/CaptiveMessage.cs deleted file mode 100644 index 83a4d0a1..00000000 --- a/Common/Redis/PubSub/CaptiveMessage.cs +++ /dev/null @@ -1,9 +0,0 @@ -// ReSharper disable UnusedAutoPropertyAccessor.Global - -namespace OpenShock.Common.Redis.PubSub; - -public sealed class CaptiveMessage -{ - public required Guid DeviceId { get; set; } - public required bool Enabled { get; set; } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/ControlMessage.cs b/Common/Redis/PubSub/ControlMessage.cs deleted file mode 100644 index 5a7ce145..00000000 --- a/Common/Redis/PubSub/ControlMessage.cs +++ /dev/null @@ -1,26 +0,0 @@ -using OpenShock.Common.Models; - -// ReSharper disable UnusedAutoPropertyAccessor.Global - -namespace OpenShock.Common.Redis.PubSub; - -public sealed class ControlMessage -{ - public required Guid Sender { get; set; } - - /// - /// Guid is the device id - /// - public required IDictionary> ControlMessages { get; set; } - - public sealed class ShockerControlInfo - { - public required Guid Id { get; set; } - public required ushort RfId { get; set; } - public required byte Intensity { get; set; } - public required ushort Duration { get; set; } - public required ControlType Type { get; set; } - public required ShockerModelType Model { get; set; } - public bool Exclusive { get; set; } = false; - } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceEmergencyStopMessage.cs b/Common/Redis/PubSub/DeviceEmergencyStopMessage.cs deleted file mode 100644 index 91eb0f6b..00000000 --- a/Common/Redis/PubSub/DeviceEmergencyStopMessage.cs +++ /dev/null @@ -1,7 +0,0 @@ -using System.Text.Json.Serialization; -namespace OpenShock.Common.Redis.PubSub; - -public sealed class DeviceEmergencyStopMessage -{ - public required Guid Id { get; set; } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceMessage.cs b/Common/Redis/PubSub/DeviceMessage.cs new file mode 100644 index 00000000..b8e0879a --- /dev/null +++ b/Common/Redis/PubSub/DeviceMessage.cs @@ -0,0 +1,88 @@ +using MessagePack; +using OpenShock.Common.Models; +using Semver; + +namespace OpenShock.Common.Redis.PubSub; + +[MessagePackObject] +public sealed class DeviceMessage +{ + [Key(0)] public required IDeviceMessagePayload Payload { get; init; } + + public static DeviceMessage Create(DeviceTriggerType type) => new() + { + Payload = new DeviceTriggerPayload { Type = type } + }; + + public static DeviceMessage Create(DeviceToggleTarget target, bool state) => new() + { + Payload = new DeviceTogglePayload { Target = target, State = state } + }; + + public static DeviceMessage Create(DeviceControlPayload payload) => new() + { + Payload = payload + }; + + public static DeviceMessage Create(DeviceOtaInstallPayload payload) => new() + { + Payload = payload + }; +} + +[Union(0, typeof(DeviceTriggerPayload))] +[Union(1, typeof(DeviceTogglePayload))] +[Union(2, typeof(DeviceControlPayload))] +[Union(3, typeof(DeviceOtaInstallPayload))] +public interface IDeviceMessagePayload; + +public enum DeviceTriggerType : byte +{ + DeviceInfoUpdated = 0, + DeviceEmergencyStop = 1, + DeviceReboot = 2 +} + +[MessagePackObject] +public sealed class DeviceTriggerPayload : IDeviceMessagePayload +{ + [Key(0)] public required DeviceTriggerType Type { get; init; } +} + +public enum DeviceToggleTarget : byte +{ + CaptivePortal = 0, +} + +[MessagePackObject] +public sealed class DeviceTogglePayload : IDeviceMessagePayload +{ + [Key(0)] public required DeviceToggleTarget Target { get; init; } + [Key(1)] public required bool State { get; init; } +} + +[MessagePackObject] +public sealed class DeviceControlPayload : IDeviceMessagePayload +{ + [Key(0)] public required List Controls { get; init; } +} + +[MessagePackObject] +public sealed class ShockerControlCommand +{ + [Key(0)] public required Guid ShockerId { get; init; } + [Key(1)] public required ushort RfId { get; init; } + [Key(2)] public required byte Intensity { get; init; } + [Key(3)] public required ushort Duration { get; init; } + [Key(4)] public required ControlType Type { get; init; } + [Key(5)] public required ShockerModelType Model { get; init; } + [Key(6)] public required bool Exclusive { get; init; } +} + +[MessagePackObject] +public sealed class DeviceOtaInstallPayload : IDeviceMessagePayload +{ + [Key(0)] + [MessagePackFormatter(typeof(SemVersionMessagePackFormatter))] + public required SemVersion Version { get; init; } +} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceOtaInstallMessage.cs b/Common/Redis/PubSub/DeviceOtaInstallMessage.cs deleted file mode 100644 index e1889f85..00000000 --- a/Common/Redis/PubSub/DeviceOtaInstallMessage.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Text.Json.Serialization; -using OpenShock.Common.JsonSerialization; -using Semver; - -namespace OpenShock.Common.Redis.PubSub; - -public sealed class DeviceOtaInstallMessage -{ - public required Guid Id { get; set; } - [JsonConverter(typeof(SemVersionJsonConverter))] - public required SemVersion Version { get; set; } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceRebootMessage.cs b/Common/Redis/PubSub/DeviceRebootMessage.cs deleted file mode 100644 index ebfbab84..00000000 --- a/Common/Redis/PubSub/DeviceRebootMessage.cs +++ /dev/null @@ -1,7 +0,0 @@ -using System.Text.Json.Serialization; -namespace OpenShock.Common.Redis.PubSub; - -public sealed class DeviceRebootMessage -{ - public required Guid Id { get; set; } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceStatus.cs b/Common/Redis/PubSub/DeviceStatus.cs new file mode 100644 index 00000000..bbd9c597 --- /dev/null +++ b/Common/Redis/PubSub/DeviceStatus.cs @@ -0,0 +1,36 @@ +using MessagePack; + +namespace OpenShock.Common.Redis.PubSub; + +[MessagePackObject] +public sealed class DeviceStatus +{ + [Key(0)] public required Guid DeviceId { get; init; } + [Key(1)] public required IDeviceStatusPayload Payload { get; init; } + + public static DeviceStatus Create(Guid deviceId, DeviceBoolStateType stateType, bool state) => new() + { + DeviceId = deviceId, + Payload = new DeviceBoolStatePayload + { + Type = stateType, + State = state + } + }; +} + +[Union(0, typeof(DeviceBoolStatePayload))] +public interface IDeviceStatusPayload; + +public enum DeviceBoolStateType : byte +{ + Online = 0, + EStopped = 1 +} + +[MessagePackObject] +public sealed class DeviceBoolStatePayload : IDeviceStatusPayload +{ + [Key(0)] public required DeviceBoolStateType Type { get; init; } + [Key(1)] public required bool State { get; init; } +} \ No newline at end of file diff --git a/Common/Redis/PubSub/DeviceUpdatedMessage.cs b/Common/Redis/PubSub/DeviceUpdatedMessage.cs deleted file mode 100644 index c55f23ca..00000000 --- a/Common/Redis/PubSub/DeviceUpdatedMessage.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace OpenShock.Common.Redis.PubSub; - -public sealed class DeviceUpdatedMessage -{ - public required Guid Id { get; set; } -} \ No newline at end of file diff --git a/Common/Redis/PubSub/SemVersionMessagePackFormatter.cs b/Common/Redis/PubSub/SemVersionMessagePackFormatter.cs new file mode 100644 index 00000000..255fd219 --- /dev/null +++ b/Common/Redis/PubSub/SemVersionMessagePackFormatter.cs @@ -0,0 +1,19 @@ +using MessagePack; +using MessagePack.Formatters; +using Semver; + +namespace OpenShock.Common.Redis.PubSub; + +public sealed class SemVersionMessagePackFormatter : IMessagePackFormatter +{ + public void Serialize(ref MessagePackWriter writer, SemVersion? value, MessagePackSerializerOptions options) + { + writer.Write(value?.ToString()); + } + + public SemVersion? Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options) + { + var str = reader.ReadString(); + return str is null ? null : SemVersion.Parse(str); + } +} \ No newline at end of file diff --git a/Common/Redis/QueueHelper.cs b/Common/Redis/QueueHelper.cs new file mode 100644 index 00000000..00613b49 --- /dev/null +++ b/Common/Redis/QueueHelper.cs @@ -0,0 +1,38 @@ +using OpenShock.Common.Utils; +using StackExchange.Redis; + +namespace OpenShock.Common.Redis; + +public static class QueueHelper +{ + public static Task ConsumeQueue( + ChannelMessageQueue queue, + Func handler, + ILogger logger, + CancellationToken ct) + { + return OsTask.Run(async () => + { + while (!ct.IsCancellationRequested) + { + var msg = await queue.ReadAsync(ct); + if (!msg.Message.HasValue) continue; + + try + { + await handler(msg.Message, ct); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // graceful shutdown + break; + } + catch (Exception ex) + { + // keep the loop alive on individual message failures + logger.LogError(ex, "Error while handling Redis message from {Channel}", msg.Channel); + } + } + }); + } +} diff --git a/Common/Services/ControlSender.cs b/Common/Services/ControlSender.cs new file mode 100644 index 00000000..92177458 --- /dev/null +++ b/Common/Services/ControlSender.cs @@ -0,0 +1,181 @@ +using Microsoft.AspNetCore.SignalR; +using Microsoft.EntityFrameworkCore; +using OneOf; +using OneOf.Types; +using OpenShock.Common.Constants; +using OpenShock.Common.DeviceControl; +using OpenShock.Common.Extensions; +using OpenShock.Common.Hubs; +using OpenShock.Common.Models; +using OpenShock.Common.Models.WebSocket.User; +using OpenShock.Common.OpenShockDb; +using OpenShock.Common.Redis.PubSub; +using OpenShock.Common.Services.RedisPubSub; +using OpenShock.Common.Utils; + +namespace OpenShock.Common.Services; + +public sealed class ControlSender : IControlSender +{ + private readonly OpenShockContext _db; + private readonly IRedisPubService _publisher; + + public ControlSender(OpenShockContext db, IRedisPubService publisher) + { + _db = db; + _publisher = publisher; + } + + public async Task> ControlByUser(IReadOnlyList controls,ControlLogSender sender, IHubClients hubClients) + { + var queryOwn = _db.Shockers + .AsNoTracking() + .Where(x => x.Device.OwnerId == sender.Id) + .Select(x => new ControlShockerObj + { + ShockerId = x.Id, + ShockerName = x.Name, + ShockerRfId = x.RfId, + DeviceId = x.DeviceId, + ShockerModel = x.Model, + OwnerId = x.Device.OwnerId, + Paused = x.IsPaused, + PermsAndLimits = null + }); + + var queryShared = _db.UserShares + .AsNoTracking() + .Where(x => x.SharedWithUserId == sender.Id) + .Select(x => new ControlShockerObj + { + ShockerId = x.Shocker.Id, + ShockerName = x.Shocker.Name, + ShockerRfId = x.Shocker.RfId, + DeviceId = x.Shocker.DeviceId, + ShockerModel = x.Shocker.Model, + OwnerId = x.Shocker.Device.OwnerId, + Paused = x.Shocker.IsPaused || x.IsPaused, + PermsAndLimits = new SharePermsAndLimits + { + Sound = x.AllowSound, + Vibrate = x.AllowVibrate, + Shock = x.AllowShock, + Duration = x.MaxDuration, + Intensity = x.MaxIntensity, + Live = x.AllowLiveControl + } + }); + + var shockers = await queryOwn.Concat(queryShared).ToArrayAsync(); + + return await ControlInternal(controls, sender, hubClients, shockers); + } + + public async Task> ControlPublicShare(IReadOnlyList controls, ControlLogSender sender, IHubClients hubClients, Guid publicShareId) + { + var publicShareShockers = await _db.PublicShareShockerMappings + .AsNoTracking() + .Where(x => x.PublicShareId == publicShareId && (x.PublicShare.ExpiresAt > DateTime.UtcNow || x.PublicShare.ExpiresAt == null)) + .Select(x => new ControlShockerObj + { + ShockerId = x.Shocker.Id, + ShockerName = x.Shocker.Name, + ShockerRfId = x.Shocker.RfId, + DeviceId = x.Shocker.DeviceId, + ShockerModel = x.Shocker.Model, + OwnerId = x.Shocker.Device.OwnerId, + Paused = x.Shocker.IsPaused || x.IsPaused, + PermsAndLimits = new SharePermsAndLimits + { + Sound = x.AllowSound, + Vibrate = x.AllowVibrate, + Shock = x.AllowShock, + Duration = x.MaxDuration, + Intensity = x.MaxIntensity, + Live = x.AllowLiveControl + } + }) + .ToArrayAsync(); + + return await ControlInternal(controls, sender, hubClients, publicShareShockers); + } + + private static void Clamp(Control control, SharePermsAndLimits? limits) + { + var durationMax = limits?.Duration ?? HardLimits.MaxControlDuration; + var intensityMax = limits?.Intensity ?? HardLimits.MaxControlIntensity; + + control.Intensity = Math.Clamp(control.Intensity, HardLimits.MinControlIntensity, intensityMax); + control.Duration = Math.Clamp(control.Duration, HardLimits.MinControlDuration, durationMax); + } + + private async Task> ControlInternal(IReadOnlyList controls, ControlLogSender sender, IHubClients hubClients, ControlShockerObj[] allowedShockers) + { + var shockersById = allowedShockers.ToDictionary(s => s.ShockerId, s => s); + + var now = DateTime.UtcNow; + + var messagesByDevice = new Dictionary>(); + var logsByOwner = new Dictionary>(); + + foreach (var control in controls.DistinctBy(x => x.Id)) + { + if (!shockersById.TryGetValue(control.Id, out var shocker)) + return new ShockerNotFoundOrNoAccess(control.Id); + + if (shocker.Paused) + return new ShockerPaused(control.Id); + + if (!PermissionUtils.IsAllowed(control.Type, false, shocker.PermsAndLimits)) + return new ShockerNoPermission(control.Id); + + Clamp(control, shocker.PermsAndLimits); + + messagesByDevice.AppendValue(shocker.DeviceId, new ShockerControlCommand + { + ShockerId = shocker.ShockerId, + RfId = shocker.ShockerRfId, + Duration = control.Duration, + Intensity = control.Intensity, + Type = control.Type, + Model = shocker.ShockerModel, + Exclusive = control.Exclusive + }); + logsByOwner.AppendValue(shocker.OwnerId, new ControlLog + { + Shocker = new BasicShockerInfo + { + Id = shocker.ShockerId, + Name = shocker.ShockerName + }, + Type = control.Type, + Intensity = control.Intensity, + Duration = control.Duration, + ExecutedAt = now + }); + + _db.ShockerControlLogs.Add(new ShockerControlLog + { + Id = Guid.CreateVersion7(), + ShockerId = shocker.ShockerId, + ControlledByUserId = sender.Id == Guid.Empty ? null : sender.Id, + Intensity = control.Intensity, + Duration = control.Duration, + Type = control.Type, + CustomName = sender.CustomName, + CreatedAt = now + }); + } + + // Save all db changes before continuing + await _db.SaveChangesAsync(); + + // Then send all network events + await Task.WhenAll([ + ..messagesByDevice.Select(kvp => _publisher.SendDeviceControl(kvp.Key, kvp.Value)), + ..logsByOwner.Select(x => hubClients.User(x.Key.ToString()).Log(sender, x.Value)) + ]); + + return new Success(); + } +} \ No newline at end of file diff --git a/Common/Services/IControlSender.cs b/Common/Services/IControlSender.cs new file mode 100644 index 00000000..5ebd5af8 --- /dev/null +++ b/Common/Services/IControlSender.cs @@ -0,0 +1,21 @@ +using Microsoft.AspNetCore.SignalR; +using OneOf; +using OneOf.Types; +using OpenShock.Common.Hubs; +using OpenShock.Common.Models; +using OpenShock.Common.Models.WebSocket.User; + +namespace OpenShock.Common.DeviceControl; + +public interface IControlSender +{ + public Task> ControlByUser(IReadOnlyList controls, ControlLogSender sender, IHubClients hubClients); + + public Task> ControlPublicShare(IReadOnlyList controls, ControlLogSender sender, IHubClients hubClients, Guid publicShareId); +} + +public readonly record struct ShockerNotFoundOrNoAccess(Guid Value); + +public readonly record struct ShockerPaused(Guid Value); + +public readonly record struct ShockerNoPermission(Guid Value); \ No newline at end of file diff --git a/Common/Services/RedisPubSub/IRedisPubService.cs b/Common/Services/RedisPubSub/IRedisPubService.cs index 15b098f1..a8606d99 100644 --- a/Common/Services/RedisPubSub/IRedisPubService.cs +++ b/Common/Services/RedisPubSub/IRedisPubService.cs @@ -5,22 +5,29 @@ namespace OpenShock.Common.Services.RedisPubSub; public interface IRedisPubService { + /// + /// Something about the device or its shockers updated + /// + /// + /// + Task SendDeviceUpdate(Guid deviceId); + /// /// Used when a device comes online or changes its connection details like, gateway, firmware version, etc. /// /// + /// /// - public Task SendDeviceOnlineStatus(Guid deviceId); + public Task SendDeviceOnlineStatus(Guid deviceId, bool isOnline); /// /// General shocker control /// - /// - /// + /// + /// /// - Task SendDeviceControl(Guid sender, - IDictionary> controlMessages); - + Task SendDeviceControl(Guid deviceId, List controls); + /// /// Toggle captive portal /// @@ -28,13 +35,6 @@ Task SendDeviceControl(Guid sender, /// /// Task SendDeviceCaptivePortal(Guid deviceId, bool enabled); - - /// - /// Something about the device or its shockers updated - /// - /// - /// - Task SendDeviceUpdate(Guid deviceId); /// /// Trigger the emergency stop on the device if it's supported diff --git a/Common/Services/RedisPubSub/RedisChannels.cs b/Common/Services/RedisPubSub/RedisChannels.cs index f30470ba..63f540e2 100644 --- a/Common/Services/RedisPubSub/RedisChannels.cs +++ b/Common/Services/RedisPubSub/RedisChannels.cs @@ -6,14 +6,7 @@ public static class RedisChannels { public static readonly RedisChannel KeyEventExpired = new("__keyevent@0__:expired", RedisChannel.PatternMode.Literal); - public static readonly RedisChannel DeviceControl = new("msg-device-control", RedisChannel.PatternMode.Literal); - public static readonly RedisChannel DeviceCaptive = new("msg-device-control-captive", RedisChannel.PatternMode.Literal); - public static readonly RedisChannel DeviceUpdate = new("msg-device-update", RedisChannel.PatternMode.Literal); - public static readonly RedisChannel DeviceOnlineStatus = new("msg-device-online-status", RedisChannel.PatternMode.Literal); - - // OTA - public static readonly RedisChannel DeviceOtaInstall = new("msg-device-ota-install", RedisChannel.PatternMode.Literal); - - public static readonly RedisChannel DeviceEmergencyStop = new("msg-device-emergency-stop", RedisChannel.PatternMode.Literal); - public static readonly RedisChannel DeviceReboot = new("msg-device-reboot", RedisChannel.PatternMode.Literal); + public static RedisChannel DeviceMessage(Guid deviceId) => new($"device-msg:{deviceId}", RedisChannel.PatternMode.Literal); + + public static readonly RedisChannel DeviceStatus = new("device-status", RedisChannel.PatternMode.Literal); } \ No newline at end of file diff --git a/Common/Services/RedisPubSub/RedisPubService.cs b/Common/Services/RedisPubSub/RedisPubService.cs index a487d3f0..85cd9973 100644 --- a/Common/Services/RedisPubSub/RedisPubService.cs +++ b/Common/Services/RedisPubSub/RedisPubService.cs @@ -1,4 +1,4 @@ -using System.Text.Json; +using MessagePack; using OpenShock.Common.Redis.PubSub; using Semver; using StackExchange.Redis; @@ -8,92 +8,52 @@ namespace OpenShock.Common.Services.RedisPubSub; public sealed class RedisPubService : IRedisPubService { private readonly ISubscriber _subscriber; - - /// - /// DI Constructor - /// - /// + public RedisPubService(IConnectionMultiplexer connectionMultiplexer) { _subscriber = connectionMultiplexer.GetSubscriber(); } - - public Task SendDeviceOnlineStatus(Guid deviceId) - { - var redisMessage = new DeviceUpdatedMessage - { - Id = deviceId - }; - return _subscriber.PublishAsync(RedisChannels.DeviceOnlineStatus, JsonSerializer.Serialize(redisMessage)); - } - - /// - public Task SendDeviceControl(Guid sender, IDictionary> controlMessages) - { - var redisMessage = new ControlMessage - { - Sender = sender, - ControlMessages = controlMessages - }; + private Task Publish(RedisChannel channel, T msg) => _subscriber.PublishAsync(channel, (RedisValue)new ReadOnlyMemory(MessagePackSerializer.Serialize(msg))); + private Task PublishMessage(Guid deviceId, DeviceMessage msg) => Publish(RedisChannels.DeviceMessage(deviceId), msg); - return _subscriber.PublishAsync(RedisChannels.DeviceControl, JsonSerializer.Serialize(redisMessage)); + public Task SendDeviceUpdate(Guid deviceId) + { + return PublishMessage(deviceId, DeviceMessage.Create(DeviceTriggerType.DeviceInfoUpdated)); } - /// - public Task SendDeviceCaptivePortal(Guid deviceId, bool enabled) + public Task SendDeviceOnlineStatus(Guid deviceId, bool isOnline) { - var redisMessage = new CaptiveMessage - { - DeviceId = deviceId, - Enabled = enabled - }; + return Publish(RedisChannels.DeviceStatus, DeviceStatus.Create(deviceId, DeviceBoolStateType.Online, isOnline)); + } - return _subscriber.PublishAsync(RedisChannels.DeviceCaptive, JsonSerializer.Serialize(redisMessage)); + public Task SendDeviceEstoppedStatus(Guid deviceId, bool isEstopped) + { + return Publish(RedisChannels.DeviceStatus, DeviceStatus.Create(deviceId, DeviceBoolStateType.EStopped, isEstopped)); } - /// - public Task SendDeviceUpdate(Guid deviceId) + public Task SendDeviceControl(Guid deviceId, List controls) { - var redisMessage = new DeviceUpdatedMessage - { - Id = deviceId - }; + return PublishMessage(deviceId, DeviceMessage.Create(new DeviceControlPayload { Controls = controls })); + } - return _subscriber.PublishAsync(RedisChannels.DeviceUpdate, JsonSerializer.Serialize(redisMessage)); + public Task SendDeviceCaptivePortal(Guid deviceId, bool enabled) + { + return PublishMessage(deviceId, DeviceMessage.Create(DeviceToggleTarget.CaptivePortal, enabled)); } - /// public Task SendDeviceEmergencyStop(Guid deviceId) { - var redisMessage = new DeviceEmergencyStopMessage - { - Id = deviceId - }; - - return _subscriber.PublishAsync(RedisChannels.DeviceEmergencyStop, JsonSerializer.Serialize(redisMessage)); + return PublishMessage(deviceId, DeviceMessage.Create(DeviceTriggerType.DeviceEmergencyStop)); } - /// public Task SendDeviceOtaInstall(Guid deviceId, SemVersion version) { - var redisMessage = new DeviceOtaInstallMessage - { - Id = deviceId, - Version = version - }; - - return _subscriber.PublishAsync(RedisChannels.DeviceOtaInstall, JsonSerializer.Serialize(redisMessage)); + return PublishMessage(deviceId, DeviceMessage.Create(new DeviceOtaInstallPayload { Version = version })); } - /// public Task SendDeviceReboot(Guid deviceId) { - var redisMessage = new DeviceRebootMessage - { - Id = deviceId - }; - - return _subscriber.PublishAsync(RedisChannels.DeviceReboot, JsonSerializer.Serialize(redisMessage)); + return PublishMessage(deviceId, DeviceMessage.Create(DeviceTriggerType.DeviceReboot)); } } \ No newline at end of file diff --git a/Common/Utils/PermissionUtils.cs b/Common/Utils/PermissionUtils.cs new file mode 100644 index 00000000..99307465 --- /dev/null +++ b/Common/Utils/PermissionUtils.cs @@ -0,0 +1,20 @@ +using OpenShock.Common.Models; + +namespace OpenShock.Common.Utils; + +public static class PermissionUtils +{ + public static bool IsAllowed(ControlType type, bool isLive, SharePermsAndLimits? perms) + { + if (perms is null) return true; + if (isLive && !perms.Live) return false; + return type switch + { + ControlType.Shock => perms.Shock, + ControlType.Vibrate => perms.Vibrate, + ControlType.Sound => perms.Sound, + ControlType.Stop => perms.Shock || perms.Vibrate || perms.Sound, + _ => false + }; + } +} \ No newline at end of file diff --git a/LiveControlGateway/Controllers/HubControllerBase.cs b/LiveControlGateway/Controllers/HubControllerBase.cs index a652778b..ac1ff3e5 100644 --- a/LiveControlGateway/Controllers/HubControllerBase.cs +++ b/LiveControlGateway/Controllers/HubControllerBase.cs @@ -1,5 +1,4 @@ -using System.Net.WebSockets; -using FlatSharp; +using FlatSharp; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.Filters; using Microsoft.Extensions.Options; @@ -16,6 +15,7 @@ using OpenShock.LiveControlGateway.Websocket; using OpenShock.Serialization.Gateway; using Semver; +using System.Net.WebSockets; using Timer = System.Timers.Timer; namespace OpenShock.LiveControlGateway.Controllers; @@ -167,7 +167,7 @@ protected override async Task UnregisterConnection() } /// - public abstract ValueTask Control(List controlCommands); + public abstract ValueTask Control(IList controlCommands); /// public abstract ValueTask CaptivePortal(bool enable); diff --git a/LiveControlGateway/Controllers/HubV1Controller.cs b/LiveControlGateway/Controllers/HubV1Controller.cs index c7c7fe87..970be688 100644 --- a/LiveControlGateway/Controllers/HubV1Controller.cs +++ b/LiveControlGateway/Controllers/HubV1Controller.cs @@ -165,19 +165,19 @@ await otaService.Error(CurrentHub.Id, payload.BootStatus.OtaUpdateId, false, } /// - public override ValueTask Control(List controlCommands) + public override ValueTask Control(IList controlCommands) => QueueMessage(new GatewayToHubMessage { Payload = new GatewayToHubMessagePayload(new ShockerCommandList { - Commands = controlCommands.Select(x => new ShockerCommand() + Commands = [.. controlCommands.Select(x => new ShockerCommand() { Duration = x.Duration, Type = x.Type, Id = x.Model == Serialization.Types.ShockerModelType.Petrainer998DR ? (ushort)(x.Id >> 1) : x.Id, // Fix for old hubs, their ids was serialized wrongly in the RFTransmitter, the V1 endpoint is being phased out, so this wont stay here forever Intensity = x.Intensity, Model = x.Model - }).ToList() + })] }) }); diff --git a/LiveControlGateway/Controllers/HubV2Controller.cs b/LiveControlGateway/Controllers/HubV2Controller.cs index bfe60618..8440c747 100644 --- a/LiveControlGateway/Controllers/HubV2Controller.cs +++ b/LiveControlGateway/Controllers/HubV2Controller.cs @@ -1,5 +1,4 @@ -using System.Diagnostics; -using Asp.Versioning; +using Asp.Versioning; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.SignalR; @@ -16,6 +15,7 @@ using OpenShock.Serialization.Types; using Semver; using Serilog; +using System.Diagnostics; namespace OpenShock.LiveControlGateway.Controllers; //TODO: Implement new keep alive ping pong mechanism @@ -198,7 +198,7 @@ await otaService.Error(CurrentHub.Id, payload.BootStatus.OtaUpdateId, false, } /// - public override ValueTask Control(List controlCommands) + public override ValueTask Control(IList controlCommands) => QueueMessage(new GatewayToHubMessage { Payload = new GatewayToHubMessagePayload(new ShockerCommandList diff --git a/LiveControlGateway/Controllers/IHubController.cs b/LiveControlGateway/Controllers/IHubController.cs index 343c5c36..5462c4f1 100644 --- a/LiveControlGateway/Controllers/IHubController.cs +++ b/LiveControlGateway/Controllers/IHubController.cs @@ -18,7 +18,7 @@ public interface IHubController : IAsyncDisposable /// /// /// - public ValueTask Control(List controlCommands); + public ValueTask Control(IList controlCommands); /// /// Turn the captive portal on or off diff --git a/LiveControlGateway/Controllers/LiveControlController.cs b/LiveControlGateway/Controllers/LiveControlController.cs index b22b0f18..6687e183 100644 --- a/LiveControlGateway/Controllers/LiveControlController.cs +++ b/LiveControlGateway/Controllers/LiveControlController.cs @@ -42,7 +42,7 @@ public sealed class LiveControlController : WebsocketBaseController, NotFound, LiveNotEnabled, NoPermission, ShockerPaused> + private OneOf, NotFound, LiveNotEnabled, NoPermission, ShockerPaused> CheckFramePermissions( Guid shocker, ControlType controlType) { if (!_sharedShockers.TryGetValue(shocker, out var shockerShare)) return new NotFound(); if (shockerShare.Paused) return new ShockerPaused(); - if (!IsAllowed(controlType, shockerShare.PermsAndLimits)) return new NoPermission(); + if (!PermissionUtils.IsAllowed(controlType, true, shockerShare.PermsAndLimits)) return new NoPermission(); - return new Success(shockerShare.PermsAndLimits); - } - - private static bool IsAllowed(ControlType type, SharePermsAndLimitsLive? perms) // TODO: Duplicate logic (Common.csproj -> ControlLogic.cs -> IsAllowed) - { - if (perms is null) return true; - if (!perms.Live) return false; - return type switch - { - ControlType.Shock => perms.Shock, - ControlType.Vibrate => perms.Vibrate, - ControlType.Sound => perms.Sound, - ControlType.Stop => perms.Shock || perms.Vibrate || perms.Sound, - _ => false - }; + return new Success(shockerShare.PermsAndLimits); } diff --git a/LiveControlGateway/LifetimeManager/HubLifetime.cs b/LiveControlGateway/LifetimeManager/HubLifetime.cs index 377a333f..06223d90 100644 --- a/LiveControlGateway/LifetimeManager/HubLifetime.cs +++ b/LiveControlGateway/LifetimeManager/HubLifetime.cs @@ -1,6 +1,4 @@ -using System.Collections.Immutable; -using System.ComponentModel.DataAnnotations; -using System.Diagnostics; +using MessagePack; using Microsoft.EntityFrameworkCore; using OneOf; using OneOf.Types; @@ -13,11 +11,14 @@ using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using OpenShock.LiveControlGateway.Controllers; +using OpenShock.LiveControlGateway.Mappers; using OpenShock.Serialization.Gateway; -using OpenShock.Serialization.Types; using Redis.OM.Contracts; using Semver; -using ShockerModelType = OpenShock.Serialization.Types.ShockerModelType; +using StackExchange.Redis; +using System.Collections.Immutable; +using System.ComponentModel.DataAnnotations; +using System.Diagnostics; namespace OpenShock.LiveControlGateway.LifetimeManager; @@ -41,29 +42,39 @@ public sealed class HubLifetime : IAsyncDisposable private readonly TimeSpan _waitBetweenTicks; private readonly ushort _commandDuration; - private Dictionary _shockerStates = new(); + private Dictionary _shockerStates = []; private readonly CancellationTokenSource _cancellationSource; private readonly IDbContextFactory _dbContextFactory; private readonly IRedisConnectionProvider _redisConnectionProvider; private readonly IRedisPubService _redisPubService; + private readonly RedisChannel _deviceMsgChannel; + private readonly ISubscriber _subscriber; + private readonly ILogger _logger; private ImmutableArray _liveControlClients = ImmutableArray.Empty; private readonly SemaphoreSlim _liveControlClientsLock = new(1); + private ChannelMessageQueue? _deviceMsgQueue; + private Task? _deviceMsgConsumerTask; + + private Task? _updateLoopTask; + /// /// DI Constructor /// /// /// /// + /// /// /// /// public HubLifetime([Range(1, 10)] byte tps, IHubController hubController, IDbContextFactory dbContextFactory, + IConnectionMultiplexer connectionMultiplexer, IRedisConnectionProvider redisConnectionProvider, IRedisPubService redisPubService, ILogger logger) @@ -75,8 +86,11 @@ public HubLifetime([Range(1, 10)] byte tps, IHubController hubController, _redisPubService = redisPubService; _logger = logger; - _waitBetweenTicks = TimeSpan.FromMilliseconds(Math.Floor((float)1000 / tps)); + _waitBetweenTicks = TimeSpan.FromMilliseconds(1000.0 / tps); _commandDuration = (ushort)(_waitBetweenTicks.TotalMilliseconds * 2.5); + + _subscriber = connectionMultiplexer.GetSubscriber(); + _deviceMsgChannel = RedisChannels.DeviceMessage(hubController.Id); } /// @@ -93,7 +107,6 @@ public HubLifetime([Range(1, 10)] byte tps, IHubController hubController, _logger.LogWarning("Client already registered, not sure how this happened, probably a bug"); return null; } - _liveControlClients = _liveControlClients.Add(controller); } @@ -153,15 +166,97 @@ public async Task InitAsync(CancellationToken cancellationToken) return false; } -#pragma warning disable CS4014 - OsTask.Run(UpdateLoop); -#pragma warning restore CS4014 + _updateLoopTask = OsTask.Run(UpdateLoop); + + _deviceMsgQueue = await _subscriber.SubscribeAsync(_deviceMsgChannel); + _deviceMsgConsumerTask = QueueHelper.ConsumeQueue(_deviceMsgQueue, ConsumeDeviceQueue, _logger, _cancellationSource.Token); _state = HubLifetimeState.Idle; // We are fully setup, we can go back to idle state return true; } + private async Task ConsumeDeviceQueue(RedisValue value, CancellationToken cancellationToken) + { + DeviceMessage message; + try + { + message = MessagePackSerializer.Deserialize((ReadOnlyMemory)value, cancellationToken: cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to deserialize DeviceMessage"); + return; + } + + await DeviceMessage(message); + } + + private async Task DeviceMessage(DeviceMessage message) + { + switch (message.Payload) + { + case DeviceTriggerPayload trigger: + await DeviceMessageTrigger(trigger); + break; + case DeviceTogglePayload toggle: + await DeviceMessageToggle(toggle); + break; + case DeviceControlPayload control: + await DeviceMessageControl(control); + break; + case DeviceOtaInstallPayload { Version: var version }: + await OtaInstall(version); + break; + default: + _logger.LogError("Got DeviceMessage with unknown payload type: {PayloadType}", message.Payload?.GetType().Name); + break; + } + } + + private async Task DeviceMessageTrigger(DeviceTriggerPayload trigger) + { + switch (trigger.Type) + { + case DeviceTriggerType.DeviceInfoUpdated: + await UpdateDevice(); + break; + case DeviceTriggerType.DeviceEmergencyStop: + await EmergencyStop(); // Ignored bool return + break; + case DeviceTriggerType.DeviceReboot: + await Reboot(); // Ignored bool return + break; + default: + _logger.LogError("Unknown DeviceTriggerType: {TriggerType}", trigger.Type); + break; + } + } + + private async ValueTask DeviceMessageToggle(DeviceTogglePayload toggle) + { + switch (toggle.Target) + { + case DeviceToggleTarget.CaptivePortal: + await ControlCaptive(toggle.State); + break; + default: + _logger.LogError("Unknown DeviceToggleTarget: {Target}", toggle.Target); + break; + } + } + + private async ValueTask DeviceMessageControl(DeviceControlPayload control) + { + if (control.Controls.Count == 0) + { + _logger.LogDebug("DeviceControlPayload had no commands, skipping."); + return; + } + + await Control(control.Controls); + } + /// /// Swap to a new underlying controller /// @@ -238,19 +333,20 @@ private async Task UpdateLoop() private async Task Update() { + var now = DateTimeOffset.UtcNow; var commandList = new List(_shockerStates.Count); - foreach (var (_, state) in _shockerStates) - { - var cur = DateTimeOffset.UtcNow; - if (state.ActiveUntil < cur || state.ExclusiveUntil >= cur) continue; + foreach (var state in _shockerStates.Values) + { + if (state.ActiveUntil < now || state.ExclusiveUntil >= now) continue; + commandList.Add(new ShockerCommand { + Model = FbsMapper.ToFbsModelType(state.Model), Id = state.RfId, - Model = (ShockerModelType)state.Model, - Type = (ShockerCommandType)state.LastType, + Type = FbsMapper.ToFbsCommandType(state.LastType), + Intensity = state.LastIntensity, Duration = _commandDuration, - Intensity = state.LastIntensity }); } @@ -311,33 +407,26 @@ public OneOf ReceiveFrame(Guid shocker, Con } private static DateTimeOffset CalculateActiveUntil(byte tps) => - DateTimeOffset.UtcNow.AddMilliseconds(Math.Max(1000 / (float)tps * 2.5, 250)); + DateTimeOffset.UtcNow.AddMilliseconds(Math.Max(1000.0 / tps * 2.5, 250.0)); /// /// Control from redis, aka a regular command /// - /// + /// /// - public ValueTask Control(IReadOnlyList shocks) + public ValueTask Control(IReadOnlyList commands) { - var shocksTransformed = new List(shocks.Count); - foreach (var shock in shocks) + var shocksTransformed = new List(commands.Count); + + foreach (var command in commands) { - if (!_shockerStates.TryGetValue(shock.Id, out var state)) continue; + if (!_shockerStates.TryGetValue(command.ShockerId, out var state)) continue; - _logger.LogTrace( - "Control exclusive: {Exclusive}, type: {Type}, duration: {Duration}, intensity: {Intensity}", - shock.Exclusive, shock.Type, shock.Duration, shock.Intensity); - state.ExclusiveUntil = shock.Exclusive && shock.Type != ControlType.Stop - ? DateTimeOffset.UtcNow.AddMilliseconds(shock.Duration) + state.ExclusiveUntil = command.Exclusive && command.Type != ControlType.Stop + ? DateTimeOffset.UtcNow.AddMilliseconds(command.Duration) : DateTimeOffset.MinValue; - shocksTransformed.Add(new ShockerCommand - { - Id = shock.RfId, Duration = shock.Duration, Intensity = shock.Intensity, - Type = (ShockerCommandType)shock.Type, - Model = (ShockerModelType)shock.Model - }); + shocksTransformed.Add(FbsMapper.ToFbsShockerCommand(command)); } return HubController.Control(shocksTransformed); @@ -395,7 +484,7 @@ await deviceOnline.InsertAsync(new DeviceOnline }, Duration.DeviceKeepAliveTimeout); - await _redisPubService.SendDeviceOnlineStatus(device); + await _redisPubService.SendDeviceOnlineStatus(device, true); return new Success(); } @@ -424,7 +513,7 @@ await deviceOnline.InsertAsync(new DeviceOnline if (sendOnlineStatusUpdate) { - await _redisPubService.SendDeviceOnlineStatus(device); + await _redisPubService.SendDeviceOnlineStatus(device, true); return new OnlineStateUpdated(); } @@ -439,7 +528,19 @@ public async ValueTask DisposeAsync() if (_disposed) return; _disposed = true; + await _subscriber.UnsubscribeAsync(_deviceMsgChannel); await _cancellationSource.CancelAsync(); + + // ensure the consumer loop ends + if (_deviceMsgConsumerTask is not null) + { + try { await _deviceMsgConsumerTask; } catch { /* ignore */ } + } + if (_updateLoopTask is not null) + { + try { await _updateLoopTask; } catch { /* ignore */ } + } + await DisposeLiveControlClients(); } } diff --git a/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs b/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs index e039c513..9ca52048 100644 --- a/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs +++ b/LiveControlGateway/LifetimeManager/HubLifetimeManager.cs @@ -5,8 +5,11 @@ using OpenShock.Common.Redis.PubSub; using OpenShock.Common.Services.RedisPubSub; using OpenShock.LiveControlGateway.Controllers; +using OpenShock.LiveControlGateway.Mappers; +using OpenShock.Serialization.Gateway; using Redis.OM.Contracts; using Semver; +using StackExchange.Redis; namespace OpenShock.LiveControlGateway.LifetimeManager; @@ -16,6 +19,7 @@ namespace OpenShock.LiveControlGateway.LifetimeManager; public sealed class HubLifetimeManager { private readonly IDbContextFactory _dbContextFactory; + private readonly IConnectionMultiplexer _connectionMultiplexer; private readonly IRedisConnectionProvider _redisConnectionProvider; private readonly IRedisPubService _redisPubService; private readonly ILoggerFactory _loggerFactory; @@ -28,17 +32,20 @@ public sealed class HubLifetimeManager /// DI constructor /// /// + /// /// /// /// public HubLifetimeManager( IDbContextFactory dbContextFactory, + IConnectionMultiplexer connectionMultiplexer, IRedisConnectionProvider redisConnectionProvider, IRedisPubService redisPubService, ILoggerFactory loggerFactory ) { _dbContextFactory = dbContextFactory; + _connectionMultiplexer = connectionMultiplexer; _redisConnectionProvider = redisConnectionProvider; _redisPubService = redisPubService; _loggerFactory = loggerFactory; @@ -114,6 +121,7 @@ private HubLifetime CreateNewLifetime(byte tps, IHubController hubController) tps, hubController, _dbContextFactory, + _connectionMultiplexer, _redisConnectionProvider, _redisPubService, _loggerFactory.CreateLogger()); @@ -211,8 +219,7 @@ public async Task RemoveDeviceConnection(IHubController hubController) /// /// /// - public async Task> Control(Guid device, - IReadOnlyList shocks) + public async Task> Control(Guid device, IReadOnlyList shocks) { if (!_lifetimes.TryGetValue(device, out var deviceLifetime)) return new DeviceNotFound(); await deviceLifetime.Control(shocks); diff --git a/LiveControlGateway/Mappers/FbsMapper.cs b/LiveControlGateway/Mappers/FbsMapper.cs new file mode 100644 index 00000000..3497a772 --- /dev/null +++ b/LiveControlGateway/Mappers/FbsMapper.cs @@ -0,0 +1,44 @@ +using OpenShock.Common.Models; +using OpenShock.Common.Redis.PubSub; +using OpenShock.Serialization.Gateway; +using OpenShock.Serialization.Types; + +namespace OpenShock.LiveControlGateway.Mappers; + +public static class FbsMapper +{ + public static Serialization.Types.ShockerModelType ToFbsModelType(Common.Models.ShockerModelType type) + { + return type switch + { + Common.Models.ShockerModelType.CaiXianlin => Serialization.Types.ShockerModelType.CaiXianlin, + Common.Models.ShockerModelType.PetTrainer => Serialization.Types.ShockerModelType.Petrainer, + Common.Models.ShockerModelType.Petrainer998DR => Serialization.Types.ShockerModelType.Petrainer998DR, + _ => throw new NotImplementedException(), + }; + } + + public static ShockerCommandType ToFbsCommandType(ControlType type) + { + return type switch + { + ControlType.Stop => ShockerCommandType.Stop, + ControlType.Shock => ShockerCommandType.Shock, + ControlType.Vibrate => ShockerCommandType.Vibrate, + ControlType.Sound => ShockerCommandType.Sound, + _ => throw new NotImplementedException(), + }; + } + + public static ShockerCommand ToFbsShockerCommand(ShockerControlCommand control) + { + return new ShockerCommand + { + Model = ToFbsModelType(control.Model), + Id = control.RfId, + Type = ToFbsCommandType(control.Type), + Intensity = control.Intensity, + Duration = control.Duration + }; + } +} diff --git a/LiveControlGateway/Models/LiveShockerPermission.cs b/LiveControlGateway/Models/LiveShockerPermission.cs index 3bceed82..37a2f194 100644 --- a/LiveControlGateway/Models/LiveShockerPermission.cs +++ b/LiveControlGateway/Models/LiveShockerPermission.cs @@ -15,5 +15,5 @@ public sealed class LiveShockerPermission /// /// Perms and limits for the live shocker /// - public required SharePermsAndLimitsLive PermsAndLimits { get; set; } + public required SharePermsAndLimits PermsAndLimits { get; set; } } \ No newline at end of file diff --git a/LiveControlGateway/Program.cs b/LiveControlGateway/Program.cs index 7d40b082..bfff26b7 100644 --- a/LiveControlGateway/Program.cs +++ b/LiveControlGateway/Program.cs @@ -1,14 +1,15 @@ using Microsoft.Extensions.Options; using OpenShock.Common; +using OpenShock.Common.DeviceControl; using OpenShock.Common.Extensions; using OpenShock.Common.JsonSerialization; +using OpenShock.Common.Services; using OpenShock.Common.Services.Device; using OpenShock.Common.Services.Ota; using OpenShock.Common.Swagger; using OpenShock.LiveControlGateway; using OpenShock.LiveControlGateway.LifetimeManager; using OpenShock.LiveControlGateway.Options; -using OpenShock.LiveControlGateway.PubSub; var builder = OpenShockApplication.CreateDefaultBuilder(args); @@ -33,13 +34,13 @@ }); builder.Services.AddScoped(); +builder.Services.AddScoped(); builder.Services.AddScoped(); builder.AddSwaggerExt(); //services.AddHealthChecks().AddCheck("database"); -builder.Services.AddHostedService(); builder.Services.AddHostedService(); builder.Services.AddSingleton(); diff --git a/LiveControlGateway/PubSub/RedisSubscriberService.cs b/LiveControlGateway/PubSub/RedisSubscriberService.cs deleted file mode 100644 index 1fb25ced..00000000 --- a/LiveControlGateway/PubSub/RedisSubscriberService.cs +++ /dev/null @@ -1,145 +0,0 @@ -using System.Text.Json; -using OpenShock.Common.Redis.PubSub; -using OpenShock.Common.Services.RedisPubSub; -using OpenShock.Common.Utils; -using OpenShock.LiveControlGateway.LifetimeManager; -using StackExchange.Redis; - -namespace OpenShock.LiveControlGateway.PubSub; - -/// -/// Redis subscription service, which handles listening to pub sub on redis -/// -public sealed class RedisSubscriberService : IHostedService, IAsyncDisposable -{ - private readonly HubLifetimeManager _hubLifetimeManager; - private readonly ISubscriber _subscriber; - - /// - /// DI Constructor - /// - /// - /// - public RedisSubscriberService( - IConnectionMultiplexer connectionMultiplexer, HubLifetimeManager hubLifetimeManager) - { - _hubLifetimeManager = hubLifetimeManager; - _subscriber = connectionMultiplexer.GetSubscriber(); - } - - /// - public async Task StartAsync(CancellationToken cancellationToken) - { - await _subscriber.SubscribeAsync(RedisChannels.DeviceControl, - (_, message) => OsTask.Run(() => DeviceControl(message))); - await _subscriber.SubscribeAsync(RedisChannels.DeviceCaptive, - (_, message) => OsTask.Run(() => DeviceControlCaptive(message))); - await _subscriber.SubscribeAsync(RedisChannels.DeviceUpdate, - (_, message) => OsTask.Run(() => DeviceUpdate(message))); - - // OTA - await _subscriber.SubscribeAsync(RedisChannels.DeviceOtaInstall, - (_, message) => OsTask.Run(() => DeviceOtaInstall(message))); - - await _subscriber.SubscribeAsync(RedisChannels.DeviceEmergencyStop, - (_, message) => OsTask.Run(() => DeviceEmergencyStop(message))); - await _subscriber.SubscribeAsync(RedisChannels.DeviceReboot, - (_, message) => OsTask.Run(() => DeviceReboot(message))); - } - - private async Task DeviceControl(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await Task.WhenAll(data.ControlMessages.Select(x => _hubLifetimeManager.Control(x.Key, x.Value))); - } - - private async Task DeviceControlCaptive(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await _hubLifetimeManager.ControlCaptive(data.DeviceId, data.Enabled); - } - - /// - /// Update the device connection if found - /// - /// - /// - private async Task DeviceUpdate(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await _hubLifetimeManager.UpdateDevice(data.Id); - } - - /// - /// Trigger the device's emergency stop the device if found and it supports it - /// - /// - /// - private async Task DeviceEmergencyStop(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await _hubLifetimeManager.EmergencyStop(data.Id); - } - - /// - /// Update the device connection if found - /// - /// - /// - private async Task DeviceOtaInstall(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await _hubLifetimeManager.OtaInstall(data.Id, data.Version); - } - - /// - /// Reboot the device if found and it supports it - /// - /// - /// - private async Task DeviceReboot(RedisValue value) - { - if (!value.HasValue) return; - var data = JsonSerializer.Deserialize(value.ToString()); - if (data is null) return; - - await _hubLifetimeManager.Reboot(data.Id); - } - - - /// - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - /// - public async ValueTask DisposeAsync() - { - await _subscriber.UnsubscribeAllAsync(); - GC.SuppressFinalize(this); - } - - /// - /// Destructor, just in case - /// - ~RedisSubscriberService() - { - DisposeAsync().AsTask().Wait(); - } -} \ No newline at end of file