From a27e432c931771177648cc7ecea68b6d41839304 Mon Sep 17 00:00:00 2001 From: xiehaopeng Date: Thu, 16 Jan 2025 15:09:48 +0800 Subject: [PATCH 1/2] feat(ignore-binlog-events): ignore-over-iteration-range-max-binlog --- doc/command-line-flags.md | 6 ++ go/base/context.go | 2 + go/cmd/gh-ost/main.go | 1 + go/logic/applier.go | 120 +++++++++++++++++++++++++++++++++----- go/logic/applier_test.go | 59 +++++++++++++++++++ go/logic/inspect.go | 14 +++++ go/logic/migrator.go | 3 +- go/sql/builder.go | 21 +++++-- go/sql/builder_test.go | 34 +++++++---- go/sql/types.go | 10 ++++ 10 files changed, 238 insertions(+), 32 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index cf2b2ca95..fa40f8dcd 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -125,6 +125,12 @@ Why is this behavior configurable? Different workloads have different characteri Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. + +### ignore-over-iteration-range-max-binlog + +Defaults to false. When enabled, a binlog event whose unique key value is greater than `MigrationIterationRangeMaxValues` and less than or equal to `MigrationRangeMaxValues` is ignored, because that row will be synced later by the row-copy chunks. A binlog event whose unique key value is greater than `MigrationRangeMaxValues`, or less than or equal to `MigrationIterationRangeMaxValues`, is applied. Currently this only takes effect for a single-column unique key of an integer type. 
+ + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? diff --git a/go/base/context.go b/go/base/context.go index f90171698..f858bd1ef 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -201,7 +201,9 @@ type MigrationContext struct { controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 + TotalDMLEventsIgnored int64 DMLBatchSize int64 + IgnoreOverIterationRangeMaxBinlog bool isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 0829429e0..e0432fbb7 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -109,6 +109,7 @@ func main() { defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") + flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. Because the data will be synced by copy chunk") maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. 
gh-ost uses an internal, subsecond resolution query") diff --git a/go/logic/applier.go b/go/logic/applier.go index 59562dc7f..630b200d5 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -30,18 +30,20 @@ const ( ) type dmlBuildResult struct { - query string - args []interface{} - rowsDelta int64 - err error + query string + args []interface{} + uniqueKeyArgs []interface{} + rowsDelta int64 + err error } -func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { +func newDmlBuildResult(query string, args []interface{}, uniqueKeyArgs []interface{}, rowsDelta int64, err error) *dmlBuildResult { return &dmlBuildResult{ - query: query, - args: args, - rowsDelta: rowsDelta, - err: err, + query: query, + args: args, + uniqueKeyArgs: uniqueKeyArgs, + rowsDelta: rowsDelta, + err: err, } } @@ -129,6 +131,7 @@ func (this *Applier) prepareQueries() (err error) { this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, + &this.migrationContext.UniqueKey.Columns, ); err != nil { return err } @@ -1190,12 +1193,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB case binlog.DeleteDML: { query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues()) - return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)} + return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, uniqueKeyArgs, -1, err)} } case binlog.InsertDML: { - query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues()) - return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)} + query, sharedArgs, uniqueKeyArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues()) + return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, uniqueKeyArgs, 1, err)} } case binlog.UpdateDML: { @@ -1211,12 +1214,87 @@ 
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)} + return []*dmlBuildResult{newDmlBuildResult(query, args, uniqueKeyArgs, 0, err)} } } return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))} } +// IsIgnoreOverMaxChunkRangeEvent returns true if this event can be ignored, because the data will be synced by copy chunk +// min rangeMax max +// the value > rangeMax and value < max, ignore = true +// otherwise ignore = false +func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) { + if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog { + return false, nil + } + + // Currently only supports single-column unique index of int type + uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns() + if len(uniqueKeyCols) != 1 { + return false, nil + } + uniqueKeyCol := uniqueKeyCols[0] + if uniqueKeyCol.CompareValueFunc == nil { + return false, nil + } + + // Compare whether it is less than the MigrationIterationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. 
+ ignore, err := func() (bool, error) { + compareValues := this.migrationContext.MigrationIterationRangeMaxValues + if compareValues == nil { + // It means that the migration has not started yet, use MigrationRangeMinValues instead + compareValues = this.migrationContext.MigrationRangeMinValues + } + + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues.StringColumn(0)) + if err != nil { + return false, err + } + + switch { + case than > 0: + return true, nil + case than < 0: + return false, nil + default: + // Since rowcopy is left-open-right-closed, when it is equal to the MigrationIterationRangeMaxValues boundary value, it cannot be ignored. + return false, nil + } + }() + if err != nil { + return false, err + } + + if !ignore { + return false, nil + } + + // Compare whether it is greater than the MigrationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. + ignore, err = func() (bool, error) { + compareValues := this.migrationContext.MigrationRangeMaxValues + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues) + if err != nil { + return false, err + } + + switch { + case than < 0: + return true, nil + case than > 0: + return false, nil + default: + // Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored. 
+ return true, nil + } + }() + if err != nil { + return false, err + } + + return ignore, nil +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 @@ -1244,6 +1322,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return err } + var ignoredEventSize int64 buildResults := make([]*dmlBuildResult, 0, len(dmlEvents)) nArgs := 0 for _, dmlEvent := range dmlEvents { @@ -1251,10 +1330,25 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if buildResult.err != nil { return rollback(buildResult.err) } + if ignore, err := this.IsIgnoreOverMaxChunkRangeEvent(buildResult.uniqueKeyArgs); err != nil { + return rollback(err) + } else if ignore { + ignoredEventSize++ + continue + } nArgs += len(buildResult.args) buildResults = append(buildResults, buildResult) } } + atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize) + + // If there are no statements to execute, return directly + if len(buildResults) == 0 { + if err := tx.Commit(); err != nil { + return err + } + return nil + } // We batch together the DML queries into multi-statements to minimize network trips. 
// We have to use the raw driver connection to access the rows affected diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index f53e65ffb..9605e61bd 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -8,6 +8,8 @@ package logic import ( "context" gosql "database/sql" + "fmt" + "math/big" "strings" "testing" @@ -181,6 +183,63 @@ func TestApplierBuildDMLEventQuery(t *testing.T) { }) } +func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + uniqueColumns := sql.NewColumnList([]string{"id"}) + uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10}) + migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111}) + + applier := NewApplier(migrationContext) + + t.Run("less than MigrationRangeMinValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{5}) + require.NoError(t, err) + require.False(t, ignore) + }) + + t.Run("equal to MigrationIterationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{11111}) + require.NoError(t, err) + require.False(t, ignore) + }) + + t.Run("ignore event", func(t *testing.T) { + ignore, err := 
applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{88888}) + require.NoError(t, err) + require.True(t, ignore) + }) + + t.Run("equal to MigrationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123456}) + require.NoError(t, err) + require.True(t, ignore) + }) + + t.Run("larger than MigrationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123457}) + require.NoError(t, err) + require.False(t, ignore) + }) +} + func TestApplierInstantDDL(t *testing.T) { migrationContext := base.NewMigrationContext() migrationContext.DatabaseName = "test" diff --git a/go/logic/inspect.go b/go/logic/inspect.go index ea8c3adca..80d7a333f 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "errors" "fmt" + "math/big" "reflect" "strings" "sync/atomic" @@ -635,6 +636,19 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL continue } + if strings.Contains(columnType, "int") { + column.CompareValueFunc = func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + } + } if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 751b54a08..78289bf0e 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1054,9 +1054,10 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), 
%+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Ignored: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + atomic.LoadInt64(&this.migrationContext.TotalDMLEventsIgnored), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, diff --git a/go/sql/builder.go b/go/sql/builder.go index 332aef100..ef51ae5d9 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -466,15 +466,15 @@ func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interf // DMLInsertQueryBuilder can build INSERT queries for DML events. // It holds the prepared query statement so it doesn't need to be recreated every time. type DMLInsertQueryBuilder struct { - tableColumns, sharedColumns *ColumnList - preparedStatement string + tableColumns, sharedColumns, uniqueKeyColumns *ColumnList + preparedStatement string } // NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder. // It prepares the INSERT query statement. // Returns an error if no shared columns are given, the shared columns are not a subset of the table columns, // or the prepared statement cannot be built. 
-func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) { +func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, uniqueKeyColumns *ColumnList) (*DMLInsertQueryBuilder, error) { if !sharedColumns.IsSubsetOf(tableColumns) { return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder") } @@ -505,6 +505,7 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar return &DMLInsertQueryBuilder{ tableColumns: tableColumns, sharedColumns: sharedColumns, + uniqueKeyColumns: uniqueKeyColumns, preparedStatement: stmt, }, nil } @@ -512,17 +513,25 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar // BuildQuery builds the arguments array for a DML event INSERT query. // It returns the query string and the shared arguments array. // Returns an error if the number of arguments differs from the number of table columns. 
-func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) { +func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, []interface{}, error) { if len(args) != b.tableColumns.Len() { - return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") + return "", nil, nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") } + sharedArgs := make([]interface{}, 0, b.sharedColumns.Len()) for _, column := range b.sharedColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] arg := column.convertArg(args[tableOrdinal], false) sharedArgs = append(sharedArgs, arg) } - return b.preparedStatement, sharedArgs, nil + + uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()) + for _, column := range b.uniqueKeyColumns.Columns() { + tableOrdinal := b.tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal], true) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil } // DMLUpdateQueryBuilder can build UPDATE queries for DML events. 
diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index d43f65056..38be24402 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -504,9 +504,10 @@ func TestBuildDMLInsertQuery(t *testing.T) { args := []interface{}{3, "testname", "first", 17, 23} { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"position"}) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -517,12 +518,14 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) + require.Equal(t, []interface{}{17}, uniqueKeyArgs) } { sharedColumns := NewColumnList([]string{"position", "name", "age", "id"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"position", "name"}) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -533,15 +536,18 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{17, "testname", 23, 3}, sharedArgs) + require.Equal(t, []interface{}{17, "testname"}, uniqueKeyArgs) } { 
sharedColumns := NewColumnList([]string{"position", "name", "surprise", "id"}) - _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"age"}) + _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.Error(t, err) } { sharedColumns := NewColumnList([]string{}) - _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"age", "name"}) + _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.Error(t, err) } } @@ -551,13 +557,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { tableName := "tbl" tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"}) sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) + uniqueKeyColumns := NewColumnList([]string{"name", "age"}) { // testing signed args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -568,14 +575,15 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", int8(-1), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } { // testing unsigned args := 
[]interface{}{3, "testname", "first", int8(-1), 23} sharedColumns.SetUnsigned("position") - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -586,14 +594,15 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", uint8(255), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } { // testing unsigned args := []interface{}{3, "testname", "first", int32(-1), 23} sharedColumns.SetUnsigned("position") - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -604,6 +613,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", uint32(4294967295), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } } diff --git a/go/sql/types.go b/go/sql/types.go index f7aac5f5f..919a36ab7 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -50,6 +50,8 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + // compare 
a and b using this function, when a equal b, return 0, when a > b, return 1, when a < b, return -1 + CompareValueFunc func(a interface{}, b interface{}) (int, error) } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -218,6 +220,14 @@ func (this *ColumnList) IsEnumToTextConversion(columnName string) bool { return this.GetColumn(columnName).enumToTextConversion } +func (this *ColumnList) SetColumnCompareValueFunc(columnName string, f func(a interface{}, b interface{}) (int, error)) { + this.GetColumn(columnName).CompareValueFunc = f +} + +func (this *ColumnList) GetColumnCompareValueFunc(columnName string) func(a interface{}, b interface{}) (int, error) { + return this.GetColumn(columnName).CompareValueFunc +} + func (this *ColumnList) SetEnumValues(columnName string, enumValues string) { this.GetColumn(columnName).EnumValues = enumValues } From 5903ffc3a1df5b594bb680c48f33412711c65d80 Mon Sep 17 00:00:00 2001 From: xiehaopeng Date: Thu, 13 Mar 2025 20:01:39 +0800 Subject: [PATCH 2/2] feat(ignore-binlog-events): dynamically expand MigrationRangeMaxValues --- go/base/context.go | 2 + go/logic/applier.go | 79 +++++++++++++++++++++++++++++++++++++++- go/logic/applier_test.go | 2 +- 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index f858bd1ef..edeee706f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -204,6 +204,8 @@ type MigrationContext struct { TotalDMLEventsIgnored int64 DMLBatchSize int64 IgnoreOverIterationRangeMaxBinlog bool + IsMigrationRangeMaxValuesLocked bool + MigrationRangeMaxValuesInitial *sql.ColumnValues isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/logic/applier.go b/go/logic/applier.go index 630b200d5..b4ab839ed 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -568,7 +568,15 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK return err } } - 
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) + + // Save a snapshot copy of the initial MigrationRangeMaxValues + if this.migrationContext.MigrationRangeMaxValues == nil { + this.migrationContext.MigrationRangeMaxValuesInitial = nil + } else { + abstractValues := make([]interface{}, len(this.migrationContext.MigrationRangeMaxValues.AbstractValues())) + copy(abstractValues, this.migrationContext.MigrationRangeMaxValues.AbstractValues()) + this.migrationContext.MigrationRangeMaxValuesInitial = sql.ToColumnValues(abstractValues) + } return rows.Err() } @@ -611,6 +619,63 @@ func (this *Applier) ReadMigrationRangeValues() error { return tx.Commit() } +// ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values +func (this *Applier) ResetMigrationRangeMaxValues(uniqueKeyAbstractValues []interface{}) { + abstractValues := make([]interface{}, len(uniqueKeyAbstractValues)) + copy(abstractValues, uniqueKeyAbstractValues) + this.migrationContext.MigrationRangeMaxValues = sql.ToColumnValues(abstractValues) + this.migrationContext.Log.Debugf("Reset migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) +} + +// LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates +func (this *Applier) LockMigrationRangeMaxValues() { + if this.migrationContext.IsMigrationRangeMaxValuesLocked { + return + } + this.migrationContext.IsMigrationRangeMaxValuesLocked = true + this.migrationContext.Log.Infof("Lock migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) +} + +// AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying. +// To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end, +// we need a strategy to stop updates. When the initial copy target is achieved, +// MigrationRangeMaxValues will be locked. 
+func (this *Applier) AttemptToLockMigrationRangeMaxValues() { + if this.migrationContext.IsMigrationRangeMaxValuesLocked { + return + } + + // Currently only supports single-column unique index of int type + uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns() + if len(uniqueKeyCols) != 1 { + this.LockMigrationRangeMaxValues() + return + } + uniqueKeyCol := uniqueKeyCols[0] + if uniqueKeyCol.CompareValueFunc == nil { + this.LockMigrationRangeMaxValues() + return + } + + // Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress + if this.migrationContext.MigrationIterationRangeMinValues == nil { + return + } + than, err := uniqueKeyCol.CompareValueFunc( + this.migrationContext.MigrationIterationRangeMinValues.AbstractValues()[0], + this.migrationContext.MigrationRangeMaxValuesInitial.AbstractValues()[0], + ) + if err != nil { + // If comparison fails, fallback to locking MigrationRangeMaxValues + this.migrationContext.Log.Errore(err) + this.LockMigrationRangeMaxValues() + return + } + if than >= 0 { + this.LockMigrationRangeMaxValues() + } +} + // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. 
we're past the last chunk and are done with @@ -620,6 +685,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues } + this.AttemptToLockMigrationRangeMaxValues() for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -661,6 +727,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo } } this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") + // Ensure MigrationRangeMaxValues is locked after iteration is complete + this.LockMigrationRangeMaxValues() return hasFurtherRange, nil } @@ -1282,7 +1350,14 @@ func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) case than < 0: return true, nil case than > 0: - return false, nil + // When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues + // After expand, treat this comparison as equal, otherwise it cannot be ignored + if !this.migrationContext.IsMigrationRangeMaxValuesLocked { + this.ResetMigrationRangeMaxValues(uniqueKeyArgs) + return true, nil + } else { + return false, nil + } default: // Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored. return true, nil diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 9605e61bd..605e481b7 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -236,7 +236,7 @@ func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) { t.Run("larger than MigrationRangeMaxValues", func(t *testing.T) { ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123457}) require.NoError(t, err) - require.False(t, ignore) + require.True(t, ignore) }) }