Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,28 @@ type MigrationContext struct {
SkipMetadataLockCheck bool
IsOpenMetadataLockInstruments bool

// MTS parallel apply configuration
NumWorkers int
BinlogHasLogicalTimestamps bool
LogicalTimestampsDetected chan struct{}
logicalTimestampsDetectOnce sync.Once

Log Logger
}

// NotifyLogicalTimestampsDetection closes LogicalTimestampsDetected once so MTS
// startup can proceed. found=true when binlog carries logical timestamps (MySQL 5.7+).
func (mctx *MigrationContext) NotifyLogicalTimestampsDetection(found bool) {
mctx.logicalTimestampsDetectOnce.Do(func() {
if found {
mctx.BinlogHasLogicalTimestamps = true
}
if mctx.LogicalTimestampsDetected != nil {
close(mctx.LogicalTimestampsDetected)
}
})
}

type Logger interface {
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Expand Down
6 changes: 4 additions & 2 deletions go/binlog/binlog_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (

// BinlogEntry describes an entry in the binary log
type BinlogEntry struct {
Coordinates mysql.BinlogCoordinates
DmlEvent *BinlogDMLEvent
Coordinates mysql.BinlogCoordinates
DmlEvent *BinlogDMLEvent
LastCommitted int64 // logical timestamp of commit parent (0 = SEQ_UNINIT, unavailable)
SequenceNumber int64 // monotonically increasing logical timestamp (0 = SEQ_UNINIT)
}

// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
Expand Down
19 changes: 17 additions & 2 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (gmr *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates
return gmr.currentCoordinates.Clone()
}

func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry, lastCommitted int64, sequenceNumber int64) error {
currentCoords := gmr.GetCurrentBinlogCoordinates()
dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
Expand All @@ -98,6 +98,8 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent
continue
}
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.LastCommitted = lastCommitted
binlogEntry.SequenceNumber = sequenceNumber
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
Expand Down Expand Up @@ -130,6 +132,10 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent

// StreamEvents
func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
var currentLastCommitted int64
var currentSequenceNumber int64
logicalTimestampsDetected := false

for !canStopStreaming() {
ev, err := gmr.binlogStreamer.GetEvent(context.Background())
if err != nil {
Expand All @@ -156,6 +162,14 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
if !gmr.migrationContext.UseGTIDs {
continue
}
// Capture logical timestamps for MTS dependency tracking
currentLastCommitted = event.LastCommitted
currentSequenceNumber = event.SequenceNumber
// Detect whether binlog contains logical timestamps (MySQL 5.7+)
if !logicalTimestampsDetected && (event.LastCommitted > 0 || event.SequenceNumber > 0) {
logicalTimestampsDetected = true
gmr.migrationContext.NotifyLogicalTimestampsDetection(true)
}
sid, err := uuid.FromBytes(event.SID)
if err != nil {
return err
Expand Down Expand Up @@ -184,12 +198,13 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
gmr.LastTrxCoords = gmr.currentCoordinates.Clone()
}
case *replication.RowsEvent:
if err := gmr.handleRowsEvent(ev, event, entriesChannel); err != nil {
if err := gmr.handleRowsEvent(ev, event, entriesChannel, currentLastCommitted, currentSequenceNumber); err != nil {
return err
}
}
}
gmr.migrationContext.Log.Debugf("done streaming events")
gmr.migrationContext.NotifyLogicalTimestampsDetection(logicalTimestampsDetected)

return nil
}
Expand Down
9 changes: 9 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func main() {
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)")
numWorkers := flag.Int("num-workers", 1, "number of parallel DML apply workers (MTS mode). Requires MySQL 5.7+ with binlog logical timestamps. Default: 1 (single-threaded, backward compatible)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
Expand Down Expand Up @@ -377,6 +378,14 @@ func main() {
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
if *numWorkers < 1 {
migrationContext.Log.Warningf("invalid --num-workers=%d; using 1", *numWorkers)
*numWorkers = 1
}
migrationContext.NumWorkers = *numWorkers
if migrationContext.NumWorkers > 1 {
migrationContext.LogicalTimestampsDetected = make(chan struct{})
}
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetThrottleHTTP(*throttleHTTP)
migrationContext.SetIgnoreHTTPErrors(*ignoreHTTPErrors)
Expand Down
201 changes: 139 additions & 62 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,18 @@ func (apl *Applier) releaseMigrationLock() {
apl.migrationLockConn = nil
}

// adoptDMLQueryBuildersFrom shares read-only DML query builders from the primary applier.
// The primary applier must have called prepareQueries() before MTS workers start.
func (apl *Applier) adoptDMLQueryBuildersFrom(source *Applier) error {
if source.dmlInsertQueryBuilder == nil {
return fmt.Errorf("primary applier DML query builders are not prepared")
}
apl.dmlDeleteQueryBuilder = source.dmlDeleteQueryBuilder
apl.dmlInsertQueryBuilder = source.dmlInsertQueryBuilder
apl.dmlUpdateQueryBuilder = source.dmlUpdateQueryBuilder
return nil
}

func (apl *Applier) prepareQueries() (err error) {
if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
Expand Down Expand Up @@ -462,6 +474,31 @@ func (apl *Applier) AttemptInstantDDL() error {
}, apl.migrationContext.MaxRetries(), apl.migrationContext.Log)
}

// isDeadlockError checks whether the given error is a MySQL InnoDB deadlock (errno 1213).
func isDeadlockError(err error) bool {
var mysqlErr *drivermysql.MySQLError
return errors.As(err, &mysqlErr) && mysqlErr.Number == 1213
}

// isRetryableApplyError reports whether a failed DML apply is a transient
// concurrency error that should be retried. Under concurrent MTS workers the
// following errors are expected: InnoDB deadlocks (1213), lock-wait timeouts
// (1205), and NOWAIT lock failures (3572). Gap locks on the ghost table's
// secondary indexes cause contention between parallel statements. All are
// resolved by retrying the whole transaction, mirroring MySQL's
// slave_transaction_retries behaviour.
func isRetryableApplyError(err error) bool {
var mysqlErr *drivermysql.MySQLError
if !errors.As(err, &mysqlErr) {
return false
}
switch mysqlErr.Number {
case 1205, 1213, 3572:
return true
}
return false
}

// retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout
// (errno 1205). Non-timeout errors return immediately. This is used for instant
// DDL attempts where the operation may be blocked by a long-running transaction.
Expand Down Expand Up @@ -1069,6 +1106,25 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
return hasFurtherRange, nil
}

// rowCopyMaxTransientRetries bounds internal retries of the chunk-INSERT query on
// transient lock errors (lock-wait timeout 1205, deadlock 1213). In single-threaded
// mode on MySQL 8.x, NOWAIT (3572) can also occur and is retried here.
const rowCopyMaxTransientRetries = 20

// rowCopyUsesNoWait reports whether row-copy should use SELECT ... FOR SHARE NOWAIT.
// On MySQL 8.x transactional tables, NOWAIT fails fast instead of blocking behind
// concurrent DML. Under MTS (NumWorkers > 1), lock contention with parallel DML
// workers is expected; row-copy waits via LOCK IN SHARE MODE instead.
func rowCopyUsesNoWait(migrationContext *base.MigrationContext) bool {
if !strings.HasPrefix(migrationContext.ApplierMySQLVersion, "8.") {
return false
}
if migrationContext.NumWorkers > 1 {
return false
}
return true
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
Expand All @@ -1087,68 +1143,29 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
apl.migrationContext.GetIteration() == 0,
apl.migrationContext.IsTransactionalTable(),
// TODO: Don't hardcode this
strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."),
rowCopyUsesNoWait(apl.migrationContext),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
}

sqlResult, err := func() (gosql.Result, error) {
tx, err := apl.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()

sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone)
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery())

if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
var sqlResult gosql.Result
for attempt := 0; attempt < rowCopyMaxTransientRetries; attempt++ {
sqlResult, err = apl.executeChunkInsertTx(query, explodedArgs)
if err == nil {
break
}
result, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
if !isRetryableApplyError(err) {
return chunkSize, rowsAffected, duration, err
}

if apl.migrationContext.PanicOnWarnings {
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex()
if err != nil {
return nil, err
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
if attempt == rowCopyMaxTransientRetries-1 {
// Exhausted internal retries; propagate to outer retryOperation.
break
}

if err := tx.Commit(); err != nil {
return nil, err
}
return result, nil
}()
apl.migrationContext.Log.Infof("Retrying row-copy after transient lock error (attempt %d/%d): %v",
attempt+2, rowCopyMaxTransientRetries, err)
RetrySleepFn(time.Duration(100+attempt*50) * time.Millisecond)
}

if err != nil {
return chunkSize, rowsAffected, duration, err
Expand All @@ -1164,6 +1181,63 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
return chunkSize, rowsAffected, duration, nil
}

// executeChunkInsertTx runs a single chunk-INSERT transaction (Begin/Exec/Commit).
func (apl *Applier) executeChunkInsertTx(query string, explodedArgs []interface{}) (gosql.Result, error) {
tx, err := apl.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()

sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone)
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery())

if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
result, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
}

if apl.migrationContext.PanicOnWarnings {
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex()
if err != nil {
return nil, err
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}

if err := tx.Commit(); err != nil {
return nil, err
}
return result, nil
}

// LockOriginalTable places a write lock on the original table
func (apl *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
Expand Down Expand Up @@ -1660,12 +1734,16 @@ func (apl *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBu
case binlog.UpdateDML:
{
if _, isModified := apl.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, apl.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, apl.buildDMLEventQuery(dmlEvent)...)
return results
// A unique-key-modifying UPDATE is split into DELETE + INSERT.
// We must NOT mutate dmlEvent.DML here: the same event may be
// re-applied (e.g. MTS deadlock retry), and a mutated DML would
// be rebuilt as the wrong statement, corrupting the ghost table.
deleteQuery, deleteArgs, deleteErr := apl.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
insertQuery, insertArgs, insertErr := apl.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{
newDmlBuildResult(deleteQuery, deleteArgs, -1, deleteErr),
newDmlBuildResult(insertQuery, insertArgs, 1, insertErr),
}
}
query, updateArgs, err := apl.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
Expand Down Expand Up @@ -1778,9 +1856,8 @@ func (apl *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gos
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
func (apl *Applier) ApplyDMLEventQueries(ctx context.Context, dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
ctx := context.Background()

err := func() error {
conn, err := apl.db.Conn(ctx)
Expand Down
Loading
Loading