Skip to content

Commit

Permalink
Add @defer directive support
Browse files Browse the repository at this point in the history
  • Loading branch information
Bernard Duggan committed Jun 19, 2018
1 parent 36fa631 commit 391d696
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 13 deletions.
3 changes: 3 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use Mix.Config

config :logger, level: :debug

config :absinthe,
max_defer_time: 60000

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
Expand Down
5 changes: 4 additions & 1 deletion lib/absinthe/blueprint/execution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ defmodule Absinthe.Blueprint.Execution do
result: nil,
acc: %{},
context: %{},
root_value: %{}
root_value: %{},
deferred_fields: [],
deferring: false,
defer_topic: nil
]

@type t :: %__MODULE__{
Expand Down
85 changes: 85 additions & 0 deletions lib/absinthe/phase/document/execution/deferred_resolution.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Absinthe.Phase.Document.Execution.DeferredResolution do
alias Absinthe.{Blueprint, Phase}
alias Absinthe.Phase.Document.Execution.Resolution
alias Absinthe.Resolution.DeferredField

use Absinthe.Phase

@spec run(Blueprint.t(), Keyword.t()) :: Phase.result_t()
def run(blueprint, options \\ [])

def run(%Blueprint{execution: %{deferred_fields: []}} = blueprint, _options) do
{:ok, blueprint}
end

def run(%Blueprint{execution: %{deferred_fields: fields}} = blueprint, _options) do
context = blueprint.execution.context
pubsub = ensure_pubsub!(context)

topic = "__absinthe__:deferred_fields-" <> to_string(:erlang.unique_integer([:positive]))
pubsub.subscribe(topic)

self = self()

Task.start(fn -> do_defers(fields, blueprint, pubsub, topic, self) end)

{:ok, put_in(blueprint.execution.defer_topic, topic)}
end

defp do_defers(fields, blueprint, pubsub, topic, pid) do
fields
|> Enum.map(fn f -> Task.async(fn -> resolve_field(f, blueprint, pubsub, topic) end) end)
|> Task.yield_many(Application.get_env(:absinthe, :max_defer_time))
|> Enum.map(fn {task, res} ->
# Shutdown the tasks that did not reply nor exit
res || Task.shutdown(task)
end)

pubsub.unsubscribe(topic, pid)
end

defp resolve_field(f, blueprint, pubsub, topic) do
{result, _exec} = perform_deferred_resolution(f)

data =
blueprint.execution.result
|> put_in(result)
|> Absinthe.Phase.Document.Result.process()
|> add_path(f.path)

pubsub.publish_deferred(topic, data)
end

defp perform_deferred_resolution(%DeferredField{} = field) do
Resolution.do_resolve_field(
field.resolution,
%{field.exec | deferring: true},
field.source,
field.path
)
end

defp add_path(result, path) do
Map.put(result, :path, Enum.reverse(make_path(path)))
end

defp make_path(path) do
path
|> Enum.map(& &1.name)
|> Enum.filter(fn e -> e != nil end)
end

defp ensure_pubsub!(context) do
case Absinthe.Subscription.extract_pubsub(context) do
{:ok, pubsub} ->
pubsub

_ ->
raise """
Pubsub not configured!
Deferred fields require a configured pubsub module.
"""
end
end
end
26 changes: 24 additions & 2 deletions lib/absinthe/phase/document/execution/resolution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ defmodule Absinthe.Phase.Document.Execution.Resolution do
defp resolve_fields(parent, exec, source, path) do
# parent is the parent field, we need to get the return type of that field
# that return type could be an interface or union, so let's make it concrete

parent
|> get_return_type
|> get_concrete_type(source, exec)
Expand Down Expand Up @@ -154,7 +155,28 @@ defmodule Absinthe.Phase.Document.Execution.Resolution do
do_resolve_fields(fields, exec, source, parent_type, path, [result | acc])
end

defp do_resolve_fields([], exec, _, _, _, acc), do: {:lists.reverse(acc), exec}
defp do_resolve_fields([], exec, _, _, _, acc) do
acc =
acc
|> Enum.filter(fn e -> not is_nil(e) end)
|> Enum.reverse()
{acc, exec}
end

def resolve_field(%{flags: %{defer: _}} = field, %{deferring: false} = exec,
source, parent_type, path) do
resolution = build_resolution_struct(exec, field, source, parent_type, path)
deferred_field = %Absinthe.Resolution.DeferredField{
field: update_in(field.flags, &Map.drop(&1, [:defer])),
resolution: resolution,
exec: exec,
source: source,
parent_type: parent_type,
path: path
}

{nil, %{exec | deferred_fields: [deferred_field | exec.deferred_fields]}}
end

def resolve_field(field, exec, source, parent_type, path) do
exec
Expand All @@ -163,7 +185,7 @@ defmodule Absinthe.Phase.Document.Execution.Resolution do
end

# bp_field needs to have a concrete schema node, AKA no unions or interfaces
defp do_resolve_field(res, exec, source, path) do
def do_resolve_field(res, exec, source, path) do
res
|> reduce_resolution
|> case do
Expand Down
12 changes: 10 additions & 2 deletions lib/absinthe/phase/document/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Absinthe.Phase.Document.Result do
{:ok, %{bp | result: result}}
end

defp process(blueprint) do
def process(blueprint) do
result =
case blueprint.execution do
%{validation_errors: [], result: result} ->
Expand All @@ -22,7 +22,9 @@ defmodule Absinthe.Phase.Document.Result do
{:validation_failed, errors}
end

format_result(result)
result
|> format_result()
|> maybe_add_topic(blueprint.execution)
end

defp format_result(:execution_failed) do
Expand All @@ -47,6 +49,12 @@ defmodule Absinthe.Phase.Document.Result do
%{errors: [format_error(error)]}
end

defp maybe_add_topic(result, %{defer_topic: nil}), do: result

defp maybe_add_topic(result, %{defer_topic: topic}) do
Map.put(result, :defer_topic, topic)
end

defp data(%{errors: [_ | _] = field_errors}, errors), do: {nil, field_errors ++ errors}

# Leaf
Expand Down
1 change: 1 addition & 0 deletions lib/absinthe/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ defmodule Absinthe.Pipeline do
# Execution
{Phase.Subscription.SubscribeSelf, options},
{Phase.Document.Execution.Resolution, options},
Phase.Document.Execution.DeferredResolution,
# Format Result
Phase.Document.Result
]
Expand Down
12 changes: 12 additions & 0 deletions lib/absinthe/resolution/deferred_field.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Absinthe.Resolution.DeferredField do

defstruct [
:field,
:resolution,
:exec,
:source,
:parent_type,
:path
]

end
10 changes: 10 additions & 0 deletions lib/absinthe/subscription/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ defmodule Absinthe.Subscription.Pubsub do
# other fields as needed
}
Unsubscribe the supplied process from a given topic
"""
@callback unsubscribe(topic :: binary, process :: pid) :: term

@doc """
"""
@callback publish_mutation(
Expand All @@ -62,4 +67,9 @@ defmodule Absinthe.Subscription.Pubsub do
only.
"""
@callback publish_subscription(topic :: binary, data :: map) :: term

@doc """
Publish the result of a deferred field
"""
@callback publish_deferred(topic :: binary, data :: term) :: term
end
15 changes: 7 additions & 8 deletions lib/absinthe/subscription/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -552,24 +552,23 @@ defmodule Absinthe.Subscription.Registry do
[]
"""
@spec unregister(registry, key) :: :ok
def unregister(registry, key) when is_atom(registry) do
self = self()
@spec unregister(registry, key, pid) :: :ok
def unregister(registry, key, pid \\ self()) when is_atom(registry) do
{kind, partitions, key_ets, pid_ets, listeners} = info!(registry)
{key_partition, pid_partition} = partitions(kind, key, self, partitions)
{key_partition, pid_partition} = partitions(kind, key, pid, partitions)
key_ets = key_ets || key_ets!(registry, key_partition)
{pid_server, pid_ets} = pid_ets || pid_ets!(registry, pid_partition)

# Remove first from the key_ets because in case of crashes
# the pid_ets will still be able to clean up. The last step is
# to clean if we have no more entries.
true = :ets.match_delete(key_ets, {key, {self, :_}})
true = :ets.delete_object(pid_ets, {self, key, key_ets})
true = :ets.match_delete(key_ets, {key, {pid, :_}})
true = :ets.delete_object(pid_ets, {pid, key, key_ets})

unlink_if_unregistered(pid_server, pid_ets, self)
unlink_if_unregistered(pid_server, pid_ets, pid)

for listener <- listeners do
Kernel.send(listener, {:unregister, registry, key, self})
Kernel.send(listener, {:unregister, registry, key, pid})
end

:ok
Expand Down
11 changes: 11 additions & 0 deletions lib/absinthe/type/built_ins/directives.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ defmodule Absinthe.Type.BuiltIns.Directives do
Blueprint.put_flag(node, :include, __MODULE__)
end
end

directive :defer do
description """
Directs the executor that it may defer evaluation of this field and send
the response later.
"""

on [:field, :fragment_spread, :inline_fragment]

expand fn _, node -> Blueprint.put_flag(node, :defer, __MODULE__) end
end
end
Loading

0 comments on commit 391d696

Please sign in to comment.