From 3e79fc21acbcc88a2627e6f267bef28445ac39c8 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Thu, 29 Jan 2026 14:42:58 +0800 Subject: [PATCH 1/3] [gtid][hotfix] xsync continue error when xsync continue point adjust to psync from (server.repl_mode->from < server.repl_backlog_off) --- deps/xredis-gtid | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/xredis-gtid b/deps/xredis-gtid index c450ba710b5..52841a2297b 160000 --- a/deps/xredis-gtid +++ b/deps/xredis-gtid @@ -1 +1 @@ -Subproject commit c450ba710b56145e14c9952f28ad436049a93738 +Subproject commit 52841a2297b0efae5f0175a4dbd7c4b9539d3801 From b5687e77c02decd6bc9589fb19b4aa9112dde633 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Tue, 10 Feb 2026 18:29:19 +0800 Subject: [PATCH 2/3] [feature] set rocksdb db log dir --- src/ctrip_swap_rocks.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ctrip_swap_rocks.c b/src/ctrip_swap_rocks.c index f3237bacece..89637aecb3b 100644 --- a/src/ctrip_swap_rocks.c +++ b/src/ctrip_swap_rocks.c @@ -86,7 +86,7 @@ void serverRocksUnlock(rocks *rocks) { } static int rocksOpen(rocks *rocks) { - char *errs[3] = {NULL}, dir[ROCKS_DIR_MAX_LEN], *err = NULL, longlong_str[20]; + char *errs[3] = {NULL}, dir[ROCKS_DIR_MAX_LEN], log_dir[ROCKS_DIR_MAX_LEN], *err = NULL, longlong_str[20]; rocksdb_block_based_table_options_t *block_opts = NULL; serverAssert(rocks->db_opts == NULL); @@ -255,6 +255,8 @@ static int rocksOpen(rocks *rocks) { rocksdb_options_set_compaction_filter_factory(rocks->cf_opts[META_CF], NULL); snprintf(dir, ROCKS_DIR_MAX_LEN, "%s/%d", ROCKS_DATA, rocks->rocksdb_epoch); + snprintf(log_dir, ROCKS_DIR_MAX_LEN, "%s/logs", ROCKS_DATA); + rocksdb_options_set_db_log_dir(rocks->db_opts, log_dir); rocks->db = rocksdb_open_column_families(rocks->db_opts, dir, CF_COUNT, swap_cf_names, (const rocksdb_options_t *const *)rocks->cf_opts, rocks->cf_handles, errs); From a385573b65980eab6d56c889bef0b9fea54f12d3 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Tue, 24 Mar 2026 20:51:35 +0800 Subject: [PATCH 3/3] [feature] add meta rio stats --- src/ctrip_swap.h | 2 +- src/ctrip_swap_rio.c | 15 ++++--- src/ctrip_swap_stat.c | 91 ++++++++++++++++++++++++------------------- src/server.h | 2 +- 4 files changed, 60 insertions(+), 50 deletions(-) diff --git a/src/ctrip_swap.h b/src/ctrip_swap.h index cb12590884b..7daaab5ccef 100644 --- a/src/ctrip_swap.h +++ b/src/ctrip_swap.h @@ -2089,7 +2089,7 @@ void dictObjectDestructor(void *privdata, void *val); #define SWAP_RATELIMIT_STATS_METRIC_COUNT 2 #define SWAP_SWAP_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*SWAP_TYPES) -#define SWAP_RIO_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*ROCKS_TYPES) +#define SWAP_RIO_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*ROCKS_TYPES*CF_COUNT) #define SWAP_COMPACTION_FILTER_STATS_METRIC_COUNT (COMPACTION_FILTER_METRIC_SIZE*CF_COUNT) #define SWAP_DEBUG_STATS_METRIC_COUNT (SWAP_DEBUG_SIZE*SWAP_DEBUG_INFO_TYPE) #define SWAP_LOCK_STATS_METRIC_COUNT (SWAP_LOCK_METRIC_SIZE*REQUEST_LEVEL_TYPES) diff --git a/src/ctrip_swap_rio.c b/src/ctrip_swap_rio.c index 336fac70bb3..03bdfa18139 100644 --- a/src/ctrip_swap_rio.c +++ b/src/ctrip_swap_rio.c @@ -470,10 +470,8 @@ void RIODo(RIO *rio) { #endif end: - if (RIOGetCF(rio) != META_CF) { - RIOUpdateStatsDo(rio, elapsedUs(io_timer)); - RIOUpdateStatsDataNotFound(rio); - } + RIOUpdateStatsDo(rio, elapsedUs(io_timer)); + RIOUpdateStatsDataNotFound(rio); } size_t RIOEstimatePayloadSize(RIO *rio) { @@ -513,11 +511,12 @@ size_t RIOEstimatePayloadSize(RIO *rio) { void RIOUpdateStatsDo(RIO *rio, long duration) { int action = rio->action; + int cf = RIOGetCF(rio); size_t payload_size = RIOEstimatePayloadSize(rio); - atomicIncr(server.ror_stats->rio_stats[action].memory,payload_size); - atomicIncr(server.ror_stats->rio_stats[action].count,1); - atomicIncr(server.ror_stats->rio_stats[action].batch,1); - atomicIncr(server.ror_stats->rio_stats[action].time,duration); + atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].memory,payload_size); + atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].count,1); + atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].batch,1); + atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].time,duration); } void RIOUpdateStatsDataNotFound(RIO *rio) { diff --git a/src/ctrip_swap_stat.c b/src/ctrip_swap_stat.c index b20dfbd8e33..92758459ba6 100644 --- a/src/ctrip_swap_stat.c +++ b/src/ctrip_swap_stat.c @@ -39,7 +39,7 @@ /* swap stats */ void initStatsSwap() { - int i, metric_offset; + int i, j, metric_offset; server.ror_stats = zmalloc(sizeof(rorStat)); server.ror_stats->swap_stats = zmalloc(SWAP_TYPES*sizeof(swapStat)); for (i = 0; i < SWAP_TYPES; i++) { @@ -54,19 +54,22 @@ void initStatsSwap() { server.ror_stats->swap_stats[i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY; server.ror_stats->swap_stats[i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME; } - server.ror_stats->rio_stats = zmalloc(ROCKS_TYPES*sizeof(swapStat)); - for (i = 0; i < ROCKS_TYPES; i++) { - metric_offset = SWAP_RIO_STATS_METRIC_OFFSET + i*SWAP_STAT_METRIC_SIZE; - server.ror_stats->rio_stats[i].name = rocksActionName(i); - server.ror_stats->rio_stats[i].batch = 0; - server.ror_stats->rio_stats[i].count = 0; - server.ror_stats->rio_stats[i].memory = 0; - server.ror_stats->rio_stats[i].time = 0; - server.ror_stats->rio_stats[i].stats_metric_idx_batch = metric_offset+SWAP_STAT_METRIC_BATCH; - server.ror_stats->rio_stats[i].stats_metric_idx_count = metric_offset+SWAP_STAT_METRIC_COUNT; - server.ror_stats->rio_stats[i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY; - server.ror_stats->rio_stats[i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME; + server.ror_stats->rio_stats = zmalloc(ROCKS_TYPES*sizeof(swapStat)*CF_COUNT); + for (j = 0; j < CF_COUNT; j++) { + for (i = 0; i < ROCKS_TYPES; i++) { + metric_offset = SWAP_RIO_STATS_METRIC_OFFSET + i*SWAP_STAT_METRIC_SIZE + j*ROCKS_TYPES*SWAP_STAT_METRIC_SIZE; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].name = rocksActionName(i); + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].batch = 0; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].count = 0; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].memory = 0; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].time = 0; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_batch = metric_offset+SWAP_STAT_METRIC_BATCH; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_count = metric_offset+SWAP_STAT_METRIC_COUNT; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY; + server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME; + } } + server.ror_stats->compaction_filter_stats = zmalloc(sizeof(compactionFilterStat) * CF_COUNT); for (i = 0; i < CF_COUNT; i++) { metric_offset = SWAP_COMPACTION_FILTER_STATS_METRIC_OFFSET + i * COMPACTION_FILTER_METRIC_SIZE; @@ -295,7 +298,7 @@ sds genRedisThreadCpuUsageInfoString(sds info, swapThreadCpuUsage *cpu_usage){ #endif void trackSwapInstantaneousMetrics() { - int i; + int i, j; swapStat *s; size_t count, batch, memory, time; for (i = 1; i < SWAP_TYPES; i++) { @@ -309,18 +312,22 @@ void trackSwapInstantaneousMetrics() { trackInstantaneousMetric(s->stats_metric_idx_memory,memory); trackInstantaneousMetric(s->stats_metric_idx_time,time); } - for (i = 1; i < ROCKS_TYPES; i++) { - s = server.ror_stats->rio_stats + i; - atomicGet(s->batch,batch); - atomicGet(s->count,count); - atomicGet(s->memory,memory); - atomicGet(s->time,time); - trackInstantaneousMetric(s->stats_metric_idx_batch,batch); - trackInstantaneousMetric(s->stats_metric_idx_count,count); - trackInstantaneousMetric(s->stats_metric_idx_memory,memory); - trackInstantaneousMetric(s->stats_metric_idx_time,time); - } long long filt_count, scan_count, rio_count; + for (j = 0; j < CF_COUNT; j++) { + for (i = 1; i < ROCKS_TYPES; i++) { + s = server.ror_stats->rio_stats + (j* ROCKS_TYPES + i); + atomicGet(s->batch,batch); + atomicGet(s->count,count); + atomicGet(s->memory,memory); + atomicGet(s->time,time); + trackInstantaneousMetric(s->stats_metric_idx_batch,batch); + trackInstantaneousMetric(s->stats_metric_idx_count,count); + trackInstantaneousMetric(s->stats_metric_idx_memory,memory); + trackInstantaneousMetric(s->stats_metric_idx_time,time); + } + } + + compactionFilterStat* cfs; for (i = 0; i < CF_COUNT; i++) { cfs = server.ror_stats->compaction_filter_stats + i; @@ -368,7 +375,7 @@ sds genSwapInfoString(sds info) { } sds genSwapExecInfoString(sds info) { - int j; + int i, j; long long ops, batch_ps, total_latency; size_t count, batch, memory; @@ -402,21 +409,25 @@ sds genSwapExecInfoString(sds info) { ops > 0 ? total_latency/ops : 0); } - for (j = 1; j < ROCKS_TYPES; j++) { - swapStat *s = &server.ror_stats->rio_stats[j]; - atomicGet(s->batch,batch); - atomicGet(s->count,count); - atomicGet(s->memory,memory); - batch_ps = getInstantaneousMetric(s->stats_metric_idx_batch); - ops = getInstantaneousMetric(s->stats_metric_idx_count); - total_latency = getInstantaneousMetric(s->stats_metric_idx_time); - info = sdscatprintf(info, - "swap_rio_%s:batch=%ld,count=%ld,memory=%ld,batch_ps=%lld,ops=%lld,bps=%lld,latency_pb=%lld,latency_po=%lld\r\n", - s->name,batch,count,memory,batch_ps,ops, - getInstantaneousMetric(s->stats_metric_idx_memory), - batch_ps > 0 ? total_latency/batch_ps : 0, - ops > 0 ? total_latency/ops : 0); + for ( i = 0; i < CF_COUNT; i++) { + for (j = 1; j < ROCKS_TYPES; j++) { + swapStat *s = &server.ror_stats->rio_stats[i* ROCKS_TYPES + j]; + atomicGet(s->batch,batch); + atomicGet(s->count,count); + atomicGet(s->memory,memory); + batch_ps = getInstantaneousMetric(s->stats_metric_idx_batch); + ops = getInstantaneousMetric(s->stats_metric_idx_count); + total_latency = getInstantaneousMetric(s->stats_metric_idx_time); + info = sdscatprintf(info, + "swap_rio_%s_%s:batch=%ld,count=%ld,memory=%ld,batch_ps=%lld,ops=%lld,bps=%lld,latency_pb=%lld,latency_po=%lld\r\n", + swap_cf_names[i], + s->name,batch,count,memory,batch_ps,ops, + getInstantaneousMetric(s->stats_metric_idx_memory), + batch_ps > 0 ? total_latency/batch_ps : 0, + ops > 0 ? total_latency/ops : 0); + } } + for (j = 0; j < CF_COUNT; j++) { compactionFilterStat *cfs = &server.ror_stats->compaction_filter_stats[j]; diff --git a/src/server.h b/src/server.h index 3c74272f8d6..0548bd51f13 100644 --- a/src/server.h +++ b/src/server.h @@ -154,7 +154,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define STATS_METRIC_MODIFIED_KEYS 3 /* Number of modified keys. */ #ifdef ENABLE_SWAP #define STATS_METRIC_COUNT_MEM 4 -#define STATS_METRIC_COUNT_SWAP 77 /* define directly here to avoid dependcy cycle, will be checked later. */ +#define STATS_METRIC_COUNT_SWAP 117 /* define directly here to avoid dependcy cycle, will be checked later. */ #define STATS_METRIC_COUNT (STATS_METRIC_COUNT_SWAP + STATS_METRIC_COUNT_MEM) #else #define STATS_METRIC_COUNT 4