-
-
Notifications
You must be signed in to change notification settings - Fork 628
/
Copy pathobservable_query.dart
320 lines (266 loc) · 9.07 KB
/
observable_query.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import 'dart:async';
import 'package:graphql/src/exceptions/exceptions.dart';
import 'package:meta/meta.dart';
import 'package:graphql/src/core/query_manager.dart';
import 'package:graphql/src/core/query_options.dart';
import 'package:graphql/src/core/query_result.dart';
import 'package:graphql/src/scheduler/scheduler.dart';
typedef OnData = void Function(QueryResult result);
enum QueryLifecycle {
UNEXECUTED,
PENDING,
POLLING,
POLLING_STOPPED,
SIDE_EFFECTS_PENDING,
SIDE_EFFECTS_BLOCKING,
COMPLETED,
CLOSED
}
class ObservableQuery {
ObservableQuery({
@required this.queryManager,
@required this.options,
}) : queryId = queryManager.generateQueryId().toString() {
if (options.eagerlyFetchResults) {
_latestWasEagerlyFetched = true;
fetchResults();
}
controller = StreamController<QueryResult>.broadcast(
onListen: onListen,
);
}
// set to true when eagerly fetched to prevent back-to-back queries
bool _latestWasEagerlyFetched = false;
final String queryId;
final QueryManager queryManager;
QueryScheduler get scheduler => queryManager.scheduler;
final Set<StreamSubscription<QueryResult>> _onDataSubscriptions =
<StreamSubscription<QueryResult>>{};
/// The most recently seen result from this operation's stream
QueryResult latestResult;
QueryLifecycle lifecycle = QueryLifecycle.UNEXECUTED;
WatchQueryOptions options;
StreamController<QueryResult> controller;
Stream<QueryResult> get stream => controller.stream;
bool get isCurrentlyPolling => lifecycle == QueryLifecycle.POLLING;
bool get _isRefetchSafe {
switch (lifecycle) {
case QueryLifecycle.COMPLETED:
case QueryLifecycle.POLLING:
case QueryLifecycle.POLLING_STOPPED:
return true;
case QueryLifecycle.PENDING:
case QueryLifecycle.CLOSED:
case QueryLifecycle.UNEXECUTED:
case QueryLifecycle.SIDE_EFFECTS_PENDING:
case QueryLifecycle.SIDE_EFFECTS_BLOCKING:
return false;
}
return false;
}
/// Attempts to refetch, throwing error if not refetch safe
Future<QueryResult> refetch() {
if (_isRefetchSafe) {
return queryManager.refetchQuery(queryId);
}
return Future<QueryResult>.error(Exception('Query is not refetch safe'));
}
bool get isRebroadcastSafe {
switch (lifecycle) {
case QueryLifecycle.PENDING:
case QueryLifecycle.COMPLETED:
case QueryLifecycle.POLLING:
case QueryLifecycle.POLLING_STOPPED:
return true;
case QueryLifecycle.UNEXECUTED: // this might be ok
case QueryLifecycle.CLOSED:
case QueryLifecycle.SIDE_EFFECTS_PENDING:
case QueryLifecycle.SIDE_EFFECTS_BLOCKING:
return false;
}
return false;
}
void onListen() {
if (_latestWasEagerlyFetched) {
_latestWasEagerlyFetched = false;
// eager results are resolved synchronously,
// so we have to add them manually now that
// the stream is available
if (!controller.isClosed && latestResult != null) {
controller.add(latestResult);
}
return;
}
if (options.fetchResults) {
fetchResults();
}
}
MultiSourceResult fetchResults() {
final MultiSourceResult allResults =
queryManager.fetchQueryAsMultiSourceResult(queryId, options);
latestResult ??= allResults.eagerResult;
// if onData callbacks have been registered,
// they are waited on by default
lifecycle = _onDataSubscriptions.isNotEmpty
? QueryLifecycle.SIDE_EFFECTS_PENDING
: QueryLifecycle.PENDING;
if (options.pollInterval != null && options.pollInterval > 0) {
startPolling(options.pollInterval);
}
return allResults;
}
/// fetch more results and then merge them according to the updateQuery method.
/// the results will then be added to to stream for the widget to re-build
void fetchMore(FetchMoreOptions fetchMoreOptions) async {
// fetch more and udpate
assert(fetchMoreOptions.updateQuery != null);
final combinedOptions = QueryOptions(
fetchPolicy: FetchPolicy.noCache,
errorPolicy: options.errorPolicy,
documentNode: fetchMoreOptions.documentNode ?? options.documentNode,
context: options.context,
variables: {
...options.variables,
...fetchMoreOptions.variables,
},
);
// stream old results with a loading indicator
addResult(QueryResult(
data: latestResult.data,
loading: true,
));
QueryResult fetchMoreResult = await queryManager.query(combinedOptions);
try {
// combine the query with the new query, using the function provided by the user
fetchMoreResult.data = fetchMoreOptions.updateQuery(
latestResult.data,
fetchMoreResult.data,
);
assert(fetchMoreResult.data != null, 'updateQuery result cannot be null');
// stream the new results and rebuild
queryManager.addQueryResult(
queryId,
fetchMoreResult,
writeToCache: true,
);
} catch (error) {
if (fetchMoreResult.hasException) {
// because the updateQuery failure might have been because of these errors,
// we just add them to the old errors
latestResult.exception = coalesceErrors(
exception: latestResult.exception,
graphqlErrors: fetchMoreResult.exception.graphqlErrors,
clientException: fetchMoreResult.exception.clientException,
);
queryManager.addQueryResult(
queryId,
latestResult,
writeToCache: true,
);
return;
} else {
// TODO merge results OperationException
rethrow;
}
}
}
/// add a result to the stream,
/// copying `loading` and `optimistic`
/// from the `latestResult` if they aren't set.
void addResult(QueryResult result) {
// don't overwrite results due to some async/optimism issue
if (latestResult != null &&
latestResult.timestamp.isAfter(result.timestamp)) {
return;
}
if (latestResult != null) {
result.source ??= latestResult.source;
}
if (lifecycle == QueryLifecycle.PENDING && result.optimistic != true) {
lifecycle = QueryLifecycle.COMPLETED;
}
latestResult = result;
if (!controller.isClosed) {
controller.add(result);
}
}
// most mutation behavior happens here
/// call any registered callbacks, then rebroadcast queries
/// incase the underlying data has changed
void onData(Iterable<OnData> callbacks) {
callbacks ??= const <OnData>[];
StreamSubscription<QueryResult> subscription;
subscription = stream.listen((QueryResult result) async {
if (!result.loading) {
for (final callback in callbacks) {
await callback(result);
}
queryManager.rebroadcastQueries();
if (!result.optimistic) {
await subscription.cancel();
_onDataSubscriptions.remove(subscription);
if (_onDataSubscriptions.isEmpty) {
if (lifecycle == QueryLifecycle.SIDE_EFFECTS_BLOCKING) {
lifecycle = QueryLifecycle.COMPLETED;
close();
}
}
}
}
});
_onDataSubscriptions.add(subscription);
}
void startPolling(int pollInterval) {
if (options.fetchPolicy == FetchPolicy.cacheFirst ||
options.fetchPolicy == FetchPolicy.cacheOnly) {
throw Exception(
'Queries that specify the cacheFirst and cacheOnly fetch policies cannot also be polling queries.',
);
}
if (isCurrentlyPolling) {
scheduler.stopPollingQuery(queryId);
}
options.pollInterval = pollInterval;
lifecycle = QueryLifecycle.POLLING;
scheduler.startPollingQuery(options, queryId);
}
void stopPolling() {
if (isCurrentlyPolling) {
scheduler.stopPollingQuery(queryId);
options.pollInterval = null;
lifecycle = QueryLifecycle.POLLING_STOPPED;
}
}
set variables(Map<String, dynamic> variables) {
options.variables = variables;
}
/// Closes the query or mutation, or else queues it for closing.
///
/// To preserve Mutation side effects, `close` checks the `lifecycle`,
/// queuing the stream for closing if `lifecycle == QueryLifecycle.SIDE_EFFECTS_PENDING`.
/// You can override this check with `force: true`.
///
/// Returns a `FutureOr` of the resultant lifecycle
/// (`QueryLifecycle.SIDE_EFFECTS_BLOCKING | QueryLifecycle.CLOSED`)
FutureOr<QueryLifecycle> close({
bool force = false,
bool fromManager = false,
}) async {
if (lifecycle == QueryLifecycle.SIDE_EFFECTS_PENDING && !force) {
lifecycle = QueryLifecycle.SIDE_EFFECTS_BLOCKING;
// stop closing because we're waiting on something
return lifecycle;
}
// `fromManager` is used by the query manager when it wants to close a query to avoid infinite loops
if (!fromManager) {
queryManager.closeQuery(this, fromQuery: true);
}
for (StreamSubscription<QueryResult> subscription in _onDataSubscriptions) {
await subscription.cancel();
}
stopPolling();
await controller.close();
lifecycle = QueryLifecycle.CLOSED;
return QueryLifecycle.CLOSED;
}
}