From 0a0eb9985ee28a467d2515d7e5c15fc9769ba8a9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 19 Sep 2025 03:08:36 +0200 Subject: [PATCH 1/2] Add storage-agnostic range and streaming support --- src/hb_data_reader.erl | 282 +++++++++++++++++++++++++++++++++++ src/hb_http.erl | 39 +++++ src/hb_http_range.erl | 85 +++++++++++ src/hb_http_server.erl | 166 ++++++++++++++++++++- src/hb_opts.erl | 5 + test/hb_data_reader_test.erl | 11 ++ test/hb_http_range_test.erl | 23 +++ 7 files changed, 609 insertions(+), 2 deletions(-) create mode 100644 src/hb_data_reader.erl create mode 100644 src/hb_http_range.erl create mode 100644 test/hb_data_reader_test.erl create mode 100644 test/hb_http_range_test.erl diff --git a/src/hb_data_reader.erl b/src/hb_data_reader.erl new file mode 100644 index 000000000..4aa4a67eb --- /dev/null +++ b/src/hb_data_reader.erl @@ -0,0 +1,282 @@ +-module(hb_data_reader). +-export([metadata/2, read_range/4, fetch_full/3, stream/4, stream_from/5, next_chunk/5]). +-ifdef(TEST). +-export([compute_next_range/3]). +-endif. + +-define(DEFAULT_CHUNK_SIZE, 1024 * 1024). + +chunk_size(Opts) -> + case hb_opts:get(stream_chunk_size, ?DEFAULT_CHUNK_SIZE, Opts) of + Size when is_integer(Size), Size > 0 -> Size; + _ -> ?DEFAULT_CHUNK_SIZE + end. + +%% @doc Retrieve dataset metadata via HEAD and fall back to Range probing. +metadata(ID, Opts) when is_binary(ID), is_map(Opts) -> + case head_request(ID, Opts) of + {ok, Meta} -> {ok, Meta}; + {error, _} -> range_metadata(ID, Opts) + end. + +%% @doc Materialize the bytes defined by a Range header. +read_range(ID, RangeHeader, Meta = #{size := Total}, Opts) when is_binary(RangeHeader) -> + case hb_http_range:parse(RangeHeader, Total) of + {ok, {Start, End}} -> + case range_request(ID, Start, End, Meta, Opts) of + {ok, RangeInfo} -> + {ok, RangeInfo#{ content_type => maps:get(content_type, Meta) }}; + {error, Reason} -> {error, Reason} + end; + {error, {range_not_satisfiable, _}} -> + {error, {range_not_satisfiable, Total}}; + {error, invalid_range} -> + {error, {invalid_range, Total}} + end; +read_range(_, _, _, _) -> {error, invalid_arguments}. + +%% @doc Fetch the entire body without loading it all at once unless unavoidable. +fetch_full(_ID, #{size := 0, content_type := CType}, _Opts) -> + {ok, #{ data => <<>>, content_type => CType }}; +fetch_full(ID, Meta = #{size := Total}, Opts) when Total > 0 -> + End = Total - 1, + case range_request(ID, 0, End, Meta, Opts) of + {ok, #{body := Body}} -> + {ok, #{ data => Body, content_type => maps:get(content_type, Meta) }}; + {error, {range_not_supported, _}} -> + full_get(ID, Meta, Opts); + {error, Reason} -> {error, Reason} + end. + +%% @doc Stream from the beginning, chunk size governed by configuration. +stream(ID, Meta = #{size := Total}, ChunkFun, Opts) when is_function(ChunkFun, 2) -> + case Total of + 0 -> + ChunkFun(<<>>, true), + {ok, Meta}; + _ -> + ChunkSize = chunk_size(Opts), + stream_loop(ID, Meta, 0, ChunkSize, ChunkFun, Opts) + end. + +%% @doc Continue streaming starting from a specific byte offset. +stream_from(ID, Meta, Offset, ChunkFun, Opts) when is_function(ChunkFun, 2) -> + ChunkSize = chunk_size(Opts), + stream_loop(ID, Meta, Offset, ChunkSize, ChunkFun, Opts). + +%% @doc Retrieve the next chunk without mutating state; used for preflight checks. 
+next_chunk(_ID, _Meta = #{size := Total}, Offset, _ChunkSize, _Opts) when Offset >= Total -> + {error, done}; +next_chunk(ID, Meta = #{size := Total}, Offset, ChunkSize, Opts) -> + Normalized = case ChunkSize > 0 of true -> ChunkSize; false -> ?DEFAULT_CHUNK_SIZE end, + {Start, End, _} = compute_next_range(Offset, Total, Normalized), + range_request(ID, Start, End, Meta, Opts). + +%% Internal helpers -------------------------------------------------------- +head_request(ID, Opts) -> + Req = base_request(ID, <<"HEAD">>), + case hb_http:request(Req, Opts) of + {ok, Msg} -> + with_content_length(Msg, + fun(Size) -> + {ok, #{ + size => Size, + content_type => extract_content_type(Msg, Opts) + }} + end, + Opts + ); + {error, _} -> {error, head_failed} + end. + +range_metadata(ID, Opts) -> + case range_request(ID, 0, 0, #{}, Opts) of + {ok, #{total := Total, content_type := CType}} -> + {ok, #{ size => Total, content_type => CType }}; + {error, {range_not_satisfiable, Total}} -> + {ok, #{ size => Total, content_type => hb_opts:get(range_default_content_type, <<"application/octet-stream">>, Opts) }}; + {error, Reason} -> {error, Reason} + end. + +full_get(ID, Meta, Opts) -> + Req = base_request(ID, <<"GET">>), + case hb_http:request(Req, Opts) of + {ok, Msg} -> + Body = hb_ao:get(<<"body">>, Msg, <<>>, Opts), + {ok, #{ data => Body, content_type => maps:get(content_type, Meta) }}; + {error, Reason} -> {error, Reason} + end. + +stream_loop(ID, Meta = #{size := Total}, Offset, ChunkSize, ChunkFun, Opts) -> + case Offset >= Total of + true -> {ok, Meta}; + false -> + {Start, End, IsFinal} = compute_next_range(Offset, Total, ChunkSize), + case range_request(ID, Start, End, Meta, Opts) of + {ok, #{body := Chunk, range_end := RangeEnd}} -> + ChunkFun(Chunk, IsFinal), + case IsFinal of + true -> {ok, Meta}; + false -> stream_loop(ID, Meta, RangeEnd + 1, ChunkSize, ChunkFun, Opts) + end; + {error, Reason} -> {error, Reason} + end + end. + +range_request(ID, Start, End, Meta, Opts) when Start =< End -> + RangeValue = range_header(Start, End), + BaseReq = base_request(ID, <<"GET">>), + Req = BaseReq#{ <<"range">> => RangeValue }, + case hb_http:request(Req, Opts) of + {ok, Msg} -> + handle_range_response(Msg, Start, End, Meta, Opts); + {error, Msg} -> + Status = hb_ao:get(<<"status">>, Msg, 0, Opts), + case Status of + 416 -> + Total = range_total_from_header(hb_ao:get(<<"content-range">>, Msg, <<"bytes */0">>, Opts)), + {error, {range_not_satisfiable, Total}}; + _ -> {error, {http_error, Status}} + end + end; +range_request(_, _, _, _, _) -> {error, invalid_range_request}. 
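Stepping back from the helpers for a moment: here is a minimal caller sketch for the module's public API. It is illustrative only and not part of the patch; the file path and option map are hypothetical, and it assumes nothing beyond the contract above, namely that metadata/2 yields a size/content-type map and stream/4 invokes the callback once per chunk with a final-flag.

    stream_to_file(ID, Path, Opts) ->
        {ok, Meta} = hb_data_reader:metadata(ID, Opts),
        {ok, Fd} = file:open(Path, [write, raw, binary]),
        %% The callback runs synchronously in the caller, so plain side
        %% effects such as file writes are safe here.
        Write = fun(Chunk, _IsFinal) -> ok = file:write(Fd, Chunk) end,
        Result = hb_data_reader:stream(ID, Meta, Write, Opts),
        ok = file:close(Fd),
        Result.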
+
+handle_range_response(Msg, Start, End, Meta, Opts) ->
+    Status = hb_ao:get(<<"status">>, Msg, 200, Opts),
+    Body = hb_ao:get(<<"body">>, Msg, <<>>, Opts),
+    CType = extract_content_type(Msg, Opts),
+    case Status of
+        206 ->
+            CR = hb_ao:get(<<"content-range">>, Msg, undefined, Opts),
+            case parse_content_range(CR) of
+                {ok, RangeStart, RangeEnd, Total} ->
+                    ResolvedTotal = resolve_total(Total, Meta, Body, Start, RangeEnd),
+                    {ok, #{
+                        body => Body,
+                        start => RangeStart,
+                        range_end => RangeEnd,
+                        total => ResolvedTotal,
+                        content_type => CType,
+                        final => final_flag(RangeEnd, ResolvedTotal)
+                    }};
+                error ->
+                    ActualEnd = Start + erlang:max(byte_size(Body) - 1, 0),
+                    ResolvedTotal = resolve_total(undefined, Meta, Body, Start, ActualEnd),
+                    {ok, #{
+                        body => Body,
+                        start => Start,
+                        range_end => ActualEnd,
+                        total => ResolvedTotal,
+                        content_type => CType,
+                        final => final_flag(ActualEnd, ResolvedTotal)
+                    }}
+            end;
+        200 ->
+            case {Start, byte_size(Body)} of
+                {0, Size} when Size >= (End - Start + 1) ->
+                    ActualEnd = Start + Size - 1,
+                    ResolvedTotal = resolve_total(Size, Meta, Body, Start, ActualEnd),
+                    {ok, #{
+                        body => Body,
+                        start => Start,
+                        range_end => ActualEnd,
+                        total => ResolvedTotal,
+                        content_type => CType,
+                        final => final_flag(ActualEnd, ResolvedTotal)
+                    }};
+                _ ->
+                    {error, {range_not_supported, Status}}
+            end;
+        _ ->
+            {error, {http_error, Status}}
+    end.
+
+resolve_total(undefined, Meta, Body, Start, _RangeEnd) ->
+    case maps:get(size, Meta, undefined) of
+        undefined -> Start + byte_size(Body);
+        Size ->
+            %% Metadata already knows the total size; trust it as-is.
+            %% How far this particular response reached cannot change
+            %% the total, so no comparison against the range end is needed.
+            Size
+    end;
+resolve_total(Value, _Meta, _Body, _Start, _RangeEnd) when is_integer(Value) -> Value.
+
+range_total_from_header(ContentRange) ->
+    case parse_content_range(ContentRange) of
+        {ok, _S, _E, Total} when is_integer(Total) -> Total;
+        {unsatisfied, Total} -> Total;
+        _ -> 0
+    end.
+
+parse_content_range(undefined) -> error;
+parse_content_range(<<>>) -> error;
+parse_content_range(<<"bytes ", Rest/binary>>) ->
+    case binary:split(Rest, <<"/">>, []) of
+        [<<"*">>, TotalBin] ->
+            case safe_int(TotalBin) of
+                {ok, Total} -> {unsatisfied, Total};
+                error -> error
+            end;
+        [RangePart, TotalBin] ->
+            case {safe_int(TotalBin), binary:split(RangePart, <<"-">>, [])} of
+                {{ok, Total}, [StartBin, EndBin]} ->
+                    case {safe_int(StartBin), safe_int(EndBin)} of
+                        {{ok, Start}, {ok, RangeEnd}} -> {ok, Start, RangeEnd, Total};
+                        _ -> error
+                    end;
+                _ -> error
+            end;
+        _ -> error
+    end;
+parse_content_range(_) -> error.
+
+range_header(Start, End) when Start =< End ->
+    iolist_to_binary([
+        <<"bytes=">>,
+        integer_to_binary(Start),
+        <<"-">>,
+        integer_to_binary(End)
+    ]);
+range_header(Start, End) ->
+    error({invalid_range_bounds, Start, End}).
+
+with_content_length(Msg, Fun, Opts) ->
+    case hb_ao:get(<<"content-length">>, Msg, undefined, Opts) of
+        undefined -> {error, missing_content_length};
+        Bin ->
+            case safe_int(Bin) of
+                {ok, Size} -> Fun(Size);
+                error -> {error, invalid_content_length}
+            end
+    end.
+
+extract_content_type(Msg, Opts) ->
+    hb_ao:get(
+        <<"content-type">>,
+        Msg,
+        hb_opts:get(range_default_content_type, <<"application/octet-stream">>, Opts),
+        Opts
+    ).
+
+safe_int(Bin) when is_binary(Bin) ->
+    try {ok, binary_to_integer(Bin)} catch _:_ -> error end;
+safe_int(Int) when is_integer(Int) -> {ok, Int};
+safe_int(_) -> error.
+
+base_request(ID, Method) ->
+    #{
+        <<"multirequest-responses">> => 1,
+        <<"path">> => <<"/raw/", ID/binary>>,
+        <<"method">> => Method
+    }.
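The Content-Range parser above accepts both the satisfiable "bytes S-E/T" form and the unsatisfiable "bytes */T" form. A hedged eunit-style sketch of its contract follows; since parse_content_range/1 is module-internal, these calls only run inside the module (or behind a test export, which this patch does not add):

    parse_content_range_examples() ->
        {ok, 0, 99, 1000}   = parse_content_range(<<"bytes 0-99/1000">>),
        {unsatisfied, 1000} = parse_content_range(<<"bytes */1000">>),
        error               = parse_content_range(<<"pages 0-1/2">>),
        error               = parse_content_range(undefined),
        ok.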
+ +compute_next_range(Offset, Total, ChunkSize) when Offset < Total, ChunkSize > 0 -> + Start = Offset, + End = erlang:min(Start + ChunkSize - 1, Total - 1), + {Start, End, End >= Total - 1}; +compute_next_range(_, Total, _) when Total =< 0 -> {0, -1, true}; +compute_next_range(_, _, _) -> {0, -1, true}. + +final_flag(_RangeEnd, undefined) -> false; +final_flag(RangeEnd, Total) when is_integer(Total) -> RangeEnd >= Total - 1. diff --git a/src/hb_http.erl b/src/hb_http.erl index 5b1f5e2b5..6a57c5293 100644 --- a/src/hb_http.erl +++ b/src/hb_http.erl @@ -8,6 +8,7 @@ -export([get/2, get/3, post/3, post/4, request/2, request/4, request/5]). -export([message_to_request/2, reply/4, accept_to_codec/2]). -export([req_to_tabm_singleton/3]). +-export([reply_streamed/5, reply_streamed/6, send_streamed_response/4]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -512,6 +513,44 @@ reply(InitReq, TABMReq, Status, RawMessage, Opts) -> ), {ok, PostStreamReq, no_state}. +%% @doc Reply with chunked transfer encoding for streaming large responses +reply_streamed(Req, TABMReq, Status, StreamFun, Opts) -> + reply_streamed(Req, TABMReq, Status, #{}, StreamFun, Opts). + +reply_streamed(Req, TABMReq, Status, ExtraHeaders, StreamFun, Opts) -> + {ok, HeadersBeforeCors, _} = encode_reply(Status, TABMReq, #{}, Opts), + + HeadersWithChunked = HeadersBeforeCors#{ + <<"transfer-encoding">> => <<"chunked">> + }, + + HeadersWithExtras = maps:merge(HeadersWithChunked, ExtraHeaders), + + ReqHdr = cowboy_req:header(<<"access-control-request-headers">>, Req, <<"">>), + HeadersWithCors = add_cors_headers(HeadersWithExtras, ReqHdr, Opts), + EncodedHeaders = hb_private:reset(HeadersWithCors), + + StreamReq = cowboy_req:stream_reply(Status, EncodedHeaders, Req), + + try + StreamFun(StreamReq), + cowboy_req:stream_body(<<>>, fin, StreamReq) + catch + _:_ -> + % On error, try to finish the stream gracefully + cowboy_req:stream_body(<<>>, fin, StreamReq) + end, + + {ok, StreamReq, no_state}. + +%% @doc Send a streamed chunk response to the client +send_streamed_response(StreamReq, Chunk, IsFinal, _Opts) when is_binary(Chunk) -> + Flag = case IsFinal of + true -> fin; + false -> nofin + end, + cowboy_req:stream_body(Chunk, Flag, StreamReq). + %% @doc Handle replying with cookies if the message contains them. Returns the %% new Cowboy `Req` object, and the message with the cookies removed. Both %% `set-cookie' and `cookie' fields are treated as viable sources of cookies. diff --git a/src/hb_http_range.erl b/src/hb_http_range.erl new file mode 100644 index 000000000..dc9542e1b --- /dev/null +++ b/src/hb_http_range.erl @@ -0,0 +1,85 @@ +-module(hb_http_range). +-export([parse/2, build_content_range/3, build_unsatisfied_content_range/1]). + +%% @doc Parse a single HTTP Range header for a resource of TotalSize bytes. +%% Supports: +%% - bytes=Start-End +%% - bytes=Start- +%% - bytes=-Suffix +%% Returns {ok, {Start, End}} or {error, invalid_range} or {error, {range_not_satisfiable, Total}}. +parse(RangeBin, TotalSize) when is_binary(RangeBin), is_integer(TotalSize), TotalSize >= 0 -> + case TotalSize of + 0 -> {error, {range_not_satisfiable, 0}}; + _ -> do_parse(normalize(RangeBin), TotalSize) + end; +parse(_, TotalSize) -> {error, {range_not_satisfiable, max(0, TotalSize)}}. + +do_parse(<<"bytes=", Spec/binary>>, Total) -> + case has_comma(Spec) of + true -> {error, invalid_range}; % multi-range not supported + false -> parse_single(Spec, Total) + end; +do_parse(_, _Total) -> {error, invalid_range}. 
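Before the single-range clauses below, a hedged set of worked examples for parse/2, illustrative only and not part of the patch. Case and embedded spaces are normalized away (via normalize/1 and to_lower/1, defined further down), while multi-range specs are rejected outright; each match crashes if the contract drifts:

    parse_examples() ->
        {ok, {0, 9}}   = hb_http_range:parse(<<"bytes=0-9">>, 100),
        {ok, {0, 9}}   = hb_http_range:parse(<<"Bytes = 0-9">>, 100),
        {ok, {90, 99}} = hb_http_range:parse(<<"bytes=-10">>, 100),
        {error, invalid_range} = hb_http_range:parse(<<"bytes=0-5,10-15">>, 100),
        ok.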
+
+parse_single(Spec, Total) ->
+    Clean = strip_ws(Spec),
+    case Clean of
+        <<"-", SuffixBin/binary>> ->
+            case safe_int(SuffixBin) of
+                {ok, 0} -> {error, {range_not_satisfiable, Total}};
+                {ok, N} when N > 0 ->
+                    case N >= Total of
+                        true -> {ok, {0, Total - 1}};
+                        false -> {ok, {Total - N, Total - 1}}
+                    end;
+                error -> {error, invalid_range}
+            end;
+        _ ->
+            case binary:split(Clean, <<"-">>, [global]) of
+                [StartBin, <<>>] ->
+                    case safe_int(StartBin) of
+                        {ok, Start} when Start < Total -> {ok, {Start, Total - 1}};
+                        {ok, _} -> {error, {range_not_satisfiable, Total}};
+                        error -> {error, invalid_range}
+                    end;
+                [StartBin, EndBin] ->
+                    case {safe_int(StartBin), safe_int(EndBin)} of
+                        {{ok, Start}, {ok, End}} when Start =< End ->
+                            case Start < Total of
+                                true -> {ok, {Start, min(End, Total - 1)}};
+                                false -> {error, {range_not_satisfiable, Total}}
+                            end;
+                        _ -> {error, invalid_range}
+                    end;
+                _ -> {error, invalid_range}
+            end
+    end.
+
+has_comma(Bin) -> binary:match(Bin, <<",">>) =/= nomatch.
+
+strip_ws(Bin) -> binary:replace(Bin, <<" ">>, <<>>, [global]).
+
+normalize(Bin) ->
+    %% Strip embedded spaces and lower-case ASCII letters so the "bytes"
+    %% unit matches case-insensitively; digits fall outside A-Z and so
+    %% pass through to_lower/1 unchanged.
+    Stripped = binary:replace(Bin, <<" ">>, <<>>, [global]),
+    case Stripped of
+        <<"bytes=", _/binary>> -> Stripped;
+        _ ->
+            to_lower(Stripped)
+    end.
+
+to_lower(<<>>) -> <<>>;
+to_lower(<<C, Rest/binary>>) when C >= $A, C =< $Z -> <<(C + 32), (to_lower(Rest))/binary>>;
+to_lower(<<C, Rest/binary>>) -> <<C, (to_lower(Rest))/binary>>.
+
+safe_int(<<>>) -> error;
+safe_int(B) ->
+    try {ok, binary_to_integer(B)}
+    catch _:_ -> error end.
+
+build_content_range(Start, End, Total) when is_integer(Start), is_integer(End), is_integer(Total) ->
+    <<"bytes ", (integer_to_binary(Start))/binary, "-", (integer_to_binary(End))/binary, "/", (integer_to_binary(Total))/binary>>.
+
+build_unsatisfied_content_range(Total) when is_integer(Total) ->
+    <<"bytes */", (integer_to_binary(Total))/binary>>.
diff --git a/src/hb_http_server.erl b/src/hb_http_server.erl
index 9a6c0453d..85050cbe9 100644
--- a/src/hb_http_server.erl
+++ b/src/hb_http_server.erl
@@ -374,7 +374,13 @@ handle_request(RawReq, Body, ServerID) ->
         _ ->
             % The request is of normal AO-Core form, so we parse it and invoke
             % the meta@1.0 device to handle it.
-            ?event(http,
+            Path = cowboy_req:path(RawReq),
+            Method = cowboy_req:method(RawReq),
+            case maybe_handle_data_request(Method, Path, RawReq, NodeMsg) of
+                {ok, _, _} = Reply ->
+                    Reply;
+                pass ->
+                    ?event(http,
                 {
                     http_inbound,
                     {cowboy_req, {explicit, Req}, {body, {string, Body}}}
@@ -411,7 +417,7 @@ handle_request(RawReq, Body, ServerID) ->
                         },
                         ReqSingleton
                     ),
-                    hb_http:reply(Req, ReqSingleton, Res, NodeMsg)
+                    hb_http:reply(Req, ReqSingleton, Res, NodeMsg)
                 catch
                     Type:Details:Stacktrace ->
                         handle_error(
@@ -423,8 +429,164 @@ handle_request(RawReq, Body, ServerID) ->
                             NodeMsg
                         )
                 end
+            end
     end.
+
+maybe_handle_data_request(Method, Path, RawReq, NodeMsg) ->
+    case {hb_opts:get(range_requests_enabled, true, NodeMsg), Method, is_data_request(Path)} of
+        {true, <<"GET">>, {true, DataID}} ->
+            Headers = cowboy_req:headers(RawReq),
+            RangeHeader = case maps:get(<<"range">>, Headers, undefined) of
+                undefined -> maps:get(<<"Range">>, Headers, undefined);
+                V -> V
+            end,
+            handle_data_request(DataID, RangeHeader, RawReq, Path, NodeMsg);
+        _ -> pass
+    end.
+
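The intercept above only fires for GET requests on /{id}/data-style paths while range_requests_enabled is set (it defaults to true via the hb_opts change in this patch); every other request returns pass and flows into the ordinary AO-Core pipeline. A hedged sketch of the gate, assuming hb_opts:get/3 reads keys from the supplied node-options map; neither call touches the Cowboy request, since both fail the gate before headers are read:

    gate_examples(RawReq) ->
        pass = maybe_handle_data_request(<<"POST">>, <<"/abc/data">>, RawReq,
                                         #{ range_requests_enabled => true }),
        pass = maybe_handle_data_request(<<"GET">>, <<"/abc/data">>, RawReq,
                                         #{ range_requests_enabled => false }).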
+handle_data_request(DataID, RangeHeader, RawReq, Path, NodeMsg) -> + case hb_data_reader:metadata(DataID, NodeMsg) of + {ok, Meta} when is_binary(RangeHeader) -> + handle_range_request(DataID, RangeHeader, Meta, RawReq, NodeMsg); + {ok, Meta} -> + handle_full_request(DataID, Meta, RawReq, Path, NodeMsg); + {error, {http_error, 404}} -> + {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + {error, Reason} -> + ?event(error, {metadata_fetch_failed, {id, DataID}, {reason, Reason}}), + {ok, cowboy_req:reply(500, error_headers(), <<"Range metadata error">>, RawReq), no_state} + end. + +handle_range_request(DataID, RangeBin, Meta, RawReq, NodeMsg) -> + case hb_data_reader:read_range(DataID, RangeBin, Meta, NodeMsg) of + {ok, #{body := Partial, start := Start, range_end := End, total := Total} = RangeInfo} -> + CType = maps:get(content_type, RangeInfo, maps:get(content_type, Meta)), + ContentRange = hb_http_range:build_content_range(Start, End, Total), + Headers = + maps:merge( + cors_headers(), + #{ + <<"accept-ranges">> => <<"bytes">>, + <<"content-range">> => ContentRange, + <<"content-length">> => integer_to_binary((End - Start) + 1), + <<"content-type">> => CType + } + ), + {ok, cowboy_req:reply(206, Headers, Partial, RawReq), no_state}; + {error, {range_not_satisfiable, Total}} -> + Unsat = hb_http_range:build_unsatisfied_content_range(Total), + Headers = maps:merge(cors_headers(), #{ <<"content-range">> => Unsat }), + {ok, cowboy_req:reply(416, Headers, <<>>, RawReq), no_state}; + {error, {invalid_range, Total}} -> + Headers = maps:merge(cors_headers(), #{ <<"content-range">> => hb_http_range:build_unsatisfied_content_range(Total) }), + {ok, cowboy_req:reply(416, Headers, <<>>, RawReq), no_state}; + {error, {http_error, 404}} -> + {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + {error, Reason} -> + ?event(error, {range_request_failed, {id, DataID}, {reason, Reason}}), + {ok, cowboy_req:reply(500, error_headers(), <<"Range request failed">>, RawReq), no_state} + end. + +handle_full_request(DataID, Meta = #{size := Total}, RawReq, Path, NodeMsg) -> + CType = maps:get(content_type, Meta), + Threshold = hb_opts:get(streaming_threshold, default_streaming_threshold(), NodeMsg), + case Total =< Threshold of + true -> + case hb_data_reader:fetch_full(DataID, Meta, NodeMsg) of + {ok, #{data := Data}} -> + Headers = + maps:merge( + cors_headers(), + #{ + <<"accept-ranges">> => <<"bytes">>, + <<"content-type">> => CType, + <<"content-length">> => integer_to_binary(Total) + } + ), + {ok, cowboy_req:reply(200, Headers, Data, RawReq), no_state}; + {error, {http_error, 404}} -> + {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + {error, Reason} -> + ?event(error, {full_data_fetch_failed, {id, DataID}, {reason, Reason}}), + {ok, cowboy_req:reply(500, error_headers(), <<"Streaming request failed">>, RawReq), no_state} + end; + false -> + stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg) + end. 
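With the defaults this patch adds to hb_opts (a 10 MiB streaming_threshold and a 1 MiB stream_chunk_size), a 25 MiB object misses the =< Threshold branch above and instead takes the streaming path implemented by stream_large_data/6 just below, fetched in 25 ranged requests. A small illustrative helper for that arithmetic, not part of the patch:

    %% chunks_needed(25 * 1024 * 1024, #{}) =:= 25 with the default chunk size.
    chunks_needed(Total, Opts) ->
        ChunkSize = hb_opts:get(stream_chunk_size, 1048576, Opts),
        (Total + ChunkSize - 1) div ChunkSize.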
+ +stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg) -> + ChunkSize = hb_data_reader:chunk_size(NodeMsg), + case hb_data_reader:next_chunk(DataID, Meta, 0, ChunkSize, NodeMsg) of + {ok, FirstChunk} -> + ExtraHeaders = #{ + <<"accept-ranges">> => <<"bytes">>, + <<"content-type">> => CType + }, + StreamFun = fun(StreamReq) -> + Body0 = maps:get(body, FirstChunk), + Final0 = maps:get(final, FirstChunk, false), + hb_http:send_streamed_response(StreamReq, Body0, Final0, NodeMsg), + case Final0 of + true -> ok; + false -> + NextOffset = maps:get(range_end, FirstChunk) + 1, + case hb_data_reader:stream_from( + DataID, + Meta, + NextOffset, + fun(Chunk, IsFinal) -> + hb_http:send_streamed_response(StreamReq, Chunk, IsFinal, NodeMsg) + end, + NodeMsg + ) of + {ok, _} -> ok; + {error, Reason} -> + ?event(error, + {range_stream_failure, + {id, DataID}, + {offset, NextOffset}, + {reason, Reason} + } + ), + cowboy_req:stream_trailers( + StreamReq, + #{ <<"x-range-error">> => hb_util:bin(Reason) } + ) + end + end + end, + hb_http:reply_streamed(RawReq, #{ <<"path">> => Path }, 200, ExtraHeaders, StreamFun, NodeMsg); + {error, {http_error, 404}} -> + {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + {error, done} -> + {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + {error, Reason} -> + ?event(error, {prefetch_failed, {id, DataID}, {reason, Reason}}), + {ok, cowboy_req:reply(500, error_headers(), <<"Streaming request failed">>, RawReq), no_state} + end. + +%% @doc Detect if the request path is of the form /{ID}/data or /tx/{ID}/data +is_data_request(Path) when is_binary(Path) -> + Parts = [P || P <- binary:split(Path, <<"/">>, [global]), P =/= <<>>], + case Parts of + [ID, <<"data">>] -> {true, ID}; + [<<"tx">>, ID, <<"data">>] -> {true, ID}; + _ -> false + end. + +cors_headers() -> + #{ + <<"access-control-allow-origin">> => <<"*">>, + <<"access-control-allow-headers">> => <<"*">>, + <<"access-control-allow-methods">> => <<"GET, POST, PUT, DELETE, OPTIONS">> + }. + +error_headers() -> + maps:merge(cors_headers(), #{ <<"content-type">> => <<"text/plain">> }). + +default_streaming_threshold() -> + 10 * 1024 * 1024. + %% @doc Return a 500 error response to the client. handle_error(Req, Singleton, Type, Details, Stacktrace, NodeMsg) -> DetailsStr = hb_util:bin(hb_format:message(Details, NodeMsg, 1)), diff --git a/src/hb_opts.erl b/src/hb_opts.erl index fca045127..7308803dd 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -119,6 +119,11 @@ default_message() -> %% execute more messages on a process after it has returned a result. %% Options: aggressive, lazy compute_mode => lazy, + streaming_threshold => 10485760, + stream_chunk_size => 1048576, + range_default_content_type => <<"application/octet-stream">>, + %% Enable HTTP Range support for /{id}/data intercepts + range_requests_enabled => true, %% Choice of remote nodes for tasks that are not local to hyperbeam. gateway => <<"https://arweave.net">>, bundler_ans104 => <<"https://up.arweave.net:443">>, diff --git a/test/hb_data_reader_test.erl b/test/hb_data_reader_test.erl new file mode 100644 index 000000000..9d72df131 --- /dev/null +++ b/test/hb_data_reader_test.erl @@ -0,0 +1,11 @@ +-module(hb_data_reader_test). +-include_lib("eunit/include/eunit.hrl"). + +compute_next_range_middle_test() -> + ?assertEqual({0, 2, false}, hb_data_reader:compute_next_range(0, 10, 3)). + +compute_next_range_final_test() -> + ?assertEqual({9, 9, true}, hb_data_reader:compute_next_range(9, 10, 4)). 
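The two cases above pin the interior-chunk and clamped-final-chunk behaviour. One more hedged case, illustrative only and not part of the patch, covers a chunk that ends exactly at the last byte (offset 6, chunk size 4, total 10), where the final flag must still be set:

    compute_next_range_exact_end_test() ->
        %% End clamps to Total - 1 = 9, so the final flag is true.
        ?assertEqual({6, 9, true}, hb_data_reader:compute_next_range(6, 10, 4)).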
+ +compute_next_range_zero_size_test() -> + ?assertEqual({0, -1, true}, hb_data_reader:compute_next_range(0, 0, 4)). diff --git a/test/hb_http_range_test.erl b/test/hb_http_range_test.erl new file mode 100644 index 000000000..9e11e75fe --- /dev/null +++ b/test/hb_http_range_test.erl @@ -0,0 +1,23 @@ +-module(hb_http_range_test). +-include_lib("eunit/include/eunit.hrl"). + +abs_range_test() -> + ?assertEqual({ok, {0, 99}}, hb_http_range:parse(<<"bytes=0-99">>, 200)). + +open_end_range_test() -> + ?assertEqual({ok, {10, 199}}, hb_http_range:parse(<<"bytes=10-">>, 200)). + +suffix_range_test() -> + ?assertEqual({ok, {150, 199}}, hb_http_range:parse(<<"bytes=-50">>, 200)). + +invalid_range_format_test() -> + ?assertEqual({error, invalid_range}, hb_http_range:parse(<<"bytes=foo-bar">>, 200)). + +unsatisfiable_range_test() -> + ?assertEqual({error, {range_not_satisfiable, 100}}, hb_http_range:parse(<<"bytes=150-199">>, 100)). + +build_content_range_test() -> + ?assertEqual(<<"bytes 0-9/100">>, hb_http_range:build_content_range(0, 9, 100)). + +build_unsatisfied_content_range_test() -> + ?assertEqual(<<"bytes */100">>, hb_http_range:build_unsatisfied_content_range(100)). From 16248cc8abfa0eb06020ad4e2af62e74469b4f75 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Sat, 20 Sep 2025 05:17:07 +0200 Subject: [PATCH 2/2] Add support for signed streaming responses Implement signed streaming responses using HTTPSig for large data and range requests. --- src/dev_codec_httpsig.erl | 2 +- src/hb_data_reader.erl | 2 +- src/hb_http.erl | 30 +++++- src/hb_http_server.erl | 152 ++++++++++++++++++++---------- test/httpsig_stream_sign_test.erl | 56 +++++++++++ 5 files changed, 191 insertions(+), 51 deletions(-) create mode 100644 test/httpsig_stream_sign_test.erl diff --git a/src/dev_codec_httpsig.erl b/src/dev_codec_httpsig.erl index c104ec2ac..bdb14de7c 100644 --- a/src/dev_codec_httpsig.erl +++ b/src/dev_codec_httpsig.erl @@ -45,7 +45,7 @@ serialize(Msg, #{ <<"format">> := <<"components">> }, Opts) -> % Convert to HTTPSig via TABM through calling `hb_message:convert` rather % than executing `to/3` directly. This ensures that our responses are % normalized. - {ok, EncMsg} = hb_message:convert(Msg, <<"httpsig@1.0">>, Opts), + EncMsg = hb_message:convert(Msg, <<"httpsig@1.0">>, Opts), {ok, #{ <<"body">> => hb_maps:get(<<"body">>, EncMsg, <<>>), diff --git a/src/hb_data_reader.erl b/src/hb_data_reader.erl index 4aa4a67eb..05c5ecb7e 100644 --- a/src/hb_data_reader.erl +++ b/src/hb_data_reader.erl @@ -1,5 +1,5 @@ -module(hb_data_reader). --export([metadata/2, read_range/4, fetch_full/3, stream/4, stream_from/5, next_chunk/5]). +-export([metadata/2, read_range/4, fetch_full/3, stream/4, stream_from/5, next_chunk/5, chunk_size/1]). -ifdef(TEST). -export([compute_next_range/3]). -endif. diff --git a/src/hb_http.erl b/src/hb_http.erl index 6a57c5293..97a40e52d 100644 --- a/src/hb_http.erl +++ b/src/hb_http.erl @@ -8,7 +8,7 @@ -export([get/2, get/3, post/3, post/4, request/2, request/4, request/5]). -export([message_to_request/2, reply/4, accept_to_codec/2]). -export([req_to_tabm_singleton/3]). --export([reply_streamed/5, reply_streamed/6, send_streamed_response/4]). +-export([reply_streamed/5, reply_streamed/6, reply_streamed_signed/7, send_streamed_response/4]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -543,6 +543,34 @@ reply_streamed(Req, TABMReq, Status, ExtraHeaders, StreamFun, Opts) -> {ok, StreamReq, no_state}. 
+%% @doc Reply with chunked transfer encoding for streaming large responses, +%% using the provided Message to generate headers (e.g., for signatures). +reply_streamed_signed(Req, TABMReq, Status, Message, ExtraHeaders, StreamFun, Opts) -> + {ok, HeadersBeforeCors, _} = encode_reply(Status, TABMReq, Message, Opts), + + HeadersWithChunked = HeadersBeforeCors#{ + <<"transfer-encoding">> => <<"chunked">> + }, + + HeadersWithExtras = maps:merge(HeadersWithChunked, ExtraHeaders), + + ReqHdr = cowboy_req:header(<<"access-control-request-headers">>, Req, <<"">>), + HeadersWithCors = add_cors_headers(HeadersWithExtras, ReqHdr, Opts), + EncodedHeaders = hb_private:reset(HeadersWithCors), + + StreamReq = cowboy_req:stream_reply(Status, EncodedHeaders, Req), + + try + StreamFun(StreamReq), + cowboy_req:stream_body(<<>>, fin, StreamReq) + catch + _:_ -> + % On error, try to finish the stream gracefully + cowboy_req:stream_body(<<>>, fin, StreamReq) + end, + + {ok, StreamReq, no_state}. + %% @doc Send a streamed chunk response to the client send_streamed_response(StreamReq, Chunk, IsFinal, _Opts) when is_binary(Chunk) -> Flag = case IsFinal of diff --git a/src/hb_http_server.erl b/src/hb_http_server.erl index 85050cbe9..b2da2fb46 100644 --- a/src/hb_http_server.erl +++ b/src/hb_http_server.erl @@ -376,7 +376,7 @@ handle_request(RawReq, Body, ServerID) -> % the meta@1.0 device to handle it. Path = cowboy_req:path(RawReq), Method = cowboy_req:method(RawReq), - case maybe_handle_data_request(Method, Path, RawReq, NodeMsg) of + case maybe_handle_data_request(Method, Path, Req, NodeMsg) of {ok, _, _} = Reply -> Reply; pass -> @@ -458,70 +458,130 @@ handle_data_request(DataID, RangeHeader, RawReq, Path, NodeMsg) -> end. handle_range_request(DataID, RangeBin, Meta, RawReq, NodeMsg) -> - case hb_data_reader:read_range(DataID, RangeBin, Meta, NodeMsg) of - {ok, #{body := Partial, start := Start, range_end := End, total := Total} = RangeInfo} -> - CType = maps:get(content_type, RangeInfo, maps:get(content_type, Meta)), - ContentRange = hb_http_range:build_content_range(Start, End, Total), - Headers = - maps:merge( - cors_headers(), - #{ - <<"accept-ranges">> => <<"bytes">>, - <<"content-range">> => ContentRange, - <<"content-length">> => integer_to_binary((End - Start) + 1), - <<"content-type">> => CType - } - ), - {ok, cowboy_req:reply(206, Headers, Partial, RawReq), no_state}; - {error, {range_not_satisfiable, Total}} -> + Total = maps:get(size, Meta, 0), + case hb_http_range:parse(RangeBin, Total) of + {ok, {Start, End}} -> + CType = maps:get(content_type, Meta), + Threshold = hb_opts:get(streaming_threshold, default_streaming_threshold(), NodeMsg), + RangeLen = (End - Start) + 1, + case RangeLen > Threshold of + true -> + stream_range_request(DataID, Meta, CType, Start, End, RawReq, NodeMsg); + false -> + case hb_data_reader:read_range(DataID, RangeBin, Meta, NodeMsg) of + {ok, #{body := Partial, start := RespStart, range_end := RespEnd, total := RespTotal} = RangeInfo} -> + RespContentType = maps:get(content_type, RangeInfo, CType), + ContentRange = hb_http_range:build_content_range(RespStart, RespEnd, RespTotal), + Msg = #{ + <<"status">> => 206, + <<"ao-result">> => <<"body">>, + <<"body">> => Partial, + <<"content-type">> => RespContentType, + <<"content-range">> => ContentRange, + <<"accept-ranges">> => <<"bytes">> + }, + Signed = hb_message:commit(Msg, NodeMsg), + hb_http:reply(RawReq, #{ <<"path">> => cowboy_req:path(RawReq) }, Signed, NodeMsg); + {error, Reason} -> + ?event(error, 
{range_request_failed, {id, DataID}, {reason, Reason}}), + hb_http:reply(RawReq, #{ <<"path">> => cowboy_req:path(RawReq) }, #{ <<"status">> => 500, <<"body">> => <<"Range request failed">> }, NodeMsg) + end + end; + {error, {range_not_satisfiable, _}} -> Unsat = hb_http_range:build_unsatisfied_content_range(Total), - Headers = maps:merge(cors_headers(), #{ <<"content-range">> => Unsat }), - {ok, cowboy_req:reply(416, Headers, <<>>, RawReq), no_state}; - {error, {invalid_range, Total}} -> - Headers = maps:merge(cors_headers(), #{ <<"content-range">> => hb_http_range:build_unsatisfied_content_range(Total) }), - {ok, cowboy_req:reply(416, Headers, <<>>, RawReq), no_state}; - {error, {http_error, 404}} -> - {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; - {error, Reason} -> - ?event(error, {range_request_failed, {id, DataID}, {reason, Reason}}), - {ok, cowboy_req:reply(500, error_headers(), <<"Range request failed">>, RawReq), no_state} + Msg = #{ <<"status">> => 416, <<"content-range">> => Unsat }, + hb_http:reply(RawReq, #{ <<"path">> => cowboy_req:path(RawReq) }, Msg, NodeMsg); + {error, invalid_range} -> + Msg = #{ <<"status">> => 416, <<"content-range">> => hb_http_range:build_unsatisfied_content_range(Total) }, + hb_http:reply(RawReq, #{ <<"path">> => cowboy_req:path(RawReq) }, Msg, NodeMsg) end. +stream_range_request(DataID, Meta, CType, Start, End, RawReq, NodeMsg) -> + ContentRange = hb_http_range:build_content_range(Start, End, maps:get(size, Meta, 0)), + HeaderMsg = #{ + <<"status">> => 206, + <<"ao-result">> => <<"body">>, + <<"content-type">> => CType, + <<"content-range">> => ContentRange, + <<"accept-ranges">> => <<"bytes">> + }, + SignedHeaderMsg = hb_message:commit(HeaderMsg, NodeMsg, #{ + <<"commitment-device">> => <<"httpsig@1.0">>, + <<"committed">> => [<<"ao-result">>, <<"status">>] + }), + ExtraHeaders = #{ + <<"accept-ranges">> => <<"bytes">>, + <<"content-type">> => CType, + <<"content-range">> => ContentRange + }, + ChunkSize0 = hb_data_reader:chunk_size(NodeMsg), + StreamFun = fun(StreamReq) -> + stream_range_loop(StreamReq, DataID, Meta, Start, End, ChunkSize0, NodeMsg) + end, + hb_http:reply_streamed_signed(RawReq, #{ <<"path">> => cowboy_req:path(RawReq) }, 206, SignedHeaderMsg, ExtraHeaders, StreamFun, NodeMsg). + +stream_range_loop(StreamReq, DataID, Meta, Offset, End, ChunkSize, NodeMsg) when Offset =< End -> + MaxSize = End - Offset + 1, + ThisSize = case ChunkSize =< MaxSize of true -> ChunkSize; false -> MaxSize end, + case hb_data_reader:next_chunk(DataID, Meta, Offset, ThisSize, NodeMsg) of + {ok, #{ body := Body, range_end := RangeEnd }} -> + IsFinal = RangeEnd >= End, + hb_http:send_streamed_response(StreamReq, Body, IsFinal, NodeMsg), + case IsFinal of + true -> ok; + false -> stream_range_loop(StreamReq, DataID, Meta, RangeEnd + 1, End, ChunkSize, NodeMsg) + end; + {error, _} -> ok + end; +stream_range_loop(_StreamReq, _DataID, _Meta, _Offset, _End, _ChunkSize, _NodeMsg) -> ok. 
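stream_range_loop/7 clamps each request to the remaining span of the client's range via ThisSize = min(ChunkSize, MaxSize), so the loop never reads past End even though next_chunk/5 alone would run to the end of the object. A hedged worked example of the resulting request boundaries, serving bytes 0-2621439 (2.5 MiB) with the default 1 MiB chunk size; the third request is clamped to the remaining 0.5 MiB:

    %% range_chunks(0, 2621439, 1048576) =:=
    %%     [{0, 1048575}, {1048576, 2097151}, {2097152, 2621439}]
    range_chunks(Offset, End, ChunkSize) when Offset =< End ->
        ThisEnd = min(Offset + ChunkSize - 1, End),
        [{Offset, ThisEnd} | range_chunks(ThisEnd + 1, End, ChunkSize)];
    range_chunks(_, _, _) ->
        [].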
+ handle_full_request(DataID, Meta = #{size := Total}, RawReq, Path, NodeMsg) -> CType = maps:get(content_type, Meta), Threshold = hb_opts:get(streaming_threshold, default_streaming_threshold(), NodeMsg), - case Total =< Threshold of + StreamSig = hb_util:atom(hb_opts:get(stream_signature, off, NodeMsg)), + ShouldStream = (Total > Threshold) andalso (StreamSig =/= off), + case ShouldStream of true -> + stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg); + false -> case hb_data_reader:fetch_full(DataID, Meta, NodeMsg) of {ok, #{data := Data}} -> - Headers = - maps:merge( - cors_headers(), - #{ - <<"accept-ranges">> => <<"bytes">>, - <<"content-type">> => CType, - <<"content-length">> => integer_to_binary(Total) - } - ), - {ok, cowboy_req:reply(200, Headers, Data, RawReq), no_state}; + Msg = #{ + <<"status">> => 200, + <<"ao-result">> => <<"body">>, + <<"body">> => Data, + <<"content-type">> => CType, + <<"accept-ranges">> => <<"bytes">> + }, + Signed = hb_message:commit(Msg, NodeMsg), + hb_http:reply(RawReq, #{ <<"path">> => Path }, Signed, NodeMsg); {error, {http_error, 404}} -> - {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; + hb_http:reply(RawReq, #{ <<"path">> => Path }, #{ <<"status">> => 404 }, NodeMsg); {error, Reason} -> ?event(error, {full_data_fetch_failed, {id, DataID}, {reason, Reason}}), - {ok, cowboy_req:reply(500, error_headers(), <<"Streaming request failed">>, RawReq), no_state} - end; - false -> - stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg) + hb_http:reply(RawReq, #{ <<"path">> => Path }, #{ <<"status">> => 500, <<"body">> => <<"Streaming request failed">> }, NodeMsg) + end end. stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg) -> ChunkSize = hb_data_reader:chunk_size(NodeMsg), case hb_data_reader:next_chunk(DataID, Meta, 0, ChunkSize, NodeMsg) of {ok, FirstChunk} -> + % Sign only header fields (no content-digest). Always use header mode for streaming. ExtraHeaders = #{ <<"accept-ranges">> => <<"bytes">>, <<"content-type">> => CType }, + HeaderMsg = #{ + <<"status">> => 200, + <<"ao-result">> => <<"body">>, + <<"content-type">> => CType + }, + CommitSpec = #{ + <<"commitment-device">> => <<"httpsig@1.0">>, + <<"committed">> => [<<"ao-result">>, <<"status">>] + }, + SignedHeaderMsg = hb_message:commit(HeaderMsg, NodeMsg, CommitSpec), StreamFun = fun(StreamReq) -> Body0 = maps:get(body, FirstChunk), Final0 = maps:get(final, FirstChunk, false), @@ -547,15 +607,11 @@ stream_large_data(DataID, Meta, CType, RawReq, Path, NodeMsg) -> {offset, NextOffset}, {reason, Reason} } - ), - cowboy_req:stream_trailers( - StreamReq, - #{ <<"x-range-error">> => hb_util:bin(Reason) } ) end end end, - hb_http:reply_streamed(RawReq, #{ <<"path">> => Path }, 200, ExtraHeaders, StreamFun, NodeMsg); + hb_http:reply_streamed_signed(RawReq, #{ <<"path">> => Path }, 200, SignedHeaderMsg, ExtraHeaders, StreamFun, NodeMsg); {error, {http_error, 404}} -> {ok, cowboy_req:reply(404, cors_headers(), <<>>, RawReq), no_state}; {error, done} -> diff --git a/test/httpsig_stream_sign_test.erl b/test/httpsig_stream_sign_test.erl new file mode 100644 index 000000000..def9a83aa --- /dev/null +++ b/test/httpsig_stream_sign_test.erl @@ -0,0 +1,56 @@ +-module(httpsig_stream_sign_test). +-include_lib("eunit/include/eunit.hrl"). 
+ +header_only_stream_signature_test() -> + %% Simulate header-mode streaming: sign metadata only (no content-digest/body) + Msg = #{ + <<"status">> => 200, + <<"ao-result">> => <<"body">>, + <<"content-type">> => <<"video/mp4">> + }, + CommitSpec = #{ + <<"commitment-device">> => <<"httpsig@1.0">>, + <<"committed">> => [<<"ao-result">>, <<"status">>] + }, + Signed = hb_message:commit(Msg, #{ priv_wallet => hb:wallet() }, CommitSpec), + {ok, #{ <<"headers">> := Headers }} = + dev_codec_httpsig:serialize(Signed, #{ <<"format">> => <<"components">> }, #{}), + ?assert(maps:is_key(<<"signature">>, Headers)), + ?assert(maps:is_key(<<"signature-input">>, Headers)), + %% In header-only mode there is no digest + ?assertNot(maps:is_key(<<"content-digest">>, Headers)). + +%% Trailer-mode streaming removed: no trailer signature tests remain. + +full_body_signature_has_digest_test() -> + %% Full GET (no range): body present, encoder moves digest into headers + Msg = #{ + <<"status">> => 200, + <<"ao-result">> => <<"body">>, + <<"body">> => <<"abc">>, + <<"content-type">> => <<"application/octet-stream">> + }, + Signed = hb_message:commit(Msg, #{ priv_wallet => hb:wallet() }, <<"httpsig@1.0">>), + {ok, #{ <<"headers">> := Headers, <<"body">> := _Body }} = + dev_codec_httpsig:serialize(Signed, #{ <<"format">> => <<"components">> }, #{}), + ?assert(maps:is_key(<<"signature">>, Headers)), + ?assert(maps:is_key(<<"signature-input">>, Headers)), + ?assert(maps:is_key(<<"content-digest">>, Headers)). + +range_partial_signature_test() -> + %% Range 206: partial body signed (digest must be present) + Partial = <<"1234567890">>, + Msg = #{ + <<"status">> => 206, + <<"ao-result">> => <<"body">>, + <<"body">> => Partial, + <<"content-type">> => <<"application/octet-stream">>, + <<"content-range">> => <<"bytes 0-9/10">> + }, + Signed = hb_message:commit(Msg, #{ priv_wallet => hb:wallet() }, <<"httpsig@1.0">>), + {ok, #{ <<"headers">> := Headers }} = + dev_codec_httpsig:serialize(Signed, #{ <<"format">> => <<"components">> }, #{}), + ?assertEqual(<<"bytes 0-9/10">>, maps:get(<<"content-range">>, Headers)), + ?assert(maps:is_key(<<"signature">>, Headers)), + ?assert(maps:is_key(<<"signature-input">>, Headers)), + ?assert(maps:is_key(<<"content-digest">>, Headers)).
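A hedged companion check for the header-only mode above, assuming hb_message:verify/1 exists alongside hb_message:commit/3 (the commit/verify pairing used elsewhere in HyperBEAM); it asserts that a header-only signed message verifies with no body present, since only the listed fields are committed:

    header_only_signature_verifies_test() ->
        Msg = #{
            <<"status">> => 200,
            <<"ao-result">> => <<"body">>,
            <<"content-type">> => <<"video/mp4">>
        },
        CommitSpec = #{
            <<"commitment-device">> => <<"httpsig@1.0">>,
            <<"committed">> => [<<"ao-result">>, <<"status">>]
        },
        Signed = hb_message:commit(Msg, #{ priv_wallet => hb:wallet() }, CommitSpec),
        ?assert(hb_message:verify(Signed)).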