Skip to content

Commit 86f7ecb

Browse files
committed
adds the timeout settings to the metrics
1 parent 6bccb60 commit 86f7ecb

4 files changed

Lines changed: 24 additions & 14 deletions

File tree

cmd/reader/main.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,29 @@ func main() {
6060
logger.Info("starting service", zap.Int("readers", *numReaders), zap.Duration("delay", *readDelay), zap.Int("start", *start))
6161
// goal is to spam reading and client connections
6262

63+
// Timeout configuration
64+
connWriteTimeout := 10 * time.Second
6365
kvRocksLiteReadTimeout := 1500 * time.Millisecond // context timeout
6466

6567
// Create and increment initialization counter with configuration metrics
6668
// Convert delay to milliseconds for readability
6769
delayMs := int64(*readDelay / time.Millisecond)
70+
connWriteTimeoutMs := int64(connWriteTimeout / time.Millisecond)
71+
hGetAllTimeoutMs := int64(kvRocksLiteReadTimeout / time.Millisecond)
6872
initCounter := metrics.GetOrCreateCounter(fmt.Sprintf(
69-
`kvrocks_reader_initialized_total{readers="%d",delay_ms="%d",start_index="%d"}`,
70-
*numReaders, delayMs, *start,
73+
`kvrocks_reader_initialized_total{readers="%d",delay_ms="%d",start_index="%d",conn_write_timeout_ms="%d",hgetall_timeout_ms="%d"}`,
74+
*numReaders, delayMs, *start, connWriteTimeoutMs, hGetAllTimeoutMs,
7175
))
7276
initCounter.Inc()
7377

7478
for i := 0; i < *numReaders; i++ {
7579
wg.Add(1)
76-
go func(id int, sleep time.Duration, startIndex int) {
80+
go func(id int, sleep time.Duration, startIndex int, connTimeout time.Duration, readTimeout time.Duration) {
7781
defer wg.Done()
7882
client, err := rueidis.NewClient(
7983
rueidis.ClientOption{
8084
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
81-
ConnWriteTimeout: 10 * time.Second, // explicitly set to the rueidis default; otherwise, it would be computed from Dialer.KeepAlive - e.g 60s * 10
85+
ConnWriteTimeout: connTimeout, // explicitly set to the rueidis default; otherwise, it would be computed from Dialer.KeepAlive - e.g 60s * 10
8286
ShuffleInit: true,
8387
Dialer: net.Dialer{KeepAlive: time.Second * 60}, // To decrease the pings
8488
DisableCache: true, // client cache is not enabled on kvrocks
@@ -110,7 +114,7 @@ func main() {
110114
}
111115
// Convert integer i to alphabet-only key before passing to hGetAll
112116
alphabetKey := intToAlphabetKey(int64(i))
113-
_, err := hGetAll(ctx, kvRocksLiteReadTimeout, client, alphabetKey)
117+
_, err := hGetAll(ctx, readTimeout, client, alphabetKey)
114118
if err != nil {
115119
// Check if error is due to context cancellation
116120
if ctx.Err() != nil {
@@ -123,7 +127,7 @@ func main() {
123127
logger.Info("reading", zap.Int("keyIndex", i), zap.String("key", alphabetKey))
124128
}
125129
}
126-
}(i, *readDelay, *start)
130+
}(i, *readDelay, *start, connWriteTimeout, kvRocksLiteReadTimeout)
127131
}
128132

129133
logger.Info("service running, waiting for shutdown signal")

cmd/reader/reader

18.5 KB
Binary file not shown.

cmd/writer/main.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func main() {
3838
logger.Get().Warn("invalid number of writers, using default", zap.Int("default", numOfWriters))
3939
}
4040

41+
// Timeout configuration
42+
connWriteTimeout := 10 * time.Second
43+
hSetExpireTimeout := 1 * time.Second
44+
4145
metricsWg := sync.WaitGroup{}
4246
opts := &metrics.PushOptions{
4347
WaitGroup: &metricsWg,
@@ -53,7 +57,7 @@ func main() {
5357
writers := make([]*Writer, numOfWriters)
5458
for i := 0; i < numOfWriters; i++ {
5559
logger.Get().Info("creating writers", zap.Int("num", i))
56-
writer, err := NewWriter()
60+
writer, err := NewWriter(connWriteTimeout)
5761
if err != nil {
5862
logger.Get().Error("unable to get rueidis client", zap.Error(err))
5963
return
@@ -81,9 +85,11 @@ func main() {
8185
// Create and increment initialization counter with configuration metrics
8286
// Convert delay to milliseconds for readability
8387
delayMs := int64(*writeDelay / time.Millisecond)
88+
connWriteTimeoutMs := int64(connWriteTimeout / time.Millisecond)
89+
hSetExpireTimeoutMs := int64(hSetExpireTimeout / time.Millisecond)
8490
initCounter := metrics.GetOrCreateCounter(fmt.Sprintf(
85-
`kvrocks_writer_initialized_total{writers="%d",delay_ms="%d",payload_size_bytes="%d",total_size_per_key_bytes="%d"}`,
86-
numOfWriters, delayMs, payloadSize, totalSizePerKey,
91+
`kvrocks_writer_initialized_total{writers="%d",delay_ms="%d",payload_size_bytes="%d",total_size_per_key_bytes="%d",conn_write_timeout_ms="%d",hsetexpire_timeout_ms="%d"}`,
92+
numOfWriters, delayMs, payloadSize, totalSizePerKey, connWriteTimeoutMs, hSetExpireTimeoutMs,
8793
))
8894
initCounter.Inc()
8995

@@ -92,7 +98,7 @@ func main() {
9298
logger.Get().Info("starting writers", zap.Int("start_index", *start))
9399
for i, writer := range writers {
94100
wg.Add(1)
95-
go writer.Start(ctx, &wg, data, cols, *writeDelay, int64(*start), i, numOfWriters)
101+
go writer.Start(ctx, &wg, data, cols, *writeDelay, int64(*start), i, numOfWriters, hSetExpireTimeout)
96102
}
97103

98104
logger.Get().Info("service running, waiting for shutdown signal")
@@ -125,12 +131,12 @@ type Writer struct {
125131
client rueidis.Client
126132
}
127133

128-
func NewWriter() (*Writer, error) {
134+
func NewWriter(connWriteTimeout time.Duration) (*Writer, error) {
129135
client, err := rueidis.NewClient(
130136
rueidis.ClientOption{
131137
InitAddress: []string{"kvrocks-byron-test.us-east-1.stackadapt:6379"},
132138
ShuffleInit: true,
133-
ConnWriteTimeout: 10 * time.Second,
139+
ConnWriteTimeout: connWriteTimeout,
134140
DisableCache: true, // client cache is not enabled on kvrocks
135141
PipelineMultiplex: 5,
136142
MaxFlushDelay: 50 * time.Microsecond,
@@ -165,7 +171,7 @@ func intToAlphabetKey(n int64) string {
165171
return string(result)
166172
}
167173

168-
func (w *Writer) Start(ctx context.Context, wg *sync.WaitGroup, data map[string][]byte, cols []string, sleep time.Duration, startIndex int64, writerIndex int, numWriters int) {
174+
func (w *Writer) Start(ctx context.Context, wg *sync.WaitGroup, data map[string][]byte, cols []string, sleep time.Duration, startIndex int64, writerIndex int, numWriters int, hSetExpireTimeout time.Duration) {
169175
defer wg.Done()
170176
// Each writer starts at startIndex + writerIndex, then increments by numWriters
171177
// This ensures no two writers write to the same key
@@ -182,7 +188,7 @@ func (w *Writer) Start(ctx context.Context, wg *sync.WaitGroup, data map[string]
182188

183189
// Convert integer keyIndex to alphabet-only key before passing to hSetExpire
184190
alphabetKey := intToAlphabetKey(keyIndex)
185-
err := hSetExpire(ctx, time.Millisecond*1000, w.client, alphabetKey, cols, data, time.Hour*24*7)
191+
err := hSetExpire(ctx, hSetExpireTimeout, w.client, alphabetKey, cols, data, time.Hour*24*7)
186192
if err != nil {
187193
// Check if error is due to context cancellation
188194
if ctx.Err() != nil {

cmd/writer/writer

1.37 MB
Binary file not shown.

0 commit comments

Comments
 (0)