Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/arweave/src/ar_cli_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ show_help() ->
" and downloads it from peers.",
[?DEFAULT_HEADER_SYNC_JOBS]
)},
{"enable_data_roots_syncing [true|false]",
"Enable or disable background data roots syncing. Default: true."},
{"data_sync_request_packed_chunks",
"Enables requesting the packed chunks from peers."},
{"disk_pool_jobs (num)",
Expand Down Expand Up @@ -739,6 +741,10 @@ parse(["sync_jobs", Num | Rest], C) ->
parse(Rest, C#config{ sync_jobs = list_to_integer(Num) });
parse(["header_sync_jobs", Num | Rest], C) ->
parse(Rest, C#config{ header_sync_jobs = list_to_integer(Num) });
parse(["enable_data_roots_syncing", "true" | Rest], C) ->
parse(Rest, C#config{ enable_data_roots_syncing = true });
parse(["enable_data_roots_syncing", "false" | Rest], C) ->
parse(Rest, C#config{ enable_data_roots_syncing = false });
parse(["data_sync_request_packed_chunks" | Rest], C) ->
parse(Rest, C#config{ data_sync_request_packed_chunks = true });
parse(["post_tx_timeout", Num | Rest], C) ->
Expand Down
6 changes: 6 additions & 0 deletions apps/arweave/src/ar_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@ parse_options([{<<"header_sync_jobs">>, Value} | Rest], Config)
parse_options([{<<"header_sync_jobs">>, Value} | _], _) ->
{error, {bad_type, header_sync_jobs, number}, Value};

parse_options([{<<"enable_data_roots_syncing">>, Value} | Rest], Config)
when is_boolean(Value) ->
parse_options(Rest, Config#config{ enable_data_roots_syncing = Value });
parse_options([{<<"enable_data_roots_syncing">>, Value} | _], _) ->
{error, {bad_type, enable_data_roots_syncing, boolean}, Value};

parse_options([{<<"disk_pool_jobs">>, Value} | Rest], Config)
when is_integer(Value) ->
parse_options(Rest, Config#config{ disk_pool_jobs = Value });
Expand Down
9 changes: 8 additions & 1 deletion apps/arweave/src/ar_data_root_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ handle_cast(sync, State) ->
ar_util:cast_after(500, self(), sync),
{noreply, State};
true ->
{Delay, State2} = sync_block_data_roots(State),
{ok, Config} = arweave_config:get_env(),
{Delay, State2} =
case Config#config.enable_data_roots_syncing of
true ->
sync_block_data_roots(State);
false ->
{?DATA_ROOTS_SYNC_SCAN_INTERVAL_MS, State}
end,
ar_util:cast_after(Delay, self(), sync),
{noreply, State2}
end;
Expand Down
8 changes: 8 additions & 0 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,16 @@ should_retry_closed_connection({down, {shutdown, {error, einval}}}) ->
true;
should_retry_closed_connection({stream_error, closed}) ->
true;
should_retry_closed_connection({stream_error, closing}) ->
true;
should_retry_closed_connection({stream_error, {closed, normal}}) ->
true;
should_retry_closed_connection({shutdown, closed}) ->
true;
should_retry_closed_connection(closed) ->
true;
should_retry_closed_connection(closing) ->
true;
should_retry_closed_connection(_) ->
false.

Expand Down
129 changes: 77 additions & 52 deletions apps/arweave/test/ar_data_roots_sync_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,19 @@ chunk_after_data_roots_background_sync_test_() ->
{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
fun test_chunk_after_data_roots_background_sync/0).

% %% Two TXs with the same data root. B2's data roots are synced and its chunk is promoted;
% %% B1's data roots are NOT yet in the index (incomplete background sync). POST B1's chunk —
% %% chunk_offsets_synced must not skip it (none → false). The chunk enters the disk pool.
% %% After B1's data roots are synced, the disk pool scan promotes the chunk to B1's offset.
% chunk_skipped_with_duplicate_data_root_test_() ->
% ar_test_node:test_with_mocked_functions([
% {ar_block, get_consensus_window_size, fun() -> 5 end},
% {ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
% {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
% fun test_chunk_skipped_with_duplicate_data_root/0).

% %% 6 TXs with the same data root: 5 highest-offset chunks synced, lowest not. POST the
% %% lowest chunk — chunk_offsets_synced exhausts its N=5 depth limit walking through the 5
% %% synced entries. Before fix the N=0 base case returned true (skip); after fix returns false.
% chunk_skipped_with_depth_exhaustion_test_() ->
% ar_test_node:test_with_mocked_functions([
% {ar_block, get_consensus_window_size, fun() -> 5 end},
% {ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
% {ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
% fun test_chunk_skipped_with_depth_exhaustion/0).
%% EUnit generator for the duplicate-data-root scenario. Runs
%% test_chunk_skipped_with_duplicate_data_root/0 with the consensus window size and the
%% max TX anchor depth mocked to 5, and the storage module overlap mocked to 0.
chunk_skipped_with_duplicate_data_root_test_() ->
	Mocks = [
		{ar_block, get_consensus_window_size, fun() -> 5 end},
		{ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
		{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}
	],
	TestFun = fun test_chunk_skipped_with_duplicate_data_root/0,
	ar_test_node:test_with_mocked_functions(Mocks, TestFun).

%% EUnit generator for the scenario with six blocks sharing one data root. Runs
%% test_chunk_skipped_with_depth_exhaustion/0 with the consensus window size and the
%% max TX anchor depth mocked to 5, and the storage module overlap mocked to 0.
chunk_skipped_with_depth_exhaustion_test_() ->
	Mocks = [
		{ar_block, get_consensus_window_size, fun() -> 5 end},
		{ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
		{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}
	],
	TestFun = fun test_chunk_skipped_with_depth_exhaustion/0,
	ar_test_node:test_with_mocked_functions(Mocks, TestFun).

test_data_roots_sync_from_peer() ->
Wallet = {_, Pub} = ar_wallet:new(),
Expand Down Expand Up @@ -133,18 +126,18 @@ test_data_roots_sync_from_peer() ->
true ->
?debugFmt("Asserting data roots synced during consensus "
"are stored, even outside the configured storage modules, "
"height: ~B, configured ranges: ~p, intersection: ~p",
"height: ~B, configured ranges: ~0p, intersection: ~0p",
[B#block.height, ConfiguredRanges, Intersection]),
wait_for_data_roots(main, B);
false ->
case ar_intervals:is_empty(Intersection) of
false ->
?debugFmt("Asserting data roots synced for partitions "
"we configured, range intersection: ~p", [Intersection]),
"we configured, range intersection: ~0p", [Intersection]),
wait_for_data_roots(main, B);
true ->
?debugFmt("Asserting no data roots for partitions "
"we did not configure, block range: ~p", [BlockRange]),
"we did not configure, block range: ~0p", [BlockRange]),
assert_no_data_roots(main, B)
end
end
Expand All @@ -160,7 +153,7 @@ test_data_roots_http_post() ->
{B, _} = mine_block_with_small_fixed_data_tx(peer1, Wallet),
%% Mine some empty blocks to push the data block out of the recent window.
mine_empty_blocks_on_peer_after(peer1, B, 11),
join_main_on_peer1(B#block.height + 11, 0),
join_main_on_peer1(B#block.height + 11, false),
ar_test_node:disconnect_from(peer1),
assert_no_data_roots(main, B),
{ok, Body} = get_data_roots(peer1, B),
Expand All @@ -182,10 +175,10 @@ test_chunk_after_data_roots_http_post() ->
Guaranteed = mine_block_with_small_fixed_data_tx(peer1, Wallet),
Random = lists:map(
fun(_) ->
TXData = generate_random_txs(Wallet),
TXs = [TX || {TX, _} <- TXData],
TXData0 = generate_random_txs(Wallet),
TXs = [TX || {TX, _} <- TXData0],
B = ar_test_node:post_and_mine(#{ miner => peer1, await_on => peer1 }, TXs),
{B, TXData}
{B, filter_mined_tx_data(B, TXData0)}
end,
lists:seq(1, 4)
),
Expand All @@ -195,7 +188,7 @@ test_chunk_after_data_roots_http_post() ->
{LastB, _} = lists:last(Data),
mine_empty_blocks_on_peer_after(peer1, LastB, 11),

join_main_on_peer1(LastB#block.height + 11, 0),
join_main_on_peer1(LastB#block.height + 11, false),
ar_test_node:disconnect_from(peer1),

%% GET/POST /data_roots only apply below each peer's disk pool bound (see
Expand Down Expand Up @@ -245,7 +238,7 @@ test_chunk_after_data_roots_background_sync() ->
start_peers_then_disconnect(peer1, main, B0),
{B, [{TX, Chunks}]} = mine_block_with_small_fixed_data_tx(peer1, Wallet),
mine_empty_blocks_on_peer_after(peer1, B, 11),
join_main_on_peer1(B#block.height + 11, 2),
join_main_on_peer1(B#block.height + 11, true),
true = B#block.block_size > 0,
wait_until_data_roots_synced(main, B),
ar_test_node:disconnect_from(peer1),
Expand All @@ -262,10 +255,10 @@ test_chunk_skipped_with_duplicate_data_root() ->
?assertEqual(TX1#tx.data_root, TX2#tx.data_root),
%% Mine empty blocks to push the data blocks out of the recent window.
mine_empty_blocks_on_peer_after(peer1, B2, 11),
%% Join main with header_sync_jobs = 0 — data_root_index is NOT populated by the
%% join process. We manually sync only B2's data roots (simulating incomplete
%% background sync where B2 was processed but B1 was not).
join_main_on_peer1(B2#block.height + 11, 0),
%% Join main with background data_roots syncing turned off.
%% We manually sync only B2's data roots (simulating incomplete background sync where
%% B2 was processed but B1 was not).
join_main_on_peer1(B2#block.height + 11, false),
%% Pre-fetch B1's data roots while still connected to peer1. We'll post them to main
%% immediately after posting B1's chunk to beat the disk pool scan.
{ok, Body1} = get_data_roots(peer1, B1),
Expand All @@ -284,6 +277,7 @@ test_chunk_skipped_with_duplicate_data_root() ->
%% https://github.com/ArweaveTeam/arweave-dev/issues/1112
[{AbsEnd1, Proof1}] = ar_test_data_sync:build_proofs(B1, TX1, Chunks1),
post_chunk(main, Proof1),
%% Allow time for the chunk to be persisted (it shouldn't be)
timer:sleep(10_000),
?assertEqual(not_found, get_chunk(main, AbsEnd1)),
%% Now we post B1's data roots, which should allow Chunk1 to be persisted
Expand All @@ -297,26 +291,29 @@ test_chunk_skipped_with_depth_exhaustion() ->
Wallet = {_, Pub} = ar_wallet:new(),
[B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]),
start_peers_then_disconnect(peer1, main, B0),
%% Mine 6 blocks with identical data roots — enough to exhaust the N=5 depth limit
%% in chunk_offsets_synced.
%% Mine 6 blocks with identical data roots. This ensures the oldest chunk will not
%% be persisted (we only persist the first 5 chunks with the same data root).
Blocks = lists:map(
fun(_) -> mine_block_with_small_fixed_data_tx(peer1, Wallet) end,
lists:seq(1, 6)
),
{LastB, _} = lists:last(Blocks),
mine_empty_blocks_on_peer_after(peer1, LastB, 11),
join_main_on_peer1(LastB#block.height + 11, 0),
%% Now we mine 10 empty blocks. This pushes the 6 chunks mined earlier out of the disk pool,
%% which will either promote them to storage (first 5) or purge it from memory (last 1).
mine_empty_blocks_on_peer_after(peer1, LastB, 10),
join_main_on_peer1(LastB#block.height + 10, false),
ar_test_node:disconnect_from(peer1),
[{B1, [{TX1, Chunks1}]} | HigherBlocks] = Blocks,
lists:foreach(
fun({B, _}) ->
{ok, Body} = get_data_roots(peer1, B),
post_data_roots(main, B, Body),
wait_for_data_roots(main, B)
end,
Blocks
HigherBlocks
),
ar_test_node:disconnect_from(peer1),
%% POST chunks for blocks 2-6 (the 5 highest offsets) and wait for promotion.
[{B1, [{TX1, Chunks1}]} | HigherBlocks] = Blocks,
%% POST chunks for blocks 2-6 and wait for promotion. The oldest block B1 is not among
%% the last 5 duplicate offsets, so it should not become queryable via duplicate fanout.
HigherAbsEnds = lists:map(
fun({B, [{TX, Chunks}]}) ->
[{AbsEnd, Proof}] = ar_test_data_sync:build_proofs(B, TX, Chunks),
Expand All @@ -327,17 +324,30 @@ test_chunk_skipped_with_depth_exhaustion() ->
),
lists:foreach(
fun(AbsEnd) ->
wait_for_sync_record_update(main, AbsEnd)
wait_for_chunk_to_persist(main, AbsEnd)
end,
HigherAbsEnds
),
timer:sleep(10_000),
%% POST chunk for block 1 (lowest offset). chunk_offsets_synced walks the 5 synced
%% entries (blocks 6→5→4→3→2), then finds block 1's entry but N has reached 0.
%% Before fix: N=0 returned true → chunk skipped. After fix: returns false → accepted.
%% POST chunk for block 1 (lowest offset). chunk_offsets_synced/5 only checks the last
%% 5 synced duplicate offsets, so B1 is treated as already synced and skipped.
[{AbsEnd1, Proof1}] = ar_test_data_sync:build_proofs(B1, TX1, Chunks1),
post_chunk(main, Proof1),
ok = wait_for_chunk_to_persist(main, AbsEnd1).
timer:sleep(10_000),
?assertEqual(not_found, get_chunk(main, AbsEnd1)),
{ok, Body1} = get_data_roots(peer1, B1),
%% This test confirms that doing a POST /data_roots before posting a chunk is not a reliable
%% workaround to force the chunk to be persisted. We will either need to change the
%% duplicate data_roots logic, or implement a new endpoint.
%% For more context, and to track the state of any improvements to the process, see:
%% https://github.com/ArweaveTeam/arweave-dev/issues/1112
post_data_roots(main, B1, Body1),
wait_for_data_roots(main, B1),
post_chunk(main, Proof1),
timer:sleep(10_000),
?assertEqual(not_found, get_chunk(main, AbsEnd1)),
% ok = wait_for_chunk_to_persist(main, AbsEnd1),
ok.

%% Start PeerA and PeerB from the same genesis, wait until both joined, then have PeerA
%% disconnect from PeerB so they stop syncing while tests extend the chain on one side.
Expand All @@ -361,13 +371,22 @@ mine_empty_blocks_on_peer_after(Peer, Block, Count) ->
lists:seq(1, Count)
).

%% Rejoin main onto peer1 with mine disabled and the given header_sync_jobs (e.g. 0 to isolate
%% data_roots tests, or >0 so ar_header_sync may call ar_data_sync:add_block/2 while
%% ar_data_root_sync is also writing roots). Does not disconnect — caller may call
%% Rejoin main onto peer1 with mine disabled. When EnableBackgroundSync is true, enable the
%% normal background sync setup; when false, disable both header syncing and background
%% data_roots syncing. Does not disconnect — caller may call
%% ar_test_node:remote_call(main, ar_test_node, disconnect_from, [peer1]) when needed.
join_main_on_peer1(ExpectedHeight, HeaderSyncJobs) ->
join_main_on_peer1(ExpectedHeight, EnableBackgroundSync) ->
{ok, BaseConfig} = arweave_config:get_env(),
MainConfig = BaseConfig#config{ mine = false, header_sync_jobs = HeaderSyncJobs },
MainConfig = BaseConfig#config{
mine = false,
sync_jobs = 0,
header_sync_jobs =
case EnableBackgroundSync of
true -> 2;
false -> 0
end,
enable_data_roots_syncing = EnableBackgroundSync
},
ar_test_node:join_on(#{ node => main, join_on => peer1, config => MainConfig }, true),
ar_test_node:connect_to_peer(peer1),
ar_test_node:wait_until_joined(main),
Expand Down Expand Up @@ -485,6 +504,10 @@ generate_random_txs(Wallet) ->
%% Return the weave offset at which block B begins: its weave_size minus its block_size.
block_start(B) ->
	WeaveSize = B#block.weave_size,
	BlockSize = B#block.block_size,
	WeaveSize - BlockSize.

%% Keep only the {TX, Chunks} pairs from TXData whose TX id is listed in B#block.txs.
%% Elements of TXData that are not 2-tuples are dropped, and the input order is kept.
filter_mined_tx_data(B, TXData) ->
	MinedTXIDs = sets:from_list(B#block.txs),
	IsMined =
		fun
			({TX, _Chunks}) ->
				sets:is_element(TX#tx.id, MinedTXIDs);
			(_Other) ->
				%% Mirror list-comprehension semantics: silently skip non-matching terms.
				false
		end,
	lists:filter(IsMined, TXData).

%% Build the "/data_roots/<BlockStart>" path string; BlockStart must be an integer.
data_roots_path(BlockStart) ->
	Suffix = integer_to_list(BlockStart),
	"/data_roots/" ++ Suffix.

Expand Down Expand Up @@ -586,7 +609,9 @@ wait_for_chunk_to_persist(Node, GlobalEndOffset, TimeoutMs) ->
true ->
ok;
{error, timeout} ->
?assert(false)
?assert(false,
lists:flatten(io_lib:format("Timeout waiting for chunk to persist: ~p",
[GlobalEndOffset])))
end.

wait_for_data_roots(Peer, B) ->
Expand Down
8 changes: 6 additions & 2 deletions apps/arweave/test/ar_test_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
-define(WAIT_UNTIL_JOINED_TIMEOUT, 200_000).
-define(WAIT_SYNCS_DATA_TIMEOUT, 200_000).
-define(WAIT_UNTIL_MINING_PAUSED_TIMEOUT, 60_000).
-define(TEST_HTTP_CLIENT_KEEPALIVE, 4_000).

%%%===================================================================
%%% Public interface.
Expand Down Expand Up @@ -286,7 +287,8 @@ update_config(Config) ->
mine = Config#config.mine,
storage_modules = Config#config.storage_modules,
repack_in_place_storage_modules = Config#config.repack_in_place_storage_modules,
allow_rebase = Config#config.allow_rebase
allow_rebase = Config#config.allow_rebase,
'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE
},
ok = arweave_config:set_env(Config2),
?LOG_INFO("Updated Config:"),
Expand Down Expand Up @@ -680,6 +682,7 @@ start(B0, RewardAddr, Config, StorageModules) ->
double_check_nonce_limiter, serve_wallet_lists | Config#config.enable],
%% Disable rebasing by default to make the tests more reliable.
allow_rebase = false,
'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE,
debug = true
}),
ar:start_dependencies(),
Expand Down Expand Up @@ -903,7 +906,8 @@ join(JoinOnNode, Rejoin, Config) ->
ok = arweave_config:set_env(Config#config{
start_from_latest_state = false,
auto_join = true,
peers = [Peer]
peers = [Peer],
'http_client.http.keepalive' = ?TEST_HTTP_CLIENT_KEEPALIVE
}),
ar:start_dependencies(),
whereis(ar_node_worker).
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/test/ar_test_runner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ start_for_tests(TestType) ->
data_dir = ".tmp/data_" ++ atom_to_list(TestType) ++ "_main_" ++ UniqueName,
port = ar_test_node:get_unused_port(),
disable = [randomx_jit],
'http_client.http.keepalive' = 4_000,
auto_join = false
},
ar:start(TestConfig).
1 change: 1 addition & 0 deletions apps/arweave_config/include/arweave_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@
max_emitters = ?NUM_EMITTER_PROCESSES,
sync_jobs = ?DEFAULT_SYNC_JOBS,
header_sync_jobs = ?DEFAULT_HEADER_SYNC_JOBS,
enable_data_roots_syncing = true,
data_sync_request_packed_chunks = false,
disk_pool_jobs = ?DEFAULT_DISK_POOL_JOBS,
load_key = not_set,
Expand Down
Loading