Skip to content

Commit aceed40

Browse files
committed
Request/Response streaming for Finch adapter
1 parent 308a676 commit aceed40

File tree

5 files changed

+92
-11
lines changed

5 files changed

+92
-11
lines changed

Diff for: lib/tesla/adapter/finch.ex

+65-8
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,20 @@ if Code.ensure_loaded?(Finch) do
5252
@behaviour Tesla.Adapter
5353
alias Tesla.Multipart
5454

55+
@defaults [
56+
receive_timeout: 15_000
57+
]
58+
5559
@impl Tesla.Adapter
5660
def call(%Tesla.Env{} = env, opts) do
57-
opts = Tesla.Adapter.opts(env, opts)
61+
opts = Tesla.Adapter.opts(@defaults, env, opts)
5862

5963
name = Keyword.fetch!(opts, :name)
6064
url = Tesla.build_url(env.url, env.query)
6165
req_opts = Keyword.take(opts, [:pool_timeout, :receive_timeout])
66+
req = build(env.method, url, env.headers, env.body)
6267

63-
case request(name, env.method, url, env.headers, env.body, req_opts) do
68+
case request(req, name, req_opts, opts) do
6469
{:ok, %Finch.Response{status: status, headers: headers, body: body}} ->
6570
{:ok, %Tesla.Env{env | status: status, headers: headers, body: body}}
6671

@@ -69,20 +74,72 @@ if Code.ensure_loaded?(Finch) do
6974
end
7075
end
7176

72-
defp request(name, method, url, headers, %Multipart{} = mp, opts) do
77+
defp build(method, url, headers, %Multipart{} = mp) do
7378
headers = headers ++ Multipart.headers(mp)
7479
body = Multipart.body(mp) |> Enum.to_list()
7580

76-
request(name, method, url, headers, body, opts)
81+
build(method, url, headers, body)
82+
end
83+
84+
defp build(method, url, headers, %Stream{} = body_stream) do
85+
build(method, url, headers, {:stream, body_stream})
7786
end
7887

79-
defp request(_name, _method, _url, _headers, %Stream{}, _opts) do
80-
raise "Streaming is not supported by this adapter!"
88+
defp build(method, url, headers, body_stream_fun) when is_function(body_stream_fun) do
89+
build(method, url, headers, {:stream, body_stream_fun})
8190
end
8291

83-
defp request(name, method, url, headers, body, opts) do
92+
defp build(method, url, headers, body) do
8493
Finch.build(method, url, headers, body)
85-
|> Finch.request(name, opts)
94+
end
95+
96+
defp request(req, name, req_opts, opts) do
97+
case opts[:response] do
98+
:stream -> stream(req, name, req_opts)
99+
nil -> Finch.request(req, name, req_opts)
100+
other -> raise "Unknown response option: #{inspect(other)}"
101+
end
102+
end
103+
104+
defp stream(req, name, opts) do
105+
owner = self()
106+
ref = make_ref()
107+
108+
fun = fn
109+
{:status, status}, _acc -> status
110+
{:headers, headers}, status -> send(owner, {ref, {:status, status, headers}})
111+
{:data, data}, _acc -> send(owner, {ref, {:data, data}})
112+
end
113+
114+
task =
115+
Task.async(fn ->
116+
case Finch.stream(req, name, nil, fun, opts) do
117+
{:ok, _acc} -> send(owner, {ref, :eof})
118+
{:error, error} -> send(owner, {ref, {:error, error}})
119+
end
120+
end)
121+
122+
receive do
123+
{^ref, {:status, status, headers}} ->
124+
body =
125+
Stream.unfold(nil, fn _ ->
126+
receive do
127+
{^ref, {:data, data}} ->
128+
{data, nil}
129+
130+
{^ref, :eof} ->
131+
Task.await(task)
132+
nil
133+
after
134+
opts[:receive_timeout] -> nil
135+
end
136+
end)
137+
138+
{:ok, %Finch.Response{status: status, headers: headers, body: body}}
139+
after
140+
opts[:receive_timeout] ->
141+
{:error, :timeout}
142+
end
86143
end
87144
end
88145
end

Diff for: mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule Tesla.Mixfile do
7575
{:excoveralls, "~> 0.8", only: :test},
7676
{:httparrot, "~> 1.3", only: :test},
7777
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
78-
{:mix_test_watch, "~> 1.0", only: :dev},
78+
{:mix_test_watch, "~> 1.0", only: :test, runtime: false},
7979
{:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false},
8080
{:inch_ex, "~> 2.0", only: :docs},
8181

Diff for: test/support/adapter_case/stream_request_body.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Tesla.AdapterCase.StreamRequestBody do
33
quote do
44
alias Tesla.Env
55

6-
describe "Stream" do
6+
describe "Stream Request" do
77
test "stream request body: Stream.map" do
88
request = %Env{
99
method: :post,

Diff for: test/support/adapter_case/stream_response_body.ex

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
defmodule Tesla.AdapterCase.StreamResponseBody do
2+
defmacro __using__(_) do
3+
quote do
4+
alias Tesla.Env
5+
6+
describe "Stream Response" do
7+
test "stream response body" do
8+
request = %Env{
9+
method: :get,
10+
url: "#{@http}/stream/20"
11+
}
12+
13+
assert {:ok, %Env{} = response} = call(request, response: :stream)
14+
assert response.status == 200
15+
assert is_function(response.body) || response.body.__struct__ == Stream
16+
17+
body = Enum.to_list(response.body)
18+
assert Enum.count(body) == 20
19+
end
20+
end
21+
end
22+
end
23+
end

Diff for: test/tesla/adapter/finch_test.exs

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ defmodule Tesla.Adapter.FinchTest do
66
use Tesla.AdapterCase, adapter: {Tesla.Adapter.Finch, [name: @finch_name]}
77
use Tesla.AdapterCase.Basic
88
use Tesla.AdapterCase.Multipart
9-
# use Tesla.AdapterCase.StreamRequestBody
9+
use Tesla.AdapterCase.StreamRequestBody
10+
use Tesla.AdapterCase.StreamResponseBody
1011
use Tesla.AdapterCase.SSL
1112

1213
setup do

0 commit comments

Comments
 (0)