88using Unity . Collections . LowLevel . Unsafe ;
99using Unity . Entities ;
1010using Unity . NetCode ;
11+ using Unity . Netcode . Transports . UTP ;
1112using Unity . Networking . Transport ;
1213using UnityEngine ;
1314
@@ -16,37 +17,19 @@ namespace Unity.Netcode.Unified
1617 internal struct TransportRpc : IRpcCommand , IRpcCommandSerializer < TransportRpc >
1718 {
1819 public FixedList4096Bytes < byte > Buffer ;
19-
20- internal static string ByteArrayToString ( FixedList4096Bytes < byte > ba , int offset , int count )
21- {
22- var hex = new StringBuilder ( ba . Length * 2 ) ;
23- for ( int i = offset ; i < offset + count ; ++ i )
24- {
25- hex . AppendFormat ( "{0:x2} " , ba [ i ] ) ;
26- }
27-
28- return hex . ToString ( ) ;
29- }
30- internal static string ByteArrayToString ( NativeArray < byte > ba , int offset , int count )
31- {
32- var hex = new StringBuilder ( ba . Length * 2 ) ;
33- for ( int i = offset ; i < offset + count ; ++ i )
34- {
35- hex . AppendFormat ( "{0:x2} " , ba [ i ] ) ;
36- }
37-
38- return hex . ToString ( ) ;
39- }
20+ public ulong Order ;
4021
4122 public unsafe void Serialize ( ref DataStreamWriter writer , in RpcSerializerState state , in TransportRpc data )
4223 {
24+ writer . WriteULong ( data . Order ) ;
4325 writer . WriteInt ( data . Buffer . Length ) ;
4426 var span = new Span < byte > ( data . Buffer . GetUnsafePtr ( ) , data . Buffer . Length ) ;
4527 writer . WriteBytes ( span ) ;
4628 }
4729
4830 public unsafe void Deserialize ( ref DataStreamReader reader , in RpcDeserializerState state , ref TransportRpc data )
4931 {
32+ data . Order = reader . ReadULong ( ) ;
5033 var length = reader . ReadInt ( ) ;
5134 data . Buffer = new FixedList4096Bytes < byte > ( ) ;
5235 data . Buffer . Length = length ;
@@ -104,13 +87,13 @@ internal partial class UnifiedNetcodeUpdateSystem : SystemBase
10487 {
10588 public UnifiedNetcodeTransport Transport ;
10689
107- public List < Connection > DiscconedtQueue = new List < Connection > ( ) ;
108-
90+ public List < Connection > DisconnectQueue = new List < Connection > ( ) ;
91+
10992 public void Disconnect ( Connection connection )
11093 {
111- DiscconedtQueue . Add ( connection ) ;
94+ DisconnectQueue . Add ( connection ) ;
11295 }
113-
96+
11497 protected override void OnUpdate ( )
11598 {
11699 using var commandBuffer = new EntityCommandBuffer ( Allocator . Temp ) ;
@@ -119,16 +102,24 @@ protected override void OnUpdate()
119102 var connectionId = SystemAPI . GetComponent < NetworkId > ( request . ValueRO . SourceConnection ) . Value ;
120103
121104 var buffer = rpc . ValueRW . Buffer ;
122- Transport . DispatchMessage ( connectionId , buffer ) ;
123- commandBuffer . DestroyEntity ( entity ) ;
105+ try
106+ {
107+ Transport . DispatchMessage ( connectionId , buffer , rpc . ValueRO . Order ) ;
108+ }
109+ finally
110+ {
111+ commandBuffer . DestroyEntity ( entity ) ;
112+ }
124113 }
125114
126- foreach ( var connection in DiscconedtQueue )
115+ foreach ( var connection in DisconnectQueue )
127116 {
128117 commandBuffer . AddComponent < NetworkStreamRequestDisconnect > ( connection . ConnectionEntity ) ;
129118 }
119+ DisconnectQueue . Clear ( ) ;
120+
130121 commandBuffer . Playback ( EntityManager ) ;
131- DiscconedtQueue . Clear ( ) ;
122+
132123 }
133124 }
134125
@@ -144,36 +135,90 @@ internal class UnifiedNetcodeTransport : NetworkTransport
144135
145136 private IRealTimeProvider m_RealTimeProvider ;
146137
147- private Dictionary < int , Connection > m_Connections ;
148-
149- internal void DispatchMessage ( int connectionId , FixedList4096Bytes < byte > buffer )
138+ private class ConnectionInfo
150139 {
151- ArraySegment < byte > data = new ArraySegment < byte > ( buffer . ToArray ( ) ) ;
152- InvokeOnTransportEvent ( NetworkEvent . Data , ( ulong ) connectionId , data , m_RealTimeProvider . RealTimeSinceStartup ) ;
140+ public BatchedSendQueue SendQueue ;
141+ public BatchedReceiveQueue ReceiveQueue ;
142+ public Connection Connection ;
143+ public ulong LastSent ;
144+ public ulong LastReceived ;
145+ public Dictionary < ulong , FixedList4096Bytes < byte > > DeferredMessages ;
153146 }
147+
148+ private Dictionary < int , ConnectionInfo > m_Connections ;
154149
155- public override void Send ( ulong clientId , ArraySegment < byte > payload , NetworkDelivery networkDelivery )
150+ internal void DispatchMessage ( int connectionId , FixedList4096Bytes < byte > buffer , ulong order )
156151 {
157- if ( ! m_Connections . TryGetValue ( ( int ) clientId , out Connection connection ) )
152+ var connectionInfo = m_Connections [ connectionId ] ;
153+
154+ if ( order != connectionInfo . LastReceived + 1 )
158155 {
156+ if ( connectionInfo . DeferredMessages == null )
157+ {
158+ connectionInfo . DeferredMessages = new Dictionary < ulong , FixedList4096Bytes < byte > > ( ) ;
159+ }
160+
161+ connectionInfo . DeferredMessages [ order ] = buffer ;
159162 return ;
160163 }
161164
162- var rpc = new TransportRpc
165+ var reader = new DataStreamReader ( buffer . ToNativeArray ( Allocator . Temp ) ) ;
166+ if ( connectionInfo . ReceiveQueue == null )
163167 {
164- Buffer = new FixedList4096Bytes < byte > ( ) ,
165- } ;
166-
167- unsafe
168+ connectionInfo . ReceiveQueue = new BatchedReceiveQueue ( reader ) ;
169+ }
170+ else
171+ {
172+ connectionInfo . ReceiveQueue . PushReader ( reader ) ;
173+ }
174+
175+ connectionInfo . LastReceived = order ;
176+ if ( connectionInfo . DeferredMessages != null )
168177 {
169- rpc . Buffer . Length = payload . Count ;
170- fixed ( byte * data = payload . Array )
178+ var next = order + 1 ;
179+ while ( connectionInfo . DeferredMessages . Remove ( next , out var nextBuffer ) )
171180 {
172- UnsafeUtility . MemCpy ( rpc . Buffer . GetUnsafePtr ( ) , ( void * ) ( data + payload . Offset ) , payload . Count ) ;
181+ reader = new DataStreamReader ( nextBuffer . ToNativeArray ( Allocator . Temp ) ) ;
182+ connectionInfo . ReceiveQueue . PushReader ( reader ) ;
183+ connectionInfo . LastReceived = next ;
184+ ++ next ;
173185 }
174186 }
175187
176- connection . SendMessage ( rpc ) ;
188+ var message = connectionInfo . ReceiveQueue . PopMessage ( ) ;
189+ while ( message . Count != 0 )
190+ {
191+ InvokeOnTransportEvent ( NetworkEvent . Data , ( ulong ) connectionId , message ,
192+ m_RealTimeProvider . RealTimeSinceStartup ) ;
193+ message = connectionInfo . ReceiveQueue . PopMessage ( ) ;
194+ }
195+ }
196+
197+ public override unsafe void Send ( ulong clientId , ArraySegment < byte > payload , NetworkDelivery networkDelivery )
198+ {
199+ if ( ! m_Connections . TryGetValue ( ( int ) clientId , out ConnectionInfo connectionInfo ) )
200+ {
201+ return ;
202+ }
203+
204+ connectionInfo . SendQueue . PushMessage ( payload ) ;
205+
206+ while ( ! connectionInfo . SendQueue . IsEmpty )
207+ {
208+ var rpc = new TransportRpc
209+ {
210+ Buffer = new FixedList4096Bytes < byte > ( ) ,
211+ } ;
212+ var writer = new DataStreamWriter ( rpc . Buffer . GetUnsafePtr ( ) , 1024 ) ;
213+
214+ var amount = connectionInfo . SendQueue . FillWriterWithBytes ( ref writer , 1024 ) ;
215+ rpc . Buffer . Length = amount ;
216+ rpc . Order = ++ connectionInfo . LastSent ;
217+
218+ connectionInfo . Connection . SendMessage ( rpc ) ;
219+
220+ connectionInfo . SendQueue . Consume ( amount ) ;
221+ }
177222 }
178223
179224 public override NetworkEvent PollEvent ( out ulong clientId , out ArraySegment < byte > payload , out float receiveTime )
@@ -186,14 +231,24 @@ public override NetworkEvent PollEvent(out ulong clientId, out ArraySegment<byte
186231
187232 private void OnClientConnectedToServer ( Connection connection , NetCodeConnectionEvent connectionEvent )
188233 {
189- m_Connections [ connection . NetworkId . Value ] = connection ;
234+ m_Connections [ connection . NetworkId . Value ] = new ConnectionInfo
235+ {
236+ ReceiveQueue = null ,
237+ SendQueue = new BatchedSendQueue ( BatchedSendQueue . MaximumMaximumCapacity ) ,
238+ Connection = connection
239+ } ;
190240 m_ServerClientId = connection . NetworkId . Value ;
191241 InvokeOnTransportEvent ( NetworkEvent . Connect , ( ulong ) connection . NetworkId . Value , default , m_RealTimeProvider . RealTimeSinceStartup ) ;
192242 }
193243
194244 private void OnServerNewClientConnection ( Connection connection , NetCodeConnectionEvent connectionEvent )
195245 {
196- m_Connections [ connection . NetworkId . Value ] = connection ;
246+ m_Connections [ connection . NetworkId . Value ] = new ConnectionInfo
247+ {
248+ ReceiveQueue = null ,
249+ SendQueue = new BatchedSendQueue ( BatchedSendQueue . MaximumMaximumCapacity ) ,
250+ Connection = connection
251+ } ; ;
197252 InvokeOnTransportEvent ( NetworkEvent . Connect , ( ulong ) connection . NetworkId . Value , default , m_RealTimeProvider . RealTimeSinceStartup ) ;
198253 }
199254
@@ -219,6 +274,9 @@ public override bool StartClient()
219274 NetCode . Netcode . Client . OnDisconnect = OnClientDisconnectFromServer ;
220275 var updateSystem = NetCode . Netcode . GetWorld ( false ) . GetExistingSystemManaged < UnifiedNetcodeUpdateSystem > ( ) ;
221276 updateSystem . Transport = this ;
277+ using var drvQuery = updateSystem . EntityManager . CreateEntityQuery ( ComponentType . ReadWrite < NetworkStreamDriver > ( ) ) ;
278+ var driver = drvQuery . GetSingletonRW < NetworkStreamDriver > ( ) ;
279+ driver . ValueRW . Connect ( updateSystem . EntityManager , NetworkEndpoint . Parse ( "127.0.0.1" , 7979 ) ) ;
222280 return true ;
223281 }
224282
@@ -241,20 +299,23 @@ public override bool StartServer()
241299 NetCode . Netcode . Server . OnDisconnect = OnServerClientDisconnected ;
242300 var updateSystem = NetCode . Netcode . GetWorld ( true ) . GetExistingSystemManaged < UnifiedNetcodeUpdateSystem > ( ) ;
243301 updateSystem . Transport = this ;
302+ using var drvQuery = updateSystem . EntityManager . CreateEntityQuery ( ComponentType . ReadWrite < NetworkStreamDriver > ( ) ) ;
303+ var driver = drvQuery . GetSingletonRW < NetworkStreamDriver > ( ) ;
304+ driver . ValueRW . Listen ( NetworkEndpoint . Parse ( "127.0.0.1" , 7979 ) ) ;
244305 return true ;
245306 }
246307
247308 public override void DisconnectRemoteClient ( ulong clientId )
248309 {
249310 var updateSystem = NetCode . Netcode . GetWorld ( true ) . GetExistingSystemManaged < UnifiedNetcodeUpdateSystem > ( ) ;
250- updateSystem . Disconnect ( m_Connections [ ( int ) clientId ] ) ;
311+ updateSystem . Disconnect ( m_Connections [ ( int ) clientId ] . Connection ) ;
251312 m_Connections . Remove ( ( int ) clientId ) ;
252313 }
253314
254315 public override void DisconnectLocalClient ( )
255316 {
256317 var updateSystem = NetCode . Netcode . GetWorld ( false ) . GetExistingSystemManaged < UnifiedNetcodeUpdateSystem > ( ) ;
257- updateSystem . Disconnect ( m_Connections [ ( int ) ServerClientId ] ) ;
318+ updateSystem . Disconnect ( m_Connections [ ( int ) ServerClientId ] . Connection ) ;
258319 m_Connections . Remove ( ( int ) ServerClientId ) ;
259320 }
260321
@@ -279,7 +340,7 @@ public override void Shutdown()
279340
280341 public override void Initialize ( NetworkManager networkManager = null )
281342 {
282- m_Connections = new Dictionary < int , Connection > ( ) ;
343+ m_Connections = new Dictionary < int , ConnectionInfo > ( ) ;
283344 m_RealTimeProvider = networkManager . RealTimeProvider ;
284345 }
285346 }
0 commit comments