Skip to content

Commit abf54da

Browse files
committed
Add retry for SQLite DB operations
1 parent e2d10ad commit abf54da

File tree

4 files changed

+101
-61
lines changed

4 files changed

+101
-61
lines changed

api/pkg/di/container.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (container *Container) DedicatedDB() (db *gorm.DB) {
266266

267267
sqlDB.SetMaxOpenConns(1)
268268
sqlDB.SetMaxIdleConns(0)
269-
sqlDB.SetConnMaxLifetime(-1)
269+
sqlDB.SetConnMaxLifetime(10 * time.Second)
270270

271271
if err = db.Use(tracing.NewPlugin()); err != nil {
272272
container.logger.Fatal(stacktrace.Propagate(err, "cannot use GORM tracing plugin"))

api/pkg/repositories/gorm_heartbeat_monitor_repository.go

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,30 @@ type gormHeartbeatMonitorRepository struct {
2222
db *gorm.DB
2323
}
2424

25+
// NewGormHeartbeatMonitorRepository creates the GORM version of the HeartbeatMonitorRepository
26+
func NewGormHeartbeatMonitorRepository(
27+
logger telemetry.Logger,
28+
tracer telemetry.Tracer,
29+
db *gorm.DB,
30+
) HeartbeatMonitorRepository {
31+
return &gormHeartbeatMonitorRepository{
32+
logger: logger.WithService(fmt.Sprintf("%T", &gormHeartbeatRepository{})),
33+
tracer: tracer,
34+
db: db,
35+
}
36+
}
37+
2538
func (repository *gormHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
2639
ctx, span := repository.tracer.Start(ctx)
2740
defer span.End()
2841

29-
if err := repository.db.WithContext(ctx).Where("user_id = ?", userID).Delete(&entities.HeartbeatMonitor{}).Error; err != nil {
30-
msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID)
31-
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
32-
}
33-
34-
return nil
42+
return executeWithRetry(func() error {
43+
if err := repository.db.WithContext(ctx).Where("user_id = ?", userID).Delete(&entities.HeartbeatMonitor{}).Error; err != nil {
44+
msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID)
45+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
46+
}
47+
return nil
48+
})
3549
}
3650

3751
// UpdatePhoneOnline updates the online status of a phone
@@ -42,14 +56,16 @@ func (repository *gormHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.
4256
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
4357
defer cancel()
4458

45-
err := repository.db.
46-
Model(&entities.HeartbeatMonitor{}).
47-
Where("id = ?", monitorID).
48-
Where("user_id = ?", userID).
49-
Updates(map[string]any{
50-
"phone_online": isOnline,
51-
"updated_at": time.Now().UTC(),
52-
}).Error
59+
err := executeWithRetry(func() error {
60+
return repository.db.
61+
Model(&entities.HeartbeatMonitor{}).
62+
Where("id = ?", monitorID).
63+
Where("user_id = ?", userID).
64+
Updates(map[string]any{
65+
"phone_online": isOnline,
66+
"updated_at": time.Now().UTC(),
67+
}).Error
68+
})
5369
if err != nil {
5470
msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID)
5571
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -65,13 +81,15 @@ func (repository *gormHeartbeatMonitorRepository) UpdateQueueID(ctx context.Cont
6581
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
6682
defer cancel()
6783

68-
err := repository.db.
69-
Model(&entities.HeartbeatMonitor{}).
70-
Where("id = ?", monitorID).
71-
Updates(map[string]any{
72-
"queue_id": queueID,
73-
"updated_at": time.Now().UTC(),
74-
}).Error
84+
err := executeWithRetry(func() error {
85+
return repository.db.
86+
Model(&entities.HeartbeatMonitor{}).
87+
Where("id = ?", monitorID).
88+
Updates(map[string]any{
89+
"queue_id": queueID,
90+
"updated_at": time.Now().UTC(),
91+
}).Error
92+
})
7593
if err != nil {
7694
msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID)
7795
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -86,10 +104,12 @@ func (repository *gormHeartbeatMonitorRepository) Delete(ctx context.Context, us
86104
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
87105
defer cancel()
88106

89-
err := repository.db.WithContext(ctx).
90-
Where("user_id = ?", userID).
91-
Where("owner = ?", owner).
92-
Delete(&entities.HeartbeatMonitor{}).Error
107+
err := executeWithRetry(func() error {
108+
return repository.db.WithContext(ctx).
109+
Where("user_id = ?", userID).
110+
Where("owner = ?", owner).
111+
Delete(&entities.HeartbeatMonitor{}).Error
112+
})
93113
if err != nil {
94114
msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", owner, userID)
95115
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -98,19 +118,6 @@ func (repository *gormHeartbeatMonitorRepository) Delete(ctx context.Context, us
98118
return nil
99119
}
100120

101-
// NewGormHeartbeatMonitorRepository creates the GORM version of the HeartbeatMonitorRepository
102-
func NewGormHeartbeatMonitorRepository(
103-
logger telemetry.Logger,
104-
tracer telemetry.Tracer,
105-
db *gorm.DB,
106-
) HeartbeatMonitorRepository {
107-
return &gormHeartbeatMonitorRepository{
108-
logger: logger.WithService(fmt.Sprintf("%T", &gormHeartbeatRepository{})),
109-
tracer: tracer,
110-
db: db,
111-
}
112-
}
113-
114121
// Index fetches the entities.Heartbeat records for a user and owner
115122
func (repository *gormHeartbeatMonitorRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) {
116123
ctx, span := repository.tracer.Start(ctx)
@@ -121,7 +128,9 @@ func (repository *gormHeartbeatMonitorRepository) Index(ctx context.Context, use
121128

122129
query := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("owner = ?", owner)
123130
heartbeats := new([]entities.Heartbeat)
124-
if err := query.Order("timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&heartbeats).Error; err != nil {
131+
if err := executeWithRetry(func() error {
132+
return query.Order("timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&heartbeats).Error
133+
}); err != nil {
125134
msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params)
126135
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
127136
}
@@ -137,7 +146,7 @@ func (repository *gormHeartbeatMonitorRepository) Store(ctx context.Context, hea
137146
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
138147
defer cancel()
139148

140-
if err := repository.db.WithContext(ctx).Create(heartbeatMonitor).Error; err != nil {
149+
if err := executeWithRetry(func() error { return repository.db.WithContext(ctx).Create(heartbeatMonitor).Error }); err != nil {
141150
msg := fmt.Sprintf("cannot save heartbeatMonitor monitor with ID [%s]", heartbeatMonitor.ID)
142151
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
143152
}
@@ -154,11 +163,12 @@ func (repository *gormHeartbeatMonitorRepository) Load(ctx context.Context, user
154163
defer cancel()
155164

156165
phone := new(entities.HeartbeatMonitor)
157-
err := repository.db.WithContext(ctx).
158-
Where("user_id = ?", userID).
159-
Where("owner = ?", owner).
160-
First(&phone).Error
161-
166+
err := executeWithRetry(func() error {
167+
return repository.db.WithContext(ctx).
168+
Where("user_id = ?", userID).
169+
Where("owner = ?", owner).
170+
First(&phone).Error
171+
})
162172
if errors.Is(err, gorm.ErrRecordNotFound) {
163173
msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, owner)
164174
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
@@ -181,14 +191,16 @@ func (repository *gormHeartbeatMonitorRepository) Exists(ctx context.Context, us
181191
defer cancel()
182192

183193
var exists bool
184-
err := repository.db.WithContext(ctx).
185-
Model(&entities.HeartbeatMonitor{}).
186-
Select("count(*) > 0").
187-
Where("user_id = ?", userID).
188-
Where("id = ?", monitorID).
189-
Find(&exists).Error
194+
err := executeWithRetry(func() error {
195+
return repository.db.WithContext(ctx).
196+
Model(&entities.HeartbeatMonitor{}).
197+
Select("count(*) > 0").
198+
Where("user_id = ?", userID).
199+
Where("id = ?", monitorID).
200+
Find(&exists).Error
201+
})
190202
if err != nil {
191-
msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and montiorID [%s]", userID, monitorID)
203+
msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and montior ID [%s]", userID, monitorID)
192204
return exists, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
193205
}
194206

api/pkg/repositories/gorm_heartbeat_repository.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ func (repository *gormHeartbeatRepository) DeleteAllForUser(ctx context.Context,
3636
ctx, span := repository.tracer.Start(ctx)
3737
defer span.End()
3838

39-
if err := repository.db.WithContext(ctx).Where("user_id = ?", userID).Delete(&entities.Heartbeat{}).Error; err != nil {
39+
err := executeWithRetry(func() error {
40+
return repository.db.WithContext(ctx).Where("user_id = ?", userID).Delete(&entities.Heartbeat{}).Error
41+
})
42+
if err != nil {
4043
msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID)
4144
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
4245
}
@@ -52,11 +55,13 @@ func (repository *gormHeartbeatRepository) Last(ctx context.Context, userID enti
5255
defer cancel()
5356

5457
heartbeat := new(entities.Heartbeat)
55-
err := repository.db.WithContext(ctx).
56-
Where("user_id = ?", userID).
57-
Where("owner = ?", owner).
58-
Order("timestamp DESC").
59-
First(&heartbeat).Error
58+
err := executeWithRetry(func() error {
59+
return repository.db.WithContext(ctx).
60+
Where("user_id = ?", userID).
61+
Where("owner = ?", owner).
62+
Order("timestamp DESC").
63+
First(&heartbeat).Error
64+
})
6065
if errors.Is(err, gorm.ErrRecordNotFound) {
6166
msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner)
6267
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
@@ -85,7 +90,10 @@ func (repository *gormHeartbeatRepository) Index(ctx context.Context, userID ent
8590
}
8691

8792
heartbeats := new([]entities.Heartbeat)
88-
if err := query.Order("timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&heartbeats).Error; err != nil {
93+
err := executeWithRetry(func() error {
94+
return query.Order("timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&heartbeats).Error
95+
})
96+
if err != nil {
8997
msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params)
9098
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
9199
}
@@ -101,7 +109,7 @@ func (repository *gormHeartbeatRepository) Store(ctx context.Context, heartbeat
101109
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
102110
defer cancel()
103111

104-
if err := repository.db.WithContext(ctx).Create(heartbeat).Error; err != nil {
112+
if err := executeWithRetry(func() error { return repository.db.WithContext(ctx).Create(heartbeat).Error }); err != nil {
105113
msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID)
106114
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
107115
}

api/pkg/repositories/repository.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package repositories
22

33
import (
4+
"strings"
45
"time"
56

7+
"github.com/avast/retry-go"
68
"github.com/palantir/stacktrace"
79
)
810

@@ -21,3 +23,21 @@ const (
2123

2224
dbOperationDuration = 5 * time.Second
2325
)
26+
27+
// isRetryableError reports whether err looks like a transient connection error
// that is worth retrying.
//
// The decision is made by substring matching on err.Error(): the message must
// contain "bad connection" (which also covers "driver: bad connection") or
// "stream is closed". A nil error is never retryable.
func isRetryableError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	return strings.Contains(msg, "bad connection") ||
		strings.Contains(msg, "stream is closed")
}
34+
35+
// executeWithRetry executes a GORM query with retry logic for transient connection errors
36+
func executeWithRetry(fn func() error) (err error) {
37+
return retry.Do(
38+
fn,
39+
retry.Attempts(3),
40+
retry.Delay(100*time.Millisecond),
41+
retry.RetryIf(isRetryableError),
42+
)
43+
}

0 commit comments

Comments
 (0)