diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index a508b2a176..75fafab586 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -53,6 +53,7 @@ import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IORef (atomicSwapIORef) import GHC.Stats (getRTSStats) import Network.Socket (ServiceName, Socket, socketToHandle) +import Numeric.Natural (Natural) import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C @@ -85,7 +86,7 @@ import System.Exit (exitFailure, exitSuccess) import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import System.Timeout (timeout) -import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile) +import UnliftIO (IOMode (..), UnliftIO (..), askUnliftIO, race_, unliftIO, withFile) import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception @@ -116,7 +117,6 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} void $ forkIO $ resubscribe s raceAny_ ( ntfSubscriber s - : ntfPush ps : periodicNtfsThread ps : map runServer transports <> serverStatsThread_ cfg @@ -147,12 +147,17 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} saveServer NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber + NtfPushServer {pushWorkers} <- asks pushServer + liftIO $ readTVarIO pushWorkers >>= mapM_ stopPushWorker liftIO $ closeSMPClientAgent smpAgent logNote "Server stopped" where stopSubscriber v = atomically (tryReadTMVar $ sessionVar v) >>= mapM (deRefWeak . subThreadId >=> mapM_ killThread) + stopPushWorker v = + atomically (tryReadTMVar $ sessionVar v) + >>= mapM (deRefWeak . workerThreadId >=> mapM_ killThread) saveServer :: M () saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats @@ -257,7 +262,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} let threadsCount = 0 #endif let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber - NtfPushServer {pushQ} = pushServer + NtfPushServer {pushWorkers} = pushServer SMPClientAgent {smpClients, smpSessions, smpSubWorkers} = a srvSubscribers <- getSMPWorkerMetrics a smpSubscribers srvClients <- getSMPWorkerMetrics a smpClients @@ -267,7 +272,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} ntfPendingServiceSubs <- getSMPServiceSubMetrics a pendingServiceSubs snd ntfPendingQueueSubs <- getSMPSubMetrics a pendingQueueSubs smpSessionCount <- M.size <$> readTVarIO smpSessions - apnsPushQLength <- atomically $ lengthTBQueue pushQ + apnsPushQLength <- pushWorkersQLength pushWorkers pure NtfRealTimeMetrics { threadsCount, @@ -526,35 +531,36 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = where receiveSMP = do st <- asks store - NtfPushServer {pushQ} <- asks pushServer + ps <- asks pushServer stats <- asks serverStats - liftIO $ forever $ do + forever $ do ((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ - forM ts $ \(ntfId, t) -> case t of + forM_ ts $ \(ntfId, t) -> case t of STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen STResponse {} -> pure () -- it was already reported as timeout error STEvent msgOrErr -> do let smpQueue = SMPQueueNtf srv ntfId case msgOrErr of Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- getSystemTime - updatePeriodStats (activeSubs stats) ntfId + ntfTs <- liftIO getSystemTime + liftIO $ updatePeriodStats (activeSubs stats) ntfId let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing - addTokenLastNtf st newNtf >>= \case + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + liftIO (addTokenLastNtf st newNtf) >>= \case Right (tkn, lastNtfs) -> do - atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs) - incNtfStat_ stats ntfReceived - mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_ - Left AUTH -> do + pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + liftIO $ incNtfStat_ stats ntfReceived + when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> liftIO $ do incNtfStat_ stats ntfReceivedAuth - mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_ + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) Left _ -> pure () Right SMP.END -> whenM (atomically $ activeClientSession' ca sessionId srv) $ - void $ updateSrvSubStatus st smpQueue NSEnd + void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd Right SMP.DELD -> - void $ updateSrvSubStatus st smpQueue NSDeleted + void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e Right _ -> logError "SMP server unexpected response" Left e -> logError $ "SMP client error: " <> tshow e @@ -632,9 +638,25 @@ logSubStatus srv event n updated = showServer' :: SMPServer -> Text showServer' = decodeLatin1 . strEncode . host -ntfPush :: NtfPushServer -> M () -ntfPush s@NtfPushServer {pushQ} = forever $ do - (srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) +pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M () +pushNotification s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do + q <- getOrCreatePushWorker s (srvHost_, pp) isOwn + atomically $ writeTBQueue q (tkn, ntf) + +getOrCreatePushWorker :: NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification)) +getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do + ts <- liftIO getCurrentTime + atomically (getSessVar pushWorkerSeq key pushWorkers ts) >>= \case + Left v -> do + q <- liftIO $ newTBQueueIO pushQSize + tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q) + atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId} + pure q + Right v -> workerQ <$> atomically (readTMVar $ sessionVar v) + +runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M () +runPushWorker s srvHost_ isOwn q = forever $ do + (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) st <- asks store case ntf of @@ -644,7 +666,7 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do void $ liftIO $ setTknStatusConfirmed st tkn incNtfStatT t ntfVrfDelivered Left _ -> incNtfStatT t ntfVrfFailed - PNCheckMessages -> do + PNCheckMessages -> liftIO (deliverNotification st pp tkn ntf) >>= \case Right _ -> do void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime @@ -656,24 +678,23 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do liftIO (deliverNotification st pp tkn ntf) >>= \case Left _ -> do incNtfStatT t ntfFailed - liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ Right () -> do incNtfStatT t ntfDelivered - liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ - + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ where checkActiveTkn :: NtfTknStatus -> M () -> M () checkActiveTkn status action | status == NTActive = action | otherwise = liftIO $ logError "bad notification token status" deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ()) - deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do - deliver <- getPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do + (deliver, clientVar) <- getPushClient s pp + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of - PPConnection _ -> retryDeliver - PPRetryLater -> retryDeliver + PPConnection ce -> retryDeliver clientVar $ "connection " <> tshow ce + PPRetryLater r -> retryDeliver clientVar r PPCryptoError _ -> err e PPResponseError {} -> err e PPTokenInvalid r -> do @@ -681,10 +702,12 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do err e PPPermanentError -> err e where - retryDeliver :: IO (Either PushProviderError ()) - retryDeliver = do - deliver <- newPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + retryDeliver :: PushClientVar -> Text -> IO (Either PushProviderError ()) + retryDeliver oldVar reason = do + logWarn $ "retrying push (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> reason + atomically $ removeSessVar oldVar pp (pushClients s) + (deliver, _) <- getPushClient s pp + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of PPTokenInvalid r -> do @@ -693,15 +716,26 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do _ -> err e err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e +pushWorkersQLength :: TMap (Maybe T.Text, PushProvider) PushWorkerVar -> IO Natural +pushWorkersQLength workers = do + ws <- readTVarIO workers + foldM addQLength 0 ws + where + addQLength acc v = + atomically (tryReadTMVar $ sessionVar v) >>= \case + Just PushWorker {workerQ} -> (acc +) <$> atomically (lengthTBQueue workerQ) + Nothing -> pure acc + periodicNtfsThread :: NtfPushServer -> M () -periodicNtfsThread NtfPushServer {pushQ} = do +periodicNtfsThread s = do st <- asks store ntfsInterval <- asks $ periodicNtfsInterval . config let interval = 1000000 * ntfsInterval + UnliftIO unlift <- askUnliftIO liftIO $ forever $ do threadDelay interval now <- systemSeconds <$> getSystemTime - cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages) + cnt <- withPeriodicNtfTokens st now $ \tkn -> unlift $ pushNotification s Nothing False tkn PNCheckMessages logNote $ "Scheduled periodic notifications: " <> tshow cnt runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () @@ -791,7 +825,7 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case e -> VRFailed e client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () -client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} = +client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand @@ -808,7 +842,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ ts <- liftIO $ getSystemDate let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts withNtfStore (`addNtfToken` tkn) $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode) + pushNotification ps Nothing False tkn $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknCreated pure $ NRTknId tknId srvDhPubKey @@ -824,7 +858,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification where sendVerification = do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode) + pushNotification ps Nothing False tkn $ PNVerification tknRegCode incNtfStatT token ntfVrfQueued pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey TVFY code -- this allows repeated verification for cases when client connection dropped before server response @@ -842,7 +876,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ regCode <- getRegCode let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode} withNtfStore (`replaceNtfToken` tkn') $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode) + pushNotification ps Nothing False tkn' $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknReplaced pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 6544286020..c798f6a9eb 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -13,22 +13,27 @@ module Simplex.Messaging.Notifications.Server.Env SMPSubscriberVar, SMPSubscriber (..), NtfPushServer (..), + PushClientVar, + PushWorker (..), + PushWorkerVar, NtfRequest (..), NtfServerClient (..), defaultInactiveClientExpiration, newNtfServerEnv, newNtfSubscriber, newNtfPushServer, - newPushClient, getPushClient, newNtfServerClient, ) where import Control.Concurrent (ThreadId) +import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad import Crypto.Random +import Data.Functor (($>)) import Data.Int (Int64) +import Simplex.Messaging.Agent.RetryInterval import Data.List.NonEmpty (NonEmpty) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -58,6 +63,7 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceCredentials (..), THandleParams, TransportPeer (..)) import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential) +import Simplex.Messaging.Util (tshow) import System.Exit (exitFailure) import System.Mem.Weak (Weak) import UnliftIO.STM @@ -163,28 +169,62 @@ data SMPSubscriber = SMPSubscriber } data NtfPushServer = NtfPushServer - { pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server - pushClients :: TMap PushProvider PushProviderClient, + { pushWorkers :: TMap (Maybe T.Text, PushProvider) PushWorkerVar, + pushWorkerSeq :: TVar Int, + pushQSize :: Natural, + pushClients :: TMap PushProvider PushClientVar, + pushClientSeq :: TVar Int, apnsConfig :: APNSPushClientConfig } +data PushWorker = PushWorker + { workerQ :: TBQueue (NtfTknRec, PushNotification), + workerThreadId :: Weak ThreadId + } + +type PushWorkerVar = SessionVar PushWorker + +-- The Either communicates client-creation failure from the winner to the waiters. +type PushClientVar = SessionVar (Either E.SomeException PushProviderClient) + newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer -newNtfPushServer qSize apnsConfig = do - pushQ <- newTBQueueIO qSize +newNtfPushServer pushQSize apnsConfig = do + pushWorkers <- TM.emptyIO + pushWorkerSeq <- newTVarIO 0 pushClients <- TM.emptyIO - pure NtfPushServer {pushQ, pushClients, apnsConfig} - -newPushClient :: NtfPushServer -> PushProvider -> IO PushProviderClient -newPushClient NtfPushServer {apnsConfig, pushClients} pp = do - c <- case apnsProviderHost pp of + pushClientSeq <- newTVarIO 0 + pure NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize, pushClients, pushClientSeq, apnsConfig} + +-- | Single-flight access to the per-provider push client with bounded retry. +-- The returned PushClientVar is the handle retryDeliver passes to removeSessVar to evict +-- this specific instance before re-fetching. +getPushClient :: NtfPushServer -> PushProvider -> IO (PushProviderClient, PushClientVar) +getPushClient s@NtfPushServer {apnsConfig = APNSPushClientConfig {reconnectInterval}} pp = + withRetryIntervalCount reconnectInterval $ \n _delay loop -> do + ts <- getCurrentTime + E.try (atomically (getSessVar (pushClientSeq s) pp (pushClients s) ts) >>= either (newPushClient s pp) waitForPushClient) >>= \case + Right result -> pure result + Left e + | n < 2 -> do + logError $ "getPushClient error (" <> tshow pp <> "): " <> tshow (e :: E.SomeException) + loop + | otherwise -> E.throwIO e + +newPushClient :: NtfPushServer -> PushProvider -> PushClientVar -> IO (PushProviderClient, PushClientVar) +newPushClient NtfPushServer {pushClients, apnsConfig} pp v = do + r <- E.try $ case apnsProviderHost pp of Nothing -> pure $ \_ _ -> pure () Just host -> apnsPushProviderClient <$> createAPNSPushClient host apnsConfig - atomically $ TM.insert pp c pushClients - pure c - -getPushClient :: NtfPushServer -> PushProvider -> IO PushProviderClient -getPushClient s@NtfPushServer {pushClients} pp = - TM.lookupIO pp pushClients >>= maybe (newPushClient s pp) pure + atomically $ do + putTMVar (sessionVar v) r + case r of + Left _ -> removeSessVar v pp pushClients + Right _ -> pure () + either E.throwIO (\c -> pure (c, v)) r + +waitForPushClient :: PushClientVar -> IO (PushProviderClient, PushClientVar) +waitForPushClient v = + atomically (readTMVar $ sessionVar v) >>= either E.throwIO (\c -> pure (c, v)) data NtfRequest = NtfReqNew CorrId ANewNtfEntity diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 4b274271c0..47dc435c27 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -23,7 +23,7 @@ module Simplex.Messaging.Notifications.Server.Push.APNS apnsPushProviderClient, ) where -import Control.Exception (Exception) +import Control.Exception (Exception, throwIO) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -66,6 +66,7 @@ import qualified Network.HTTP2.Client as H import Network.Socket (HostName, ServiceName) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Notifications.Server.Push.APNS.Internal import Simplex.Messaging.Notifications.Server.Store.Types (NtfTknRec (..)) import Simplex.Messaging.Parsers (defaultJSON) @@ -192,7 +193,8 @@ data APNSPushClientConfig = APNSPushClientConfig appTeamId :: Text, apnsPort :: ServiceName, http2cfg :: HTTP2ClientConfig, - caStoreFile :: FilePath + caStoreFile :: FilePath, + reconnectInterval :: RetryInterval } apnsProviderHost :: PushProvider -> Maybe HostName @@ -214,7 +216,8 @@ defaultAPNSPushClientConfig = appTeamId = "5NN7GUYB6T", apnsPort = "443", http2cfg = defaultHTTP2ClientConfig {bufferSize = 16384}, - caStoreFile = "/etc/ssl/cert.pem" + caStoreFile = "/etc/ssl/cert.pem", + reconnectInterval = RetryInterval {initialInterval = 2000000, increaseAfter = 0, maxInterval = 10000000} } data APNSPushClient = APNSPushClient @@ -230,7 +233,7 @@ data APNSPushClient = APNSPushClient createAPNSPushClient :: HostName -> APNSPushClientConfig -> IO APNSPushClient createAPNSPushClient apnsHost apnsCfg@APNSPushClientConfig {authKeyFileEnv, authKeyAlg, authKeyIdEnv, appTeamId} = do https2Client <- newTVarIO Nothing - void $ connectHTTPS2 apnsHost apnsCfg https2Client + connectHTTPS2 apnsHost apnsCfg https2Client >>= either (throwIO . userError . show) (\_ -> pure ()) privateKey <- readECPrivateKey =<< getEnv authKeyFileEnv authKeyId <- T.pack <$> getEnv authKeyIdEnv let jwtHeader = JWTHeader {alg = authKeyAlg, kid = authKeyId} @@ -326,7 +329,7 @@ data PushProviderError | PPCryptoError C.CryptoError | PPResponseError (Maybe Status) Text | PPTokenInvalid NTInvalidReason - | PPRetryLater + | PPRetryLater Text | PPPermanentError deriving (Show, Exception) @@ -343,7 +346,6 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token nonce <- atomically $ C.randomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf - -- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing let status = H.responseStatus response reason' = maybe "" reason $ J.decodeStrict' bodyHead @@ -373,8 +375,8 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token | status == Just N.gone410 = throwE $ case reason' of "ExpiredToken" -> PPTokenInvalid NTIRExpiredToken "Unregistered" -> PPTokenInvalid NTIRUnregistered - _ -> PPRetryLater - | status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE PPRetryLater + _ -> PPRetryLater $ "410 " <> reason' + | status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE (PPRetryLater "503") -- Just tooManyRequests429 -> TooManyRequests - too many requests for the same token | otherwise = throwE $ PPResponseError status reason' liftHTTPS2 a = ExceptT $ first PPConnection <$> a