Skip to content

Commit 59666f4

Browse files
authored
DS-2069 Retry failing origins on startup (#59)
Currently, on startup, if an origin fails to connect, it isn't added as a connection and isn't retried. This updates the logic so that origins that fail at startup are retried in the background until they connect or the stream is closed.
1 parent 98df86e commit 59666f4

File tree

2 files changed

+145
-40
lines changed

2 files changed

+145
-40
lines changed

go/stream.go

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
130130
if err != nil {
131131
c.config.logInfo("client: failed to connect to origin %s: %s", origins[x], err)
132132
errs = append(errs, fmt.Errorf("origin %s: %w", origins[x], err))
133+
// Retry connecting to the origin in the background
134+
go func() {
135+
conn, err := s.newWSconnWithRetry(origins[x])
136+
if err != nil {
137+
return
138+
}
139+
go s.monitorConn(conn)
140+
s.conns = append(s.conns, conn)
141+
}()
133142
continue
134143
}
135144
go s.monitorConn(conn)
@@ -138,7 +147,10 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
138147

139148
// Only fail if we couldn't connect to ANY origins
140149
if len(s.conns) == 0 {
141-
return nil, fmt.Errorf("failed to connect to any origins in HA mode: %v", errs)
150+
err = fmt.Errorf("failed to connect to any origins in HA mode: %v", errs)
151+
s.closeError.CompareAndSwap(nil, err)
152+
s.Close()
153+
return nil, err
142154
}
143155
c.config.logInfo("client: connected to %d out of %d origins in HA mode", len(s.conns), len(origins))
144156
} else {
@@ -237,52 +249,52 @@ func (s *stream) monitorConn(conn *wsConn) {
237249
// ensure the current connection is closed
238250
_ = conn.close()
239251

240-
// reconnect loop
241-
// will try to reconnect until client is closed or
242-
// we have no active connections and have exceeded maxWSReconnectAttempts
243-
var attempts int
244-
for {
245-
var re *wsConn
246-
var err error
247-
248-
if s.closed.Load() {
249-
return
250-
}
252+
re, err := s.newWSconnWithRetry(conn.origin)
253+
if err != nil {
254+
s.closeError.CompareAndSwap(nil, fmt.Errorf("stream has no active connections, last error: %w", err))
255+
s.Close()
256+
return
257+
}
258+
conn.replace(re.conn)
259+
s.config.logInfo(
260+
"client: stream websocket %s: reconnected",
261+
conn.origin,
262+
)
263+
}
264+
}
251265

252-
// fail the stream if we are over the maxWSReconnectAttempts
253-
// and there are no other active connection
254-
if attempts >= s.config.WsMaxReconnect && s.stats.activeConnections.Load() == 0 {
255-
s.closeError.CompareAndSwap(nil, fmt.Errorf("stream has no active connections, last error: %w", err))
256-
s.Close()
257-
return
258-
}
259-
attempts++
266+
func (s *stream) newWSconnWithRetry(origin string) (conn *wsConn, err error) {
267+
// reconnect loop
268+
// will try to reconnect until client is closed or
269+
// we have no active connections and have exceeded maxWSReconnectAttempts
270+
var attempts int
271+
for {
272+
if s.closed.Load() || s.streamCtx.Err() != nil {
273+
return nil, fmt.Errorf("Retry cancelled, stream is closed")
274+
}
260275

261-
ctx, cancel = context.WithTimeout(context.Background(), defaultWSConnectTimeout)
262-
re, err = s.newWSconn(ctx, conn.origin)
263-
cancel()
276+
// fail the stream if we are over the maxWSReconnectAttempts
277+
// and there are no other active connection
278+
if attempts >= s.config.WsMaxReconnect && s.stats.activeConnections.Load() == 0 {
279+
return nil, err
280+
}
281+
attempts++
264282

265-
if err != nil {
266-
interval := time.Millisecond * time.Duration(
267-
rand.Intn(maxWSReconnectIntervalMIllis-minWSReconnectIntervalMillis)+minWSReconnectIntervalMillis) //nolint:gosec
268-
s.config.logInfo(
269-
"client: stream websocket %s: error reconnecting: %s, backing off: %s",
270-
conn.origin, err, interval.String(),
271-
)
272-
time.Sleep(interval)
273-
continue
274-
}
283+
ctx, cancel := context.WithTimeout(context.Background(), defaultWSConnectTimeout)
284+
conn, err = s.newWSconn(ctx, origin)
285+
cancel()
275286

276-
conn.replace(re.conn)
277-
if s.connStatusCallback != nil {
278-
go s.connStatusCallback(true, conn.host, conn.origin)
279-
}
287+
if err != nil {
288+
interval := time.Millisecond * time.Duration(
289+
rand.Intn(maxWSReconnectIntervalMIllis-minWSReconnectIntervalMillis)+minWSReconnectIntervalMillis) //nolint:gosec
280290
s.config.logInfo(
281-
"client: stream websocket %s: reconnected",
282-
conn.origin,
291+
"client: stream websocket %s: error reconnecting: %s, backing off: %s",
292+
origin, err, interval.String(),
283293
)
284-
break
294+
time.Sleep(interval)
295+
continue
285296
}
297+
return conn, nil
286298
}
287299
}
288300

go/stream_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,3 +801,96 @@ func TestClient_StreamHA_OneOriginDown(t *testing.T) {
801801
}
802802

803803
}
804+
805+
// Tests that when in HA mode both origins are up after a recovery period even if one origin is down on initial connection
806+
func TestClient_StreamHA_OneOriginDownRecovery(t *testing.T) {
807+
connectAttempts := &atomic.Uint64{}
808+
reconnectAttemptsBeforeRecovery := uint64(4)
809+
810+
ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
811+
if r.Method == http.MethodHead {
812+
w.Header().Add(cllAvailOriginsHeader, "{001,002}")
813+
w.WriteHeader(200)
814+
return
815+
}
816+
817+
if r.URL.Path != apiV1WS {
818+
t.Errorf("expected path %s, got %s", apiV1WS, r.URL.Path)
819+
}
820+
821+
origin := r.Header.Get(cllOriginHeader)
822+
connectAttempts.Add(1)
823+
824+
// Simulate origin 002 being down for the first reconnectAttemptsBeforeRecovery attempts
825+
// Add one to count for 001 connection
826+
if origin == "002" && connectAttempts.Load() <= reconnectAttemptsBeforeRecovery+1 {
827+
w.WriteHeader(http.StatusGatewayTimeout)
828+
return
829+
}
830+
831+
conn, err := websocket.Accept(
832+
w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover},
833+
)
834+
835+
if err != nil {
836+
t.Fatalf("error accepting connection: %s", err)
837+
}
838+
defer func() { _ = conn.CloseNow() }()
839+
840+
// Keep the connection alive for testing
841+
for {
842+
_, _, err := conn.Read(context.Background())
843+
if err != nil {
844+
break
845+
}
846+
}
847+
})
848+
defer ms.Close()
849+
850+
streamsClient, err := ms.Client()
851+
if err != nil {
852+
t.Fatalf("error creating client %s", err)
853+
}
854+
855+
cc := streamsClient.(*client)
856+
cc.config.Logger = LogPrintf
857+
cc.config.LogDebug = true
858+
cc.config.WsHA = true
859+
860+
sub, err := streamsClient.StreamWithStatusCallback(context.Background(), []feed.ID{feed1, feed2}, func(connected bool, host string, origin string) {
861+
t.Logf("status callback: connected=%v, host=%s, origin=%s", connected, host, origin)
862+
})
863+
if err != nil {
864+
t.Fatalf("error subscribing %s", err)
865+
}
866+
defer sub.Close()
867+
868+
for connectAttempts.Load() != 2 {
869+
time.Sleep(time.Millisecond)
870+
}
871+
872+
time.Sleep(time.Millisecond * 5)
873+
stats := sub.Stats()
874+
if stats.ActiveConnections != 1 {
875+
t.Errorf("expected 1 active connection before recovery, got %d", stats.ActiveConnections)
876+
}
877+
878+
if stats.ConfiguredConnections != 2 {
879+
t.Errorf("expected 2 configured connections before recovery, got %d", stats.ConfiguredConnections)
880+
}
881+
882+
// Add two to count one for 001 connection and one for 002 connection
883+
for connectAttempts.Load() != reconnectAttemptsBeforeRecovery+2 {
884+
time.Sleep(time.Millisecond)
885+
}
886+
887+
time.Sleep(time.Millisecond * 5)
888+
stats = sub.Stats()
889+
if stats.ActiveConnections != 2 {
890+
t.Errorf("expected 2 active connection after recovery, got %d", stats.ActiveConnections)
891+
}
892+
893+
if stats.ConfiguredConnections != 2 {
894+
t.Errorf("expected 2 configured connections after recovery, got %d", stats.ConfiguredConnections)
895+
}
896+
}

0 commit comments

Comments
 (0)