Skip to content

Commit 1d4e514

Browse files
authored
UnifiedNetcodeTransport refactor (#3976)
* Fix DontDestroyOnLoadTest * [MTT-14843] Refactor sending and receiving of N4E RPCs in UnifiedNetcodeTransport to not use entities (#3943) Refactor sending and receiving of RPCs to not use entities, thereby preserving send/receive order without having to encode an order value and reorder messages on the receive side. * Post-merge fixes * Fix remaining errors. * Revert unnecessary change from bad merge. * Accidentally deleted a line.
1 parent 6361bb3 commit 1d4e514

2 files changed

Lines changed: 70 additions & 68 deletions

File tree

com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,15 @@ public void NetworkUpdate(NetworkUpdateStage updateStage)
476476

477477
// This should be invoked just prior to the MessageManager processes its outbound queue.
478478
SceneManager.CheckForAndSendNetworkObjectSceneChanged();
479-
480-
// Process outbound messages
481-
MessageManager.ProcessSendQueues();
479+
#if UNIFIED_NETCODE
480+
if (!NetworkConfig.Prefabs.HasGhostPrefabs)
481+
{
482+
#endif
483+
// Process outbound messages
484+
MessageManager.ProcessSendQueues();
485+
#if UNIFIED_NETCODE
486+
}
487+
#endif
482488

483489
// Metrics update needs to be driven by NetworkConnectionManager's update to assure metrics are dispatched after the send queue is processed.
484490
MetricsManager.UpdateMetrics();
@@ -1359,7 +1365,6 @@ private bool CanStart(StartType type)
13591365
/// The world instance assigned to this NetworkManager instance.
13601366
/// </summary>
13611367
public NetcodeWorld NetcodeWorld { get; internal set; }
1362-
13631368
internal void InitializeNetcodeWorld()
13641369
{
13651370
if (NetcodeWorld != null)

com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs

Lines changed: 61 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -43,37 +43,41 @@ public static NativeArray<byte> ToNativeArray(in FixedBytes1280 data)
4343
}
4444
}
4545

46+
internal struct TransportRpcData : IBufferElementData
47+
{
48+
public FixedBytes1280 Buffer;
49+
}
50+
4651
[BurstCompile]
4752
internal struct TransportRpc : IOutOfBandRpcCommand, IRpcCommandSerializer<TransportRpc>
4853
{
49-
public FixedBytes1280 Buffer;
50-
public ulong Order;
54+
public TransportRpcData Value;
5155

5256
public unsafe void Serialize(ref DataStreamWriter writer, in RpcSerializerState state, in TransportRpc data)
5357
{
54-
writer.WriteULong(data.Order);
55-
writer.WriteInt(data.Buffer.Length);
56-
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), data.Buffer.Length);
58+
writer.WriteInt(data.Value.Buffer.Length);
59+
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), data.Value.Buffer.Length);
5760
writer.WriteBytes(span);
5861
}
5962

6063
public unsafe void Deserialize(ref DataStreamReader reader, in RpcDeserializerState state, ref TransportRpc data)
6164
{
62-
data.Order = reader.ReadULong();
6365
var length = reader.ReadInt();
64-
data.Buffer = new FixedBytes1280
66+
data.Value.Buffer = new FixedBytes1280
6567
{
6668
Length = length
6769
};
6870

69-
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), length);
71+
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), length);
7072
reader.ReadBytes(span);
7173
}
7274

7375
[BurstCompile(DisableDirectCall = true)]
7476
private static void InvokeExecute(ref RpcExecutor.Parameters parameters)
7577
{
76-
RpcExecutor.ExecuteCreateRequestComponent<TransportRpc, TransportRpc>(ref parameters);
78+
var element = new TransportRpc();
79+
element.Deserialize(ref parameters.Reader, parameters.DeserializerState, ref element);
80+
parameters.CommandBuffer.AppendToBuffer(parameters.JobIndex, parameters.Connection, element.Value);
7781
}
7882

7983
private static readonly PortableFunctionPointer<RpcExecutor.ExecuteDelegate> k_InvokeExecuteFunctionPointer = new PortableFunctionPointer<RpcExecutor.ExecuteDelegate>(InvokeExecute);
@@ -115,33 +119,57 @@ public void OnUpdate(ref SystemState state)
115119
}
116120
}
117121

122+
[WorldSystemFilter(WorldSystemFilterFlags.ServerSimulation | WorldSystemFilterFlags.ClientSimulation | WorldSystemFilterFlags.ThinClientSimulation)]
123+
[UpdateInGroup(typeof(SimulationSystemGroup), OrderLast = true)]
124+
[UpdateBefore(typeof(RpcSystem))]
118125
internal partial class UnifiedNetcodeUpdateSystem : SystemBase
119126
{
127+
public void OnCreate(ref SystemState state)
128+
{
129+
state.RequireForUpdate<RpcCollection>();
130+
state.RequireForUpdate<NetworkId>();
131+
}
132+
120133
public UnifiedNetcodeTransport Transport;
134+
public NetworkManager NetworkManager;
121135

122136
public List<Connection> DisconnectQueue = new List<Connection>();
123137

124138
public void Disconnect(Connection connection)
125139
{
126140
DisconnectQueue.Add(connection);
127141
}
142+
143+
public void SendRpc(TransportRpc rpc, Entity connectionEntity)
144+
{
145+
var rpcQueue = SystemAPI.GetSingleton<RpcCollection>().GetRpcQueue<TransportRpc, TransportRpc>();
146+
var ghostInstance = GetComponentLookup<GhostInstance>();
147+
var rpcDataStreamBuffer = EntityManager.GetBuffer<OutgoingOutOfBandRpcDataStreamBuffer>(connectionEntity);
148+
rpcQueue.Schedule(rpcDataStreamBuffer, ghostInstance, rpc);
149+
}
128150

129151
protected override void OnUpdate()
130152
{
153+
NetworkManager.MessageManager.ProcessSendQueues();
154+
131155
using var commandBuffer = new EntityCommandBuffer(Allocator.Temp);
132-
foreach (var (request, rpc, entity) in SystemAPI.Query<RefRO<ReceiveRpcCommandRequest>, RefRO<TransportRpc>>().WithEntityAccess())
156+
foreach(var (networkId, _, entity) in SystemAPI.Query<RefRO<NetworkId>, RefRO<NetworkStreamConnection>>().WithEntityAccess())
133157
{
134-
var connectionId = SystemAPI.GetComponent<NetworkId>(request.ValueRO.SourceConnection).Value;
135-
136-
var buffer = rpc.ValueRO.Buffer;
137-
try
138-
{
139-
Transport.DispatchMessage(connectionId, buffer, rpc.ValueRO.Order);
140-
}
141-
finally
158+
var connectionId = networkId.ValueRO.Value;
159+
DynamicBuffer<TransportRpcData> rpcs = EntityManager.GetBuffer<TransportRpcData>(entity);
160+
foreach (var rpc in rpcs)
142161
{
143-
commandBuffer.DestroyEntity(entity);
162+
var buffer = rpc.Buffer;
163+
try
164+
{
165+
Transport.DispatchMessage(connectionId, buffer);
166+
}
167+
catch(Exception e)
168+
{
169+
Debug.LogException(e);
170+
}
144171
}
172+
rpcs.Clear();
145173
}
146174

147175
foreach (var connection in DisconnectQueue)
@@ -171,34 +199,15 @@ private class ConnectionInfo
171199
public BatchedSendQueue SendQueue;
172200
public BatchedReceiveQueue ReceiveQueue;
173201
public Connection Connection;
174-
public ulong LastSent;
175-
public ulong LastReceived;
176202
public Dictionary<ulong, FixedBytes1280> DeferredMessages;
177203
}
178204

179205
private Dictionary<int, ConnectionInfo> m_Connections;
180206

181-
internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong order)
207+
internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer)
182208
{
183209
var connectionInfo = m_Connections[connectionId];
184210

185-
if (order <= connectionInfo.LastReceived)
186-
{
187-
Debug.LogWarning("Received duplicate message, ignoring.");
188-
return;
189-
}
190-
191-
if (order != connectionInfo.LastReceived + 1)
192-
{
193-
if (connectionInfo.DeferredMessages == null)
194-
{
195-
connectionInfo.DeferredMessages = new Dictionary<ulong, FixedBytes1280>();
196-
}
197-
198-
connectionInfo.DeferredMessages[order] = buffer;
199-
return;
200-
}
201-
202211
using var arr = FixedBytes1280.ToNativeArray(buffer);
203212
var reader = new DataStreamReader(arr);
204213
if (connectionInfo.ReceiveQueue == null)
@@ -209,20 +218,7 @@ internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong
209218
{
210219
connectionInfo.ReceiveQueue.PushReader(reader);
211220
}
212-
213-
connectionInfo.LastReceived = order;
214-
if (connectionInfo.DeferredMessages != null)
215-
{
216-
var next = order + 1;
217-
while (connectionInfo.DeferredMessages.Remove(next, out var nextBuffer))
218-
{
219-
reader = new DataStreamReader(FixedBytes1280.ToNativeArray(nextBuffer));
220-
connectionInfo.ReceiveQueue.PushReader(reader);
221-
connectionInfo.LastReceived = next;
222-
++next;
223-
}
224-
}
225-
221+
226222
var message = connectionInfo.ReceiveQueue.PopMessage();
227223
while (message.Count != 0)
228224
{
@@ -243,20 +239,15 @@ public override unsafe void Send(ulong clientId, ArraySegment<byte> payload, Net
243239

244240
while (!connectionInfo.SendQueue.IsEmpty)
245241
{
246-
var rpc = new TransportRpc
247-
{
248-
Buffer = new FixedBytes1280(),
249-
};
242+
var rpc = new TransportRpc();
250243

251-
var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Buffer), k_MaxPacketSize);
244+
var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Value.Buffer), k_MaxPacketSize);
252245

253246
var amount = connectionInfo.SendQueue.FillWriterWithBytes(ref writer, k_MaxPacketSize);
254-
rpc.Buffer.Length = amount;
255-
rpc.Order = ++connectionInfo.LastSent;
256-
257-
var req = m_NetworkManager.NetcodeWorld.EntityManager.CreateEntity(ComponentType.ReadWrite<SendRpcCommandRequest>(), ComponentType.ReadWrite<TransportRpc>());
258-
m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, new SendRpcCommandRequest{TargetConnection = connectionInfo.Connection.ConnectionEntity});
259-
m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, rpc);
247+
rpc.Value.Buffer.Length = amount;
248+
249+
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
250+
updateSystem.SendRpc(rpc, connectionInfo.Connection.ConnectionEntity);
260251

261252
connectionInfo.SendQueue.Consume(amount);
262253
}
@@ -280,6 +271,8 @@ private void OnClientConnectedToServer(Connection connection, NetCodeConnectionE
280271
};
281272
m_ServerClientId = connection.NetworkId.Value;
282273
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
274+
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
275+
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
283276
}
284277

285278
private void OnServerNewClientConnection(Connection connection, NetCodeConnectionEvent connectionEvent)
@@ -291,6 +284,8 @@ private void OnServerNewClientConnection(Connection connection, NetCodeConnectio
291284
Connection = connection
292285
}; ;
293286
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
287+
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
288+
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
294289
}
295290

296291
private const string k_InvalidRpcMessage = "An invalid RPC was received";
@@ -403,6 +398,7 @@ public override bool StartClient()
403398
m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnClientConnectionEvent;
404399
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
405400
updateSystem.Transport = this;
401+
updateSystem.NetworkManager = m_NetworkManager;
406402
return true;
407403
}
408404

@@ -416,6 +412,7 @@ public override bool StartServer()
416412
m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnServerConnectionEvent;
417413
var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
418414
updateSystem.Transport = this;
415+
updateSystem.NetworkManager = m_NetworkManager;
419416
return true;
420417
}
421418

0 commit comments

Comments
 (0)