diff --git a/Hazel.UnitTests/FragmentationTests.cs b/Hazel.UnitTests/FragmentationTests.cs new file mode 100644 index 0000000..39fd014 --- /dev/null +++ b/Hazel.UnitTests/FragmentationTests.cs @@ -0,0 +1,80 @@ +using System; +using System.Linq; +using System.Net; +using System.Threading; +using Hazel.Udp; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Hazel.UnitTests +{ + [TestClass] + public class FragmentationTests + { + private readonly byte[] _testData = Enumerable.Range(0, 10000).Select(x => (byte)x).ToArray(); + + [TestMethod] + [DataRow(false, DisplayName = "SendBytes")] + [DataRow(true, DisplayName = "MessageWriter")] + public void ReliableSendTest(bool useMessageWriter) + { + using (var listener = new UdpConnectionListener(new IPEndPoint(IPAddress.Any, 4296))) + using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) + { + var manualResetEvent = new ManualResetEventSlim(false); + DataReceivedEventArgs? data = null; + + listener.NewConnection += e => + { + e.Connection.DataReceived += de => + { + data = de; + manualResetEvent.Set(); + }; + }; + + listener.Start(); + connection.Connect(); + + if (useMessageWriter) + { + var messageWriter = MessageWriter.Get(SendOption.Reliable); + messageWriter.Write(_testData); + connection.Send(messageWriter); + } + else + { + connection.SendBytes(_testData, SendOption.Reliable); + } + + manualResetEvent.Wait(TimeSpan.FromSeconds(5)); + + Assert.IsNotNull(data); + + Assert.AreEqual(SendOption.Reliable, data.Value.SendOption); + + var messageReader = data.Value.Message; + var received = new byte[messageReader.Length]; + Array.Copy(messageReader.Buffer, messageReader.Offset + messageReader.Position, received, 0, received.Length); + messageReader.Recycle(); + + CollectionAssert.AreEqual(_testData, received); + } + } + + [TestMethod] + public void UnreliableSendTest() + { + using (var listener = new UdpConnectionListener(new IPEndPoint(IPAddress.Any, 4296))) + using (var connection = new UdpClientConnection(new IPEndPoint(IPAddress.Loopback, 4296))) + { + listener.Start(); + connection.Connect(); + + Assert.AreEqual("Unreliable messages can't be bigger than MTU", Assert.ThrowsException(() => + { + connection.SendBytes(_testData); + }).Message); + } + } + } +} diff --git a/Hazel.UnitTests/UdpConnectionTestHarness.cs b/Hazel.UnitTests/UdpConnectionTestHarness.cs index 8414e0c..36ea165 100644 --- a/Hazel.UnitTests/UdpConnectionTestHarness.cs +++ b/Hazel.UnitTests/UdpConnectionTestHarness.cs @@ -38,7 +38,7 @@ protected override bool SendDisconnect(MessageWriter writer) return true; } - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { this.BytesSent.Add(MessageReader.Get(bytes)); } diff --git a/Hazel/Dtls/DtlsConnectionListener.cs b/Hazel/Dtls/DtlsConnectionListener.cs index ac84b67..38c4959 100644 --- a/Hazel/Dtls/DtlsConnectionListener.cs +++ b/Hazel/Dtls/DtlsConnectionListener.cs @@ -1258,7 +1258,7 @@ private void SendHelloVerifyRequest(IPEndPoint peerAddress, ulong recordSequence /// /// Handle a requrest to send a datagram to the network /// - protected override void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint) + protected override void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint, Action onTooBig = null) { PeerData peer; if (!this.existingPeers.TryGetValue(remoteEndPoint, out peer)) diff --git a/Hazel/Dtls/DtlsUnityConnection.cs b/Hazel/Dtls/DtlsUnityConnection.cs index 8bafac1..fd1ff64 100644 --- a/Hazel/Dtls/DtlsUnityConnection.cs +++ b/Hazel/Dtls/DtlsUnityConnection.cs @@ -325,13 +325,13 @@ private ByteSpan WriteBytesToConnectionInternal(byte[] bytes, int length) } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { ByteSpan wireData = this.WriteBytesToConnectionInternal(bytes, length); if (wireData.Length > 0) { Debug.Assert(wireData.Offset == 0, "Got a non-zero write data offset"); - base.WriteBytesToConnection(wireData.GetUnderlyingArray(), wireData.Length); + base.WriteBytesToConnection(wireData.GetUnderlyingArray(), wireData.Length, onTooBig); } } diff --git a/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs b/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs index 92a7a81..882a865 100644 --- a/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs +++ b/Hazel/FewerThreads/ThreadLimitedUdpConnectionListener.cs @@ -17,6 +17,7 @@ private struct SendMessageInfo { public ByteSpan Span; public IPEndPoint Recipient; + public Action OnTooBig; } private struct ReceiveMessageInfo @@ -250,7 +251,14 @@ private void SendLoop() { if (this.socket.Poll(Timeout.Infinite, SelectMode.SelectWrite)) { - this.socket.SendTo(msg.Span.GetUnderlyingArray(), msg.Span.Offset, msg.Span.Length, SocketFlags.None, msg.Recipient); + try + { + this.socket.SendTo(msg.Span.GetUnderlyingArray(), msg.Span.Offset, msg.Span.Length, SocketFlags.None, msg.Recipient); + } + catch (SocketException e) when (msg.OnTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + msg.OnTooBig(); + } } else { @@ -335,14 +343,14 @@ protected virtual void ReadCallback(MessageReader message, IPEndPoint remoteEndP connection.HandleReceive(message, bytesReceived); } - internal void SendDataRaw(byte[] response, IPEndPoint remoteEndPoint) + internal void SendDataRaw(byte[] response, IPEndPoint remoteEndPoint, Action onTooBig = null) { - QueueRawData(response, remoteEndPoint); + QueueRawData(response, remoteEndPoint, onTooBig); } - protected virtual void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint) + protected virtual void QueueRawData(ByteSpan span, IPEndPoint remoteEndPoint, Action onTooBig = null) { - this.sendQueue.TryAdd(new SendMessageInfo() { Span = span, Recipient = remoteEndPoint }); + this.sendQueue.TryAdd(new SendMessageInfo() { Span = span, Recipient = remoteEndPoint, OnTooBig = onTooBig }); } /// diff --git a/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs b/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs index 28b21d6..3aa20be 100644 --- a/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs +++ b/Hazel/FewerThreads/ThreadLimitedUdpServerConnection.cs @@ -38,14 +38,15 @@ internal ThreadLimitedUdpServerConnection(ThreadLimitedUdpConnectionListener lis State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { if (bytes.Length != length) throw new ArgumentException("I made an assumption here. I hope you see this error."); - Listener.SendDataRaw(bytes, EndPoint); + Listener.SendDataRaw(bytes, EndPoint, onTooBig); } /// diff --git a/Hazel/MessageWriter.cs b/Hazel/MessageWriter.cs index 7d4e050..d128d08 100644 --- a/Hazel/MessageWriter.cs +++ b/Hazel/MessageWriter.cs @@ -33,10 +33,12 @@ public MessageWriter(int bufferSize) public byte[] ToByteArray(bool includeHeader) { + var length = GetLength(includeHeader); + if (includeHeader) { - byte[] output = new byte[this.Length]; - System.Buffer.BlockCopy(this.Buffer, 0, output, 0, this.Length); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 0, output, 0, length); return output; } else @@ -45,14 +47,14 @@ public byte[] ToByteArray(bool includeHeader) { case SendOption.Reliable: { - byte[] output = new byte[this.Length - 3]; - System.Buffer.BlockCopy(this.Buffer, 3, output, 0, this.Length - 3); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 3, output, 0, length); return output; } case SendOption.None: { - byte[] output = new byte[this.Length - 1]; - System.Buffer.BlockCopy(this.Buffer, 1, output, 0, this.Length - 1); + byte[] output = new byte[length]; + System.Buffer.BlockCopy(this.Buffer, 1, output, 0, length); return output; } } @@ -61,6 +63,28 @@ public byte[] ToByteArray(bool includeHeader) throw new NotImplementedException(); } + public int GetLength(bool includeHeader) + { + if (includeHeader) + { + return Length; + } + + switch (this.SendOption) + { + case SendOption.Reliable: + { + return Length - 3; + } + case SendOption.None: + { + return Length - 1; + } + } + + throw new NotImplementedException(); + } + /// /// The option specifying how the message should be sent. public static MessageWriter Get(SendOption sendOption = SendOption.None) diff --git a/Hazel/NetworkConnection.cs b/Hazel/NetworkConnection.cs index 9799509..e4bd91c 100644 --- a/Hazel/NetworkConnection.cs +++ b/Hazel/NetworkConnection.cs @@ -14,7 +14,8 @@ public enum HazelInternalErrors ReceivedZeroBytes, PingsWithoutResponse, ReliablePacketWithoutResponse, - ConnectionDisconnected + ConnectionDisconnected, + MtuTooLow } /// diff --git a/Hazel/Udp/SendOptionInternal.cs b/Hazel/Udp/SendOptionInternal.cs index 74786d8..6a0cc0a 100644 --- a/Hazel/Udp/SendOptionInternal.cs +++ b/Hazel/Udp/SendOptionInternal.cs @@ -35,5 +35,10 @@ public enum UdpSendOption : byte /// Message that is part of a larger, fragmented message. /// Fragment = 11, + + /// + /// Message that is used to discover MTU. + /// + MtuTest = 13, } } diff --git a/Hazel/Udp/UdpClientConnection.cs b/Hazel/Udp/UdpClientConnection.cs index 73eec41..bf6f92a 100644 --- a/Hazel/Udp/UdpClientConnection.cs +++ b/Hazel/Udp/UdpClientConnection.cs @@ -61,21 +61,21 @@ private void ManageReliablePacketsInternal(object state) } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { #if DEBUG if (TestLagMs > 0) { - ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length, onTooBig); }); } else #endif { - WriteBytesToConnectionReal(bytes, length); + WriteBytesToConnectionReal(bytes, length, onTooBig); } } - private void WriteBytesToConnectionReal(byte[] bytes, int length) + private void WriteBytesToConnectionReal(byte[] bytes, int length, Action onTooBig) { #if DEBUG DataSentRaw?.Invoke(bytes, length); @@ -97,6 +97,10 @@ private void WriteBytesToConnectionReal(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -114,6 +118,10 @@ private void HandleSendTo(IAsyncResult result) { // Already disposed and disconnected... } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -177,6 +185,7 @@ public override void ConnectAsync(byte[] bytes = null) { this.State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); }); } diff --git a/Hazel/Udp/UdpConnection.Fragmented.cs b/Hazel/Udp/UdpConnection.Fragmented.cs new file mode 100644 index 0000000..482769d --- /dev/null +++ b/Hazel/Udp/UdpConnection.Fragmented.cs @@ -0,0 +1,272 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; + +namespace Hazel.Udp +{ + public partial class UdpConnection + { + /// + /// Maximum possible UDP header size - 60-byte IP header + 8-byte UDP header + /// + public const ushort MaxUdpHeaderSize = 68; + + /// + /// Popular MTU values used for quick MTU discovery + /// + public static ushort[] PossibleMtu { get; } = + { + 576 - MaxUdpHeaderSize, // RFC 1191 + 1024, + 1460 - MaxUdpHeaderSize, // Google Cloud + 1492 - MaxUdpHeaderSize, // RFC 1042 + 1500 - MaxUdpHeaderSize, // RFC 1191 + }; + + private int _mtu = PossibleMtu[0]; + + /// + /// MTU of this connection + /// + public int Mtu => ForcedMtu ?? _mtu; + + /// + /// Forced MTU, overrides the MTU + /// + public int? ForcedMtu { get; set; } = null; + + /// + /// Called when the MTU changes. + /// + public event Action MtuChanged; + + private byte _mtuIndex; + + private readonly ConcurrentDictionary _fragmentedMessagesReceived = new ConcurrentDictionary(); + private volatile int _lastFragmentedId; + + protected void StartMtuDiscovery() + { + MtuTest(_mtuIndex); + } + + private void MtuTest(byte index) + { + var mtu = PossibleMtu[index]; + var failed = false; + + var buffer = new byte[mtu]; + buffer[0] = (byte)UdpSendOption.MtuTest; + var id = AttachReliableID(buffer, 1, () => + { + if (failed) return; + MtuOk(index); + }); + buffer[mtu - 2] = (byte)mtu; + buffer[mtu - 1] = (byte)(mtu >> 8); + + WriteBytesToConnection(buffer, buffer.Length, () => + { + failed = true; + AcknowledgeMessageId(id); + + if (index == 0) + { + DisconnectInternal(HazelInternalErrors.MtuTooLow, "Connection MTU is lower than the minimum"); + } + }); + } + + private void MtuOk(byte index) + { + _mtuIndex = index; + _mtu = PossibleMtu[index]; + MtuChanged?.Invoke(); + + if (_mtuIndex < PossibleMtu.Length - 1) + { + MtuTest((byte)(index + 1)); + } + } + + private void MtuTestMessageReceive(MessageReader message) + { + message.Position = message.Length - 2; + var mtu = message.ReadUInt16(); + + if (mtu != message.Length) + { + return; + } + + ProcessReliableReceive(message.Buffer, 1, out _); + } + + private const byte FragmentHeaderSize = sizeof(byte) + sizeof(ushort) + sizeof(ushort) + sizeof(byte) + sizeof(byte); + + protected void FragmentedSend(byte sendOption, byte[] data, Action ackCallback = null) + { + var length = data.Length + 1; + + var id = (ushort)Interlocked.Increment(ref _lastFragmentedId); + var fragmentSize = Mtu; + var fragmentDataSize = fragmentSize - FragmentHeaderSize; + var fragmentsCount = (int)Math.Ceiling(length / (double)fragmentDataSize); + + if (fragmentsCount >= byte.MaxValue) + { + throw new HazelException("Too many fragments"); + } + + var acksReceived = 0; + + for (byte i = 0; i < fragmentsCount; i++) + { + var dataLength = Math.Min(fragmentDataSize, length - fragmentDataSize * i); + var buffer = new byte[dataLength + FragmentHeaderSize]; + + buffer[0] = (byte)UdpSendOption.Fragment; + + AttachReliableID(buffer, 1, () => + { + acksReceived++; + + if (acksReceived >= fragmentsCount) + { + ackCallback?.Invoke(); + } + }); + + buffer[3] = (byte)id; + buffer[4] = (byte)(id >> 8); + + buffer[5] = (byte)fragmentsCount; + buffer[6] = i; + + var includingHeader = i == 0; + if (includingHeader) + { + buffer[7] = sendOption; + } + + Buffer.BlockCopy(data, fragmentDataSize * i - (includingHeader ? 0 : 1), buffer, FragmentHeaderSize + (includingHeader ? 1 : 0), dataLength - (includingHeader ? 1 : 0)); + + WriteBytesToConnection(buffer, buffer.Length); + } + } + + protected void FragmentMessageReceive(MessageReader messageReader) + { + if (ProcessReliableReceive(messageReader.Buffer, 1, out _)) + { + messageReader.Position += 3; + + var fragmentedMessageId = messageReader.ReadUInt16(); + var fragmentsCount = messageReader.ReadByte(); + var fragmentId = messageReader.ReadByte(); + + if (fragmentsCount <= 0 || fragmentId >= fragmentsCount) + { + return; + } + + + if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out var fragmentedMessage)) + { + lock (_fragmentedMessagesReceived) + { + if (!_fragmentedMessagesReceived.TryGetValue(fragmentedMessageId, out fragmentedMessage)) + { + if (!_fragmentedMessagesReceived.TryAdd(fragmentedMessageId, fragmentedMessage = new FragmentedMessage(fragmentsCount))) + { + throw new HazelException("Failed to add fragmented message"); + } + } + } + } + + lock (fragmentedMessage) + { + if (fragmentedMessage.Fragments[fragmentId] != null) + { + return; + } + + var buffer = new byte[messageReader.Length - messageReader.Position]; + Buffer.BlockCopy(messageReader.Buffer, messageReader.Position, buffer, 0, messageReader.Length - messageReader.Position); + + fragmentedMessage.AddFragment(fragmentId, buffer); + + if (fragmentedMessage.IsFinished) + { + var reconstructed = fragmentedMessage.Reconstruct(); + InvokeDataReceived((SendOption)reconstructed.ReadByte(), reconstructed, 1, fragmentedMessage.Size); + + _fragmentedMessagesReceived.TryRemove(fragmentedMessageId, out _); + } + } + } + } + + protected class FragmentedMessage + { + /// + /// The total number of fragments expected. + /// + public int FragmentsCount { get; } + + /// + /// The number of fragments received. + /// + public int FragmentsReceived { get; private set; } + + /// + /// The total size of all fragments. + /// + public int Size { get; private set; } + + /// + /// The fragments received so far. + /// + public byte[][] Fragments { get; } + + /// + /// Whether all fragments were received. + /// + public bool IsFinished => FragmentsReceived == FragmentsCount; + + public FragmentedMessage(int fragmentsCount) + { + FragmentsCount = fragmentsCount; + Fragments = new byte[fragmentsCount][]; + } + + public void AddFragment(byte id, byte[] fragment) + { + Fragments[id] = fragment; + Size += fragment.Length; + FragmentsReceived++; + } + + public MessageReader Reconstruct() + { + if (!IsFinished) + { + throw new HazelException("Can't reconstruct a FragmentedMessage until all fragments are received"); + } + + var buffer = MessageReader.GetSized(Size); + + var offset = 0; + for (var i = 0; i < FragmentsCount; i++) + { + var data = Fragments[i]; + Buffer.BlockCopy(data, 0, buffer.Buffer, offset, data.Length); + offset += data.Length; + } + + return buffer; + } + } + } +} diff --git a/Hazel/Udp/UdpConnection.Reliable.cs b/Hazel/Udp/UdpConnection.Reliable.cs index 83feabd..94831df 100644 --- a/Hazel/Udp/UdpConnection.Reliable.cs +++ b/Hazel/Udp/UdpConnection.Reliable.cs @@ -221,7 +221,7 @@ internal int ManageReliablePackets() /// The buffer to attach to. /// The offset to attach at. /// The callback to make once the packet has been acknowledged. - protected void AttachReliableID(byte[] buffer, int offset, Action ackCallback = null) + protected ushort AttachReliableID(byte[] buffer, int offset, Action ackCallback = null) { ushort id = (ushort)Interlocked.Increment(ref lastIDAllocated); @@ -241,6 +241,8 @@ protected void AttachReliableID(byte[] buffer, int offset, Action ackCallback = { throw new Exception("That shouldn't be possible"); } + + return id; } public static int ClampToInt(float value, int min, int max) @@ -258,6 +260,12 @@ public static int ClampToInt(float value, int min, int max) /// The callback to make once the packet has been acknowledged. private void ReliableSend(byte sendOption, byte[] data, Action ackCallback = null) { + if (data.Length >= Mtu) + { + FragmentedSend(sendOption, data, ackCallback); + return; + } + //Inform keepalive not to send for a while ResetKeepAliveTimer(); @@ -419,7 +427,7 @@ private void AcknowledgementMessageReceive(byte[] bytes, int bytesReceived) Statistics.LogReliableReceive(0, bytesReceived); } - private void AcknowledgeMessageId(ushort id) + protected void AcknowledgeMessageId(ushort id) { // Dispose of timer and remove from dictionary if (reliableDataPacketsSent.TryRemove(id, out Packet packet)) diff --git a/Hazel/Udp/UdpConnection.cs b/Hazel/Udp/UdpConnection.cs index cc5d088..7ef44aa 100644 --- a/Hazel/Udp/UdpConnection.cs +++ b/Hazel/Udp/UdpConnection.cs @@ -33,9 +33,12 @@ internal static Socket CreateSocket(IPMode ipMode) try { - socket.DontFragment = false; + socket.DontFragment = true; + } + catch + { + // ignored } - catch { } try { @@ -51,16 +54,21 @@ internal static Socket CreateSocket(IPMode ipMode) /// Writes the given bytes to the connection. /// /// The bytes to write. - protected abstract void WriteBytesToConnection(byte[] bytes, int length); + protected abstract void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null); /// public override void Send(MessageWriter msg) { if (this._state != ConnectionState.Connected) throw new InvalidOperationException("Could not send data as this Connection is not connected. Did you disconnect?"); + + if (msg.GetLength(false) >= Mtu) + { + FragmentedSend((byte)msg.SendOption, msg.ToByteArray(false)); + return; + } - byte[] buffer = new byte[msg.Length]; - Buffer.BlockCopy(msg.Buffer, 0, buffer, 0, msg.Length); + byte[] buffer = msg.ToByteArray(true); switch (msg.SendOption) { @@ -108,6 +116,7 @@ protected void HandleSend(byte[] data, byte sendOption, Action ackCallback = nul case (byte)UdpSendOption.Ping: case (byte)SendOption.Reliable: case (byte)UdpSendOption.Hello: + case (byte)UdpSendOption.MtuTest: ReliableSend(sendOption, data, ackCallback); break; @@ -162,6 +171,16 @@ protected internal virtual void HandleReceive(MessageReader message, int bytesRe Statistics.LogUnreliableReceive(bytesReceived - 1, bytesReceived); break; + case (byte)UdpSendOption.MtuTest: + MtuTestMessageReceive(message); + message.Recycle(); + break; + + case (byte)UdpSendOption.Fragment: + FragmentMessageReceive(message); + message.Recycle(); + break; + // Treat everything else as garbage default: message.Recycle(); @@ -177,32 +196,28 @@ protected internal virtual void HandleReceive(MessageReader message, int bytesRe /// /// The SendOption to attach. /// The data. - void UnreliableSend(byte sendOption, byte[] data) + void UnreliableSend(byte sendOption, byte[] data, bool includeHeader = true) { - this.UnreliableSend(sendOption, data, 0, data.Length); - } + var length = includeHeader ? data.Length + 1 : data.Length; + if (length >= Mtu) + { + throw new HazelException("Unreliable messages can't be bigger than MTU"); + } - /// - /// Sends bytes using the unreliable UDP protocol. - /// - /// The data. - /// The SendOption to attach. - /// - /// - void UnreliableSend(byte sendOption, byte[] data, int offset, int length) - { - byte[] bytes = new byte[length + 1]; + var bytes = new byte[length]; - //Add message type - bytes[0] = sendOption; + if (includeHeader) + { + bytes[0] = sendOption; + } //Copy data into new array - Buffer.BlockCopy(data, offset, bytes, bytes.Length - length, length); + Buffer.BlockCopy(data, 0, bytes, length - data.Length, data.Length); //Write to connection - WriteBytesToConnection(bytes, bytes.Length); + WriteBytesToConnection(bytes, length); - Statistics.LogUnreliableSend(length, bytes.Length); + Statistics.LogUnreliableSend(data.Length, length); } /// diff --git a/Hazel/Udp/UdpConnectionListener.cs b/Hazel/Udp/UdpConnectionListener.cs index d3ff423..227640b 100644 --- a/Hazel/Udp/UdpConnectionListener.cs +++ b/Hazel/Udp/UdpConnectionListener.cs @@ -239,7 +239,7 @@ void ReadCallback(IAsyncResult result) /// /// The bytes to send. /// The endpoint to send to. - internal void SendData(byte[] bytes, int length, EndPoint endPoint) + internal void SendData(byte[] bytes, int length, EndPoint endPoint, Action onTooBig = null) { if (length > bytes.Length) return; @@ -264,6 +264,10 @@ internal void SendData(byte[] bytes, int length, EndPoint endPoint) SendCallback, null); } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException e) { this.Logger?.Invoke("Could not send data as a SocketException occurred: " + e); @@ -281,6 +285,10 @@ private void SendCallback(IAsyncResult result) { socket.EndSendTo(result); } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch { } } diff --git a/Hazel/Udp/UdpServerConnection.cs b/Hazel/Udp/UdpServerConnection.cs index f348adb..7544a62 100644 --- a/Hazel/Udp/UdpServerConnection.cs +++ b/Hazel/Udp/UdpServerConnection.cs @@ -33,12 +33,13 @@ internal UdpServerConnection(UdpConnectionListener listener, IPEndPoint endPoint State = ConnectionState.Connected; this.InitializeKeepAliveTimer(); + this.StartMtuDiscovery(); } /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { - Listener.SendData(bytes, length, EndPoint); + Listener.SendData(bytes, length, EndPoint, onTooBig); } /// diff --git a/Hazel/Udp/UnityUdpClientConnection.cs b/Hazel/Udp/UnityUdpClientConnection.cs index dd192bc..17118f0 100644 --- a/Hazel/Udp/UnityUdpClientConnection.cs +++ b/Hazel/Udp/UnityUdpClientConnection.cs @@ -63,21 +63,21 @@ protected virtual void ResendPacketsIfNeeded() /// - protected override void WriteBytesToConnection(byte[] bytes, int length) + protected override void WriteBytesToConnection(byte[] bytes, int length, Action onTooBig = null) { #if DEBUG if (TestLagMs > 0) { - ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length); }); + ThreadPool.QueueUserWorkItem(a => { Thread.Sleep(this.TestLagMs); WriteBytesToConnectionReal(bytes, length, onTooBig); }); } else #endif { - WriteBytesToConnectionReal(bytes, length); + WriteBytesToConnectionReal(bytes, length, onTooBig); } } - private void WriteBytesToConnectionReal(byte[] bytes, int length) + private void WriteBytesToConnectionReal(byte[] bytes, int length, Action onTooBig) { try { @@ -96,6 +96,10 @@ private void WriteBytesToConnectionReal(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (onTooBig != null && e.SocketErrorCode == SocketError.MessageSize) + { + onTooBig(); + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -122,6 +126,10 @@ protected virtual void WriteBytesToConnectionSync(byte[] bytes, int length) { // Already disposed and disconnected... } + catch (SocketException e) when (e.SocketErrorCode == SocketError.MessageSize) + { + throw; + } catch (SocketException ex) { DisconnectInternal(HazelInternalErrors.SocketExceptionSend, "Could not send data as a SocketException occurred: " + ex.Message); @@ -202,6 +210,7 @@ public override void ConnectAsync(byte[] bytes = null) SendHello(bytes, () => { this.State = ConnectionState.Connected; + this.StartMtuDiscovery(); }); } diff --git a/README.md b/README.md index 5de4c0c..a5e4668 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ The aim of this fork is to create a simple interface for ultra-fast connection-b ## Features - UDP and Reliable UDP. +- Fragmented packets with MTU discovery - Encrypted packets using DTLS - UDP Broadcast for local-multiplayer. - Completely thread safe. @@ -37,7 +38,6 @@ To build Hazel open [solution file](Hazel.sln) using your favourite C# IDE (I us * You *should not* recycle messages after NewConnection events. * You *should not* recycle messages after Disconnect events. * You *should* recycle messages after DataReceived events. - * Hazel doesn't support fragmented packets. It used to, but I wasn't sure of it so I removed it and have never needed it since. Just stay under 1kb packets. ## Tips for using Hazel with Unity