From e7099f53d6252e1af0464ad042adaf7679d57aec Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Thu, 22 Jan 2026 16:56:05 -0300 Subject: [PATCH 1/3] Updated --- go.mod | 2 +- go.sum | 4 ++-- synchronizer/manager.go | 9 +++++++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 708e4309..f7aa308b 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/bits-and-blooms/bloom/v3 v3.3.1 - github.com/splitio/go-toolkit/v5 v5.4.1 + github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4 github.com/stretchr/testify v1.11.1 golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 8ce993ea..b6ac049d 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/splitio/go-toolkit/v5 v5.4.1 h1:srTyvDBJZMUcJ/KiiQDMyjCuELVgTBh2TGRVn0sOXEE= -github.com/splitio/go-toolkit/v5 v5.4.1/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw= +github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4 h1:kzIV4qa+OgPyStVEk0QWP+skkI/M5UAgmNSf/awcoek= +github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/synchronizer/manager.go b/synchronizer/manager.go index dbb7cf2d..9724ebdb 100644 --- a/synchronizer/manager.go +++ b/synchronizer/manager.go @@ -165,11 +165,16 @@ func (s *ManagerImpl) Start() { } func (s *ManagerImpl) stop() { + fmt.Println("---Inside stop---") if s.pushManager != nil { + fmt.Println("---Inside stop--- s.pushManager != nil ") s.pushManager.Stop() } + fmt.Println("---Inside stop---StopPeriodicFetching()") s.synchronizer.StopPeriodicFetching() + fmt.Println("---Inside stop---StopPeriodicDataRecording()") s.synchronizer.StopPeriodicDataRecording() + fmt.Println("---Inside stop---ShutdownComplete()") s.lifecycle.ShutdownComplete() } @@ -185,12 +190,16 @@ func (s *ManagerImpl) Stop() { } func (s *ManagerImpl) pushStatusWatcher() { + fmt.Println("---Inside pushStatusWatcher before stop---") defer s.stop() + fmt.Println("---Inside pushStatusWatcher---After stop") for { select { case <-s.lifecycle.ShutdownRequested(): + fmt.Println("---Inside pushStatusWatcher---ShutdownRequested") return case status := <-s.streamingStatus: + fmt.Println("---Inside pushStatusWatcher---streamingStatus ", status) switch status { case push.StatusUp: s.stopPolling() From 17ab7766d3790ad47b9eace0e6c34a0f283e56a3 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 1 Apr 2026 11:59:33 -0500 Subject: [PATCH 2/3] Updated sse client to fix timeout issue --- go.mod | 2 +- go.sum | 4 +-- service/api/sse/client.go | 69 ++++++++++++++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index f7aa308b..708e4309 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/bits-and-blooms/bloom/v3 v3.3.1 - github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4 + github.com/splitio/go-toolkit/v5 v5.4.1 github.com/stretchr/testify v1.11.1 golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index b6ac049d..8ce993ea 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= -github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4 h1:kzIV4qa+OgPyStVEk0QWP+skkI/M5UAgmNSf/awcoek= -github.com/splitio/go-toolkit/v5 v5.4.1-0.20260122194008-238694c3e6f4/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw= +github.com/splitio/go-toolkit/v5 v5.4.1 h1:srTyvDBJZMUcJ/KiiQDMyjCuELVgTBh2TGRVn0sOXEE= +github.com/splitio/go-toolkit/v5 v5.4.1/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/service/api/sse/client.go b/service/api/sse/client.go index 6e989d9d..94915d80 100644 --- a/service/api/sse/client.go +++ b/service/api/sse/client.go @@ -3,6 +3,7 @@ package sse import ( "errors" "strings" + "sync/atomic" "github.com/splitio/go-split-commons/v9/conf" "github.com/splitio/go-split-commons/v9/dtos" @@ -18,6 +19,13 @@ const ( keepAlive = 70 ) +// Client state constants for atomic state management +const ( + StateIdle int32 = 0 // Client is idle/not started + StateRunning int32 = 1 // Client is running + StateDestroyed int32 = -1 // Client is destroyed/stopped +) + // StreamingClient interface type StreamingClient interface { ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) @@ -32,6 +40,7 @@ type StreamingClientImpl struct { lifecycle lifecycle.Manager metadata dtos.Metadata clientKey *string + state atomic.Int32 // Atomic state: 0=Idle, 1=Running, -1=Destroyed } // Status constants @@ -54,6 +63,7 @@ func NewStreamingClient(cfg *conf.AdvancedConfig, logger logging.LoggerInterface metadata: metadata, clientKey: clientKey, } + client.state.Store(StateIdle) client.lifecycle.Setup() return client } @@ -61,8 +71,15 @@ func NewStreamingClient(cfg *conf.AdvancedConfig, logger logging.LoggerInterface // ConnectStreaming connects to streaming func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) { + // Atomic state check: Only proceed if state is Idle (0) + if !s.state.CompareAndSwap(StateIdle, StateRunning) { + s.logger.Info("Client is not in idle state (already running or destroyed). Ignoring") + return + } + if !s.lifecycle.BeginInitialization() { s.logger.Info("Connection is already in process/running. Ignoring") + s.state.Store(StateIdle) // Reset state since lifecycle check failed return } @@ -72,20 +89,53 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha params["v"] = version go func() { - defer s.lifecycle.ShutdownComplete() + defer func() { + s.lifecycle.ShutdownComplete() + // Reset to idle if not destroyed + if s.state.Load() != StateDestroyed { + s.state.Store(StateIdle) + } + }() + + // Early exit if client was destroyed while goroutine was starting + if s.state.Load() <= StateIdle { + s.logger.Info("Client state is not valid (destroyed or idle). Exiting goroutine") + return + } + if !s.lifecycle.InitializationComplete() { return } + + // Helper to send status without blocking + sendStatus := func(status int) { + // Only send if client is still running + if s.state.Load() != StateRunning { + return + } + select { + case streamingStatus <- status: + default: + s.logger.Debug("streamingStatus channel full or closed, skipping send") + } + } + + // Final check before starting connection - prevent race if Destroy was called + if s.state.Load() != StateRunning { + s.logger.Info("Client destroyed before connection started. Exiting goroutine") + return + } + firstEventReceived := gtSync.NewAtomicBool(false) out := s.sseClient.Do(params, api.AddMetadataToHeaders(s.metadata, nil, s.clientKey), func(m IncomingMessage) { if firstEventReceived.TestAndSet() && !m.IsError() { - streamingStatus <- StatusFirstEventOk + sendStatus(StatusFirstEventOk) } handleIncomingMessage(m) }) if out == nil { // all good - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) return } @@ -94,18 +144,18 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha asConnectionFailedError := &sse.ErrConnectionFailed{} if errors.As(out, &asConnectionFailedError) { - streamingStatus <- StatusConnectionFailed + sendStatus(StatusConnectionFailed) return } switch out { case sse.ErrNotIdle: // If this happens we have a bug - streamingStatus <- StatusUnderlyingClientInUse + sendStatus(StatusUnderlyingClientInUse) case sse.ErrReadingStream: - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) case sse.ErrTimeout: - streamingStatus <- StatusDisconnected + sendStatus(StatusDisconnected) default: } }() @@ -113,6 +163,9 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha // StopStreaming stops streaming func (s *StreamingClientImpl) StopStreaming() { + // Set atomic state to destroyed immediately to prevent new goroutines + s.state.Store(StateDestroyed) + if !s.lifecycle.BeginShutdown() { s.logger.Info("SSE client wrapper not running. Ignoring") return @@ -124,5 +177,5 @@ func (s *StreamingClientImpl) StopStreaming() { // IsRunning returns true if the client is running func (s *StreamingClientImpl) IsRunning() bool { - return s.lifecycle.IsRunning() + return s.lifecycle.IsRunning() && s.state.Load() == StateRunning } From ca17d3564cf92431fba6681e9a6d98fb1edec4e4 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 1 Apr 2026 15:23:37 -0500 Subject: [PATCH 3/3] Updated sse client --- service/api/sse/client.go | 81 ++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/service/api/sse/client.go b/service/api/sse/client.go index 94915d80..1734e9e3 100644 --- a/service/api/sse/client.go +++ b/service/api/sse/client.go @@ -2,6 +2,7 @@ package sse import ( "errors" + "fmt" "strings" "sync/atomic" @@ -35,12 +36,13 @@ type StreamingClient interface { // StreamingClientImpl struct type StreamingClientImpl struct { - sseClient *sse.Client - logger logging.LoggerInterface - lifecycle lifecycle.Manager - metadata dtos.Metadata - clientKey *string - state atomic.Int32 // Atomic state: 0=Idle, 1=Running, -1=Destroyed + sseClient *sse.Client + logger logging.LoggerInterface + lifecycle lifecycle.Manager + metadata dtos.Metadata + clientKey *string + state atomic.Int32 // Atomic state: 0=Idle, 1=Running, -1=Destroyed + goroutineStarted chan struct{} // Signals when goroutine has started } // Status constants @@ -70,9 +72,9 @@ func NewStreamingClient(cfg *conf.AdvancedConfig, logger logging.LoggerInterface // ConnectStreaming connects to streaming func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) { - + fmt.Println("VAMOS A VERRRRRRRR") // Atomic state check: Only proceed if state is Idle (0) - if !s.state.CompareAndSwap(StateIdle, StateRunning) { + if s.state.Load() != StateIdle { s.logger.Info("Client is not in idle state (already running or destroyed). Ignoring") return } @@ -83,12 +85,29 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha return } + // Now that lifecycle check passed, atomically transition to Running + // This ensures we only set Running if goroutine will actually be spawned + if !s.state.CompareAndSwap(StateIdle, StateRunning) { + s.logger.Info("Client destroyed during initialization. Aborting") + // Lifecycle was started but we're not spawning goroutine, need to complete it + s.lifecycle.ShutdownComplete() + return + } + params := make(map[string]string) params["channels"] = strings.Join(append(channelList), ",") params["accessToken"] = token params["v"] = version + // Channel to signal goroutine has started (prevents race with StopStreaming) + // Must be created AFTER lifecycle check to ensure goroutine will be spawned + goroutineStarted := make(chan struct{}) + s.goroutineStarted = goroutineStarted + go func() { + // Signal that goroutine has started executing + close(goroutineStarted) + defer func() { s.lifecycle.ShutdownComplete() // Reset to idle if not destroyed @@ -107,17 +126,20 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha return } - // Helper to send status without blocking + // Helper to send status without blocking if destroyed sendStatus := func(status int) { - // Only send if client is still running - if s.state.Load() != StateRunning { + currentState := s.state.Load() + if currentState == StateDestroyed { + // Client is being destroyed, don't block + select { + case streamingStatus <- status: + default: + s.logger.Debug("Client destroyed, skipping status send") + } return } - select { - case streamingStatus <- status: - default: - s.logger.Debug("streamingStatus channel full or closed, skipping send") - } + // Normal operation: blocking send is safe + streamingStatus <- status } // Final check before starting connection - prevent race if Destroy was called @@ -163,16 +185,29 @@ func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus cha // StopStreaming stops streaming func (s *StreamingClientImpl) StopStreaming() { - // Set atomic state to destroyed immediately to prevent new goroutines - s.state.Store(StateDestroyed) + // Check old state before destroying - we need to know if a goroutine was spawned + oldState := s.state.Swap(StateDestroyed) + + // Always shutdown the underlying SSE client to unblock any in-flight connections + s.sseClient.Shutdown(true) - if !s.lifecycle.BeginShutdown() { - s.logger.Info("SSE client wrapper not running. Ignoring") + // If we were running, a goroutine exists and will call ShutdownComplete() + if oldState == StateRunning { + // Wait for goroutine to actually start before waiting for completion + // This prevents race where goroutine is queued but not started + if s.goroutineStarted != nil { + <-s.goroutineStarted + } + // Try to begin shutdown (might fail if goroutine still initializing) + s.lifecycle.BeginShutdown() + // But always wait for the goroutine to complete + s.lifecycle.AwaitShutdownComplete() + s.logger.Info("Stopped streaming") return } - s.sseClient.Shutdown(true) - s.lifecycle.AwaitShutdownComplete() - s.logger.Info("Stopped streaming") + + // No goroutine was spawned, safe to return + s.logger.Info("SSE client wrapper not running. Ignoring") } // IsRunning returns true if the client is running