Skip to content

Commit d2b8889

Browse files
authored
Feature/remove singletons (#7)
* Update logger * Remove singletons
1 parent 44d1f0a commit d2b8889

File tree

8 files changed

+104
-88
lines changed

8 files changed

+104
-88
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
## 1.0.0-pre.5
1+
## 1.0.0-pre.6
22

33
- **BREAKING CHANGE**: Change options to separate, platform-specific object
44
- You can now pass headers and other options to the IO web socket client

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ server-up:
2222
server-down:
2323
@docker compose -f server/docker-compose.yml down --remove-orphans
2424

25+
# dart run coverage:test_with_coverage -fb -o coverage -- --concurrency=6 --platform chrome,vm --coverage=./coverage --reporter=expanded test/ws_test.dart
2526
coverage: get
26-
@dart test --concurrency=6 --platform vm --coverage=coverage test/
27+
@dart test --concurrency=6 --platform chrome,vm --coverage=coverage test/
2728
@dart run coverage:format_coverage --lcov --in=coverage --out=coverage/lcov.info --packages=.packages --report-on=lib
2829
# @mv coverage/lcov.info coverage/lcov.base.info
2930
# @lcov -r coverage/lcov.base.info -o coverage/lcov.base.info "lib/**.freezed.dart" "lib/**.g.dart"

lib/src/client/ws_client.dart

+27-10
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ final class WebSocketClient implements IWebSocketClient {
2727
WebSocketClient([WebSocketOptions? options])
2828
: _client = $platformWebSocketClient(options),
2929
_options = options ?? WebSocketOptions.common() {
30-
WebSocketMetricsManager.instance.startObserving(this);
30+
_init();
3131
}
3232

3333
/// Creates a [WebSocketClient] from an existing [IWebSocketClient].
@@ -38,7 +38,7 @@ final class WebSocketClient implements IWebSocketClient {
3838
[WebSocketOptions? options])
3939
: _client = client,
4040
_options = options ?? WebSocketOptions.common() {
41-
WebSocketMetricsManager.instance.startObserving(this);
41+
_init();
4242
}
4343

4444
/// {@macro ws_client}
@@ -47,6 +47,10 @@ final class WebSocketClient implements IWebSocketClient {
4747

4848
final IWebSocketClient _client;
4949
final WebSocketEventQueue _eventQueue = WebSocketEventQueue();
50+
late final WebSocketMetricsManager _metricsManager =
51+
WebSocketMetricsManager(this);
52+
late final WebSocketConnectionManager _connectionManager =
53+
WebSocketConnectionManager(this);
5054

5155
/// Current options.
5256
/// {@nodoc}
@@ -57,8 +61,18 @@ final class WebSocketClient implements IWebSocketClient {
5761
bool _isClosed = false;
5862

5963
/// Get the metrics for this client.
60-
WebSocketMetrics get metrics =>
61-
WebSocketMetricsManager.instance.buildMetricFor(this);
64+
WebSocketMetrics get metrics {
65+
final (
66+
:bool active,
67+
:int attempt,
68+
:DateTime? nextReconnectionAttempt,
69+
) = _connectionManager.status;
70+
return _metricsManager.buildMetric(
71+
active: active,
72+
attempt: attempt,
73+
nextReconnectionAttempt: nextReconnectionAttempt,
74+
);
75+
}
6276

6377
@override
6478
WebSocketMessagesStream get stream => _client.stream;
@@ -69,6 +83,10 @@ final class WebSocketClient implements IWebSocketClient {
6983
@override
7084
WebSocketClientState get state => _client.state;
7185

86+
void _init() {
87+
_metricsManager.startObserving();
88+
}
89+
7290
@override
7391
Future<void> add(Object data) async {
7492
if (_isClosed) return Future<void>.error(const WSClientClosedException());
@@ -84,15 +102,14 @@ final class WebSocketClient implements IWebSocketClient {
84102
);
85103
}
86104
});
87-
WebSocketMetricsManager.instance.sent(this, data);
105+
_metricsManager.sent(this, data);
88106
}
89107

90108
@override
91109
Future<void> connect(String url) {
92110
if (_isClosed) return Future<void>.error(const WSClientClosedException());
93111
return _eventQueue.push('connect', () async {
94-
WebSocketConnectionManager.instance.startMonitoringConnection(
95-
this,
112+
_connectionManager.startMonitoringConnection(
96113
url,
97114
_options.connectionRetryInterval,
98115
);
@@ -115,7 +132,7 @@ final class WebSocketClient implements IWebSocketClient {
115132
[int? code = 1000, String? reason = 'NORMAL_CLOSURE']) {
116133
if (_isClosed) return Future<void>.error(const WSClientClosedException());
117134
return _eventQueue.push('disconnect', () async {
118-
WebSocketConnectionManager.instance.stopMonitoringConnection(this);
135+
_connectionManager.stopMonitoringConnection();
119136
try {
120137
await Future<void>.sync(() => _client.disconnect(code, reason))
121138
.timeout(_options.timeout);
@@ -136,7 +153,7 @@ final class WebSocketClient implements IWebSocketClient {
136153
try {
137154
_isClosed = true;
138155
// Stop monitoring the connection.
139-
WebSocketConnectionManager.instance.stopMonitoringConnection(this);
156+
_connectionManager.stopMonitoringConnection();
140157
// Clear the event queue and prevent new events from being processed.
141158
// Returns when the queue is empty and no new events are being processed.
142159
Future<void>.sync(_eventQueue.close).ignore();
@@ -154,7 +171,7 @@ final class WebSocketClient implements IWebSocketClient {
154171
// Wait for the next microtask to ensure that the metrics are updated
155172
// from state stream, before stopping observing.
156173
scheduleMicrotask(() {
157-
WebSocketMetricsManager.instance.stopObserving(this);
174+
_metricsManager.stopObserving();
158175
});
159176
}
160177
}

lib/src/manager/connection_manager.dart

+38-37
Original file line numberDiff line numberDiff line change
@@ -10,59 +10,60 @@ import 'package:ws/src/util/logger.dart';
1010
@internal
1111
final class WebSocketConnectionManager {
1212
/// {@nodoc}
13-
static final WebSocketConnectionManager instance =
14-
WebSocketConnectionManager._internal();
13+
WebSocketConnectionManager(IWebSocketClient client)
14+
: _client = WeakReference<IWebSocketClient>(client);
1515

1616
/// {@nodoc}
17-
WebSocketConnectionManager._internal();
17+
final WeakReference<IWebSocketClient> _client;
1818

1919
/// {@nodoc}
20-
final Expando<StreamSubscription<void>> _watchers =
21-
Expando<StreamSubscription<void>>();
20+
StreamSubscription<void>? _watcher;
2221

2322
/// {@nodoc}
24-
final Expando<Timer> _timers = Expando<Timer>();
23+
Timer? _timer;
2524

2625
/// {@nodoc}
27-
final Expando<int> _attempts = Expando<int>();
26+
int? _attempt;
2827

2928
/// {@nodoc}
30-
final Expando<DateTime> _nextReconnectionAttempts = Expando<DateTime>();
29+
DateTime? _nextReconnectionAttempt;
3130

3231
/// Recive the current status of reconnection for the client.
3332
/// {@nodoc}
3433
({
3534
int attempt,
3635
bool active,
3736
DateTime? nextReconnectionAttempt,
38-
}) getStatusFor(IWebSocketClient client) => (
39-
attempt: _attempts[client] ?? 0,
40-
active: _timers[client]?.isActive == true,
41-
nextReconnectionAttempt: _nextReconnectionAttempts[client],
37+
}) get status => (
38+
attempt: _attempt ?? 0,
39+
active: _timer?.isActive == true,
40+
nextReconnectionAttempt: _nextReconnectionAttempt,
4241
);
4342

4443
/// {@nodoc}
4544
void startMonitoringConnection(
46-
IWebSocketClient client,
4745
String url,
4846
({Duration max, Duration min})? connectionRetryInterval,
4947
) {
50-
stopMonitoringConnection(client);
51-
if (client.isClosed || connectionRetryInterval == null) return;
48+
stopMonitoringConnection();
49+
final client = _client.target;
50+
if (client == null || client.isClosed || connectionRetryInterval == null) {
51+
return;
52+
}
5253
final stateChangesHandler = _handleStateChange(
5354
client,
5455
url,
5556
connectionRetryInterval.min.inMilliseconds,
5657
connectionRetryInterval.max.inMilliseconds,
5758
);
58-
_watchers[client] =
59+
_watcher =
5960
client.stateChanges.listen(stateChangesHandler, cancelOnError: false);
6061
}
6162

6263
/// {@nodoc}
63-
void stopMonitoringConnection(IWebSocketClient client) {
64-
_stopSubscription(client);
65-
_stopTimer(client);
64+
void stopMonitoringConnection() {
65+
_stopSubscription();
66+
_stopTimer();
6667
}
6768

6869
/// {@nodoc}
@@ -75,51 +76,51 @@ final class WebSocketConnectionManager {
7576
(state) {
7677
switch (state) {
7778
case WebSocketClientState$Open _:
78-
_stopTimer(client);
79-
_attempts[client] = null; // reset attempt
80-
_nextReconnectionAttempts[client] = null; // reset expected time
79+
_stopTimer();
80+
_attempt = null; // reset attempt
81+
_nextReconnectionAttempt = null; // reset expected time
8182
case WebSocketClientState$Closed _:
82-
_stopTimer(client);
83+
_stopTimer();
8384
if (client.isClosed) return;
84-
final attempt = _attempts[client] ?? 0;
85+
final attempt = _attempt ?? 0;
8586
final delay = backoffDelay(attempt, minMs, maxMs);
8687
if (delay <= Duration.zero) {
8788
config('Reconnecting to $lastUrl immediately.');
8889
Future<void>.sync(() => client.connect(lastUrl)).ignore();
89-
_attempts[client] = attempt + 1;
90+
_attempt = attempt + 1;
9091
return;
9192
}
9293
config('Reconnecting to $lastUrl '
9394
'after ${delay.inMilliseconds} ms.');
94-
_nextReconnectionAttempts[client] = DateTime.now().add(delay);
95-
_timers[client] = Timer(
95+
_nextReconnectionAttempt = DateTime.now().add(delay);
96+
_timer = Timer(
9697
delay,
9798
() {
98-
_nextReconnectionAttempts[client] = null;
99+
_nextReconnectionAttempt = null;
99100
if (client.isClosed) {
100-
_stopTimer(client);
101+
_stopTimer();
101102
} else if (client.state.readyState.isClosed) {
102103
config('Auto reconnecting to $lastUrl '
103104
'after ${delay.inMilliseconds} ms.');
104105
Future<void>.sync(() => client.connect(lastUrl)).ignore();
105106
}
106107
},
107108
);
108-
_attempts[client] = attempt + 1;
109+
_attempt = attempt + 1;
109110
case WebSocketClientState$Connecting _:
110111
case WebSocketClientState$Disconnecting _:
111112
}
112113
};
113114

114-
void _stopSubscription(IWebSocketClient client) {
115-
_watchers[client]?.cancel().ignore();
116-
_watchers[client] = null;
115+
void _stopSubscription() {
116+
_watcher?.cancel().ignore();
117+
_watcher = null;
117118
}
118119

119-
void _stopTimer(IWebSocketClient client) {
120-
_nextReconnectionAttempts[client] = null;
121-
_timers[client]?.cancel();
122-
_timers[client] = null;
120+
void _stopTimer() {
121+
_nextReconnectionAttempt = null;
122+
_timer?.cancel();
123+
_timer = null;
123124
}
124125

125126
/// Full jitter technique.

lib/src/manager/metrics_manager.dart

+28-33
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,46 @@ import 'package:ws/src/client/metrics.dart';
55
import 'package:ws/src/client/state.dart';
66
import 'package:ws/src/client/web_socket_ready_state.dart';
77
import 'package:ws/src/client/ws_client_interface.dart';
8-
import 'package:ws/src/manager/connection_manager.dart';
98

109
/// {@nodoc}
1110
@internal
1211
final class WebSocketMetricsManager {
1312
/// {@nodoc}
14-
static final WebSocketMetricsManager instance =
15-
WebSocketMetricsManager._internal();
13+
WebSocketMetricsManager(IWebSocketClient client)
14+
: _client = WeakReference<IWebSocketClient>(client);
1615

1716
/// {@nodoc}
18-
WebSocketMetricsManager._internal();
17+
final WeakReference<IWebSocketClient> _client;
1918

2019
/// {@nodoc}
21-
final Expando<StreamSubscription<Object>> _receiveObservers =
22-
Expando<StreamSubscription<Object>>();
20+
StreamSubscription<Object>? _receiveObserver;
2321

2422
/// {@nodoc}
25-
final Expando<StreamSubscription<WebSocketClientState>> _stateObservers =
26-
Expando<StreamSubscription<WebSocketClientState>>();
23+
StreamSubscription<WebSocketClientState>? _stateObserver;
2724

2825
/// {@nodoc}
29-
final Expando<$WebSocketMetrics> _metrics = Expando<$WebSocketMetrics>();
26+
final $WebSocketMetrics _metrics = $WebSocketMetrics();
3027

3128
/// {@nodoc}
32-
void startObserving(IWebSocketClient client) {
33-
stopObserving(client);
34-
final metrics = _getMetrics(client);
35-
_receiveObservers[client] = client.stream.listen(
29+
void startObserving() {
30+
stopObserving();
31+
final metrics = _metrics;
32+
_receiveObserver = _client.target?.stream.listen(
3633
(data) => _onDataReceived(metrics, data),
3734
cancelOnError: false,
3835
);
39-
_stateObservers[client] = client.stateChanges.listen(
36+
_stateObserver = _client.target?.stateChanges.listen(
4037
(state) => _onStateChanged(metrics, state),
4138
cancelOnError: false,
4239
);
4340
}
4441

4542
/// {@nodoc}
46-
void stopObserving(IWebSocketClient client) {
47-
_receiveObservers[client]?.cancel().ignore();
48-
_stateObservers[client]?.cancel().ignore();
49-
_receiveObservers[client] = null;
50-
_stateObservers[client] = null;
43+
void stopObserving() {
44+
_receiveObserver?.cancel().ignore();
45+
_stateObserver?.cancel().ignore();
46+
_receiveObserver = null;
47+
_stateObserver = null;
5148
}
5249

5350
void _onDataReceived($WebSocketMetrics metrics, Object data) {
@@ -91,13 +88,9 @@ final class WebSocketMetricsManager {
9188
}
9289
}
9390

94-
/// {@nodoc}
95-
$WebSocketMetrics _getMetrics(IWebSocketClient client) =>
96-
_metrics[client] ??= $WebSocketMetrics();
97-
9891
/// {@nodoc}
9992
@internal
100-
void sent(IWebSocketClient client, Object data) => _getMetrics(client)
93+
void sent(IWebSocketClient client, Object data) => _metrics
10194
..transferredCount += BigInt.one
10295
..transferredSize += switch (data) {
10396
String text => BigInt.from(text.length),
@@ -107,12 +100,15 @@ final class WebSocketMetricsManager {
107100

108101
/// {@nodoc}
109102
@internal
110-
WebSocketMetrics buildMetricFor(IWebSocketClient client) {
111-
final metrics = _getMetrics(client);
112-
final readyState = client.state.readyState;
103+
WebSocketMetrics buildMetric({
104+
required bool active,
105+
required int attempt,
106+
required DateTime? nextReconnectionAttempt,
107+
}) {
108+
final metrics = _metrics;
109+
final readyState =
110+
_client.target?.state.readyState ?? WebSocketReadyState.closed;
113111
final lastDisconnectTime = metrics.lastDisconnectTime;
114-
final reconnectionStatus =
115-
WebSocketConnectionManager.instance.getStatusFor(client);
116112
return WebSocketMetrics(
117113
timestamp: DateTime.now(),
118114
readyState: readyState,
@@ -126,14 +122,13 @@ final class WebSocketMetricsManager {
126122
lastDisconnectTime: lastDisconnectTime,
127123
lastDisconnect: metrics.lastDisconnect,
128124
lastUrl: metrics.lastUrl,
129-
isReconnectionActive: reconnectionStatus.active,
130-
currentReconnectAttempts: reconnectionStatus.attempt,
125+
isReconnectionActive: active,
126+
currentReconnectAttempts: attempt,
131127
nextReconnectionAttempt: switch (readyState) {
132128
WebSocketReadyState.open => null,
133129
WebSocketReadyState.connecting => DateTime.now(),
134130
WebSocketReadyState.disconnecting => null,
135-
WebSocketReadyState.closed =>
136-
reconnectionStatus.nextReconnectionAttempt,
131+
WebSocketReadyState.closed => nextReconnectionAttempt,
137132
},
138133
);
139134
}

0 commit comments

Comments
 (0)