Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (mgtr *Migrator) Migrate() (err error) {
} else {
retrier = mgtr.retryOperation
}
if err := retrier(mgtr.cutOver); err != nil {
if err := mgtr.cutOverWithMetrics(retrier); err != nil {
return err
}
atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1)
Expand Down Expand Up @@ -782,7 +782,7 @@ func (mgtr *Migrator) Revert() error {
if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil {
return err
}
if err := retrier(mgtr.cutOver); err != nil {
if err := mgtr.cutOverWithMetrics(retrier); err != nil {
return err
}
atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1)
Expand All @@ -802,6 +802,28 @@ func (mgtr *Migrator) ExecOnFailureHook() (err error) {
return mgtr.hooksExecutor.OnFailure()
}

func (mgtr *Migrator) cutOverWithMetrics(retrier func(func() error, ...bool) error) error {
return mgtr.cutOverOperationWithMetrics(retrier, mgtr.cutOver)
}

func (mgtr *Migrator) cutOverOperationWithMetrics(retrier func(func() error, ...bool) error, operation func() error) error {
cutOverStartTime := time.Now()
err := retrier(func() error {
err := operation()
if err != nil {
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeRetry)
return err
}
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeSuccess)
return nil
})
if err != nil {
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeAbort)
}
metrics.RecordCutOverTotal(mgtr.migrationContext.Metrics, time.Since(cutOverStartTime), err)
return err
}

func (mgtr *Migrator) handleCutOverResult(cutOverError error) (err error) {
if mgtr.migrationContext.TestOnReplica {
// We're merely testing, we don't want to keep this state. Rollback the renames as possible
Expand Down Expand Up @@ -965,9 +987,12 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) {
defer atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

phaseStartTime := time.Now()
if err := mgtr.retryOperation(mgtr.applier.LockOriginalTable); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), nil)

if err := mgtr.retryOperation(mgtr.waitForEventsUpToLock); err != nil {
return err
Expand All @@ -978,12 +1003,19 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) {
return err
}
}
phaseStartTime = time.Now()
if err := mgtr.retryOperation(mgtr.applier.SwapTablesQuickAndBumpy); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), nil)

phaseStartTime = time.Now()
if err := mgtr.retryOperation(mgtr.applier.UnlockTables); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), nil)

lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime)
renameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.RenameTablesStartTime)
Expand All @@ -1007,14 +1039,17 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
var renameLockSessionId int64
phaseStartTime := time.Now()
go func() {
if err := mgtr.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
mgtr.migrationContext.Log.Errore(err)
}
}()
if err := <-tableLocked; err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), err)
return mgtr.migrationContext.Log.Errore(err)
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), nil)
lockOriginalSessionId := <-lockOriginalSessionIdChan
mgtr.migrationContext.Log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
// At this point we know the original table is locked.
Expand All @@ -1032,7 +1067,8 @@ func (mgtr *Migrator) atomicCutOver() (err error) {

// Step 2
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
mgtr.migrationContext.RenameTablesStartTime = time.Now()
phaseStartTime = time.Now()
mgtr.migrationContext.RenameTablesStartTime = phaseStartTime

var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make(chan int64, 2)
Expand All @@ -1057,6 +1093,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
}
// Wait for the RENAME to appear in PROCESSLIST
if err := mgtr.retryOperation(waitForRename, true); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
// Abort! Release the lock
okToUnlockTable <- true
return err
Expand All @@ -1065,6 +1102,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
mgtr.migrationContext.Log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)")
}
if err := mgtr.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
// Abort operation. Just make sure to drop the magic table.
return mgtr.migrationContext.Log.Errore(err)
}
Expand All @@ -1074,16 +1112,21 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
// we know it is safe to proceed to release the lock

renameLockSessionId = renameSessionId
unlockStartTime := time.Now()
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <-tableUnlocked; err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), err)
return mgtr.migrationContext.Log.Errore(err)
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), nil)
if err := <-tablesRenamed; err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
return mgtr.migrationContext.Log.Errore(err)
}
mgtr.migrationContext.RenameTablesEndTime = time.Now()
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, mgtr.migrationContext.RenameTablesEndTime.Sub(phaseStartTime), nil)

// ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime)
Expand Down
74 changes: 74 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -425,6 +426,79 @@ func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) {
func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) {
}

type cutOverMetricsSpy struct {
countNames []string
countTags [][]string
histogramNames []string
histogramTags [][]string
}

func (s *cutOverMetricsSpy) Gauge(_ string, _ float64, _ ...string) {}

func (s *cutOverMetricsSpy) Count(name string, _ int64, tags ...string) {
s.countNames = append(s.countNames, name)
s.countTags = append(s.countTags, append([]string(nil), tags...))
}

func (s *cutOverMetricsSpy) Histogram(name string, _ float64, tags ...string) {
s.histogramNames = append(s.histogramNames, name)
s.histogramTags = append(s.histogramTags, append([]string(nil), tags...))
}

func TestCutOverOperationWithMetricsRetryThenSuccess(t *testing.T) {
spy := &cutOverMetricsSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
migrator := NewMigrator(ctx, "test")

attempts := 0
retrier := func(operation func() error, _ ...bool) error {
for {
err := operation()
if err == nil {
return nil
}
if attempts >= 2 {
return err
}
}
}
operation := func() error {
attempts++
if attempts == 1 {
return errors.New("transient cutover failure")
}
return nil
}

require.NoError(t, migrator.cutOverOperationWithMetrics(retrier, operation))
assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.countTags)
assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.histogramTags)
}

func TestCutOverOperationWithMetricsAbort(t *testing.T) {
spy := &cutOverMetricsSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
migrator := NewMigrator(ctx, "test")
cutOverErr := errors.New("cutover failed")

retrier := func(operation func() error, _ ...bool) error {
return operation()
}
operation := func() error {
return cutOverErr
}

require.ErrorIs(t, migrator.cutOverOperationWithMetrics(retrier, operation), cutOverErr)
assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeAbort}}, spy.countTags)
assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeAbort}}, spy.histogramTags)
}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
spy := &progressGaugeSpy{}
ctx := base.NewMigrationContext()
Expand Down
42 changes: 42 additions & 0 deletions go/metrics/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,48 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) {
emit.Count("throttle.events_total", 1, tags...)
}

const (
CutOverOutcomeSuccess = "success"
CutOverOutcomeRetry = "retry"
CutOverOutcomeAbort = "abort"

CutOverPhaseMagicLock = "magic_lock"
CutOverPhaseOriginalTableLock = "original_table_lock"
CutOverPhaseMagicRename = "magic_rename"
CutOverPhaseUnlock = "unlock"
)

// RecordCutOverPhase emits gh_ost.cut_over.phase_duration_milliseconds.
func RecordCutOverPhase(emit Emitter, phase string, duration time.Duration, err error) {
if emit == nil || phase == "" || duration < 0 {
return
}
emit.Histogram("cut_over.phase_duration_milliseconds", float64(duration.Milliseconds()), "phase:"+phase, "outcome:"+cutOverOutcomeFromError(err))
}

// RecordCutOverAttempt emits gh_ost.cut_over.attempts_total.
func RecordCutOverAttempt(emit Emitter, outcome string) {
if emit == nil || outcome == "" {
return
}
emit.Count("cut_over.attempts_total", 1, "outcome:"+outcome)
}

// RecordCutOverTotal emits gh_ost.cut_over.total_duration_milliseconds for terminal cut-over outcomes.
func RecordCutOverTotal(emit Emitter, duration time.Duration, err error) {
if emit == nil || duration < 0 {
return
}
emit.Histogram("cut_over.total_duration_milliseconds", float64(duration.Milliseconds()), "outcome:"+cutOverOutcomeFromError(err))
}

func cutOverOutcomeFromError(err error) string {
if err != nil {
return CutOverOutcomeAbort
}
return CutOverOutcomeSuccess
}

// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags.
func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) {
if emit == nil || side == "" || kind == "" || duration < 0 {
Expand Down
Loading
Loading