|
| 1 | +{-# LANGUAGE LambdaCase #-} |
| 2 | +{-# LANGUAGE NamedFieldPuns #-} |
| 3 | +{-# LANGUAGE NumericUnderscores #-} |
| 4 | +{-# LANGUAGE RecordWildCards #-} |
| 5 | +{-# LANGUAGE ScopedTypeVariables #-} |
| 6 | + |
| 7 | +module Main where |
| 8 | + |
| 9 | +import Control.Concurrent (forkIO) |
| 10 | +import Control.Concurrent.Async (forConcurrently_) |
| 11 | +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) |
| 12 | +import Control.Exception (finally, throwIO) |
| 13 | +import Control.Monad (forM_, replicateM, void, when) |
| 14 | +import qualified Data.ByteString as BS |
| 15 | +import Data.IORef ( |
| 16 | + atomicModifyIORef', |
| 17 | + newIORef, |
| 18 | + ) |
| 19 | +import Data.List.NonEmpty (NonEmpty (..)) |
| 20 | +import Network.Transport ( |
| 21 | + Connection (send), |
| 22 | + EndPoint (address, connect, receive), |
| 23 | + Event (ConnectionOpened, Received), |
| 24 | + Reliability (ReliableOrdered), |
| 25 | + Transport (closeTransport, newEndPoint), |
| 26 | + defaultConnectHints, |
| 27 | + ) |
| 28 | +import qualified Network.Transport.QUIC as QUIC |
| 29 | +import qualified Network.Transport.TCP as TCP |
| 30 | +import System.FilePath ((</>)) |
| 31 | +import Test.Tasty (TestTree) |
| 32 | +import Test.Tasty.Bench (bench, bgroup, defaultMain, nfIO) |
| 33 | + |
| 34 | +data TransportConfig = TransportConfig |
| 35 | + { transportName :: String |
| 36 | + , mkTransport :: IO Transport |
| 37 | + } |
| 38 | + |
| 39 | +tcpConfig :: TransportConfig |
| 40 | +tcpConfig = |
| 41 | + TransportConfig |
| 42 | + { transportName = "TCP" |
| 43 | + , mkTransport = do |
| 44 | + Right t <- TCP.createTransport (TCP.defaultTCPAddr "127.0.0.1" "0") TCP.defaultTCPParameters |
| 45 | + pure t |
| 46 | + } |
| 47 | + |
| 48 | +quicConfig :: TransportConfig |
| 49 | +quicConfig = |
| 50 | + TransportConfig |
| 51 | + { transportName = "QUIC" |
| 52 | + , mkTransport = |
| 53 | + QUIC.credentialLoadX509 |
| 54 | + -- Generate a self-signed x509v3 certificate using this nifty tool: |
| 55 | + -- https://certificatetools.com/ |
| 56 | + ("test" </> "credentials" </> "cert.crt") |
| 57 | + ("test" </> "credentials" </> "cert.key") |
| 58 | + >>= \case |
| 59 | + Left errmsg -> throwIO $ userError errmsg |
| 60 | + Right credentials -> |
| 61 | + QUIC.createTransport "127.0.0.1" "0" (credentials :| []) |
| 62 | + } |
| 63 | + |
| 64 | +data BenchParams = BenchParams |
| 65 | + { messageSize :: !Int |
| 66 | + , messageCount :: !Int |
| 67 | + , connectionCount :: !Int |
| 68 | + } |
| 69 | + |
| 70 | +smallMessages, mediumMessages, largeMessages :: BenchParams |
| 71 | +smallMessages = BenchParams{messageSize = 64, messageCount = 10_000, connectionCount = 1} |
| 72 | +mediumMessages = BenchParams{messageSize = 1024, messageCount = 1_000, connectionCount = 1} |
| 73 | +largeMessages = BenchParams{messageSize = 4096, messageCount = 100, connectionCount = 1} |
| 74 | + |
| 75 | +multiConn :: Int -> BenchParams -> BenchParams |
| 76 | +multiConn n p = p{connectionCount = n} |
| 77 | + |
| 78 | +throughputBench :: TransportConfig -> BenchParams -> IO () |
| 79 | +throughputBench TransportConfig{mkTransport} BenchParams{messageSize, messageCount, connectionCount} = do |
| 80 | + transport <- mkTransport |
| 81 | + flip finally (closeTransport transport) $ do |
| 82 | + Right senderEP <- newEndPoint transport |
| 83 | + Right receiverEP <- newEndPoint transport |
| 84 | + |
| 85 | + let payload = BS.replicate messageSize 0x42 |
| 86 | + totalMessages = messageCount * connectionCount |
| 87 | + |
| 88 | + receiverReady <- newEmptyMVar |
| 89 | + receiverDone <- newEmptyMVar |
| 90 | + |
| 91 | + void $ forkIO $ do |
| 92 | + connsEstablished <- newIORef (0 :: Int) |
| 93 | + let waitForConnections = do |
| 94 | + event <- receive receiverEP |
| 95 | + case event of |
| 96 | + ConnectionOpened{} -> do |
| 97 | + n <- atomicModifyIORef' connsEstablished (\x -> (x + 1, x + 1)) |
| 98 | + when (n < connectionCount) waitForConnections |
| 99 | + _ -> waitForConnections |
| 100 | + waitForConnections |
| 101 | + putMVar receiverReady () |
| 102 | + |
| 103 | + msgsReceived <- newIORef (0 :: Int) |
| 104 | + let recvLoop = do |
| 105 | + event <- receive receiverEP |
| 106 | + case event of |
| 107 | + Received _ _ -> do |
| 108 | + n <- atomicModifyIORef' msgsReceived (\x -> (x + 1, x + 1)) |
| 109 | + when (n < totalMessages) recvLoop |
| 110 | + _ -> recvLoop |
| 111 | + recvLoop |
| 112 | + putMVar receiverDone () |
| 113 | + |
| 114 | + let receiverAddr = address receiverEP |
| 115 | + connections <- |
| 116 | + replicateM |
| 117 | + connectionCount |
| 118 | + ( connect senderEP receiverAddr ReliableOrdered defaultConnectHints >>= either throwIO pure |
| 119 | + ) |
| 120 | + |
| 121 | + takeMVar receiverReady |
| 122 | + |
| 123 | + forConcurrently_ connections $ \conn -> |
| 124 | + forM_ [0 .. messageCount] $ \_ -> send conn [payload] |
| 125 | + |
| 126 | + takeMVar receiverDone |
| 127 | + |
| 128 | +benchTransport :: TransportConfig -> TestTree |
| 129 | +benchTransport cfg@TransportConfig{transportName} = |
| 130 | + bgroup |
| 131 | + transportName |
| 132 | + [ bgroup |
| 133 | + "throughput" |
| 134 | + [ bgroup |
| 135 | + "single-connection" |
| 136 | + [ bench "small-msg" $ nfIO $ throughputBench cfg smallMessages |
| 137 | + , bench "default-msg" $ nfIO $ throughputBench cfg mediumMessages |
| 138 | + , bench "large-msg" $ nfIO $ throughputBench cfg largeMessages |
| 139 | + ] |
| 140 | + , bgroup |
| 141 | + "multi-connection" |
| 142 | + [ bench "2-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 2, messageCount = 10_000} |
| 143 | + , bench "5-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 5, messageCount = 10_000} |
| 144 | + , bench "10-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 10, messageCount = 5_000} |
| 145 | + ] |
| 146 | + ] |
| 147 | + ] |
| 148 | + |
| 149 | +main :: IO () |
| 150 | +main = |
| 151 | + defaultMain |
| 152 | + [ benchTransport tcpConfig |
| 153 | + , benchTransport quicConfig |
| 154 | + ] |
0 commit comments