Skip to content

Commit 94b7b5f

Browse files
committed
Defer call to prime function until continuation
This allows the caller to complete their pubsub call before the prime function, avoiding data races.
1 parent 96e803c commit 94b7b5f

File tree

5 files changed

+79
-29
lines changed

5 files changed

+79
-29
lines changed

lib/absinthe.ex

+5-1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ defmodule Absinthe do
102102
]
103103

104104
@type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()}
105+
@type continue_result :: run_result | :no_more_results
105106

106107
@spec run(
107108
binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(),
@@ -118,7 +119,7 @@ defmodule Absinthe do
118119
|> build_result()
119120
end
120121

121-
@spec continue([Absinthe.Blueprint.Continuation.t()]) :: run_result()
122+
@spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result
122123
def continue(continuation) do
123124
continuation
124125
|> Absinthe.Pipeline.continue()
@@ -127,6 +128,9 @@ defmodule Absinthe do
127128

128129
defp build_result(output) do
129130
case output do
131+
{:ok, %{result: :no_more_results}, _phases} ->
132+
:no_more_results
133+
130134
{:ok, %{result: %{continuation: c} = result}, _phases} when c != [] ->
131135
{:more, result}
132136

+40-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,46 @@
11
defmodule Absinthe.Phase.Subscription.Prime do
22
@moduledoc false
33

4+
alias Absinthe.Blueprint.Continuation
5+
alias Absinthe.Phase
6+
47
@spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t()
5-
def run(blueprint, prime_result: cr) do
6-
{:ok, put_in(blueprint.execution.root_value, cr)}
8+
def run(blueprint, prime_result: prime_result) do
9+
{:ok, put_in(blueprint.execution.root_value, prime_result)}
10+
end
11+
12+
def run(blueprint, prime_fun: prime_fun, resolution_options: options) do
13+
{:ok, prime_results} = prime_fun.(blueprint.execution)
14+
15+
case prime_results do
16+
[first | rest] ->
17+
blueprint = put_in(blueprint.execution.root_value, first)
18+
maybe_add_continuations(blueprint, rest, options)
19+
{:ok, blueprint}
20+
21+
[] ->
22+
blueprint = put_in(blueprint.result, :no_more_results)
23+
{:replace, blueprint, []}
24+
end
25+
end
26+
27+
defp maybe_add_continuations(blueprint, [], _options), do: blueprint
28+
29+
defp maybe_add_continuations(blueprint, remaining_results, options) do
30+
continuations =
31+
Enum.map(
32+
remaining_results,
33+
&%Continuation{
34+
phase_input: blueprint,
35+
pipeline: [
36+
{__MODULE__, [prime_result: &1]},
37+
{Phase.Document.Execution.Resolution, options},
38+
Phase.Subscription.GetOrdinal,
39+
Phase.Document.Result
40+
]
41+
}
42+
)
43+
44+
put_in(blueprint.result, %{continuation: continuations})
745
end
846
end

lib/absinthe/phase/subscription/result.ex

+13-23
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ defmodule Absinthe.Phase.Subscription.Result do
1919
{:ok, put_in(blueprint.result, result)}
2020

2121
prime_fun when is_function(prime_fun, 1) ->
22-
do_prime(prime_fun, result, blueprint, options)
22+
stash_prime(prime_fun, result, blueprint, options)
2323

2424
val ->
2525
raise """
@@ -30,28 +30,18 @@ defmodule Absinthe.Phase.Subscription.Result do
3030
end
3131
end
3232

33-
def do_prime(prime_fun, base_result, blueprint, options) do
34-
{:ok, prime_results} = prime_fun.(blueprint.execution)
35-
36-
result =
37-
if prime_results != [] do
38-
continuations =
39-
Enum.map(prime_results, fn cr ->
40-
%Continuation{
41-
phase_input: blueprint,
42-
pipeline: [
43-
{Phase.Subscription.Prime, [prime_result: cr]},
44-
{Phase.Document.Execution.Resolution, options},
45-
Phase.Subscription.GetOrdinal,
46-
Phase.Document.Result
47-
]
48-
}
49-
end)
50-
51-
Map.put(base_result, :continuation, continuations)
52-
else
53-
base_result
54-
end
33+
def stash_prime(prime_fun, base_result, blueprint, options) do
34+
continuation = %Continuation{
35+
phase_input: blueprint,
36+
pipeline: [
37+
{Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]},
38+
{Phase.Document.Execution.Resolution, options},
39+
Phase.Subscription.GetOrdinal,
40+
Phase.Document.Result
41+
]
42+
}
43+
44+
result = Map.put(base_result, :continuation, [continuation])
5545

5646
{:ok, put_in(blueprint.result, result)}
5747
end

lib/absinthe/pipeline.ex

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ defmodule Absinthe.Pipeline do
1818
@type data_t :: any
1919

2020
@type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t(), [Phase.t()]}
21+
@type continue_result_t :: run_result_t | :no_more_results
2122

2223
@type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()}
2324

@@ -30,7 +31,7 @@ defmodule Absinthe.Pipeline do
3031
|> run_phase(input)
3132
end
3233

33-
@spec continue([Continuation.t()]) :: run_result_t
34+
@spec continue([Continuation.t()]) :: continue_result_t
3435
def continue([continuation | rest]) do
3536
result = run_phase(continuation.pipeline, continuation.phase_input)
3637

@@ -39,8 +40,8 @@ defmodule Absinthe.Pipeline do
3940
{:ok, blueprint, phases}
4041

4142
{:ok, blueprint, phases} ->
42-
bp_result = Map.put(blueprint.result, :continuation, rest)
43-
blueprint = Map.put(blueprint, :result, bp_result)
43+
new_continuations = Map.get(blueprint.result, :continuation, [])
44+
blueprint = put_in(blueprint.result.continuation, rest ++ new_continuations)
4445
{:ok, blueprint, phases}
4546

4647
error ->

test/absinthe/execution/subscription_test.exs

+17
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,23 @@ defmodule Absinthe.Execution.SubscriptionTest do
684684
Absinthe.continue(continuation)
685685
end
686686

687+
test "continuation with no extra data" do
688+
client_id = "abc"
689+
690+
assert {:more, %{"subscribed" => _topic, continuation: continuation}} =
691+
run_subscription(
692+
@query,
693+
Schema,
694+
variables: %{
695+
"primeData" => [],
696+
"clientId" => client_id
697+
},
698+
context: %{prime_id: "test_prime_id"}
699+
)
700+
701+
assert :no_more_results == Absinthe.continue(continuation)
702+
end
703+
687704
@query """
688705
subscription ($clientId: ID!) {
689706
ordinal(clientId: $clientId) {

0 commit comments

Comments
 (0)