diff --git a/apps/arweave/src/ar_cli_parser.erl b/apps/arweave/src/ar_cli_parser.erl index 9a87f92577..138fea44db 100644 --- a/apps/arweave/src/ar_cli_parser.erl +++ b/apps/arweave/src/ar_cli_parser.erl @@ -198,6 +198,8 @@ show_help() -> " and downloads it from peers.", [?DEFAULT_HEADER_SYNC_JOBS] )}, + {"enable_data_roots_syncing [true|false]", + "Enable or disable background data roots syncing. Default: true."}, {"data_sync_request_packed_chunks", "Enables requesting the packed chunks from peers."}, {"disk_pool_jobs (num)", @@ -739,6 +741,10 @@ parse(["sync_jobs", Num | Rest], C) -> parse(Rest, C#config{ sync_jobs = list_to_integer(Num) }); parse(["header_sync_jobs", Num | Rest], C) -> parse(Rest, C#config{ header_sync_jobs = list_to_integer(Num) }); +parse(["enable_data_roots_syncing", "true" | Rest], C) -> + parse(Rest, C#config{ enable_data_roots_syncing = true }); +parse(["enable_data_roots_syncing", "false" | Rest], C) -> + parse(Rest, C#config{ enable_data_roots_syncing = false }); parse(["data_sync_request_packed_chunks" | Rest], C) -> parse(Rest, C#config{ data_sync_request_packed_chunks = true }); parse(["post_tx_timeout", Num | Rest], C) -> diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl index 6ae18771a7..089b8cfe5b 100644 --- a/apps/arweave/src/ar_config.erl +++ b/apps/arweave/src/ar_config.erl @@ -487,6 +487,12 @@ parse_options([{<<"header_sync_jobs">>, Value} | Rest], Config) parse_options([{<<"header_sync_jobs">>, Value} | _], _) -> {error, {bad_type, header_sync_jobs, number}, Value}; +parse_options([{<<"enable_data_roots_syncing">>, Value} | Rest], Config) + when is_boolean(Value) -> + parse_options(Rest, Config#config{ enable_data_roots_syncing = Value }); +parse_options([{<<"enable_data_roots_syncing">>, Value} | _], _) -> + {error, {bad_type, enable_data_roots_syncing, boolean}, Value}; + parse_options([{<<"disk_pool_jobs">>, Value} | Rest], Config) when is_integer(Value) -> parse_options(Rest, Config#config{ disk_pool_jobs = Value }); diff --git a/apps/arweave/src/ar_data_root_sync.erl b/apps/arweave/src/ar_data_root_sync.erl index 31bbe6b01f..f04174e887 100644 --- a/apps/arweave/src/ar_data_root_sync.erl +++ b/apps/arweave/src/ar_data_root_sync.erl @@ -120,7 +120,14 @@ handle_cast(sync, State) -> ar_util:cast_after(500, self(), sync), {noreply, State}; true -> - {Delay, State2} = sync_block_data_roots(State), + {ok, Config} = arweave_config:get_env(), + {Delay, State2} = + case Config#config.enable_data_roots_syncing of + true -> + sync_block_data_roots(State); + false -> + {?DATA_ROOTS_SYNC_SCAN_INTERVAL_MS, State} + end, ar_util:cast_after(Delay, self(), sync), {noreply, State2} end; diff --git a/apps/arweave/src/ar_http.erl b/apps/arweave/src/ar_http.erl index 30679cbc81..431b26cbd1 100644 --- a/apps/arweave/src/ar_http.erl +++ b/apps/arweave/src/ar_http.erl @@ -468,8 +468,16 @@ should_retry_closed_connection({down, {shutdown, {error, einval}}}) -> true; should_retry_closed_connection({stream_error, closed}) -> true; +should_retry_closed_connection({stream_error, closing}) -> + true; +should_retry_closed_connection({stream_error, {closed, normal}}) -> + true; should_retry_closed_connection({shutdown, closed}) -> true; +should_retry_closed_connection(closed) -> + true; +should_retry_closed_connection(closing) -> + true; should_retry_closed_connection(_) -> false. diff --git a/apps/arweave/test/ar_data_roots_sync_tests.erl b/apps/arweave/test/ar_data_roots_sync_tests.erl index 3253692fc8..80c9da4643 100644 --- a/apps/arweave/test/ar_data_roots_sync_tests.erl +++ b/apps/arweave/test/ar_data_roots_sync_tests.erl @@ -45,26 +45,19 @@ chunk_after_data_roots_background_sync_test_() -> {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}], fun test_chunk_after_data_roots_background_sync/0). -% %% Two TXs with the same data root. B2's data roots are synced and its chunk is promoted; -% %% B1's data roots are NOT yet in the index (incomplete background sync). POST B1's chunk — -% %% chunk_offsets_synced must not skip it (none → false). The chunk enters the disk pool. -% %% After B1's data roots are synced, the disk pool scan promotes the chunk to B1's offset. -% chunk_skipped_with_duplicate_data_root_test_() -> -% ar_test_node:test_with_mocked_functions([ -% {ar_block, get_consensus_window_size, fun() -> 5 end}, -% {ar_block, get_max_tx_anchor_depth, fun() -> 5 end}, -% {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}], -% fun test_chunk_skipped_with_duplicate_data_root/0). - -% %% 6 TXs with the same data root: 5 highest-offset chunks synced, lowest not. POST the -% %% lowest chunk — chunk_offsets_synced exhausts its N=5 depth limit walking through the 5 -% %% synced entries. Before fix the N=0 base case returned true (skip); after fix returns false. -% chunk_skipped_with_depth_exhaustion_test_() -> -% ar_test_node:test_with_mocked_functions([ -% {ar_block, get_consensus_window_size, fun() -> 5 end}, -% {ar_block, get_max_tx_anchor_depth, fun() -> 5 end}, -% {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}], -% fun test_chunk_skipped_with_depth_exhaustion/0). +chunk_skipped_with_duplicate_data_root_test_() -> + ar_test_node:test_with_mocked_functions([ + {ar_block, get_consensus_window_size, fun() -> 5 end}, + {ar_block, get_max_tx_anchor_depth, fun() -> 5 end}, + {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}], + fun test_chunk_skipped_with_duplicate_data_root/0). + +chunk_skipped_with_depth_exhaustion_test_() -> + ar_test_node:test_with_mocked_functions([ + {ar_block, get_consensus_window_size, fun() -> 5 end}, + {ar_block, get_max_tx_anchor_depth, fun() -> 5 end}, + {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}], + fun test_chunk_skipped_with_depth_exhaustion/0). test_data_roots_sync_from_peer() -> Wallet = {_, Pub} = ar_wallet:new(), @@ -133,18 +126,18 @@ test_data_roots_sync_from_peer() -> true -> ?debugFmt("Asserting data roots synced during consensus " "are stored, even outside the configured storage modules, " - "height: ~B, configured ranges: ~p, intersection: ~p", + "height: ~B, configured ranges: ~0p, intersection: ~0p", [B#block.height, ConfiguredRanges, Intersection]), wait_for_data_roots(main, B); false -> case ar_intervals:is_empty(Intersection) of false -> ?debugFmt("Asserting data roots synced for partitions " - "we configured, range intersection: ~p", [Intersection]), + "we configured, range intersection: ~0p", [Intersection]), wait_for_data_roots(main, B); true -> ?debugFmt("Asserting no data roots for partitions " - "we did not configure, block range: ~p", [BlockRange]), + "we did not configure, block range: ~0p", [BlockRange]), assert_no_data_roots(main, B) end end @@ -160,7 +153,7 @@ test_data_roots_http_post() -> {B, _} = mine_block_with_small_fixed_data_tx(peer1, Wallet), %% Mine some empty blocks to push the data block out of the recent window. mine_empty_blocks_on_peer_after(peer1, B, 11), - join_main_on_peer1(B#block.height + 11, 0), + join_main_on_peer1(B#block.height + 11, false), ar_test_node:disconnect_from(peer1), assert_no_data_roots(main, B), {ok, Body} = get_data_roots(peer1, B), @@ -182,10 +175,10 @@ test_chunk_after_data_roots_http_post() -> Guaranteed = mine_block_with_small_fixed_data_tx(peer1, Wallet), Random = lists:map( fun(_) -> - TXData = generate_random_txs(Wallet), - TXs = [TX || {TX, _} <- TXData], + TXData0 = generate_random_txs(Wallet), + TXs = [TX || {TX, _} <- TXData0], B = ar_test_node:post_and_mine(#{ miner => peer1, await_on => peer1 }, TXs), - {B, TXData} + {B, filter_mined_tx_data(B, TXData0)} end, lists:seq(1, 4) ), @@ -195,7 +188,7 @@ test_chunk_after_data_roots_http_post() -> {LastB, _} = lists:last(Data), mine_empty_blocks_on_peer_after(peer1, LastB, 11), - join_main_on_peer1(LastB#block.height + 11, 0), + join_main_on_peer1(LastB#block.height + 11, false), ar_test_node:disconnect_from(peer1), %% GET/POST /data_roots only apply below each peer's disk pool bound (see @@ -245,7 +238,7 @@ test_chunk_after_data_roots_background_sync() -> start_peers_then_disconnect(peer1, main, B0), {B, [{TX, Chunks}]} = mine_block_with_small_fixed_data_tx(peer1, Wallet), mine_empty_blocks_on_peer_after(peer1, B, 11), - join_main_on_peer1(B#block.height + 11, 2), + join_main_on_peer1(B#block.height + 11, true), true = B#block.block_size > 0, wait_until_data_roots_synced(main, B), ar_test_node:disconnect_from(peer1), @@ -262,10 +255,10 @@ test_chunk_skipped_with_duplicate_data_root() -> ?assertEqual(TX1#tx.data_root, TX2#tx.data_root), %% Mine empty blocks to push the data blocks out of the recent window. mine_empty_blocks_on_peer_after(peer1, B2, 11), - %% Join main with header_sync_jobs = 0 — data_root_index is NOT populated by the - %% join process. We manually sync only B2's data roots (simulating incomplete - %% background sync where B2 was processed but B1 was not). - join_main_on_peer1(B2#block.height + 11, 0), + %% Join main with background data_roots syncing turned off. + %% We manually sync only B2's data roots (simulating incomplete background sync where + %% B2 was processed but B1 was not). + join_main_on_peer1(B2#block.height + 11, false), %% Pre-fetch B1's data roots while still connected to peer1. We'll post them to main %% immediately after posting B1's chunk to beat the disk pool scan. {ok, Body1} = get_data_roots(peer1, B1), @@ -284,6 +277,7 @@ test_chunk_skipped_with_duplicate_data_root() -> %% https://github.com/ArweaveTeam/arweave-dev/issues/1112 [{AbsEnd1, Proof1}] = ar_test_data_sync:build_proofs(B1, TX1, Chunks1), post_chunk(main, Proof1), + %% Allow time for the chunk to be persisted (it shouldn't be) timer:sleep(10_000), ?assertEqual(not_found, get_chunk(main, AbsEnd1)), %% Now we post B1's data roots, which should allow Chunk1 to be persisted @@ -297,26 +291,29 @@ test_chunk_skipped_with_depth_exhaustion() -> Wallet = {_, Pub} = ar_wallet:new(), [B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]), start_peers_then_disconnect(peer1, main, B0), - %% Mine 6 blocks with identical data roots — enough to exhaust the N=5 depth limit - %% in chunk_offsets_synced. + %% Mine 6 blocks with identical data roots this will make sure the oldest chunk is will not + %% be persisted (we only persist the first 5 chunks with the same data root). Blocks = lists:map( fun(_) -> mine_block_with_small_fixed_data_tx(peer1, Wallet) end, lists:seq(1, 6) ), {LastB, _} = lists:last(Blocks), - mine_empty_blocks_on_peer_after(peer1, LastB, 11), - join_main_on_peer1(LastB#block.height + 11, 0), + %% Now we mine 10 empty blocks. This pushes the 6 chunks mined earlier out of the disk pool, + %% which will either promote them to storage (first 5) or purge it from memory (last 1). + mine_empty_blocks_on_peer_after(peer1, LastB, 10), + join_main_on_peer1(LastB#block.height + 10, false), + ar_test_node:disconnect_from(peer1), + [{B1, [{TX1, Chunks1}]} | HigherBlocks] = Blocks, lists:foreach( fun({B, _}) -> {ok, Body} = get_data_roots(peer1, B), post_data_roots(main, B, Body), wait_for_data_roots(main, B) end, - Blocks + HigherBlocks ), - ar_test_node:disconnect_from(peer1), - %% POST chunks for blocks 2-6 (the 5 highest offsets) and wait for promotion. - [{B1, [{TX1, Chunks1}]} | HigherBlocks] = Blocks, + %% POST chunks for blocks 2-6 and wait for promotion. The oldest block B1 is not among + %% the last 5 duplicate offsets, so it should not become queryable via duplicate fanout. HigherAbsEnds = lists:map( fun({B, [{TX, Chunks}]}) -> [{AbsEnd, Proof}] = ar_test_data_sync:build_proofs(B, TX, Chunks), @@ -327,17 +324,30 @@ test_chunk_skipped_with_depth_exhaustion() -> ), lists:foreach( fun(AbsEnd) -> - wait_for_sync_record_update(main, AbsEnd) + wait_for_chunk_to_persist(main, AbsEnd) end, HigherAbsEnds ), timer:sleep(10_000), - %% POST chunk for block 1 (lowest offset). chunk_offsets_synced walks the 5 synced - %% entries (blocks 6→5→4→3→2), then finds block 1's entry but N has reached 0. - %% Before fix: N=0 returned true → chunk skipped. After fix: returns false → accepted. + %% POST chunk for block 1 (lowest offset). chunk_offsets_synced/5 only checks the last + %% 5 synced duplicate offsets, so B1 is treated as already synced and skipped. [{AbsEnd1, Proof1}] = ar_test_data_sync:build_proofs(B1, TX1, Chunks1), post_chunk(main, Proof1), - ok = wait_for_chunk_to_persist(main, AbsEnd1). + timer:sleep(10_000), + ?assertEqual(not_found, get_chunk(main, AbsEnd1)), + {ok, Body1} = get_data_roots(peer1, B1), + %% This test confirms that doign a POST /data_roots before posting a chunk is not a reliable + %% workaround to force the chunk to be persisted. We will either need to change the + %% duplicate data_roots logic, or implement a new endpoint. + %% For more context, and to track the state of any improvements to the process, see: + %% https://github.com/ArweaveTeam/arweave-dev/issues/1112 + post_data_roots(main, B1, Body1), + wait_for_data_roots(main, B1), + post_chunk(main, Proof1), + timer:sleep(10_000), + ?assertEqual(not_found, get_chunk(main, AbsEnd1)), + % ok = wait_for_chunk_to_persist(main, AbsEnd1), + ok. %% Start PeerA and PeerB from the same genesis, wait until both joined, then have PeerA %% disconnect from PeerB so they stop syncing while tests extend the chain on one side. @@ -361,13 +371,22 @@ mine_empty_blocks_on_peer_after(Peer, Block, Count) -> lists:seq(1, Count) ). -%% Rejoin main onto peer1 with mine disabled and the given header_sync_jobs (e.g. 0 to isolate -%% data_roots tests, or >0 so ar_header_sync may call ar_data_sync:add_block/2 while -%% ar_data_root_sync is also writing roots). Does not disconnect — caller may call +%% Rejoin main onto peer1 with mine disabled. When EnableBackgroundSync is true, enable the +%% normal background sync setup; when false, disable both header syncing and background +%% data_roots syncing. Does not disconnect — caller may call %% ar_test_node:remote_call(main, ar_test_node, disconnect_from, [peer1]) when needed. -join_main_on_peer1(ExpectedHeight, HeaderSyncJobs) -> +join_main_on_peer1(ExpectedHeight, EnableBackgroundSync) -> {ok, BaseConfig} = arweave_config:get_env(), - MainConfig = BaseConfig#config{ mine = false, header_sync_jobs = HeaderSyncJobs }, + MainConfig = BaseConfig#config{ + mine = false, + sync_jobs = 0, + header_sync_jobs = + case EnableBackgroundSync of + true -> 2; + false -> 0 + end, + enable_data_roots_syncing = EnableBackgroundSync + }, ar_test_node:join_on(#{ node => main, join_on => peer1, config => MainConfig }, true), ar_test_node:connect_to_peer(peer1), ar_test_node:wait_until_joined(main), @@ -485,6 +504,10 @@ generate_random_txs(Wallet) -> block_start(B) -> B#block.weave_size - B#block.block_size. +filter_mined_tx_data(B, TXData) -> + MinedTXIDs = sets:from_list(B#block.txs), + [{TX, Chunks} || {TX, Chunks} <- TXData, sets:is_element(TX#tx.id, MinedTXIDs)]. + data_roots_path(BlockStart) -> "/data_roots/" ++ integer_to_list(BlockStart). @@ -586,7 +609,9 @@ wait_for_chunk_to_persist(Node, GlobalEndOffset, TimeoutMs) -> true -> ok; {error, timeout} -> - ?assert(false) + ?assert(false, + lists:flatten(io_lib:format("Timeout waiting for chunk to persist: ~p", + [GlobalEndOffset]))) end. wait_for_data_roots(Peer, B) -> diff --git a/apps/arweave/test/ar_test_node.erl b/apps/arweave/test/ar_test_node.erl index eaa71b7090..7fa5cc10dc 100644 --- a/apps/arweave/test/ar_test_node.erl +++ b/apps/arweave/test/ar_test_node.erl @@ -74,6 +74,7 @@ -define(WAIT_UNTIL_JOINED_TIMEOUT, 200_000). -define(WAIT_SYNCS_DATA_TIMEOUT, 200_000). -define(WAIT_UNTIL_MINING_PAUSED_TIMEOUT, 60_000). +-define(TEST_HTTP_CLIENT_KEEPALIVE, 4_000). %%%=================================================================== %%% Public interface. @@ -286,7 +287,8 @@ update_config(Config) -> mine = Config#config.mine, storage_modules = Config#config.storage_modules, repack_in_place_storage_modules = Config#config.repack_in_place_storage_modules, - allow_rebase = Config#config.allow_rebase + allow_rebase = Config#config.allow_rebase, + 'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE }, ok = arweave_config:set_env(Config2), ?LOG_INFO("Updated Config:"), @@ -680,6 +682,7 @@ start(B0, RewardAddr, Config, StorageModules) -> double_check_nonce_limiter, serve_wallet_lists | Config#config.enable], %% Disable rebasing by default to make the tests more reliable. allow_rebase = false, + 'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE, debug = true }), ar:start_dependencies(), @@ -903,7 +906,8 @@ join(JoinOnNode, Rejoin, Config) -> ok = arweave_config:set_env(Config#config{ start_from_latest_state = false, auto_join = true, - peers = [Peer] + peers = [Peer], + 'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE }), ar:start_dependencies(), whereis(ar_node_worker). diff --git a/apps/arweave/test/ar_test_runner.erl b/apps/arweave/test/ar_test_runner.erl index 5177655939..bd20ba0d08 100644 --- a/apps/arweave/test/ar_test_runner.erl +++ b/apps/arweave/test/ar_test_runner.erl @@ -147,6 +147,7 @@ start_for_tests(TestType) -> data_dir = ".tmp/data_" ++ atom_to_list(TestType) ++ "_main_" ++ UniqueName, port = ar_test_node:get_unused_port(), disable = [randomx_jit], + 'http_client.http.keepalive' = 4_000, auto_join = false }, ar:start(TestConfig). diff --git a/apps/arweave_config/include/arweave_config.hrl b/apps/arweave_config/include/arweave_config.hrl index 9a21db65e9..93fb932046 100644 --- a/apps/arweave_config/include/arweave_config.hrl +++ b/apps/arweave_config/include/arweave_config.hrl @@ -310,6 +310,7 @@ max_emitters = ?NUM_EMITTER_PROCESSES, sync_jobs = ?DEFAULT_SYNC_JOBS, header_sync_jobs = ?DEFAULT_HEADER_SYNC_JOBS, + enable_data_roots_syncing = true, data_sync_request_packed_chunks = false, disk_pool_jobs = ?DEFAULT_DISK_POOL_JOBS, load_key = not_set,