Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for brokered subscriptions #217

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions lib/absinthe/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ defmodule Absinthe.Plug do
- `:log_level` -- (Optional) Set the logger level for Absinthe Logger. Defaults to `:debug`.
- `:analyze_complexity` -- (Optional) Set whether to calculate the complexity of incoming GraphQL queries.
- `:max_complexity` -- (Optional) Set the maximum allowed complexity of the GraphQL query. If a document’s calculated complexity exceeds the maximum, resolution will be skipped and an error will be returned in the result detailing the calculated and maximum complexities.
- `:websocket_subscriptions` -- (Optional) Whether to assume subscriptions are sent through and receive updates from a WebSocket. (default: `true`).

"""
@type opts :: [
Expand All @@ -160,7 +161,8 @@ defmodule Absinthe.Plug do
serializer: module | {module, Keyword.t()},
content_type: String.t(),
before_send: {module, atom},
log_level: Logger.level()
log_level: Logger.level(),
websocket_subscriptions: boolean
]

@doc """
Expand Down Expand Up @@ -206,6 +208,8 @@ defmodule Absinthe.Plug do

before_send = Keyword.get(opts, :before_send)

websocket_subscriptions = Keyword.get(opts, :websocket_subscriptions, true)

%{
adapter: adapter,
context: context,
Expand All @@ -219,7 +223,8 @@ defmodule Absinthe.Plug do
content_type: content_type,
log_level: log_level,
pubsub: pubsub,
before_send: before_send
before_send: before_send,
websocket_subscriptions: websocket_subscriptions
}
end

Expand Down Expand Up @@ -301,10 +306,15 @@ defmodule Absinthe.Plug do
def subscribe(conn, topic, %{context: %{pubsub: pubsub}} = config) do
pubsub.subscribe(topic)

conn
|> put_resp_header("content-type", "text/event-stream")
|> send_chunked(200)
|> subscribe_loop(topic, config)
if config[:websocket_subscriptions] do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the wrong option name I think. What Absinthe.Plug is doing here is NOT websockets, it's long polling. Websockets is handled by absinthe phoenix, and happens when a subscription is sent over a pre-established websocket channel. Everything in Absinthe.Plug is HTTP.

However I get what you're going for. You want to make it possible to simply return a subscription handle, and indicate that the subscription was registered elsewhere. This is a need we'll honestly have for both Absinthe.Plug and Absinthe.Phoenix, cause in theory nothing should stop you from submitting a subscription document over websocket, but having that still set something up in MQTT.

However, I think this needs to be possible on a request by request basis, instead of as a flag that categorically sets a given plug one way or another. Not entirely sure how that option would be specified however. It seems like it could be something that Absinthe itself indicates in the return value from a subscribe call.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sure, that would work too. How should we do this -- do you want to think more about it and let me know how you want to structure it? Or were you planning to build something different yourself?

conn
|> put_resp_header("content-type", "text/event-stream")
|> send_chunked(200)
|> subscribe_loop(topic, config)
else
conn
|> encode(200, %{meta: %{subscriptionId: topic}}, config)
end
end

def subscribe_loop(conn, topic, config) do
Expand Down