@@ -11,10 +11,13 @@ namespace Cnblogs.DashScope.Core;
1111/// </summary>
1212public sealed class DashScopeClientWebSocket : IDisposable
1313{
14- private static readonly UnboundedChannelOptions UnboundedChannelOptions = new ( )
15- {
16- SingleWriter = true ,
17- } ;
14+ private static readonly UnboundedChannelOptions UnboundedChannelOptions =
15+ new ( )
16+ {
17+ SingleWriter = true ,
18+ SingleReader = true ,
19+ AllowSynchronousContinuations = true
20+ } ;
1821
1922 private readonly ClientWebSocket _socket = new ( ) ;
2023 private Task ? _receiveTask ;
@@ -63,14 +66,15 @@ public async Task ConnectAsync<TOutput>(Uri uri, CancellationToken cancellationT
6366 {
6467 await _socket . ConnectAsync ( uri , cancellationToken ) ;
6568 _receiveTask = ReceiveMessagesAsync < TOutput > ( cancellationToken ) ;
66- State = DashScopeWebSocketState . Connected ;
69+ State = DashScopeWebSocketState . Ready ;
6770 }
6871
6972 /// <summary>
7073 /// Reset binary output.
7174 /// </summary>
7275 public void ResetOutput ( )
7376 {
77+ BinaryOutput . Writer . TryComplete ( ) ;
7478 BinaryOutput = Channel . CreateUnbounded < byte > ( UnboundedChannelOptions ) ;
7579 _taskStartedSignal = new TaskCompletionSource < bool > ( ) ;
7680 }
@@ -84,17 +88,22 @@ public void ResetOutput()
8488 /// <typeparam name="TInput">Type of the input.</typeparam>
8589 /// <typeparam name="TParameter">Type of the parameter.</typeparam>
8690 /// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> is requested.</exception>
87- /// <exception cref="InvalidOperationException">Websocket is not connected.</exception>
91+ /// <exception cref="InvalidOperationException">Websocket is not connected or already closed .</exception>
8892 /// <exception cref="ObjectDisposedException">The underlying websocket has already been closed.</exception>
8993 public Task SendMessageAsync < TInput , TParameter > (
9094 DashScopeWebSocketRequest < TInput , TParameter > request ,
9195 CancellationToken cancellationToken = default )
9296 where TInput : class
9397 where TParameter : class
9498 {
99+ if ( State == DashScopeWebSocketState . Closed )
100+ {
101+ throw new InvalidOperationException ( "Socket is already closed." ) ;
102+ }
103+
104+ var json = JsonSerializer . Serialize ( request , DashScopeDefaults . SerializationOptions ) ;
95105 return _socket . SendAsync (
96- new ArraySegment < byte > (
97- Encoding . UTF8 . GetBytes ( JsonSerializer . Serialize ( request , DashScopeDefaults . SerializationOptions ) ) ) ,
106+ new ArraySegment < byte > ( Encoding . UTF8 . GetBytes ( json ) ) ,
98107 WebSocketMessageType . Text ,
99108 true ,
100109 cancellationToken ) ;
@@ -110,7 +119,6 @@ public Task SendMessageAsync<TInput, TParameter>(
110119 try
111120 {
112121 var result = await _socket . ReceiveAsync ( segment , cancellationToken ) ;
113-
114122 if ( result . MessageType == WebSocketMessageType . Close )
115123 {
116124 await CloseAsync ( cancellationToken ) ;
@@ -168,7 +176,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
168176 _taskStartedSignal . TrySetResult ( true ) ;
169177 break ;
170178 case "task-finished" :
171- State = DashScopeWebSocketState . Connected ;
179+ State = DashScopeWebSocketState . Ready ;
172180 BinaryOutput . Writer . Complete ( ) ;
173181 break ;
174182 case "task-failed" :
@@ -199,7 +207,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
199207 public async Task CloseAsync ( CancellationToken cancellationToken = default )
200208 {
201209 await _socket . CloseAsync ( WebSocketCloseStatus . NormalClosure , "Closing" , cancellationToken ) ;
202- Dispose ( ) ;
210+ State = DashScopeWebSocketState . Closed ;
203211 }
204212
205213 private void Dispose ( bool disposing )
@@ -208,7 +216,6 @@ private void Dispose(bool disposing)
208216 {
209217 // Dispose managed resources.
210218 _socket . Dispose ( ) ;
211- State = DashScopeWebSocketState . Closed ;
212219 BinaryOutput . Writer . TryComplete ( ) ;
213220 }
214221 }
0 commit comments