Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/xredis-gtid
2 changes: 1 addition & 1 deletion src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -2089,7 +2089,7 @@ void dictObjectDestructor(void *privdata, void *val);
#define SWAP_RATELIMIT_STATS_METRIC_COUNT 2

#define SWAP_SWAP_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*SWAP_TYPES)
#define SWAP_RIO_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*ROCKS_TYPES)
#define SWAP_RIO_STATS_METRIC_COUNT (SWAP_STAT_METRIC_SIZE*ROCKS_TYPES*CF_COUNT)
#define SWAP_COMPACTION_FILTER_STATS_METRIC_COUNT (COMPACTION_FILTER_METRIC_SIZE*CF_COUNT)
#define SWAP_DEBUG_STATS_METRIC_COUNT (SWAP_DEBUG_SIZE*SWAP_DEBUG_INFO_TYPE)
#define SWAP_LOCK_STATS_METRIC_COUNT (SWAP_LOCK_METRIC_SIZE*REQUEST_LEVEL_TYPES)
Expand Down
15 changes: 7 additions & 8 deletions src/ctrip_swap_rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,8 @@ void RIODo(RIO *rio) {
#endif

end:
if (RIOGetCF(rio) != META_CF) {
RIOUpdateStatsDo(rio, elapsedUs(io_timer));
RIOUpdateStatsDataNotFound(rio);
}
RIOUpdateStatsDo(rio, elapsedUs(io_timer));
RIOUpdateStatsDataNotFound(rio);
}

size_t RIOEstimatePayloadSize(RIO *rio) {
Expand Down Expand Up @@ -513,11 +511,12 @@ size_t RIOEstimatePayloadSize(RIO *rio) {

void RIOUpdateStatsDo(RIO *rio, long duration) {
int action = rio->action;
int cf = RIOGetCF(rio);
size_t payload_size = RIOEstimatePayloadSize(rio);
atomicIncr(server.ror_stats->rio_stats[action].memory,payload_size);
atomicIncr(server.ror_stats->rio_stats[action].count,1);
atomicIncr(server.ror_stats->rio_stats[action].batch,1);
atomicIncr(server.ror_stats->rio_stats[action].time,duration);
atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].memory,payload_size);
atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].count,1);
atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].batch,1);
atomicIncr(server.ror_stats->rio_stats[cf*ROCKS_TYPES + action].time,duration);
}

void RIOUpdateStatsDataNotFound(RIO *rio) {
Expand Down
4 changes: 3 additions & 1 deletion src/ctrip_swap_rocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void serverRocksUnlock(rocks *rocks) {
}

static int rocksOpen(rocks *rocks) {
char *errs[3] = {NULL}, dir[ROCKS_DIR_MAX_LEN], *err = NULL, longlong_str[20];
char *errs[3] = {NULL}, dir[ROCKS_DIR_MAX_LEN], log_dir[ROCKS_DIR_MAX_LEN], *err = NULL, longlong_str[20];
rocksdb_block_based_table_options_t *block_opts = NULL;

serverAssert(rocks->db_opts == NULL);
Expand Down Expand Up @@ -255,6 +255,8 @@ static int rocksOpen(rocks *rocks) {
rocksdb_options_set_compaction_filter_factory(rocks->cf_opts[META_CF], NULL);

snprintf(dir, ROCKS_DIR_MAX_LEN, "%s/%d", ROCKS_DATA, rocks->rocksdb_epoch);
snprintf(log_dir, ROCKS_DIR_MAX_LEN, "%s/logs", ROCKS_DATA);
rocksdb_options_set_db_log_dir(rocks->db_opts, log_dir);
rocks->db = rocksdb_open_column_families(rocks->db_opts, dir, CF_COUNT,
swap_cf_names, (const rocksdb_options_t *const *)rocks->cf_opts,
rocks->cf_handles, errs);
Expand Down
91 changes: 51 additions & 40 deletions src/ctrip_swap_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

/* swap stats */
void initStatsSwap() {
int i, metric_offset;
int i, j, metric_offset;
server.ror_stats = zmalloc(sizeof(rorStat));
server.ror_stats->swap_stats = zmalloc(SWAP_TYPES*sizeof(swapStat));
for (i = 0; i < SWAP_TYPES; i++) {
Expand All @@ -54,19 +54,22 @@ void initStatsSwap() {
server.ror_stats->swap_stats[i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY;
server.ror_stats->swap_stats[i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME;
}
server.ror_stats->rio_stats = zmalloc(ROCKS_TYPES*sizeof(swapStat));
for (i = 0; i < ROCKS_TYPES; i++) {
metric_offset = SWAP_RIO_STATS_METRIC_OFFSET + i*SWAP_STAT_METRIC_SIZE;
server.ror_stats->rio_stats[i].name = rocksActionName(i);
server.ror_stats->rio_stats[i].batch = 0;
server.ror_stats->rio_stats[i].count = 0;
server.ror_stats->rio_stats[i].memory = 0;
server.ror_stats->rio_stats[i].time = 0;
server.ror_stats->rio_stats[i].stats_metric_idx_batch = metric_offset+SWAP_STAT_METRIC_BATCH;
server.ror_stats->rio_stats[i].stats_metric_idx_count = metric_offset+SWAP_STAT_METRIC_COUNT;
server.ror_stats->rio_stats[i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY;
server.ror_stats->rio_stats[i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME;
server.ror_stats->rio_stats = zmalloc(ROCKS_TYPES*sizeof(swapStat)*CF_COUNT);
for (j = 0; j < CF_COUNT; j++) {
for (i = 0; i < ROCKS_TYPES; i++) {
metric_offset = SWAP_RIO_STATS_METRIC_OFFSET + i*SWAP_STAT_METRIC_SIZE + j*ROCKS_TYPES*SWAP_STAT_METRIC_SIZE;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].name = rocksActionName(i);
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].batch = 0;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].count = 0;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].memory = 0;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].time = 0;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_batch = metric_offset+SWAP_STAT_METRIC_BATCH;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_count = metric_offset+SWAP_STAT_METRIC_COUNT;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_memory = metric_offset+SWAP_STAT_METRIC_MEMORY;
server.ror_stats->rio_stats[j*ROCKS_TYPES+i].stats_metric_idx_time = metric_offset+SWAP_STAT_METRIC_TIME;
}
}

server.ror_stats->compaction_filter_stats = zmalloc(sizeof(compactionFilterStat) * CF_COUNT);
for (i = 0; i < CF_COUNT; i++) {
metric_offset = SWAP_COMPACTION_FILTER_STATS_METRIC_OFFSET + i * COMPACTION_FILTER_METRIC_SIZE;
Expand Down Expand Up @@ -295,7 +298,7 @@ sds genRedisThreadCpuUsageInfoString(sds info, swapThreadCpuUsage *cpu_usage){
#endif

void trackSwapInstantaneousMetrics() {
int i;
int i, j;
swapStat *s;
size_t count, batch, memory, time;
for (i = 1; i < SWAP_TYPES; i++) {
Expand All @@ -309,18 +312,22 @@ void trackSwapInstantaneousMetrics() {
trackInstantaneousMetric(s->stats_metric_idx_memory,memory);
trackInstantaneousMetric(s->stats_metric_idx_time,time);
}
for (i = 1; i < ROCKS_TYPES; i++) {
s = server.ror_stats->rio_stats + i;
atomicGet(s->batch,batch);
atomicGet(s->count,count);
atomicGet(s->memory,memory);
atomicGet(s->time,time);
trackInstantaneousMetric(s->stats_metric_idx_batch,batch);
trackInstantaneousMetric(s->stats_metric_idx_count,count);
trackInstantaneousMetric(s->stats_metric_idx_memory,memory);
trackInstantaneousMetric(s->stats_metric_idx_time,time);
}
long long filt_count, scan_count, rio_count;
for (j = 0; j < CF_COUNT; j++) {
for (i = 1; i < ROCKS_TYPES; i++) {
s = server.ror_stats->rio_stats + (j* ROCKS_TYPES + i);
atomicGet(s->batch,batch);
atomicGet(s->count,count);
atomicGet(s->memory,memory);
atomicGet(s->time,time);
trackInstantaneousMetric(s->stats_metric_idx_batch,batch);
trackInstantaneousMetric(s->stats_metric_idx_count,count);
trackInstantaneousMetric(s->stats_metric_idx_memory,memory);
trackInstantaneousMetric(s->stats_metric_idx_time,time);
}
}


compactionFilterStat* cfs;
for (i = 0; i < CF_COUNT; i++) {
cfs = server.ror_stats->compaction_filter_stats + i;
Expand Down Expand Up @@ -368,7 +375,7 @@ sds genSwapInfoString(sds info) {
}

sds genSwapExecInfoString(sds info) {
int j;
int i, j;
long long ops, batch_ps, total_latency;
size_t count, batch, memory;

Expand Down Expand Up @@ -402,21 +409,25 @@ sds genSwapExecInfoString(sds info) {
ops > 0 ? total_latency/ops : 0);
}

for (j = 1; j < ROCKS_TYPES; j++) {
swapStat *s = &server.ror_stats->rio_stats[j];
atomicGet(s->batch,batch);
atomicGet(s->count,count);
atomicGet(s->memory,memory);
batch_ps = getInstantaneousMetric(s->stats_metric_idx_batch);
ops = getInstantaneousMetric(s->stats_metric_idx_count);
total_latency = getInstantaneousMetric(s->stats_metric_idx_time);
info = sdscatprintf(info,
"swap_rio_%s:batch=%ld,count=%ld,memory=%ld,batch_ps=%lld,ops=%lld,bps=%lld,latency_pb=%lld,latency_po=%lld\r\n",
s->name,batch,count,memory,batch_ps,ops,
getInstantaneousMetric(s->stats_metric_idx_memory),
batch_ps > 0 ? total_latency/batch_ps : 0,
ops > 0 ? total_latency/ops : 0);
for ( i = 0; i < CF_COUNT; i++) {
for (j = 1; j < ROCKS_TYPES; j++) {
swapStat *s = &server.ror_stats->rio_stats[i* ROCKS_TYPES + j];
atomicGet(s->batch,batch);
atomicGet(s->count,count);
atomicGet(s->memory,memory);
batch_ps = getInstantaneousMetric(s->stats_metric_idx_batch);
ops = getInstantaneousMetric(s->stats_metric_idx_count);
total_latency = getInstantaneousMetric(s->stats_metric_idx_time);
info = sdscatprintf(info,
"swap_rio_%s_%s:batch=%ld,count=%ld,memory=%ld,batch_ps=%lld,ops=%lld,bps=%lld,latency_pb=%lld,latency_po=%lld\r\n",
swap_cf_names[i],
s->name,batch,count,memory,batch_ps,ops,
getInstantaneousMetric(s->stats_metric_idx_memory),
batch_ps > 0 ? total_latency/batch_ps : 0,
ops > 0 ? total_latency/ops : 0);
}
}


for (j = 0; j < CF_COUNT; j++) {
compactionFilterStat *cfs = &server.ror_stats->compaction_filter_stats[j];
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ typedef long long ustime_t; /* microsecond time type. */
#define STATS_METRIC_MODIFIED_KEYS 3 /* Number of modified keys. */
#ifdef ENABLE_SWAP
#define STATS_METRIC_COUNT_MEM 4
#define STATS_METRIC_COUNT_SWAP 77 /* define directly here to avoid dependcy cycle, will be checked later. */
#define STATS_METRIC_COUNT_SWAP 117 /* define directly here to avoid dependcy cycle, will be checked later. */
#define STATS_METRIC_COUNT (STATS_METRIC_COUNT_SWAP + STATS_METRIC_COUNT_MEM)
#else
#define STATS_METRIC_COUNT 4
Expand Down
Loading