forked from Linq2GraphQL/Linq2GraphQL.Client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWSClient.cs
More file actions
139 lines (110 loc) · 4.1 KB
/
WSClient.cs
File metadata and controls
139 lines (110 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
using System.Diagnostics;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text.Json;
using System.Text.Json.Serialization;
using Websocket.Client;
namespace Linq2GraphQL.Client.Subscriptions;
/// <summary>
/// Runs one GraphQL subscription over a WebSocket connection and exposes the
/// payload of every id-carrying server message through <see cref="Subscription"/>.
/// </summary>
/// <remarks>
/// The WebSocket sub-protocol and the name of the subscribe command are both
/// selected from <c>GraphClient.SubscriptionProtocol</c>.
/// </remarks>
public class WSClient : IAsyncDisposable
{
    private readonly GraphClient _graphClient;
    private readonly GraphQLRequest payload;
    private readonly Subject<string> subscriptionSubject = new();
    private readonly WebsocketClient client;
    private readonly JsonSerializerOptions jsonOptions;

    // Handles returned by Subscribe() in Start(); released again in DisposeAsync().
    private readonly List<IDisposable> streamSubscriptions = new();
    private bool disposed;

    /// <summary>
    /// Creates a client for the subscription described by <paramref name="payload"/>.
    /// No connection is opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="graphClient">Supplies the subscription URL, the protocol and the optional connection-init payload callback.</param>
    /// <param name="payload">The GraphQL request sent as the payload of the subscribe message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="graphClient"/> is null.</exception>
    public WSClient(GraphClient graphClient, GraphQLRequest payload)
    {
        _graphClient = graphClient ?? throw new ArgumentNullException(nameof(graphClient));
        this.payload = payload;

        // Null members are omitted from outgoing messages.
        jsonOptions = new JsonSerializerOptions
        {
            DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
        };

        // The socket factory is handed to WebsocketClient so that every socket it
        // creates requests the sub-protocol matching the configured protocol.
        var factory = new Func<ClientWebSocket>(() =>
        {
            var ws = new ClientWebSocket();
            ws.Options.AddSubProtocol(GetSubprotocolString());
            return ws;
        });

        client = new WebsocketClient(new Uri(_graphClient.SubscriptionUrl), factory)
        {
            ReconnectTimeout = TimeSpan.FromSeconds(30)
        };
    }

    /// <summary>
    /// Stream of raw payload strings, one per received message that carries an id.
    /// The stream completes when the client is disposed.
    /// </summary>
    public IObservable<string> Subscription => subscriptionSubject.AsObservable();

    /// <summary>
    /// Detaches the message handlers, closes the socket with a normal-closure status,
    /// disposes the underlying client and completes <see cref="Subscription"/>.
    /// Calls after the first one do nothing.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (disposed)
        {
            return;
        }

        disposed = true;

        // Detach first so nothing is forwarded while the socket is shutting down.
        foreach (var subscription in streamSubscriptions)
        {
            subscription.Dispose();
        }

        streamSubscriptions.Clear();

        try
        {
            await client.Stop(WebSocketCloseStatus.NormalClosure, string.Empty);
        }
        finally
        {
            // Release the client and end the stream even when the close handshake fails.
            client.Dispose();
            subscriptionSubject.OnCompleted();
        }
    }

    /// <summary>
    /// Attaches the message handlers, opens the connection, sends the connection-init
    /// message (with the payload produced by <c>GraphClient.WSConnectionInitPayload</c>,
    /// when one is configured and returns a value) and then sends the subscribe request.
    /// </summary>
    public async Task Start()
    {
        // NOTE(review): connection-init and subscribe are only sent once, below.
        // Nothing re-sends them after a reconnect - confirm whether that is intended.
        streamSubscriptions.Add(
            client.ReconnectionHappened.Subscribe(info => LogMessage($"Reconnection, type: {info.Type}")));

        // One handler logs, deserializes once and dispatches each incoming message.
        streamSubscriptions.Add(
            client.MessageReceived.Subscribe(msg => HandleMessage(msg.ToString())));

        await client.Start();

        var initRequest = new WebsocketRequest(WebsocketRequestTypes.CONNECTION_INIT);
        if (_graphClient.WSConnectionInitPayload is not null)
        {
            var initPayload = await _graphClient.WSConnectionInitPayload(_graphClient);
            if (initPayload is not null)
            {
                initRequest.Payload = initPayload;
            }
        }

        SendRequest(initRequest);

        var subscriptionRequest = new WebsocketRequest(GetSubscribeCommand())
        {
            Id = Guid.NewGuid().ToString(),
            Payload = payload
        };
        SendRequest(subscriptionRequest);
    }

    /// <summary>
    /// Logs one incoming message, answers a ping with a pong and forwards the payload
    /// of any message that carries an id. Messages that cannot be parsed are logged
    /// and skipped so that a single bad frame does not fault the message stream.
    /// </summary>
    /// <param name="text">Text form of the received message.</param>
    private void HandleMessage(string text)
    {
        LogMessage($"Message received: {text}");

        if (string.IsNullOrEmpty(text))
        {
            return;
        }

        WebsocketResponse response;
        try
        {
            response = JsonSerializer.Deserialize<WebsocketResponse>(text);
        }
        catch (JsonException ex)
        {
            // Only the parse is guarded; exceptions thrown by downstream
            // subscribers below are deliberately not swallowed here.
            LogMessage($"Ignoring message that could not be deserialized: {ex.Message}");
            return;
        }

        // Deserialize returns null for the JSON literal "null".
        if (response is null)
        {
            return;
        }

        if (response.Type == WebsocketRequestTypes.PING)
        {
            SendRequest(new WebsocketRequest(WebsocketRequestTypes.PONG));
        }

        if (!string.IsNullOrEmpty(response.Id))
        {
            subscriptionSubject.OnNext(response.Payload?.ToString());
        }
    }

    /// <summary>Maps the configured protocol to the WebSocket sub-protocol name.</summary>
    /// <exception cref="NotSupportedException">The configured protocol has no mapping.</exception>
    private string GetSubprotocolString() => _graphClient.SubscriptionProtocol switch
    {
        SubscriptionProtocol.GraphQLWebSocket => SubscriptionProtocols.GRAPGQL_TRANSPORT_WS,
        SubscriptionProtocol.ApolloWebSocket => SubscriptionProtocols.GRAPHQL_WS,
        _ => throw new NotSupportedException($"{_graphClient.SubscriptionProtocol} is unknown")
    };

    /// <summary>Maps the configured protocol to the message type that starts a subscription.</summary>
    /// <exception cref="NotSupportedException">The configured protocol has no mapping.</exception>
    private string GetSubscribeCommand() => _graphClient.SubscriptionProtocol switch
    {
        SubscriptionProtocol.GraphQLWebSocket => SubscribeCommands.SUBSCRIBE,
        SubscriptionProtocol.ApolloWebSocket => SubscribeCommands.START,
        _ => throw new NotSupportedException($"{_graphClient.SubscriptionProtocol} is unknown")
    };

    /// <summary>Writes <paramref name="message"/> followed by the current local time to the debug output.</summary>
    private static void LogMessage(string message)
    {
        // Write logs to debug console
        Debug.WriteLine($"{message} - {DateTime.Now.ToString("T")}");
    }

    /// <summary>Serializes <paramref name="request"/> (null members omitted), sends it and logs the JSON.</summary>
    private void SendRequest(WebsocketRequest request)
    {
        var json = JsonSerializer.Serialize(request, jsonOptions);
        client.Send(json);
        LogMessage($"Message sent: {json}");
    }
}