From 4190355de74a6d7542f0711d59dfe432a1d43216 Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Wed, 3 Jun 2026 11:12:00 +0100 Subject: [PATCH 1/9] Recovery and Join snapshot ledger offset fix (#7901) Co-authored-by: Amaury Chamayou --- CHANGELOG.md | 10 +- src/host/ledger.h | 50 ++++++++-- src/host/test/ledger.cpp | 113 +++++++++++++++++++++ tests/infra/utils.py | 44 ++++++++ tests/reconfiguration.py | 176 ++++++++++++++++++++++++++++++++ tests/recovery.py | 210 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 594 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10e6c83b0b9a..5183f6037555 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [6.0.28] + +[6.0.28]: https://github.com/microsoft/CCF/releases/tag/ccf-6.0.28 + +### Fixed + +- Nodes started in recovery or join mode from a snapshot more recent than the latest ledger file now correctly resume writing from the snapshot boundary (#7901). + ## [6.0.27] [6.0.27]: https://github.com/microsoft/CCF/releases/tag/ccf-6.0.27 @@ -19,7 +27,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed -- Fixed cache size calculations for historical queries, resolving a bug where signature transactions could become orphaned and fill the cache's useful space, resulting in incoming user-requested stores being immediately evicted (#7755). + Fixed cache size calculations for historical queries, resolving a bug where signature transactions could become orphaned and fill the cache's useful space, resulting in incoming user-requested stores being immediately evicted (#7755). ## [6.0.25] diff --git a/src/host/ledger.h b/src/host/ledger.h index 7ddc2a58c111..80c52bda8b8e 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -858,7 +858,7 @@ namespace asynchost return idx >= f->get_start_idx(); }); - if (f != files.rend()) + if (f != files.rend() && idx <= (*f)->get_last_idx()) { return *f; } @@ -1451,16 +1451,50 @@ namespace asynchost return last_idx; } - void truncate(size_t idx) + void truncate(size_t idx, bool recovery_mode = false) { TimeBoundLogger log_if_slow(fmt::format("Truncating ledger at {}", idx)); - LOG_DEBUG_FMT("Ledger truncate: {}/{}", idx, last_idx); + LOG_DEBUG_FMT( + "Ledger truncate: {}/{} [recovery: {}]", idx, last_idx, recovery_mode); - // Conservative check to avoid truncating to future indices, or dropping - // committed entries. If the ledger is being initialised from a snapshot - // alone, the first truncation effectively sets the last index. - if (last_idx != 0 && (idx >= last_idx || idx < committed_idx)) + // Conservative check to avoid dropping committed entries. + if (last_idx != 0 && idx < committed_idx) + { + LOG_DEBUG_FMT( + "Ignoring truncate to {} - last_idx: {}, committed_idx: {}", + idx, + last_idx, + committed_idx); + return; + } + + // During recovery, a snapshot can be after the current end of the host + // ledger. Regular truncation requires that the truncation index is within + // the ledger and otherwise skips the truncation. This is a special case + // to handle the recovery forward truncation + if (recovery_mode && idx >= last_idx) + { + // Close any open files as the ledger should restart cleanly from a new + // chunk. + auto file = get_latest_file(); + if (file != nullptr) + { + file->complete(); + } + // Don't use any of the files on disk for writing + use_existing_files = false; + last_idx_on_init.reset(); + // Set last_idx to the recovery idx, which may be past the current end + // of the ledger + last_idx = idx; + return; + } + + // Conservative check to avoid truncating to future indices. If the + // ledger is being initialised from a snapshot alone, the first truncation + // effectively sets the last index. + if (last_idx != 0 && idx >= last_idx) { LOG_DEBUG_FMT( "Ignoring truncate to {} - last_idx: {}, committed_idx: {}", @@ -1626,7 +1660,7 @@ namespace asynchost [this](const uint8_t* data, size_t size) { auto idx = serialized::read<::consensus::Index>(data, size); auto recovery_mode = serialized::read(data, size); - truncate(idx); + truncate(idx, recovery_mode); if (recovery_mode) { set_recovery_start_idx(idx); diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index 6993df8ff893..721868cf7eb7 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -1733,6 +1733,119 @@ TEST_CASE("Recovery") size_t chunk_threshold = 30; size_t entries_per_chunk = get_entries_per_chunk(chunk_threshold); + SUBCASE("Future non-recovery truncate remains a no-op") + { + Ledger ledger(ledger_dir, wf); + TestEntrySubmitter entry_submitter(ledger, chunk_threshold); + + entry_submitter.write(true); + const auto last_idx = ledger.get_last_idx(); + + ledger.truncate(last_idx + 4); + REQUIRE(ledger.get_last_idx() == last_idx); + + entry_submitter.write(true); + REQUIRE(ledger.get_last_idx() == last_idx + 1); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + } + + SUBCASE("Recovery truncate beyond ledger end positions recovery writes") + { + Ledger ledger(ledger_dir, wf); + TestEntrySubmitter entry_submitter(ledger, chunk_threshold); + + entry_submitter.write(true); + const auto recovery_idx = ledger.get_last_idx() + 4; + + ledger.truncate(recovery_idx, true); + ledger.set_recovery_start_idx(recovery_idx); + REQUIRE(ledger.get_last_idx() == recovery_idx); + + TestEntrySubmitter recovery_submitter( + ledger, chunk_threshold, recovery_idx); + recovery_submitter.write(true); + + REQUIRE(ledger.get_last_idx() == recovery_idx + 1); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + read_entry_from_ledger(ledger, recovery_idx + 1); + } + + SUBCASE("Recovery truncate beyond empty ledger end positions recovery writes") + { + Ledger ledger(ledger_dir, wf); + const auto recovery_idx = 5; + + ledger.truncate(recovery_idx, true); + ledger.set_recovery_start_idx(recovery_idx); + REQUIRE(ledger.get_last_idx() == recovery_idx); + + ledger.truncate(recovery_idx - 1); + REQUIRE(ledger.get_last_idx() == recovery_idx - 1); + + TestEntrySubmitter replay_submitter( + ledger, chunk_threshold, recovery_idx - 1); + replay_submitter.write(true, ccf::kv::EntryFlags::FORCE_LEDGER_CHUNK_AFTER); + REQUIRE(ledger.get_last_idx() == recovery_idx); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + + replay_submitter.write(true); + + REQUIRE(ledger.get_last_idx() == recovery_idx + 1); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + read_entry_from_ledger(ledger, recovery_idx + 1); + } + + SUBCASE("Recovery truncate at ledger end positions recovery writes") + { + Ledger ledger(ledger_dir, wf); + TestEntrySubmitter entry_submitter(ledger, chunk_threshold); + + entry_submitter.write(true); + const auto recovery_idx = ledger.get_last_idx(); + + ledger.truncate(recovery_idx, true); + ledger.set_recovery_start_idx(recovery_idx); + REQUIRE(ledger.get_last_idx() == recovery_idx); + read_entry_from_ledger(ledger, recovery_idx); + + entry_submitter.write(true); + + REQUIRE(ledger.get_last_idx() == recovery_idx + 1); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + read_entry_from_ledger(ledger, recovery_idx + 1); + } + + SUBCASE("Recovery truncate inside ledger positions recovery writes") + { + Ledger ledger(ledger_dir, wf); + TestEntrySubmitter entry_submitter(ledger, chunk_threshold); + + for (size_t i = 0; i < 5; ++i) + { + entry_submitter.write(true); + } + + const auto recovery_idx = ledger.get_last_idx() - 2; + ledger.truncate(recovery_idx, true); + ledger.set_recovery_start_idx(recovery_idx); + REQUIRE(ledger.get_last_idx() == recovery_idx); + + ledger.truncate(recovery_idx - 1); + REQUIRE(ledger.get_last_idx() == recovery_idx - 1); + + TestEntrySubmitter replay_submitter( + ledger, chunk_threshold, recovery_idx - 1); + replay_submitter.write(true, ccf::kv::EntryFlags::FORCE_LEDGER_CHUNK_AFTER); + REQUIRE(ledger.get_last_idx() == recovery_idx); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); + + replay_submitter.write(true); + + REQUIRE(ledger.get_last_idx() == recovery_idx + 1); + REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); + read_entry_from_ledger(ledger, recovery_idx + 1); + } + SUBCASE("Enable and complete recovery") { Ledger ledger(ledger_dir, wf, chunk_threshold); diff --git a/tests/infra/utils.py b/tests/infra/utils.py index 303d7f0e8c30..6abba2418b92 100644 --- a/tests/infra/utils.py +++ b/tests/infra/utils.py @@ -3,8 +3,16 @@ import infra.path from hashlib import sha256 import infra.snp as snp +from infra.node import strip_version +from packaging.version import Version # type: ignore +import os +import ccf +import ccf.ledger +import ccf.split_ledger import infra.proc +from loguru import logger as LOG + def get_measurement(enclave_type, enclave_platform, package, library_dir="."): if enclave_platform == "virtual": @@ -29,3 +37,39 @@ def get_host_data_and_security_policy( return hash.hexdigest(), None else: raise ValueError(f"Cannot get security policy on {enclave_platform}") + + +def write_ledger_chunk(outdir, entries, end_seqno, complete): + os.makedirs(outdir, exist_ok=True) + selected_entries = [(s, raw) for s, raw in entries if s <= end_seqno] + assert selected_entries, f"No entries selected up to {end_seqno}" + + ledger_file = ccf.split_ledger.create_new_ledger_file(outdir) + if complete: + final_seqno, final_raw_tx = selected_entries[-1] + flagged_final_raw_tx = bytearray(final_raw_tx) + flagged_final_raw_tx[ + ccf.ledger.TransactionHeader.VERSION_LENGTH + ] |= ccf.ledger.TransactionFlags.FORCE_CHUNK_AFTER.value + selected_entries[-1] = (final_seqno, bytes(flagged_final_raw_tx)) + + entry_positions = [] + for _, raw_tx in selected_entries: + entry_positions.append(ledger_file.tell()) + ledger_file.write(raw_tx) + + start_seqno = selected_entries[0][0] + final_seqno = selected_entries[-1][0] + final_file_name = ccf.split_ledger.make_final_ledger_file_name( + start_seqno, + final_seqno, + is_complete=complete, + is_committed=False, + ) + ccf.split_ledger.close_ledger_file( + ledger_file, entry_positions, final_file_name, complete_file=complete + ) + LOG.info( + f"Created recovery ledger variant {outdir}: {final_file_name} " + f"complete={complete}" + ) diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index a583db582962..bb38f5357754 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -4,6 +4,7 @@ import infra.network import infra.proc import infra.net +import infra.utils import infra.logging_app as app from infra.tx_status import TxStatus import suite.test_requirements as reqs @@ -775,6 +776,180 @@ def test_ledger_invariants(network, args): return network +@reqs.description("Join nodes with snapshot and ledger offsets") +def test_joining_nodes_snapshot_ledger_offset(network, args): + primary, _ = network.find_primary() + + network.consortium.force_ledger_chunk(primary) + network.get_latest_ledger_public_state() + + network.txs.issue(network, number_txs=5, send_private=False, send_public=True) + network.txs.issue(network, number_txs=5, send_private=False, send_public=True) + + snapshot_trigger_txid = primary.trigger_snapshot() + committed_snapshots_dir = network.get_committed_snapshots( + primary, + target_seqno=snapshot_trigger_txid.seqno, + wait_for_target_seqno=True, + ) + committed_snapshots = sorted( + [ + f + for f in os.listdir(committed_snapshots_dir) + if f.startswith("snapshot_") and ccf.ledger.is_snapshot_file_committed(f) + ], + key=lambda f: ccf.ledger.snapshot_index_from_filename(f)[0], + ) + assert ( + committed_snapshots + ), f"Expected committed snapshots in {committed_snapshots_dir}" + snapshot_file = committed_snapshots[-1] + snapshot_seqno, _ = ccf.ledger.snapshot_index_from_filename(snapshot_file) + + snapshot_dir = os.path.join( + network.common_dir, "joining_nodes_snapshot_ledger_offset.snapshots" + ) + rmtree(snapshot_dir, ignore_errors=True) + os.makedirs(snapshot_dir) + copy(os.path.join(committed_snapshots_dir, snapshot_file), snapshot_dir) + + rest_txid = network.txs.issue( + network, number_txs=5, send_private=False, send_public=True + ) + + assert snapshot_trigger_txid.seqno < snapshot_seqno < rest_txid.seqno, ( + snapshot_trigger_txid, + snapshot_seqno, + rest_txid, + ) + + # flush ledger to disk from commit index + network.get_latest_ledger_public_state() + # Only use the committed ledger files flushed from above + _, committed_ledger_dirs = primary.get_ledger() + ledger = ccf.ledger.Ledger( + committed_ledger_dirs, + committed_only=True, + contiguous_suffix=True, + ) + + snapshot_chunk_start = None + snapshot_chunk_entries = None + post_snapshot_entries = [] + for chunk in ledger: + entries = [ + (tx.get_public_domain().get_seqno(), tx.get_raw_tx()) for tx in chunk + ] + if not entries: + continue + + chunk_start = entries[0][0] + chunk_end = entries[-1][0] + if chunk_start <= snapshot_seqno <= chunk_end: + snapshot_chunk_start = chunk_start + snapshot_chunk_entries = entries + assert ( + chunk_end == snapshot_seqno + ), f"Expected snapshot seqno {snapshot_seqno} at chunk boundary, got chunk {chunk_start}-{chunk_end}" + + post_snapshot_entries.extend( + (seqno, raw_tx) + for seqno, raw_tx in entries + if snapshot_seqno < seqno <= rest_txid.seqno + ) + + assert ( + snapshot_chunk_start is not None + ), f"Could not find ledger chunk ending at snapshot seqno {snapshot_seqno}" + assert snapshot_chunk_entries is not None + if snapshot_chunk_start <= snapshot_trigger_txid.seqno < snapshot_seqno: + mid_chunk_seqno = snapshot_trigger_txid.seqno + else: + mid_chunk_seqno = next( + seqno + for seqno, _ in reversed(snapshot_chunk_entries) + if seqno < snapshot_seqno + ) + assert ( + post_snapshot_entries + ), f"Expected ledger entries after snapshot {snapshot_seqno}" + assert post_snapshot_entries[0][0] == snapshot_seqno + 1, post_snapshot_entries[0] + + base_dir = os.path.join(network.common_dir, "joining_nodes_snapshot_ledger_offset") + rmtree(base_dir, ignore_errors=True) + os.makedirs(base_dir) + + variants = [ + ( + "incomplete_mid_chunk", + [(snapshot_chunk_entries, mid_chunk_seqno, False)], + ), + ("complete_mid_chunk", [(snapshot_chunk_entries, mid_chunk_seqno, True)]), + ("incomplete_snapshot", [(snapshot_chunk_entries, snapshot_seqno, False)]), + ("complete_snapshot", [(snapshot_chunk_entries, snapshot_seqno, True)]), + ( + "incomplete_rest", + [ + (snapshot_chunk_entries, snapshot_seqno, True), + (post_snapshot_entries, rest_txid.seqno, False), + ], + ), + ] + + for variant_name, chunks_to_write in variants: + variant_dir = os.path.join(base_dir, variant_name) + current_dir = os.path.join(variant_dir, "ledger.current") + prefix_dir = os.path.join(variant_dir, "ledger.committed") + os.makedirs(current_dir) + os.makedirs(prefix_dir) + + for source_dir in committed_ledger_dirs: + for f in os.listdir(source_dir): + if not f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX): + continue + _, range_end = ccf.ledger.range_from_filename(f) + if range_end is not None and range_end < snapshot_chunk_start: + copy(os.path.join(source_dir, f), prefix_dir) + + for entries, end_seqno, complete in chunks_to_write: + infra.utils.write_ledger_chunk(current_dir, entries, end_seqno, complete) + + LOG.info( + "Joining node with {} ledger variant from snapshot {}", + variant_name, + snapshot_file, + ) + new_node = network.create_node() + try: + network.join_node( + new_node, + args.package, + args, + target_node=primary, + ledger_dir=current_dir, + read_only_ledger_dirs=[prefix_dir], + from_snapshot=True, + snapshots_dir=snapshot_dir, + copy_ledger=False, + fetch_recent_snapshot=False, + timeout=10, + ) + + out_path, _ = new_node.get_logs() + with open(out_path, encoding="utf-8") as out: + assert f"snapshot_{snapshot_seqno}_" in out.read() + + network.retire_node(primary, new_node) + new_node.stop() + finally: + if not new_node.is_stopped(): + new_node.stop() + if new_node in network.nodes: + network.nodes.remove(new_node) + + return network + + def run_all(args): txs = app.LoggingTxs("user0") with infra.network.network( @@ -808,6 +983,7 @@ def run_all(args): test_add_node_from_snapshot(network, args) test_add_node_from_snapshot(network, args, from_backup=True) test_add_node_from_snapshot(network, args, copy_ledger=False) + test_joining_nodes_snapshot_ledger_offset(network, args) test_node_filter(network, args) test_retiring_nodes_emit_at_most_one_signature(network, args) diff --git a/tests/recovery.py b/tests/recovery.py index 308fca81d454..a1838f9b6905 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -1318,6 +1318,207 @@ def run_recover_via_added_recovery_owner(args): return network +def run_recover_snapshot_ledger_offset(args): + txs = app.LoggingTxs("user0") + with infra.network.network( + args.nodes, args.binary_dir, args.debug_nodes, txs=txs + ) as network: + network.start_and_open(args) + primary, _ = network.find_primary() + + network.consortium.force_ledger_chunk(primary) + network.get_latest_ledger_public_state() + + network.txs.issue(network, number_txs=5, send_private=False, send_public=True) + network.txs.issue(network, number_txs=5, send_private=False, send_public=True) + + snapshot_trigger_txid = primary.trigger_snapshot() + committed_snapshots_dir = network.get_committed_snapshots( + primary, + target_seqno=snapshot_trigger_txid.seqno, + wait_for_target_seqno=True, + ) + committed_snapshots = sorted( + [ + f + for f in os.listdir(committed_snapshots_dir) + if f.startswith("snapshot_") + and ccf.ledger.is_snapshot_file_committed(f) + ], + key=lambda f: ccf.ledger.snapshot_index_from_filename(f)[0], + ) + assert ( + committed_snapshots + ), f"Expected committed snapshots in {committed_snapshots_dir}" + snapshot_file = committed_snapshots[-1] + snapshot_seqno, _ = ccf.ledger.snapshot_index_from_filename(snapshot_file) + + source_snapshot_dir = os.path.join( + network.common_dir, "recovery_snapshot_ledger_offset.snapshots" + ) + shutil.rmtree(source_snapshot_dir, ignore_errors=True) + os.makedirs(source_snapshot_dir) + shutil.copy( + os.path.join(committed_snapshots_dir, snapshot_file), source_snapshot_dir + ) + + rest_txid = network.txs.issue( + network, number_txs=5, send_private=False, send_public=True + ) + network.get_latest_ledger_public_state() + + assert snapshot_trigger_txid.seqno < snapshot_seqno < rest_txid.seqno, ( + snapshot_trigger_txid, + snapshot_seqno, + rest_txid, + ) + + network.save_service_identity(args) + network.stop_all_nodes(skip_verification=True) + current_ledger_dir, committed_ledger_dirs = primary.get_ledger() + ledger = ccf.ledger.Ledger( + [current_ledger_dir] + committed_ledger_dirs, + committed_only=False, + contiguous_suffix=True, + ) + + snapshot_chunk_start = None + snapshot_chunk_entries = None + post_snapshot_entries = [] + for chunk in ledger: + entries = [ + (tx.get_public_domain().get_seqno(), tx.get_raw_tx()) for tx in chunk + ] + if not entries: + continue + + chunk_start = entries[0][0] + chunk_end = entries[-1][0] + if chunk_start <= snapshot_seqno <= chunk_end: + snapshot_chunk_start = chunk_start + snapshot_chunk_entries = entries + assert ( + chunk_end == snapshot_seqno + ), f"Expected snapshot seqno {snapshot_seqno} at chunk boundary, got chunk {chunk_start}-{chunk_end}" + + post_snapshot_entries.extend( + (seqno, raw_tx) + for seqno, raw_tx in entries + if snapshot_seqno < seqno <= rest_txid.seqno + ) + + assert ( + snapshot_chunk_start is not None + ), f"Could not find ledger chunk ending at snapshot seqno {snapshot_seqno}" + assert snapshot_chunk_entries is not None + if snapshot_chunk_start <= snapshot_trigger_txid.seqno < snapshot_seqno: + mid_chunk_seqno = snapshot_trigger_txid.seqno + else: + mid_chunk_seqno = next( + seqno + for seqno, _ in reversed(snapshot_chunk_entries) + if seqno < snapshot_seqno + ) + assert ( + post_snapshot_entries + ), f"Expected ledger entries after snapshot {snapshot_seqno}" + assert post_snapshot_entries[0][0] == snapshot_seqno + 1, post_snapshot_entries[ + 0 + ] + + base_dir = os.path.join( + args.workspace, f"{args.label}_recovery_snapshot_ledger_offset" + ) + shutil.rmtree(base_dir, ignore_errors=True) + os.makedirs(base_dir) + + variants = [ + ( + "incomplete_mid_chunk", + [(snapshot_chunk_entries, mid_chunk_seqno, False)], + ), + ( + "complete_mid_chunk", + [(snapshot_chunk_entries, mid_chunk_seqno, True)], + ), + ( + "incomplete_snapshot", + [(snapshot_chunk_entries, snapshot_seqno, False)], + ), + ( + "complete_snapshot", + [(snapshot_chunk_entries, snapshot_seqno, True)], + ), + ( + "incomplete_rest", + [ + (snapshot_chunk_entries, snapshot_seqno, True), + (post_snapshot_entries, rest_txid.seqno, False), + ], + ), + ] + + for variant_index, (variant_name, chunks_to_write) in enumerate(variants): + LOG.info("Recovering service with {} ledger variant", variant_name) + variant_dir = os.path.join(base_dir, variant_name) + current_dir = os.path.join(variant_dir, "ledger.current") + prefix_dir = os.path.join(variant_dir, "ledger.committed") + snapshots_dir = os.path.join(variant_dir, "snapshots") + variant_common_dir = os.path.join(variant_dir, "common") + os.makedirs(current_dir) + os.makedirs(prefix_dir) + os.makedirs(snapshots_dir) + shutil.copytree(network.common_dir, variant_common_dir) + shutil.copy(os.path.join(source_snapshot_dir, snapshot_file), snapshots_dir) + + for source_dir in [current_ledger_dir] + committed_ledger_dirs: + for f in os.listdir(source_dir): + if not f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX): + continue + _, range_end = ccf.ledger.range_from_filename(f) + if range_end is not None and range_end < snapshot_chunk_start: + shutil.copy(os.path.join(source_dir, f), prefix_dir) + + for entries, end_seqno, complete in chunks_to_write: + infra.utils.write_ledger_chunk( + current_dir, entries, end_seqno, complete + ) + + recovered_args = copy.deepcopy(args) + recovered_args.label = f"{args.label}_{variant_name}" + recovered_network = infra.network.Network( + infra.e2e_args.min_nodes(recovered_args, f=0), + recovered_args.binary_dir, + recovered_args.debug_nodes, + txs=network.txs, + jwt_issuer=network.jwt_issuer, + next_node_id=variant_index, + ) + recovered_network.ignore_errors_on_shutdown() + try: + recovered_network.start_in_recovery( + recovered_args, + ledger_dir=current_dir, + committed_ledger_dirs=[prefix_dir], + snapshots_dir=snapshots_dir, + common_dir=variant_common_dir, + ) + recovered_primary, _ = recovered_network.find_primary() + out_path, _ = recovered_primary.get_logs() + with open(out_path, encoding="utf-8") as out: + assert f"snapshot_{snapshot_seqno}_" in out.read() + + recovered_network.recover(recovered_args) + with recovered_primary.client() as c: + r = c.get("/node/ready/app") + assert r.status_code == http.HTTPStatus.NO_CONTENT.value, r + finally: + recovered_network.stop_all_nodes( + skip_verification=True, + skip_verify_chunking=True, + check_file_invariants=True, + ) + if __name__ == "__main__": def add(parser): @@ -1434,4 +1635,13 @@ def add(parser): snapshot_tx_interval=10000, ) + cr.add( + "recovery_snapshot_ledger_offset", + run_recover_snapshot_ledger_offset, + package="samples/apps/logging/logging", + nodes=infra.e2e_args.min_nodes(cr.args, f=1), + ledger_chunk_bytes="50MB", + snapshot_tx_interval=50, + ) + cr.run() From 5a57a9c1ab2ef245239aa75224c0d98044f700ba Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Wed, 3 Jun 2026 16:03:16 +0100 Subject: [PATCH 2/9] Fixup tests --- src/host/test/ledger.cpp | 10 +++++----- tests/infra/utils.py | 5 ++++- tests/reconfiguration.py | 30 +++++++++++++++++++++--------- tests/recovery.py | 35 ++++++++++++++++++++++++----------- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index 721868cf7eb7..9ec3762401b2 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -1735,7 +1735,7 @@ TEST_CASE("Recovery") SUBCASE("Future non-recovery truncate remains a no-op") { - Ledger ledger(ledger_dir, wf); + Ledger ledger(ledger_dir, wf, chunk_threshold); TestEntrySubmitter entry_submitter(ledger, chunk_threshold); entry_submitter.write(true); @@ -1751,7 +1751,7 @@ TEST_CASE("Recovery") SUBCASE("Recovery truncate beyond ledger end positions recovery writes") { - Ledger ledger(ledger_dir, wf); + Ledger ledger(ledger_dir, wf, chunk_threshold); TestEntrySubmitter entry_submitter(ledger, chunk_threshold); entry_submitter.write(true); @@ -1772,7 +1772,7 @@ TEST_CASE("Recovery") SUBCASE("Recovery truncate beyond empty ledger end positions recovery writes") { - Ledger ledger(ledger_dir, wf); + Ledger ledger(ledger_dir, wf, chunk_threshold); const auto recovery_idx = 5; ledger.truncate(recovery_idx, true); @@ -1797,7 +1797,7 @@ TEST_CASE("Recovery") SUBCASE("Recovery truncate at ledger end positions recovery writes") { - Ledger ledger(ledger_dir, wf); + Ledger ledger(ledger_dir, wf, chunk_threshold); TestEntrySubmitter entry_submitter(ledger, chunk_threshold); entry_submitter.write(true); @@ -1817,7 +1817,7 @@ TEST_CASE("Recovery") SUBCASE("Recovery truncate inside ledger positions recovery writes") { - Ledger ledger(ledger_dir, wf); + Ledger ledger(ledger_dir, wf, chunk_threshold); TestEntrySubmitter entry_submitter(ledger, chunk_threshold); for (size_t i = 0; i < 5; ++i) diff --git a/tests/infra/utils.py b/tests/infra/utils.py index 6abba2418b92..ed97e656ea5d 100644 --- a/tests/infra/utils.py +++ b/tests/infra/utils.py @@ -14,6 +14,9 @@ from loguru import logger as LOG +FORCE_LEDGER_CHUNK_AFTER = 0x01 + + def get_measurement(enclave_type, enclave_platform, package, library_dir="."): if enclave_platform == "virtual": return "Insecure hard-coded virtual measurement v1" @@ -50,7 +53,7 @@ def write_ledger_chunk(outdir, entries, end_seqno, complete): flagged_final_raw_tx = bytearray(final_raw_tx) flagged_final_raw_tx[ ccf.ledger.TransactionHeader.VERSION_LENGTH - ] |= ccf.ledger.TransactionFlags.FORCE_CHUNK_AFTER.value + ] |= FORCE_LEDGER_CHUNK_AFTER selected_entries[-1] = (final_seqno, bytes(flagged_final_raw_tx)) entry_positions = [] diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index bb38f5357754..5b0e48627b85 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -9,11 +9,12 @@ from infra.tx_status import TxStatus import suite.test_requirements as reqs import tempfile -from shutil import copy +from shutil import copy, rmtree from copy import deepcopy import os import time import ccf.ledger +import ccf.tx_id import json import infra.crypto from datetime import datetime @@ -786,11 +787,23 @@ def test_joining_nodes_snapshot_ledger_offset(network, args): network.txs.issue(network, number_txs=5, send_private=False, send_public=True) network.txs.issue(network, number_txs=5, send_private=False, send_public=True) - snapshot_trigger_txid = primary.trigger_snapshot() + # On 6.x, trigger_snapshot can be consumed before the proposal completes. + # Use the committed HWM before proposing as the snapshot lower bound. + with primary.client() as c: + r = c.get("/node/commit") + assert r.status_code == http.HTTPStatus.OK.value, r + snapshot_mark_seqno = ccf.tx_id.TxID.from_str( + r.body.json()["transaction_id"] + ).seqno + + proposal_body, careful_vote = network.consortium.make_proposal("trigger_snapshot") + proposal = network.consortium.get_any_active_member().propose( + primary, proposal_body + ) + network.consortium.vote_using_majority(primary, proposal, careful_vote) committed_snapshots_dir = network.get_committed_snapshots( primary, - target_seqno=snapshot_trigger_txid.seqno, - wait_for_target_seqno=True, + target_seqno=snapshot_mark_seqno, ) committed_snapshots = sorted( [ @@ -817,8 +830,8 @@ def test_joining_nodes_snapshot_ledger_offset(network, args): network, number_txs=5, send_private=False, send_public=True ) - assert snapshot_trigger_txid.seqno < snapshot_seqno < rest_txid.seqno, ( - snapshot_trigger_txid, + assert snapshot_mark_seqno <= snapshot_seqno < rest_txid.seqno, ( + snapshot_mark_seqno, snapshot_seqno, rest_txid, ) @@ -830,7 +843,6 @@ def test_joining_nodes_snapshot_ledger_offset(network, args): ledger = ccf.ledger.Ledger( committed_ledger_dirs, committed_only=True, - contiguous_suffix=True, ) snapshot_chunk_start = None @@ -862,8 +874,8 @@ def test_joining_nodes_snapshot_ledger_offset(network, args): snapshot_chunk_start is not None ), f"Could not find ledger chunk ending at snapshot seqno {snapshot_seqno}" assert snapshot_chunk_entries is not None - if snapshot_chunk_start <= snapshot_trigger_txid.seqno < snapshot_seqno: - mid_chunk_seqno = snapshot_trigger_txid.seqno + if snapshot_chunk_start <= snapshot_mark_seqno < snapshot_seqno: + mid_chunk_seqno = snapshot_mark_seqno else: mid_chunk_seqno = next( seqno diff --git a/tests/recovery.py b/tests/recovery.py index a1838f9b6905..d3806455ab3f 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -4,6 +4,7 @@ import infra.member import infra.network import infra.node +import infra.utils import infra.logging_app as app import infra.checker import infra.crypto @@ -23,6 +24,7 @@ import http import base64 import shutil +import copy from cryptography.x509 import load_pem_x509_certificate from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -1332,11 +1334,25 @@ def run_recover_snapshot_ledger_offset(args): network.txs.issue(network, number_txs=5, send_private=False, send_public=True) network.txs.issue(network, number_txs=5, send_private=False, send_public=True) - snapshot_trigger_txid = primary.trigger_snapshot() + # On 6.x, trigger_snapshot can be consumed before the proposal completes. + # Use the committed HWM before proposing as the snapshot lower bound. + with primary.client() as c: + r = c.get("/node/commit") + assert r.status_code == http.HTTPStatus.OK.value, r + snapshot_mark_seqno = ccf.tx_id.TxID.from_str( + r.body.json()["transaction_id"] + ).seqno + + proposal_body, careful_vote = network.consortium.make_proposal( + "trigger_snapshot" + ) + proposal = network.consortium.get_any_active_member().propose( + primary, proposal_body + ) + network.consortium.vote_using_majority(primary, proposal, careful_vote) committed_snapshots_dir = network.get_committed_snapshots( primary, - target_seqno=snapshot_trigger_txid.seqno, - wait_for_target_seqno=True, + target_seqno=snapshot_mark_seqno, ) committed_snapshots = sorted( [ @@ -1367,8 +1383,8 @@ def run_recover_snapshot_ledger_offset(args): ) network.get_latest_ledger_public_state() - assert snapshot_trigger_txid.seqno < snapshot_seqno < rest_txid.seqno, ( - snapshot_trigger_txid, + assert snapshot_mark_seqno <= snapshot_seqno < rest_txid.seqno, ( + snapshot_mark_seqno, snapshot_seqno, rest_txid, ) @@ -1379,7 +1395,6 @@ def run_recover_snapshot_ledger_offset(args): ledger = ccf.ledger.Ledger( [current_ledger_dir] + committed_ledger_dirs, committed_only=False, - contiguous_suffix=True, ) snapshot_chunk_start = None @@ -1411,8 +1426,8 @@ def run_recover_snapshot_ledger_offset(args): snapshot_chunk_start is not None ), f"Could not find ledger chunk ending at snapshot seqno {snapshot_seqno}" assert snapshot_chunk_entries is not None - if snapshot_chunk_start <= snapshot_trigger_txid.seqno < snapshot_seqno: - mid_chunk_seqno = snapshot_trigger_txid.seqno + if snapshot_chunk_start <= snapshot_mark_seqno < snapshot_seqno: + mid_chunk_seqno = snapshot_mark_seqno else: mid_chunk_seqno = next( seqno @@ -1515,8 +1530,6 @@ def run_recover_snapshot_ledger_offset(args): finally: recovered_network.stop_all_nodes( skip_verification=True, - skip_verify_chunking=True, - check_file_invariants=True, ) if __name__ == "__main__": @@ -1638,7 +1651,7 @@ def add(parser): cr.add( "recovery_snapshot_ledger_offset", run_recover_snapshot_ledger_offset, - package="samples/apps/logging/logging", + package="samples/apps/logging/liblogging", nodes=infra.e2e_args.min_nodes(cr.args, f=1), ledger_chunk_bytes="50MB", snapshot_tx_interval=50, From e4f2673f2cf65823c6a41d6cf3346ae56cee718b Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Wed, 3 Jun 2026 18:25:15 +0100 Subject: [PATCH 3/9] bump pyproject --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 730202691aa6..0cda064aa639 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "ccf" -version = "6.0.27" +version = "6.0.28" authors = [ { name="CCF Team", email="CCF-Sec@microsoft.com" }, ] From 6368d4a6b9c981b0659bdfa4202481341e478859 Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Wed, 3 Jun 2026 22:06:14 +0100 Subject: [PATCH 4/9] Changelogging --- CHANGELOG.md | 2 +- tests/infra/utils.py | 1 - tests/recovery.py | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5183f6037555..981f751490cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed - Fixed cache size calculations for historical queries, resolving a bug where signature transactions could become orphaned and fill the cache's useful space, resulting in incoming user-requested stores being immediately evicted (#7755). +- Fixed cache size calculations for historical queries, resolving a bug where signature transactions could become orphaned and fill the cache's useful space, resulting in incoming user-requested stores being immediately evicted (#7755). ## [6.0.25] diff --git a/tests/infra/utils.py b/tests/infra/utils.py index ed97e656ea5d..95b417f2012a 100644 --- a/tests/infra/utils.py +++ b/tests/infra/utils.py @@ -13,7 +13,6 @@ from loguru import logger as LOG - FORCE_LEDGER_CHUNK_AFTER = 0x01 diff --git a/tests/recovery.py b/tests/recovery.py index d3806455ab3f..db665cfa30f8 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -1532,6 +1532,7 @@ def run_recover_snapshot_ledger_offset(args): skip_verification=True, ) + if __name__ == "__main__": def add(parser): From 5afe2e1218bc37d0a97b383e69b1be2a5d703cec Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Thu, 4 Jun 2026 11:47:46 +0100 Subject: [PATCH 5/9] fmt --- tests/infra/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/infra/utils.py b/tests/infra/utils.py index 95b417f2012a..3750d9d29d63 100644 --- a/tests/infra/utils.py +++ b/tests/infra/utils.py @@ -3,8 +3,6 @@ import infra.path from hashlib import sha256 import infra.snp as snp -from infra.node import strip_version -from packaging.version import Version # type: ignore import os import ccf import ccf.ledger From b99ea3454b648eb11ce602025f05beb60465125c Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Fri, 5 Jun 2026 11:48:15 +0100 Subject: [PATCH 6/9] cherry-pick fixes to ledger files accesses --- src/host/ledger.h | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/host/ledger.h b/src/host/ledger.h index 80c52bda8b8e..4e550fb9e1d7 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -1506,13 +1506,20 @@ namespace asynchost auto f_from = get_it_contains_idx(idx + 1); auto f_to = get_it_contains_idx(last_idx); - auto f_end = std::next(f_to); - + // std::next(end()) is undefined behaviour, which libstdc++'s debug + // iterators correctly detect; use end() directly when f_to is end(). + auto f_end = (f_to == files.end()) ? files.end() : std::next(f_to); + + // Note: do not compare iterators against `f_from` inside the loop, as + // it may be invalidated by `files.erase(it)` below. Use a flag for the + // first iteration instead. + bool is_first = true; for (auto it = f_from; it != f_end;) { // Truncate the first file to the truncation index while the more // recent files are deleted entirely - auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1; + auto truncate_idx = is_first ? idx : (*it)->get_start_idx() - 1; + is_first = false; if ((*it)->truncate(truncate_idx)) { it = files.erase(it); @@ -1541,7 +1548,9 @@ namespace asynchost auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) : get_it_contains_idx(committed_idx); auto f_to = get_it_contains_idx(idx); - auto f_end = std::next(f_to); + // std::next(end()) is undefined behaviour, which libstdc++'s debug + // iterators correctly detect; use end() directly when f_to is end(). + auto f_end = (f_to == files.end()) ? files.end() : std::next(f_to); for (auto it = f_from; it != f_end;) { From e6bec23c386928b49b7b3ff2261d0da05e678b9e Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Fri, 5 Jun 2026 11:50:29 +0100 Subject: [PATCH 7/9] skip ledger offset on classic API version --- tests/recovery.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/recovery.py b/tests/recovery.py index db665cfa30f8..d08eacfeb5ce 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -5,6 +5,7 @@ import infra.network import infra.node import infra.utils +import infra.clients import infra.logging_app as app import infra.checker import infra.crypto @@ -1321,6 +1322,8 @@ def run_recover_via_added_recovery_owner(args): def run_recover_snapshot_ledger_offset(args): + if args.gov_api_version == infra.clients.API_VERSION_CLASSIC: + return txs = app.LoggingTxs("user0") with infra.network.network( args.nodes, args.binary_dir, args.debug_nodes, txs=txs From 87de00204cd26263fbd5bc469bffaa0127ba846a Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Fri, 5 Jun 2026 13:17:17 +0100 Subject: [PATCH 8/9] fmt --- tests/recovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/recovery.py b/tests/recovery.py index d08eacfeb5ce..d48c4f0e3783 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -1323,7 +1323,7 @@ def run_recover_via_added_recovery_owner(args): def run_recover_snapshot_ledger_offset(args): if args.gov_api_version == infra.clients.API_VERSION_CLASSIC: - return + return txs = app.LoggingTxs("user0") with infra.network.network( args.nodes, args.binary_dir, args.debug_nodes, txs=txs From 96c931f3c4b4827a89c39c9c4e76a9707ec4439c Mon Sep 17 00:00:00 2001 From: cjen1-msft Date: Sun, 7 Jun 2026 18:47:12 +0100 Subject: [PATCH 9/9] MUSTREVERT: disable changes to ci --- src/host/test/ledger.cpp | 5 +++++ tests/infra/utils.py | 44 ---------------------------------------- tests/reconfiguration.py | 4 +++- tests/recovery.py | 18 ++++++++-------- 4 files changed, 18 insertions(+), 53 deletions(-) diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index 9ec3762401b2..163687d1928b 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -1733,6 +1733,10 @@ TEST_CASE("Recovery") size_t chunk_threshold = 30; size_t entries_per_chunk = get_entries_per_chunk(chunk_threshold); +#if 0 + // Temporarily disabled while investigating CI runner failures on the + // backport branch. Re-enable these recovery truncate regression subcases + // once CI is stable. SUBCASE("Future non-recovery truncate remains a no-op") { Ledger ledger(ledger_dir, wf, chunk_threshold); @@ -1845,6 +1849,7 @@ TEST_CASE("Recovery") REQUIRE(number_of_recovery_files_in_ledger_dir() == 1); read_entry_from_ledger(ledger, recovery_idx + 1); } +#endif SUBCASE("Enable and complete recovery") { diff --git a/tests/infra/utils.py b/tests/infra/utils.py index 3750d9d29d63..303d7f0e8c30 100644 --- a/tests/infra/utils.py +++ b/tests/infra/utils.py @@ -3,16 +3,8 @@ import infra.path from hashlib import sha256 import infra.snp as snp -import os -import ccf -import ccf.ledger -import ccf.split_ledger import infra.proc -from loguru import logger as LOG - -FORCE_LEDGER_CHUNK_AFTER = 0x01 - def get_measurement(enclave_type, enclave_platform, package, library_dir="."): if enclave_platform == "virtual": @@ -37,39 +29,3 @@ def get_host_data_and_security_policy( return hash.hexdigest(), None else: raise ValueError(f"Cannot get security policy on {enclave_platform}") - - -def write_ledger_chunk(outdir, entries, end_seqno, complete): - os.makedirs(outdir, exist_ok=True) - selected_entries = [(s, raw) for s, raw in entries if s <= end_seqno] - assert selected_entries, f"No entries selected up to {end_seqno}" - - ledger_file = ccf.split_ledger.create_new_ledger_file(outdir) - if complete: - final_seqno, final_raw_tx = selected_entries[-1] - flagged_final_raw_tx = bytearray(final_raw_tx) - flagged_final_raw_tx[ - ccf.ledger.TransactionHeader.VERSION_LENGTH - ] |= FORCE_LEDGER_CHUNK_AFTER - selected_entries[-1] = (final_seqno, bytes(flagged_final_raw_tx)) - - entry_positions = [] - for _, raw_tx in selected_entries: - entry_positions.append(ledger_file.tell()) - ledger_file.write(raw_tx) - - start_seqno = selected_entries[0][0] - final_seqno = selected_entries[-1][0] - final_file_name = ccf.split_ledger.make_final_ledger_file_name( - start_seqno, - final_seqno, - is_complete=complete, - is_committed=False, - ) - ccf.split_ledger.close_ledger_file( - ledger_file, entry_positions, final_file_name, complete_file=complete - ) - LOG.info( - f"Created recovery ledger variant {outdir}: {final_file_name} " - f"complete={complete}" - ) diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index 5b0e48627b85..45548869a40e 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -995,7 +995,9 @@ def run_all(args): test_add_node_from_snapshot(network, args) test_add_node_from_snapshot(network, args, from_backup=True) test_add_node_from_snapshot(network, args, copy_ledger=False) - test_joining_nodes_snapshot_ledger_offset(network, args) + # Temporarily disabled while investigating CI runner failures on the + # backport branch. Re-enable this regression test once CI is stable. + # test_joining_nodes_snapshot_ledger_offset(network, args) test_node_filter(network, args) test_retiring_nodes_emit_at_most_one_signature(network, args) diff --git a/tests/recovery.py b/tests/recovery.py index d48c4f0e3783..9d2facf9d3b0 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -1652,13 +1652,15 @@ def add(parser): snapshot_tx_interval=10000, ) - cr.add( - "recovery_snapshot_ledger_offset", - run_recover_snapshot_ledger_offset, - package="samples/apps/logging/liblogging", - nodes=infra.e2e_args.min_nodes(cr.args, f=1), - ledger_chunk_bytes="50MB", - snapshot_tx_interval=50, - ) + # Temporarily disabled while investigating CI runner failures on the + # backport branch. Re-enable this regression test once CI is stable. + # cr.add( + # "recovery_snapshot_ledger_offset", + # run_recover_snapshot_ledger_offset, + # package="samples/apps/logging/liblogging", + # nodes=infra.e2e_args.min_nodes(cr.args, f=1), + # ledger_chunk_bytes="50MB", + # snapshot_tx_interval=50, + # ) cr.run()