Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions API/Controller/Shockers/SendControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@
using OpenShock.Common.Hubs;
using OpenShock.Common.Models;
using OpenShock.Common.Problems;
using OpenShock.Common.Services.RedisPubSub;

namespace OpenShock.API.Controller.Shockers;

public sealed partial class ShockerController
{
private static readonly IDictionary<string, object> EmptyDic = new Dictionary<string, object>();

/// <summary>
/// Send a control message to shockers
/// </summary>
Expand All @@ -31,19 +28,19 @@ public sealed partial class ShockerController
public async Task<IActionResult> SendControl(
[FromBody] ControlRequest body,
[FromServices] IHubContext<UserHub, IUserHub> userHub,
[FromServices] IRedisPubService redisPubService)
[FromServices] IControlSender controlSender)
{
var sender = new ControlLogSender
{
Id = CurrentUser.Id,
Name = CurrentUser.Name,
Image = CurrentUser.GetImageUrl(),
ConnectionId = HttpContext.Connection.Id,
AdditionalItems = EmptyDic,
AdditionalItems = [],
CustomName = body.CustomName
};

var controlAction = await ControlLogic.ControlByUser(body.Shocks, _db, sender, userHub.Clients, redisPubService);
var controlAction = await controlSender.ControlByUser(body.Shocks, sender, userHub.Clients);
return controlAction.Match(
success => LegacyEmptyOk("Successfully sent control messages"),
notFound => Problem(ShockerControlError.ShockerControlNotFound(notFound.Value)),
Expand All @@ -65,12 +62,12 @@ public async Task<IActionResult> SendControl(
public Task<IActionResult> SendControl_DEPRECATED(
[FromBody] IReadOnlyList<Common.Models.WebSocket.User.Control> body,
[FromServices] IHubContext<UserHub, IUserHub> userHub,
[FromServices] IRedisPubService redisPubService)
[FromServices] IControlSender controlSender)
{
return SendControl(new ControlRequest
{
Shocks = body,
CustomName = null
}, userHub, redisPubService);
}, userHub, controlSender);
}
}
3 changes: 3 additions & 0 deletions API/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
using OpenShock.API.Services.Email;
using OpenShock.API.Services.UserService;
using OpenShock.Common;
using OpenShock.Common.DeviceControl;
using OpenShock.Common.Extensions;
using OpenShock.Common.Hubs;
using OpenShock.Common.JsonSerialization;
using OpenShock.Common.Options;
using OpenShock.Common.Services;
using OpenShock.Common.Services.Device;
using OpenShock.Common.Services.LCGNodeProvisioner;
using OpenShock.Common.Services.Ota;
Expand Down Expand Up @@ -44,6 +46,7 @@
});

builder.Services.AddScoped<IDeviceService, DeviceService>();
builder.Services.AddScoped<IControlSender, ControlSender>();
builder.Services.AddScoped<IOtaService, OtaService>();
builder.Services.AddScoped<IDeviceUpdateService, DeviceUpdateService>();
builder.Services.AddScoped<IAccountService, AccountService>();
Expand Down
89 changes: 69 additions & 20 deletions API/Realtime/RedisSubscriberService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Text.Json;
using MessagePack;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using OpenShock.Common.Hubs;
Expand All @@ -9,7 +9,6 @@
using OpenShock.Common.Services.RedisPubSub;
using OpenShock.Common.Utils;
using Redis.OM.Contracts;
using Redis.OM.Searching;
using StackExchange.Redis;

namespace OpenShock.API.Realtime;
Expand All @@ -23,6 +22,7 @@ public sealed class RedisSubscriberService : IHostedService, IAsyncDisposable
private readonly IDbContextFactory<OpenShockContext> _dbContextFactory;
private readonly IRedisConnectionProvider _redisConnectionProvider;
private readonly ISubscriber _subscriber;
private readonly ILogger<RedisSubscriberService> _logger;

/// <summary>
/// DI Constructor
Expand All @@ -31,46 +31,87 @@ public sealed class RedisSubscriberService : IHostedService, IAsyncDisposable
/// <param name="hubContext"></param>
/// <param name="dbContextFactory"></param>
/// <param name="redisConnectionProvider"></param>
/// <param name="logger"></param>
public RedisSubscriberService(
IConnectionMultiplexer connectionMultiplexer,
IHubContext<UserHub, IUserHub> hubContext,
IDbContextFactory<OpenShockContext> dbContextFactory,
IRedisConnectionProvider redisConnectionProvider)
IRedisConnectionProvider redisConnectionProvider,
ILogger<RedisSubscriberService> logger
)
{
_hubContext = hubContext;
_dbContextFactory = dbContextFactory;
_redisConnectionProvider = redisConnectionProvider;
_subscriber = connectionMultiplexer.GetSubscriber();
_logger = logger;
}

/// <inheritdoc />
public async Task StartAsync(CancellationToken cancellationToken)
{
await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, (_, message) => { OsTask.Run(() => HandleKeyExpired(message)); });
await _subscriber.SubscribeAsync(RedisChannels.DeviceOnlineStatus, (_, message) => { OsTask.Run(() => HandleDeviceOnlineStatus(message)); });
await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, HandleKeyExpired);
await _subscriber.SubscribeAsync(RedisChannels.DeviceStatus, HandleDeviceStatus);
}

private async Task HandleDeviceOnlineStatus(RedisValue message)
private void HandleKeyExpired(RedisChannel _, RedisValue message)
{
if (!message.HasValue) return;
var data = JsonSerializer.Deserialize<DeviceUpdatedMessage>(message.ToString());
if (data is null) return;
if (message.ToString().Split(':', 2) is not [{ } guid, { } name]) return;

if (!Guid.TryParse(guid, out var id)) return;

await LogicDeviceOnlineStatus(data.Id);
if (typeof(DeviceOnline).FullName == name)
{
OsTask.Run(() => LogicDeviceOnlineStatus(id));
}
}
private async Task HandleKeyExpired(RedisValue message)

private void HandleDeviceStatus(RedisChannel _, RedisValue value)
{
if (!message.HasValue) return;
var msg = message.ToString().Split(':');
if (msg.Length < 2) return;
if (!value.HasValue) return;

DeviceStatus message;
try
{
message = MessagePackSerializer.Deserialize<DeviceStatus>(value);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to deserialize redis message");
return;
}

OsTask.Run(() => HandleDeviceStatusMessage(message));
}

private async Task HandleDeviceStatusMessage(DeviceStatus message)
{
switch (message.Payload)
{
case DeviceBoolStatePayload boolState:
await HandleDeviceBoolState(message.DeviceId, boolState);
break;
default:
_logger.LogError("Got DeviceStatus with unknown payload type: {PayloadType}", message.Payload.GetType().Name);
break;
}

if (!Guid.TryParse(msg[1], out var id)) return;
}

if (typeof(DeviceOnline).FullName == msg[0])
private async Task HandleDeviceBoolState(Guid deviceId, DeviceBoolStatePayload state)
{
switch (state.Type)
{
await LogicDeviceOnlineStatus(id);
case DeviceBoolStateType.Online:
await LogicDeviceOnlineStatus(deviceId); // TODO: Handle device offline messages too
break;
case DeviceBoolStateType.EStopped:
_logger.LogInformation("EStopped state not implemented yet for DeviceId {DeviceId}", deviceId);
break;
default:
_logger.LogError("Unknown DeviceBoolStateType: {StateType}", state.Type);
break;
}
}

Expand Down Expand Up @@ -111,15 +152,23 @@ await _hubContext.Clients.Users(userIds).DeviceStatus([
}

/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
await _subscriber.UnsubscribeAllAsync();
}

/// <inheritdoc />
public async ValueTask DisposeAsync()
{
await _subscriber.UnsubscribeAllAsync();
try
{
await _subscriber.UnsubscribeAllAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during Redis unsubscribe in DisposeAsync");
}

GC.SuppressFinalize(this);
}

Expand Down
1 change: 1 addition & 0 deletions Common/Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="MessagePack" Version="3.1.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.8" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.8" />
<PackageReference Include="Microsoft.Extensions.Caching.Hybrid" Version="9.8.0" />
Expand Down
Loading