From d6ee06d519b06296d6d7f921981cba7b7aa34ad7 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sat, 11 Oct 2025 12:50:50 -0700 Subject: [PATCH 01/16] add Checkpoint table and read/write funcs --- go/base/context.go | 10 +++++ go/logic/applier.go | 89 +++++++++++++++++++++++++++++++++++++-- go/logic/applier_test.go | 90 +++++++++++++++++++++++++++++++++++++--- go/logic/inspect.go | 5 +++ go/logic/migrator.go | 17 ++++---- go/sql/builder.go | 44 ++++++++++++++++++++ go/sql/types.go | 13 ++++-- 7 files changed, 248 insertions(+), 20 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 0a1cae739..29d7681fa 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -153,6 +153,7 @@ type MigrationContext struct { HooksHintToken string HooksStatusIntervalSec int64 PanicOnWarnings bool + Checkpoint bool DropServeSocket bool ServeSocketFile string @@ -380,6 +381,15 @@ func (this *MigrationContext) GetChangelogTableName() string { } } +// GetCheckpointTableName generates the name of checkpoint table. +func (this *MigrationContext) GetCheckpointTableName() string { + if this.ForceTmpTableName != "" { + return getSafeTableName(this.ForceTmpTableName, "ghk") + } else { + return getSafeTableName(this.OriginalTableName, "ghk") + } +} + // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout // the swap-tables process. func (this *MigrationContext) GetVoluntaryLockName() string { diff --git a/go/logic/applier.go b/go/logic/applier.go index 30ac97695..d214a9fcd 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -66,9 +66,10 @@ type Applier struct { finishedMigrating int64 name string - dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder - dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder - dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder + dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder + dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder + dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder + checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -144,6 +145,13 @@ func (this *Applier) prepareQueries() (err error) { ); err != nil { return err } + if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( + this.migrationContext.DatabaseName, + this.migrationContext.GetCheckpointTableName(), + &this.migrationContext.UniqueKey.Columns, + ); err != nil { + return err + } return nil } @@ -400,6 +408,38 @@ func (this *Applier) CreateChangelogTable() error { return nil } +// Create the checkpoint table to store the chunk copy and applier state. +func (this *Applier) CreateCheckpointTable() error { + if err := this.DropCheckpointTable(); err != nil { + return err + } + colDefs := []string{ + "`gh_ost_chk_id` bigint auto_increment primary key", + "`gh_ost_chk_coords` varchar(4096)", + "`gh_ost_chk_iteration` bigint", + } + for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { + if col.MySQLType == "" { + return fmt.Errorf("CreateCheckpoinTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) + } + colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name), col.MySQLType) + if !col.Nullable { + colDef += " NOT NULL" + } + colDefs = append(colDefs, colDef) + } + query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetCheckpointTableName()), + strings.Join(colDefs, ",\n "), + ) + this.migrationContext.Log.Infof("Created checkpoint table") + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + return nil +} + // dropTable drops a given table on the applied host func (this *Applier) dropTable(tableName string) error { query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, @@ -494,6 +534,11 @@ func (this *Applier) DropChangelogTable() error { return this.dropTable(this.migrationContext.GetChangelogTableName()) } +// DropCheckpointTable drops the checkpoint table on applier host +func (this *Applier) DropCheckpointTable() error { + return this.dropTable(this.migrationContext.GetCheckpointTableName()) +} + // DropOldTable drops the _Old table on the applier host func (this *Applier) DropOldTable() error { return this.dropTable(this.migrationContext.GetOldTableName()) @@ -542,6 +587,42 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { return this.WriteAndLogChangelog("state", value) } +// WriteCheckpoints writes a checkpoint to the _ghk table. +func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { + var insertId int64 + uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...) + query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs) + if err != nil { + return insertId, err + } + args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration) + args = append(args, uniqueKeyArgs...) + res, err := this.db.Exec(query, args...) + if err != nil { + return insertId, err + } + return res.LastInsertId() +} + +func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error { + rows, err := this.db.Query(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) + if err != nil { + return err + } + var coordStr string + ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration} + ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) + for rows.Next() { + rows.Scan(ptrs...) + } + gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr) + if err != nil { + return err + } + chk.LastTrxCoords = gtidCoords + return nil +} + // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // This is done asynchronously func (this *Applier) InitiateHeartbeat() { @@ -686,7 +767,7 @@ func (this *Applier) ReadMigrationRangeValues() error { // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with -// iterating the range (and this done with copying row chunks) +// iterating the range (and thus done with copying row chunks) func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues if this.migrationContext.MigrationIterationRangeMinValues == nil { diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 13e8a4d3b..96e7acc97 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -15,12 +15,13 @@ import ( "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/modules/mysql" + testmysql "github.com/testcontainers/testcontainers-go/modules/mysql" "fmt" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go/wait" ) @@ -207,11 +208,11 @@ type ApplierTestSuite struct { func (suite *ApplierTestSuite) SetupSuite() { ctx := context.Background() - mysqlContainer, err := mysql.Run(ctx, + mysqlContainer, err := testmysql.Run(ctx, testMysqlContainerImage, - mysql.WithDatabase(testMysqlDatabase), - mysql.WithUsername(testMysqlUser), - mysql.WithPassword(testMysqlPass), + testmysql.WithDatabase(testMysqlDatabase), + testmysql.WithUsername(testMysqlUser), + testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), ) suite.Require().NoError(err) @@ -631,6 +632,85 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1") } +func (suite *ApplierTestSuite) TestWriteCheckpoint() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, id2 char(4) CHARACTER SET utf8mb4, name varchar(20), primary key(id, id2));", getTestGhostTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?)", getTestTableName()), 201, "為政以德", 212, "君子不器") + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.AlterStatementOptions = "add column name varchar(20)" + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id", "id2"}), + } + + inspector := NewInspector(migrationContext) + suite.Require().NoError(inspector.InitDBConnections()) + + err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) + suite.Require().NoError(err) + migrationContext.Log.Infof("%+v", migrationContext.UniqueKey.Columns) + + //suite.Require().Equal("int", migrationContext.UniqueKey.Columns.GetColumn("id").MySQLType) + + applier := NewApplier(migrationContext) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + err = applier.CreateChangelogTable() + suite.Require().NoError(err) + + err = applier.CreateCheckpointTable() + suite.Require().NoError(err) + + err = applier.prepareQueries() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues() + suite.Require().NoError(err) + + coords, err := mysql.NewGTIDBinlogCoordinates(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603,0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982,19636248-246d-11e9-ab0d-0263df733a8e:1`) + suite.Require().NoError(err) + + chk := &Checkpoint{ + LastTrxCoords: coords, + IterationRangeMin: applier.migrationContext.MigrationRangeMaxValues, + Iteration: 2, + } + id, err := applier.WriteCheckpoint(chk) + suite.Require().NoError(err) + suite.Require().Equal(int64(1), id) + + gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2)} + err = applier.ReadLastCheckpoint(gotChk) + suite.Require().NoError(err) + + suite.Require().Equal(chk.Iteration, gotChk.Iteration) + suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) + suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 7a7dc8424..6b182e91b 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -711,12 +711,17 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL columnName := m.GetString("COLUMN_NAME") columnType := m.GetString("COLUMN_TYPE") columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH") + isNullable := m.GetString("IS_NULLABLE") extra := m.GetString("EXTRA") for _, columnsList := range columnsLists { column := columnsList.GetColumn(columnName) if column == nil { continue } + column.MySQLType = columnType + if isNullable == "YES" { + column.Nullable = true + } if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4d7074b22..4f623a7d3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -91,8 +91,6 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct - handledChangelogStates map[string]bool - finishedMigrating int64 } @@ -107,10 +105,9 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), - handledChangelogStates: make(map[string]bool), - finishedMigrating: 0, + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + finishedMigrating: 0, } return migrator } @@ -394,10 +391,17 @@ func (this *Migrator) Migrate() (err error) { if err := this.inspector.inspectOriginalAndGhostTables(); err != nil { return err } + + if this.migrationContext.Checkpoint { + if err := this.applier.CreateCheckpointTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error deatils.") + } + } // We can prepare some of the queries on the applier if err := this.applier.prepareQueries(); err != nil { return err } + // Validation complete! We're good to execute this migration if err := this.hooksExecutor.onValidated(); err != nil { return err @@ -1190,7 +1194,6 @@ func (this *Migrator) initiateApplier() error { this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } - if err := this.applier.AlterGhost(); err != nil { this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") return err diff --git a/go/sql/builder.go b/go/sql/builder.go index f2683181f..564ad56bc 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -101,6 +101,50 @@ func BuildEqualsPreparedComparison(columns []string) (result string, err error) return BuildEqualsComparison(columns, values) } +// It holds the prepared query statement so it doesn't need to be recreated every time. +type CheckpointInsertQueryBuilder struct { + uniqueKeyColumns *ColumnList + preparedStatement string +} + +func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList) (*CheckpointInsertQueryBuilder, error) { + if uniqueKeyColumns.Len() == 0 { + return nil, fmt.Errorf("Got 0 columns in BuildSetCheckpointInsertQuery") + } + values := buildColumnsPreparedValues(uniqueKeyColumns) + databaseName = EscapeName(databaseName) + tableName = EscapeName(tableName) + stmt := fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_coords, gh_ost_chk_iteration, %s) + values + (?, ?, %s)`, + databaseName, tableName, + strings.Join(uniqueKeyColumns.Names(), ", "), + strings.Join(values, ", "), + ) + + b := &CheckpointInsertQueryBuilder{ + uniqueKeyColumns: uniqueKeyColumns, + preparedStatement: stmt, + } + return b, nil +} + +// BuildQuery builds the insert query. +func (b *CheckpointInsertQueryBuilder) BuildQuery(uniqueKeyArgs []interface{}) (string, []interface{}, error) { + if len(uniqueKeyArgs) != b.uniqueKeyColumns.Len() { + return "", nil, fmt.Errorf("args count differs from unique key column count") + } + convertedArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()+1) + for _, column := range b.uniqueKeyColumns.Columns() { + arg := column.convertArg(column, true) + convertedArgs = append(convertedArgs, arg) + } + return b.preparedStatement, uniqueKeyArgs, nil +} + func BuildSetPreparedClause(columns *ColumnList) (result string, err error) { if columns.Len() == 0 { return "", fmt.Errorf("Got 0 columns in BuildSetPreparedClause") diff --git a/go/sql/types.go b/go/sql/types.go index aac52bc32..b701e9514 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -38,10 +38,12 @@ type CharacterSetConversion struct { } type Column struct { - Name string - IsUnsigned bool - IsVirtual bool - Charset string + Name string + IsUnsigned bool + IsVirtual bool + Charset string + // Type represents a subset of MySQL types + // used for mapping columns to golang values. Type ColumnType EnumValues string timezoneConversion *TimezoneConversion @@ -50,6 +52,9 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + CharacterSetName string + Nullable bool + MySQLType string } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { From e2f0a203db96b71f01f78ff5bee4b481eb3385d5 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 15:23:24 -0700 Subject: [PATCH 02/16] handle no checkpoints returned --- go/base/context.go | 1 + go/logic/applier.go | 32 ++++++++++++++++++++------------ go/logic/applier_test.go | 13 ++++++++----- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 29d7681fa..7475577c8 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -195,6 +195,7 @@ type MigrationContext struct { pointOfInterestTimeMutex *sync.Mutex lastHeartbeatOnChangelogTime time.Time lastHeartbeatOnChangelogMutex *sync.Mutex + LastLockProcessedCoords mysql.BinlogCoordinates CurrentLag int64 currentProgress uint64 etaNanoseonds int64 diff --git a/go/logic/applier.go b/go/logic/applier.go index d214a9fcd..277379e7a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -21,6 +21,7 @@ import ( "context" "database/sql/driver" + "errors" "github.com/github/gh-ost/go/mysql" drivermysql "github.com/go-sql-driver/mysql" "github.com/openark/golib/sqlutils" @@ -31,6 +32,9 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) +// NoCheckpointFoundError is returned when an empty checkpoint table is queried. +var NoCheckpointFoundError = errors.New("no checkpoint found in _ghk table") + type dmlBuildResult struct { query string args []interface{} @@ -145,12 +149,14 @@ func (this *Applier) prepareQueries() (err error) { ); err != nil { return err } - if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( - this.migrationContext.DatabaseName, - this.migrationContext.GetCheckpointTableName(), - &this.migrationContext.UniqueKey.Columns, - ); err != nil { - return err + if this.migrationContext.Checkpoint { + if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( + this.migrationContext.DatabaseName, + this.migrationContext.GetCheckpointTableName(), + &this.migrationContext.UniqueKey.Columns, + ); err != nil { + return err + } } return nil } @@ -605,15 +611,17 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { } func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error { - rows, err := this.db.Query(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) - if err != nil { - return err - } + row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) + var coordStr string ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration} ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) - for rows.Next() { - rows.Scan(ptrs...) + err := row.Scan(ptrs...) + if err != nil { + if errors.Is(err, gosql.ErrNoRows) { + return NoCheckpointFoundError + } + return err } gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr) if err != nil { diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 96e7acc97..8e16bc905 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -643,7 +643,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, id2 char(4) CHARACTER SET utf8mb4, name varchar(20), primary key(id, id2));", getTestGhostTableName())) suite.Require().NoError(err) - _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?)", getTestTableName()), 201, "為政以德", 212, "君子不器") + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器") suite.Require().NoError(err) connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) @@ -658,6 +658,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"}) migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"}) migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.Checkpoint = true migrationContext.UniqueKey = &sql.UniqueKey{ Name: "PRIMARY", NameInGhostTable: "PRIMARY", @@ -669,9 +670,6 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) suite.Require().NoError(err) - migrationContext.Log.Infof("%+v", migrationContext.UniqueKey.Columns) - - //suite.Require().Equal("int", migrationContext.UniqueKey.Columns.GetColumn("id").MySQLType) applier := NewApplier(migrationContext) @@ -690,6 +688,12 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { err = applier.ReadMigrationRangeValues() suite.Require().NoError(err) + // checkpoint table is empty + gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2)} + err = applier.ReadLastCheckpoint(gotChk) + suite.Require().ErrorIs(err, NoCheckpointFoundError) + + // write a checkpoint and read it back coords, err := mysql.NewGTIDBinlogCoordinates(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603,0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982,19636248-246d-11e9-ab0d-0263df733a8e:1`) suite.Require().NoError(err) @@ -702,7 +706,6 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().NoError(err) suite.Require().Equal(int64(1), id) - gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2)} err = applier.ReadLastCheckpoint(gotChk) suite.Require().NoError(err) From 7c5fda8076208f7814c7c5de8d84b87c1ccd47f4 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 19:34:55 -0700 Subject: [PATCH 03/16] store min and max range values in checkpoint --- go/base/context.go | 1 - go/cmd/gh-ost/main.go | 1 + go/logic/applier.go | 32 ++++++++++++++- go/logic/applier_test.go | 6 ++- go/logic/migrator.go | 84 +++++++++++++++++++++++++++++++++++---- go/logic/streamer.go | 16 ++++---- go/logic/streamer_test.go | 8 ++-- go/sql/builder.go | 20 +++++++--- go/sql/types.go | 6 +++ localtests/test.sh | 5 ++- 10 files changed, 147 insertions(+), 32 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 7475577c8..29d7681fa 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -195,7 +195,6 @@ type MigrationContext struct { pointOfInterestTimeMutex *sync.Mutex lastHeartbeatOnChangelogTime time.Time lastHeartbeatOnChangelogMutex *sync.Mutex - LastLockProcessedCoords mysql.BinlogCoordinates CurrentLag int64 currentProgress uint64 etaNanoseonds int64 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 6391cf4fb..c31bbcc2a 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -145,6 +145,7 @@ func main() { flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'") flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'") flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections") + flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Use migration checkpoints") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits") diff --git a/go/logic/applier.go b/go/logic/applier.go index 277379e7a..4644d75ae 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -25,6 +25,7 @@ import ( "github.com/github/gh-ost/go/mysql" drivermysql "github.com/go-sql-driver/mysql" "github.com/openark/golib/sqlutils" + "sync" ) const ( @@ -70,6 +71,13 @@ type Applier struct { finishedMigrating int64 name string + CurrentCoordinatesMutex sync.Mutex + CurrentCoordinates mysql.BinlogCoordinates + + LastIterationRangeMutex sync.Mutex + LastIterationRangeMinValues *sql.ColumnValues + LastIterationRangeMaxValues *sql.ColumnValues + dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder @@ -415,6 +423,8 @@ func (this *Applier) CreateChangelogTable() error { } // Create the checkpoint table to store the chunk copy and applier state. +// There are two sets of columns with the same types as the shared unique key, +// one for IterationMinValues and one for IterationMaxValues. func (this *Applier) CreateCheckpointTable() error { if err := this.DropCheckpointTable(); err != nil { return err @@ -428,12 +438,21 @@ func (this *Applier) CreateCheckpointTable() error { if col.MySQLType == "" { return fmt.Errorf("CreateCheckpoinTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) } - colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name), col.MySQLType) + colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_min"), col.MySQLType) + if !col.Nullable { + colDef += " NOT NULL" + } + colDefs = append(colDefs, colDef) + } + + for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { + colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_max"), col.MySQLType) if !col.Nullable { colDef += " NOT NULL" } colDefs = append(colDefs, colDef) } + query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetCheckpointTableName()), @@ -597,6 +616,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { var insertId int64 uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...) + uniqueKeyArgs = append(uniqueKeyArgs, chk.IterationRangeMax.AbstractValues()...) query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs) if err != nil { return insertId, err @@ -611,11 +631,12 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { } func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error { - row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) + row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) var coordStr string ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration} ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) + ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) if err != nil { if errors.Is(err, gosql.ErrNoRows) { @@ -777,6 +798,13 @@ func (this *Applier) ReadMigrationRangeValues() error { // no further chunk to work through, i.e. we're past the last chunk and are done with // iterating the range (and thus done with copying row chunks) func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { + this.LastIterationRangeMutex.Lock() + if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil { + this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone() + this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone() + } + this.LastIterationRangeMutex.Unlock() + this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 8e16bc905..d4a3ee307 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -689,7 +689,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().NoError(err) // checkpoint table is empty - gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2)} + gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2), IterationRangeMax: sql.NewColumnValues(2)} err = applier.ReadLastCheckpoint(gotChk) suite.Require().ErrorIs(err, NoCheckpointFoundError) @@ -699,7 +699,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { chk := &Checkpoint{ LastTrxCoords: coords, - IterationRangeMin: applier.migrationContext.MigrationRangeMaxValues, + IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, + IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, Iteration: 2, } id, err := applier.WriteCheckpoint(chk) @@ -712,6 +713,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.Iteration, gotChk.Iteration) suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) + suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) } func TestApplier(t *testing.T) { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4f623a7d3..3063a85e4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -26,6 +26,8 @@ var ( ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.") ErrMigrationNotAllowedOnMaster = errors.New("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master.") RetrySleepFn = time.Sleep + checkpointInterval = 10 * time.Second // 5 * time.Minute + checkpointTimeout = 2 * time.Second ) type ChangelogState string @@ -46,6 +48,7 @@ type tableWriteFunc func() error type applyEventStruct struct { writeFunc *tableWriteFunc dmlEvent *binlog.BinlogDMLEvent + coords mysql.BinlogCoordinates } func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { @@ -53,8 +56,8 @@ func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { return result } -func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct { - result := &applyEventStruct{dmlEvent: dmlEvent} +func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct { + result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates} return result } @@ -431,6 +434,9 @@ func (this *Migrator) Migrate() (err error) { go this.iterateChunks() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() + if this.migrationContext.Checkpoint { + go this.checkpointLoop() + } this.migrationContext.Log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() @@ -1086,7 +1092,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), - currentBinlogCoordinates, + currentBinlogCoordinates.DisplayString(), this.migrationContext.GetCurrentLagDuration().Seconds(), this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), state, @@ -1123,8 +1129,8 @@ func (this *Migrator) initiateStreaming() error { false, this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), - func(dmlEvent *binlog.BinlogDMLEvent) error { - return this.onChangelogEvent(dmlEvent) + func(dmlEntry *binlog.BinlogEntry) error { + return this.onChangelogEvent(dmlEntry.DmlEvent) }, ) @@ -1157,8 +1163,8 @@ func (this *Migrator) addDMLEventsListener() error { false, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, - func(dmlEvent *binlog.BinlogDMLEvent) error { - this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent) + func(dmlEntry *binlog.BinlogEntry) error { + this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry) return nil }, ) @@ -1342,6 +1348,11 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if err := this.retryOperation(applyEventFunc); err != nil { return this.migrationContext.Log.Errore(err) } + // update applier coordinates + this.applier.CurrentCoordinatesMutex.Lock() + this.applier.CurrentCoordinates = eventStruct.coords + this.applier.CurrentCoordinatesMutex.Unlock() + if nonDmlStructToApply != nil { // We pulled DML events from the queue, and then we hit a non-DML event. Wait! // We need to handle it! @@ -1353,6 +1364,65 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return nil } +func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { + coords := this.eventsStreamer.GetCurrentBinlogCoordinates() + this.applier.LastIterationRangeMutex.Lock() + if this.applier.LastIterationRangeMaxValues == nil || this.applier.LastIterationRangeMinValues == nil { + this.applier.LastIterationRangeMutex.Unlock() + return nil, errors.New("iteration range is empty, not checkpointing...") + } + chk := &Checkpoint{ + IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(), + IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(), + LastTrxCoords: coords, + } + this.applier.LastIterationRangeMutex.Unlock() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + this.applier.CurrentCoordinatesMutex.Lock() + if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { + id, err := this.applier.WriteCheckpoint(chk) + chk.Id = id + this.applier.CurrentCoordinatesMutex.Unlock() + return chk, err + } + this.applier.CurrentCoordinatesMutex.Unlock() + time.Sleep(500 * time.Millisecond) + } + } +} + +func (this *Migrator) checkpointLoop() { + if this.migrationContext.Noop { + this.migrationContext.Log.Debugf("Noop operation; not really checkpointing") + return + } + ticker := time.NewTicker(checkpointInterval) + for t := range ticker.C { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } + this.migrationContext.Log.Infof("starting checkpoint at %+v", t) + ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout) + chk, err := this.Checkpoint(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + this.migrationContext.Log.Errorf("checkpoint attempt timed out after %+v", checkpointTimeout) + } else { + this.migrationContext.Log.Errorf("error attempting checkpoint: %+v", err) + } + } else { + this.migrationContext.Log.Infof("checkpoint success at coords=%+v range_min=%+v range_max=%+v", + chk.LastTrxCoords.DisplayString(), chk.IterationRangeMin.String(), chk.IterationRangeMax.String()) + } + cancel() + } +} + // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog. // This is where the ghost table gets the data. The function fills the data single-threaded. // Both event backlog and rowcopy events are polled; the backlog events have precedence. diff --git a/go/logic/streamer.go b/go/logic/streamer.go index fd0240ffd..087da8763 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -24,7 +24,7 @@ type BinlogEventListener struct { async bool databaseName string tableName string - onDmlEvent func(event *binlog.BinlogDMLEvent) error + onDmlEvent func(event *binlog.BinlogEntry) error } const ( @@ -60,7 +60,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer // AddListener registers a new listener for binlog events, on a per-table basis func (this *EventsStreamer) AddListener( - async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { + async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogEntry) error) (err error) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() @@ -82,24 +82,24 @@ func (this *EventsStreamer) AddListener( // notifyListeners will notify relevant listeners with given DML event. Only // listeners registered for changes on the table on which the DML operates are notified. -func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { +func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() for _, listener := range this.listeners { listener := listener - if !strings.EqualFold(listener.databaseName, binlogEvent.DatabaseName) { + if !strings.EqualFold(listener.databaseName, binlogEntry.DmlEvent.DatabaseName) { continue } - if !strings.EqualFold(listener.tableName, binlogEvent.TableName) { + if !strings.EqualFold(listener.tableName, binlogEntry.DmlEvent.TableName) { continue } if listener.async { go func() { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEntry) }() } else { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEntry) } } } @@ -176,7 +176,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { go func() { for binlogEntry := range this.eventsChannel { if binlogEntry.DmlEvent != nil { - this.notifyListeners(binlogEntry.DmlEvent) + this.notifyListeners(binlogEntry) } } }() diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go index baa6076da..2c5d3886b 100644 --- a/go/logic/streamer_test.go +++ b/go/logic/streamer_test.go @@ -90,8 +90,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEvents() { streamCtx, cancel := context.WithCancel(context.Background()) dmlEvents := make([]*binlog.BinlogDMLEvent, 0) - err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) error { - dmlEvents = append(dmlEvents, event) + err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + dmlEvents = append(dmlEvents, event.DmlEvent) // Stop once we've collected three events if len(dmlEvents) == 3 { @@ -165,8 +165,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEventsAutomaticallyReconnects() streamCtx, cancel := context.WithCancel(context.Background()) dmlEvents := make([]*binlog.BinlogDMLEvent, 0) - err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) error { - dmlEvents = append(dmlEvents, event) + err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + dmlEvents = append(dmlEvents, event.DmlEvent) // Stop once we've collected three events if len(dmlEvents) == 3 { diff --git a/go/sql/builder.go b/go/sql/builder.go index 564ad56bc..c5063fb1a 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -112,16 +112,24 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns return nil, fmt.Errorf("Got 0 columns in BuildSetCheckpointInsertQuery") } values := buildColumnsPreparedValues(uniqueKeyColumns) + minUniqueColNames := []string{} + maxUniqueColNames := []string{} + for _, name := range uniqueKeyColumns.Names() { + minUniqueColNames = append(minUniqueColNames, name+"_min") + maxUniqueColNames = append(maxUniqueColNames, name+"_max") + } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) stmt := fmt.Sprintf(` insert /* gh-ost */ into %s.%s - (gh_ost_chk_coords, gh_ost_chk_iteration, %s) + (gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s) values - (?, ?, %s)`, + (?, ?, %s, %s)`, databaseName, tableName, - strings.Join(uniqueKeyColumns.Names(), ", "), + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), strings.Join(values, ", "), ) @@ -134,10 +142,10 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns // BuildQuery builds the insert query. func (b *CheckpointInsertQueryBuilder) BuildQuery(uniqueKeyArgs []interface{}) (string, []interface{}, error) { - if len(uniqueKeyArgs) != b.uniqueKeyColumns.Len() { - return "", nil, fmt.Errorf("args count differs from unique key column count") + if len(uniqueKeyArgs) != 2*b.uniqueKeyColumns.Len() { + return "", nil, fmt.Errorf("args count differs from 2 x unique key column count") } - convertedArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()+1) + convertedArgs := make([]interface{}, 0, 2*b.uniqueKeyColumns.Len()) for _, column := range b.uniqueKeyColumns.Columns() { arg := column.convertArg(column, true) convertedArgs = append(convertedArgs, arg) diff --git a/go/sql/types.go b/go/sql/types.go index b701e9514..a01fb8bff 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -350,3 +350,9 @@ func (this *ColumnValues) String() string { } return strings.Join(stringValues, ",") } + +func (this *ColumnValues) Clone() *ColumnValues { + cv := NewColumnValues(len(this.abstractValues)) + copy(cv.abstractValues, this.abstractValues) + return cv +} diff --git a/localtests/test.sh b/localtests/test.sh index 7fe9c2ab2..8a57b80d1 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -151,7 +151,7 @@ sysbench_prepare() { --mysql-password=opensesame \ --mysql-db=test \ --tables=1 \ - --table-size=10000 \ + --table-size=20000 \ prepare } @@ -169,7 +169,7 @@ sysbench_run_cmd() { --threads=2 \ --time=30 \ --report-interval=10 \ - --rate=500 \ + --rate=200 \ run" echo $cmd } @@ -299,6 +299,7 @@ test_single() { --verbose \ --debug \ --stack \ + --checkpoint \ --execute ${extra_args[@]}" echo_dot echo $cmd >$exec_command_file From 713ce9724ced0e32bbc2255c87bd8e4f3f3dacdc Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 22:03:05 -0700 Subject: [PATCH 04/16] resume from checkpoint --- go/base/context.go | 2 + go/cmd/gh-ost/main.go | 1 + go/logic/applier.go | 35 +++++++++++----- go/logic/applier_test.go | 11 +++-- go/logic/migrator.go | 91 ++++++++++++++++++++++++++-------------- go/logic/streamer.go | 19 +++++---- go/sql/builder.go | 4 +- localtests/test.sh | 4 +- 8 files changed, 108 insertions(+), 59 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 29d7681fa..ab9f942f8 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -103,6 +103,7 @@ type MigrationContext struct { GoogleCloudPlatform bool AzureMySQL bool AttemptInstantDDL bool + Resume bool // SkipPortValidation allows skipping the port validation in `ValidateConnection` // This is useful when connecting to a MySQL instance where the external port @@ -240,6 +241,7 @@ type MigrationContext struct { Iteration int64 MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues + InitialStreamerCoords mysql.BinlogCoordinates ForceTmpTableName string IncludeTriggers bool diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index c31bbcc2a..379aba4cb 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -146,6 +146,7 @@ func main() { flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'") flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections") flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Use migration checkpoints") + flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits") diff --git a/go/logic/applier.go b/go/logic/applier.go index 4644d75ae..7f2012f63 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -239,7 +239,7 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) { // ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, // or attempts to drop them if instructed to. func (this *Applier) ValidateOrDropExistingTables() error { - if this.migrationContext.InitiallyDropGhostTable { + if this.migrationContext.InitiallyDropGhostTable && !this.migrationContext.Resume { if err := this.DropGhostTable(); err != nil { return err } @@ -431,6 +431,7 @@ func (this *Applier) CreateCheckpointTable() error { } colDefs := []string{ "`gh_ost_chk_id` bigint auto_increment primary key", + "`gh_ost_chk_timestamp` bigint", "`gh_ost_chk_coords` varchar(4096)", "`gh_ost_chk_iteration` bigint", } @@ -630,26 +631,40 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { return res.LastInsertId() } -func (this *Applier) ReadLastCheckpoint(chk *Checkpoint) error { +func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) { row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName())) + chk := &Checkpoint{ + IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()), + IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()), + } var coordStr string - ptrs := []interface{}{&chk.Id, &coordStr, &chk.Iteration} + var timestamp int64 + ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration} ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) if err != nil { if errors.Is(err, gosql.ErrNoRows) { - return NoCheckpointFoundError + return nil, NoCheckpointFoundError } - return err + return nil, err } - gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr) - if err != nil { - return err + chk.Timestamp = time.Unix(timestamp, 0) + if this.migrationContext.UseGTIDs { + gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = gtidCoords + } else { + fileCoords, err := mysql.ParseFileBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = fileCoords } - chk.LastTrxCoords = gtidCoords - return nil + return chk, nil } // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index d4a3ee307..3cb767518 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1,6 +1,7 @@ /* Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE + See https://github.com/git +hub/gh-ost/blob/master/LICENSE */ package logic @@ -689,13 +690,11 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().NoError(err) // checkpoint table is empty - gotChk := &Checkpoint{IterationRangeMin: sql.NewColumnValues(2), IterationRangeMax: sql.NewColumnValues(2)} - err = applier.ReadLastCheckpoint(gotChk) + _, err = applier.ReadLastCheckpoint() suite.Require().ErrorIs(err, NoCheckpointFoundError) // write a checkpoint and read it back - coords, err := mysql.NewGTIDBinlogCoordinates(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603,0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982,19636248-246d-11e9-ab0d-0263df733a8e:1`) - suite.Require().NoError(err) + coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(219202907)) chk := &Checkpoint{ LastTrxCoords: coords, @@ -707,7 +706,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().NoError(err) suite.Require().Equal(int64(1), id) - err = applier.ReadLastCheckpoint(gotChk) + gotChk, err := applier.ReadLastCheckpoint() suite.Require().NoError(err) suite.Require().Equal(chk.Iteration, gotChk.Iteration) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 3063a85e4..902baa8fd 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -352,8 +352,14 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } - if err := this.initiateStreaming(); err != nil { - return err + // If we are resuming, we will initiateStreaming later when we know + // the coordinates to resume streaming. + // If not resuming, the streamer must be initiated before the applier, + // so that the "GhostTableMigrated" event gets processed. + if !this.migrationContext.Resume { + if err := this.initiateStreaming(); err != nil { + return err + } } if err := this.initiateApplier(); err != nil { return err @@ -384,9 +390,11 @@ func (this *Migrator) Migrate() (err error) { } initialLag, _ := this.inspector.getReplicationLag() - this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) - <-this.ghostTableMigrated - this.migrationContext.Log.Debugf("ghost table migrated") + if !this.migrationContext.Resume { + this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) + <-this.ghostTableMigrated + this.migrationContext.Log.Debugf("ghost table migrated") + } // Yay! We now know the Ghost and Changelog tables are good to examine! // When running on replica, this means the replica has those tables. When running // on master this is always true, of course, and yet it also implies this knowledge @@ -395,14 +403,33 @@ func (this *Migrator) Migrate() (err error) { return err } - if this.migrationContext.Checkpoint { + // We can prepare some of the queries on the applier + if err := this.applier.prepareQueries(); err != nil { + return err + } + + // inspectOriginalAndGhostTables must be called before creating checkpoint table. + if this.migrationContext.Checkpoint && !this.migrationContext.Resume { if err := this.applier.CreateCheckpointTable(); err != nil { this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error deatils.") } + } - // We can prepare some of the queries on the applier - if err := this.applier.prepareQueries(); err != nil { - return err + + if this.migrationContext.Resume { + lastCheckpoint, err := this.applier.ReadLastCheckpoint() + if err != nil { + return this.migrationContext.Log.Errorf("No checkpoint found, unable to resume: %+v", err) + } + this.migrationContext.Log.Infof("Resuming from checkpoint coords=%+v range_min=%+v range_max=%+v", + lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String()) + + this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin + this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords + if err := this.initiateStreaming(); err != nil { + return err + } } // Validation complete! We're good to execute this migration @@ -1189,31 +1216,33 @@ func (this *Migrator) initiateApplier() error { if err := this.applier.InitDBConnections(); err != nil { return err } - if err := this.applier.ValidateOrDropExistingTables(); err != nil { - return err - } - if err := this.applier.CreateChangelogTable(); err != nil { - this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - if err := this.applier.CreateGhostTable(); err != nil { - this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err - } - if err := this.applier.AlterGhost(); err != nil { - this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") - return err - } - - if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() { - // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, - // so we should copy AUTO_INCREMENT value onto our ghost table. - if err := this.applier.AlterGhostAutoIncrement(); err != nil { - this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + if !this.migrationContext.Resume { + if err := this.applier.ValidateOrDropExistingTables(); err != nil { return err } + if err := this.applier.CreateChangelogTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + if err := this.applier.CreateGhostTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := this.applier.AlterGhost(); err != nil { + this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + return err + } + + if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() { + // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, + // so we should copy AUTO_INCREMENT value onto our ghost table. + if err := this.applier.AlterGhostAutoIncrement(); err != nil { + this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + return err + } + } + this.applier.WriteChangelogState(string(GhostTableMigrated)) } - this.applier.WriteChangelogState(string(GhostTableMigrated)) if err := this.applier.StateMetadataLockInstrument(); err != nil { this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out") return err diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 087da8763..63afc3f3d 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -49,12 +49,13 @@ type EventsStreamer struct { func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer { return &EventsStreamer{ - connectionConfig: migrationContext.InspectorConnectionConfig, - migrationContext: migrationContext, - listeners: [](*BinlogEventListener){}, - listenersMutex: &sync.Mutex{}, - eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), - name: "streamer", + connectionConfig: migrationContext.InspectorConnectionConfig, + migrationContext: migrationContext, + listeners: [](*BinlogEventListener){}, + listenersMutex: &sync.Mutex{}, + eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), + name: "streamer", + initialBinlogCoordinates: migrationContext.InitialStreamerCoords, } } @@ -114,8 +115,10 @@ func (this *EventsStreamer) InitDBConnections() (err error) { return err } this.dbVersion = version - if err := this.readCurrentBinlogCoordinates(); err != nil { - return err + if this.initialBinlogCoordinates == nil || this.initialBinlogCoordinates.IsEmpty() { + if err := this.readCurrentBinlogCoordinates(); err != nil { + return err + } } if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil { return err diff --git a/go/sql/builder.go b/go/sql/builder.go index c5063fb1a..f2992f90a 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -123,9 +123,9 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns stmt := fmt.Sprintf(` insert /* gh-ost */ into %s.%s - (gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s) + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s) values - (?, ?, %s, %s)`, + (unix_timestamp(now()), ?, ?, %s, %s)`, databaseName, tableName, strings.Join(minUniqueColNames, ", "), strings.Join(maxUniqueColNames, ", "), diff --git a/localtests/test.sh b/localtests/test.sh index 8a57b80d1..7e5953c10 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -151,7 +151,7 @@ sysbench_prepare() { --mysql-password=opensesame \ --mysql-db=test \ --tables=1 \ - --table-size=20000 \ + --table-size=100000 \ prepare } @@ -254,7 +254,6 @@ test_single() { table_name="gh_ost_test" ghost_table_name="_gh_ost_test_gho" - trap cleanup EXIT INT TERM # test with sysbench oltp write load if [[ "$test_name" == "sysbench" ]]; then if ! command -v sysbench &>/dev/null; then @@ -273,6 +272,7 @@ test_single() { echo -n "Started sysbench (PID $sysbench_pid): " echo $load_cmd fi + trap cleanup SIGINT # cmd="GOTRACEBACK=crash $ghost_binary \ From a292d243c17851a0f301fe857f424a2570715f44 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 22:12:41 -0700 Subject: [PATCH 05/16] add checkpoint file --- go/logic/checkpoint.go | 17 +++++++++++++++++ go/logic/migrator.go | 1 + 2 files changed, 18 insertions(+) create mode 100644 go/logic/checkpoint.go diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go new file mode 100644 index 000000000..73980eac7 --- /dev/null +++ b/go/logic/checkpoint.go @@ -0,0 +1,17 @@ +package logic + +import ( + "github.com/github/gh-ost/go/mysql" + "github.com/github/gh-ost/go/sql" + "time" +) + +// Checkpoint holds state necessary to resume a migration. +type Checkpoint struct { + Id int64 + Timestamp time.Time + LastTrxCoords mysql.BinlogCoordinates + IterationRangeMin *sql.ColumnValues + IterationRangeMax *sql.ColumnValues + Iteration int64 +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 902baa8fd..e1a2e214f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1394,6 +1394,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { + // TODO: doesn't work if no DML events come in coords := this.eventsStreamer.GetCurrentBinlogCoordinates() this.applier.LastIterationRangeMutex.Lock() if this.applier.LastIterationRangeMaxValues == nil || this.applier.LastIterationRangeMinValues == nil { From 9012df0d296e406c7ce6668be8e9a3e0bac5f92f Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 22:25:17 -0700 Subject: [PATCH 06/16] fix unique key args --- go/sql/builder.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/go/sql/builder.go b/go/sql/builder.go index f2992f90a..36f115461 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -146,11 +146,15 @@ func (b *CheckpointInsertQueryBuilder) BuildQuery(uniqueKeyArgs []interface{}) ( return "", nil, fmt.Errorf("args count differs from 2 x unique key column count") } convertedArgs := make([]interface{}, 0, 2*b.uniqueKeyColumns.Len()) - for _, column := range b.uniqueKeyColumns.Columns() { - arg := column.convertArg(column, true) - convertedArgs = append(convertedArgs, arg) + for i, column := range b.uniqueKeyColumns.Columns() { + minArg := column.convertArg(uniqueKeyArgs[i], true) + convertedArgs = append(convertedArgs, minArg) } - return b.preparedStatement, uniqueKeyArgs, nil + for i, column := range b.uniqueKeyColumns.Columns() { + minArg := column.convertArg(uniqueKeyArgs[i+b.uniqueKeyColumns.Len()], true) + convertedArgs = append(convertedArgs, minArg) + } + return b.preparedStatement, convertedArgs, nil } func BuildSetPreparedClause(columns *ColumnList) (result string, err error) { From c1fae0b1eafef407cd4c6b26a0742825c568cae4 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Sun, 12 Oct 2025 22:42:19 -0700 Subject: [PATCH 07/16] update applier coordinates from _ghc heartbeat --- go/logic/applier_test.go | 2 +- go/logic/migrator.go | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 3cb767518..e7e41cf73 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -691,7 +691,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { // checkpoint table is empty _, err = applier.ReadLastCheckpoint() - suite.Require().ErrorIs(err, NoCheckpointFoundError) + suite.Require().ErrorIs(err, ErrNoCheckpointFound) // write a checkpoint and read it back coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(219202907)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e1a2e214f..87d5e42b9 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -201,20 +201,20 @@ func (this *Migrator) canStopStreaming() bool { } // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted. -func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { +func (this *Migrator) onChangelogEvent(dmlEntry *binlog.BinlogEntry) (err error) { // Hey, I created the changelog table, I know the type of columns it has! - switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { + switch hint := dmlEntry.DmlEvent.NewColumnValues.StringColumn(2); hint { case "state": - return this.onChangelogStateEvent(dmlEvent) + return this.onChangelogStateEvent(dmlEntry) case "heartbeat": - return this.onChangelogHeartbeatEvent(dmlEvent) + return this.onChangelogHeartbeatEvent(dmlEntry) default: return nil } } -func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { - changelogStateString := dmlEvent.NewColumnValues.StringColumn(3) +func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err error) { + changelogStateString := dmlEntry.DmlEvent.NewColumnValues.StringColumn(3) changelogState := ReadChangelogState(changelogStateString) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) switch changelogState { @@ -242,14 +242,17 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } -func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { - changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) +func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (err error) { + changelogHeartbeatString := dmlEntry.DmlEvent.NewColumnValues.StringColumn(3) heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) if err != nil { return this.migrationContext.Log.Errore(err) } else { this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + this.applier.CurrentCoordinatesMutex.Lock() + this.applier.CurrentCoordinates = dmlEntry.Coordinates + this.applier.CurrentCoordinatesMutex.Unlock() return nil } } @@ -411,7 +414,7 @@ func (this *Migrator) Migrate() (err error) { // inspectOriginalAndGhostTables must be called before creating checkpoint table. if this.migrationContext.Checkpoint && !this.migrationContext.Resume { if err := this.applier.CreateCheckpointTable(); err != nil { - this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error deatils.") + this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error details.") } } @@ -1157,7 +1160,7 @@ func (this *Migrator) initiateStreaming() error { this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), func(dmlEntry *binlog.BinlogEntry) error { - return this.onChangelogEvent(dmlEntry.DmlEvent) + return this.onChangelogEvent(dmlEntry) }, ) From 93953c93fb6620ad380424da04dd530f725af112 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 13 Oct 2025 08:14:02 -0700 Subject: [PATCH 08/16] fix test --- go/logic/applier.go | 13 ++++---- go/logic/applier_test.go | 5 ++-- go/logic/checkpoint.go | 20 ++++++++++--- go/logic/inspect.go | 2 +- go/logic/migrator.go | 7 +++-- go/logic/migrator_test.go | 63 +++++++++++++++++++++++---------------- localtests/test.sh | 2 +- 7 files changed, 69 insertions(+), 43 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 7f2012f63..9ca45097e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1,5 +1,5 @@ /* - Copyright 2021 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -22,10 +22,11 @@ import ( "database/sql/driver" "errors" + "sync" + "github.com/github/gh-ost/go/mysql" drivermysql "github.com/go-sql-driver/mysql" "github.com/openark/golib/sqlutils" - "sync" ) const ( @@ -33,8 +34,8 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) -// NoCheckpointFoundError is returned when an empty checkpoint table is queried. -var NoCheckpointFoundError = errors.New("no checkpoint found in _ghk table") +// ErrNoCheckpointFound is returned when an empty checkpoint table is queried. +var ErrNoCheckpointFound = errors.New("no checkpoint found in _ghk table") type dmlBuildResult struct { query string @@ -239,7 +240,7 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) { // ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, // or attempts to drop them if instructed to. func (this *Applier) ValidateOrDropExistingTables() error { - if this.migrationContext.InitiallyDropGhostTable && !this.migrationContext.Resume { + if this.migrationContext.InitiallyDropGhostTable { if err := this.DropGhostTable(); err != nil { return err } @@ -646,7 +647,7 @@ func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) { err := row.Scan(ptrs...) if err != nil { if errors.Is(err, gosql.ErrNoRows) { - return nil, NoCheckpointFoundError + return nil, ErrNoCheckpointFound } return nil, err } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index e7e41cf73..8896abf6d 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1,7 +1,6 @@ /* - Copyright 2022 GitHub Inc. - See https://github.com/git -hub/gh-ost/blob/master/LICENSE + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE */ package logic diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go index 73980eac7..e01ee4e19 100644 --- a/go/logic/checkpoint.go +++ b/go/logic/checkpoint.go @@ -1,17 +1,29 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + package logic import ( + "time" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" - "time" ) // Checkpoint holds state necessary to resume a migration. type Checkpoint struct { - Id int64 - Timestamp time.Time - LastTrxCoords mysql.BinlogCoordinates + Id int64 + Timestamp time.Time + // LastTrxCoords are coordinates of a transaction + // that has been applied on ghost table. + LastTrxCoords mysql.BinlogCoordinates + // IterationRangeMin is the min shared key value + // for the chunk copier range. IterationRangeMin *sql.ColumnValues + // IterationRangeMax is the max shared key value + // for the chunk copier range. IterationRangeMax *sql.ColumnValues Iteration int64 } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 6b182e91b..044360153 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 87d5e42b9..75baef3cf 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -416,7 +416,6 @@ func (this *Migrator) Migrate() (err error) { if err := this.applier.CreateCheckpointTable(); err != nil { this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error details.") } - } if this.migrationContext.Resume { @@ -1396,8 +1395,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return nil } +// Checkpoint attempts to write a checkpoint of the Migrator's current state. +// It gets the binlog coordinates of the last received trx and waits until the +// applier reaches that trx. At that point it's safe to resume from these func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { - // TODO: doesn't work if no DML events come in coords := this.eventsStreamer.GetCurrentBinlogCoordinates() this.applier.LastIterationRangeMutex.Lock() if this.applier.LastIterationRangeMaxValues == nil || this.applier.LastIterationRangeMinValues == nil { diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 86c060acf..2520e72ce 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -21,12 +21,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/testcontainers/testcontainers-go/modules/mysql" + testmysql "github.com/testcontainers/testcontainers-go/modules/mysql" "runtime" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -35,6 +36,7 @@ import ( func TestMigratorOnChangelogEvent(t *testing.T) { migrationContext := base.NewMigrationContext() migrator := NewMigrator(migrationContext, "1.2.3") + migrator.applier = NewApplier(migrationContext) t.Run("heartbeat", func(t *testing.T) { columnValues := sql.ToColumnValues([]interface{}{ @@ -43,10 +45,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "heartbeat", "2022-08-16T00:45:10.52Z", }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) @@ -66,10 +70,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", AllEventsUpToLockProcessed, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) wg.Wait() }) @@ -85,10 +91,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", GhostTableMigrated, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) @@ -99,11 +107,14 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", Migrated, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) + }) t.Run("state-ReadMigrationRangeValues", func(t *testing.T) { @@ -113,10 +124,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", ReadMigrationRangeValues, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) } @@ -283,11 +296,11 @@ type MigratorTestSuite struct { func (suite *MigratorTestSuite) SetupSuite() { ctx := context.Background() - mysqlContainer, err := mysql.Run(ctx, + mysqlContainer, err := testmysql.Run(ctx, testMysqlContainerImage, - mysql.WithDatabase(testMysqlDatabase), - mysql.WithUsername(testMysqlUser), - mysql.WithPassword(testMysqlPass), + testmysql.WithDatabase(testMysqlDatabase), + testmysql.WithUsername(testMysqlUser), + testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), ) suite.Require().NoError(err) diff --git a/localtests/test.sh b/localtests/test.sh index 7e5953c10..6776d227a 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -151,7 +151,7 @@ sysbench_prepare() { --mysql-password=opensesame \ --mysql-db=test \ --tables=1 \ - --table-size=100000 \ + --table-size=20000 \ prepare } From 02f885fb4d6edc348acc47c965aa66ebfbfc2d72 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 13 Oct 2025 08:47:43 -0700 Subject: [PATCH 09/16] fix linter --- go/logic/migrator_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 2520e72ce..b268054ab 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -114,7 +114,6 @@ func TestMigratorOnChangelogEvent(t *testing.T) { NewColumnValues: columnValues}, Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) - }) t.Run("state-ReadMigrationRangeValues", func(t *testing.T) { From 8c2ad7775dd613bcc1eaaede47e039b0cd1bb87e Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 13 Oct 2025 13:21:12 -0700 Subject: [PATCH 10/16] make checkpoint interval configurable --- go/base/context.go | 1 + go/cmd/gh-ost/main.go | 7 ++++++- go/logic/migrator.go | 11 ++++++----- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index ab9f942f8..35030e30b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -155,6 +155,7 @@ type MigrationContext struct { HooksStatusIntervalSec int64 PanicOnWarnings bool Checkpoint bool + CheckpointIntervalSeconds int64 DropServeSocket bool ServeSocketFile string diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 379aba4cb..0a2a8afa4 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -145,7 +145,8 @@ func main() { flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'") flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'") flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections") - flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Use migration checkpoints") + flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints") + flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints") flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") @@ -286,6 +287,9 @@ func main() { if *storageEngine == "rocksdb" { migrationContext.Log.Warning("RocksDB storage engine support is experimental") } + if migrationContext.CheckpointIntervalSeconds < 10 { + migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10") + } switch *cutOver { case "atomic", "default", "": @@ -318,6 +322,7 @@ func main() { } migrationContext.CliPassword = string(bytePassword) } + migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 75baef3cf..dc2155180 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -26,7 +26,6 @@ var ( ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.") ErrMigrationNotAllowedOnMaster = errors.New("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master.") RetrySleepFn = time.Sleep - checkpointInterval = 10 * time.Second // 5 * time.Minute checkpointTimeout = 2 * time.Second ) @@ -423,11 +422,12 @@ func (this *Migrator) Migrate() (err error) { if err != nil { return this.migrationContext.Log.Errorf("No checkpoint found, unable to resume: %+v", err) } - this.migrationContext.Log.Infof("Resuming from checkpoint coords=%+v range_min=%+v range_max=%+v", - lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String()) + this.migrationContext.Log.Infof("Resuming from checkpoint coords=%+v range_min=%+v range_max=%+v iteration=%d", + lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String(), lastCheckpoint.Iteration) this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + this.migrationContext.Iteration = lastCheckpoint.Iteration this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords if err := this.initiateStreaming(); err != nil { return err @@ -1435,6 +1435,7 @@ func (this *Migrator) checkpointLoop() { this.migrationContext.Log.Debugf("Noop operation; not really checkpointing") return } + checkpointInterval := time.Duration(this.migrationContext.CheckpointIntervalSeconds) * time.Second ticker := time.NewTicker(checkpointInterval) for t := range ticker.C { if atomic.LoadInt64(&this.finishedMigrating) > 0 { @@ -1450,8 +1451,8 @@ func (this *Migrator) checkpointLoop() { this.migrationContext.Log.Errorf("error attempting checkpoint: %+v", err) } } else { - this.migrationContext.Log.Infof("checkpoint success at coords=%+v range_min=%+v range_max=%+v", - chk.LastTrxCoords.DisplayString(), chk.IterationRangeMin.String(), chk.IterationRangeMax.String()) + this.migrationContext.Log.Infof("checkpoint success at coords=%+v range_min=%+v range_max=%+v iteration=%d", + chk.LastTrxCoords.DisplayString(), chk.IterationRangeMin.String(), chk.IterationRangeMax.String(), chk.Iteration) } cancel() } From d4ac082b9b071799690e3cc25d96f073edc6f5ec Mon Sep 17 00:00:00 2001 From: meiji163 Date: Mon, 13 Oct 2025 22:20:42 -0700 Subject: [PATCH 11/16] write checkpoint iteration number --- go/logic/migrator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index dc2155180..4f5d34549 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1406,6 +1406,7 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { return nil, errors.New("iteration range is empty, not checkpointing...") } chk := &Checkpoint{ + Iteration: this.migrationContext.GetIteration(), IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(), IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(), LastTrxCoords: coords, From 43e0d2c8e48da9d806515ea6a07bbbfe372392d4 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Tue, 14 Oct 2025 09:20:24 -0700 Subject: [PATCH 12/16] store rows copied & dml applied --- go/logic/applier.go | 6 ++++-- go/logic/applier_test.go | 4 ++++ go/logic/checkpoint.go | 2 ++ go/logic/migrator.go | 6 +++++- go/sql/builder.go | 8 ++++++-- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 9ca45097e..337d5de60 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -435,6 +435,8 @@ func (this *Applier) CreateCheckpointTable() error { "`gh_ost_chk_timestamp` bigint", "`gh_ost_chk_coords` varchar(4096)", "`gh_ost_chk_iteration` bigint", + "`gh_ost_rows_copied` bigint", + "`gh_ost_dml_applied` bigint", } for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { if col.MySQLType == "" { @@ -623,7 +625,7 @@ func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { if err != nil { return insertId, err } - args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration) + args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied) args = append(args, uniqueKeyArgs...) res, err := this.db.Exec(query, args...) if err != nil { @@ -641,7 +643,7 @@ func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) { var coordStr string var timestamp int64 - ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration} + ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied} ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 8896abf6d..232349d16 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -700,6 +700,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, Iteration: 2, + RowsCopied: 100000, + DMLApplied: 200000, } id, err := applier.WriteCheckpoint(chk) suite.Require().NoError(err) @@ -712,6 +714,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) + suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) + suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) } func TestApplier(t *testing.T) { diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go index e01ee4e19..69cc2bd20 100644 --- a/go/logic/checkpoint.go +++ b/go/logic/checkpoint.go @@ -26,4 +26,6 @@ type Checkpoint struct { // for the chunk copier range. IterationRangeMax *sql.ColumnValues Iteration int64 + RowsCopied int64 + DMLApplied int64 } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4f5d34549..561e276d5 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -428,6 +428,8 @@ func (this *Migrator) Migrate() (err error) { this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax this.migrationContext.Iteration = lastCheckpoint.Iteration + this.migrationContext.TotalRowsCopied = lastCheckpoint.RowsCopied + this.migrationContext.TotalDMLEventsApplied = lastCheckpoint.DMLApplied this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords if err := this.initiateStreaming(); err != nil { return err @@ -1397,7 +1399,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { // Checkpoint attempts to write a checkpoint of the Migrator's current state. // It gets the binlog coordinates of the last received trx and waits until the -// applier reaches that trx. At that point it's safe to resume from these +// applier reaches that trx. At that point it's safe to resume from these coordinates. func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { coords := this.eventsStreamer.GetCurrentBinlogCoordinates() this.applier.LastIterationRangeMutex.Lock() @@ -1410,6 +1412,8 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(), IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(), LastTrxCoords: coords, + RowsCopied: atomic.LoadInt64(&this.migrationContext.TotalRowsCopied), + DMLApplied: atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), } this.applier.LastIterationRangeMutex.Unlock() diff --git a/go/sql/builder.go b/go/sql/builder.go index 36f115461..800c477b6 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -123,9 +123,13 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns stmt := fmt.Sprintf(` insert /* gh-ost */ into %s.%s - (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, %s, %s) + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, + %s, %s) values - (unix_timestamp(now()), ?, ?, %s, %s)`, + (unix_timestamp(now()), ?, ?, + ?, ?, + %s, %s)`, databaseName, tableName, strings.Join(minUniqueColNames, ", "), strings.Join(maxUniqueColNames, ", "), From 4c7ed5f307374cfe767e48b2850c5ac0fcd45642 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Tue, 14 Oct 2025 15:32:04 -0700 Subject: [PATCH 13/16] truncate column name if necessary --- go/logic/applier.go | 6 ++++-- go/sql/builder.go | 22 ++++++++++++++++++++-- go/sql/builder_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 337d5de60..3fe7f2287 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -442,7 +442,8 @@ func (this *Applier) CreateCheckpointTable() error { if col.MySQLType == "" { return fmt.Errorf("CreateCheckpoinTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) } - colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_min"), col.MySQLType) + minColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_min" + colDef := fmt.Sprintf("%s %s", sql.EscapeName(minColName), col.MySQLType) if !col.Nullable { colDef += " NOT NULL" } @@ -450,7 +451,8 @@ func (this *Applier) CreateCheckpointTable() error { } for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { - colDef := fmt.Sprintf("%s %s", sql.EscapeName(col.Name+"_max"), col.MySQLType) + maxColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_max" + colDef := fmt.Sprintf("%s %s", sql.EscapeName(maxColName), col.MySQLType) if !col.Nullable { colDef += " NOT NULL" } diff --git a/go/sql/builder.go b/go/sql/builder.go index 800c477b6..757d74910 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -20,6 +20,7 @@ const ( GreaterThanOrEqualsComparisonSign ValueComparisonSign = ">=" GreaterThanComparisonSign ValueComparisonSign = ">" NotEqualsComparisonSign ValueComparisonSign = "!=" + MaxColumnNameLength = 64 ) // EscapeName will escape a db/table/column/... name by wrapping with backticks. @@ -32,6 +33,21 @@ func EscapeName(name string) string { return fmt.Sprintf("`%s`", name) } +// TruncateColumnName truncates a name so it can be used as a MySQL +// column name, taking into account UTF-8 characters. +func TruncateColumnName(name string, limit int) string { + truncatedName := name + chars := 0 + for byteIdx := range name { + if chars >= limit { + truncatedName = name[:byteIdx] + break + } + chars++ + } + return truncatedName +} + func buildColumnsPreparedValues(columns *ColumnList) []string { values := make([]string, columns.Len()) for i, column := range columns.Columns() { @@ -115,8 +131,10 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns minUniqueColNames := []string{} maxUniqueColNames := []string{} for _, name := range uniqueKeyColumns.Names() { - minUniqueColNames = append(minUniqueColNames, name+"_min") - maxUniqueColNames = append(maxUniqueColNames, name+"_max") + minColName := TruncateColumnName(name, MaxColumnNameLength-4) + "_min" + maxColName := TruncateColumnName(name, MaxColumnNameLength-4) + "_max" + minUniqueColNames = append(minUniqueColNames, minColName) + maxUniqueColNames = append(maxUniqueColNames, maxColName) } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a6735a324..06e402c89 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -790,3 +790,28 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { require.Equal(t, []interface{}{uint8(253)}, uniqueKeyArgs) } } + +func TestCheckpointQueryBuilder(t *testing.T) { + databaseName := "mydb" + tableName := "_tbl_ghk" + valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} + uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns) + require.NoError(t, err) + query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) + require.NoError(t, err) + expected := ` + insert /* gh-ost */ into mydb._tbl_ghk + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, + name_min, position_min, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_min, + name_max, position_max, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_max) + values + (unix_timestamp(now()), ?, ?, + ?, ?, + ?, ?, ?, + ?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) +} From 52843015f51ffe3c14988dcfe502c987fa7eb508 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 15 Oct 2025 10:59:31 -0700 Subject: [PATCH 14/16] drop checkpoint table for final cleanup --- go/logic/migrator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 561e276d5..33271d01a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1541,6 +1541,9 @@ func (this *Migrator) finalCleanup() error { if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { return err } + if err := this.retryOperation(this.applier.DropCheckpointTable); err != nil { + return err + } if this.migrationContext.OkToDropTable && !this.migrationContext.TestOnReplica { if err := this.retryOperation(this.applier.DropOldTable); err != nil { return err From 5662d9b94b62c9ba0d250715886bf60a0feb3e1c Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 15 Oct 2025 13:10:37 -0700 Subject: [PATCH 15/16] add docs --- doc/command-line-flags.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 7b5efd9fb..c706a4fe4 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -64,6 +64,15 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant ### binlogsyncer-max-reconnect-attempts `--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0` +### checkpoint + +`--checkpoint` enables periodic checkpoints of the gh-ost's state so that gh-ost can resume a migration from the checkpoint with `--resume`. Checkpoints are written to a separate table named `_${original_table_name}_ghk`. It is recommended to use with `--gtid` for checkpoints. +See also: [`resuming-migrations`](resume.md) + +### checkpoint-seconds + +`--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: @@ -226,6 +235,11 @@ Optionally involve the process ID, for example: `--replica-server-id=$((10000000 It's on you to choose a number that does not collide with another `gh-ost` or another running replica. See also: [`concurrent-migrations`](cheatsheet.md#concurrent-migrations) on the cheatsheet. +### resume + +`--resume` attempts to resume a migration that was previously interrupted from the last checkpoint. The first `gh-ost` invocation must run with `--checkpoint` and have successfully written a checkpoint in order for `--resume` to work. +See also: [`resuming-migrations`](resume.md) + ### serve-socket-file Defaults to an auto-determined and advertised upon startup file. Defines Unix socket file to serve on. From bcd19da16b08900288ccdc7202ccf5b675162da7 Mon Sep 17 00:00:00 2001 From: meiji163 Date: Wed, 15 Oct 2025 13:12:57 -0700 Subject: [PATCH 16/16] add resume doc --- doc/resume.md | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 doc/resume.md diff --git a/doc/resume.md b/doc/resume.md new file mode 100644 index 000000000..46d6ac909 --- /dev/null +++ b/doc/resume.md @@ -0,0 +1,42 @@ +# Resuming Migrations + +`gh-ost` can attempt to resume an interrupted migration from a checkpoint if the following conditions are met: +- The first `gh-ost` process was invoked with `--checkpoint` +- The first `gh-ost` process had at least one successful checkpoint +- The binlogs from the last checkpoint's binlog coordinates still exist on the replica gh-ost is inspecting (specified by `--host`) + +To resume, invoke `gh-ost` again with the same arguments with the `--resume` flag. + +> [!WARNING] +> It is recommended use `--checkpoint` with `--gtid` enabled so that checkpoint binlog coordinates store GTID sets rather than file positions. In that case, `gh-ost` can resume using a different replica than it originally attached to. + +## Example +The migration starts with a `gh-ost` invocation such as: +```shell +gh-ost \ +--chunk-size=100 \ +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--alter="add column mycol varchar(20)" +--gtid \ +--checkpoint \ +--checkpoint-seconds=60 \ +--execute +``` + +In this example `gh-ost` writes a checkpoint to a table `_mytable_ghk` every 60 seconds. After `gh-ost` is interrupted/killed, the migration can be resumed with: +```shell +# resume migration +gh-ost \ +--chunk-size=100 +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--alter="add column mycol varchar(20)" +--gtid \ +--resume \ +--execute +``` + +`gh-ost` then reconnects at the binlog coordinates of the last checkpoint and resumes copying rows at the chunk specified by the checkpoint. The data integrity of the ghost table is preserved because `gh-ost` applies row DMLs and copies row in an idempotent way.