From 81f8bf811d934b111ecdc0c555d9825ef42e3f2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Baldur=20Bl=C3=B6ndal?= Date: Thu, 24 Jul 2025 08:35:12 +0100 Subject: [PATCH] cardano-tracer: Add functionality to run cardano-tracer as a library, with shut-down functionality and internal/user messaging. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Baldur Blöndal --- cardano-tracer/CHANGELOG.md | 4 + cardano-tracer/app/cardano-tracer.hs | 25 +- cardano-tracer/bench/cardano-tracer-bench.hs | 4 + cardano-tracer/cardano-tracer.cabal | 7 +- .../src/Cardano/Tracer/Acceptors/Run.hs | 25 +- .../src/Cardano/Tracer/Environment.hs | 234 +++++++++++++++++- .../Cardano/Tracer/Handlers/Logs/Rotator.hs | 59 ++--- .../Tracer/Handlers/Metrics/Monitoring.hs | 19 +- .../Tracer/Handlers/Metrics/Prometheus.hs | 18 +- cardano-tracer/src/Cardano/Tracer/Run.hs | 106 +++++--- .../test/Cardano/Tracer/Test/Acceptor.hs | 4 + .../test/Cardano/Tracer/Test/Forwarder.hs | 4 +- .../test/Cardano/Tracer/Test/Restart/Tests.hs | 5 +- trace-dispatcher/src/Cardano/Logging/Utils.hs | 20 +- trace-forward/src/Trace/Forward/Forwarding.hs | 3 +- 15 files changed, 426 insertions(+), 111 deletions(-) diff --git a/cardano-tracer/CHANGELOG.md b/cardano-tracer/CHANGELOG.md index fc6c3145e3c..8861f08b84b 100644 --- a/cardano-tracer/CHANGELOG.md +++ b/cardano-tracer/CHANGELOG.md @@ -1,5 +1,9 @@ # ChangeLog +## NEXT +* Cardano-tracer library functionality, allows shutting down and sending signals to running + instances through channels. + ## 0.3.6 (November 2025) * Implement Prometheus HTTP service discovery (SD) under the URL `/targets` * Add optional config field `"prometheusLabels": { "": "", ... 
}` for custom labels to be attached with Prometheus SD diff --git a/cardano-tracer/app/cardano-tracer.hs b/cardano-tracer/app/cardano-tracer.hs index 63ba7678dae..ab37d40402a 100644 --- a/cardano-tracer/app/cardano-tracer.hs +++ b/cardano-tracer/app/cardano-tracer.hs @@ -1,14 +1,23 @@ -import Cardano.Tracer.CLI (TracerParams, parseTracerParams) +{-# LANGUAGE OverloadedRecordDot #-} + +import Cardano.Tracer.CLI (TracerParams(..), parseTracerParams) +import Cardano.Tracer.MetaTrace import Cardano.Tracer.Run (runCardanoTracer) +import Data.Functor (void) import Data.Version (showVersion) import Options.Applicative import Paths_cardano_tracer (version) main :: IO () -main = - runCardanoTracer =<< customExecParser (prefs showHelpOnEmpty) tracerInfo +main = void do + tracerParams :: TracerParams + <- customExecParser (prefs showHelpOnEmpty) tracerInfo + trace :: Trace IO TracerTrace <- + -- Default `Nothing' severity filter to Info. + mkTracerTracer $ SeverityF (tracerParams.logSeverity <|> Just Info) + runCardanoTracer trace tracerParams tracerInfo :: ParserInfo TracerParams tracerInfo = info @@ -21,7 +30,9 @@ tracerInfo = info versionOption :: Parser (a -> a) versionOption = infoOption - (showVersion version) - (long "version" <> - short 'v' <> - help "Show version") + do showVersion version + do mconcat + [ long "version" + , short 'v' + , help "Show version" + ] diff --git a/cardano-tracer/bench/cardano-tracer-bench.hs b/cardano-tracer/bench/cardano-tracer-bench.hs index b2e4a29c063..893c3dde68f 100644 --- a/cardano-tracer/bench/cardano-tracer-bench.hs +++ b/cardano-tracer/bench/cardano-tracer-bench.hs @@ -19,6 +19,7 @@ import Control.Concurrent.Extra (newLock) #if RTVIEW import Control.Concurrent.STM.TVar (newTVarIO) #endif +import Control.Concurrent.Chan.Unagi (newChan) import Control.DeepSeq import qualified Data.List.NonEmpty as NE import Data.Time.Clock (UTCTime, getCurrentTime) @@ -63,6 +64,8 @@ main = do tracer <- mkTracerTracer $ SeverityF $ Just Warning + 
(inChan, _outChan) <- newChan + let tracerEnv :: TracerConfig -> HandleRegistry -> TracerEnv tracerEnv config handleRegistry = TracerEnv { teConfig = config @@ -74,6 +77,7 @@ main = do , teDPRequestors = dpRequestors , teProtocolsBrake = protocolsBrake , teTracer = tracer + , teInChan = inChan , teReforwardTraceObjects = \_-> pure () , teRegistry = handleRegistry , teStateDir = Nothing diff --git a/cardano-tracer/cardano-tracer.cabal b/cardano-tracer/cardano-tracer.cabal index fd7e123f45a..2930c604eca 100644 --- a/cardano-tracer/cardano-tracer.cabal +++ b/cardano-tracer/cardano-tracer.cabal @@ -203,6 +203,7 @@ library , trace-dispatcher ^>= 2.11.0 , trace-forward ^>= 2.4.0 , trace-resources ^>= 0.2.4 + , unagi-chan , wai ^>= 3.2 , warp ^>= 3.4 , yaml @@ -297,6 +298,7 @@ library demo-acceptor-lib exposed-modules: Cardano.Tracer.Test.Acceptor build-depends: bytestring + , QuickCheck , cardano-tracer , containers , extra @@ -309,9 +311,9 @@ library demo-acceptor-lib , text , trace-dispatcher , trace-forward + , unagi-chan , vector , vector-algorithms - , QuickCheck executable demo-acceptor import: project-config @@ -455,12 +457,13 @@ benchmark cardano-tracer-bench build-depends: stm <2.5.2 || >=2.5.3 build-depends: cardano-tracer , criterion - , directory , deepseq + , directory , extra , filepath , time , trace-dispatcher + , unagi-chan ghc-options: -threaded -rtsopts diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs index 028b6833f88..56521ed62b2 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs @@ -6,13 +6,14 @@ module Cardano.Tracer.Acceptors.Run ) where import Cardano.Logging.Types (TraceObject) -import Cardano.Logging.Utils (runInLoop) +import Cardano.Logging.Utils (runInLoop, RunInLoopTermination(..)) import Cardano.Tracer.Acceptors.Client import Cardano.Tracer.Acceptors.Server import Cardano.Tracer.Configuration import 
Cardano.Tracer.Environment import Cardano.Tracer.MetaTrace +import Control.Concurrent.Chan.Unagi (dupChan) import Control.Concurrent.Async (forConcurrently_) import Control.Exception (SomeException (..)) import "contra-tracer" Control.Tracer (Tracer, contramap, nullTracer, stdoutTracer) @@ -33,20 +34,24 @@ import qualified Trace.Forward.Protocol.TraceObject.Type as TOF -- 1. Server mode, when the tracer accepts connections from any number of nodes. -- 2. Client mode, when the tracer initiates connections to specified number of nodes. runAcceptors :: TracerEnv -> TracerEnvRTView -> IO () -runAcceptors tracerEnv@TracerEnv{teTracer} tracerEnvRTView = do +runAcceptors tracerEnv@TracerEnv{teTracer, teInChan = inChan} tracerEnvRTView = do traceWith teTracer $ TracerStartedAcceptors network case network of - AcceptAt howToConnect -> + AcceptAt howToConnect -> let -- Run one server that accepts connections from the nodes. - runInLoop - (runAcceptorsServer tracerEnv tracerEnvRTView howToConnect $ acceptorsConfigs (show howToConnect)) - (handleOnInterruption howToConnect) initialPauseInSec 10 + action :: IO () + action = do + dieOnShutdown =<< dupChan inChan + runAcceptorsServer tracerEnv tracerEnvRTView howToConnect $ acceptorsConfigs (show howToConnect) + in runInLoop action TerminateNever (handleOnInterruption howToConnect) initialPauseInSec 10 ConnectTo localSocks -> -- Run N clients that initiate connections to the nodes. 
- forConcurrently_ (NE.nub localSocks) \howToConnect -> - runInLoop - (runAcceptorsClient tracerEnv tracerEnvRTView howToConnect $ acceptorsConfigs (show howToConnect)) - (handleOnInterruption howToConnect) initialPauseInSec 30 + forConcurrently_ (NE.nub localSocks) \howToConnect -> let + action :: IO () + action = do + dieOnShutdown =<< dupChan inChan + runAcceptorsClient tracerEnv tracerEnvRTView howToConnect $ acceptorsConfigs (show howToConnect) + in runInLoop action TerminateNever (handleOnInterruption howToConnect) initialPauseInSec 30 where handleOnInterruption howToConnect (SomeException e) | verbosity == Just Minimum = pure () diff --git a/cardano-tracer/src/Cardano/Tracer/Environment.hs b/cardano-tracer/src/Cardano/Tracer/Environment.hs index 3daf1d0f4d3..569f870beb4 100644 --- a/cardano-tracer/src/Cardano/Tracer/Environment.hs +++ b/cardano-tracer/src/Cardano/Tracer/Environment.hs @@ -1,11 +1,38 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE ExplicitNamespaces #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} module Cardano.Tracer.Environment ( TracerEnv (..) , TracerEnvRTView (..) + , RawMessage (..) + , InternalMessage (..) + , Tag (..) 
+ , CardanoTracerMessage + , onRawMessage + , onInternal + , onUser + , blockUntilShutdown + , dieOnShutdown + , forever'til + , forever'tilShutdown + + , type MessageHandler + , handleInternal + , handleMessage + , handleMessageWithShutdown + , handleMessages + , handleMessagesWithShutdown + , handleNoop + , handleShutdown + , handleTerminateOnShutdown + , handleUser ) where import Cardano.Logging.Types +import Cardano.Logging.Resources.Types (ResourceStats) import Cardano.Tracer.Configuration #if RTVIEW import Cardano.Tracer.Handlers.Notifications.Types @@ -16,10 +43,14 @@ import Cardano.Tracer.Handlers.State.TraceObjects import Cardano.Tracer.MetaTrace import Cardano.Tracer.Types +import Control.Concurrent (myThreadId) +import Control.Exception (AsyncException(ThreadKilled), Exception, throwTo, throwIO, catch) +import Control.Concurrent.Chan.Unagi (InChan, OutChan, readChan, tryReadChan, tryRead) import Control.Concurrent.Extra (Lock) +import Data.Foldable (traverse_) import Data.Text (Text) import Data.Text.Lazy.Builder (Builder) - +import Data.Kind (Type) -- | Environment for all functions. data TracerEnv = TracerEnv @@ -36,6 +67,7 @@ data TracerEnv = TracerEnv , teRegistry :: !HandleRegistry , teStateDir :: !(Maybe FilePath) , teMetricsHelp :: ![(Text, Builder)] + , teInChan :: !(InChan (CardanoTracerMessage ())) } #if RTVIEW @@ -51,3 +83,203 @@ data TracerEnvRTView = TracerEnvRTView #else data TracerEnvRTView = TracerEnvRTView #endif + +type CardanoTracerMessage userMsg = RawMessage InternalMessage userMsg + +type RawMessage :: Type -> Type -> Type +data RawMessage internal user + = Shutdown + | InternalMessage internal + | UserMessage user + +type InternalMessage :: Type +data InternalMessage where + HandleInternalMessage :: Tag ex -> (ex -> IO ()) -> InternalMessage + +type Tag :: Type -> Type +data Tag a where + ResourceStatsTag :: Tag (ResourceStats, Trace IO TracerTrace) + +-- | Blocks on the channel, discarding other messages, until a @Shutdown@ message is received. 
+blockUntilShutdown :: OutChan (RawMessage internal user) -> IO () +blockUntilShutdown outChan = go where + go :: IO () + go = readChan outChan >>= \case + Shutdown -> pure () + _ -> go + +-- | Serves a channel with a composable `MessageHandler'-function. +-- +-- @ +-- onInternal = handleMessageWithShutdown . handleInternal +-- onUser = handleMessageWithShutdown . handleUser +-- dieOnShutdown = handleMessageWithShutdown mempty +-- @ +-- +-- These handlers are composable with the function Monoid instance. +-- +-- @ +-- handleMessage (handleInternal handle1 <> handleUser handle2) +-- = handleMessages [handleInternal handle1, handleUser handle2] +-- @ +-- +-- Where @handleMessage (a <> b <> c)@ is equivalent to @handleMessages [a, b, c]@. +-- +-- Instantiations: +-- +-- @ +-- handleMessage :: (RawMessage internal user -> IO ()) -> OutChan (RawMessage internal user) -> IO () +-- handleMessage :: MessageHandler internal user -> OutChan (RawMessage internal user) -> IO () +-- @ +handleMessage :: (chan -> IO ()) -> OutChan chan -> IO () +handleMessage handler outChan = do + (element, _out) <- tryReadChan outChan + tryRead element >>= traverse_ @Maybe handler + +handleMessages :: [MessageHandler internal user] -> OutChan (RawMessage internal user) -> IO () +handleMessages = handleMessage . mconcat + +handleMessageWithShutdown :: MessageHandler internal user -> OutChan (RawMessage internal user) -> IO () +handleMessageWithShutdown handler = handleMessage (handler <> handleShutdown) + +handleMessagesWithShutdown :: [MessageHandler internal user] -> OutChan (RawMessage internal user) -> IO () +handleMessagesWithShutdown = handleMessageWithShutdown . 
mconcat + +onRawMessage :: (internal -> IO ()) -> (user -> IO ()) -> OutChan (RawMessage internal user) -> IO () +onRawMessage internal user = handleMessagesWithShutdown + [ handleInternal internal + , handleUser user + ] + +-- onInternal = (`onRawMessage` mempty) +onInternal :: (internal -> IO ()) -> OutChan (RawMessage internal user) -> IO () +onInternal = handleMessageWithShutdown . handleInternal + +-- onUser = (mempty `onRawMessage`) +onUser :: (user -> IO ()) -> OutChan (RawMessage internal user) -> IO () +onUser = handleMessageWithShutdown . handleUser + +-- dieOnShutdown = onRawMessage mempty mempty +dieOnShutdown :: OutChan (RawMessage internal user) -> IO () +dieOnShutdown = handleMessagesWithShutdown [] + +-- | An infinite loop (@forever@) that runs an action every iteration, +-- after a non-blocking check for a message on the out-channel. If a +-- message is received it is passed to the message handler first. +-- +-- To terminate the loop use a handler like @handleTerminateOnShutdown@ +-- that throws a @Terminate@ exception. +forever'til :: MessageHandler internal user -> OutChan (RawMessage internal user) -> IO () -> IO () +forever'til handler outChan action = do + -- Each 'tryReadChan' claims one queue slot, so re-poll the same element + -- until it is filled; dropping it would lose the message written there. + (element, _out) <- tryReadChan outChan -- non-blocking + let poll = tryRead element >>= \case + Nothing -> action *> poll + Just message -> + catch (handler message *> action *> forever'til handler outChan action) \Terminate -> + pure () + poll + +forever'tilShutdown :: MessageHandler internal user -> OutChan (RawMessage internal user) -> IO () -> IO () +forever'tilShutdown handler = forever'til (handleTerminateOnShutdown <> handler) + +-- | Composable handlers, with a functional Monoidal instance. +-- +-- Monoid instance via 'Ap (RawMessage internal user ->) (IO ())'. 
+-- +-- @ +-- instance Semigroup (MessageHandler internal user) where +-- (<>) = liftA2 (<>) +-- instance Monoid (MessageHandler internal user) where +-- mempty = pure mempty +-- @ +-- +-- The handler functions are composed together, the incoming argument +-- gets passed pointwise to each function. +-- +-- @ +-- (handleShutdown <> handleInternal internal <> handleUser user) Shutdown +-- = handleShutdown Shutdown <> handleInternal internal Shutdown <> handleUser user Shutdown +-- = handleShutdown Shutdown <> mempty <> mempty +-- = myThreadId >>= (`throwTo` ThreadKilled) +-- +-- (handleShutdown <> handleInternal internal <> handleUser user) (InternalMessage message) +-- = handleShutdown (InternalMessage message) <> handleInternal internal (InternalMessage message) <> handleUser user (InternalMessage message) +-- = internal message +-- @ + +type MessageHandler :: Type -> Type -> Type +type MessageHandler internal user = RawMessage internal user -> IO () + +-- | Exception that terminates. +data MessageException = Terminate + deriving stock Show + deriving anyclass Exception + +handleShutdown :: MessageHandler internal user +handleShutdown = \case + Shutdown -> myThreadId >>= (`throwTo` ThreadKilled) + _ -> mempty + +-- | Message handler for internal messages. +-- +-- @ +-- handleInternal :: Monoid m => (internal -> m) -> RawMessage internal user -> m +-- @ +handleInternal :: (internal -> IO ()) -> MessageHandler internal user +handleInternal handler = \case + InternalMessage internal -> handler internal + _ -> mempty + +-- | Message handler for user messages. +-- +-- @ +-- handleUser :: Monoid m => (user -> m) -> RawMessage internal user -> m +-- @ +handleUser :: (user -> IO ()) -> MessageHandler internal user +handleUser handler = \case + UserMessage user -> handler user + _ -> mempty + +-- | Message handler that throws a @Terminate@ exception on @Shutdown@. 
+handleTerminateOnShutdown :: MessageHandler internal user +handleTerminateOnShutdown Shutdown = throwIO Terminate +handleTerminateOnShutdown _ = pure () + +handleNoop :: MessageHandler internal user +handleNoop = mempty + +{- | UNSAFE shorthand + + doing \case + A -> res + +is shorthand for + + \case + A -> res + _ -> mempty + +-- mapMaybe' = mapMaybe . partialBinding +-- Just 10 >>= partialBinding \10 -> "10" +partialBinding :: (a -> b) -> (a -> Maybe b) +partialBinding f a = unsafePerformIO do + try @PatternMatchFail (evaluate (f a)) >>= \case + Left (PatternMatchFail err) + | any (`isSuffixOf` err) + [ ": Non-exhaustive patterns in lambda\n" + , ": Non-exhaustive patterns in \\case\n" + , ": Non-exhaustive patterns in \\cases\n" + ] + -> pure @IO (Nothing) + Left err -> + throwIO err + Right b -> + pure @IO (Just b) + +doing :: Monoid m => (a -> m) -> (a -> m) +doing f a = case partialBinding f a of + Nothing -> mempty + Just b -> b +-} diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Rotator.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Rotator.hs index a8661dcd204..0de0a1140f7 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Rotator.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Logs/Rotator.hs @@ -6,7 +6,7 @@ module Cardano.Tracer.Handlers.Logs.Rotator ) where import Cardano.Tracer.Configuration -import Cardano.Tracer.Environment +import Cardano.Tracer.Environment (TracerEnv (..), handleNoop, forever'tilShutdown) import Cardano.Tracer.Handlers.Logs.Utils (createOrUpdateEmptyLog, getTimeStampFromLog, isItLog) import Cardano.Tracer.MetaTrace @@ -14,8 +14,9 @@ import Cardano.Tracer.Types (HandleRegistry, HandleRegistryKey, NodeNa import Cardano.Tracer.Utils (showProblemIfAny, readRegistry) import Control.Concurrent.Async (forConcurrently_) +import Control.Concurrent.Chan.Unagi (dupChan) import Control.Concurrent.Extra (Lock) -import Control.Monad (forM_, forever, unless, when) +import Control.Monad (forM_, unless, when) 
import Control.Monad.Extra (whenJust, whenM) import Data.Foldable (for_) import Data.List (nub, sort) @@ -33,39 +34,41 @@ import System.Time.Extra (sleep) -- | Runs rotation mechanism for the log files. runLogsRotator :: TracerEnv -> IO () -runLogsRotator TracerEnv - { teConfig = TracerConfig{rotation, verbosity, logging} - , teCurrentLogLock - , teTracer - , teRegistry - } = do - whenJust rotation \rotParams -> do +runLogsRotator tracerEnv@TracerEnv { teConfig = TracerConfig{rotation}, teTracer } = do + whenJust rotation \rot -> do traceWith teTracer TracerStartedLogRotator - launchRotator loggingParamsForFiles rotParams verbosity teTracer teRegistry teCurrentLogLock - where + launchRotator tracerEnv rot + +launchRotator + :: TracerEnv + -> RotationParams + -> IO () +launchRotator tracerEnv rot@RotationParams{rpFrequencySecs} = do + whenNonEmpty loggingParamsForFiles do + outChan <- dupChan teInChan + forever'tilShutdown handleNoop outChan do + showProblemIfAny verbosity teTracer do + forM_ loggingParamsForFiles \loggingParam -> do + checkRootDir teCurrentLogLock teRegistry rot loggingParam + sleep (fromIntegral rpFrequencySecs) + where + whenNonEmpty :: Applicative f => [a] -> f () -> f () + whenNonEmpty = unless . 
null + + TracerEnv + { teConfig = TracerConfig{verbosity, logging} + , teCurrentLogLock + , teRegistry + , teInChan + , teTracer + } = tracerEnv + loggingParamsForFiles :: [LoggingParams] loggingParamsForFiles = nub (NE.filter filesOnly logging) filesOnly :: LoggingParams -> Bool filesOnly LoggingParams{logMode} = logMode == FileMode -launchRotator - :: [LoggingParams] - -> RotationParams - -> Maybe Verbosity - -> Trace IO TracerTrace - -> HandleRegistry - -> Lock - -> IO () -launchRotator [] _ _ _ _ _ = return () -launchRotator loggingParamsForFiles - rotParams@RotationParams{rpFrequencySecs} verb tracer registry currentLogLock = - forever do - showProblemIfAny verb tracer do - forM_ loggingParamsForFiles \loggingParam -> do - checkRootDir currentLogLock registry rotParams loggingParam - sleep $ fromIntegral rpFrequencySecs - -- | All the logs with 'TraceObject's received from particular node -- will be stored in a separate subdirectory in the root directory. -- diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs index 182cadff9f4..ea101f13703 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Monitoring.hs @@ -14,6 +14,8 @@ import Cardano.Tracer.Types import Prelude hiding (head) +import Control.Concurrent.Async (race_) +import Control.Concurrent.Chan.Unagi (OutChan, dupChan) import Data.ByteString as ByteString (ByteString, isInfixOf) import Data.ByteString.Builder (stringUtf8) import qualified Data.Text as T @@ -39,7 +41,7 @@ runMonitoringServer -> Endpoint -- ^ (web page with list of connected nodes, EKG web page). 
-> IO RouteDictionary -> IO () -runMonitoringServer TracerEnv{teTracer} endpoint computeRoutes_autoUpdate = do +runMonitoringServer TracerEnv{teTracer, teInChan = inChan} endpoint computeRoutes_autoUpdate = do -- Pause to prevent collision between "Listening"-notifications from servers. sleep 0.2 traceWith teTracer TracerStartedMonitoring @@ -47,11 +49,18 @@ runMonitoringServer TracerEnv{teTracer} endpoint computeRoutes_autoUpdate = do , ttMonitoringType = "list" } dummyStore <- EKG.newStore - runSettings (setEndpoint endpoint defaultSettings) do - renderEkg dummyStore computeRoutes_autoUpdate + outChan <- dupChan inChan + + let run :: IO () + run = runSettings (setEndpoint endpoint defaultSettings) $ + renderEkg dummyStore outChan computeRoutes_autoUpdate + + race_ run (blockUntilShutdown outChan) + +renderEkg :: EKG.Store -> OutChan (CardanoTracerMessage ()) -> IO RouteDictionary -> Application +renderEkg dummyStore outChan computeRoutes_autoUpdate request send = do + dieOnShutdown outChan -renderEkg :: EKG.Store -> IO RouteDictionary -> Application -renderEkg dummyStore computeRoutes_autoUpdate request send = do routeDictionary :: RouteDictionary <- computeRoutes_autoUpdate diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs index 68cf43b9646..6e0a155a309 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Prometheus.hs @@ -17,6 +17,8 @@ import Prelude hiding (head) import Control.Applicative ((<|>)) import Data.Aeson (ToJSON (..), encode, pairs, (.=)) +import Control.Concurrent.Async (race_) +import Control.Concurrent.Chan.Unagi (dupChan) import qualified Data.ByteString as ByteString import Data.Functor ((<&>)) import qualified Data.Map as Map (Map, empty, fromList) @@ -85,16 +87,20 @@ runPrometheusServer tracerEnv endpoint computeRoutes_autoUpdate = do -- If everything is okay, the 
function 'simpleHttpServe' never returns. -- But if there is some problem, it never throws an exception, but just stops. -- So if it stopped - it will be re-started. - traceWith teTracer TracerStartedPrometheus + traceWith tracer TracerStartedPrometheus { ttPrometheusEndpoint = endpoint } - runSettings (setEndpoint endpoint defaultSettings) do - renderPrometheus computeRoutes_autoUpdate noSuffix teMetricsHelp promLabels + outChan <- dupChan inChan + let run :: IO () + run = runSettings (setEndpoint endpoint defaultSettings) $ + renderPrometheus computeRoutes_autoUpdate noSuffix metricsHelp promLabels + race_ run (blockUntilShutdown outChan) where TracerEnv - { teTracer - , teConfig = TracerConfig { metricsNoSuffix, prometheusLabels } - , teMetricsHelp + { teTracer = tracer + , teConfig = TracerConfig { metricsNoSuffix, prometheusLabels } + , teMetricsHelp = metricsHelp + , teInChan = inChan } = tracerEnv noSuffix = or @Maybe metricsNoSuffix diff --git a/cardano-tracer/src/Cardano/Tracer/Run.hs b/cardano-tracer/src/Cardano/Tracer/Run.hs index 712ce1224ea..246a32bd03f 100644 --- a/cardano-tracer/src/Cardano/Tracer/Run.hs +++ b/cardano-tracer/src/Cardano/Tracer/Run.hs @@ -2,11 +2,14 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE OverloadedRecordDot #-} -- | This top-level module is used by 'cardano-tracer' app. module Cardano.Tracer.Run ( doRunCardanoTracer , runCardanoTracer + , CardanoTracerHandle (..) 
+ , cleanupCardanoTracer ) where import Cardano.Logging.Resources @@ -26,42 +29,48 @@ import Cardano.Tracer.MetaTrace import Cardano.Tracer.Types import Cardano.Tracer.Utils -import Control.Applicative import Control.Concurrent (threadDelay) +import Control.Concurrent.Chan.Unagi import Control.Concurrent.Async (async, link) import Control.Concurrent.Extra (newLock) #if RTVIEW import Control.Concurrent.STM.TVar (newTVarIO) #endif import Control.Exception (SomeException, try) -import Control.Monad import Data.Aeson (decodeFileStrict') import Data.Foldable (for_) -import Data.Maybe (fromMaybe) +import Data.Kind (Type) import qualified Data.Map.Strict as M (Map, empty, filter, toList) +import Data.Maybe (fromMaybe) import Data.Text as T (Text, null) import Data.Text.Lazy.Builder as TB (Builder, fromText) +type CardanoTracerHandle :: Type -> Type +newtype CardanoTracerHandle user = CardanoTracerHandle + { inChan :: InChan (CardanoTracerMessage user) + } + +cleanupCardanoTracer :: CardanoTracerHandle user -> IO () +cleanupCardanoTracer handle = + writeChan handle.inChan Shutdown -- | Top-level run function, called by 'cardano-tracer' app. 
-runCardanoTracer :: TracerParams -> IO () -runCardanoTracer TracerParams{tracerConfig, stateDir, logSeverity} = do - tr <- mkTracerTracer $ SeverityF $ logSeverity <|> Just Info -- default severity filter to Info - traceWith tr TracerBuildInfo +runCardanoTracer :: Trace IO TracerTrace -> TracerParams -> IO (CardanoTracerHandle ()) +runCardanoTracer tracer TracerParams{tracerConfig, stateDir, logSeverity} = do + traceWith tracer TracerBuildInfo #if RTVIEW - { ttBuiltWithRTView = True + { ttBuiltWithRTView = True } #else - { ttBuiltWithRTView = False + { ttBuiltWithRTView = False } #endif - } - traceWith tr TracerParamsAre + traceWith tracer TracerParamsAre { ttConfigPath = tracerConfig , ttStateDir = stateDir , ttMinLogSeverity = logSeverity } config <- readTracerConfig tracerConfig - traceWith tr TracerConfigIs + traceWith tracer TracerConfigIs { ttConfig = config #if RTVIEW , ttWarnRTViewMissing = False @@ -70,18 +79,11 @@ runCardanoTracer TracerParams{tracerConfig, stateDir, logSeverity} = do #endif } - for_ (resourceFreq config) \msInterval -> do - threadId <- async do - forever do - mbrs <- readResourceStats - for_ mbrs \resourceStat -> - traceWith tr (TracerResource resourceStat) - threadDelay (1_000 * msInterval) -- Delay in seconds, given milliseconds - link threadId - brake <- initProtocolsBrake dpRequestors <- initDataPointRequestors - doRunCardanoTracer config stateDir tr brake dpRequestors + cardanoTracerHandle@CardanoTracerHandle{inChan} <- doRunCardanoTracer config stateDir tracer brake dpRequestors + traceResourceStats inChan tracer (resourceFreq config) + pure cardanoTracerHandle -- | Runs all internal services of the tracer. doRunCardanoTracer @@ -90,12 +92,12 @@ doRunCardanoTracer -> Trace IO TracerTrace -> ProtocolsBrake -- ^ The flag we use to stop all the protocols. -> DataPointRequestors -- ^ The DataPointRequestors to ask 'DataPoint's. 
- -> IO () -doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do - traceWith tr TracerInitStarted + -> IO (CardanoTracerHandle ()) +doRunCardanoTracer config rtViewStateDir tracer protocolsBrake dpRequestors = do + traceWith tracer TracerInitStarted connectedNodes <- initConnectedNodes connectedNodesNames <- initConnectedNodesNames - acceptedMetrics <- initAcceptedMetrics + acceptedMetrics <- initAcceptedMetrics mHelp <- loadMetricsHelp $ metricsHelp config #if RTVIEW @@ -109,16 +111,18 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do currentLogLock <- newLock currentDPLock <- newLock - traceWith tr TracerInitEventQueues + traceWith tracer TracerInitEventQueues #if RTVIEW - eventsQueues <- initEventsQueues tr rtViewStateDir connectedNodesNames dpRequestors currentDPLock + eventsQueues <- initEventsQueues tracer rtViewStateDir connectedNodesNames dpRequestors currentDPLock rtViewPageOpened <- newTVarIO False #endif - (reforwardTraceObject,_trDataPoint) <- initReForwarder config tr + (reforwardTraceObject, _trDataPoint) <- initReForwarder config tracer registry <- newRegistry + (inChan, _outChan) <- newChan + -- Environment for all following functions. let tracerEnv :: TracerEnv tracerEnv = TracerEnv @@ -130,11 +134,12 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do , teCurrentDPLock = currentDPLock , teDPRequestors = dpRequestors , teProtocolsBrake = protocolsBrake - , teTracer = tr + , teTracer = tracer , teReforwardTraceObjects = reforwardTraceObject , teRegistry = registry , teStateDir = rtViewStateDir , teMetricsHelp = mHelp + , teInChan = inChan } tracerEnvRTView :: TracerEnvRTView @@ -150,24 +155,31 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do #endif -- Specify what should be done before 'cardano-tracer' stops. 
- beforeProgramStops $ do - traceWith tr TracerShutdownInitiated + beforeProgramStops do + traceWith tracer TracerShutdownInitiated #if RTVIEW backupAllHistory tracerEnv tracerEnvRTView - traceWith tr TracerShutdownHistBackup + traceWith tracer TracerShutdownHistBackup #endif applyBrake (teProtocolsBrake tracerEnv) - traceWith tr TracerShutdownComplete + traceWith tracer TracerShutdownComplete + + traceWith tracer TracerInitDone - traceWith tr TracerInitDone - sequenceConcurrently_ - [ runLogsRotator tracerEnv - , runMetricsServers tracerEnv - , runAcceptors tracerEnv tracerEnvRTView + let runs :: [IO ()] + runs = + [ runLogsRotator tracerEnv + , runMetricsServers tracerEnv + , runAcceptors tracerEnv tracerEnvRTView #if RTVIEW - , runRTView tracerEnv tracerEnvRTView + , runRTView tracerEnv tracerEnvRTView #endif - ] + ] + sequenceConcurrently_ runs + + pure CardanoTracerHandle + { inChan + } -- NB. this fails silently if there's any read or decode error when an external JSON file is provided loadMetricsHelp :: Maybe FileOrMap -> IO [(Text, Builder)] @@ -181,3 +193,15 @@ loadMetricsHelp (Just (FOM x)) = do Right object -> pure object pure $ (M.toList . fmap TB.fromText . M.filter (not . 
T.null)) result + +traceResourceStats :: InChan (CardanoTracerMessage ()) -> Trace IO TracerTrace -> Maybe Int -> IO () +traceResourceStats inChan tracer freq = + for_ @Maybe freq \msInterval -> do + outChan <- dupChan inChan + asyncId <- async do + forever'tilShutdown handleNoop outChan do + mbrs <- readResourceStats + for_ @Maybe mbrs \resourceStat -> do + traceWith tracer (TracerResource resourceStat) + threadDelay (1_000 * msInterval) -- Delay in microseconds (threadDelay's unit), given milliseconds + link asyncId diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs b/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs index e26ec077675..c5cfff20e1b 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs @@ -19,6 +19,7 @@ import Cardano.Tracer.MetaTrace import Cardano.Tracer.Types import Cardano.Tracer.Utils +import Control.Concurrent.Chan.Unagi (newChan) import Control.Concurrent.Extra (newLock) #if RTVIEW import Control.Concurrent.STM.TVar (newTVarIO, readTVarIO) @@ -67,6 +68,8 @@ launchAcceptorsSimple mode localSock dpName = do registry <- newRegistry + (inChan, _outChan) <- newChan + let tracerEnv :: TracerEnv tracerEnv = TracerEnv { teConfig = mkConfig @@ -82,6 +85,7 @@ launchAcceptorsSimple mode localSock dpName = do , teRegistry = registry , teStateDir = Nothing , teMetricsHelp = [] + , teInChan = inChan } tracerEnvRTView :: TracerEnvRTView diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs b/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs index de95bef2a5d..5b2d408270b 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Forwarder.hs @@ -18,7 +18,7 @@ module Cardano.Tracer.Test.Forwarder import Cardano.Logging (DetailLevel (..), SeverityS (..), TraceObject (..)) import Cardano.Logging.Types (HowToConnect) import qualified Cardano.Logging.Types as Net -import Cardano.Logging.Utils (runInLoop) +import Cardano.Logging.Utils 
(RunInLoopTermination(TerminateNever), runInLoop) import Cardano.Tracer.Configuration (Verbosity (..)) import Cardano.Tracer.Test.TestSetup import Cardano.Tracer.Test.Utils @@ -95,7 +95,7 @@ launchForwardersSimple -> Word -> IO () launchForwardersSimple ts mode howToConnect queueSize = withIOManager \iomgr -> - runInLoop (launchForwardersSimple' ts iomgr mode howToConnect queueSize) handleInterruption 1 60 + runInLoop (launchForwardersSimple' ts iomgr mode howToConnect queueSize) TerminateNever handleInterruption 1 60 where handleInterruption = const $ pure () diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs index a1441badbf5..12366cd48b9 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs @@ -20,6 +20,7 @@ import Ouroboros.Network.Magic (NetworkMagic (..)) import Control.Concurrent.Async (asyncBound, uninterruptibleCancel) import Control.Monad (forM_) import Control.Monad.Extra (ifM) +import Data.Functor (void) import qualified Data.List.NonEmpty as NE import System.Directory (removePathForcibly) import System.Directory.Extra (listDirectories) @@ -41,8 +42,8 @@ propNetworkForwarder ts rootDir localSock = do brake <- initProtocolsBrake dpRequestors <- initDataPointRequestors propNetwork' ts rootDir - ( launchForwardersSimple ts Initiator (Net.LocalPipe localSock) 10000 - , doRunCardanoTracer config (Just $ rootDir <> "/../state") stderrShowTracer brake dpRequestors + ( launchForwardersSimple ts Initiator (Net.LocalPipe localSock) 1000 + , void $ doRunCardanoTracer config (Just $ rootDir <> "/../state") stderrShowTracer brake dpRequestors ) propNetwork' diff --git a/trace-dispatcher/src/Cardano/Logging/Utils.hs b/trace-dispatcher/src/Cardano/Logging/Utils.hs index 2630da48f05..32939b54b5b 100644 --- a/trace-dispatcher/src/Cardano/Logging/Utils.hs +++ b/trace-dispatcher/src/Cardano/Logging/Utils.hs @@ -6,7 
+6,7 @@ module Cardano.Logging.Utils import Control.Concurrent (threadDelay) -import Control.Concurrent.Async (concurrently_) +import Control.Concurrent.Async (concurrently_, race_) import Control.Exception (SomeAsyncException (..), SomeException, fromException, tryJust) import Data.IORef import qualified Data.Text as T @@ -16,15 +16,18 @@ import qualified Data.Text.Lazy.Builder.Int as T import qualified Data.Text.Lazy.Builder.RealFloat as T (realFloat) import GHC.Conc (labelThread, myThreadId) +data RunInLoopTermination + = TerminateNever + | TerminateWhenNoLongerBlocking (IO ()) -- | Run an IO action which may throw an exception in a loop. -- On exception, the action will be re-run after a pause. -- That pause doubles which each exception, but is reset when the action runs long enough. -runInLoop :: IO () -> (SomeException -> IO ()) -> Word -> Word -> IO () -runInLoop action handleInterruption initialDelay maxDelay - | initialDelay == 0 = runInLoop action handleInterruption 1 maxDelay - | maxDelay < initialDelay = runInLoop action handleInterruption initialDelay initialDelay - | otherwise = newIORef (fromIntegral initialDelay) >>= go +runInLoop :: IO () -> RunInLoopTermination -> (SomeException -> IO ()) -> Word -> Word -> IO () +runInLoop action runInLoopTermination handleInterruption initialDelay maxDelay + | initialDelay == 0 = runInLoop action runInLoopTermination handleInterruption 1 maxDelay + | maxDelay < initialDelay = runInLoop action runInLoopTermination handleInterruption initialDelay initialDelay + | otherwise = newIORef (fromIntegral initialDelay) >>= interpret where go :: IORef Int -> IO () go currentDelay = @@ -36,6 +39,11 @@ runInLoop action handleInterruption initialDelay maxDelay go currentDelay Right _ -> return () + interpret :: IORef Int -> IO () + interpret currentDelay = case runInLoopTermination of + TerminateNever -> go currentDelay + TerminateWhenNoLongerBlocking blocking -> race_ blocking (go currentDelay) + -- if the action runs at 
least maxDelay seconds, the pause is reset actionResettingDelay currentDelay = concurrently_ action $ do threadDelay $ fromIntegral $ 1_000_000 * maxDelay diff --git a/trace-forward/src/Trace/Forward/Forwarding.hs b/trace-forward/src/Trace/Forward/Forwarding.hs index fae115607c2..b3eef949e1f 100644 --- a/trace-forward/src/Trace/Forward/Forwarding.hs +++ b/trace-forward/src/Trace/Forward/Forwarding.hs @@ -16,7 +16,7 @@ module Trace.Forward.Forwarding ) where import Cardano.Logging.Types -import Cardano.Logging.Utils (runInLoop) +import Cardano.Logging.Utils (runInLoop, RunInLoopTermination(..)) import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) import Ouroboros.Network.IOManager (IOManager) import Ouroboros.Network.Magic (NetworkMagic) @@ -208,6 +208,7 @@ launchForwarders iomgr forwarding sink initEKGStore dpStore) + TerminateNever (fromMaybe (const $ pure ()) initOnForwardInterruption) 1 maxReconnectDelay