@@ -23,7 +23,7 @@ void CreateDriver(
2323 out NetworkDriver driver ,
2424 out NetworkPipeline unreliableFragmentedPipeline ,
2525 out NetworkPipeline unreliableSequencedFragmentedPipeline ,
26- out NetworkPipeline reliableSequencedFragmentedPipeline ) ;
26+ out NetworkPipeline reliableSequencedPipeline ) ;
2727 }
2828
2929 public static class ErrorUtilities
@@ -160,7 +160,7 @@ public static implicit operator ConnectionAddressData(NetworkEndPoint d) =>
160160
161161 private NetworkPipeline m_UnreliableFragmentedPipeline ;
162162 private NetworkPipeline m_UnreliableSequencedFragmentedPipeline ;
163- private NetworkPipeline m_ReliableSequencedFragmentedPipeline ;
163+ private NetworkPipeline m_ReliableSequencedPipeline ;
164164
165165 public override ulong ServerClientId => m_ServerClientId ;
166166
@@ -210,14 +210,19 @@ public SimulatorUtility.Parameters ClientSimulatorParameters
210210 /// </summary>
211211 private readonly Dictionary < SendTarget , BatchedSendQueue > m_SendQueue = new Dictionary < SendTarget , BatchedSendQueue > ( ) ;
212212
213+ // Since reliable messages may be spread out over multiple transport payloads, it's possible
214+ // to receive only parts of a message in an update. We thus keep the reliable receive queues
215+ // around to avoid losing partial messages.
216+ private readonly Dictionary < ulong , BatchedReceiveQueue > m_ReliableReceiveQueues = new Dictionary < ulong , BatchedReceiveQueue > ( ) ;
217+
213218 private void InitDriver ( )
214219 {
215220 DriverConstructor . CreateDriver (
216221 this ,
217222 out m_Driver ,
218223 out m_UnreliableFragmentedPipeline ,
219224 out m_UnreliableSequencedFragmentedPipeline ,
220- out m_ReliableSequencedFragmentedPipeline ) ;
225+ out m_ReliableSequencedPipeline ) ;
221226 }
222227
223228 private void DisposeDriver ( )
@@ -241,7 +246,7 @@ private NetworkPipeline SelectSendPipeline(NetworkDelivery delivery)
241246 case NetworkDelivery . Reliable :
242247 case NetworkDelivery . ReliableSequenced :
243248 case NetworkDelivery . ReliableFragmentedSequenced :
244- return m_ReliableSequencedFragmentedPipeline ;
249+ return m_ReliableSequencedPipeline ;
245250
246251 default :
247252 Debug . LogError ( $ "Unknown { nameof ( NetworkDelivery ) } value: { delivery } ") ;
@@ -340,6 +345,11 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData)
340345 }
341346 }
342347
348+ internal void SetMaxPayloadSize ( int maxPayloadSize )
349+ {
350+ m_MaxPayloadSize = maxPayloadSize ;
351+ }
352+
343353 private void SetProtocol ( ProtocolType inProtocol )
344354 {
345355 m_ProtocolType = inProtocol ;
@@ -439,7 +449,14 @@ private void SendBatchedMessages(SendTarget sendTarget, BatchedSendQueue queue)
439449 return ;
440450 }
441451
442- var written = queue . FillWriter ( ref writer ) ;
452+ // We don't attempt to send entire payloads over the reliable pipeline. Instead we
453+ // fragment it manually. This is safe and easy to do since the reliable pipeline
454+ // basically implements a stream, so as long as we separate the different messages
455+ // in the stream (the send queue does that automatically) we are sure they'll be
456+ // reassembled properly at the other end. This allows us to lift the limit of ~44KB
457+ // on reliable payloads (because of the reliable window size).
458+ var written = pipeline == m_ReliableSequencedPipeline
459+ ? queue . FillWriterWithBytes ( ref writer ) : queue . FillWriterWithMessages ( ref writer ) ;
443460
444461 result = m_Driver . EndSend ( writer ) ;
445462 if ( result == written )
@@ -481,9 +498,42 @@ private bool AcceptConnection()
481498
482499 }
483500
501+ private void ReceiveMessages ( ulong clientId , NetworkPipeline pipeline , DataStreamReader dataReader )
502+ {
503+ BatchedReceiveQueue queue ;
504+ if ( pipeline == m_ReliableSequencedPipeline )
505+ {
506+ if ( m_ReliableReceiveQueues . TryGetValue ( clientId , out queue ) )
507+ {
508+ queue . PushReader ( dataReader ) ;
509+ }
510+ else
511+ {
512+ queue = new BatchedReceiveQueue ( dataReader ) ;
513+ m_ReliableReceiveQueues [ clientId ] = queue ;
514+ }
515+ }
516+ else
517+ {
518+ queue = new BatchedReceiveQueue ( dataReader ) ;
519+ }
520+
521+ while ( ! queue . IsEmpty )
522+ {
523+ var message = queue . PopMessage ( ) ;
524+ if ( message == default )
525+ {
526+ // Only happens if there's only a partial message in the queue (rare).
527+ break ;
528+ }
529+
530+ InvokeOnTransportEvent ( NetcodeNetworkEvent . Data , clientId , message , Time . realtimeSinceStartup ) ;
531+ }
532+ }
533+
484534 private bool ProcessEvent ( )
485535 {
486- var eventType = m_Driver . PopEvent ( out var networkConnection , out var reader ) ;
536+ var eventType = m_Driver . PopEvent ( out var networkConnection , out var reader , out var pipeline ) ;
487537
488538 switch ( eventType )
489539 {
@@ -510,6 +560,8 @@ private bool ProcessEvent()
510560 }
511561 }
512562
563+ m_ReliableReceiveQueues . Remove ( ParseClientId ( networkConnection ) ) ;
564+
513565 InvokeOnTransportEvent ( NetcodeNetworkEvent . Disconnect ,
514566 ParseClientId ( networkConnection ) ,
515567 default ( ArraySegment < byte > ) ,
@@ -520,17 +572,7 @@ private bool ProcessEvent()
520572 }
521573 case TransportNetworkEvent . Type . Data :
522574 {
523- var queue = new BatchedReceiveQueue ( reader ) ;
524-
525- while ( ! queue . IsEmpty )
526- {
527- InvokeOnTransportEvent ( NetcodeNetworkEvent . Data ,
528- ParseClientId ( networkConnection ) ,
529- queue . PopMessage ( ) ,
530- Time . realtimeSinceStartup
531- ) ;
532- }
533-
575+ ReceiveMessages ( ParseClientId ( networkConnection ) , pipeline , reader ) ;
534576 return true ;
535577 }
536578 }
@@ -744,7 +786,7 @@ public override void Shutdown()
744786 public void CreateDriver ( UnityTransport transport , out NetworkDriver driver ,
745787 out NetworkPipeline unreliableFragmentedPipeline ,
746788 out NetworkPipeline unreliableSequencedFragmentedPipeline ,
747- out NetworkPipeline reliableSequencedFragmentedPipeline )
789+ out NetworkPipeline reliableSequencedPipeline )
748790 {
749791 var maxFrameTimeMS = 0 ;
750792
@@ -775,8 +817,7 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
775817 typeof ( UnreliableSequencedPipelineStage ) ,
776818 typeof ( SimulatorPipelineStage ) ,
777819 typeof ( SimulatorPipelineStageInSend ) ) ;
778- reliableSequencedFragmentedPipeline = driver . CreatePipeline (
779- typeof ( FragmentationPipelineStage ) ,
820+ reliableSequencedPipeline = driver . CreatePipeline (
780821 typeof ( ReliableSequencedPipelineStage ) ,
781822 typeof ( SimulatorPipelineStage ) ,
782823 typeof ( SimulatorPipelineStageInSend ) ) ;
@@ -788,8 +829,8 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
788829 typeof ( FragmentationPipelineStage ) ) ;
789830 unreliableSequencedFragmentedPipeline = driver . CreatePipeline (
790831 typeof ( FragmentationPipelineStage ) , typeof ( UnreliableSequencedPipelineStage ) ) ;
791- reliableSequencedFragmentedPipeline = driver . CreatePipeline (
792- typeof ( FragmentationPipelineStage ) , typeof ( ReliableSequencedPipelineStage )
832+ reliableSequencedPipeline = driver . CreatePipeline (
833+ typeof ( ReliableSequencedPipelineStage )
793834 ) ;
794835 }
795836 }
0 commit comments