From be3d0a13e8eff5d5ddfe08f430214012432e5359 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Fri, 22 May 2026 01:31:00 +0200 Subject: [PATCH 1/2] Add experimental SQLite row-stream apply path --- packages/reprint-importer/src/import.php | 815 ++++++++++++++++-- .../class-sql-statement-rewriter.php | 5 + .../class-sqlite-row-stream-sidecar.php | 341 ++++++++ .../src/lib/url-rewrite/load.php | 1 + tests/Import/NewSiteUrlSqliteTest.php | 108 +++ .../SQLiteRowStreamSidecarTest.php | 129 +++ tests/e2e/benchmark/bench-pull.mjs | 18 +- 7 files changed, 1325 insertions(+), 92 deletions(-) create mode 100644 packages/reprint-importer/src/lib/url-rewrite/class-sqlite-row-stream-sidecar.php create mode 100644 tests/UrlRewriting/SQLiteRowStreamSidecarTest.php diff --git a/packages/reprint-importer/src/import.php b/packages/reprint-importer/src/import.php index c77fe16c..2e662818 100755 --- a/packages/reprint-importer/src/import.php +++ b/packages/reprint-importer/src/import.php @@ -960,6 +960,7 @@ class ImportClient private const SAVE_STATE_EVERY_N_CHUNKS = 50; private const STATE_PATH_ENCODING_PREFIX = "base64:"; private const SQLITE_PREPARED_INSERT_CACHE_MAX = 128; + private const SQLITE_ROW_STREAM_SIDECAR = ".import-sqlite-row-stream.jsonl"; /** * Maximum number of consecutive cURL timeouts with no cursor progress @@ -1181,6 +1182,9 @@ class ImportClient /** @var string SQL output mode: 'file' (default), 'stdout', or 'mysql'. */ private $sql_output_mode = 'file'; + /** @var bool Enable the non-default SQLite row-stream sidecar path. */ + private $experimental_sqlite_row_stream = false; + /** @var string|null MySQL host for --sql-output=mysql. */ private $mysql_host; @@ -1703,6 +1707,24 @@ public function run(array $options = []): void ); } + if (array_key_exists("experimental_sqlite_row_stream", $options)) { + $this->experimental_sqlite_row_stream = (bool) $options["experimental_sqlite_row_stream"]; + $this->state["experimental_sqlite_row_stream"] = $this->experimental_sqlite_row_stream; + $this->save_state($this->state); + } else { + $this->experimental_sqlite_row_stream = (bool) ($this->state["experimental_sqlite_row_stream"] ?? false); + } + + if ( + $this->experimental_sqlite_row_stream && + in_array($command, ["pull", "db-pull"], true) && + $this->sql_output_mode !== "file" + ) { + throw new InvalidArgumentException( + "--experimental-sqlite-row-stream requires --sql-output=file so fallback SQL byte ranges remain available.", + ); + } + $this->initialize_tuner($options); // Initialize HMAC authentication if a shared secret was provided. @@ -1950,6 +1972,15 @@ private function handle_abort(string $command): void "FILE DELETE | {$domains_file} | abort db-pull", ); } + $row_stream_file = $this->state_dir . "/" . self::SQLITE_ROW_STREAM_SIDECAR; + if (file_exists($row_stream_file)) { + unlink($row_stream_file); + $this->audit_log( + "FILE DELETE | {$row_stream_file} | abort db-pull", + ); + } + $this->state["sqlite_row_stream"] = $this->default_state()["sqlite_row_stream"]; + $this->save_state($this->state); break; case "db-index": @@ -3572,6 +3603,10 @@ public function run_db_sync(): void return; } + if ($this->experimental_sqlite_row_stream && $this->sql_output_mode === "file") { + $this->build_sqlite_row_stream_sidecar($sql_file); + } + // Mark as complete $this->state["status"] = "complete"; $this->save_state($this->state); @@ -5118,6 +5153,16 @@ public function run_db_apply(array $options): void ); } + // Load pre-computed statement count from db-pull for progress reporting. + $sql_stats_file = $this->state_dir . "/.import-sql-stats.json"; + $statements_total = null; + if (file_exists($sql_stats_file)) { + $stats = json_decode(file_get_contents($sql_stats_file), true); + if (is_array($stats) && isset($stats["statements_total"])) { + $statements_total = (int) $stats["statements_total"]; + } + } + [$pdo, $connection_label] = $this->create_target_db_apply_connection($options); $sqlite_prepared_pdo = null; $sqlite_prepared_statement_cache = []; @@ -5142,6 +5187,25 @@ public function run_db_apply(array $options): void false, ); + if ($this->experimental_sqlite_row_stream) { + if ($sqlite_prepared_pdo === null) { + throw new RuntimeException( + "--experimental-sqlite-row-stream is only supported with --target-engine=sqlite.", + ); + } + $this->run_db_apply_sqlite_row_stream( + $pdo, + $sqlite_prepared_pdo, + $stmt_rewriter, + $sqlite_prepared_statement_cache, + $sqlite_prepared_statement_cache_order, + $options, + $statements_executed, + $statements_total, + ); + return; + } + // Stream db.sql through the query stream and execute. Use the // fast strcspn-based parser by default; it self-falls-back to // WP_MySQL_Naive_Query_Stream if it ever fails to make progress @@ -5178,16 +5242,6 @@ public function run_db_apply(array $options): void $save_every = 100; $stmts_since_save = 0; - // Load pre-computed statement count from db-pull for progress reporting - $sql_stats_file = $this->state_dir . "/.import-sql-stats.json"; - $statements_total = null; - if (file_exists($sql_stats_file)) { - $stats = json_decode(file_get_contents($sql_stats_file), true); - if (is_array($stats) && isset($stats["statements_total"])) { - $statements_total = (int) $stats["statements_total"]; - } - } - // If resuming, seek to saved position. bytes_read is the byte offset // right after the last successfully executed query (tracked via // query_stream->get_bytes_consumed()), so no statement skipping is @@ -5430,6 +5484,393 @@ public function run_db_apply(array $options): void } } + private function run_db_apply_sqlite_row_stream( + PDO $pdo, + PDO $sqlite_prepared_pdo, + ?SqlStatementRewriter $stmt_rewriter, + array &$sqlite_prepared_statement_cache, + array &$sqlite_prepared_statement_cache_order, + array $options, + int $statements_executed, + ?int $statements_total + ): void { + $sql_file = $this->state_dir . "/db.sql"; + $sidecar_file = $this->state_dir . "/" . self::SQLITE_ROW_STREAM_SIDECAR; + if (!file_exists($sidecar_file)) { + throw new RuntimeException( + "SQLite row stream sidecar not found. Run db-pull with --experimental-sqlite-row-stream first.", + ); + } + + $sql_file_size = filesize($sql_file); + $sidecar_file_size = filesize($sidecar_file); + if ($sql_file_size === false || $sidecar_file_size === false) { + throw new RuntimeException("Cannot stat SQLite row stream inputs."); + } + + $sidecar_handle = fopen($sidecar_file, "r"); + if (!$sidecar_handle) { + throw new RuntimeException("Cannot open SQLite row stream sidecar: {$sidecar_file}"); + } + $sql_handle = fopen($sql_file, "r"); + if (!$sql_handle) { + fclose($sidecar_handle); + throw new RuntimeException("Cannot open SQL file: {$sql_file}"); + } + + $meta_line = fgets($sidecar_handle); + if ($meta_line === false) { + fclose($sidecar_handle); + fclose($sql_handle); + throw new RuntimeException("SQLite row stream sidecar is empty."); + } + $meta = json_decode($meta_line, true); + if ( + !is_array($meta) || + ($meta['kind'] ?? null) !== 'meta' || + ($meta['v'] ?? null) !== SQLiteRowStreamSidecar::VERSION + ) { + fclose($sidecar_handle); + fclose($sql_handle); + throw new RuntimeException("SQLite row stream sidecar has invalid metadata."); + } + if ((int) ($meta['sql_bytes'] ?? -1) !== (int) $sql_file_size) { + fclose($sidecar_handle); + fclose($sql_handle); + throw new RuntimeException( + "SQLite row stream sidecar does not match db.sql size. Re-run db-pull with --abort.", + ); + } + + $first_record_offset = ftell($sidecar_handle); + if ($first_record_offset === false) { + $first_record_offset = strlen($meta_line); + } + + $row_stream_state = $this->state["sqlite_row_stream"] ?? []; + if ( + $statements_total === null && + isset($row_stream_state["statements_total"]) && + (int) $row_stream_state["statements_total"] > 0 + ) { + $statements_total = (int) $row_stream_state["statements_total"]; + } + + $apply_state = $this->state["apply"] ?? $this->default_state()["apply"]; + $row_stream_bytes_read = (int) ($apply_state["row_stream_bytes_read"] ?? 0); + $last_sql_bytes_read = (int) ($apply_state["bytes_read"] ?? 0); + $stmt_count = 0; + $stmts_to_skip = 0; + if ( + $row_stream_bytes_read > $first_record_offset && + $row_stream_bytes_read < $sidecar_file_size + ) { + fseek($sidecar_handle, $row_stream_bytes_read); + $stmt_count = $statements_executed; + } elseif ($statements_executed > 0) { + fseek($sidecar_handle, $first_record_offset); + $stmts_to_skip = $statements_executed; + } else { + fseek($sidecar_handle, $first_record_offset); + } + + $this->audit_log( + sprintf( + "SQLITE ROW STREAM db-apply | sidecar=%s | resume_statements=%d | row_stream_offset=%d", + $sidecar_file, + $statements_executed, + $row_stream_bytes_read, + ), + false, + ); + + $this->output_progress([ + "status" => "starting", + "phase" => "db-apply", + "statements_total" => $statements_total, + "message" => "Applying SQLite row stream" . ($statements_total !== null ? " ({$statements_total} statements)" : ""), + ]); + + $save_every = 100; + $stmts_since_save = 0; + + try { + while (($line = fgets($sidecar_handle)) !== false) { + if ($this->shutdown_requested) { + $this->audit_log("SHUTDOWN REQUESTED | saving SQLite row stream state", true); + break; + } + if (function_exists("pcntl_signal_dispatch")) { + pcntl_signal_dispatch(); + } + + $trimmed = trim($line); + if ($trimmed === '') { + continue; + } + $record = json_decode($trimmed, true); + if (!is_array($record)) { + throw new RuntimeException("Invalid SQLite row stream JSON record at statement " . ($stmt_count + 1)); + } + if (($record['kind'] ?? null) === 'meta') { + continue; + } + + $stmt_count++; + if ($stmts_to_skip > 0) { + $stmts_to_skip--; + continue; + } + + $executed_query = ''; + try { + $this->execute_db_apply_row_stream_record( + $pdo, + $sqlite_prepared_pdo, + $sql_handle, + $record, + $stmt_rewriter, + $sqlite_prepared_statement_cache, + $sqlite_prepared_statement_cache_order, + $executed_query, + ); + } catch (PDOException $e) { + $this->audit_log( + sprintf( + "SQL ERROR | stmt=%d | %s | query=%.200s", + $stmt_count, + $e->getMessage(), + $executed_query, + ), + true, + ); + throw new RuntimeException( + "SQL execution error at statement {$stmt_count}: " . + $e->getMessage(), + ); + } + + $statements_executed++; + $stmts_since_save++; + if (isset($record['sql_offset'], $record['sql_length'])) { + $last_sql_bytes_read = (int) $record['sql_offset'] + (int) $record['sql_length']; + } + + if ($stmts_since_save >= $save_every) { + $this->state["apply"]["statements_executed"] = $statements_executed; + $this->state["apply"]["bytes_read"] = $last_sql_bytes_read; + $this->state["apply"]["row_stream_bytes_read"] = ftell($sidecar_handle) ?: 0; + $this->save_state($this->state); + $stmts_since_save = 0; + + $apply_fraction = $sql_file_size > 0 + ? $last_sql_bytes_read / $sql_file_size + : null; + $pct = $apply_fraction !== null ? round($apply_fraction * 100, 1) : 0; + $progress_message = sprintf( + "%s statements", + $statements_total === null + ? number_format($statements_executed) + : number_format($statements_executed) . " / " . number_format($statements_total), + ); + + $this->output_progress([ + "phase" => "db-apply", + "statements_executed" => $statements_executed, + "bytes_read" => $last_sql_bytes_read, + "bytes_total" => $sql_file_size, + "pct" => $pct, + "statements_total" => $statements_total, + "message" => $progress_message, + ]); + $this->progress->show_progress_line($progress_message, $apply_fraction); + } + } + + if ($this->shutdown_requested) { + $this->state["apply"]["statements_executed"] = $statements_executed; + $this->state["apply"]["bytes_read"] = $last_sql_bytes_read; + $this->state["apply"]["row_stream_bytes_read"] = ftell($sidecar_handle) ?: 0; + $this->state["status"] = "partial"; + $this->save_state($this->state); + $this->audit_log( + sprintf( + "PARTIAL SQLite row stream db-apply | %d statements executed", + $statements_executed, + ), + true, + ); + $this->output_progress([ + "status" => "partial", + "phase" => "db-apply", + "statements_executed" => $statements_executed, + "statements_total" => $statements_total, + "message" => "db-apply partial: {$statements_executed} statements executed", + ], true); + return; + } + + $deactivated = $this->deactivate_host_plugins($pdo); + foreach ($deactivated as $basename) { + $this->audit_log("DB-APPLY | deactivated plugin {$basename} (host-specific)"); + } + + $deactivated = $this->deactivate_path_incompatible_plugins( + $pdo, + (string) ($options["new_site_url"] ?? ""), + ); + foreach ($deactivated as $basename) { + $this->audit_log("DB-APPLY | deactivated plugin {$basename} (path-incompatible siteurl)"); + } + + $this->state["apply"]["statements_executed"] = $statements_executed; + $this->state["apply"]["bytes_read"] = (int) $sql_file_size; + $this->state["apply"]["row_stream_bytes_read"] = (int) $sidecar_file_size; + $this->state["status"] = "complete"; + $this->save_state($this->state); + + $this->audit_log( + sprintf( + "SQLite row stream db-apply complete | %d statements executed", + $statements_executed, + ), + true, + ); + + $this->output_progress([ + "status" => "complete", + "phase" => "db-apply", + "statements_executed" => $statements_executed, + "statements_total" => $statements_total, + "message" => "db-apply complete ({$statements_executed} statements executed)", + ]); + + if (!$this->progress->is_quiet_lifecycle()) { + $this->progress->clear_progress_line(); + } + $this->progress->show_lifecycle_line("db-apply complete ({$statements_executed} statements executed)\n"); + } finally { + fclose($sidecar_handle); + fclose($sql_handle); + } + } + + private function execute_db_apply_row_stream_record( + PDO $pdo, + PDO $sqlite_prepared_pdo, + $sql_handle, + array $record, + ?SqlStatementRewriter $stmt_rewriter, + array &$sqlite_prepared_statement_cache, + array &$sqlite_prepared_statement_cache_order, + string &$executed_query + ): void { + $executed_query = ''; + + if (SQLiteRowStreamSidecar::is_insert_record($record)) { + $rewrite_value = $stmt_rewriter !== null + ? function (string $value, string $table, ?string $column) use ($stmt_rewriter): string { + return $stmt_rewriter->rewrite_sqlite_row_stream_value($value, $table, $column); + } + : null; + $prepared_insert = SQLiteRowStreamSidecar::record_to_prepared_insert($record, $rewrite_value); + if ($prepared_insert !== null) { + $executed_query = $prepared_insert['sql']; + $this->execute_sqlite_prepared_insert( + $sqlite_prepared_pdo, + $prepared_insert, + $sqlite_prepared_statement_cache, + $sqlite_prepared_statement_cache_order, + ); + return; + } + } + + $query = $this->read_sql_record_slice($sql_handle, $record); + $this->execute_db_apply_query( + $pdo, + $query, + $stmt_rewriter, + $sqlite_prepared_pdo, + $sqlite_prepared_statement_cache, + $sqlite_prepared_statement_cache_order, + $executed_query, + ); + } + + private function read_sql_record_slice($sql_handle, array $record): string + { + if (!isset($record['sql_offset'], $record['sql_length'])) { + throw new RuntimeException("SQLite row stream fallback record is missing SQL byte range."); + } + + $offset = (int) $record['sql_offset']; + $length = (int) $record['sql_length']; + if ($offset < 0 || $length < 0) { + throw new RuntimeException("SQLite row stream fallback record has an invalid SQL byte range."); + } + + if (fseek($sql_handle, $offset) !== 0) { + throw new RuntimeException("Failed to seek db.sql for SQLite row stream fallback."); + } + + $sql = ''; + while (strlen($sql) < $length) { + $chunk = fread($sql_handle, $length - strlen($sql)); + if ($chunk === false || $chunk === '') { + break; + } + $sql .= $chunk; + } + + if (strlen($sql) !== $length) { + throw new RuntimeException("Failed to read complete SQL fallback statement from db.sql."); + } + + return $sql; + } + + /** + * @param array{sql: string, params: list, param_types: list} $prepared_insert + */ + private function execute_sqlite_prepared_insert( + PDO $sqlite_prepared_pdo, + array $prepared_insert, + array &$sqlite_prepared_statement_cache, + array &$sqlite_prepared_statement_cache_order + ): void { + $statement = $sqlite_prepared_statement_cache[$prepared_insert['sql']] ?? null; + if (!$statement instanceof PDOStatement) { + $statement = $sqlite_prepared_pdo->prepare($prepared_insert['sql']); + if ($statement === false) { + throw new PDOException('Failed to prepare SQLite INSERT statement.'); + } + + $sqlite_prepared_statement_cache[$prepared_insert['sql']] = $statement; + $sqlite_prepared_statement_cache_order[] = $prepared_insert['sql']; + if (count($sqlite_prepared_statement_cache_order) > self::SQLITE_PREPARED_INSERT_CACHE_MAX) { + $oldest_sql = array_shift($sqlite_prepared_statement_cache_order); + if (is_string($oldest_sql)) { + unset($sqlite_prepared_statement_cache[$oldest_sql]); + } + } + } else { + $statement->closeCursor(); + } + + foreach ($prepared_insert['params'] as $index => $value) { + $statement->bindValue( + $index + 1, + $value, + $prepared_insert['param_types'][$index] ?? PDO::PARAM_STR + ); + } + + if ($statement->execute() === false) { + throw new PDOException('Failed to execute SQLite INSERT statement.'); + } + } + private function execute_db_apply_query( PDO $pdo, string $query, @@ -5448,36 +5889,12 @@ private function execute_db_apply_query( if ($prepared_insert !== null) { $executed_query = $prepared_insert['sql']; - $statement = $sqlite_prepared_statement_cache[$prepared_insert['sql']] ?? null; - if (!$statement instanceof PDOStatement) { - $statement = $sqlite_prepared_pdo->prepare($prepared_insert['sql']); - if ($statement === false) { - throw new PDOException('Failed to prepare SQLite INSERT statement.'); - } - - $sqlite_prepared_statement_cache[$prepared_insert['sql']] = $statement; - $sqlite_prepared_statement_cache_order[] = $prepared_insert['sql']; - if (count($sqlite_prepared_statement_cache_order) > self::SQLITE_PREPARED_INSERT_CACHE_MAX) { - $oldest_sql = array_shift($sqlite_prepared_statement_cache_order); - if (is_string($oldest_sql)) { - unset($sqlite_prepared_statement_cache[$oldest_sql]); - } - } - } else { - $statement->closeCursor(); - } - - foreach ($prepared_insert['params'] as $index => $value) { - $statement->bindValue( - $index + 1, - $value, - $prepared_insert['param_types'][$index] ?? PDO::PARAM_STR - ); - } - - if ($statement->execute() === false) { - throw new PDOException('Failed to execute SQLite INSERT statement.'); - } + $this->execute_sqlite_prepared_insert( + $sqlite_prepared_pdo, + $prepared_insert, + $sqlite_prepared_statement_cache, + $sqlite_prepared_statement_cache_order, + ); return; } } @@ -7244,11 +7661,14 @@ private function download_sql(): void } } - // Domain discovery and statement counting: scan SQL for URLs during download - $query_stream = class_exists('WP_MySQL_Naive_Query_Stream') + // Domain discovery and statement counting: scan SQL for URLs during download. + // The experimental SQLite row-stream path does one fast full-file pass + // after db.sql is complete, so avoid the older lexer stream here. + $defer_sql_scan = $this->experimental_sqlite_row_stream && $mode === "file"; + $query_stream = !$defer_sql_scan && class_exists('WP_MySQL_Naive_Query_Stream') ? new \WP_MySQL_Naive_Query_Stream() : null; - $domain_collector = class_exists('DomainCollector') + $domain_collector = !$defer_sql_scan && class_exists('DomainCollector') ? new \DomainCollector() : null; $domains_file = $this->state_dir . "/.import-domains.json"; @@ -7688,6 +8108,174 @@ private function download_sql(): void } } + private function build_sqlite_row_stream_sidecar(string $sql_file): void + { + if (!file_exists($sql_file)) { + throw new RuntimeException("Cannot build SQLite row stream sidecar: db.sql not found."); + } + + $sidecar_file = $this->state_dir . "/" . self::SQLITE_ROW_STREAM_SIDECAR; + $tmp_file = $sidecar_file . ".tmp"; + $sql_bytes = filesize($sql_file); + if ($sql_bytes === false) { + throw new RuntimeException("Cannot stat SQL file: {$sql_file}"); + } + + $this->progress->show_lifecycle_line("Building SQLite row stream sidecar\n"); + $this->audit_log( + sprintf("SQLITE ROW STREAM | building sidecar from %s (%d bytes)", $sql_file, $sql_bytes), + false, + ); + + $sql_handle = fopen($sql_file, "r"); + if (!$sql_handle) { + throw new RuntimeException("Cannot open SQL file: {$sql_file}"); + } + $sidecar_handle = fopen($tmp_file, "w"); + if (!$sidecar_handle) { + fclose($sql_handle); + throw new RuntimeException("Cannot open SQLite row stream sidecar: {$tmp_file}"); + } + + $query_stream = new \WP_MySQL_FastQueryStream(); + $statements_total = 0; + $structured_inserts = 0; + $fallback_statements = 0; + $query_stream->set_error_logger(function (array $err) use (&$statements_total) { + $this->audit_log( + sprintf( + "SQLITE ROW STREAM query stream fallback | reason=%s | byte_offset=%d | stmt=%d | %s | context=%.200s", + $err['reason'] ?? '?', + $err['byte_offset'] ?? 0, + $statements_total, + $err['message'] ?? '', + $err['context'] ?? '' + ), + true + ); + }); + + $domain_collector = new \DomainCollector(); + $parsed_url = parse_url($this->remote_url); + if ($parsed_url && isset($parsed_url['scheme'], $parsed_url['host'])) { + $source_origin = $parsed_url['scheme'] . '://' . $parsed_url['host']; + if (!empty($parsed_url['port'])) { + $source_origin .= ':' . $parsed_url['port']; + } + $domain_collector->merge([$source_origin]); + } + + $write_record = function (array $record) use ($sidecar_handle, &$structured_inserts, &$fallback_statements): void { + $is_insert = SQLiteRowStreamSidecar::is_insert_record($record); + $line = SQLiteRowStreamSidecar::encode_record($record); + if ($line === null && $is_insert) { + $record = SQLiteRowStreamSidecar::sql_range_record( + (int) ($record['sql_offset'] ?? 0), + (int) ($record['sql_length'] ?? 0) + ); + $is_insert = false; + $line = SQLiteRowStreamSidecar::encode_record($record); + } + if ($line === null) { + throw new RuntimeException("Failed to encode SQLite row stream sidecar record."); + } + $bytes = fwrite($sidecar_handle, $line); + if ($bytes === false || $bytes !== strlen($line)) { + throw new RuntimeException("Failed to write SQLite row stream sidecar record."); + } + if ($is_insert) { + $structured_inserts++; + } else { + $fallback_statements++; + } + }; + + $meta_line = SQLiteRowStreamSidecar::encode_record( + SQLiteRowStreamSidecar::meta_record((int) $sql_bytes) + ); + if ($meta_line === null || fwrite($sidecar_handle, $meta_line) !== strlen($meta_line)) { + fclose($sql_handle); + fclose($sidecar_handle); + @unlink($tmp_file); + throw new RuntimeException("Failed to write SQLite row stream sidecar metadata."); + } + + try { + $chunk_size = 64 * 1024; + while (!feof($sql_handle)) { + $data = fread($sql_handle, $chunk_size); + if ($data === false || $data === '') { + break; + } + $query_stream->append_sql($data); + + while ($query_stream->next_query()) { + $query = $query_stream->get_query(); + $statement_end = $query_stream->get_bytes_consumed(); + $statement_offset = $statement_end - strlen($query); + $record = SQLiteRowStreamSidecar::record_from_sql($query, $statement_offset); + $write_record($record); + $statements_total++; + $this->scan_query_for_domains($query, $domain_collector); + } + } + + $query_stream->mark_input_complete(); + while ($query_stream->next_query()) { + $query = $query_stream->get_query(); + $statement_end = $query_stream->get_bytes_consumed(); + $statement_offset = $statement_end - strlen($query); + $record = SQLiteRowStreamSidecar::record_from_sql($query, $statement_offset); + $write_record($record); + $statements_total++; + $this->scan_query_for_domains($query, $domain_collector); + } + } finally { + fclose($sql_handle); + fclose($sidecar_handle); + } + + if (!rename($tmp_file, $sidecar_file)) { + @unlink($tmp_file); + throw new RuntimeException("Failed to finalize SQLite row stream sidecar: {$sidecar_file}"); + } + + $domains = $domain_collector->get_domains(); + if (!empty($domains)) { + file_put_contents( + $this->state_dir . "/.import-domains.json", + json_encode($domains, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) . "\n", + ); + } + + if ($statements_total > 0) { + file_put_contents( + $this->state_dir . "/.import-sql-stats.json", + json_encode(["statements_total" => $statements_total]) . "\n", + ); + } + + $this->state["sqlite_row_stream"] = [ + "file" => self::SQLITE_ROW_STREAM_SIDECAR, + "sql_bytes" => (int) $sql_bytes, + "statements_total" => $statements_total, + "structured_inserts" => $structured_inserts, + "fallback_statements" => $fallback_statements, + "updated_at" => date("c"), + ]; + $this->save_state($this->state); + + $this->audit_log( + sprintf( + "SQLITE ROW STREAM | complete | statements=%d structured_inserts=%d fallback=%d", + $statements_total, + $structured_inserts, + $fallback_statements, + ), + false, + ); + } + /** * Drain complete SQL statements from a query stream and scan their * base64-decoded values for URL domains. @@ -7702,57 +8290,64 @@ private function drain_query_stream_for_domains( if ($statements_counted !== null) { $statements_counted++; } - // Only scan INSERT statements (they contain data values). - if (!self::sql_starts_with_token($query, \WP_MySQL_Lexer::INSERT_SYMBOL)) { - continue; + $this->scan_query_for_domains($query, $domain_collector); + } + } + + private function scan_query_for_domains(string $query, \DomainCollector $domain_collector): void + { + // Only scan INSERT statements (they contain data values). + if (!self::sql_starts_with_token($query, \WP_MySQL_Lexer::INSERT_SYMBOL)) { + return; + } + // Only scan statements with base64 values. + if (strpos($query, "FROM_BASE64(") === false) { + return; + } + + $table = self::extract_insert_table($query); + $is_options_table = substr($table, -8) === '_options'; + + $scanner = new \Base64ValueScanner($query); + while ($scanner->next_value()) { + // For _options tables, extract the option_name (second column) + // and skip transients — they contain ephemeral cached data + // that would pollute the domain list. + $option_name = null; + $match_offset = $scanner->get_match_offset(); + if ($is_options_table) { + $option_name = self::extract_option_name($query, $match_offset); + if ($option_name !== null && ( + strpos($option_name, '_transient') === 0 || + strpos($option_name, '_site_transient') === 0 + )) { + continue; + } } - // Only scan statements with base64 values - if (strpos($query, "FROM_BASE64(") === false) { + + $new_domains = $domain_collector->scan($scanner->get_value()); + if (empty($new_domains)) { continue; } - $table = self::extract_insert_table($query); - $is_options_table = substr($table, -8) === '_options'; - - $scanner = new \Base64ValueScanner($query); - while ($scanner->next_value()) { - // For _options tables, extract the option_name (second column) - // and skip transients — they contain ephemeral cached data - // that would pollute the domain list. - $option_name = null; - $match_offset = $scanner->get_match_offset(); - if ($is_options_table) { - $option_name = self::extract_option_name($query, $match_offset); - if ($option_name !== null && ( - strpos($option_name, '_transient') === 0 || - strpos($option_name, '_site_transient') === 0 - )) { - continue; - } - } - - $new_domains = $domain_collector->scan($scanner->get_value()); - if (!empty($new_domains)) { - $row_id = self::extract_row_identifier($query, $match_offset); + $row_id = self::extract_row_identifier($query, $match_offset); - $option_ctx = ''; - if ($option_name !== null) { - $option_ctx = ' option=' . $option_name; - } + $option_ctx = ''; + if ($option_name !== null) { + $option_ctx = ' option=' . $option_name; + } - foreach ($new_domains as $domain) { - $this->audit_log( - sprintf( - "NEW DOMAIN | %s | table=%s %s%s", - $domain, - $table, - $row_id, - $option_ctx, - ), - false, - ); - } - } + foreach ($new_domains as $domain) { + $this->audit_log( + sprintf( + "NEW DOMAIN | %s | table=%s %s%s", + $domain, + $table, + $row_id, + $option_ctx, + ), + false, + ); } } } @@ -10197,6 +10792,8 @@ private function reset_state(): void $nonempty = $this->state["fs_root_nonempty_behavior"] ?? "error"; $max_packet = $this->state["max_allowed_packet"] ?? null; $pull = $this->state["pull"] ?? null; + $experimental_row_stream = $this->state["experimental_sqlite_row_stream"] ?? false; + $sqlite_row_stream = $this->state["sqlite_row_stream"] ?? null; $this->state = $this->default_state(); $this->state["preflight"] = $preflight; $this->state["version"] = $version; @@ -10204,6 +10801,10 @@ private function reset_state(): void $this->state["follow_symlinks"] = $follow; $this->state["fs_root_nonempty_behavior"] = $nonempty; $this->state["max_allowed_packet"] = $max_packet; + $this->state["experimental_sqlite_row_stream"] = $experimental_row_stream; + if ($sqlite_row_stream !== null) { + $this->state["sqlite_row_stream"] = $sqlite_row_stream; + } if ($pull !== null) { $this->state["pull"] = $pull; } @@ -10225,6 +10826,15 @@ public function default_state(): array "fs_root_nonempty_behavior" => "error", "filter" => "none", "max_allowed_packet" => null, + "experimental_sqlite_row_stream" => false, + "sqlite_row_stream" => [ + "file" => null, + "sql_bytes" => 0, + "statements_total" => 0, + "structured_inserts" => 0, + "fallback_statements" => 0, + "updated_at" => null, + ], "db_index" => [ "file" => null, "tables" => 0, @@ -10261,6 +10871,7 @@ public function default_state(): array "apply" => [ "statements_executed" => 0, "bytes_read" => 0, + "row_stream_bytes_read" => 0, "rewrite_url" => null, // Target database configuration — persisted by db-apply // so that apply-runtime can generate DB_* constants. @@ -10353,6 +10964,18 @@ private function normalize_state(array $state): array } $apply = array_intersect_key($apply, $defaults["apply"]); $state["apply"] = array_merge($defaults["apply"], $apply); + $sqlite_row_stream = $state["sqlite_row_stream"] ?? []; + if (!is_array($sqlite_row_stream)) { + $sqlite_row_stream = []; + } + $sqlite_row_stream = array_intersect_key( + $sqlite_row_stream, + $defaults["sqlite_row_stream"], + ); + $state["sqlite_row_stream"] = array_merge( + $defaults["sqlite_row_stream"], + $sqlite_row_stream, + ); $pull = $state["pull"] ?? []; if (!is_array($pull)) { $pull = []; @@ -11167,6 +11790,22 @@ function get_importer_version(): string { 'help' => 'MySQL database (required for --sql-output=mysql)', 'commands' => ['db-pull'], ], + [ + 'name' => 'experimental-sqlite-row-stream', + 'type' => 'flag', + 'target' => 'experimental_sqlite_row_stream', + 'flag_value' => true, + 'help' => 'Experimental: write/use a SQLite row-stream sidecar for faster SQLite db-apply', + 'commands' => ['pull', 'db-pull', 'db-apply'], + ], + [ + 'name' => 'no-experimental-sqlite-row-stream', + 'type' => 'flag', + 'target' => 'experimental_sqlite_row_stream', + 'flag_value' => false, + 'help' => null, + 'commands' => [], + ], // ── db-apply options ───────────────────────────────────── [ diff --git a/packages/reprint-importer/src/lib/url-rewrite/class-sql-statement-rewriter.php b/packages/reprint-importer/src/lib/url-rewrite/class-sql-statement-rewriter.php index 12873979..d6ea233c 100644 --- a/packages/reprint-importer/src/lib/url-rewrite/class-sql-statement-rewriter.php +++ b/packages/reprint-importer/src/lib/url-rewrite/class-sql-statement-rewriter.php @@ -176,6 +176,11 @@ function (string $value, string $table, ?string $column): string { ); } + public function rewrite_sqlite_row_stream_value(string $value, string $table, ?string $column): string + { + return $this->rewrite_value_for_column($value, $table, $column); + } + private function rewrite_value_for_column(string $value, string $table, ?string $column): string { if (strpos($value, 'http') === false) { diff --git a/packages/reprint-importer/src/lib/url-rewrite/class-sqlite-row-stream-sidecar.php b/packages/reprint-importer/src/lib/url-rewrite/class-sqlite-row-stream-sidecar.php new file mode 100644 index 00000000..b917fbfe --- /dev/null +++ b/packages/reprint-importer/src/lib/url-rewrite/class-sqlite-row-stream-sidecar.php @@ -0,0 +1,341 @@ + */ + private static array $template_sql_cache = []; + + /** @var string[] */ + private static array $template_sql_cache_order = []; + + /** + * @return array{v: int, kind: string, sql_bytes: int, format: string} + */ + public static function meta_record(int $sql_bytes): array + { + return [ + 'v' => self::VERSION, + 'kind' => 'meta', + 'format' => 'sqlite-row-stream-sidecar', + 'sql_bytes' => $sql_bytes, + ]; + } + + /** + * @return array{v: int, kind: string, sql_offset: int, sql_length: int} + */ + public static function sql_record(int $sql_offset, string $sql): array + { + return self::sql_range_record($sql_offset, strlen($sql)); + } + + /** + * @return array{v: int, kind: string, sql_offset: int, sql_length: int} + */ + public static function sql_range_record(int $sql_offset, int $sql_length): array + { + return [ + 'v' => self::VERSION, + 'kind' => 'sql', + 'sql_offset' => $sql_offset, + 'sql_length' => $sql_length, + ]; + } + + /** + * @return array + */ + public static function record_from_sql(string $sql, int $sql_offset): array + { + $fallback = self::sql_record($sql_offset, $sql); + + $fast = FastInsertScanner::scan($sql, false); + if ($fast === null || empty($fast['value_entries'])) { + return $fallback; + } + + $column_count = count($fast['columns']); + $value_count = count($fast['value_entries']); + if ($column_count === 0 || $value_count === 0 || $value_count % $column_count !== 0) { + return $fallback; + } + + $rows = []; + $row = []; + foreach ($fast['value_entries'] as $entry) { + $value = self::value_entry_to_record($entry); + if ($value === null) { + return $fallback; + } + + $row[] = $value; + if (count($row) === $column_count) { + $rows[] = $row; + $row = []; + } + } + + return [ + 'v' => self::VERSION, + 'kind' => 'insert', + 'sql_offset' => $sql_offset, + 'sql_length' => strlen($sql), + 'table' => $fast['table'], + 'columns' => $fast['columns'], + 'rows' => $rows, + ]; + } + + /** + * @param array $record + */ + public static function encode_record(array $record): ?string + { + $json = json_encode($record, JSON_UNESCAPED_SLASHES); + if ($json === false) { + return null; + } + return $json . "\n"; + } + + /** + * @param array $record + */ + public static function is_insert_record(array $record): bool + { + return + ($record['v'] ?? null) === self::VERSION && + ($record['kind'] ?? null) === 'insert'; + } + + /** + * @param array $record + * @param callable|null $rewrite_value Optional callback: + * fn(string $value, string $table, ?string $column): string + * @return array{sql: string, params: list, param_types: list}|null + */ + public static function record_to_prepared_insert(array $record, ?callable $rewrite_value = null): ?array + { + if (!self::is_insert_record($record)) { + return null; + } + + $table = $record['table'] ?? null; + $columns = $record['columns'] ?? null; + $rows = $record['rows'] ?? null; + if (!is_string($table) || !is_array($columns) || !is_array($rows) || empty($columns) || empty($rows)) { + return null; + } + + foreach ($columns as $column) { + if (!is_string($column)) { + return null; + } + } + + $column_count = count($columns); + $shape_key_parts = [ + strlen($table) . ':' . $table, + (string) $column_count, + ]; + foreach ($columns as $column) { + $shape_key_parts[] = strlen($column) . ':' . $column; + } + + $row_placeholders = []; + foreach ($rows as $row) { + if (!is_array($row) || count($row) !== $column_count) { + return null; + } + + $placeholders = []; + foreach ($row as $value_record) { + if (!is_array($value_record)) { + return null; + } + $placeholder = self::placeholder_for_value_record($value_record); + if ($placeholder === null) { + return null; + } + $shape_key_parts[] = $placeholder; + $placeholders[] = $placeholder; + } + $row_placeholders[] = $placeholders; + } + + $shape_key = implode('|', $shape_key_parts); + $template_sql = self::$template_sql_cache[$shape_key] ?? null; + if ($template_sql === null) { + $quoted_columns = []; + foreach ($columns as $column) { + $quoted_columns[] = self::quote_identifier($column); + } + + $sql_rows = []; + foreach ($row_placeholders as $placeholders) { + $sql_rows[] = '(' . implode(', ', $placeholders) . ')'; + } + + $template_sql = 'INSERT INTO ' + . self::quote_identifier($table) + . ' (' . implode(', ', $quoted_columns) . ') VALUES' + . implode(',', $sql_rows) + . ';'; + + self::$template_sql_cache[$shape_key] = $template_sql; + self::$template_sql_cache_order[] = $shape_key; + if (count(self::$template_sql_cache_order) > self::TEMPLATE_CACHE_MAX) { + $oldest_key = array_shift(self::$template_sql_cache_order); + if (is_string($oldest_key)) { + unset(self::$template_sql_cache[$oldest_key]); + } + } + } + + $params = []; + $param_types = []; + foreach ($rows as $row) { + foreach (array_values($row) as $index => $value_record) { + $param = self::param_for_value_record( + $value_record, + $table, + $columns[$index] ?? null, + $rewrite_value + ); + if ($param === null) { + return null; + } + $params[] = $param['value']; + $param_types[] = $param['type']; + } + } + + return [ + 'sql' => $template_sql, + 'params' => $params, + 'param_types' => $param_types, + ]; + } + + /** + * @param array $entry + * @return array{t: string, v?: string}|null + */ + private static function value_entry_to_record(array $entry): ?array + { + switch ($entry['kind'] ?? null) { + case 'null': + return ['t' => 'null']; + + case 'empty_string': + return ['t' => 'empty_string']; + + case 'numeric': + if (!isset($entry['raw']) || !is_string($entry['raw'])) { + return null; + } + return ['t' => 'numeric', 'v' => $entry['raw']]; + + case 'base64': + if (!isset($entry['encoded_value']) || !is_string($entry['encoded_value'])) { + return null; + } + if (base64_decode($entry['encoded_value'], true) === false) { + return null; + } + return ['t' => 'base64', 'v' => $entry['encoded_value']]; + } + + return null; + } + + /** + * @param array $value_record + */ + private static function placeholder_for_value_record(array $value_record): ?string + { + switch ($value_record['t'] ?? null) { + case 'null': + case 'empty_string': + case 'base64': + return '?'; + + case 'numeric': + $raw = $value_record['v'] ?? null; + if (!is_string($raw)) { + return null; + } + return strpbrk($raw, '.eE') === false + ? 'CAST(? AS NUMERIC)' + : 'CAST(? AS REAL)'; + } + + return null; + } + + /** + * @param array $value_record + * @return array{value: mixed, type: int}|null + */ + private static function param_for_value_record( + array $value_record, + string $table, + ?string $column, + ?callable $rewrite_value + ): ?array { + switch ($value_record['t'] ?? null) { + case 'null': + return ['value' => null, 'type' => PDO::PARAM_NULL]; + + case 'empty_string': + return ['value' => '', 'type' => PDO::PARAM_STR]; + + case 'numeric': + $raw = $value_record['v'] ?? null; + if (!is_string($raw)) { + return null; + } + return ['value' => $raw, 'type' => PDO::PARAM_STR]; + + case 'base64': + $encoded = $value_record['v'] ?? null; + if (!is_string($encoded)) { + return null; + } + $value = base64_decode($encoded, true); + if ($value === false) { + return null; + } + if ($rewrite_value !== null) { + $value = $rewrite_value($value, $table, $column); + } + return ['value' => $value, 'type' => PDO::PARAM_STR]; + } + + return null; + } + + private static function quote_identifier(string $identifier): string + { + return '`' . str_replace('`', '``', $identifier) . '`'; + } +} diff --git a/packages/reprint-importer/src/lib/url-rewrite/load.php b/packages/reprint-importer/src/lib/url-rewrite/load.php index 5e27304d..02e543d5 100644 --- a/packages/reprint-importer/src/lib/url-rewrite/load.php +++ b/packages/reprint-importer/src/lib/url-rewrite/load.php @@ -15,6 +15,7 @@ require_once __DIR__ . '/class-base64-value-scanner.php'; require_once __DIR__ . '/class-fast-insert-scanner.php'; require_once __DIR__ . '/class-sqlite-prepared-insert-builder.php'; +require_once __DIR__ . '/class-sqlite-row-stream-sidecar.php'; // Depend on the iterators above require_once __DIR__ . '/class-structured-data-url-rewriter.php'; diff --git a/tests/Import/NewSiteUrlSqliteTest.php b/tests/Import/NewSiteUrlSqliteTest.php index 3c780654..e4c3f80c 100644 --- a/tests/Import/NewSiteUrlSqliteTest.php +++ b/tests/Import/NewSiteUrlSqliteTest.php @@ -114,6 +114,13 @@ private function writeState(array $extra = []): void ); } + private function buildRowStreamSidecar(\ImportClient $client, string $sqlFile): void + { + $method = new \ReflectionMethod($client, 'build_sqlite_row_stream_sidecar'); + $method->setAccessible(true); + $method->invoke($client, $sqlFile); + } + /** * Read a value from the SQLite database using the MySQL-on-SQLite driver. */ @@ -370,6 +377,107 @@ public function testSqliteDbApplyPreservesArbitraryBase64Bytes(): void $this->assertSame(strtoupper(bin2hex($bytes)), $rows[0]['hex_value']); } + public function testExperimentalRowStreamAppliesStructuredAndFallbackRecordsToSqlite(): void + { + $oldUrl = 'https://old-site.example.com'; + $newUrl = 'https://new-site.example.com'; + $sqlitePath = $this->tempDir . '/database/wordpress.sqlite'; + + $stmts = []; + $stmts[] = "DROP TABLE IF EXISTS `wp_options`;"; + $stmts[] = "CREATE TABLE `wp_options` (" + . "`option_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, " + . "`option_name` varchar(191) NOT NULL DEFAULT '', " + . "`option_value` longtext NOT NULL, " + . "`autoload` varchar(20) NOT NULL DEFAULT 'yes', " + . "PRIMARY KEY (`option_id`), " + . "UNIQUE KEY `option_name` (`option_name`)" + . ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"; + $stmts[] = sprintf( + "INSERT INTO `wp_options` (`option_id`, `option_name`, `option_value`, `autoload`) VALUES " + . "(1, FROM_BASE64('%s'), FROM_BASE64('%s'), FROM_BASE64('%s')), " + . "(2, FROM_BASE64('%s'), FROM_BASE64('%s'), FROM_BASE64('%s'));", + base64_encode('siteurl'), + base64_encode($oldUrl), + base64_encode('yes'), + base64_encode('home'), + base64_encode($oldUrl), + base64_encode('yes'), + ); + $stmts[] = "DROP TABLE IF EXISTS `wp_posts`;"; + $stmts[] = "CREATE TABLE `wp_posts` (" + . "`ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, " + . "`post_content` longtext NOT NULL, " + . "`menu_order` int(11) NOT NULL DEFAULT 0, " + . "PRIMARY KEY (`ID`)" + . ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"; + $stmts[] = sprintf( + "INSERT INTO `wp_posts` (`ID`, `post_content`, `menu_order`) VALUES " + . "(1, FROM_BASE64('%s'), -3.5e+2);", + base64_encode('Structured'), + ); + $stmts[] = sprintf( + "INSERT INTO `wp_posts` VALUES (2, FROM_BASE64('%s'), 4);", + base64_encode('Fallback'), + ); + $sql = implode("\n", $stmts); + $sqlFile = $this->tempDir . '/db.sql'; + file_put_contents($sqlFile, $sql); + + $state = [ + 'preflight' => [ + 'data' => [ + 'database' => ['wp' => ['table_prefix' => 'wp_']], + ], + ], + ]; + $this->writeState($state); + + $client = new \ImportClient( + $oldUrl . '/?reprint-api', + $this->tempDir, + $this->tempDir . '/fs-root', + ); + $client->state = array_merge($client->default_state(), $state); + $this->buildRowStreamSidecar($client, $sqlFile); + + $sidecarPath = $this->tempDir . '/.import-sqlite-row-stream.jsonl'; + $this->assertFileExists($sidecarPath); + $sidecarState = json_decode(file_get_contents($this->tempDir . '/.import-state.json'), true); + $this->assertGreaterThanOrEqual(2, $sidecarState['sqlite_row_stream']['structured_inserts']); + $this->assertGreaterThanOrEqual(1, $sidecarState['sqlite_row_stream']['fallback_statements']); + + $client->run([ + 'command' => 'db-apply', + 'abort' => false, + 'verbose' => false, + 'secret' => null, + 'tuning_config' => [], + 'target_engine' => 'sqlite', + 'target_sqlite_path' => $sqlitePath, + 'target_db' => 'wp_test', + 'new_site_url' => $newUrl, + 'experimental_sqlite_row_stream' => true, + ]); + + $posts = $this->querySqlite( + $sqlitePath, + 'SELECT ID, post_content, menu_order FROM wp_posts ORDER BY ID', + 'wp_test', + ); + + $this->assertCount(2, $posts); + $this->assertSame('Structured', $posts[0]['post_content']); + $this->assertEquals(-350, $posts[0]['menu_order']); + $this->assertSame('Fallback', $posts[1]['post_content']); + + $state = json_decode(file_get_contents($this->tempDir . '/.import-state.json'), true); + $this->assertSame('complete', $state['status']); + $this->assertSame(count($stmts), $state['apply']['statements_executed']); + $this->assertSame(strlen($sql), $state['apply']['bytes_read']); + $this->assertSame(filesize($sidecarPath), $state['apply']['row_stream_bytes_read']); + } + public function testSqliteImportPragmasDoNotChangeProgressCounters(): void { $sqlitePath = $this->tempDir . '/database/wordpress.sqlite'; diff --git a/tests/UrlRewriting/SQLiteRowStreamSidecarTest.php b/tests/UrlRewriting/SQLiteRowStreamSidecarTest.php new file mode 100644 index 00000000..15979cdb --- /dev/null +++ b/tests/UrlRewriting/SQLiteRowStreamSidecarTest.php @@ -0,0 +1,129 @@ +, param_types: list} $prepared + */ + private function executePrepared(PDO $pdo, array $prepared): void + { + $statement = $pdo->prepare($prepared['sql']); + $this->assertInstanceOf(PDOStatement::class, $statement); + foreach ($prepared['params'] as $index => $value) { + $statement->bindValue($index + 1, $value, $prepared['param_types'][$index]); + } + $this->assertTrue($statement->execute()); + } + + private function createSqlite(): PDO + { + if (!extension_loaded('pdo_sqlite')) { + $this->markTestSkipped('pdo_sqlite is required for row-stream sidecar tests.'); + } + + $pdo = new PDO('sqlite::memory:'); + $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); + $pdo->sqliteCreateFunction('FROM_BASE64', function ($data) { + if ($data === null) { + return null; + } + return base64_decode($data); + }, 1); + return $pdo; + } + + public function testRecordPreparedInsertMatchesExistingBuilder(): void + { + $sql = sprintf( + "INSERT INTO `wp_posts` (`ID`, `post_content`, `post_excerpt`, `menu_order`) VALUES" . + "(1, FROM_BASE64('%s'), '', NULL)," . + "(2, CONVERT(FROM_BASE64('%s') USING utf8mb4), FROM_BASE64('%s'), -3.5e+2);", + base64_encode('alpha'), + base64_encode('{"url":"https://example.test/a"}'), + base64_encode('excerpt-a') + ); + + $record = SQLiteRowStreamSidecar::record_from_sql($sql, 123); + $prepared = SQLiteRowStreamSidecar::record_to_prepared_insert($record); + $existing = SQLitePreparedInsertBuilder::build($sql); + + $this->assertTrue(SQLiteRowStreamSidecar::is_insert_record($record)); + $this->assertSame(123, $record['sql_offset']); + $this->assertSame(strlen($sql), $record['sql_length']); + $this->assertSame($existing, $prepared); + } + + public function testRecordPreparedInsertUsesStructuredUrlRewriter(): void + { + $rewriter = new SqlStatementRewriter( + new StructuredDataUrlRewriter([ + 'https://old-site.com' => 'https://new-site.com', + ]) + ); + $sql = sprintf( + "INSERT INTO `wp_posts` (`ID`, `post_content`) VALUES(7, FROM_BASE64('%s'));", + base64_encode('Link') + ); + + $record = SQLiteRowStreamSidecar::record_from_sql($sql, 0); + $prepared = SQLiteRowStreamSidecar::record_to_prepared_insert( + $record, + function (string $value, string $table, ?string $column) use ($rewriter): string { + return $rewriter->rewrite_sqlite_row_stream_value($value, $table, $column); + } + ); + + $this->assertNotNull($prepared); + $this->assertSame('7', $prepared['params'][0]); + $this->assertSame('Link', $prepared['params'][1]); + } + + public function testStructuredAndFallbackRecordsProduceSameSqliteRows(): void + { + $statements = [ + "CREATE TABLE `wp_posts` (`ID` INTEGER, `post_content` TEXT, `menu_order`);", + sprintf( + "INSERT INTO `wp_posts` (`ID`, `post_content`, `menu_order`) VALUES" . + "(1, FROM_BASE64('%s'), -3.5e+2)," . + "(2, FROM_BASE64('%s'), NULL);", + base64_encode('structured alpha'), + base64_encode('structured bravo') + ), + sprintf( + "INSERT INTO `wp_posts` VALUES(3, FROM_BASE64('%s'), 4.25);", + base64_encode('fallback charlie') + ), + ]; + + $fallback = $this->createSqlite(); + foreach ($statements as $statement) { + $fallback->exec($statement); + } + + $rowStream = $this->createSqlite(); + foreach ($statements as $statement) { + $record = SQLiteRowStreamSidecar::record_from_sql($statement, 0); + if (SQLiteRowStreamSidecar::is_insert_record($record)) { + $prepared = SQLiteRowStreamSidecar::record_to_prepared_insert($record); + $this->assertNotNull($prepared); + $this->executePrepared($rowStream, $prepared); + } else { + $rowStream->exec($statement); + } + } + + $fallbackRows = $fallback + ->query('SELECT ID, post_content, menu_order, typeof(menu_order) AS menu_order_type FROM `wp_posts` ORDER BY ID') + ->fetchAll(PDO::FETCH_ASSOC); + $rowStreamRows = $rowStream + ->query('SELECT ID, post_content, menu_order, typeof(menu_order) AS menu_order_type FROM `wp_posts` ORDER BY ID') + ->fetchAll(PDO::FETCH_ASSOC); + + $this->assertSame($fallbackRows, $rowStreamRows); + } +} diff --git a/tests/e2e/benchmark/bench-pull.mjs b/tests/e2e/benchmark/bench-pull.mjs index 21761d09..183afe3e 100644 --- a/tests/e2e/benchmark/bench-pull.mjs +++ b/tests/e2e/benchmark/bench-pull.mjs @@ -49,6 +49,15 @@ const PLAYGROUND_PHP_BINARY = process.env.BENCH_PLAYGROUND_PHP_BINARY || join(PR const PLAYGROUND_PHP_VERSION = process.env.PLAYGROUND_PHP_VERSION || '8.3'; const REGISTRY = JSON.parse(readFileSync(join(import.meta.dirname, '..', 'site-registry.json'), 'utf-8')); const MYSQL_PARSER_MANIFEST = process.env.WP_MYSQL_PARSER_EXTENSION_MANIFEST || ''; +const DB_PULL_EXTRA_ARGS = envArgs('BENCH_DB_PULL_EXTRA_ARGS'); +const DB_APPLY_EXTRA_ARGS = envArgs('BENCH_DB_APPLY_EXTRA_ARGS'); + +function envArgs(name) { + return (process.env[name] || '') + .split(/\s+/) + .map((arg) => arg.trim()) + .filter(Boolean); +} async function seedSourceDb() { const dbName = `e2e_${SITE.replace(/-/g, '_')}`; @@ -276,7 +285,7 @@ function runPlaygroundSqliteDbPullBenchmark() { return proofFailureResult('playground-sqlite-db-pull', start, proof, baseDetails); } - const result = runStage('db-pull', stateDir, [], { + const result = runStage('db-pull', stateDir, DB_PULL_EXTRA_ARGS, { phpBinary: PLAYGROUND_PHP_BINARY, env: playgroundPhpEnv(), }); @@ -302,7 +311,7 @@ function runPlaygroundSqliteDbApplyBenchmark() { 'playground sqlite benchmark preflight', ); requireBenchStageOk( - runStage('db-pull', stateDir), + runStage('db-pull', stateDir, DB_PULL_EXTRA_ARGS), 'playground sqlite benchmark db-pull', ); const proof = runNativeMysqlParserProof({ requireParser: true }); @@ -327,6 +336,7 @@ function runPlaygroundSqliteDbApplyBenchmark() { `--target-sqlite-path=${sqlitePath}`, '--target-db=playground_sqlite_bench', '--new-site-url=http://localhost:9999', + ...DB_APPLY_EXTRA_ARGS, ], { phpBinary: PLAYGROUND_PHP_BINARY, env: playgroundPhpEnv(), @@ -629,8 +639,8 @@ async function main() { const stages = [ { name: 'preflight', extra: [] }, { name: 'files-pull', extra: [] }, - { name: 'db-pull', extra: [] }, - { name: 'db-apply', extra: dbApplyArgs }, + { name: 'db-pull', extra: DB_PULL_EXTRA_ARGS }, + { name: 'db-apply', extra: [...dbApplyArgs, ...DB_APPLY_EXTRA_ARGS] }, // apply-runtime is local-only; it doesn't take a remote URL. { name: 'apply-runtime', extra: runtimeArgs, includeUrl: false }, ]; From a1f9b23a522c78763e1b7b45ac5cccfbfbd1efc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Fri, 22 May 2026 11:16:56 +0200 Subject: [PATCH 2/2] Fix SQLite row stream partial resume --- packages/reprint-importer/src/import.php | 6 +- tests/Import/NewSiteUrlSqliteTest.php | 111 +++++++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/packages/reprint-importer/src/import.php b/packages/reprint-importer/src/import.php index 2e662818..056e815f 100755 --- a/packages/reprint-importer/src/import.php +++ b/packages/reprint-importer/src/import.php @@ -5085,7 +5085,11 @@ public function run_db_apply(array $options): void $apply_state = $this->state["apply"] ?? $this->default_state()["apply"]; $statements_executed = (int) ($apply_state["statements_executed"] ?? 0); $bytes_read = (int) ($apply_state["bytes_read"] ?? 0); - $is_resume = $current_status === "in_progress" && $statements_executed > 0; + $is_resume = in_array($current_status, ["in_progress", "partial"], true) && ( + $statements_executed > 0 || + $bytes_read > 0 || + (int) ($apply_state["row_stream_bytes_read"] ?? 0) > 0 + ); if ($is_resume) { $this->audit_log( diff --git a/tests/Import/NewSiteUrlSqliteTest.php b/tests/Import/NewSiteUrlSqliteTest.php index e4c3f80c..c41ac2a4 100644 --- a/tests/Import/NewSiteUrlSqliteTest.php +++ b/tests/Import/NewSiteUrlSqliteTest.php @@ -121,6 +121,20 @@ private function buildRowStreamSidecar(\ImportClient $client, string $sqlFile): $method->invoke($client, $sqlFile); } + private function sidecarOffsetAfterRecords(string $sidecarPath, int $records): int + { + $handle = fopen($sidecarPath, 'r'); + $this->assertIsResource($handle); + for ($i = 0; $i <= $records; $i++) { + $line = fgets($handle); + $this->assertNotFalse($line); + } + $offset = ftell($handle); + fclose($handle); + $this->assertIsInt($offset); + return $offset; + } + /** * Read a value from the SQLite database using the MySQL-on-SQLite driver. */ @@ -478,6 +492,103 @@ public function testExperimentalRowStreamAppliesStructuredAndFallbackRecordsToSq $this->assertSame(filesize($sidecarPath), $state['apply']['row_stream_bytes_read']); } + public function testExperimentalRowStreamResumesPartialState(): void + { + $sqlitePath = $this->tempDir . '/database/wordpress.sqlite'; + $create = "CREATE TABLE `wp_resume` (" . + "`id` bigint(20) unsigned NOT NULL, " . + "`name` longtext NOT NULL, " . + "PRIMARY KEY (`id`)" . + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"; + $firstInsert = sprintf( + "INSERT INTO `wp_resume` (`id`, `name`) VALUES (1, FROM_BASE64('%s'));", + base64_encode('one'), + ); + $secondInsert = sprintf( + "INSERT INTO `wp_resume` (`id`, `name`) VALUES (2, FROM_BASE64('%s'));", + base64_encode('two'), + ); + $prefixSql = implode("\n", [$create, $firstInsert]); + $fullSql = implode("\n", [$create, $firstInsert, $secondInsert]); + $sqlFile = $this->tempDir . '/db.sql'; + + file_put_contents($sqlFile, $prefixSql); + $this->writeState(); + $client = new \ImportClient( + 'https://old-site.example.com/?reprint-api', + $this->tempDir, + $this->tempDir . '/fs-root', + ); + $client->run([ + 'command' => 'db-apply', + 'abort' => false, + 'verbose' => false, + 'secret' => null, + 'tuning_config' => [], + 'target_engine' => 'sqlite', + 'target_sqlite_path' => $sqlitePath, + 'target_db' => 'wp_test', + ]); + + file_put_contents($sqlFile, $fullSql); + $client = new \ImportClient( + 'https://old-site.example.com/?reprint-api', + $this->tempDir, + $this->tempDir . '/fs-root', + ); + $this->buildRowStreamSidecar($client, $sqlFile); + $sidecarPath = $this->tempDir . '/.import-sqlite-row-stream.jsonl'; + $this->writeState([ + 'command' => 'db-apply', + 'status' => 'partial', + 'apply' => [ + 'statements_executed' => 2, + 'bytes_read' => strlen($prefixSql), + 'row_stream_bytes_read' => $this->sidecarOffsetAfterRecords($sidecarPath, 2), + 'target_engine' => 'sqlite', + 'target_db' => 'wp_test', + 'target_sqlite_path' => $sqlitePath, + ], + ]); + + $client = new \ImportClient( + 'https://old-site.example.com/?reprint-api', + $this->tempDir, + $this->tempDir . '/fs-root', + ); + $client->run([ + 'command' => 'db-apply', + 'abort' => false, + 'verbose' => false, + 'secret' => null, + 'tuning_config' => [], + 'target_engine' => 'sqlite', + 'target_sqlite_path' => $sqlitePath, + 'target_db' => 'wp_test', + 'experimental_sqlite_row_stream' => true, + ]); + + $rows = $this->querySqlite( + $sqlitePath, + 'SELECT id, name FROM wp_resume ORDER BY id', + 'wp_test', + ); + $rows = array_map( + fn(array $row): array => ['id' => $row['id'], 'name' => $row['name']], + $rows, + ); + $this->assertSame([ + ['id' => 1, 'name' => 'one'], + ['id' => 2, 'name' => 'two'], + ], $rows); + + $state = json_decode(file_get_contents($this->tempDir . '/.import-state.json'), true); + $this->assertSame('complete', $state['status']); + $this->assertSame(3, $state['apply']['statements_executed']); + $this->assertSame(strlen($fullSql), $state['apply']['bytes_read']); + $this->assertSame(filesize($sidecarPath), $state['apply']['row_stream_bytes_read']); + } + public function testSqliteImportPragmasDoNotChangeProgressCounters(): void { $sqlitePath = $this->tempDir . '/database/wordpress.sqlite';