diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5b377a80e..da1e20f2f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -657,7 +657,7 @@ func (mgtr *Migrator) Migrate() (err error) { } else { retrier = mgtr.retryOperation } - if err := retrier(mgtr.cutOver); err != nil { + if err := mgtr.cutOverWithMetrics(retrier); err != nil { return err } atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) @@ -782,7 +782,7 @@ func (mgtr *Migrator) Revert() error { if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil { return err } - if err := retrier(mgtr.cutOver); err != nil { + if err := mgtr.cutOverWithMetrics(retrier); err != nil { return err } atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) @@ -802,6 +802,28 @@ func (mgtr *Migrator) ExecOnFailureHook() (err error) { return mgtr.hooksExecutor.OnFailure() } +func (mgtr *Migrator) cutOverWithMetrics(retrier func(func() error, ...bool) error) error { + return mgtr.cutOverOperationWithMetrics(retrier, mgtr.cutOver) +} + +func (mgtr *Migrator) cutOverOperationWithMetrics(retrier func(func() error, ...bool) error, operation func() error) error { + cutOverStartTime := time.Now() + err := retrier(func() error { + err := operation() + if err != nil { + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeRetry) + return err + } + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeSuccess) + return nil + }) + if err != nil { + metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeAbort) + } + metrics.RecordCutOverTotal(mgtr.migrationContext.Metrics, time.Since(cutOverStartTime), err) + return err +} + func (mgtr *Migrator) handleCutOverResult(cutOverError error) (err error) { if mgtr.migrationContext.TestOnReplica { // We're merely testing, we don't want to keep this state. Rollback the renames as possible @@ -965,9 +987,12 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) { defer atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 0) atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + phaseStartTime := time.Now() if err := mgtr.retryOperation(mgtr.applier.LockOriginalTable); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), nil) if err := mgtr.retryOperation(mgtr.waitForEventsUpToLock); err != nil { return err @@ -978,12 +1003,19 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) { return err } } + phaseStartTime = time.Now() if err := mgtr.retryOperation(mgtr.applier.SwapTablesQuickAndBumpy); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), nil) + + phaseStartTime = time.Now() if err := mgtr.retryOperation(mgtr.applier.UnlockTables); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), err) return err } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), nil) lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime) renameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.RenameTablesStartTime) @@ -1007,14 +1039,17 @@ func (mgtr *Migrator) atomicCutOver() (err error) { tableLocked := make(chan error, 2) tableUnlocked := make(chan error, 2) var renameLockSessionId int64 + phaseStartTime := time.Now() go func() { if err := mgtr.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil { mgtr.migrationContext.Log.Errore(err) } }() if err := <-tableLocked; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), err) return mgtr.migrationContext.Log.Errore(err) } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), nil) lockOriginalSessionId := <-lockOriginalSessionIdChan mgtr.migrationContext.Log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId) // At this point we know the original table is locked. @@ -1032,7 +1067,8 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. - mgtr.migrationContext.RenameTablesStartTime = time.Now() + phaseStartTime = time.Now() + mgtr.migrationContext.RenameTablesStartTime = phaseStartTime var tableRenameKnownToHaveFailed int64 renameSessionIdChan := make(chan int64, 2) @@ -1057,6 +1093,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { } // Wait for the RENAME to appear in PROCESSLIST if err := mgtr.retryOperation(waitForRename, true); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) // Abort! Release the lock okToUnlockTable <- true return err @@ -1065,6 +1102,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { mgtr.migrationContext.Log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)") } if err := mgtr.applier.ExpectUsedLock(lockOriginalSessionId); err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) // Abort operation. Just make sure to drop the magic table. return mgtr.migrationContext.Log.Errore(err) } @@ -1074,16 +1112,21 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // we know it is safe to proceed to release the lock renameLockSessionId = renameSessionId + unlockStartTime := time.Now() okToUnlockTable <- true // BAM! magic table dropped, original table lock is released // -> RENAME released -> queries on original are unblocked. if err := <-tableUnlocked; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), err) return mgtr.migrationContext.Log.Errore(err) } + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), nil) if err := <-tablesRenamed; err != nil { + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err) return mgtr.migrationContext.Log.Errore(err) } mgtr.migrationContext.RenameTablesEndTime = time.Now() + metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, mgtr.migrationContext.RenameTablesEndTime.Sub(phaseStartTime), nil) // ooh nice! We're actually truly and thankfully done lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime) diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index d939ae97d..a8c555220 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -29,6 +29,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go" @@ -425,6 +426,79 @@ func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) { func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) { } +type cutOverMetricsSpy struct { + countNames []string + countTags [][]string + histogramNames []string + histogramTags [][]string +} + +func (s *cutOverMetricsSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (s *cutOverMetricsSpy) Count(name string, _ int64, tags ...string) { + s.countNames = append(s.countNames, name) + s.countTags = append(s.countTags, append([]string(nil), tags...)) +} + +func (s *cutOverMetricsSpy) Histogram(name string, _ float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramTags = append(s.histogramTags, append([]string(nil), tags...)) +} + +func TestCutOverOperationWithMetricsRetryThenSuccess(t *testing.T) { + spy := &cutOverMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + migrator := NewMigrator(ctx, "test") + + attempts := 0 + retrier := func(operation func() error, _ ...bool) error { + for { + err := operation() + if err == nil { + return nil + } + if attempts >= 2 { + return err + } + } + } + operation := func() error { + attempts++ + if attempts == 1 { + return errors.New("transient cutover failure") + } + return nil + } + + require.NoError(t, migrator.cutOverOperationWithMetrics(retrier, operation)) + assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.countTags) + assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.histogramTags) +} + +func TestCutOverOperationWithMetricsAbort(t *testing.T) { + spy := &cutOverMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + migrator := NewMigrator(ctx, "test") + cutOverErr := errors.New("cutover failed") + + retrier := func(operation func() error, _ ...bool) error { + return operation() + } + operation := func() error { + return cutOverErr + } + + require.ErrorIs(t, migrator.cutOverOperationWithMetrics(retrier, operation), cutOverErr) + assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeAbort}}, spy.countTags) + assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames) + assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeAbort}}, spy.histogramTags) +} + func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { spy := &progressGaugeSpy{} ctx := base.NewMigrationContext() diff --git a/go/metrics/emit.go b/go/metrics/emit.go index f0357daca..91472be73 100644 --- a/go/metrics/emit.go +++ b/go/metrics/emit.go @@ -130,6 +130,48 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) { emit.Count("throttle.events_total", 1, tags...) } +const ( + CutOverOutcomeSuccess = "success" + CutOverOutcomeRetry = "retry" + CutOverOutcomeAbort = "abort" + + CutOverPhaseMagicLock = "magic_lock" + CutOverPhaseOriginalTableLock = "original_table_lock" + CutOverPhaseMagicRename = "magic_rename" + CutOverPhaseUnlock = "unlock" +) + +// RecordCutOverPhase emits gh_ost.cut_over.phase_duration_milliseconds. +func RecordCutOverPhase(emit Emitter, phase string, duration time.Duration, err error) { + if emit == nil || phase == "" || duration < 0 { + return + } + emit.Histogram("cut_over.phase_duration_milliseconds", float64(duration.Milliseconds()), "phase:"+phase, "outcome:"+cutOverOutcomeFromError(err)) +} + +// RecordCutOverAttempt emits gh_ost.cut_over.attempts_total. +func RecordCutOverAttempt(emit Emitter, outcome string) { + if emit == nil || outcome == "" { + return + } + emit.Count("cut_over.attempts_total", 1, "outcome:"+outcome) +} + +// RecordCutOverTotal emits gh_ost.cut_over.total_duration_milliseconds for terminal cut-over outcomes. +func RecordCutOverTotal(emit Emitter, duration time.Duration, err error) { + if emit == nil || duration < 0 { + return + } + emit.Histogram("cut_over.total_duration_milliseconds", float64(duration.Milliseconds()), "outcome:"+cutOverOutcomeFromError(err)) +} + +func cutOverOutcomeFromError(err error) string { + if err != nil { + return CutOverOutcomeAbort + } + return CutOverOutcomeSuccess +} + // RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags. func RecordQueryDuration(emit Emitter, side string, kind string, duration time.Duration, err error) { if emit == nil || side == "" || kind == "" || duration < 0 { diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go index 657e55099..f2a1b5aba 100644 --- a/go/metrics/emit_test.go +++ b/go/metrics/emit_test.go @@ -32,6 +32,22 @@ func (g *gaugeSpy) Count(name string, value int64, tags ...string) { func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) { } +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (h *histogramSpy) Gauge(_ string, _ float64, _ ...string) {} + +func (h *histogramSpy) Count(_ string, _ int64, _ ...string) {} + +func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { + h.names = append(h.names, name) + h.values = append(h.values, value) + h.tags = append(h.tags, tags) +} + func TestEmitProgressGauges(t *testing.T) { spy := &gaugeSpy{} EmitProgressGauges(spy, 1000, 5000, 42) @@ -259,22 +275,6 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) { EmitThrottleInterval(&gaugeSpy{}, time.Second, "test") } -type histogramSpy struct { - names []string - values []float64 - tags [][]string -} - -func (h *histogramSpy) Gauge(_ string, _ float64, _ ...string) {} - -func (h *histogramSpy) Count(_ string, _ int64, _ ...string) {} - -func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { - h.names = append(h.names, name) - h.values = append(h.values, value) - h.tags = append(h.tags, tags) -} - func TestRecordQueryDuration(t *testing.T) { spy := &histogramSpy{} @@ -302,7 +302,7 @@ func TestRecordQueryDurationNilSafe(t *testing.T) { RecordQueryDuration(&histogramSpy{}, "source", "row_count", -time.Second, nil) } -type sleepSpy struct { +type histogramCountSpy struct { histogramNames []string histogramValues []float64 histogramTags [][]string @@ -311,22 +311,58 @@ type sleepSpy struct { countTags [][]string } -func (s *sleepSpy) Gauge(_ string, _ float64, _ ...string) {} +func (s *histogramCountSpy) Gauge(_ string, _ float64, _ ...string) {} -func (s *sleepSpy) Histogram(name string, value float64, tags ...string) { +func (s *histogramCountSpy) Histogram(name string, value float64, tags ...string) { s.histogramNames = append(s.histogramNames, name) s.histogramValues = append(s.histogramValues, value) s.histogramTags = append(s.histogramTags, tags) } -func (s *sleepSpy) Count(name string, value int64, tags ...string) { +func (s *histogramCountSpy) Count(name string, value int64, tags ...string) { s.countNames = append(s.countNames, name) s.countValues = append(s.countValues, value) s.countTags = append(s.countTags, tags) } +func TestRecordCutOverMetrics(t *testing.T) { + spy := &histogramCountSpy{} + + RecordCutOverPhase(spy, CutOverPhaseMagicLock, 1500*time.Millisecond, nil) + RecordCutOverAttempt(spy, CutOverOutcomeSuccess) + RecordCutOverTotal(spy, 2*time.Second, errors.New("boom")) + + if len(spy.histogramNames) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.histogramNames)) + } + if spy.histogramNames[0] != "cut_over.phase_duration_milliseconds" || spy.histogramValues[0] != 1500 { + t.Fatalf("got first histogram %s=%v", spy.histogramNames[0], spy.histogramValues[0]) + } + if !slices.Equal(spy.histogramTags[0], []string{"phase:magic_lock", "outcome:success"}) { + t.Fatalf("got phase tags %#v", spy.histogramTags[0]) + } + if spy.histogramNames[1] != "cut_over.total_duration_milliseconds" || spy.histogramValues[1] != 2000 { + t.Fatalf("got second histogram %s=%v", spy.histogramNames[1], spy.histogramValues[1]) + } + if !slices.Equal(spy.histogramTags[1], []string{"outcome:abort"}) { + t.Fatalf("got total tags %#v", spy.histogramTags[1]) + } + if len(spy.countNames) != 1 || spy.countNames[0] != "cut_over.attempts_total" || spy.countValues[0] != 1 { + t.Fatalf("got counts %#v values %#v", spy.countNames, spy.countValues) + } + if !slices.Equal(spy.countTags[0], []string{"outcome:success"}) { + t.Fatalf("got count tags %#v", spy.countTags[0]) + } +} + +func TestRecordCutOverMetricsNilSafe(t *testing.T) { + RecordCutOverPhase(nil, CutOverPhaseMagicLock, time.Second, nil) + RecordCutOverAttempt(nil, CutOverOutcomeSuccess) + RecordCutOverTotal(nil, time.Second, nil) +} + func TestRecordSleep(t *testing.T) { - spy := &sleepSpy{} + spy := &histogramCountSpy{} RecordSleep(spy, "retry_backoff", 2*time.Second) @@ -351,7 +387,7 @@ func TestRecordSleep(t *testing.T) { } func TestRecordSleepSubSecond(t *testing.T) { - spy := &sleepSpy{} + spy := &histogramCountSpy{} RecordSleep(spy, "replica_wait", 500*time.Millisecond) @@ -365,6 +401,6 @@ func TestRecordSleepSubSecond(t *testing.T) { func TestRecordSleepNilSafe(t *testing.T) { RecordSleep(nil, "retry_backoff", time.Second) - RecordSleep(&sleepSpy{}, "", time.Second) - RecordSleep(&sleepSpy{}, "retry_backoff", -time.Second) + RecordSleep(&histogramCountSpy{}, "", time.Second) + RecordSleep(&histogramCountSpy{}, "retry_backoff", -time.Second) }