Change SSE response structure so that it's compatible with GraphQL SSE standard #292

Open · wants to merge 5 commits into base: main
41 changes: 34 additions & 7 deletions lib/absinthe/plug.ex
@@ -140,7 +140,8 @@ defmodule Absinthe.Plug do
:analyze_complexity,
:max_complexity,
:token_limit,
:transport_batch_payload_key
:transport_batch_payload_key,
:standard_sse
]
@raw_options [
:analyze_complexity,
@@ -167,6 +168,7 @@ defmodule Absinthe.Plug do
- `:max_complexity` -- (Optional) Set the maximum allowed complexity of the GraphQL query. If a document’s calculated complexity exceeds the maximum, resolution will be skipped and an error will be returned in the result detailing the calculated and maximum complexities.
- `:token_limit` -- (Optional) Set a limit on the number of allowed parseable tokens in the GraphQL query. Queries with exceedingly high token counts can be expensive to parse. If a query's token count exceeds the set limit, an error will be returned during Absinthe parsing (default: `:infinity`).
- `:transport_batch_payload_key` -- (Optional) Set whether or not to nest Transport Batch request results in a `payload` key. Older clients expected this key to be present, but newer clients have dropped this pattern. (default: `true`)
- `:standard_sse` -- (Optional) Set whether or not to adopt the GraphQL over SSE standard response format. Older clients don't support this format. (default: `false`)

"""
@type opts :: [
@@ -188,7 +190,8 @@
before_send: {module, atom},
log_level: Logger.level(),
pubsub: module | nil,
transport_batch_payload_key: boolean
transport_batch_payload_key: boolean,
standard_sse: boolean
]

@doc """
@@ -235,6 +238,7 @@
before_send = Keyword.get(opts, :before_send)

transport_batch_payload_key = Keyword.get(opts, :transport_batch_payload_key, true)
standard_sse = Keyword.get(opts, :standard_sse, false)

%{
adapter: adapter,
@@ -250,7 +254,8 @@
log_level: log_level,
pubsub: pubsub,
before_send: before_send,
transport_batch_payload_key: transport_batch_payload_key
transport_batch_payload_key: transport_batch_payload_key,
standard_sse: standard_sse
}
end

@@ -369,14 +374,14 @@ defmodule Absinthe.Plug do
|> subscribe_loop(topic, config)
end

def subscribe_loop(conn, topic, config) do
defp subscribe_loop(conn, topic, config) do
⚠️ This is the only breaking change. I can easily undo it.

receive do
%{event: "subscription:data", payload: %{result: result}} ->
case chunk(conn, "#{encode_json!(result, config)}\n\n") do
case chunk(conn, encode_chunk!(result, config)) do
{:ok, conn} ->
subscribe_loop(conn, topic, config)

{:error, :closed} ->
{:error, _} ->
ℹ️ Source code chunk/2

Absinthe.Subscription.unsubscribe(config.context.pubsub, topic)
conn
end
@@ -390,7 +395,7 @@
{:ok, conn} ->
subscribe_loop(conn, topic, config)

{:error, :closed} ->
{:error, _} ->
Absinthe.Subscription.unsubscribe(config.context.pubsub, topic)
conn
end
@@ -609,4 +614,26 @@ defmodule Absinthe.Plug do

@doc false
def error_result(message), do: %{"errors" => [%{"message" => message}]}

  # `encode_chunk!/2`
  #
  # When the `standard_sse` option is set to `true`, this adopts the new standard.
  # Otherwise, it falls back to the legacy behaviour. This config exists to keep
  # backwards compatibility, but everyone is encouraged to adopt the standard.
  #
  # The encoded response additionally contains an event segment, `event: next`, and
  # the data is prefixed with a `data:` field, indicating the data segment.
  #
  # This structure is consistent with the [GraphQL over Server-Sent Events Protocol][1]
  # specification and the [official SSE standard][2].
  #
  # [1]: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#next-event
  # [2]: https://html.spec.whatwg.org/multipage/server-sent-events.html

defp encode_chunk!(result, config) do
if config.standard_sse,
do: "event: next\ndata: #{encode_json!(result, config)}\n\n",
else: "#{encode_json!(result, config)}\n\n"
end
end
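For reference, a minimal sketch (not part of this diff) of the two chunk shapes `encode_chunk!/2` produces, assuming the result is JSON-encoded with Jason; the `result` value and variable names are illustrative:

```elixir
# Illustrative only; assumes the Jason dependency is available.
result = %{"data" => %{"update" => "FOO"}}
json = Jason.encode!(result)

# With standard_sse: true, the chunk is a spec-compliant SSE event.
standard_chunk = "event: next\ndata: #{json}\n\n"

# With standard_sse: false (legacy), the chunk is the bare JSON payload.
legacy_chunk = "#{json}\n\n"

IO.puts(standard_chunk)
# event: next
# data: {"data":{"update":"FOO"}}

IO.puts(legacy_chunk)
# {"data":{"update":"FOO"}}
```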
36 changes: 34 additions & 2 deletions test/lib/absinthe/plug_test.exs
@@ -449,12 +449,12 @@ defmodule Absinthe.PlugTest do
assert expected == resp_body
end

test "Subscriptions over HTTP with Server Sent Events chunked response" do
test "Subscriptions over HTTP with Server Sent Events chunked response (non standard)" do
TestPubSub.start_link()
Absinthe.Subscription.start_link(TestPubSub)

query = "subscription {update}"
opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub)
opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub, standard_sse: false)

request =
Task.async(fn ->
@@ -482,6 +482,38 @@
assert Enum.member?(events, %{"data" => %{"update" => "BAR"}})
end

test "Subscriptions over HTTP with Server Sent Events chunked response (standard)" do
TestPubSub.start_link()
Absinthe.Subscription.start_link(TestPubSub)

query = "subscription {update}"
opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub, standard_sse: true)

request =
Task.async(fn ->
conn(:post, "/", query: query)
|> put_req_header("content-type", "application/json")
|> plug_parser
|> Absinthe.Plug.call(opts)
end)

Process.sleep(200)
Absinthe.Subscription.publish(TestPubSub, "FOO", update: "*")
Absinthe.Subscription.publish(TestPubSub, "BAR", update: "*")
send(request.pid, :close)

conn = Task.await(request)
{_module, state} = conn.adapter

[event1, event2] = String.split(state.chunks, "\n\n", trim: true)

assert "event: next\ndata: " <> event1_data = event1
assert "event: next\ndata: " <> event2_data = event2

assert Jason.decode!(event1_data) == %{"data" => %{"update" => "FOO"}}
assert Jason.decode!(event2_data) == %{"data" => %{"update" => "BAR"}}
end

@query """
query GetUser {
user
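As a usage note, here is a hedged sketch (not from this PR) of how the new option might be enabled when mounting `Absinthe.Plug` in a `Plug.Router` pipeline; `MyAppWeb.Schema` and `MyAppWeb.Endpoint` are hypothetical placeholder modules:

```elixir
defmodule MyAppWeb.Router do
  use Plug.Router

  plug :match
  plug :dispatch

  # standard_sse: true opts into the GraphQL over SSE event format
  # introduced by this PR; omit it (or pass false) for legacy clients.
  forward "/graphql",
    to: Absinthe.Plug,
    init_opts: [
      schema: MyAppWeb.Schema,
      pubsub: MyAppWeb.Endpoint,
      standard_sse: true
    ]
end
```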