Skip to content

Commit f60839a

Browse files
committed
Add SSE middleware
1 parent 38c4034 commit f60839a

File tree

7 files changed

+302
-5
lines changed

7 files changed

+302
-5
lines changed

Diff for: README.md

+30-2
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,11 @@ Tesla.get(client, "/", opts: [adapter: [recv_timeout: 30_000]])
243243

244244
## Streaming
245245

246-
If adapter supports it, you can pass a [Stream](https://elixir-lang.org/docs/stable/elixir/Stream.html) as body, e.g.:
246+
### Streaming Request Body
247+
248+
If adapter supports it, you can pass a
249+
[Stream](https://elixir-lang.org/docs/stable/elixir/Stream.html) as request
250+
body, e.g.:
247251

248252
```elixir
249253
defmodule ElasticSearch do
@@ -259,7 +263,31 @@ defmodule ElasticSearch do
259263
end
260264
```
261265

262-
Each piece of stream will be encoded as JSON and sent as a new line (conforming to JSON stream format).
266+
Each piece of stream will be encoded as JSON and sent as a new line (conforming
267+
to JSON stream format).
268+
269+
### Streaming Response Body
270+
271+
If adapter supports it, you can pass a `response: :stream` option to return
272+
response body as a
273+
[Stream](https://elixir-lang.org/docs/stable/elixir/Stream.html)
274+
275+
```elixir
276+
defmodule OpenAI do
277+
use Tesla
278+
279+
plug Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"
280+
plug Tesla.Middleware.JSON
281+
282+
def completion(messages) do
283+
post("/chat/completions", %{model: "gpt-3.5-turbo", messages: messages, stream: true})
284+
end
285+
end
286+
287+
{:ok, env} = OpenAI.completion("What is the meaning of life?")
288+
env.body
289+
|> Stream.each(fn chunk -> IO.puts(chunk) end)
290+
```
263291

264292
## Multipart
265293

Diff for: lib/tesla/adapter/finch.ex

+3-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ if Code.ensure_loaded?(Finch) do
131131
Task.await(task)
132132
nil
133133
after
134-
opts[:receive_timeout] -> nil
134+
opts[:receive_timeout] ->
135+
Task.shutdown(task, :brutal_kill)
136+
nil
135137
end
136138
end)
137139

Diff for: lib/tesla/middleware/json.ex

+16-1
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,18 @@ defmodule Tesla.Middleware.JSON do
108108
end
109109
end
110110

111+
defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body),
112+
do: {:ok, decode_stream(body, opts)}
113+
111114
defp decode_body(body, opts), do: process(body, :decode, opts)
112115

113116
defp decodable?(env, opts), do: decodable_body?(env) && decodable_content_type?(env, opts)
114117

115118
defp decodable_body?(env) do
116-
(is_binary(env.body) && env.body != "") || (is_list(env.body) && env.body != [])
119+
(is_binary(env.body) && env.body != "") ||
120+
(is_list(env.body) && env.body != []) ||
121+
is_function(env.body) ||
122+
is_struct(env.body, Stream)
117123
end
118124

119125
defp decodable_content_type?(env, opts) do
@@ -123,6 +129,15 @@ defmodule Tesla.Middleware.JSON do
123129
end
124130
end
125131

132+
defp decode_stream(body, opts) do
133+
Stream.map(body, fn chunk ->
134+
case decode_body(chunk, opts) do
135+
{:ok, item} -> item
136+
_ -> chunk
137+
end
138+
end)
139+
end
140+
126141
defp content_types(opts),
127142
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])
128143

Diff for: lib/tesla/middleware/sse.ex

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
defmodule Tesla.Middleware.SSE do
2+
@moduledoc """
3+
Decode Server Sent Events.
4+
5+
This middleware is mostly useful when streaming response body.
6+
7+
## Examples
8+
9+
```
10+
plug Tesla.Middleware.SSE, only: :data
11+
12+
```
13+
14+
## Options
15+
16+
- `:only` - keep only specified keys in event (necessary for using with `JSON` middleware)
17+
- `:decode_content_types` - list of additional decodable content-types
18+
"""
19+
20+
@behaviour Tesla.Middleware
21+
22+
@default_content_types ["text/event-stream"]
23+
24+
@impl Tesla.Middleware
25+
def call(env, next, opts) do
26+
opts = opts || []
27+
28+
with {:ok, env} <- Tesla.run(env, next) do
29+
decode(env, opts)
30+
end
31+
end
32+
33+
def decode(env, opts) do
34+
if decodable_content_type?(env, opts) do
35+
{:ok, %{env | body: decode_body(env.body, opts)}}
36+
else
37+
{:ok, env}
38+
end
39+
end
40+
41+
defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body) do
42+
body
43+
|> Stream.chunk_while(
44+
"",
45+
fn elem, acc ->
46+
{lines, [rest]} = (acc <> elem) |> String.split("\n\n") |> Enum.split(-1)
47+
{:cont, lines, rest}
48+
end,
49+
fn
50+
"" -> {:cont, ""}
51+
acc -> {:cont, acc, ""}
52+
end
53+
)
54+
|> Stream.flat_map(& &1)
55+
|> Stream.map(&decode_message/1)
56+
|> Stream.flat_map(&only(&1, opts[:only]))
57+
end
58+
59+
defp decode_body(binary, opts) when is_binary(binary) do
60+
binary
61+
|> String.split("\n\n")
62+
|> Enum.map(&decode_message/1)
63+
|> Enum.flat_map(&only(&1, opts[:only]))
64+
end
65+
66+
defp decode_message(message) do
67+
message
68+
|> String.split("\n")
69+
|> Enum.map(&decode_body/1)
70+
|> Enum.reduce(%{}, fn
71+
:empty, acc -> acc
72+
{:data, data}, acc -> Map.update(acc, :data, data, &(&1 <> "\n" <> data))
73+
{key, value}, acc -> Map.put_new(acc, key, value)
74+
end)
75+
end
76+
77+
defp decode_body(": " <> comment), do: {:comment, comment}
78+
defp decode_body("data: " <> data), do: {:data, data}
79+
defp decode_body("event: " <> event), do: {:event, event}
80+
defp decode_body("id: " <> id), do: {:id, id}
81+
defp decode_body("retry: " <> retry), do: {:retry, retry}
82+
defp decode_body(""), do: :empty
83+
84+
defp decodable_content_type?(env, opts) do
85+
case Tesla.get_header(env, "content-type") do
86+
nil -> false
87+
content_type -> Enum.any?(content_types(opts), &String.starts_with?(content_type, &1))
88+
end
89+
end
90+
91+
defp content_types(opts),
92+
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])
93+
94+
defp only(message, nil), do: [message]
95+
96+
defp only(message, key) do
97+
case Map.get(message, key) do
98+
nil -> []
99+
val -> [val]
100+
end
101+
end
102+
end

Diff for: mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ defmodule Tesla.Mixfile do
7979
{:excoveralls, "~> 0.8", only: :test},
8080
{:httparrot, "~> 1.3", only: :test},
8181
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
82-
{:mix_test_watch, "~> 1.0", only: :test, runtime: false},
82+
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
8383
{:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false},
8484
{:inch_ex, "~> 2.0", only: :docs},
8585

Diff for: test/tesla/middleware/json_test.exs

+26
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,32 @@ defmodule Tesla.Middleware.JsonTest do
167167
end
168168
end
169169

170+
describe "Streams" do
171+
test "encode stream" do
172+
adapter = fn env ->
173+
assert IO.iodata_to_binary(Enum.to_list(env.body)) == ~s|{"id":1}\n{"id":2}\n{"id":3}\n|
174+
end
175+
176+
stream = Stream.map(1..3, fn i -> %{id: i} end)
177+
Tesla.Middleware.JSON.call(%Tesla.Env{body: stream}, [{:fn, adapter}], [])
178+
end
179+
180+
test "decode stream" do
181+
adapter = fn _env ->
182+
stream = Stream.map(1..3, fn i -> ~s|{"id": #{i}}\n| end)
183+
184+
{:ok,
185+
%Tesla.Env{
186+
headers: [{"content-type", "application/json"}],
187+
body: stream
188+
}}
189+
end
190+
191+
assert {:ok, env} = Tesla.Middleware.JSON.call(%Tesla.Env{}, [{:fn, adapter}], [])
192+
assert Enum.to_list(env.body) == [%{"id" => 1}, %{"id" => 2}, %{"id" => 3}]
193+
end
194+
end
195+
170196
describe "Multipart" do
171197
defmodule MultipartClient do
172198
use Tesla

Diff for: test/tesla/middleware/sse_test.exs

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
defmodule Tesla.Middleware.SSETest do
2+
use ExUnit.Case
3+
4+
@env %Tesla.Env{
5+
status: 200,
6+
headers: [{"content-type", "text/event-stream"}]
7+
}
8+
9+
describe "Basics" do
10+
test "ignore not matching content-type" do
11+
adapter = fn _env ->
12+
{:ok, %Tesla.Env{headers: [{"content-type", "text/plain"}], body: "test"}}
13+
end
14+
15+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], [])
16+
assert env.body == "test"
17+
end
18+
19+
test "decode comment" do
20+
adapter = fn _env ->
21+
{:ok, %{@env | body: ": comment"}}
22+
end
23+
24+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], [])
25+
assert env.body == [%{comment: "comment"}]
26+
end
27+
28+
test "decode multiple messages" do
29+
body = """
30+
: this is a test stream
31+
32+
data: some text
33+
34+
data: another message
35+
data: with two lines
36+
"""
37+
38+
adapter = fn _env ->
39+
{:ok, %{@env | body: body}}
40+
end
41+
42+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], [])
43+
44+
assert env.body == [
45+
%{comment: "this is a test stream"},
46+
%{data: "some text"},
47+
%{data: "another message\nwith two lines"}
48+
]
49+
end
50+
51+
test "decode named events" do
52+
body = """
53+
event: userconnect
54+
data: {"username": "bobby", "time": "02:33:48"}
55+
56+
data: Here's a system message of some kind that will get used
57+
data: to accomplish some task.
58+
59+
event: usermessage
60+
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
61+
"""
62+
63+
adapter = fn _env ->
64+
{:ok, %{@env | body: body}}
65+
end
66+
67+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], [])
68+
69+
assert env.body == [
70+
%{event: "userconnect", data: ~s|{"username": "bobby", "time": "02:33:48"}|},
71+
%{
72+
data:
73+
"Here's a system message of some kind that will get used\nto accomplish some task."
74+
},
75+
%{
76+
event: "usermessage",
77+
data: ~s|{"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}|
78+
}
79+
]
80+
end
81+
82+
test "output only data" do
83+
body = """
84+
: comment1
85+
86+
event: userconnect
87+
data: data1
88+
89+
data: data2
90+
data: data3
91+
92+
event: usermessage
93+
data: data4
94+
"""
95+
96+
adapter = fn _env ->
97+
{:ok, %{@env | body: body}}
98+
end
99+
100+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], only: :data)
101+
102+
assert env.body == ["data1", "data2\ndata3", "data4"]
103+
end
104+
105+
test "handle stream data" do
106+
adapter = fn _env ->
107+
chunks = [
108+
~s|dat|,
109+
~s|a: dat|,
110+
~s|a1\n\ndata: data2\n\ndata: d|,
111+
~s|ata3\n\n|
112+
]
113+
114+
stream = Stream.map(chunks, & &1)
115+
116+
{:ok, %{@env | body: stream}}
117+
end
118+
119+
assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], [])
120+
121+
assert Enum.to_list(env.body) == [%{data: "data1"}, %{data: "data2"}, %{data: "data3"}]
122+
end
123+
end
124+
end

0 commit comments

Comments
 (0)