Skip to content

Commit e1c6d54

Browse files
feat(graphql): added WebSocket token refresh and autoReconnect toggling
1 parent 71d7502 commit e1c6d54

File tree

4 files changed

+325
-27
lines changed

4 files changed

+325
-27
lines changed

packages/graphql/README.md

+96-14
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,87 @@ subscription = client.subscribe(
327327
subscription.listen(reactToAddedReview)
328328
```
329329

330+
#### Adding headers (including auth) to WebSocket
331+
332+
In order to add auth header or any other header to websocket connection use `initialPayload` property
333+
334+
```dart
335+
initialPayload: () {
336+
var headers = <String, String>{};
337+
headers.putIfAbsent(HttpHeaders.authorizationHeader, () => token);
338+
339+
return headers;
340+
},
341+
```
342+
343+
#### Refreshing headers (including auth)
344+
345+
In order to refresh auth header you need to setup `onConnectionLost` function
346+
347+
```dart
348+
onConnectionLost: (int? code, String? reason) async {
349+
if (code == 4001) {
350+
await authTokenService.issueToken(refresh: true);
351+
return Duration.zero;
352+
}
353+
354+
return null;
355+
}
356+
```
357+
358+
Where `code` and `reason` are values returned from the server on connection close. There is no such code like 401 in WebSockets so you can use your custom and server code could look similar:
359+
360+
```typescript
361+
subscriptions: {
362+
'graphql-ws': {
363+
onConnect: async (context: any) => {
364+
const { connectionParams } = context;
365+
366+
if (!connectionParams) {
367+
throw new Error('Connection params are missing');
368+
}
369+
370+
const authToken = connectionParams.authorization;
371+
372+
if (authToken) {
373+
const isValid await authService.isTokenValid(authToken);
374+
375+
if (!isValid) {
376+
context.extra.socket.close(4001, 'Unauthorized');
377+
}
378+
379+
return;
380+
}
381+
},
382+
},
383+
},
384+
```
385+
386+
`onConnectionLost` function returns `Duration` which is basically `delayBetweenReconnectionAttempts` for current reconnect attempt. If duration is `null` then default `delayBetweenReconnectionAttempts` will be used. Otherwise returned value. For example upon expired auth token there is not much sense to wait after token is refreshed.
387+
388+
#### Handling connection manually
389+
390+
`toggleConnection` stream was introduced to allow connect or disconnect manually.
391+
392+
```dart
393+
var toggleConnection = PublishSubject<ToggleConnectionState>;
394+
395+
SocketClientConfig(
396+
toggleConnection: toggleConnection,
397+
),
398+
```
399+
400+
later from your code call
401+
402+
```dart
403+
toggleConnection.add(ToggleConnectionState.disconnect);
404+
//OR
405+
toggleConnection.add(ToggleConnectionState.connect);
406+
```
407+
408+
When `disconnect` event is called `autoReconnect` stops. When `connect` is called `autoReconnect` resumes.
409+
this is useful when for some reason you want to stop reconnection. For example when user logouts from the system and reconnection would cause auth error from server causing infinite loop.
410+
330411
#### Customizing WebSocket Connections
331412

332413
`WebSocketLink` now has an experimental `connect` parameter that can be
@@ -427,15 +508,15 @@ class _Connection {
427508
428509
```
429510

430-
2- if you need to update your socket just cancel your subscription and resubscribe again using usual way
511+
2- if you need to update your socket just cancel your subscription and resubscribe again using usual way
431512
and if the token changed it will be reconnect with the new token otherwise it will use the same client
432513

433514

434515

435516
### `client.watchQuery` and `ObservableQuery`
436517

437518
[`client.watchQuery`](https://pub.dev/documentation/graphql/latest/graphql/GraphQLClient/watchQuery.html)
438-
can be used to execute both queries and mutations, then reactively listen to changes to the underlying data in the cache.
519+
can be used to execute both queries and mutations, then reactively listen to changes to the underlying data in the cache.
439520

440521
```dart
441522
final observableQuery = client.watchQuery(
@@ -506,7 +587,7 @@ To disable cache normalization entirely, you could pass `(data) => null`.
506587
If you only cared about `nodeId`, you could pass `(data) => data['nodeId']`.
507588

508589
Here's a more detailed example where the system involved contains versioned entities you don't want to clobber:
509-
```dart
590+
```dart
510591
String customDataIdFromObject(Map<String, Object> data) {
511592
final typeName = data['__typename'];
512593
final entityId = data['entityId'];
@@ -589,17 +670,17 @@ query {
589670

590671
```
591672

592-
if you're not providing the possible type map and introspecting the typename, the cache can't be updated.
673+
if you're not providing the possible type map and introspecting the typename, the cache can't be updated.
593674

594675
## Direct Cache Access API
595676

596677
The [`GraphQLCache`](https://pub.dev/documentation/graphql/latest/graphql/GraphQLCache-class.html)
597678
leverages [`normalize`] to give us a fairly apollo-ish [direct cache access] API, which is also available on `GraphQLClient`.
598679
This means we can do [local state management] in a similar fashion as well.
599680

600-
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
681+
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
601682
> **NB** counter-intuitively, you likely never want to use use direct cache access methods directly on the `cache`,
602-
> as they will not be rebroadcast automatically.
683+
> as they will not be rebroadcast automatically.
603684
> **Prefer `client.writeQuery` and `client.writeFragment` to those on the `client.cache` for automatic rebroadcasting**
604685
605686
In addition to this overview, a complete and well-commented rundown of can be found in the
@@ -641,10 +722,10 @@ final data = client.readQuery(queryRequest);
641722
client.writeQuery(queryRequest, data);
642723
```
643724

644-
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
645-
> **NB** counter-intuitively, you likely never want to use use direct cache access methods on the cache
725+
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
726+
> **NB** counter-intuitively, you likely never want to use use direct cache access methods on the cache
646727
cache.readQuery(queryRequest);
647-
client.readQuery(queryRequest); //
728+
client.readQuery(queryRequest); //
648729

649730
### `FragmentRequest`, `readFragment`, and `writeFragment`
650731
`FragmentRequest` has almost the same api as `Request`, but is provided directly from `graphql` for consistency.
@@ -710,7 +791,7 @@ client.query(QueryOptions(
710791
errorPolicy: ErrorPolicy.ignore,
711792
// ignore cache data.
712793
cacheRereadPolicy: CacheRereadPolicy.ignore,
713-
// ...
794+
// ...
714795
));
715796
```
716797
Defaults can also be overridden via `defaultPolices` on the client itself:
@@ -724,11 +805,11 @@ GraphQLClient(
724805
CacheRereadPolicy.mergeOptimistic,
725806
),
726807
),
727-
// ...
808+
// ...
728809
)
729810
```
730811

731-
**[`FetchPolicy`](https://pub.dev/documentation/graphql/latest/graphql/FetchPolicy-class.html):** determines where the client may return a result from, and whether that result will be saved to the cache.
812+
**[`FetchPolicy`](https://pub.dev/documentation/graphql/latest/graphql/FetchPolicy-class.html):** determines where the client may return a result from, and whether that result will be saved to the cache.
732813
Possible options:
733814

734815
- cacheFirst: return result from cache. Only fetch from network if cached result is not available.
@@ -737,7 +818,7 @@ Possible options:
737818
- noCache: return result from network, fail if network call doesn't succeed, don't save to cache.
738819
- networkOnly: return result from network, fail if network call doesn't succeed, save to cache.
739820

740-
**[`ErrorPolicy`](https://pub.dev/documentation/graphql/latest/graphql/ErrorPolicy-class.html):** determines the level of events for errors in the execution result.
821+
**[`ErrorPolicy`](https://pub.dev/documentation/graphql/latest/graphql/ErrorPolicy-class.html):** determines the level of events for errors in the execution result.
741822
Possible options:
742823

743824
- none (default): Any GraphQL Errors are treated the same as network errors and any data is ignored from the response.
@@ -869,7 +950,7 @@ API key, IAM, and Federated provider authorization could be accomplished through
869950

870951
This package does not support code-generation out of the box, but [graphql_codegen](https://pub.dev/packages/graphql_codegen) does!
871952

872-
This package extensions on the client which takes away the struggle of serialization and gives you confidence through type-safety.
953+
This package extensions on the client which takes away the struggle of serialization and gives you confidence through type-safety.
873954
It is also more performant than parsing GraphQL queries at runtime.
874955

875956
For example, by creating the `.graphql` file
@@ -966,3 +1047,4 @@ HttpLink httpLink = HttpLink('https://api.url/graphql', defaultHeaders: {
9661047
[local state management]: https://www.apollographql.com/docs/tutorial/local-state/#update-local-data
9671048
[`typepolicies`]: https://www.apollographql.com/docs/react/caching/cache-configuration/#the-typepolicy-type
9681049
[direct cache access]: https://www.apollographql.com/docs/react/caching/cache-interaction/
1050+

packages/graphql/lib/src/links/websocket_link/websocket_client.dart

+56-13
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class SubscriptionListener {
3737

3838
enum SocketConnectionState { notConnected, handshake, connecting, connected }
3939

40+
enum ToggleConnectionState { disconnect, connect }
41+
4042
class SocketClientConfig {
4143
const SocketClientConfig({
4244
this.serializer = const RequestSerializer(),
@@ -48,6 +50,8 @@ class SocketClientConfig {
4850
this.initialPayload,
4951
this.headers,
5052
this.connectFn,
53+
this.onConnectionLost,
54+
this.toggleConnection,
5155
});
5256

5357
/// Serializer used to serialize request
@@ -98,6 +102,11 @@ class SocketClientConfig {
98102
/// Custom header to add inside the client
99103
final Map<String, dynamic>? headers;
100104

105+
final Future<Duration?>? Function(int? code, String? reason)?
106+
onConnectionLost;
107+
108+
final Stream<ToggleConnectionState>? toggleConnection;
109+
101110
/// Function to define another connection without call directly
102111
/// the connection function
103112
FutureOr<WebSocketChannel> connect(
@@ -192,6 +201,7 @@ class SocketClient {
192201
@visibleForTesting this.onMessage,
193202
@visibleForTesting this.onStreamError = _defaultOnStreamError,
194203
}) {
204+
_listenToToggleConnection();
195205
_connect();
196206
}
197207

@@ -232,6 +242,30 @@ class SocketClient {
232242
Response Function(Map<String, dynamic>) get parse =>
233243
config.parser.parseResponse;
234244

245+
bool _isReconnectionPaused = false;
246+
final _unsubscriber = PublishSubject<void>();
247+
248+
void _listenToToggleConnection() {
249+
if (config.toggleConnection != null) {
250+
config.toggleConnection!
251+
.where((_) => !_connectionStateController.isClosed)
252+
.takeUntil(_unsubscriber)
253+
.listen((event) {
254+
if (event == ToggleConnectionState.disconnect &&
255+
_connectionStateController.value ==
256+
SocketConnectionState.connected) {
257+
_isReconnectionPaused = true;
258+
onConnectionLost();
259+
} else if (event == ToggleConnectionState.connect &&
260+
_connectionStateController.value ==
261+
SocketConnectionState.notConnected) {
262+
_isReconnectionPaused = false;
263+
_connect();
264+
}
265+
});
266+
}
267+
}
268+
235269
void _disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) {
236270
_keepAliveSubscription = messages.whereType<ConnectionKeepAlive>().timeout(
237271
config.inactivityTimeout!,
@@ -334,6 +368,9 @@ class SocketClient {
334368
}
335369

336370
void onConnectionLost([Object? e]) async {
371+
var code = socketChannel!.closeCode;
372+
var reason = socketChannel!.closeReason;
373+
337374
await _closeSocketChannel();
338375
if (e != null) {
339376
print('There was an error causing connection lost: $e');
@@ -344,27 +381,32 @@ class SocketClient {
344381
_keepAliveSubscription?.cancel();
345382
_messageSubscription?.cancel();
346383

384+
//TODO: do we really need this check here because few lines bellow there is another check
347385
if (_connectionStateController.isClosed || _wasDisposed) {
348386
return;
349387
}
350388

351389
_connectionWasLost = true;
352390
_subscriptionInitializers.values.forEach((s) => s.hasBeenTriggered = false);
353391

354-
if (config.autoReconnect &&
355-
!_connectionStateController.isClosed &&
356-
!_wasDisposed) {
357-
if (config.delayBetweenReconnectionAttempts != null) {
358-
_reconnectTimer = Timer(
359-
config.delayBetweenReconnectionAttempts!,
360-
() {
361-
_connect();
362-
},
363-
);
364-
} else {
365-
Timer.run(() => _connect());
366-
}
392+
if (_isReconnectionPaused ||
393+
!config.autoReconnect ||
394+
_connectionStateController.isClosed ||
395+
_wasDisposed) {
396+
return;
367397
}
398+
399+
var duration = config.delayBetweenReconnectionAttempts ?? Duration.zero;
400+
if (config.onConnectionLost != null) {
401+
duration = (await config.onConnectionLost!(code, reason)) ?? duration;
402+
}
403+
404+
_reconnectTimer = Timer(
405+
duration,
406+
() async {
407+
_connect();
408+
},
409+
);
368410
}
369411

370412
void _enqueuePing() {
@@ -389,6 +431,7 @@ class SocketClient {
389431
_reconnectTimer?.cancel();
390432
_pingTimer?.cancel();
391433
_keepAliveSubscription?.cancel();
434+
_unsubscriber.close();
392435

393436
await Future.wait([
394437
_closeSocketChannel(),

packages/graphql/test/mock_server/ws_echo_server.dart

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'dart:convert';
66
import 'dart:io';
77

88
const String forceDisconnectCommand = '___force_disconnect___';
9+
const String forceAuthDisconnectCommand = '___force_auth_disconnect___';
910

1011
/// Main function to create and run the echo server over the web socket.
1112
Future<String> runWebSocketServer(
@@ -20,6 +21,8 @@ void onWebSocketData(WebSocket client) {
2021
client.listen((data) async {
2122
if (data == forceDisconnectCommand) {
2223
client.close(WebSocketStatus.normalClosure, 'shutting down');
24+
} else if (data == forceAuthDisconnectCommand) {
25+
client.close(4001, 'Unauthorized');
2326
} else {
2427
final message = json.decode(data.toString());
2528
if (message['type'] == 'connection_init' &&

0 commit comments

Comments
 (0)