Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,9 @@ void evalGenericCommand(client *c, int evalsha) {
get appropriate error messages and logs */

/* gtid eval => multi + exec */
if(server.gtid_dbid_at_multi == -1) {
server.gtid_offset_at_multi = server.master_repl_offset+1;
server.gtid_dbid_at_multi = c->db->id;
if(server.gtid_pending_multi_dbid == -1) {
server.gtid_pending_multi_dbid = c->db->id;
server.gtid_pending_multi_offset = server.master_repl_offset+1;
}
luaCallFunction(&rctx, lua, c->argv+3, numkeys, c->argv+3+numkeys, c->argc-3-numkeys, ldb.active);
lua_pop(lua,1); /* Remove the error handler. */
Expand Down
4 changes: 2 additions & 2 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ void execCommand(client *c) {
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

server.in_exec = 1;
server.gtid_dbid_at_multi = c->db->id;
server.gtid_offset_at_multi = server.master_repl_offset+1;
server.gtid_pending_multi_dbid = c->db->id;
server.gtid_pending_multi_offset = server.master_repl_offset+1;

orig_argv = c->argv;
orig_argv_len = c->argv_len;
Expand Down
30 changes: 25 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,8 @@ void initServer(void) {
server.in_exec = 0;
server.gtid_dbid_at_multi = -1;
server.gtid_offset_at_multi = -1;
server.gtid_pending_multi_dbid = -1;
server.gtid_pending_multi_offset = -1;
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
server.busy_module_yield_reply = NULL;
server.client_pause_in_transaction = 0;
Expand Down Expand Up @@ -3783,6 +3785,27 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
hdr_record_value(*latency_histogram,duration_hist);
}

static void gtidPreparePropagateState(int transaction) {
if (!transaction) {
server.gtid_pending_multi_dbid = -1;
server.gtid_pending_multi_offset = -1;
return;
}

if (server.gtid_pending_multi_dbid != -1) {
server.gtid_dbid_at_multi = server.gtid_pending_multi_dbid;
server.gtid_offset_at_multi = server.gtid_pending_multi_offset;
server.gtid_pending_multi_dbid = -1;
server.gtid_pending_multi_offset = -1;
} else {
redisOp *op = &server.also_propagate.ops[0];
serverAssert(op->dbid >= 0);

server.gtid_dbid_at_multi = op->dbid;
server.gtid_offset_at_multi = server.master_repl_offset+1;
}
}

/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
Expand All @@ -3808,15 +3831,12 @@ static void propagatePendingCommands(void) {
transaction = 0;
}

gtidPreparePropagateState(transaction);

if (transaction) {
/* We use dbid=-1 to indicate we do not want to replicate SELECT.
* It'll be inserted together with the next command (inside the MULTI) */
propagateNow(-1,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
} else {
if (server.gtid_dbid_at_multi != -1) {
server.gtid_dbid_at_multi = -1;
server.gtid_offset_at_multi = -1;
}
}

for (j = 0; j < server.also_propagate.numops; j++) {
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,8 @@ struct redisServer {
long long gtid_reploff_delta;
int gtid_dbid_at_multi;
long long gtid_offset_at_multi;
int gtid_pending_multi_dbid;
long long gtid_pending_multi_offset;
gtidSeq *gtid_seq;
long long gtid_xsync_fullresync_indicator;
gtidInitialInfo gtid_initial[1];
Expand Down
40 changes: 40 additions & 0 deletions tests/gtid/gtid.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,46 @@ start_server {tags {"gtid"} overrides {gtid-enabled yes}} {
assert_equal [$slave GET key] $orig_val
}

test "propagte repl: GTID-ENABLED(yes) auto-wrapped alsoPropagate TX maps to one GTID" {
set sadd_argv [list SADD spop_gtid]
for {set i 1} {$i <= 3000} {incr i} {
lappend sadd_argv $i
}
$master {*}$sadd_argv
wait_for_gtid_sync $master $slave

incr mygno
assert_equal [$master SCARD spop_gtid] 3000
assert_equal [$slave SCARD spop_gtid] 3000
assert_replication_stream $master_repl [list "gtid $myuuid:$mygno * SADD spop_gtid *"]
assert_replication_stream $slave_repl [list "gtid $myuuid:$mygno * SADD spop_gtid *"]

set orig_master_reploff [status $master master_repl_offset]
set orig_slave_reploff [status $slave master_repl_offset]
set orig_master_gtidset [status $master gtid_set]
set orig_slave_gtidset [status $slave gtid_set]

$master SPOP spop_gtid 2500
wait_for_gtid_sync $master $slave

incr mygno
set mygtidset "$myuuid:1-$mygno"

assert_equal [$master SCARD spop_gtid] 500
assert_equal [$slave SCARD spop_gtid] 500

assert_replication_stream $master_repl [list multi {srem spop_gtid *} {srem spop_gtid *} {srem spop_gtid *} "gtid $myuuid:$mygno * EXEC"]
assert_replication_stream $slave_repl [list multi {srem spop_gtid *} {srem spop_gtid *} {srem spop_gtid *} "gtid $myuuid:$mygno * EXEC"]

assert_match "*$mygtidset*" [status $master gtid_executed]

assert_equal [lindex [$master GTIDX SEQ LOCATE $orig_master_gtidset] 0] [expr $orig_master_reploff+1]
assert_equal [lindex [$slave GTIDX SEQ LOCATE $orig_slave_gtidset ] 0] [expr $orig_slave_reploff+1]

assert_equal [lindex [$master GTIDX SEQ LOCATE $orig_master_gtidset] 1] "$myuuid:$mygno"
assert_equal [lindex [$slave GTIDX SEQ LOCATE $orig_slave_gtidset ] 1] "$myuuid:$mygno"
}

test "propagte repl: GTID-ENABLED(yes) TX(no) CMD(write)" {
set orig_master_reploff [status $master master_repl_offset]
set orig_slave_reploff [status $slave master_repl_offset]
Expand Down
Loading