Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/arweave/src/ar_data_root_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ sync_block_data_roots(#state{ store_id = StoreID, range_start = RangeStart,
sync_block_data_roots(_StoreID, Cursor, RangeEnd) when Cursor >= RangeEnd ->
{ok, Cursor};
sync_block_data_roots(StoreID, Cursor, RangeEnd) ->
{BlockStart, BlockEnd, _} = ar_block_index:get_block_bounds(Cursor),
{BlockStart, BlockEnd, TXRoot} = ar_block_index:get_block_bounds(Cursor),
Cursor2 =
case BlockStart >= RangeEnd of
true ->
RangeEnd;
false ->
case ar_sync_record:is_recorded(Cursor + 1, data_roots, ?DEFAULT_MODULE) of
case ar_data_sync:are_data_roots_synced(BlockStart, BlockEnd, TXRoot) of
true ->
BlockEnd;
false ->
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3994,7 +3994,7 @@ record_chunk_cache_size_metric() ->
%% Return {ok, {TXRoot, BlockSize, [{DataRoot, TXSize, TXStartOffset, TXPath}, ...]}}
%% or {error, Reason}.
get_data_roots_for_offset(Offset) ->
case Offset > get_disk_pool_threshold() of
case Offset >= get_disk_pool_threshold() of
true ->
{error, not_found};
false ->
Expand Down
64 changes: 64 additions & 0 deletions apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,70 @@ handle(<<"GET">>, [<<"data_roots">>, OffsetBin], Req, _Pid) ->
end
end;

%% Accept data roots for a given block offset (>= BlockStartOffset, < BlockEndOffset).
%% Expect only entries corresponding to non-empty transactions.
%% Expect the complete list of entries in the order they appear in the data root index,
%% which corresponds to sorted #tx records in the block.
%% POST /data_roots/{offset}
handle(<<"POST">>, [<<"data_roots">>, OffsetBin], Req, Pid) ->
case ar_node:is_joined() of
false ->
not_joined(Req);
true ->
ok = ar_semaphore:acquire(get_data_roots, ?DEFAULT_CALL_TIMEOUT),
DiskPoolThreshold = ar_data_sync:get_disk_pool_threshold(),
ReadOffset =
case catch binary_to_integer(OffsetBin) of
{'EXIT', _} ->
{reply, {400, #{}, <<>>, Req}};
Offset when Offset >= DiskPoolThreshold ->
{reply, {400, #{}, jiffy:encode(#{ error => offset_above_disk_pool_threshold }), Req}};
Offset when Offset < 0 ->
{reply, {400, #{}, jiffy:encode(#{ error => negative_offset }), Req}};
Offset ->
{BlockStart, BlockEnd, ExpectedTXRoot} = ar_block_index:get_block_bounds(Offset),
case ar_data_sync:are_data_roots_synced(BlockStart, BlockEnd, ExpectedTXRoot) of
true ->
{reply, {200, #{}, <<>>, Req}};
false ->
{Offset, BlockStart, BlockEnd}
end
end,
case ReadOffset of
{reply, Reply} ->
Reply;
{Offset2, BlockStart2, BlockEnd2} ->
case read_complete_body(Req, Pid) of
{ok, Body, Req2} ->
case ar_serialize:binary_to_data_roots(Body) of
{ok, {TXRoot, BlockSize, Entries}} ->
case ar_data_root_sync:validate_data_roots(TXRoot, BlockSize, Entries, Offset2) of
{ok, _} ->
case catch ar_data_root_sync:store_data_roots_sync(
BlockStart2, BlockEnd2, TXRoot, Entries) of
ok ->
{200, #{}, <<>>, Req2};
{'EXIT', {timeout, _}} ->
{503, #{}, jiffy:encode(#{ error => timeout }), Req2};
{'EXIT', _} ->
{503, #{}, jiffy:encode(#{ error => timeout }), Req2};
{error, Reason} ->
{503, #{}, jiffy:encode(#{ error => Reason }), Req2}
end;
{error, Reason} ->
{400, #{}, jiffy:encode(#{ error => Reason }), Req2}
end;
_ ->
{400, #{}, jiffy:encode(#{ error => invalid_format }), Req2}
end;
{error, body_size_too_large} ->
{400, #{}, <<>>, Req};
{error, timeout} ->
{503, #{}, jiffy:encode(#{ error => timeout }), Req}
end
end
end;

handle(<<"POST">>, [<<"chunk">>], Req, Pid) ->
Joined =
case ar_node:is_joined() of
Expand Down
93 changes: 93 additions & 0 deletions apps/arweave/test/ar_data_roots_sync_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ data_roots_syncs_from_peer_test_() ->
{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
fun test_data_roots_syncs_from_peer/0).

post_data_roots_test_() ->
ar_test_node:test_with_mocked_functions([
{ar_block, get_consensus_window_size, fun() -> 5 end},
{ar_block, get_max_tx_anchor_depth, fun() -> 5 end},
{ar_storage_module, get_overlap, fun(_Packing) -> 0 end}],
fun test_post_data_roots/0).

test_data_roots_syncs_from_peer() ->
Wallet = {_, Pub} = ar_wallet:new(),
[B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]),
Expand Down Expand Up @@ -110,6 +117,92 @@ test_data_roots_syncs_from_peer() ->
Blocks
).

test_post_data_roots() ->
Wallet = {_, Pub} = ar_wallet:new(),
[B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(2_000_000_000_000_000), <<>>}]),

ar_test_node:start_peer(peer1, B0),
ar_test_node:start_peer(main, B0),
ar_test_node:wait_until_joined(peer1),
ar_test_node:wait_until_joined(main),
ar_test_node:disconnect_from(peer1),

%% Mine blocks with transactions with data on peer1.
Data = lists:map(
fun(_) ->
TXData = generate_random_txs(Wallet),
TXs = [TX || {TX, _} <- TXData],
B = ar_test_node:post_and_mine(#{ miner => peer1, await_on => peer1 }, TXs),
{B, TXData}
end,
lists:seq(1, 5)
),

%% Mine some empty blocks to push the data blocks out of the recent window.
{LastB, _} = lists:last(Data),
lists:foldl(
fun(_, Height) ->
ar_test_node:mine(peer1),
ar_test_node:assert_wait_until_height(peer1, Height + 1),
Height + 1
end,
LastB#block.height,
lists:seq(1, 11)
),

%% Now re-join main (node A) with header syncing disabled.
{ok, BaseConfig} = arweave_config:get_env(),
MainConfig = BaseConfig#config{
mine = false,
header_sync_jobs = 0
},
ar_test_node:join_on(#{ node => main, join_on => peer1, config => MainConfig }, true),
ar_test_node:connect_to_peer(peer1),
ar_test_node:wait_until_joined(main),
ar_test_node:assert_wait_until_height(main, LastB#block.height + 11),
ar_test_node:disconnect_from(peer1),

%% Verify POST /data_roots and POST /chunk for each transaction.
lists:foreach(
fun({B, TXData}) ->
BlockStart = B#block.weave_size - B#block.block_size,
case {B#block.block_size > 0, BlockStart >= ar_data_sync:get_disk_pool_threshold()} of
{true, true} ->
assert_no_data_roots(BlockStart),

%% Fetch data roots from peer1 and POST to main.
Peer1 = ar_test_node:peer_ip(peer1),
Main = ar_test_node:peer_ip(main),
Path = "/data_roots/" ++ integer_to_list(BlockStart),
{ok, {{<<"200">>, _}, _, Body, _, _}} = ar_http:req(#{ method => get, peer => Peer1, path => Path }),
{ok, {{<<"200">>, _}, _, <<>>, _, _}} = ar_http:req(#{ method => post, peer => Main, path => Path, body => Body }),

%% For each transaction with data, POST its chunks to main.
lists:foreach(
fun({TX, Chunks}) ->
case TX#tx.data_size > 0 of
true ->
Proofs = ar_test_data_sync:get_records_with_proofs(B, TX, Chunks),
lists:foreach(
fun({_, _, _, {_, Proof}}) ->
{ok, {{<<"200">>, _}, _, <<>>, _, _}} = ar_test_node:post_chunk(main, ar_serialize:jsonify(Proof))
end,
Proofs
);
false ->
ok
end
end,
TXData
);
_ ->
ok
end
end,
Data
),
ok.

generate_random_txs(Wallet) ->
Coin = rand:uniform(12),
case Coin of
Expand Down
Loading