diff --git a/service/api/sse/client.go b/service/api/sse/client.go index 6e989d9d..1734e9e3 100644 --- a/service/api/sse/client.go +++ b/service/api/sse/client.go @@ -2,7 +2,9 @@ package sse import ( "errors" + "fmt" "strings" + "sync/atomic" "github.com/splitio/go-split-commons/v9/conf" "github.com/splitio/go-split-commons/v9/dtos" @@ -18,6 +20,13 @@ const ( keepAlive = 70 ) +// Client state constants for atomic state management +const ( + StateIdle int32 = 0 // Client is idle/not started + StateRunning int32 = 1 // Client is running + StateDestroyed int32 = -1 // Client is destroyed/stopped +) + // StreamingClient interface type StreamingClient interface { ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) @@ -27,11 +36,13 @@ type StreamingClient interface { // StreamingClientImpl struct type StreamingClientImpl struct { - sseClient *sse.Client - logger logging.LoggerInterface - lifecycle lifecycle.Manager - metadata dtos.Metadata - clientKey *string + sseClient *sse.Client + logger logging.LoggerInterface + lifecycle lifecycle.Manager + metadata dtos.Metadata + clientKey *string + state atomic.Int32 // Atomic state: 0=Idle, 1=Running, -1=Destroyed + goroutineStarted chan struct{} // Signals when goroutine has started } // Status constants @@ -54,15 +65,32 @@ func NewStreamingClient(cfg *conf.AdvancedConfig, logger logging.LoggerInterface metadata: metadata, clientKey: clientKey, } + client.state.Store(StateIdle) client.lifecycle.Setup() return client } // ConnectStreaming connects to streaming func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) { + fmt.Println("VAMOS A VERRRRRRRR") + // Atomic state check: Only proceed if state is Idle (0) + if s.state.Load() != StateIdle { + s.logger.Info("Client is not in idle state (already running or destroyed). Ignoring") + return + } if !s.lifecycle.BeginInitialization() { s.logger.Info("Connection is already in process/running. Ignoring") + s.state.Store(StateIdle) // Reset state since lifecycle check failed + return + } + + // Now that lifecycle check passed, atomically transition to Running + // This ensures we only set Running if goroutine will actually be spawned + if !s.state.CompareAndSwap(StateIdle, StateRunning) { + s.logger.Info("Client destroyed during initialization. Aborting") + // Lifecycle was started but we're not spawning goroutine, need to complete it + s.lifecycle.ShutdownComplete() return } @@ -71,21 +99,65 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha params["accessToken"] = token params["v"] = version + // Channel to signal goroutine has started (prevents race with StopStreaming) + // Must be created AFTER lifecycle check to ensure goroutine will be spawned + goroutineStarted := make(chan struct{}) + s.goroutineStarted = goroutineStarted + go func() { - defer s.lifecycle.ShutdownComplete() + // Signal that goroutine has started executing + close(goroutineStarted) + + defer func() { + s.lifecycle.ShutdownComplete() + // Reset to idle if not destroyed + if s.state.Load() != StateDestroyed { + s.state.Store(StateIdle) + } + }() + + // Early exit if client was destroyed while goroutine was starting + if s.state.Load() <= StateIdle { + s.logger.Info("Client state is not valid (destroyed or idle). Exiting goroutine") + return + } + if !s.lifecycle.InitializationComplete() { return } + + // Helper to send status without blocking if destroyed + sendStatus := func(status int) { + currentState := s.state.Load() + if currentState == StateDestroyed { + // Client is being destroyed, don't block + select { + case streamingStatus <- status: + default: + s.logger.Debug("Client destroyed, skipping status send") + } + return + } + // Normal operation: blocking send is safe + streamingStatus <- status + } + + // Final check before starting connection - prevent race if Destroy was called + if s.state.Load() != StateRunning { + s.logger.Info("Client destroyed before connection started. Exiting goroutine") + return + } + firstEventReceived := gtSync.NewAtomicBool(false) out := s.sseClient.Do(params, api.AddMetadataToHeaders(s.metadata, nil, s.clientKey), func(m IncomingMessage) { if firstEventReceived.TestAndSet() && !m.IsError() { - streamingStatus <- StatusFirstEventOk + sendStatus(StatusFirstEventOk) } handleIncomingMessage(m) }) if out == nil { // all good - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) return } @@ -94,18 +166,18 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha asConnectionFailedError := &sse.ErrConnectionFailed{} if errors.As(out, &asConnectionFailedError) { - streamingStatus <- StatusConnectionFailed + sendStatus(StatusConnectionFailed) return } switch out { case sse.ErrNotIdle: // If this happens we have a bug - streamingStatus <- StatusUnderlyingClientInUse + sendStatus(StatusUnderlyingClientInUse) case sse.ErrReadingStream: - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) case sse.ErrTimeout: - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) default: } }() @@ -113,16 +185,32 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha // StopStreaming stops streaming func (s *StreamingClientImpl) StopStreaming() { - if !s.lifecycle.BeginShutdown() { - s.logger.Info("SSE client wrapper not running. Ignoring") + // Check old state before destroying - we need to know if a goroutine was spawned + oldState := s.state.Swap(StateDestroyed) + + // Always shutdown the underlying SSE client to unblock any in-flight connections + s.sseClient.Shutdown(true) + + // If we were running, a goroutine exists and will call ShutdownComplete() + if oldState == StateRunning { + // Wait for goroutine to actually start before waiting for completion + // This prevents race where goroutine is queued but not started + if s.goroutineStarted != nil { + <-s.goroutineStarted + } + // Try to begin shutdown (might fail if goroutine still initializing) + s.lifecycle.BeginShutdown() + // But always wait for the goroutine to complete + s.lifecycle.AwaitShutdownComplete() + s.logger.Info("Stopped streaming") return } - s.sseClient.Shutdown(true) - s.lifecycle.AwaitShutdownComplete() - s.logger.Info("Stopped streaming") + + // No goroutine was spawned, safe to return + s.logger.Info("SSE client wrapper not running. Ignoring") } // IsRunning returns true if the client is running func (s *StreamingClientImpl) IsRunning() bool { - return s.lifecycle.IsRunning() + return s.lifecycle.IsRunning() && s.state.Load() == StateRunning } diff --git a/synchronizer/manager.go b/synchronizer/manager.go index dbb7cf2d..9724ebdb 100644 --- a/synchronizer/manager.go +++ b/synchronizer/manager.go @@ -165,11 +165,16 @@ func (s *ManagerImpl) Start() { } func (s *ManagerImpl) stop() { + fmt.Println("---Inside stop---") if s.pushManager != nil { + fmt.Println("---Inside stop--- s.pushManager != nil ") s.pushManager.Stop() } + fmt.Println("---Inside stop---StopPeriodicFetching()") s.synchronizer.StopPeriodicFetching() + fmt.Println("---Inside stop---StopPeriodicDataRecording()") s.synchronizer.StopPeriodicDataRecording() + fmt.Println("---Inside stop---ShutdownComplete()") s.lifecycle.ShutdownComplete() } @@ -185,12 +190,16 @@ func (s *ManagerImpl) Stop() { } func (s *ManagerImpl) pushStatusWatcher() { + fmt.Println("---Inside pushStatusWatcher before stop---") defer s.stop() + fmt.Println("---Inside pushStatusWatcher---After stop") for { select { case <-s.lifecycle.ShutdownRequested(): + fmt.Println("---Inside pushStatusWatcher---ShutdownRequested") return case status := <-s.streamingStatus: + fmt.Println("---Inside pushStatusWatcher---streamingStatus ", status) switch status { case push.StatusUp: s.stopPolling()