diff --git a/apps/arweave/src/ar_data_root_sync.erl b/apps/arweave/src/ar_data_root_sync.erl
index 8926279c55..3fe898ebc4 100644
--- a/apps/arweave/src/ar_data_root_sync.erl
+++ b/apps/arweave/src/ar_data_root_sync.erl
@@ -155,13 +155,13 @@ sync_block_data_roots(#state{ store_id = StoreID, range_start = RangeStart,
 sync_block_data_roots(_StoreID, Cursor, RangeEnd) when Cursor >= RangeEnd ->
 	{ok, Cursor};
 sync_block_data_roots(StoreID, Cursor, RangeEnd) ->
-	{BlockStart, BlockEnd, _} = ar_block_index:get_block_bounds(Cursor),
+	{BlockStart, BlockEnd, TXRoot} = ar_block_index:get_block_bounds(Cursor),
 	Cursor2 =
 		case BlockStart >= RangeEnd of
 			true ->
 				RangeEnd;
 			false ->
-				case ar_sync_record:is_recorded(Cursor + 1, data_roots, ?DEFAULT_MODULE) of
+				case ar_data_sync:are_data_roots_synced(BlockStart, BlockEnd, TXRoot) of
 					true ->
 						BlockEnd;
 					false ->
diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl
index 093d787d56..b119903d5e 100644
--- a/apps/arweave/src/ar_data_sync.erl
+++ b/apps/arweave/src/ar_data_sync.erl
@@ -3994,7 +3994,7 @@ record_chunk_cache_size_metric() ->
 %% Return {ok, {TXRoot, BlockSize, [{DataRoot, TXSize, TXStartOffset, TXPath}, ...]}}
 %% or {error, Reason}.
 get_data_roots_for_offset(Offset) ->
-	case Offset > get_disk_pool_threshold() of
+	case Offset >= get_disk_pool_threshold() of
 		true ->
 			{error, not_found};
 		false ->
diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl
index fd0d5d2d72..b381936c94 100644
--- a/apps/arweave/src/ar_http_iface_middleware.erl
+++ b/apps/arweave/src/ar_http_iface_middleware.erl
@@ -552,6 +552,70 @@ handle(<<"GET">>, [<<"data_roots">>, OffsetBin], Req, _Pid) ->
 			end
 	end;
 
+%% Accept data roots for a given block offset (>= BlockStartOffset, < BlockEndOffset).
+%% Expect only entries corresponding to non-empty transactions.
+%% Expect the complete list of entries in the order they appear in the data root index,
+%% which corresponds to sorted #tx records in the block.
+%% POST /data_roots/{offset}
+handle(<<"POST">>, [<<"data_roots">>, OffsetBin], Req, Pid) ->
+	case ar_node:is_joined() of
+		false ->
+			not_joined(Req);
+		true ->
+			ok = ar_semaphore:acquire(get_data_roots, ?DEFAULT_CALL_TIMEOUT),
+			DiskPoolThreshold = ar_data_sync:get_disk_pool_threshold(),
+			ReadOffset =
+				case catch binary_to_integer(OffsetBin) of
+					{'EXIT', _} ->
+						{reply, {400, #{}, <<>>, Req}};
+					Offset when Offset >= DiskPoolThreshold ->
+						{reply, {400, #{}, jiffy:encode(#{ error => offset_above_disk_pool_threshold }), Req}};
+					Offset when Offset < 0 ->
+						{reply, {400, #{}, jiffy:encode(#{ error => negative_offset }), Req}};
+					Offset ->
+						{BlockStart, BlockEnd, ExpectedTXRoot} = ar_block_index:get_block_bounds(Offset),
+						case ar_data_sync:are_data_roots_synced(BlockStart, BlockEnd, ExpectedTXRoot) of
+							true ->
+								{reply, {200, #{}, <<>>, Req}};
+							false ->
+								{Offset, BlockStart, BlockEnd}
+						end
+				end,
+			case ReadOffset of
+				{reply, Reply} ->
+					Reply;
+				{Offset2, BlockStart2, BlockEnd2} ->
+					case read_complete_body(Req, Pid) of
+						{ok, Body, Req2} ->
+							case ar_serialize:binary_to_data_roots(Body) of
+								{ok, {TXRoot, BlockSize, Entries}} ->
+									case ar_data_root_sync:validate_data_roots(TXRoot, BlockSize, Entries, Offset2) of
+										{ok, _} ->
+											case catch ar_data_root_sync:store_data_roots_sync(
+													BlockStart2, BlockEnd2, TXRoot, Entries) of
+												ok ->
+													{200, #{}, <<>>, Req2};
+												{'EXIT', {timeout, _}} ->
+													{503, #{}, jiffy:encode(#{ error => timeout }), Req2};
+												{'EXIT', _} ->
+													{503, #{}, jiffy:encode(#{ error => timeout }), Req2};
+												{error, Reason} ->
+													{503, #{}, jiffy:encode(#{ error => Reason }), Req2}
+											end;
+										{error, Reason} ->
+											{400, #{}, jiffy:encode(#{ error => Reason }), Req2}
+									end;
+								_ ->
+									{400, #{}, jiffy:encode(#{ error => invalid_format }), Req2}
+							end;
+						{error, body_size_too_large} ->
+							{400, #{}, <<>>, Req};
+						{error, timeout} ->
+							{503, #{}, jiffy:encode(#{ error => timeout }), Req}
+					end
+			end
+	end;
+
 handle(<<"POST">>, [<<"chunk">>], Req, Pid) ->
 	Joined =
 		case ar_node:is_joined() of
diff --git a/apps/arweave/test/ar_data_roots_sync_tests.erl b/apps/arweave/test/ar_data_roots_sync_tests.erl
index dfd261685c..de1a1c46ee 100644
--- a/apps/arweave/test/ar_data_roots_sync_tests.erl
+++ b/apps/arweave/test/ar_data_roots_sync_tests.erl
@@ -15,6 +15,13 @@ data_roots_syncs_from_peer_test_() ->
 			{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
 		fun test_data_roots_syncs_from_peer/0).
 
+post_data_roots_test_() ->
+	ar_test_node:test_with_mocked_functions([
+			{ar_block, get_consensus_window_size, fun() -> 5 end},
+			{ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
+			{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
+		fun test_post_data_roots/0).
+
 test_data_roots_syncs_from_peer() ->
 	Wallet = {_, Pub} = ar_wallet:new(),
 	[B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]),
@@ -110,6 +117,95 @@ test_data_roots_syncs_from_peer() ->
 		Blocks
 	).
 
+test_post_data_roots() ->
+	Wallet = {_, Pub} = ar_wallet:new(),
+	[B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]),
+
+	ar_test_node:start_peer(peer1, B0),
+	ar_test_node:start_peer(main, B0),
+	ar_test_node:wait_until_joined(peer1),
+	ar_test_node:wait_until_joined(main),
+	ar_test_node:disconnect_from(peer1),
+
+	%% Mine blocks with transactions with data on peer1.
+	Data = lists:map(
+		fun(_) ->
+			TXData = generate_random_txs(Wallet),
+			TXs = [TX || {TX, _} <- TXData],
+			B = ar_test_node:post_and_mine(#{ miner => peer1, await_on => peer1 }, TXs),
+			{B, TXData}
+		end,
+		lists:seq(1, 5)
+	),
+
+	%% Mine some empty blocks to push the data blocks out of the recent window.
+	{LastB, _} = lists:last(Data),
+	lists:foldl(
+		fun(_, Height) ->
+			ar_test_node:mine(peer1),
+			ar_test_node:assert_wait_until_height(peer1, Height + 1),
+			Height + 1
+		end,
+		LastB#block.height,
+		lists:seq(1, 11)
+	),
+
+	%% Now re-join main (node A) with header syncing disabled.
+	{ok, BaseConfig} = arweave_config:get_env(),
+	MainConfig = BaseConfig#config{
+		mine = false,
+		header_sync_jobs = 0
+	},
+	ar_test_node:join_on(#{ node => main, join_on => peer1, config => MainConfig }, true),
+	ar_test_node:connect_to_peer(peer1),
+	ar_test_node:wait_until_joined(main),
+	ar_test_node:assert_wait_until_height(main, LastB#block.height + 11),
+	ar_test_node:disconnect_from(peer1),
+
+	%% Verify POST /data_roots and POST /chunk for each transaction.
+	lists:foreach(
+		fun({B, TXData}) ->
+			BlockStart = B#block.weave_size - B#block.block_size,
+			%% Only non-empty blocks strictly below the disk pool threshold can be verified:
+			%% get_data_roots_for_offset/1 returns not_found and POST /data_roots replies 400
+			%% for offsets at or above the threshold, so the 200 assertions could never hold.
+			case {B#block.block_size > 0, BlockStart < ar_data_sync:get_disk_pool_threshold()} of
+				{true, true} ->
+					assert_no_data_roots(BlockStart),
+
+					%% Fetch data roots from peer1 and POST to main.
+					Peer1 = ar_test_node:peer_ip(peer1),
+					Main = ar_test_node:peer_ip(main),
+					Path = "/data_roots/" ++ integer_to_list(BlockStart),
+					{ok, {{<<"200">>, _}, _, Body, _, _}} = ar_http:req(#{ method => get, peer => Peer1, path => Path }),
+					{ok, {{<<"200">>, _}, _, <<>>, _, _}} = ar_http:req(#{ method => post, peer => Main, path => Path, body => Body }),
+
+					%% For each transaction with data, POST its chunks to main.
+					lists:foreach(
+						fun({TX, Chunks}) ->
+							case TX#tx.data_size > 0 of
+								true ->
+									Proofs = ar_test_data_sync:get_records_with_proofs(B, TX, Chunks),
+									lists:foreach(
+										fun({_, _, _, {_, Proof}}) ->
+											{ok, {{<<"200">>, _}, _, <<>>, _, _}} = ar_test_node:post_chunk(main, ar_serialize:jsonify(Proof))
+										end,
+										Proofs
+									);
+								false ->
+									ok
+							end
+						end,
+						TXData
+					);
+				_ ->
+					ok
+			end
+		end,
+		Data
+	),
+	ok.
+
 generate_random_txs(Wallet) ->
 	Coin = rand:uniform(12),
 	case Coin of