From 5f7ff63f82cc84f16399780f81c575a5caca3a67 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 20 May 2026 11:10:28 +0000 Subject: [PATCH 1/3] Add `ApplyIterationMoveTableCopyQueries` function to `Applier` to use for move-tables feature --- go/logic/applier.go | 120 ++++++++++++++++++++++++++++++++++++++ go/logic/applier_test.go | 123 ++++++++++++++++++++++++++++++++++++++- go/logic/test_utils.go | 5 ++ 3 files changed, 247 insertions(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index b49e131b8..a469e135c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -83,6 +83,11 @@ type Applier struct { dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder + + moveTablesTargetDB *gosql.DB + moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder + moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder + moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -1013,6 +1018,121 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i return chunkSize, rowsAffected, duration, nil } +// ApplyIterationMoveTableCopyQueries issues a SELECT query on the original table and an INSERT query on the target table, +// copying a chunk of rows. It is used when `--move-table` is specified, instead of ApplyIterationInsertQuery. +func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { + startTime := time.Now() + chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + + // First, select data from the source database: + rows, err := func() ([]*sql.ColumnValues, error) { + var qb *sql.MoveTableCopySelectQueryBuilder + if apl.migrationContext.GetIteration() == 0 { + qb = apl.moveTablesCopySelectFirstQueryBuilder + } else { + qb = apl.moveTablesCopySelectNextQueryBuilder + } + query, explodedArgs, err := qb.BuildQuery( + apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), + ) + if err != nil { + return nil, err + } + sqlRows, err := apl.db.Query(query, explodedArgs...) + if err != nil { + return nil, err + } + defer sqlRows.Close() + chunkRows := make([]*sql.ColumnValues, 0, chunkSize) + for sqlRows.Next() { + row := sql.NewColumnValues(apl.migrationContext.SharedColumns.Len()) + err := sqlRows.Scan(row.ValuesPointers...) + if err != nil { + return nil, err + } + chunkRows = append(chunkRows, row) + } + return chunkRows, nil + }() + if err != nil { + return chunkSize, rowsAffected, duration, err + } + + // Then, insert data into the destination database: + sqlResult, err := func() (gosql.Result, error) { + query, explodedArgs, err := apl.moveTablesCopyInsertQueryBuilder.BuildQuery(rows) + if err != nil { + return nil, err + } + tx, err := apl.moveTablesTargetDB.Begin() + if err != nil { + return nil, err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`, + apl.migrationContext.ApplierTimeZone, + apl.generateSqlModeQuery()) + if _, err := tx.Exec(sessionQuery); err != nil { + return nil, err + } + + sqlResult, err := tx.Exec(query, explodedArgs...) + if err != nil { + return nil, err + } + + if apl.migrationContext.PanicOnWarnings { + rows, err := tx.Query("SHOW WARNINGS") + if err != nil { + return nil, err + } + defer rows.Close() + if err = rows.Err(); err != nil { + return nil, err + } + migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() + if err != nil { + return nil, err + } + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings + } + + if err := tx.Commit(); err != nil { + return nil, err + } + return sqlResult, nil + }() + if err != nil { + return chunkSize, rowsAffected, duration, err + } + rowsAffected, _ = sqlResult.RowsAffected() + duration = time.Since(startTime) + apl.migrationContext.Log.Debugf( + "Issued SELECT+INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", + apl.migrationContext.MigrationIterationRangeMinValues, + apl.migrationContext.MigrationIterationRangeMaxValues, + apl.migrationContext.GetIteration(), + chunkSize, + ) + + return chunkSize, rowsAffected, duration, nil +} + // LockOriginalTable places a write lock on the original table func (apl *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6d7ba42f4..4b436e712 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -9,6 +9,7 @@ import ( "context" gosql "database/sql" "errors" + "net" "strings" "testing" "time" @@ -271,6 +272,7 @@ type ApplierTestSuite struct { mysqlContainer testcontainers.Container db *gosql.DB + otherDB *gosql.DB } func (suite *ApplierTestSuite) SetupSuite() { @@ -291,12 +293,29 @@ func (suite *ApplierTestSuite) SetupSuite() { db, err := gosql.Open("mysql", dsn) suite.Require().NoError(err) - suite.db = db + + containerHost, err := mysqlContainer.Host(ctx) + suite.Require().NoError(err) + containerPort, err := mysqlContainer.MappedPort(ctx, "3306/tcp") + suite.Require().NoError(err) + + // Second database & connection for move-tables tests: + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther)) + otherConf := drivermysql.NewConfig() + otherConf.DBName = testMysqlDatabaseOther + otherConf.User = testMysqlUser + otherConf.Passwd = testMysqlPass + otherConf.Net = "tcp" + otherConf.Addr = net.JoinHostPort(containerHost, containerPort.Port()) + otherDB, err := gosql.Open("mysql", otherConf.FormatDSN()) + suite.Require().NoError(err) + suite.otherDB = otherDB } func (suite *ApplierTestSuite) TeardownSuite() { suite.Assert().NoError(suite.db.Close()) + suite.Assert().NoError(suite.otherDB.Close()) suite.Assert().NoError(testcontainers.TerminateContainer(suite.mysqlContainer)) } @@ -1542,6 +1561,108 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { // Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back } +func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { + ctx := context.Background() + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestOtherTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name, created_at) VALUES (1, 'alice', '2024-01-15 10:30:00'), (2, 'bob', '2024-06-20 14:45:00'), (3, 'carol', '2025-12-31 23:59:59');", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Set up the move-tables query builders and target DB + applier.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + testMysqlDatabase, testMysqlTableName, + migrationContext.SharedColumns, migrationContext.UniqueKey.Name, + &migrationContext.UniqueKey.Columns, true, + ) + suite.Require().NoError(err) + + applier.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + testMysqlDatabase, testMysqlTableName, + migrationContext.SharedColumns, migrationContext.UniqueKey.Name, + &migrationContext.UniqueKey.Columns, false, + ) + suite.Require().NoError(err) + + applier.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder( + testMysqlDatabaseOther, testMysqlTableName, + migrationContext.MappedSharedColumns, + ) + suite.Require().NoError(err) + + applier.moveTablesTargetDB = suite.otherDB + + err = applier.CreateChangelogTable() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues() + suite.Require().NoError(err) + + migrationContext.SetNextIterationRangeMinValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + suite.Require().NoError(err) + suite.Require().True(hasFurtherRange) + + chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries() + suite.Require().NoError(err) + suite.Require().Equal(int64(3), rowsAffected) + suite.Require().Equal(int64(1000), chunkSize) + suite.Require().Greater(duration, time.Duration(0)) + + // Verify rows were copied to the other table + rows, err := suite.otherDB.QueryContext(ctx, "SELECT id, name, created_at FROM "+getTestOtherTableName()+" ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + type row struct { + id int + name string + createdAt string + } + var results []row + for rows.Next() { + var r row + err = rows.Scan(&r.id, &r.name, &r.createdAt) + suite.Require().NoError(err) + results = append(results, r) + } + suite.Require().NoError(rows.Err()) + + suite.Require().Len(results, 3) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice", results[0].name) + suite.Require().Equal("2024-01-15 10:30:00", results[0].createdAt) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("bob", results[1].name) + suite.Require().Equal("2024-06-20 14:45:00", results[1].createdAt) + suite.Require().Equal(3, results[2].id) + suite.Require().Equal("carol", results[2].name) + suite.Require().Equal("2025-12-31 23:59:59", results[2].createdAt) +} + func TestApplier(t *testing.T) { if testing.Short() { t.Skip("skipping applier test suite in short mode") diff --git a/go/logic/test_utils.go b/go/logic/test_utils.go index f552cfc76..966eed1b6 100644 --- a/go/logic/test_utils.go +++ b/go/logic/test_utils.go @@ -17,6 +17,7 @@ var ( testMysqlUser = "root" testMysqlPass = "root-password" testMysqlDatabase = "test" + testMysqlDatabaseOther = "test_other" testMysqlTableName = "testing" ) @@ -36,6 +37,10 @@ func getTestOldTableName() string { return fmt.Sprintf("`%s`.`_%s_del`", testMysqlDatabase, testMysqlTableName) } +func getTestOtherTableName() string { + return fmt.Sprintf("`%s`.`%s`", testMysqlDatabaseOther, testMysqlTableName) +} + func getTestConnectionConfig(ctx context.Context, container testcontainers.Container) (*mysql.ConnectionConfig, error) { host, err := container.Host(ctx) if err != nil { From 927e8cb8d7cc136ea33181e7298f7472129cebb0 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 20 May 2026 11:32:10 +0000 Subject: [PATCH 2/3] Add move-tables query builders to applier's prepareQueries method --- go/logic/applier.go | 29 +++++++++++++++++++++++++++++ go/logic/applier_test.go | 24 +++--------------------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index a469e135c..9d9d2ed4b 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -187,6 +187,35 @@ func (apl *Applier) prepareQueries() (err error) { return err } } + if apl.migrationContext.IsMoveTablesMode() { + if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + apl.migrationContext.DatabaseName, + apl.migrationContext.OriginalTableName, + apl.migrationContext.SharedColumns, + apl.migrationContext.UniqueKey.Name, + &apl.migrationContext.UniqueKey.Columns, + true, // <-- include start range values for first select query + ); err != nil { + return err + } + if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + apl.migrationContext.DatabaseName, + apl.migrationContext.OriginalTableName, + apl.migrationContext.SharedColumns, + apl.migrationContext.UniqueKey.Name, + &apl.migrationContext.UniqueKey.Columns, + false, + ); err != nil { + return err + } + if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder( + apl.migrationContext.MoveTables.TargetDatabase, + apl.migrationContext.OriginalTableName, + apl.migrationContext.SharedColumns, + ); err != nil { + return err + } + } return nil } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 4b436e712..72c42c75f 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1585,34 +1585,16 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { Name: "PRIMARY", Columns: *sql.NewColumnList([]string{"id"}), } + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther applier := NewApplier(migrationContext) + applier.prepareQueries() defer applier.Teardown() err = applier.InitDBConnections() suite.Require().NoError(err) - // Set up the move-tables query builders and target DB - applier.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( - testMysqlDatabase, testMysqlTableName, - migrationContext.SharedColumns, migrationContext.UniqueKey.Name, - &migrationContext.UniqueKey.Columns, true, - ) - suite.Require().NoError(err) - - applier.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( - testMysqlDatabase, testMysqlTableName, - migrationContext.SharedColumns, migrationContext.UniqueKey.Name, - &migrationContext.UniqueKey.Columns, false, - ) - suite.Require().NoError(err) - - applier.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder( - testMysqlDatabaseOther, testMysqlTableName, - migrationContext.MappedSharedColumns, - ) - suite.Require().NoError(err) - applier.moveTablesTargetDB = suite.otherDB err = applier.CreateChangelogTable() From ac6f966319ba477fd9324d6cb57add815e930604 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 20 May 2026 12:12:39 +0000 Subject: [PATCH 3/3] Call `ApplyIterationMoveTableCopyQueries` in migrator --- go/logic/migrator.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a4114eb90..9d57c5eb9 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1700,7 +1700,12 @@ func (mgtr *Migrator) iterateChunks() error { // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() + var rowsAffected int64 + if mgtr.migrationContext.IsMoveTablesMode() { + _, rowsAffected, _, err = mgtr.applier.ApplyIterationMoveTableCopyQueries() + } else { + _, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery() + } if err != nil { return err // wrapping call will retry }