-
Notifications
You must be signed in to change notification settings - Fork 72
/
Schema.fs
379 lines (347 loc) · 18.5 KB
/
Schema.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
// The MIT License (MIT)
// Copyright (c) 2016 Bazinga Technologies Inc
namespace FSharp.Data.GraphQL
open FSharp.Data.GraphQL.Types
open FSharp.Data.GraphQL.Types.Patterns
open FSharp.Data.GraphQL.Types.Introspection
open FSharp.Data.GraphQL.Introspection
open FSharp.Data.GraphQL.Helpers
open FSharp.Control.Reactive
open System.Collections.Generic
open System.Reactive.Linq
open System.Reactive.Subjects
open System.Text.Json
open System.Text.Json.Serialization
/// Abbreviation for the subject type used as a single subscription channel.
/// Values pushed through a channel are typed as obj.
type private Channel = ISubject<obj>
/// Keeps track of the channels created for one subscription,
/// both in a single flat list and grouped by tag.
type private ChannelBag() =
    /// Every channel created through AddNew, in creation order.
    let allChannels = List<Channel>()
    /// Channels grouped under each tag they were created with.
    let channelsByTag = Dictionary<Tag, List<Channel>>()

    /// Creates a new channel, records it in the flat list and under each
    /// of the supplied tags, and returns it.
    member _.AddNew(tags : Tag seq) : Channel =
        let subject = new Subject<obj>()
        allChannels.Add(subject)
        for tag in tags do
            match channelsByTag.TryGetValue(tag) with
            | true, bucket -> bucket.Add(subject)
            | false, _ ->
                // First channel seen for this tag: start a new bucket for it.
                let bucket = List<Channel>()
                bucket.Add(subject)
                channelsByTag.Add(tag, bucket)
        subject :> Channel

    /// Returns a sequence over every channel created so far.
    member _.GetAll() =
        Seq.map id allChannels

    /// Returns a sequence over the channels recorded under the given tag,
    /// or an empty sequence when the tag is unknown.
    member _.GetByTag(tag) =
        match channelsByTag.TryGetValue(tag) with
        | true, bucket -> Seq.map id bucket
        | false, _ -> Seq.empty
/// Registry of subscriptions keyed by subscription name.
/// Each entry pairs the subscription with its own ChannelBag.
type private SubscriptionManager() =
    let registry = Dictionary<string, Subscription * ChannelBag>()

    /// Registers the subscription under its Name together with a fresh, empty ChannelBag.
    member _.Add(subscription : Subscription) =
        let entry = subscription, new ChannelBag()
        registry.Add(subscription.Name, entry)

    /// Looks up a subscription and its channels by name; None when nothing is registered under it.
    member _.TryGet(subscriptionName) =
        match registry.TryGetValue(subscriptionName) with
        | false, _ -> None
        | true, entry -> Some entry
/// A configuration object for the GraphQL server schema.
type SchemaConfig =
    { /// List of types that couldn't be resolved from schema query root
      /// tree traversal, but should be included anyway.
      Types : NamedDef list
      /// List of custom directives that should be included as known to the schema.
      Directives : DirectiveDef list
      /// Function called when errors occurred during query execution.
      /// It's used to retrieve messages shown as output to the client.
      /// May be also used to log messages before returning them.
      ParseError : FieldPath -> exn -> IGQLError list
      /// Provider for the back-end of the subscription system.
      SubscriptionProvider : ISubscriptionProvider
      /// Provider for the back-end of the live query subscription system.
      LiveFieldSubscriptionProvider : ILiveFieldSubscriptionProvider
      /// JSON serialization options
      JsonOptions : JsonSerializerOptions
    }

    /// Returns the default Subscription Provider, backed by Observable streams.
    static member DefaultSubscriptionProvider() =
        let manager = new SubscriptionManager()
        { new ISubscriptionProvider with
            // Stores the subscription in the manager so channels can later be opened for it.
            member _.AsyncRegister (subscription : Subscription) =
                async { return manager.Add(subscription) }

            // Opens a new channel for a registered subscription and exposes it as a filtered stream.
            // Tags are resolved before the lookup, exactly once per call.
            member _.Add (ctx: ResolveFieldContext) (root: obj) (subdef: SubscriptionFieldDef) =
                let tags = subdef.TagsResolver ctx
                match manager.TryGet(subdef.Name) with
                | Some (subscription, bag) ->
                    // TODO: See notes on flatmapAsync in ObservableExtensionsTests.
                    let filtered =
                        bag.AddNew(tags)
                        |> Observable.flatmapAsync (fun o -> subscription.Filter ctx root o)
                    filtered |> Observable.choose id
                | None -> Observable.Empty()

            // Pushes the value to every channel of the named subscription; unknown names are ignored.
            member _.AsyncPublish<'T> (name: string) (value: 'T) = async {
                match manager.TryGet(name) with
                | Some (_, bag) ->
                    let targets = bag.GetAll()
                    targets |> Seq.iter (fun target -> target.OnNext(value))
                | None -> () }

            // Pushes the value only to channels created with the given tag; unknown names are ignored.
            member _.AsyncPublishTag<'T> (name: string) (tag : Tag) (value: 'T) = async {
                match manager.TryGet(name) with
                | Some (_, bag) ->
                    let targets = bag.GetByTag(tag)
                    targets |> Seq.iter (fun target -> target.OnNext(value))
                | None -> () } }

    /// Returns the default live field Subscription Provider, backed by Observable streams.
    static member DefaultLiveFieldSubscriptionProvider() =
        // Registered live fields keyed by (type name, field name); each owns a single subject.
        let registry = new Dictionary<string * string, ILiveFieldSubscription * Subject<obj>>()
        { new ILiveFieldSubscriptionProvider with
            member _.HasSubscribers (typeName : string) (fieldName : string) =
                let lookupKey = typeName, fieldName
                match registry.TryGetValue(lookupKey) with
                | false, _ -> false
                | true, (_, subject) -> subject.HasObservers

            member _.IsRegistered (typeName : string) (fieldName : string) =
                let lookupKey = typeName, fieldName
                registry.ContainsKey(lookupKey)

            member _.AsyncRegister (subscription : ILiveFieldSubscription) = async {
                let lookupKey = subscription.TypeName, subscription.FieldName
                let entry = subscription, new Subject<obj>()
                registry.Add(lookupKey, entry) }

            member _.TryFind (typeName : string) (fieldName : string) =
                let lookupKey = typeName, fieldName
                match registry.TryGetValue(lookupKey) with
                | true, (registered, _) -> Some registered
                | _ -> None

            // Streams projected values for a registered field, keeping only those accepted by filterFn.
            member _.Add filterFn (typeName : string) (fieldName : string) =
                let lookupKey = typeName, fieldName
                match registry.TryGetValue(lookupKey) with
                | true, (registered, subject) ->
                    subject
                    |> Observable.choose (fun item ->
                        match filterFn item with
                        | true -> Some (registered.Project item)
                        | false -> None)
                | false, _ -> Observable.Empty()

            // Pushes the boxed value to the field's subject; unregistered fields are ignored.
            member _.AsyncPublish<'T> (typeName : string) (fieldName : string) (value : 'T) = async {
                let lookupKey = typeName, fieldName
                match registry.TryGetValue(lookupKey) with
                | false, _ -> ()
                | true, (_, subject) -> subject.OnNext(box value) } }

    /// Default SchemaConfig used by Schema when no config is provided.
    static member Default =
        { Types = []
          Directives = [ IncludeDirective; SkipDirective; DeferDirective; StreamDirective; LiveDirective ]
          ParseError =
            // GQLMessageException instances are returned as-is; any other exception is
            // reduced to an error carrying only its message. The field path is not used.
            fun _ error ->
                match error with
                | :? GQLMessageException as gqlError -> [gqlError]
                | other -> [{ new IGQLError with member _.Message = other.Message }]
          SubscriptionProvider = SchemaConfig.DefaultSubscriptionProvider()
          LiveFieldSubscriptionProvider = SchemaConfig.DefaultLiveFieldSubscriptionProvider()
          JsonOptions = JsonFSharpOptions.Default().ToJsonSerializerOptions() }

    /// <summary>
    /// Default SchemaConfig with buffered stream support.
    /// This config modifies the stream directive to have two optional arguments: 'interval' and 'preferredBatchSize'.
    /// These arguments allow the user to buffer streamed results in a query by timing and/or batch size preferences.
    /// </summary>
    /// <param name="streamOptions">
    /// The way the buffered stream will behave by default - standard values for the 'interval' and 'preferredBatchSize' arguments.
    /// </param>
    static member DefaultWithBufferedStream(streamOptions : BufferedStreamOptions) =
        let intervalDescription =
            "An optional argument used to buffer stream results. " +
            "When it's value is greater than zero, stream results will be buffered for milliseconds equal to the value, then sent to the client. " +
            "After that, starts buffering again until all results are streamed."
        let batchSizeDescription =
            "An optional argument used to buffer stream results. " +
            "When it's value is greater than zero, stream results will be buffered until item count reaches this value, then sent to the client. " +
            "After that, starts buffering again until all results are streamed."
        let bufferedStreamDirective =
            let bufferArgs = [|
                Define.Input(
                    "interval",
                    Nullable IntType,
                    defaultValue = streamOptions.Interval,
                    description = intervalDescription)
                Define.Input(
                    "preferredBatchSize",
                    Nullable IntType,
                    defaultValue = streamOptions.PreferredBatchSize,
                    description = batchSizeDescription) |]
            { StreamDirective with Args = bufferArgs }
        // Same directive list as Default, with the stream directive swapped for the buffered variant.
        { SchemaConfig.Default with
            Directives = [ IncludeDirective; SkipDirective; DeferDirective; bufferedStreamDirective; LiveDirective ] }
/// <summary>
/// GraphQL server schema. Defines the complete type system to be used by GraphQL queries.
/// </summary>
/// <param name="query">Root object definition for queries; always added to the type map.</param>
/// <param name="mutation">Optional root object definition for mutations.</param>
/// <param name="subscription">Optional root object definition for subscriptions.</param>
/// <param name="config">Optional schema configuration; SchemaConfig.Default is used when omitted.</param>
type Schema<'Root> (query: ObjectDef<'Root>, ?mutation: ObjectDef<'Root>, ?subscription: SubscriptionObjectDef<'Root>, ?config: SchemaConfig) =

    // SchemaConfig.Default is only evaluated when no config was supplied.
    let schemaConfig =
        match config with
        | None -> SchemaConfig.Default
        | Some c -> c

    // Type map seeded, in this order, with the built-in types and the query root,
    // then the subscription root, the mutation root and the extra types from the config.
    let typeMap : TypeMap =
        let initialTypes: NamedDef list =
            [ IntType
              StringType
              BooleanType
              FloatType
              IDType
              DateTimeOffsetType
              DateOnlyType
              UriType
              __Schema
              query ]
        let m = mutation |> function Some (Named n) -> [n] | _ -> []
        let s = subscription |> function Some (Named n) -> [n] | _ -> []
        seq { initialTypes; s; m; schemaConfig.Types } |> Seq.collect id |> TypeMap.FromSeq

    // Builds a map from interface name to the object definitions in the type map
    // that list that interface in their Implements array.
    let getImplementations (typeMap : TypeMap) =
        typeMap.ToSeq()
        |> Seq.choose (fun (_, v) ->
            match v with
            | Object odef -> Some odef
            | _ -> None)
        |> Seq.fold (fun acc objdef ->
            objdef.Implements
            |> Array.fold (fun acc' iface ->
                match Map.tryFind iface.Name acc' with
                | Some list -> Map.add iface.Name (objdef::list) acc'
                | None -> Map.add iface.Name [objdef] acc') acc) Map.empty

    let implementations = lazy (getImplementations typeMap)

    // Returns the object types an abstract type can resolve to:
    // the options of a union, or the known implementations of an interface.
    // Anything else yields an empty array.
    let getPossibleTypes abstractDef =
        match abstractDef with
        | Union u -> u.Options
        | Interface i ->
            // An interface that no object type in the type map implements has no entry
            // in the implementations map; treat that as "no possible types" instead of
            // failing with KeyNotFoundException.
            Map.tryFind i.Name (implementations.Force())
            |> Option.map Array.ofList
            |> Option.defaultValue [||]
        | _ -> [||]

    // Builds the introspection type reference for a type definition.
    // isNullable records whether the definition is being visited in a nullable position:
    // when it is false, List and Named definitions are wrapped in NonNull and then
    // processed again with isNullable = true.
    let rec introspectTypeRef isNullable (namedTypes: Map<string, IntrospectionTypeRef>) typedef =
        match typedef with
        | Nullable inner -> introspectTypeRef true namedTypes inner
        | List inner ->
            if isNullable
            then IntrospectionTypeRef.List(introspectTypeRef false namedTypes inner)
            else IntrospectionTypeRef.NonNull(introspectTypeRef true namedTypes typedef)
        | Named named ->
            if isNullable
            then Map.find named.Name namedTypes
            else IntrospectionTypeRef.NonNull(introspectTypeRef true namedTypes typedef)
        | _ -> failwithf "Unexpected value of typedef: %O" typedef

    // Builds the introspection value for an input field or argument.
    // The default value, when present, is serialized to JSON with the configured options,
    // and its presence makes the input's type reference be built as nullable.
    let introspectInput (namedTypes: Map<string, IntrospectionTypeRef>) (inputDef: InputFieldDef) : IntrospectionInputVal =
        // We need this so a default value that is an option is not printed as "Some"
        let unwrap =
            function
            | ObjectOption x -> x
            | x -> x
        let defaultValue =
            inputDef.DefaultValue
            |> Option.map (fun value -> JsonSerializer.Serialize(unwrap value, schemaConfig.JsonOptions))
        { Name = inputDef.Name
          Description = inputDef.Description
          Type = introspectTypeRef (Option.isSome inputDef.DefaultValue) namedTypes inputDef.TypeDef
          DefaultValue = defaultValue }

    // Builds the introspection record for an object or interface field.
    let introspectField (namedTypes: Map<string, IntrospectionTypeRef>) (fdef: FieldDef) =
        { Name = fdef.Name
          Description = fdef.Description
          Args = fdef.Args |> Array.map (introspectInput namedTypes)
          Type = introspectTypeRef false namedTypes fdef.TypeDef
          IsDeprecated = Option.isSome fdef.DeprecationReason
          DeprecationReason = fdef.DeprecationReason }

    // Same as introspectField, but the field type is taken from the subscription's OutputTypeDef.
    let introspectSubscriptionField (namedTypes: Map<string, IntrospectionTypeRef>) (subdef: SubscriptionFieldDef) =
        { Name = subdef.Name
          Description = subdef.Description
          Args = subdef.Args |> Array.map (introspectInput namedTypes)
          Type = introspectTypeRef false namedTypes subdef.OutputTypeDef
          IsDeprecated = Option.isSome subdef.DeprecationReason
          DeprecationReason = subdef.DeprecationReason }

    let introspectEnumVal (enumVal: EnumVal) : IntrospectionEnumVal =
        { Name = enumVal.Name
          Description = enumVal.Description
          IsDeprecated = Option.isSome enumVal.DeprecationReason
          DeprecationReason = enumVal.DeprecationReason }

    // Expands a combined DirectiveLocation value into the array of
    // individual enum values whose bits are set in it.
    let locationToList location =
        System.Enum.GetValues(typeof<DirectiveLocation>)
        |> Seq.cast<DirectiveLocation>
        |> Seq.filter (fun v -> (int location &&& int v) <> 0)
        |> Seq.toArray

    let introspectDirective (namedTypes: Map<string, IntrospectionTypeRef>) (directive: DirectiveDef) : IntrospectionDirective =
        { Name = directive.Name
          Description = directive.Description
          Locations = locationToList directive.Locations
          Args = directive.Args |> Array.map (introspectInput namedTypes) }

    // Builds the full introspection description of a named type.
    // The SubscriptionObject case is listed before Object so that subscription roots
    // get their fields introspected through introspectSubscriptionField.
    let introspectType (namedTypes: Map<string, IntrospectionTypeRef>) typedef =
        match typedef with
        | Scalar scalardef ->
            IntrospectionType.Scalar(scalardef.Name, scalardef.Description)
        | SubscriptionObject subdef ->
            let fields =
                subdef.Fields
                |> Map.toArray
                |> Array.map (snd >> introspectSubscriptionField namedTypes)
            let interfaces =
                subdef.Implements
                |> Array.map (fun idef -> Map.find idef.Name namedTypes)
            IntrospectionType.Object(subdef.Name, subdef.Description, fields, interfaces)
        | Object objdef ->
            let fields =
                objdef.Fields
                |> Map.toArray
                |> Array.map (snd >> introspectField namedTypes)
            let interfaces =
                objdef.Implements
                |> Array.map (fun idef -> Map.find idef.Name namedTypes)
            IntrospectionType.Object(objdef.Name, objdef.Description, fields, interfaces)
        | InputObject inObjDef ->
            let inputs =
                inObjDef.Fields
                |> Array.map (introspectInput namedTypes)
            IntrospectionType.InputObject(inObjDef.Name, inObjDef.Description, inputs)
        | Union uniondef ->
            let possibleTypes =
                getPossibleTypes uniondef
                |> Array.map (fun tdef -> Map.find tdef.Name namedTypes)
            IntrospectionType.Union(uniondef.Name, uniondef.Description, possibleTypes)
        | Enum enumdef ->
            let enumVals =
                enumdef.Options
                |> Array.map introspectEnumVal
            IntrospectionType.Enum(enumdef.Name, enumdef.Description, enumVals)
        | Interface idef ->
            let fields =
                idef.Fields
                |> Array.map (introspectField namedTypes)
            let possibleTypes =
                getPossibleTypes idef
                |> Array.map (fun tdef -> Map.find tdef.Name namedTypes)
            IntrospectionType.Interface(idef.Name, idef.Description, fields, possibleTypes)
        | _ -> failwithf "Unexpected value of typedef: %O" typedef

    // Builds the introspection schema in two passes: first a name -> type reference map
    // for every type in the type map, then the full description of each type and directive
    // resolved against that map.
    let introspectSchema (types : TypeMap) : IntrospectionSchema =
        let inamed =
            types.ToSeq()
            |> Seq.map (fun (typeName, typedef) ->
                match typedef with
                | Scalar x -> typeName, { Kind = TypeKind.SCALAR; Name = Some typeName; Description = x.Description; OfType = None }
                | Object x -> typeName, { Kind = TypeKind.OBJECT; Name = Some typeName; Description = x.Description; OfType = None }
                | InputObject x -> typeName, { Kind = TypeKind.INPUT_OBJECT; Name = Some typeName; Description = x.Description; OfType = None }
                | Union x -> typeName, { Kind = TypeKind.UNION; Name = Some typeName; Description = x.Description; OfType = None }
                | Enum x -> typeName, { Kind = TypeKind.ENUM; Name = Some typeName; Description = x.Description; OfType = None }
                | Interface x -> typeName, { Kind = TypeKind.INTERFACE; Name = Some typeName; Description = x.Description; OfType = None }
                | _ -> failwithf "Unexpected value of typedef: %O" typedef)
            |> Map.ofSeq

        let itypes =
            types.ToSeq()
            |> Seq.toArray
            |> Array.map (snd >> (introspectType inamed))

        let idirectives =
            schemaConfig.Directives
            |> List.map (introspectDirective inamed)
            |> List.toArray

        { QueryType = Map.find query.Name inamed
          MutationType = mutation |> Option.map (fun m -> Map.find m.Name inamed)
          SubscriptionType = subscription |> Option.map(fun s -> Map.find s.Name inamed)
          Types = itypes
          Directives = idirectives }

    // Introspection data is computed on first access and then reused.
    let introspected = lazy (introspectSchema typeMap)

    interface ISchema with
        member _.TypeMap = typeMap
        member _.Directives = schemaConfig.Directives |> List.toArray
        member _.Introspected = introspected.Force()
        member _.Query = upcast query
        member _.Mutation = mutation |> Option.map (fun x -> upcast x)
        member _.Subscription = subscription |> Option.map (fun x -> upcast x)
        member _.TryFindType typeName = typeMap.TryFind(typeName, includeDefaultTypes = true)
        member _.GetPossibleTypes typedef = getPossibleTypes typedef
        member _.ParseError path exn = schemaConfig.ParseError path exn
        // An abstract type with no possible types never matches; otherwise types are compared by name.
        member x.IsPossibleType abstractdef (possibledef: ObjectDef) =
            match (x :> ISchema).GetPossibleTypes abstractdef with
            | [||] -> false
            | possibleTypes -> possibleTypes |> Array.exists (fun t -> t.Name = possibledef.Name)
        member _.SubscriptionProvider = schemaConfig.SubscriptionProvider
        member _.LiveFieldSubscriptionProvider = schemaConfig.LiveFieldSubscriptionProvider

    interface ISchema<'Root> with
        member _.Query = query
        member _.Mutation = mutation
        member _.Subscription = subscription

    // Enumerating a schema yields the named type definitions held in its type map.
    interface System.Collections.Generic.IEnumerable<NamedDef> with
        member _.GetEnumerator() = (typeMap.ToSeq() |> Seq.map snd).GetEnumerator()

    interface System.Collections.IEnumerable with
        member _.GetEnumerator() = (typeMap.ToSeq() |> Seq.map snd :> System.Collections.IEnumerable).GetEnumerator()