From 66894d3a52698a2f094f29bfb5d6c172886fccb2 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 18 Dec 2016 09:23:51 +0200 Subject: [PATCH 01/28] resurrection: dump/restore of migration context cross executions --- go/base/context.go | 23 +++++++++++++++++++---- go/logic/migrator.go | 36 +++++++++++++++++++++++++++++++----- go/logic/server.go | 6 ++++-- go/logic/throttler.go | 10 ++++++---- go/sql/types.go | 6 ++++++ 5 files changed, 66 insertions(+), 15 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 606cb6983..47e233f92 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -6,7 +6,9 @@ package base import ( + "encoding/json" "fmt" + "io/ioutil" "os" "regexp" "strings" @@ -158,7 +160,6 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 - PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList @@ -174,8 +175,6 @@ type MigrationContext struct { Iteration int64 MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues - - CanStopStreaming func() bool } type ContextConfig struct { @@ -212,7 +211,6 @@ func newMigrationContext() *MigrationContext { configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error), } } @@ -221,6 +219,23 @@ func GetMigrationContext() *MigrationContext { return context } +// ToJSON exports this config to JSON string +func (this *MigrationContext) ToJSON() (string, error) { + b, err := json.Marshal(this) + return string(b), err +} + +// DumpJSON exports this config to JSON string and writes it to file +func (this *MigrationContext) DumpJSON() (fileName string, err error) { + jsonBytes, err := json.Marshal(this) + if err != nil { + return fileName, err + } + fileName = fmt.Sprintf("%s/gh-ost.%s.%d.context.json", "/tmp", this.OriginalTableName, this.ElapsedTime()) + err = ioutil.WriteFile(fileName, jsonBytes, 0644) + return fileName, err +} + // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) GetGhostTableName() string { return fmt.Sprintf("_%s_gho", this.OriginalTableName) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 8f61425b8..5382955f4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -74,6 +74,8 @@ type Migrator struct { applyEventsQueue chan tableWriteFunc handledChangelogStates map[string]bool + contextDumpFiles []string + panicAbort chan error } func NewMigrator() *Migrator { @@ -88,6 +90,9 @@ func NewMigrator() *Migrator { copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), handledChangelogStates: make(map[string]bool), + + contextDumpFiles: []string{}, + panicAbort: make(chan error), } return migrator } @@ -116,6 +121,24 @@ func (this *Migrator) initiateHooksExecutor() (err error) { return nil } +// initiateContextDump +func (this *Migrator) initiateContextDump() (err error) { + go func() { + contextDumpTick := time.Tick(1 * time.Minute) + for range contextDumpTick { + if dumpFile, err := this.migrationContext.DumpJSON(); err == nil { + this.contextDumpFiles = append(this.contextDumpFiles, dumpFile) + if len(this.contextDumpFiles) > 2 { + oldDumpFile := this.contextDumpFiles[0] + this.contextDumpFiles = this.contextDumpFiles[1:] + os.Remove(oldDumpFile) + } + } + } + }() + return nil +} + // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { @@ -147,7 +170,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // there's an error. Let's try again. } if len(notFatalHint) == 0 { - this.migrationContext.PanicAbort <- err + this.panicAbort <- err } return err } @@ -218,7 +241,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er // listenOnPanicAbort aborts on abort request func (this *Migrator) listenOnPanicAbort() { - err := <-this.migrationContext.PanicAbort + err := <-this.panicAbort log.Fatale(err) } @@ -278,6 +301,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateHooksExecutor(); err != nil { return err } + if err := this.initiateContextDump(); err != nil { + return err + } if err := this.hooksExecutor.onStartup(); err != nil { return err } @@ -608,7 +634,7 @@ func (this *Migrator) initiateServer() (err error) { var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) { this.printStatus(rule, writer) } - this.server = NewServer(this.hooksExecutor, f) + this.server = NewServer(this.hooksExecutor, f, this.panicAbort) if err := this.server.BindSocketFile(); err != nil { return err } @@ -896,7 +922,7 @@ func (this *Migrator) initiateStreaming() error { log.Debugf("Beginning streaming") err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err != nil { - this.migrationContext.PanicAbort <- err + this.panicAbort <- err } log.Debugf("Done streaming") }() @@ -924,7 +950,7 @@ func (this *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (this *Migrator) initiateThrottler() error { - this.throttler = NewThrottler(this.applier, this.inspector) + this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) log.Infof("Waiting for first throttle metrics to be collected") diff --git a/go/logic/server.go b/go/logic/server.go index b8c0f2cb7..d5c13b96c 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -28,13 +28,15 @@ type Server struct { tcpListener net.Listener hooksExecutor *HooksExecutor printStatus printStatusFunc + panicAbort chan error } -func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server { +func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc, panicAbort chan error) *Server { return &Server{ migrationContext: base.GetMigrationContext(), hooksExecutor: hooksExecutor, printStatus: printStatus, + panicAbort: panicAbort, } } @@ -251,7 +253,7 @@ help # This message case "panic": { err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!") - this.migrationContext.PanicAbort <- err + this.panicAbort <- err return NoPrintStatusRule, err } default: diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 482087ca4..9f8b5ccd9 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -21,13 +21,15 @@ type Throttler struct { migrationContext *base.MigrationContext applier *Applier inspector *Inspector + panicAbort chan error } -func NewThrottler(applier *Applier, inspector *Inspector) *Throttler { +func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler { return &Throttler{ migrationContext: base.GetMigrationContext(), applier: applier, inspector: inspector, + panicAbort: panicAbort, } } @@ -155,7 +157,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // Regardless of throttle, we take opportunity to check for panic-abort if this.migrationContext.PanicFlagFile != "" { if base.FileExists(this.migrationContext.PanicFlagFile) { - this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) + this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) } } @@ -164,7 +166,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint) } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) @@ -172,7 +174,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) <-timer.C if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold) + this.panicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold) } }() } diff --git a/go/sql/types.go b/go/sql/types.go index 720f92f5a..541c391c6 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -6,6 +6,7 @@ package sql import ( + "encoding/json" "fmt" "reflect" "strconv" @@ -247,6 +248,11 @@ func ToColumnValues(abstractValues []interface{}) *ColumnValues { return result } +// MarshalJSON will marshal this object as JSON +func (this *ColumnValues) MarshalJSON() ([]byte, error) { + return json.Marshal(this.abstractValues) +} + func (this *ColumnValues) AbstractValues() []interface{} { return this.abstractValues } From 75b6f9edf2c73f6662fd1d1056ace71558a32fbd Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 15:48:42 +0200 Subject: [PATCH 02/28] encoding range values as base64 --- go/base/context.go | 15 +++++++++++++++ go/logic/inspect.go | 2 +- go/logic/migrator.go | 4 +++- go/sql/types.go | 31 ++++++++++++++++++++++++++++++- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 47e233f92..2d9eaf700 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -175,6 +175,7 @@ type MigrationContext struct { Iteration int64 MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues + EncodedRangeValues map[string]string } type ContextConfig struct { @@ -211,6 +212,7 @@ func newMigrationContext() *MigrationContext { configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), + EncodedRangeValues: make(map[string]string), } } @@ -227,12 +229,25 @@ func (this *MigrationContext) ToJSON() (string, error) { // DumpJSON exports this config to JSON string and writes it to file func (this *MigrationContext) DumpJSON() (fileName string, err error) { + if this.MigrationRangeMinValues != nil { + this.EncodedRangeValues["MigrationRangeMinValues"], _ = this.MigrationRangeMinValues.ToBase64() + } + if this.MigrationRangeMaxValues != nil { + this.EncodedRangeValues["MigrationRangeMaxValues"], _ = this.MigrationRangeMaxValues.ToBase64() + } + if this.MigrationIterationRangeMinValues != nil { + this.EncodedRangeValues["MigrationIterationRangeMinValues"], _ = this.MigrationIterationRangeMinValues.ToBase64() + } + if this.MigrationIterationRangeMaxValues != nil { + this.EncodedRangeValues["MigrationIterationRangeMaxValues"], _ = this.MigrationIterationRangeMaxValues.ToBase64() + } jsonBytes, err := json.Marshal(this) if err != nil { return fileName, err } fileName = fmt.Sprintf("%s/gh-ost.%s.%d.context.json", "/tmp", this.OriginalTableName, this.ElapsedTime()) err = ioutil.WriteFile(fileName, jsonBytes, 0644) + return fileName, err } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 19db8a3b9..61a887f97 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -124,7 +124,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") } this.migrationContext.UniqueKey = sharedUniqueKeys[0] - log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name) + log.Infof("Chosen shared unique key is %+v", this.migrationContext.UniqueKey) if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.NullableUniqueKeyAllowed { log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5382955f4..7c55b80b6 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -31,6 +31,8 @@ const ( AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" ) +const contextDumpInterval time.Duration = 1 * time.Second + func ReadChangelogState(s string) ChangelogState { return ChangelogState(strings.Split(s, ":")[0]) } @@ -124,7 +126,7 @@ func (this *Migrator) initiateHooksExecutor() (err error) { // initiateContextDump func (this *Migrator) initiateContextDump() (err error) { go func() { - contextDumpTick := time.Tick(1 * time.Minute) + contextDumpTick := time.Tick(contextDumpInterval) for range contextDumpTick { if dumpFile, err := this.migrationContext.DumpJSON(); err == nil { this.contextDumpFiles = append(this.contextDumpFiles, dumpFile) diff --git a/go/sql/types.go b/go/sql/types.go index 541c391c6..82ecc1f94 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -6,6 +6,9 @@ package sql import ( + "bytes" + "encoding/base64" + "encoding/gob" "encoding/json" "fmt" "reflect" @@ -216,7 +219,7 @@ func (this *UniqueKey) String() string { if this.IsAutoIncrement { description = fmt.Sprintf("%s (auto_increment)", description) } - return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable) + return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.String(), this.HasNullable) } type ColumnValues struct { @@ -248,6 +251,32 @@ func ToColumnValues(abstractValues []interface{}) *ColumnValues { return result } +func NewColumnValuesFromBase64(b64 string) (columnValues *ColumnValues, err error) { + var abstractValues []interface{} + + b, err := base64.StdEncoding.DecodeString(b64) + if err != nil { + return nil, err + } + buff := bytes.Buffer{} + buff.Write(b) + decoder := gob.NewDecoder(&buff) + err = decoder.Decode(&abstractValues) + if err != nil { + return nil, err + } + return ToColumnValues(abstractValues), nil +} + +func (this *ColumnValues) ToBase64() (b64 string, err error) { + buff := bytes.Buffer{} + encoder := gob.NewEncoder(&buff) + if err = encoder.Encode(this.abstractValues); err != nil { + return b64, err + } + return base64.StdEncoding.EncodeToString(buff.Bytes()), nil +} + // MarshalJSON will marshal this object as JSON func (this *ColumnValues) MarshalJSON() ([]byte, error) { return json.Marshal(this.abstractValues) From 6999b4e8bf4e1fef039fd4e2e288cd9a63e463a6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 16:27:05 +0200 Subject: [PATCH 03/28] exporting to changelog table, not to file --- go/base/context.go | 16 +++------------- go/logic/applier.go | 6 ++++-- go/logic/migrator.go | 9 ++------- 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 2d9eaf700..9463266ca 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -8,7 +8,6 @@ package base import ( "encoding/json" "fmt" - "io/ioutil" "os" "regexp" "strings" @@ -221,14 +220,8 @@ func GetMigrationContext() *MigrationContext { return context } -// ToJSON exports this config to JSON string -func (this *MigrationContext) ToJSON() (string, error) { - b, err := json.Marshal(this) - return string(b), err -} - // DumpJSON exports this config to JSON string and writes it to file -func (this *MigrationContext) DumpJSON() (fileName string, err error) { +func (this *MigrationContext) ToJSON() (string, error) { if this.MigrationRangeMinValues != nil { this.EncodedRangeValues["MigrationRangeMinValues"], _ = this.MigrationRangeMinValues.ToBase64() } @@ -243,12 +236,9 @@ func (this *MigrationContext) DumpJSON() (fileName string, err error) { } jsonBytes, err := json.Marshal(this) if err != nil { - return fileName, err + return "", err } - fileName = fmt.Sprintf("%s/gh-ost.%s.%d.context.json", "/tmp", this.OriginalTableName, this.ElapsedTime()) - err = ioutil.WriteFile(fileName, jsonBytes, 0644) - - return fileName, err + return string(jsonBytes), nil } // GetGhostTableName generates the name of ghost table, based on original table name diff --git a/go/logic/applier.go b/go/logic/applier.go index afff578ad..3dfce2b10 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -195,7 +195,7 @@ func (this *Applier) CreateChangelogTable() error { id bigint auto_increment, last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, hint varchar(64) charset ascii not null, - value varchar(255) charset ascii not null, + value text charset ascii not null, primary key(id), unique key hint_uidx(hint) ) auto_increment=256 @@ -220,7 +220,7 @@ func (this *Applier) dropTable(tableName string) error { sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(tableName), ) - log.Infof("Droppping table %s.%s", + log.Infof("Dropping table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(tableName), ) @@ -257,6 +257,8 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) { explicitId = 2 case "throttle": explicitId = 3 + case "context": + explicitId = 4 } query := fmt.Sprintf(` insert /* gh-ost */ into %s.%s diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7c55b80b6..08281992b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -128,13 +128,8 @@ func (this *Migrator) initiateContextDump() (err error) { go func() { contextDumpTick := time.Tick(contextDumpInterval) for range contextDumpTick { - if dumpFile, err := this.migrationContext.DumpJSON(); err == nil { - this.contextDumpFiles = append(this.contextDumpFiles, dumpFile) - if len(this.contextDumpFiles) > 2 { - oldDumpFile := this.contextDumpFiles[0] - this.contextDumpFiles = this.contextDumpFiles[1:] - os.Remove(oldDumpFile) - } + if jsonString, err := this.migrationContext.ToJSON(); err == nil { + this.applier.WriteChangelog("context", jsonString) } } }() From 3223a9389e5fd322c45b04cb0169f85576febdb2 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 16:38:58 +0200 Subject: [PATCH 04/28] context dump serialized with table writes; avoiding sync problems --- go/base/context.go | 3 +++ go/logic/migrator.go | 24 ++++++++---------------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 5546af045..d4a373ca9 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -224,6 +224,9 @@ func GetMigrationContext() *MigrationContext { // DumpJSON exports this config to JSON string and writes it to file func (this *MigrationContext) ToJSON() (string, error) { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + if this.MigrationRangeMinValues != nil { this.EncodedRangeValues["MigrationRangeMinValues"], _ = this.MigrationRangeMinValues.ToBase64() } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 9f0dc17b5..641c1f51e 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -123,19 +123,6 @@ func (this *Migrator) initiateHooksExecutor() (err error) { return nil } -// initiateContextDump -func (this *Migrator) initiateContextDump() (err error) { - go func() { - contextDumpTick := time.Tick(contextDumpInterval) - for range contextDumpTick { - if jsonString, err := this.migrationContext.ToJSON(); err == nil { - this.applier.WriteChangelog("context", jsonString) - } - } - }() - return nil -} - // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { @@ -298,9 +285,6 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateHooksExecutor(); err != nil { return err } - if err := this.initiateContextDump(); err != nil { - return err - } if err := this.hooksExecutor.onStartup(); err != nil { return err } @@ -1049,12 +1033,20 @@ func (this *Migrator) executeWriteFuncs() error { log.Debugf("Noop operation; not really executing write funcs") return nil } + contextDumpTick := time.Tick(contextDumpInterval) for { this.throttler.throttle(nil) // We give higher priority to event processing, then secondary priority to // rowcopy select { + case <-contextDumpTick: + { + if jsonString, err := this.migrationContext.ToJSON(); err == nil { + this.applier.WriteChangelog("context", jsonString) + log.Debugf("Context dumped") + } + } case applyEventFunc := <-this.applyEventsQueue: { if err := this.retryOperation(applyEventFunc); err != nil { From 6f81d62a313d143e6a0e75c950b210dd763a33dd Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 16:47:06 +0200 Subject: [PATCH 05/28] storing and updating streamer binlog coordinates --- go/base/context.go | 9 +++++++++ go/logic/migrator.go | 1 + 2 files changed, 10 insertions(+) diff --git a/go/base/context.go b/go/base/context.go index d4a373ca9..0df0f839d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -177,6 +177,7 @@ type MigrationContext struct { MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues EncodedRangeValues map[string]string + StreamerBinlogCoordinates mysql.BinlogCoordinates } type ContextConfig struct { @@ -212,6 +213,7 @@ func newMigrationContext() *MigrationContext { throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, + StreamerBinlogCoordinates: mysql.BinlogCoordinates{}, ColumnRenameMap: make(map[string]string), EncodedRangeValues: make(map[string]string), } @@ -546,6 +548,13 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) { this.niceRatio = newRatio } +func (this *MigrationContext) SetStreamerBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + this.StreamerBinlogCoordinates = *binlogCoordinates +} + // ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format, // such as: 'Threads_running=100,Threads_connected=500' // It only applies changes in case there's no parsing error. diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 641c1f51e..69f6da44b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1042,6 +1042,7 @@ func (this *Migrator) executeWriteFuncs() error { select { case <-contextDumpTick: { + this.migrationContext.SetStreamerBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates()) if jsonString, err := this.migrationContext.ToJSON(); err == nil { this.applier.WriteChangelog("context", jsonString) log.Debugf("Context dumped") From 4c6f42f2f1dd94592418c088d13ca3af4eb10166 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 22:14:17 +0200 Subject: [PATCH 06/28] passwords not exported in MigrationContext --- go/base/context.go | 20 ++++++++++++++++---- go/cmd/gh-ost/main.go | 10 ++++++---- go/logic/migrator.go | 4 ++-- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 0df0f839d..a6944c652 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -87,9 +87,9 @@ type MigrationContext struct { configMutex *sync.Mutex ConfigFile string CliUser string - CliPassword string + cliPassword string CliMasterUser string - CliMasterPassword string + cliMasterPassword string HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 @@ -629,6 +629,18 @@ func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey return nil } +func (this *MigrationContext) SetCliPassword(password string) { + this.cliPassword = password +} + +func (this *MigrationContext) SetCliMasterPassword(password string) { + this.cliMasterPassword = password +} + +func (this *MigrationContext) GetCliMasterPassword() string { + return this.cliMasterPassword +} + // ApplyCredentials sorts out the credentials between the config file and the CLI flags func (this *MigrationContext) ApplyCredentials() { this.configMutex.Lock() @@ -644,9 +656,9 @@ func (this *MigrationContext) ApplyCredentials() { if this.config.Client.Password != "" { this.InspectorConnectionConfig.Password = this.config.Client.Password } - if this.CliPassword != "" { + if this.cliPassword != "" { // Override - this.InspectorConnectionConfig.Password = this.CliPassword + this.InspectorConnectionConfig.Password = this.cliPassword } } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index bb8b8d2a9..f4e75c146 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -49,9 +49,9 @@ func main() { flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master") flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") flag.StringVar(&migrationContext.CliUser, "user", "", "MySQL user") - flag.StringVar(&migrationContext.CliPassword, "password", "", "MySQL password") + cliPassword := flag.String("password", "", "MySQL password") flag.StringVar(&migrationContext.CliMasterUser, "master-user", "", "MySQL user on master, if different from that on replica. Requires --assume-master-host") - flag.StringVar(&migrationContext.CliMasterPassword, "master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host") + cliMasterPassword := flag.String("master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host") flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file") askPass := flag.Bool("ask-pass", false, "prompt for MySQL password") @@ -175,7 +175,7 @@ func main() { if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" { log.Fatalf("--master-user requires --assume-master-host") } - if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { + if *cliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { log.Fatalf("--master-password requires --assume-master-host") } @@ -202,13 +202,15 @@ func main() { if migrationContext.ServeSocketFile == "" { migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName) } + migrationContext.SetCliPassword(*cliPassword) + migrationContext.SetCliMasterPassword(*cliMasterPassword) if *askPass { fmt.Println("Password:") bytePassword, err := terminal.ReadPassword(int(syscall.Stdin)) if err != nil { log.Fatale(err) } - migrationContext.CliPassword = string(bytePassword) + migrationContext.SetCliPassword(string(bytePassword)) } migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 69f6da44b..b5fc76c26 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -663,8 +663,8 @@ func (this *Migrator) initiateInspector() (err error) { if this.migrationContext.CliMasterUser != "" { this.migrationContext.ApplierConnectionConfig.User = this.migrationContext.CliMasterUser } - if this.migrationContext.CliMasterPassword != "" { - this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.CliMasterPassword + if this.migrationContext.GetCliMasterPassword() != "" { + this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.GetCliMasterPassword() } log.Infof("Master forced to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey) } From c72851e1f69d6146e3dd4a96b39e1ceedac6ac52 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 20 Dec 2016 22:33:44 +0200 Subject: [PATCH 07/28] initial support for --resurrect flag --- go/base/context.go | 2 ++ go/cmd/gh-ost/main.go | 8 ++++++++ go/logic/migrator.go | 9 ++++++--- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index a6944c652..401984a1c 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -82,6 +82,7 @@ type MigrationContext struct { SkipRenamedColumns bool IsTungsten bool DiscardForeignKeys bool + Resurrect bool config ContextConfig configMutex *sync.Mutex @@ -161,6 +162,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + IsResurrected bool OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index f4e75c146..c9109c79b 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -58,6 +58,7 @@ func main() { flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") + flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") @@ -68,6 +69,7 @@ func main() { flag.BoolVar(&migrationContext.IsTungsten, "tungsten", false, "explicitly let gh-ost know that you are running on a tungsten-replication based topology (you are likely to also provide --assume-master-host)") flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. This is useful for intentional dropping of foreign keys") flag.BoolVar(&migrationContext.SkipForeignKeyChecks, "skip-foreign-key-checks", false, "set to 'true' when you know for certain there are no foreign keys on your table, and wish to skip the time it takes for gh-ost to verify that") + flag.BoolVar(&migrationContext.Resurrect, "resurrect", false, "resume previously crashed migration") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust") @@ -178,6 +180,12 @@ func main() { if *cliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { log.Fatalf("--master-password requires --assume-master-host") } + if migrationContext.InitiallyDropGhostTable && migrationContext.Resurrect { + log.Fatalf("--initially-drop-ghost-table and --resurrect are mutually exclusive") + } + if migrationContext.InitiallyDropOldTable && migrationContext.Resurrect { + log.Fatalf("--initially-drop-old-table and --resurrect are mutually exclusive") + } switch *cutOver { case "atomic", "default", "": diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b5fc76c26..7e89da871 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1043,10 +1043,13 @@ func (this *Migrator) executeWriteFuncs() error { case <-contextDumpTick: { this.migrationContext.SetStreamerBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates()) - if jsonString, err := this.migrationContext.ToJSON(); err == nil { - this.applier.WriteChangelog("context", jsonString) - log.Debugf("Context dumped") + if !this.migrationContext.Resurrect || this.migrationContext.IsResurrected { + if jsonString, err := this.migrationContext.ToJSON(); err == nil { + this.applier.WriteChangelog("context", jsonString) + log.Debugf("Context dumped") + } } + // If we're about to resurrect (resurrect requested) but haven't done so yet, do not wrtie resurrect info. } case applyEventFunc := <-this.applyEventsQueue: { From 171cad2a98a909541da04112d1278194a9ff94be Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 21 Dec 2016 09:23:00 +0200 Subject: [PATCH 08/28] sanity checks for resurrection --- go/cmd/gh-ost/main.go | 10 ++++++---- go/logic/applier.go | 13 +++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index c9109c79b..b498e9077 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -180,11 +180,13 @@ func main() { if *cliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" { log.Fatalf("--master-password requires --assume-master-host") } - if migrationContext.InitiallyDropGhostTable && migrationContext.Resurrect { - log.Fatalf("--initially-drop-ghost-table and --resurrect are mutually exclusive") + if migrationContext.Resurrect && migrationContext.InitiallyDropGhostTable { + migrationContext.InitiallyDropGhostTable = false + log.Warningf("--resurrect given, implicitly disabling --initially-drop-ghost-table") } - if migrationContext.InitiallyDropOldTable && migrationContext.Resurrect { - log.Fatalf("--initially-drop-old-table and --resurrect are mutually exclusive") + if migrationContext.Resurrect && migrationContext.InitiallyDropOldTable { + migrationContext.InitiallyDropOldTable = false + log.Warningf("--resurrect given, implicitly disabling --initially-drop-old-table") } switch *cutOver { diff --git a/go/logic/applier.go b/go/logic/applier.go index 3dfce2b10..0d322f613 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -128,6 +128,19 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) { // ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, // or attempts to drop them if instructed to. func (this *Applier) ValidateOrDropExistingTables() error { + if this.migrationContext.Resurrect { + ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName()) + if !ghostTableExists { + return fmt.Errorf("--ressurect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName()) + } + changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName()) + if !changelogTableExists { + return fmt.Errorf("--ressurect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName()) + } + return nil + } + // Normal mode (no resurrection) + if this.migrationContext.InitiallyDropGhostTable { if err := this.DropGhostTable(); err != nil { return err From 47d8306c0f9761fea0648c3310b00744439ee37c Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 21 Dec 2016 09:23:57 +0200 Subject: [PATCH 09/28] comment typo --- go/logic/applier.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 0d322f613..1a3d3f579 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -131,16 +131,16 @@ func (this *Applier) ValidateOrDropExistingTables() error { if this.migrationContext.Resurrect { ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName()) if !ghostTableExists { - return fmt.Errorf("--ressurect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName()) + return fmt.Errorf("--resurrect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName()) } changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName()) if !changelogTableExists { - return fmt.Errorf("--ressurect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName()) + return fmt.Errorf("--resurrect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName()) } return nil } - // Normal mode (no resurrection) + // Normal mode (no resurrection) if this.migrationContext.InitiallyDropGhostTable { if err := this.DropGhostTable(); err != nil { return err From bad30a8871a1f87edc1b83002a61524a38e20131 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 21 Dec 2016 09:42:40 +0200 Subject: [PATCH 10/28] sanity checks on --resurrection; skipping some normal-mode operations --- go/logic/applier.go | 28 ++++++++++++++-------------- go/logic/migrator.go | 36 +++++++++++++++++++++--------------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 1a3d3f579..090b77e67 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -125,22 +125,22 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) { return (m != nil) } -// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, -// or attempts to drop them if instructed to. -func (this *Applier) ValidateOrDropExistingTables() error { - if this.migrationContext.Resurrect { - ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName()) - if !ghostTableExists { - return fmt.Errorf("--resurrect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName()) - } - changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName()) - if !changelogTableExists { - return fmt.Errorf("--resurrect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName()) - } - return nil +// ValidateTablesForResurrection verifies ghost and changelog exist given resurrection request +func (this *Applier) ValidateTablesForResurrection() error { + ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName()) + if !ghostTableExists { + return fmt.Errorf("--resurrect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName()) } + changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName()) + if !changelogTableExists { + return fmt.Errorf("--resurrect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName()) + } + return nil +} - // Normal mode (no resurrection) +// ValidateOrDropExistingTables verifies ghost and old tables do not exist, +// or attempts to drop them if instructed to. +func (this *Applier) ValidateOrDropExistingTables() error { if this.migrationContext.InitiallyDropGhostTable { if err := this.DropGhostTable(); err != nil { return err diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7e89da871..cf34fdfc5 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -952,21 +952,27 @@ func (this *Migrator) initiateApplier() error { if err := this.applier.InitDBConnections(); err != nil { return err } - if err := this.applier.ValidateOrDropExistingTables(); err != nil { - return err - } - if err := this.applier.CreateChangelogTable(); err != nil { - log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - if err := this.applier.CreateGhostTable(); err != nil { - log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err - } - - if err := this.applier.AlterGhost(); err != nil { - log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") - return err + if this.migrationContext.Resurrect { + if err := this.applier.ValidateTablesForResurrection(); err != nil { + return err + } + } else { + // Normal operation, no resurrection + if err := this.applier.ValidateOrDropExistingTables(); err != nil { + return err + } + if err := this.applier.CreateChangelogTable(); err != nil { + log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + if err := this.applier.CreateGhostTable(); err != nil { + log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := this.applier.AlterGhost(); err != nil { + log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + return err + } } this.applier.WriteChangelogState(string(GhostTableMigrated)) From 5f25f741ad22e0d234376333e8c7688b071f16b1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 21 Dec 2016 17:55:40 +0200 Subject: [PATCH 11/28] something that works! True resurrection applied --- go/base/context.go | 35 +++++++++++++++++++++++++++++++++-- go/logic/inspect.go | 10 +++++----- go/logic/migrator.go | 37 +++++++++++++++++++++++++++++++++++++ go/logic/throttler.go | 6 ++---- go/sql/types.go | 10 +++++----- 5 files changed, 82 insertions(+), 16 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 401984a1c..6a2b6443d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -198,10 +198,10 @@ type ContextConfig struct { var context *MigrationContext func init() { - context = newMigrationContext() + context = NewMigrationContext() } -func newMigrationContext() *MigrationContext { +func NewMigrationContext() *MigrationContext { return &MigrationContext{ defaultNumRetries: 60, ChunkSize: 1000, @@ -250,6 +250,37 @@ func (this *MigrationContext) ToJSON() (string, error) { return string(jsonBytes), nil } +// LoadJSON treats given json as context-dump, and attempts to load this context's data. +func (this *MigrationContext) LoadJSON(jsonString string) error { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + // Some stuff that is in context but is more of a config that may be overriden by --resurrect kind of execution: + // Push + hooksPath := this.HooksPath + + jsonBytes := []byte(jsonString) + err := json.Unmarshal(jsonBytes, this) + + if this.MigrationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMinValues"]); err != nil { + return err + } + if this.MigrationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMaxValues"]); err != nil { + return err + } + if this.MigrationIterationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMinValues"]); err != nil { + return err + } + if this.MigrationIterationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMaxValues"]); err != nil { + return err + } + + // Pop + this.HooksPath = hooksPath + + return err +} + // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) GetGhostTableName() string { return fmt.Sprintf("_%s_gho", this.OriginalTableName) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 61a887f97..ad98a5c1e 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s } // readChangelogState reads changelog hints -func (this *Inspector) readChangelogState() (map[string]string, error) { +func (this *Inspector) readChangelogState(hint string) (string, error) { query := fmt.Sprintf(` - select hint, value from %s.%s where id <= 255 + select hint, value from %s.%s where hint = ? and id <= 255 `, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetChangelogTableName()), ) - result := make(map[string]string) + result := "" err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { - result[m.GetString("hint")] = m.GetString("value") + result = m.GetString("value") return nil - }) + }, hint) return result, err } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index cf34fdfc5..122c4d662 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -272,6 +272,38 @@ func (this *Migrator) countTableRows() (err error) { return countRowsFunc() } +func (this *Migrator) resurrect() error { + encodedContext, err := this.inspector.readChangelogState("context") + if err != nil { + return err + } + if encodedContext == "" { + return fmt.Errorf("No resurrect info found") + } + log.Infof("Proceeding to resurrection") + + // Dry run: loading migration context to a temporary location just to confirm there's no errors: + loadedContext := base.NewMigrationContext() + if err := loadedContext.LoadJSON(encodedContext); err != nil { + return err + } + // Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?) + if this.migrationContext.DatabaseName != loadedContext.DatabaseName { + return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out") + } + if this.migrationContext.OriginalTableName != loadedContext.OriginalTableName { + return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out") + } + if this.migrationContext.AlterStatement != loadedContext.AlterStatement { + return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out") + } + // Happy. Let's go live and load the context for real. + if err := this.migrationContext.LoadJSON(encodedContext); err != nil { + return err + } + return nil +} + // Migrate executes the complete migration logic. This is *the* major gh-ost function. func (this *Migrator) Migrate() (err error) { log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) @@ -340,6 +372,11 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { return err } + if this.migrationContext.Resurrect { + if err := this.resurrect(); err != nil { + return err + } + } go this.executeWriteFuncs() go this.iterateChunks() this.migrationContext.MarkRowCopyStartTime() diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 9f8b5ccd9..42b639b99 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -83,11 +83,9 @@ func (this *Throttler) collectHeartbeat() { if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 { return nil } - changelogState, err := this.inspector.readChangelogState() - if err != nil { + if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { return log.Errore(err) - } - if heartbeatValue, ok := changelogState["heartbeat"]; ok { + } else { this.parseChangelogHeartbeat(heartbeatValue) } return nil diff --git a/go/sql/types.go b/go/sql/types.go index 82ecc1f94..113aeae78 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -9,7 +9,7 @@ import ( "bytes" "encoding/base64" "encoding/gob" - "encoding/json" + // "encoding/json" "fmt" "reflect" "strconv" @@ -277,10 +277,10 @@ func (this *ColumnValues) ToBase64() (b64 string, err error) { return base64.StdEncoding.EncodeToString(buff.Bytes()), nil } -// MarshalJSON will marshal this object as JSON -func (this *ColumnValues) MarshalJSON() ([]byte, error) { - return json.Marshal(this.abstractValues) -} +// // MarshalJSON will marshal this object as JSON +// func (this *ColumnValues) MarshalJSON() ([]byte, error) { +// return json.Marshal(this.abstractValues) +// } func (this *ColumnValues) AbstractValues() []interface{} { return this.abstractValues From 89ca34691982cfc96e88a0b46bf4a3f313070d7d Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 21 Dec 2016 21:10:04 +0200 Subject: [PATCH 12/28] instead of loading the entire context, only updating particular fields from the resurrected context --- go/base/context.go | 23 ++++++++++++++++------- go/logic/migrator.go | 9 ++++----- go/sql/types.go | 6 ------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 6a2b6443d..832f20e46 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -255,10 +255,6 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() - // Some stuff that is in context but is more of a config that may be overriden by --resurrect kind of execution: - // Push - hooksPath := this.HooksPath - jsonBytes := []byte(jsonString) err := json.Unmarshal(jsonBytes, this) @@ -275,12 +271,25 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { return err } - // Pop - this.HooksPath = hooksPath - return err } +// GetGhostTableName generates the name of ghost table, based on original table name +func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) { + this.MigrationRangeMinValues = other.MigrationRangeMinValues + this.MigrationRangeMaxValues = other.MigrationRangeMaxValues + this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues + this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues + + this.RowsEstimate = other.RowsEstimate + this.RowsDeltaEstimate = other.RowsDeltaEstimate + this.TotalRowsCopied = other.TotalRowsCopied + this.TotalDMLEventsApplied = other.TotalDMLEventsApplied + + this.Iteration = other.Iteration + this.StreamerBinlogCoordinates = other.StreamerBinlogCoordinates +} + // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) GetGhostTableName() string { return fmt.Sprintf("_%s_gho", this.OriginalTableName) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 122c4d662..7b0f5c2c2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -282,7 +282,7 @@ func (this *Migrator) resurrect() error { } log.Infof("Proceeding to resurrection") - // Dry run: loading migration context to a temporary location just to confirm there's no errors: + // Loading migration context to a temporary location: loadedContext := base.NewMigrationContext() if err := loadedContext.LoadJSON(encodedContext); err != nil { return err @@ -297,10 +297,9 @@ func (this *Migrator) resurrect() error { if this.migrationContext.AlterStatement != loadedContext.AlterStatement { return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out") } - // Happy. Let's go live and load the context for real. - if err := this.migrationContext.LoadJSON(encodedContext); err != nil { - return err - } + // Happy. Let's go live and update our real context + this.migrationContext.ApplyResurrectedContext(loadedContext) + return nil } diff --git a/go/sql/types.go b/go/sql/types.go index 113aeae78..4117f551d 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -9,7 +9,6 @@ import ( "bytes" "encoding/base64" "encoding/gob" - // "encoding/json" "fmt" "reflect" "strconv" @@ -277,11 +276,6 @@ func (this *ColumnValues) ToBase64() (b64 string, err error) { return base64.StdEncoding.EncodeToString(buff.Bytes()), nil } -// // MarshalJSON will marshal this object as JSON -// func (this *ColumnValues) MarshalJSON() ([]byte, error) { -// return json.Marshal(this.abstractValues) -// } - func (this *ColumnValues) AbstractValues() []interface{} { return this.abstractValues } From 1080b11d8185d452b11e7c228548f19d3fcf9f78 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Fri, 23 Dec 2016 15:24:31 +0200 Subject: [PATCH 13/28] binlog event listeners accept coordinates. StreamerBinlogCoordinates -> AppliedBinlogCoordinates updating AppliedBinlogCoordinates when truly applied; no longer asking streamer for coordinates (because streamer's events can be queued, but not handled, a crash implies we need to look at the last _handled_ event, not the last _streamed_ event) --- go/base/context.go | 10 +++++----- go/logic/migrator.go | 11 +++++++---- go/logic/streamer.go | 15 +++++++++------ 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 832f20e46..9754eff44 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -179,7 +179,7 @@ type MigrationContext struct { MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues EncodedRangeValues map[string]string - StreamerBinlogCoordinates mysql.BinlogCoordinates + AppliedBinlogCoordinates mysql.BinlogCoordinates } type ContextConfig struct { @@ -215,7 +215,7 @@ func NewMigrationContext() *MigrationContext { throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, - StreamerBinlogCoordinates: mysql.BinlogCoordinates{}, + AppliedBinlogCoordinates: mysql.BinlogCoordinates{}, ColumnRenameMap: make(map[string]string), EncodedRangeValues: make(map[string]string), } @@ -287,7 +287,7 @@ func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) { this.TotalDMLEventsApplied = other.TotalDMLEventsApplied this.Iteration = other.Iteration - this.StreamerBinlogCoordinates = other.StreamerBinlogCoordinates + this.AppliedBinlogCoordinates = other.AppliedBinlogCoordinates } // GetGhostTableName generates the name of ghost table, based on original table name @@ -590,11 +590,11 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) { this.niceRatio = newRatio } -func (this *MigrationContext) SetStreamerBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) { +func (this *MigrationContext) SetAppliedBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() - this.StreamerBinlogCoordinates = *binlogCoordinates + this.AppliedBinlogCoordinates = *binlogCoordinates } // ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7b0f5c2c2..25b2370d7 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -936,7 +936,7 @@ func (this *Migrator) initiateStreaming() error { false, this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), - func(dmlEvent *binlog.BinlogDMLEvent) error { + func(dmlEvent *binlog.BinlogDMLEvent, _ *mysql.BinlogCoordinates) error { return this.onChangelogStateEvent(dmlEvent) }, ) @@ -959,10 +959,14 @@ func (this *Migrator) addDMLEventsListener() error { false, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, - func(dmlEvent *binlog.BinlogDMLEvent) error { + func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error { // Create a task to apply the DML event; this will be execute by executeWriteFuncs() applyEventFunc := func() error { - return this.applier.ApplyDMLEventQuery(dmlEvent) + err := this.applier.ApplyDMLEventQuery(dmlEvent) + if err != nil { + this.migrationContext.SetAppliedBinlogCoordinates(coordinates) + } + return err } this.applyEventsQueue <- applyEventFunc return nil @@ -1084,7 +1088,6 @@ func (this *Migrator) executeWriteFuncs() error { select { case <-contextDumpTick: { - this.migrationContext.SetStreamerBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates()) if !this.migrationContext.Resurrect || this.migrationContext.IsResurrected { if jsonString, err := this.migrationContext.ToJSON(); err == nil { this.applier.WriteChangelog("context", jsonString) diff --git a/go/logic/streamer.go b/go/logic/streamer.go index dc5ba60fe..d9606617d 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -20,11 +20,13 @@ import ( "github.com/outbrain/golib/sqlutils" ) +type BinlogEventListenerFunc func(event *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error + type BinlogEventListener struct { async bool databaseName string tableName string - onDmlEvent func(event *binlog.BinlogDMLEvent) error + onDmlEvent BinlogEventListenerFunc } const ( @@ -57,7 +59,7 @@ func NewEventsStreamer() *EventsStreamer { // AddListener registers a new listener for binlog events, on a per-table basis func (this *EventsStreamer) AddListener( - async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { + async bool, databaseName string, tableName string, onDmlEvent BinlogEventListenerFunc) (err error) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() @@ -80,10 +82,11 @@ func (this *EventsStreamer) AddListener( // notifyListeners will notify relevant listeners with given DML event. Only // listeners registered for changes on the table on which the DML operates are notified. -func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { +func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() + binlogEvent := binlogEntry.DmlEvent for _, listener := range this.listeners { listener := listener if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { @@ -94,10 +97,10 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) } if listener.async { go func() { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates) }() } else { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates) } } } @@ -184,7 +187,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { go func() { for binlogEntry := range this.eventsChannel { if binlogEntry.DmlEvent != nil { - this.notifyListeners(binlogEntry.DmlEvent) + this.notifyListeners(binlogEntry) } } }() From e50361ab614ca75230b5fe8c5d0db51e47b0065f Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 24 Dec 2016 09:53:55 +0200 Subject: [PATCH 14/28] at resurrection, pointing streamer back at last known applied coordinates --- go/logic/migrator.go | 63 +++++++++++++++++++++++++++----------------- go/logic/streamer.go | 11 +++++--- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 25b2370d7..a720d4816 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -31,7 +31,7 @@ const ( AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" ) -const contextDumpInterval time.Duration = 1 * time.Second +const contextDumpInterval time.Duration = 1 * time.Minute func ReadChangelogState(s string) ChangelogState { return ChangelogState(strings.Split(s, ":")[0]) @@ -55,14 +55,15 @@ const ( // Migrator is the main schema migration flow manager. type Migrator struct { - parser *sql.Parser - inspector *Inspector - applier *Applier - eventsStreamer *EventsStreamer - server *Server - throttler *Throttler - hooksExecutor *HooksExecutor - migrationContext *base.MigrationContext + parser *sql.Parser + inspector *Inspector + applier *Applier + eventsStreamer *EventsStreamer + server *Server + throttler *Throttler + hooksExecutor *HooksExecutor + migrationContext *base.MigrationContext + resurrectedContext *base.MigrationContext firstThrottlingCollected chan bool ghostTableMigrated chan bool @@ -272,7 +273,7 @@ func (this *Migrator) countTableRows() (err error) { return countRowsFunc() } -func (this *Migrator) resurrect() error { +func (this *Migrator) readResurrectedContext() error { encodedContext, err := this.inspector.readChangelogState("context") if err != nil { return err @@ -280,25 +281,30 @@ func (this *Migrator) resurrect() error { if encodedContext == "" { return fmt.Errorf("No resurrect info found") } - log.Infof("Proceeding to resurrection") // Loading migration context to a temporary location: - loadedContext := base.NewMigrationContext() - if err := loadedContext.LoadJSON(encodedContext); err != nil { + this.resurrectedContext = base.NewMigrationContext() + if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil { return err } // Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?) - if this.migrationContext.DatabaseName != loadedContext.DatabaseName { + if this.migrationContext.DatabaseName != this.resurrectedContext.DatabaseName { return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out") } - if this.migrationContext.OriginalTableName != loadedContext.OriginalTableName { + if this.migrationContext.OriginalTableName != this.resurrectedContext.OriginalTableName { return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out") } - if this.migrationContext.AlterStatement != loadedContext.AlterStatement { + if this.migrationContext.AlterStatement != this.resurrectedContext.AlterStatement { return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out") } - // Happy. Let's go live and update our real context - this.migrationContext.ApplyResurrectedContext(loadedContext) + if this.resurrectedContext.AppliedBinlogCoordinates.IsEmpty() { + return fmt.Errorf("Resurrection: no applied binlog coordinates. Seems like the migration you're trying to resurrect crashed before applying a single binlog event. There's not enough info for resurrection, and not much point to it. Just run your migration again.") + } + return nil +} + +func (this *Migrator) applyResurrectedContext() error { + this.migrationContext.ApplyResurrectedContext(this.resurrectedContext) return nil } @@ -328,6 +334,11 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } + if this.migrationContext.Resurrect { + if err := this.readResurrectedContext(); err != nil { + return err + } + } if err := this.initiateStreaming(); err != nil { return err } @@ -372,7 +383,7 @@ func (this *Migrator) Migrate() (err error) { return err } if this.migrationContext.Resurrect { - if err := this.resurrect(); err != nil { + if err := this.applyResurrectedContext(); err != nil { return err } } @@ -929,15 +940,19 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { // initiateStreaming begins treaming of binary log events and registers listeners for such events func (this *Migrator) initiateStreaming() error { this.eventsStreamer = NewEventsStreamer() - if err := this.eventsStreamer.InitDBConnections(); err != nil { + if err := this.eventsStreamer.InitDBConnections(this.resurrectedContext); err != nil { return err } this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), - func(dmlEvent *binlog.BinlogDMLEvent, _ *mysql.BinlogCoordinates) error { - return this.onChangelogStateEvent(dmlEvent) + func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error { + err := this.onChangelogStateEvent(dmlEvent) + if err == nil { + this.migrationContext.SetAppliedBinlogCoordinates(coordinates) + } + return err }, ) @@ -963,7 +978,7 @@ func (this *Migrator) addDMLEventsListener() error { // Create a task to apply the DML event; this will be execute by executeWriteFuncs() applyEventFunc := func() error { err := this.applier.ApplyDMLEventQuery(dmlEvent) - if err != nil { + if err == nil { this.migrationContext.SetAppliedBinlogCoordinates(coordinates) } return err @@ -1091,7 +1106,7 @@ func (this *Migrator) executeWriteFuncs() error { if !this.migrationContext.Resurrect || this.migrationContext.IsResurrected { if jsonString, err := this.migrationContext.ToJSON(); err == nil { this.applier.WriteChangelog("context", jsonString) - log.Debugf("Context dumped") + log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates) } } // If we're about to resurrect (resurrect requested) but haven't done so yet, do not wrtie resurrect info. diff --git a/go/logic/streamer.go b/go/logic/streamer.go index d9606617d..5451fb001 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -105,7 +105,7 @@ func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) { } } -func (this *EventsStreamer) InitDBConnections() (err error) { +func (this *EventsStreamer) InitDBConnections(resurrectedContext *base.MigrationContext) (err error) { EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil { return err @@ -113,8 +113,13 @@ func (this *EventsStreamer) InitDBConnections() (err error) { if err := this.validateConnection(); err != nil { return err } - if err := this.readCurrentBinlogCoordinates(); err != nil { - return err + if this.migrationContext.Resurrect { + log.Infof("Resurrection: initiating streamer at resurrected coordinates %+v", resurrectedContext.AppliedBinlogCoordinates) + this.initialBinlogCoordinates = &resurrectedContext.AppliedBinlogCoordinates + } else { + if err := this.readCurrentBinlogCoordinates(); err != nil { + return err + } } if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil { return err From 6128076485ddf3f8a8a2b179f4bb5bba1d9f1b38 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 24 Dec 2016 10:01:03 +0200 Subject: [PATCH 15/28] some cleanup --- go/logic/migrator.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a720d4816..38b913224 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -76,9 +76,7 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan tableWriteFunc - handledChangelogStates map[string]bool - contextDumpFiles []string - panicAbort chan error + panicAbort chan error } func NewMigrator() *Migrator { @@ -90,12 +88,10 @@ func NewMigrator() *Migrator { rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), - handledChangelogStates: make(map[string]bool), + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), - contextDumpFiles: []string{}, - panicAbort: make(chan error), + panicAbort: make(chan error), } return migrator } From 45b63f65006c4c1778638ae0266dc2eab6ecbbdc Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 24 Dec 2016 10:07:59 +0200 Subject: [PATCH 16/28] applying IsResurrected flag --- go/base/context.go | 2 +- go/logic/migrator.go | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 9754eff44..91ac8e4a1 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -162,7 +162,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 - IsResurrected bool + IsResurrected int64 OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 38b913224..b5950eb93 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -301,6 +301,7 @@ func (this *Migrator) readResurrectedContext() error { func (this *Migrator) applyResurrectedContext() error { this.migrationContext.ApplyResurrectedContext(this.resurrectedContext) + atomic.StoreInt64(&this.migrationContext.IsResurrected, 1) return nil } @@ -1094,12 +1095,14 @@ func (this *Migrator) executeWriteFuncs() error { for { this.throttler.throttle(nil) - // We give higher priority to event processing, then secondary priority to - // rowcopy + // We give higher priority to event processing, then secondary priority to rowcopy select { case <-contextDumpTick: { - if !this.migrationContext.Resurrect || this.migrationContext.IsResurrected { + if !(this.migrationContext.Resurrect && atomic.LoadInt64(&this.migrationContext.IsResurrected) == 0) { + // Not dumping context if we're _in the process of resurrecting_... + // otherwise, we dump the context. Note that this operation works sequentially to any row copy or + // event handling. There is no concurrency issue here. if jsonString, err := this.migrationContext.ToJSON(); err == nil { this.applier.WriteChangelog("context", jsonString) log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates) From fa399e0608c3d849782e67d358489ede086be70b Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 24 Dec 2016 17:10:37 +0200 Subject: [PATCH 17/28] added context test, JSON export/import --- go/base/context.go | 9 +++++-- go/base/context_test.go | 54 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 go/base/context_test.go diff --git a/go/base/context.go b/go/base/context.go index 91ac8e4a1..f06fc752d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -8,6 +8,7 @@ package base import ( "encoding/json" "fmt" + "io" "os" "regexp" "strings" @@ -256,8 +257,12 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { defer this.throttleMutex.Unlock() jsonBytes := []byte(jsonString) - err := json.Unmarshal(jsonBytes, this) + if err := json.Unmarshal(jsonBytes, this); err != nil && err != io.EOF { + return err + } + + var err error if this.MigrationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMinValues"]); err != nil { return err } @@ -271,7 +276,7 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { return err } - return err + return nil } // GetGhostTableName generates the name of ghost table, based on original table name diff --git a/go/base/context_test.go b/go/base/context_test.go new file mode 100644 index 000000000..a733e0280 --- /dev/null +++ b/go/base/context_test.go @@ -0,0 +1,54 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "io" + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" + + "github.com/github/gh-ost/go/mysql" + "github.com/github/gh-ost/go/sql" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestContextToJSON(t *testing.T) { + context := NewMigrationContext() + jsonString, err := context.ToJSON() + test.S(t).ExpectNil(err) + test.S(t).ExpectNotEquals(jsonString, "") +} + +func TestContextLoadJSON(t *testing.T) { + var jsonString string + var err error + { + context := NewMigrationContext() + context.AppliedBinlogCoordinates = mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789} + + abstractValues := []interface{}{31, "2016-12-24 17:04:32"} + context.MigrationRangeMinValues = sql.ToColumnValues(abstractValues) + + jsonString, err = context.ToJSON() + test.S(t).ExpectNil(err) + test.S(t).ExpectNotEquals(jsonString, "") + } + { + context := NewMigrationContext() + err = context.LoadJSON(jsonString) + test.S(t).ExpectEqualsAny(err, nil, io.EOF) + test.S(t).ExpectEquals(context.AppliedBinlogCoordinates, mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789}) + abstractValues := context.MigrationRangeMinValues.AbstractValues() + test.S(t).ExpectEquals(len(abstractValues), 2) + test.S(t).ExpectEquals(abstractValues[0], 31) + test.S(t).ExpectEquals(abstractValues[1], "2016-12-24 17:04:32") + } +} From 7dfb740519a679ecf1fb6e82812f22f372bf0835 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 24 Dec 2016 17:44:39 +0200 Subject: [PATCH 18/28] format --- go/base/context_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/base/context_test.go b/go/base/context_test.go index a733e0280..d91a504a2 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -46,6 +46,7 @@ func TestContextLoadJSON(t *testing.T) { err = context.LoadJSON(jsonString) test.S(t).ExpectEqualsAny(err, nil, io.EOF) test.S(t).ExpectEquals(context.AppliedBinlogCoordinates, mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789}) + abstractValues := context.MigrationRangeMinValues.AbstractValues() test.S(t).ExpectEquals(len(abstractValues), 2) test.S(t).ExpectEquals(abstractValues[0], 31) From 0e8e5de7aa4e45f98fd71948883c5e9206af8598 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 25 Dec 2016 08:53:24 +0200 Subject: [PATCH 19/28] added on-resurrecting hook --- doc/hooks.md | 1 + go/logic/hooks.go | 5 +++++ go/logic/migrator.go | 4 ++++ resources/hooks-sample/gh-ost-on-resurrecting-hook | 5 +++++ 4 files changed, 15 insertions(+) create mode 100644 resources/hooks-sample/gh-ost-on-resurrecting-hook diff --git a/doc/hooks.md b/doc/hooks.md index 03b150116..0efc05cab 100644 --- a/doc/hooks.md +++ b/doc/hooks.md @@ -38,6 +38,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu - `gh-ost-on-startup` - `gh-ost-on-validated` +- `gh-ost-on-resurrecting` - `gh-ost-on-rowcount-complete` - `gh-ost-on-before-row-copy` - `gh-ost-on-status` diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 25d745af2..0039640e8 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -20,6 +20,7 @@ import ( const ( onStartup = "gh-ost-on-startup" onValidated = "gh-ost-on-validated" + onResurrecting = "gh-ost-on-resurrecting" onRowCountComplete = "gh-ost-on-rowcount-complete" onBeforeRowCopy = "gh-ost-on-before-row-copy" onRowCopyComplete = "gh-ost-on-row-copy-complete" @@ -112,6 +113,10 @@ func (this *HooksExecutor) onValidated() error { return this.executeHooks(onValidated) } +func (this *HooksExecutor) onResurrecting() error { + return this.executeHooks(onResurrecting) +} + func (this *HooksExecutor) onRowCountComplete() error { return this.executeHooks(onRowCountComplete) } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b5950eb93..d33316269 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -303,6 +303,10 @@ func (this *Migrator) applyResurrectedContext() error { this.migrationContext.ApplyResurrectedContext(this.resurrectedContext) atomic.StoreInt64(&this.migrationContext.IsResurrected, 1) + if err := this.hooksExecutor.onResurrecting(); err != nil { + return err + } + return nil } diff --git a/resources/hooks-sample/gh-ost-on-resurrecting-hook b/resources/hooks-sample/gh-ost-on-resurrecting-hook new file mode 100644 index 000000000..ac434725a --- /dev/null +++ b/resources/hooks-sample/gh-ost-on-resurrecting-hook @@ -0,0 +1,5 @@ +#!/bin/bash + +# Sample hook file for gh-ost-on-resurrecting + +echo "$(date) gh-ost-on-resurrecting: beginning resurrection on $GH_OST_DATABASE_NAME.$GH_OST_TABLE_NAME" >> /tmp/gh-ost.log From af74e8c6cdb872230f9dead4f49ee5ebe9f54316 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 25 Dec 2016 11:46:14 +0200 Subject: [PATCH 20/28] Resurrection documentation --- README.md | 9 ++++----- RELEASE_VERSION | 2 +- doc/command-line-flags.md | 8 ++++++++ doc/resurrect.md | 42 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 doc/resurrect.md diff --git a/README.md b/README.md index 451b5b2aa..b4106fe0d 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ In addition, it offers many [operational perks](doc/perks.md) that make it safer - Auditing: you may query `gh-ost` for status. `gh-ost` listens on unix socket or TCP. - Control over cut-over phase: `gh-ost` can be instructed to postpone what is probably the most critical step: the swap of tables, until such time that you're comfortably available. No need to worry about ETA being outside office hours. - External [hooks](doc/hooks.md) can couple `gh-ost` with your particular environment. +- [Resurrection](doc/resurrect.md) can resume a failed migration, proceeding from last known good position. Please refer to the [docs](doc) for more information. No, really, read the [docs](doc). @@ -76,19 +77,17 @@ But then a rare genetic mutation happened, and the `c` transformed into `t`. And ## Community -`gh-ost` is released at a stable state, but with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues). +`gh-ost` is released at a stable state, and still with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues). We develop `gh-ost` at GitHub and for the community. We may have different priorities than others. From time to time we may suggest a contribution that is not on our immediate roadmap but which may appeal to others. ## Download/binaries/source -`gh-ost` is now GA and stable. - -`gh-ost` is available in binary format for Linux and Mac OS/X +`gh-ost` is GA and stable, available in binary format for Linux and Mac OS/X [Download latest release here](https://github.com/github/gh-ost/releases/latest) -`gh-ost` is a Go project; it is built with Go 1.5 with "experimental vendor". Soon to migrate to Go 1.6. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own. +`gh-ost` is a Go project; it is built with Go 1.7. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own. Generally speaking, `master` branch is stable, but only [releases](https://github.com/github/gh-ost/releases) are to be used in production. diff --git a/RELEASE_VERSION b/RELEASE_VERSION index 15245f3a2..9084fa2f7 100644 --- a/RELEASE_VERSION +++ b/RELEASE_VERSION @@ -1 +1 @@ -1.0.32 +1.1.0 diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 170768532..8b1003049 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -111,6 +111,14 @@ See also: [Sub-second replication lag throttling](subsecond-lag.md) Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but other issue no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag. +### resurrect + +It is possible to resurrect/resume a failed migration. Such a migration would be a valid execution, which bailed out throughout the migration process. A migration would bail out on meeting with `--critical-load`, or perhaps a user `kill -9`'d it. + +Use `--resurrect` with exact same other flags (same `--database, --table, --alter`) to resume a failed migration. + +Read more on [resurrection docs](resurrect.md) + ### skip-foreign-key-checks By default `gh-ost` verifies no foreign keys exist on the migrated table. On servers with large number of tables this check can take a long time. If you're absolutely certain no foreign keys exist (table does not referenece other table nor is referenced by other tables) and wish to save the check time, provide with `--skip-foreign-key-checks`. diff --git a/doc/resurrect.md b/doc/resurrect.md new file mode 100644 index 000000000..d66d707dc --- /dev/null +++ b/doc/resurrect.md @@ -0,0 +1,42 @@ +# Resurrection + +`gh-ost` supports resurrection of a failed migration, continuing the migration from last known good position, potentially saving hours of clock-time. + +A migration may fail as follows: + +- On meeting with `--critical-load` +- On successively meeting with a specific error (e.g. recurring locks) +- Being `kill -9`'d by a user +- MySQL crash +- Server crash +- Robots taking over the world and other reasons. + +### --resurrect + +One may resurrect such a migration by running the exact same command, adding the `--resurrect` flag. + +The terms for resurrection are: + +- Exact same database/table/alter +- Previous migration ran for at least one minute +- Previous migration began looking at row-copy and event handling (by `1` minute of execution you may expect this to be the case) + +### How does it work? + +`gh-ost` dumps its migration status (context) once per minute, onto the _changelog table_. The changelog table is used for internal bookkeeping, and manages heartbeat and internal message passing. + +When `--resurrect` is provided,`gh-ost` attempts to find such status dump in the changelog table. Most interestingly this status included: + +- Last handled binlog event coordinates (any event up to that point has been applied to _ghost_ table) +- Last copied chunk range +- Other useful information + +Resurrection reconnects the streamer at last handled binlog coordinates, and skips rowcopy to proceed from last copied chunk range. + +Noteworthy is that it is not important to resume from _exact same_ coordinates and chunk as last applied; the context dump only runs once per minute, and resurrection may re-apply a minute's worth of binary logs, and re-iterate a minute's work of copied chunks. + +Row-based replication has the property of being idempotent for DML events. There is no damage in reapplying contiguous binlog events starting at some point in the past. + +Chunk-reiteration likewise poses no integrity concern and there is no harm in re-copying same range of rows. + +The only concern is to never skip binlog events, and never skip a row range. By virtue of only dumping events and ranges that have been applied, and by virtue of only proceessing binlog events and chunks moving forward, `gh-ost` keeps integrity intact. From 874cf24512d13f0718640cfc08c39d6e1a92d5a8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 25 Dec 2016 11:46:53 +0200 Subject: [PATCH 21/28] typo --- doc/resurrect.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/resurrect.md b/doc/resurrect.md index d66d707dc..01a672496 100644 --- a/doc/resurrect.md +++ b/doc/resurrect.md @@ -39,4 +39,4 @@ Row-based replication has the property of being idempotent for DML events. There Chunk-reiteration likewise poses no integrity concern and there is no harm in re-copying same range of rows. -The only concern is to never skip binlog events, and never skip a row range. By virtue of only dumping events and ranges that have been applied, and by virtue of only proceessing binlog events and chunks moving forward, `gh-ost` keeps integrity intact. +The only concern is to never skip binlog events, and never skip a row range. By virtue of only dumping events and ranges that have been applied, and by virtue of only processing binlog events and chunks moving forward, `gh-ost` keeps integrity intact. From 8952e24abaec466f0fa31325b9d4ee6ba787db58 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 28 Dec 2016 07:42:14 +0200 Subject: [PATCH 22/28] rewinding resurrecting at beginning of known logfile; more verbose --- go/logic/migrator.go | 3 +++ go/logic/streamer.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d33316269..74ef84e90 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -307,6 +307,9 @@ func (this *Migrator) applyResurrectedContext() error { return err } + log.Infof("Applied migration min values: [%s]", this.migrationContext.MigrationRangeMinValues) + log.Infof("Applied migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) + return nil } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 5451fb001..685b2902c 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -114,6 +114,8 @@ func (this *EventsStreamer) InitDBConnections(resurrectedContext *base.Migration return err } if this.migrationContext.Resurrect { + // Rewinding to beginning of logfile: + resurrectedContext.AppliedBinlogCoordinates.LogPos = 4 log.Infof("Resurrection: initiating streamer at resurrected coordinates %+v", resurrectedContext.AppliedBinlogCoordinates) this.initialBinlogCoordinates = &resurrectedContext.AppliedBinlogCoordinates } else { From 738270aabed7c02388f85d24e4cac9c512781457 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 28 Dec 2016 13:35:56 +0200 Subject: [PATCH 23/28] more verbose on resurrection --- go/logic/migrator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 74ef84e90..7cdb784d6 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -309,6 +309,8 @@ func (this *Migrator) applyResurrectedContext() error { log.Infof("Applied migration min values: [%s]", this.migrationContext.MigrationRangeMinValues) log.Infof("Applied migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) + log.Infof("Applied migration iteration range min values: [%s]", this.migrationContext.MigrationIterationRangeMinValues) + log.Infof("Applied migration iteration range max values: [%s]", this.migrationContext.MigrationIterationRangeMaxValues) return nil } From 90f61f812df1b611d85ee87cdf6fb0e094b9c081 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 28 Dec 2016 14:23:50 +0200 Subject: [PATCH 24/28] resurrected execution does not apply migration range from terminated context --- go/base/context.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index f06fc752d..151450484 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -281,8 +281,8 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) { - this.MigrationRangeMinValues = other.MigrationRangeMinValues - this.MigrationRangeMaxValues = other.MigrationRangeMaxValues + // this.MigrationRangeMinValues = other.MigrationRangeMinValues + // this.MigrationRangeMaxValues = other.MigrationRangeMaxValues this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues From e4874c84bd393e274ab1a8d16561c4295a63319d Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 28 Dec 2016 23:06:18 +0200 Subject: [PATCH 25/28] making sure to dump context before row-copy, so we always have some initial resurrection context --- go/logic/migrator.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7cdb784d6..baf69492e 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -315,6 +315,24 @@ func (this *Migrator) applyResurrectedContext() error { return nil } +func (this *Migrator) dumpResurrectContext() error { + if this.migrationContext.Resurrect && atomic.LoadInt64(&this.migrationContext.IsResurrected) == 0 { + // we're in the process of resurrecting; don't dump context, because it would overwrite + // the very context we want to resurrect by! + return nil + } + + // we dump the context. Note that this operation works sequentially to any row copy or + // event handling. There is no concurrency issue here. + if jsonString, err := this.migrationContext.ToJSON(); err != nil { + return log.Errore(err) + } else { + this.applier.WriteChangelog("context", jsonString) + log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates) + } + return nil +} + // Migrate executes the complete migration logic. This is *the* major gh-ost function. func (this *Migrator) Migrate() (err error) { log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) @@ -393,6 +411,7 @@ func (this *Migrator) Migrate() (err error) { return err } } + this.dumpResurrectContext() go this.executeWriteFuncs() go this.iterateChunks() this.migrationContext.MarkRowCopyStartTime() @@ -1108,16 +1127,7 @@ func (this *Migrator) executeWriteFuncs() error { select { case <-contextDumpTick: { - if !(this.migrationContext.Resurrect && atomic.LoadInt64(&this.migrationContext.IsResurrected) == 0) { - // Not dumping context if we're _in the process of resurrecting_... - // otherwise, we dump the context. Note that this operation works sequentially to any row copy or - // event handling. There is no concurrency issue here. - if jsonString, err := this.migrationContext.ToJSON(); err == nil { - this.applier.WriteChangelog("context", jsonString) - log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates) - } - } - // If we're about to resurrect (resurrect requested) but haven't done so yet, do not wrtie resurrect info. + this.dumpResurrectContext() } case applyEventFunc := <-this.applyEventsQueue: { From e9e9d6d9da5fc7dd1583814ff3ed7a44cb070acb Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 28 Dec 2016 23:17:01 +0200 Subject: [PATCH 26/28] allowing EOF result for loadJSON --- go/logic/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index baf69492e..eefb7231f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -280,7 +280,7 @@ func (this *Migrator) readResurrectedContext() error { // Loading migration context to a temporary location: this.resurrectedContext = base.NewMigrationContext() - if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil { + if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil && err != io.EOF { return err } // Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?) From 24f5c6da6281987cbf33277c09fb25b4b0be1277 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 29 Dec 2016 10:23:48 +0200 Subject: [PATCH 27/28] ght/ghr suffix -> delr suffix --- go/base/context.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 151450484..79f67e9b2 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -279,7 +279,7 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { return nil } -// GetGhostTableName generates the name of ghost table, based on original table name +// ApplyResurrectedContext loads resurrection-related infor from given context func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) { // this.MigrationRangeMinValues = other.MigrationRangeMinValues // this.MigrationRangeMaxValues = other.MigrationRangeMaxValues @@ -303,10 +303,10 @@ func (this *MigrationContext) GetGhostTableName() string { // GetOldTableName generates the name of the "old" table, into which the original table is renamed. func (this *MigrationContext) GetOldTableName() string { if this.TestOnReplica { - return fmt.Sprintf("_%s_ght", this.OriginalTableName) + return fmt.Sprintf("_%s_delr", this.OriginalTableName) } if this.MigrateOnReplica { - return fmt.Sprintf("_%s_ghr", this.OriginalTableName) + return fmt.Sprintf("_%s_delr", this.OriginalTableName) } return fmt.Sprintf("_%s_del", this.OriginalTableName) } From 7bdfd1bff59039205478538a7080110468faf77b Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 29 Dec 2016 10:26:56 +0200 Subject: [PATCH 28/28] not applying range if nil --- go/base/context.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 79f67e9b2..cd302a955 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -283,8 +283,12 @@ func (this *MigrationContext) LoadJSON(jsonString string) error { func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) { // this.MigrationRangeMinValues = other.MigrationRangeMinValues // this.MigrationRangeMaxValues = other.MigrationRangeMaxValues - this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues - this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues + if other.MigrationIterationRangeMinValues != nil { + this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues + } + if other.MigrationIterationRangeMaxValues != nil { + this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues + } this.RowsEstimate = other.RowsEstimate this.RowsDeltaEstimate = other.RowsDeltaEstimate