Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder

moveTablesTargetDB *gosql.DB
moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -182,6 +187,35 @@
return err
}
}
if apl.migrationContext.IsMoveTablesMode() {
if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
true, // <-- include start range values for first select query
); err != nil {
return err
}
if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
false,
); err != nil {
return err
}
if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder(
apl.migrationContext.MoveTables.TargetDatabase,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -1013,6 +1047,121 @@
return chunkSize, rowsAffected, duration, nil
}

// ApplyIterationMoveTableCopyQueries issues a SELECT query on the original table and an INSERT query on the target table,
// copying a chunk of rows. It is used when `--move-table` is specified, instead of ApplyIterationInsertQuery.
func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize)

// First, select data from the source database:
rows, err := func() ([]*sql.ColumnValues, error) {
var qb *sql.MoveTableCopySelectQueryBuilder
if apl.migrationContext.GetIteration() == 0 {
qb = apl.moveTablesCopySelectFirstQueryBuilder
} else {
qb = apl.moveTablesCopySelectNextQueryBuilder
}
query, explodedArgs, err := qb.BuildQuery(
apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
)
if err != nil {
return nil, err
}
sqlRows, err := apl.db.Query(query, explodedArgs...)

Check failure on line 1071 in go/logic/applier.go

View workflow job for this annotation

GitHub Actions / lint

rows.Err must be checked (rowserrcheck)
if err != nil {
return nil, err
}
defer sqlRows.Close()
chunkRows := make([]*sql.ColumnValues, 0, chunkSize)
for sqlRows.Next() {
row := sql.NewColumnValues(apl.migrationContext.SharedColumns.Len())
err := sqlRows.Scan(row.ValuesPointers...)
if err != nil {
return nil, err
}
chunkRows = append(chunkRows, row)
}
return chunkRows, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}

// Then, insert data into the destination database:
sqlResult, err := func() (gosql.Result, error) {
query, explodedArgs, err := apl.moveTablesCopyInsertQueryBuilder.BuildQuery(rows)
if err != nil {
return nil, err
}
tx, err := apl.moveTablesTargetDB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()

sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`,
apl.migrationContext.ApplierTimeZone,
apl.generateSqlModeQuery())
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}

sqlResult, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
}

if apl.migrationContext.PanicOnWarnings {
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}
migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex()
if err != nil {
return nil, err
}
var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}

if err := tx.Commit(); err != nil {
return nil, err
}
return sqlResult, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}
rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime)
apl.migrationContext.Log.Debugf(
"Issued SELECT+INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
apl.migrationContext.MigrationIterationRangeMinValues,
apl.migrationContext.MigrationIterationRangeMaxValues,
apl.migrationContext.GetIteration(),
chunkSize,
)

return chunkSize, rowsAffected, duration, nil
}

// LockOriginalTable places a write lock on the original table
func (apl *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
Expand Down
105 changes: 104 additions & 1 deletion go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"context"
gosql "database/sql"
"errors"
"net"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -271,6 +272,7 @@

mysqlContainer testcontainers.Container
db *gosql.DB
otherDB *gosql.DB
}

func (suite *ApplierTestSuite) SetupSuite() {
Expand All @@ -291,12 +293,29 @@

db, err := gosql.Open("mysql", dsn)
suite.Require().NoError(err)

suite.db = db

containerHost, err := mysqlContainer.Host(ctx)
suite.Require().NoError(err)
containerPort, err := mysqlContainer.MappedPort(ctx, "3306/tcp")
suite.Require().NoError(err)

// Second database & connection for move-tables tests:
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther))

Check failure on line 304 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
otherConf := drivermysql.NewConfig()
otherConf.DBName = testMysqlDatabaseOther
otherConf.User = testMysqlUser
otherConf.Passwd = testMysqlPass
otherConf.Net = "tcp"
otherConf.Addr = net.JoinHostPort(containerHost, containerPort.Port())
otherDB, err := gosql.Open("mysql", otherConf.FormatDSN())
suite.Require().NoError(err)
suite.otherDB = otherDB
}

func (suite *ApplierTestSuite) TeardownSuite() {
suite.Assert().NoError(suite.db.Close())
suite.Assert().NoError(suite.otherDB.Close())
suite.Assert().NoError(testcontainers.TerminateContainer(suite.mysqlContainer))
}

Expand Down Expand Up @@ -1542,6 +1561,90 @@
// Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back
}

func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() {
ctx := context.Background()
var err error

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestTableName()))
suite.Require().NoError(err)
_, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestOtherTableName()))
suite.Require().NoError(err)
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name, created_at) VALUES (1, 'alice', '2024-01-15 10:30:00'), (2, 'bob', '2024-06-20 14:45:00'), (3, 'carol', '2025-12-31 23:59:59');", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name", "created_at"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther

applier := NewApplier(migrationContext)
applier.prepareQueries()
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

applier.moveTablesTargetDB = suite.otherDB

err = applier.CreateChangelogTable()
suite.Require().NoError(err)

err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

migrationContext.SetNextIterationRangeMinValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries()
suite.Require().NoError(err)
suite.Require().Equal(int64(3), rowsAffected)
suite.Require().Equal(int64(1000), chunkSize)
suite.Require().Greater(duration, time.Duration(0))

// Verify rows were copied to the other table
rows, err := suite.otherDB.QueryContext(ctx, "SELECT id, name, created_at FROM "+getTestOtherTableName()+" ORDER BY id")
suite.Require().NoError(err)
defer rows.Close()

type row struct {
id int
name string
createdAt string
}
var results []row
for rows.Next() {
var r row
err = rows.Scan(&r.id, &r.name, &r.createdAt)
suite.Require().NoError(err)
results = append(results, r)
}
suite.Require().NoError(rows.Err())

suite.Require().Len(results, 3)
suite.Require().Equal(1, results[0].id)
suite.Require().Equal("alice", results[0].name)
suite.Require().Equal("2024-01-15 10:30:00", results[0].createdAt)
suite.Require().Equal(2, results[1].id)
suite.Require().Equal("bob", results[1].name)
suite.Require().Equal("2024-06-20 14:45:00", results[1].createdAt)
suite.Require().Equal(3, results[2].id)
suite.Require().Equal("carol", results[2].name)
suite.Require().Equal("2025-12-31 23:59:59", results[2].createdAt)
}

func TestApplier(t *testing.T) {
if testing.Short() {
t.Skip("skipping applier test suite in short mode")
Expand Down
7 changes: 6 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,12 @@ func (mgtr *Migrator) iterateChunks() error {
// _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage.
return nil
}
_, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery()
var rowsAffected int64
if mgtr.migrationContext.IsMoveTablesMode() {
_, rowsAffected, _, err = mgtr.applier.ApplyIterationMoveTableCopyQueries()
} else {
_, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery()
}
if err != nil {
return err // wrapping call will retry
}
Expand Down
5 changes: 5 additions & 0 deletions go/logic/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
testMysqlUser = "root"
testMysqlPass = "root-password"
testMysqlDatabase = "test"
testMysqlDatabaseOther = "test_other"
testMysqlTableName = "testing"
)

Expand All @@ -36,6 +37,10 @@ func getTestOldTableName() string {
return fmt.Sprintf("`%s`.`_%s_del`", testMysqlDatabase, testMysqlTableName)
}

func getTestOtherTableName() string {
return fmt.Sprintf("`%s`.`%s`", testMysqlDatabaseOther, testMysqlTableName)
}

func getTestConnectionConfig(ctx context.Context, container testcontainers.Container) (*mysql.ConnectionConfig, error) {
host, err := container.Host(ctx)
if err != nil {
Expand Down
Loading