diff --git a/lib/absinthe/plug.ex b/lib/absinthe/plug.ex index d8fdf38..8b7de07 100644 --- a/lib/absinthe/plug.ex +++ b/lib/absinthe/plug.ex @@ -140,7 +140,8 @@ defmodule Absinthe.Plug do :analyze_complexity, :max_complexity, :token_limit, - :transport_batch_payload_key + :transport_batch_payload_key, + :standard_sse ] @raw_options [ :analyze_complexity, @@ -167,6 +168,7 @@ defmodule Absinthe.Plug do - `:max_complexity` -- (Optional) Set the maximum allowed complexity of the GraphQL query. If a document’s calculated complexity exceeds the maximum, resolution will be skipped and an error will be returned in the result detailing the calculated and maximum complexities. - `:token_limit` -- (Optional) Set a limit on the number of allowed parseable tokens in the GraphQL query. Queries with exceedingly high token counts can be expensive to parse. If a query's token count exceeds the set limit, an error will be returned during Absinthe parsing (default: `:infinity`). - `:transport_batch_payload_key` -- (Optional) Set whether or not to nest Transport Batch request results in a `payload` key. Older clients expected this key to be present, but newer clients have dropped this pattern. (default: `true`) + - `:standard_sse` -- (Optional) Set whether or not to adopt SSE standard. Older clients don't support this key. (default: `false`) """ @type opts :: [ @@ -188,7 +190,8 @@ defmodule Absinthe.Plug do before_send: {module, atom}, log_level: Logger.level(), pubsub: module | nil, - transport_batch_payload_key: boolean + transport_batch_payload_key: boolean, + standard_sse: boolean ] @doc """ @@ -235,6 +238,7 @@ defmodule Absinthe.Plug do before_send = Keyword.get(opts, :before_send) transport_batch_payload_key = Keyword.get(opts, :transport_batch_payload_key, true) + standard_sse = Keyword.get(opts, :standard_sse, false) %{ adapter: adapter, @@ -250,7 +254,8 @@ defmodule Absinthe.Plug do log_level: log_level, pubsub: pubsub, before_send: before_send, - transport_batch_payload_key: transport_batch_payload_key + transport_batch_payload_key: transport_batch_payload_key, + standard_sse: standard_sse } end @@ -369,14 +374,14 @@ defmodule Absinthe.Plug do |> subscribe_loop(topic, config) end - def subscribe_loop(conn, topic, config) do + defp subscribe_loop(conn, topic, config) do receive do %{event: "subscription:data", payload: %{result: result}} -> - case chunk(conn, "#{encode_json!(result, config)}\n\n") do + case chunk(conn, encode_chunk!(result, config)) do {:ok, conn} -> subscribe_loop(conn, topic, config) - {:error, :closed} -> + {:error, _} -> Absinthe.Subscription.unsubscribe(config.context.pubsub, topic) conn end @@ -390,7 +395,7 @@ defmodule Absinthe.Plug do {:ok, conn} -> subscribe_loop(conn, topic, config) - {:error, :closed} -> + {:error, _} -> Absinthe.Subscription.unsubscribe(config.context.pubsub, topic) conn end @@ -609,4 +614,26 @@ defmodule Absinthe.Plug do @doc false def error_result(message), do: %{"errors" => [%{"message" => message}]} + + # `encode_chunk!/2` + # + # When option `standard_sse` is set to TRUE, it will addopt the new standard. + # Otherwise, it will use legacy behaviour. This config is for keepoing + # backwards compatibility, but everyone is encouraged to adopt the standard. + # + # + # The encoded response additionally contains an event segment `event: next` and + # the data is prefixed with a `data:` field, indicating data segment. + # + # This structure is consistent with GraphQL over S[erver-Sent Events Protocol][1] + # specification and [official SSE standard][2]. + # + # [1]: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#next-event + # [2]: https://html.spec.whatwg.org/multipage/server-sent-events.html + + defp encode_chunk!(result, config) do + if config.standard_sse, + do: "event: next\ndata: #{encode_json!(result, config)}\n\n", + else: "#{encode_json!(result, config)}\n\n" + end end diff --git a/test/lib/absinthe/plug_test.exs b/test/lib/absinthe/plug_test.exs index 975ebc6..9361f5d 100644 --- a/test/lib/absinthe/plug_test.exs +++ b/test/lib/absinthe/plug_test.exs @@ -449,12 +449,12 @@ defmodule Absinthe.PlugTest do assert expected == resp_body end - test "Subscriptions over HTTP with Server Sent Events chunked response" do + test "Subscriptions over HTTP with Server Sent Events chunked response (non standard)" do TestPubSub.start_link() Absinthe.Subscription.start_link(TestPubSub) query = "subscription {update}" - opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub) + opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub, standard_sse: false) request = Task.async(fn -> @@ -482,6 +482,38 @@ defmodule Absinthe.PlugTest do assert Enum.member?(events, %{"data" => %{"update" => "BAR"}}) end + test "Subscriptions over HTTP with Server Sent Events chunked response (standard)" do + TestPubSub.start_link() + Absinthe.Subscription.start_link(TestPubSub) + + query = "subscription {update}" + opts = Absinthe.Plug.init(schema: TestSchema, pubsub: TestPubSub, standard_sse: true) + + request = + Task.async(fn -> + conn(:post, "/", query: query) + |> put_req_header("content-type", "application/json") + |> plug_parser + |> Absinthe.Plug.call(opts) + end) + + Process.sleep(200) + Absinthe.Subscription.publish(TestPubSub, "FOO", update: "*") + Absinthe.Subscription.publish(TestPubSub, "BAR", update: "*") + send(request.pid, :close) + + conn = Task.await(request) + {_module, state} = conn.adapter + + [event1, event2] = String.split(state.chunks, "\n\n", trim: true) + + assert "event: next\ndata: " <> event1_data = event1 + assert "event: next\ndata: " <> event2_data = event2 + + assert Jason.decode!(event1_data) == %{"data" => %{"update" => "FOO"}} + assert Jason.decode!(event2_data) == %{"data" => %{"update" => "BAR"}} + end + @query """ query GetUser { user