Skip to content

Commit 96e803c

Browse files
committed
Add subscription ordinal
1 parent d40b6d9 commit 96e803c

File tree

13 files changed

+216
-73
lines changed

13 files changed

+216
-73
lines changed

lib/absinthe.ex

+7-4
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,15 @@ defmodule Absinthe do
4444
%{message: String.t()}
4545
| %{message: String.t(), locations: [%{line: pos_integer, column: integer}]}
4646

47-
@type continuation_t :: nil | [Continuation.t()]
47+
@type continuation_t :: nil | [Absinthe.Blueprint.Continuation.t()]
4848

4949
@type result_t ::
50-
%{required(:data) => nil | result_selection_t,
50+
%{
51+
required(:data) => nil | result_selection_t,
52+
optional(:ordinal) => term(),
5153
optional(:continuation) => continuation_t,
52-
optional(:errors) => [result_error_t]}
54+
optional(:errors) => [result_error_t]
55+
}
5356
| %{errors: [result_error_t]}
5457

5558
@doc """
@@ -115,7 +118,7 @@ defmodule Absinthe do
115118
|> build_result()
116119
end
117120

118-
@spec continue([Continuation.t()]) :: run_result()
121+
@spec continue([Absinthe.Blueprint.Continuation.t()]) :: run_result()
119122
def continue(continuation) do
120123
continuation
121124
|> Absinthe.Pipeline.continue()

lib/absinthe/blueprint/continuation.ex

+3-4
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ defmodule Absinthe.Blueprint.Continuation do
1212
]
1313

1414
@type t :: %__MODULE__{
15-
phase_input: Pipeline.data_t,
16-
pipeline: Pipeline.t()
17-
}
18-
15+
phase_input: Pipeline.data_t(),
16+
pipeline: Pipeline.t()
17+
}
1918
end

lib/absinthe/blueprint/result/list.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@ defmodule Absinthe.Blueprint.Result.List do
1919
errors: [Phase.Error.t()],
2020
flags: Blueprint.flags_t(),
2121
extensions: %{any => any},
22-
continuations: [Continuation.t()]
22+
continuations: [Blueprint.Continuation.t()]
2323
}
2424
end

lib/absinthe/blueprint/result/object.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ defmodule Absinthe.Blueprint.Result.Object do
2020
errors: [Phase.Error.t()],
2121
flags: Blueprint.flags_t(),
2222
extensions: %{any => any},
23-
continuations: [Continuation.t()]
23+
continuations: [Blueprint.Continuation.t()]
2424
}
2525
end

lib/absinthe/phase/document/result.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ defmodule Absinthe.Phase.Document.Result do
138138
defp format_location(_), do: []
139139

140140
defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [],
141-
do: Map.put(result, :continuation, continuations)
141+
do: Map.put(result, :continuation, continuations)
142142

143143
defp maybe_add_continuations(result, _), do: result
144144
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
defmodule Absinthe.Phase.Subscription.GetOrdinal do
2+
use Absinthe.Phase
3+
4+
alias Absinthe.Phase.Subscription.SubscribeSelf
5+
6+
@moduledoc false
7+
8+
alias Absinthe.Blueprint
9+
10+
@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
11+
def run(blueprint, _options \\ []) do
12+
op = Blueprint.current_operation(blueprint)
13+
14+
if op.type == :subscription do
15+
{:ok,
16+
%{blueprint | result: Map.put(blueprint.result, :ordinal, get_ordinal(op, blueprint))}}
17+
else
18+
{:ok, blueprint}
19+
end
20+
end
21+
22+
defp get_ordinal(op, blueprint) do
23+
%{selections: [field]} = op
24+
{:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint)
25+
26+
case config[:ordinal] do
27+
nil ->
28+
nil
29+
30+
fun when is_function(fun, 1) ->
31+
fun.(blueprint.execution.root_value)
32+
33+
_fun ->
34+
IO.write(
35+
:stderr,
36+
"Ordinal function must be 1-arity"
37+
)
38+
39+
nil
40+
end
41+
end
42+
end
+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
defmodule Absinthe.Phase.Subscription.Prime do
22
@moduledoc false
33

4-
@spec run(any(), Keyword.t()) :: Phase.result_t()
5-
def run(blueprint, [prime_result: cr]) do
4+
@spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t()
5+
def run(blueprint, prime_result: cr) do
66
{:ok, put_in(blueprint.execution.root_value, cr)}
77
end
88
end

lib/absinthe/phase/subscription/result.ex

+32-25
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,53 @@ defmodule Absinthe.Phase.Subscription.Result do
66

77
alias Absinthe.Blueprint
88
alias Absinthe.Blueprint.Continuation
9+
alias Absinthe.Phase
910

1011
@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
1112
def run(blueprint, options) do
12-
topic = Keyword.get(options, :topic)
13+
topic = Keyword.fetch!(options, :topic)
1314
prime = Keyword.get(options, :prime)
1415
result = %{"subscribed" => topic}
16+
1517
case prime do
1618
nil ->
1719
{:ok, put_in(blueprint.result, result)}
1820

19-
prime_fun when is_function(prime_fun, 0) ->
20-
{:ok, prime_results} = prime_fun.()
21-
22-
result =
23-
if prime_results != [] do
24-
continuations =
25-
Enum.map(prime_results, fn cr ->
26-
%Continuation{
27-
phase_input: blueprint,
28-
pipeline: [
29-
{Absinthe.Phase.Subscription.Prime, [prime_result: cr]},
30-
{Absinthe.Phase.Document.Execution.Resolution, options},
31-
Absinthe.Phase.Document.Result
32-
]
33-
}
34-
end)
35-
36-
Map.put(result, :continuation, continuations)
37-
else
38-
result
39-
end
40-
41-
{:ok, put_in(blueprint.result, result)}
21+
prime_fun when is_function(prime_fun, 1) ->
22+
do_prime(prime_fun, result, blueprint, options)
4223

4324
val ->
4425
raise """
45-
Invalid prime function. Must be a function of arity 0.
26+
Invalid prime function. Must be a function of arity 1.
4627
4728
#{inspect(val)}
4829
"""
4930
end
5031
end
32+
33+
def do_prime(prime_fun, base_result, blueprint, options) do
34+
{:ok, prime_results} = prime_fun.(blueprint.execution)
35+
36+
result =
37+
if prime_results != [] do
38+
continuations =
39+
Enum.map(prime_results, fn cr ->
40+
%Continuation{
41+
phase_input: blueprint,
42+
pipeline: [
43+
{Phase.Subscription.Prime, [prime_result: cr]},
44+
{Phase.Document.Execution.Resolution, options},
45+
Phase.Subscription.GetOrdinal,
46+
Phase.Document.Result
47+
]
48+
}
49+
end)
50+
51+
Map.put(base_result, :continuation, continuations)
52+
else
53+
base_result
54+
end
55+
56+
{:ok, put_in(blueprint.result, result)}
57+
end
5158
end

lib/absinthe/phase/subscription/subscribe_self.ex

+11-16
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
2222
%{selections: [field]} = op
2323

2424
with {:ok, config} <- get_config(field, context, blueprint) do
25-
{field_keys, prime} = get_field_keys(field, config)
25+
field_keys = get_field_keys(field, config)
2626
subscription_id = get_subscription_id(config, blueprint, options)
2727

2828
for field_key <- field_keys,
2929
do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)
3030

3131
{:replace, blueprint,
3232
[
33-
{Phase.Subscription.Result, topic: subscription_id, prime: prime},
33+
{Phase.Subscription.Result, topic: subscription_id, prime: config[:prime]},
3434
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
3535
]}
3636
else
@@ -45,11 +45,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
4545
end
4646
end
4747

48-
defp get_config(
49-
%{schema_node: schema_node, argument_data: argument_data} = field,
50-
context,
51-
blueprint
52-
) do
48+
def get_config(
49+
%{schema_node: schema_node, argument_data: argument_data} = field,
50+
context,
51+
blueprint
52+
) do
5353
name = schema_node.identifier
5454

5555
config =
@@ -96,9 +96,8 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
9696
defp get_field_keys(%{schema_node: schema_node} = _field, config) do
9797
name = schema_node.identifier
9898

99-
{keys, prime} = find_field_keys!(config)
100-
field_keys = Enum.map(keys, fn key -> {name, key} end)
101-
{field_keys, prime}
99+
find_field_keys!(config)
100+
|> Enum.map(fn key -> {name, key} end)
102101
end
103102

104103
defp ensure_pubsub!(context) do
@@ -133,12 +132,8 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
133132
"""
134133

135134
val ->
136-
topics = List.wrap(val)
137-
|> Enum.map(&to_string/1)
138-
139-
prime = config[:prime] || nil
140-
141-
{topics, prime}
135+
List.wrap(val)
136+
|> Enum.map(&to_string/1)
142137
end
143138
end
144139

lib/absinthe/pipeline.ex

+5-1
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@ defmodule Absinthe.Pipeline do
3737
case result do
3838
{:ok, blueprint, phases} when rest == [] ->
3939
{:ok, blueprint, phases}
40+
4041
{:ok, blueprint, phases} ->
4142
bp_result = Map.put(blueprint.result, :continuation, rest)
4243
blueprint = Map.put(blueprint, :result, bp_result)
4344
{:ok, blueprint, phases}
44-
error -> error
45+
46+
error ->
47+
error
4548
end
4649
end
4750

@@ -132,6 +135,7 @@ defmodule Absinthe.Pipeline do
132135
# Execution
133136
{Phase.Subscription.SubscribeSelf, options},
134137
{Phase.Document.Execution.Resolution, options},
138+
Phase.Subscription.GetOrdinal,
135139
# Format Result
136140
Phase.Document.Result,
137141
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}

lib/absinthe/subscription.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ defmodule Absinthe.Subscription do
4747

4848
@type subscription_field_spec :: {atom, term | (term -> term)}
4949

50-
@type prime_fun :: (-> {:ok, [map()]})
50+
@type prime_fun :: (Absinthe.Resolution.t() -> {:ok, [map()]})
5151

5252
@doc """
5353
Publish a mutation

lib/absinthe/subscription/local.ex

+4-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ defmodule Absinthe.Subscription.Local do
4646
|> Pipeline.without(Phase.Subscription.SubscribeSelf)
4747
|> Pipeline.insert_before(
4848
Phase.Document.Execution.Resolution,
49-
{Phase.Document.OverrideRoot, root_value: mutation_result}
49+
[
50+
{Phase.Document.OverrideRoot, root_value: mutation_result},
51+
Phase.Subscription.GetOrdinal
52+
]
5053
)
5154
|> Pipeline.upto(Phase.Document.Execution.Resolution)
5255

0 commit comments

Comments
 (0)