From b5828529a03d97220e22cead44f44c06aa017c46 Mon Sep 17 00:00:00 2001 From: Jumper Chen Date: Wed, 28 Apr 2021 11:45:00 +0800 Subject: [PATCH] Fix #172 socket id's not synced --- CHANGELOG.md | 10 ++++ README.md | 1 + lib/src/manager.dart | 124 ++++++++++--------------------------------- lib/src/socket.dart | 70 ++++++++++++++++++------ pubspec.yaml | 2 +- 5 files changed, 94 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 485d632..5329b73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.0.0-beta.4-nullsafety.0 + +**New Feature:** + +* [#177](https://github.com/rikulo/socket.io-client-dart/pull/177) Send credentials with the auth option + +**Bug fix:** + +* [#172] (https://github.com/rikulo/socket.io-client-dart/issues/172) socket id's not synced + ## 2.0.0-beta.3-nullsafety.0 **New Feature:** diff --git a/README.md b/README.md index efb5576..aa3a963 100644 --- a/README.md +++ b/README.md @@ -275,3 +275,4 @@ If you are new to Git or GitHub, please read [this guide](https://help.github.co - Thanks [@bruce3x](https://github.com/bruce3x) for https://github.com/rikulo/socket.io-client-dart/issues/25 - Thanks [@Kavantix](https://github.com/Kavantix) for https://github.com/rikulo/socket.io-client-dart/issues/26 - Thanks [@luandnguyen](https://github.com/luandnguyen) for https://github.com/rikulo/socket.io-client-dart/issues/59 +- Thanks [@jorgefspereira](https://github.com/jorgefspereira) for https://github.com/rikulo/socket.io-client-dart/pull/177 diff --git a/lib/src/manager.dart b/lib/src/manager.dart index 6bf7588..6de51b2 100644 --- a/lib/src/manager.dart +++ b/lib/src/manager.dart @@ -66,13 +66,9 @@ class Manager extends EventEmitter { _Backoff? backoff; String readyState = 'closed'; late String uri; - List connecting = []; - num? lastPing; - bool encoding = false; - List packetBuffer = []; bool reconnecting = false; - late engine_socket.Socket engine; + engine_socket.Socket? engine; Encoder encoder = Encoder(); Decoder decoder = Decoder(); late bool autoConnect; @@ -98,41 +94,6 @@ class Manager extends EventEmitter { if (autoConnect) open(); } - /// - /// Propagate given event to sockets and emit on `this` - /// - /// @api private - /// - void emitAll(String event, [data]) { - emit(event, data); - for (var nsp in nsps.keys) { - nsps[nsp]!.emit(event, data); - } - } - - /// - /// Update `socket.id` of all sockets - /// - /// @api private - /// - void updateSocketIds() { - for (var nsp in nsps.keys) { - nsps[nsp]!.id = generateId(nsp); - } - } - - /// - /// generate `socket.id` for the given `nsp` - /// - /// @param {String} nsp - /// @return {String} - /// @api private - /// - String generateId(String nsp) { - if (nsp.startsWith('/')) nsp = nsp.substring(1); - return (nsp.isEmpty ? '' : (nsp + '#')) + (engine.id ?? ''); - } - num? get randomizationFactor => _randomizationFactor; set randomizationFactor(num? v) { _randomizationFactor = v; @@ -182,12 +143,12 @@ class Manager extends EventEmitter { _logger.fine('opening $uri'); engine = engine_socket.Socket(uri, options); - var socket = engine; + var socket = engine!; readyState = 'opening'; skipReconnect = false; // emit `open` - var openSub = util.on(socket, 'open', (_) { + var openSubDestroy = util.on(socket, 'open', (_) { onopen(); if (callback != null) callback(); }); @@ -197,7 +158,7 @@ class Manager extends EventEmitter { _logger.fine('connect_error'); cleanup(); readyState = 'closed'; - emitAll('connect_error', data); + super.emit('error', data); if (callback != null) { callback({'error': 'Connection error', 'data': data}); } else { @@ -210,19 +171,22 @@ class Manager extends EventEmitter { if (timeout != null) { _logger.fine('connect attempt will timeout after $timeout'); + if (timeout == 0) { + openSubDestroy + .destroy(); // prevents a race condition with the 'open' event + } // set timer var timer = Timer(Duration(milliseconds: timeout!.toInt()), () { _logger.fine('connect attempt timed out after $timeout'); - openSub.destroy(); + openSubDestroy.destroy(); socket.close(); socket.emit('error', 'timeout'); - emitAll('connect_timeout', timeout); }); subs.add(Destroyable(() => timer.cancel())); } - subs.add(openSub); + subs.add(openSubDestroy); subs.add(errorSub); return this; @@ -244,7 +208,7 @@ class Manager extends EventEmitter { emit('open'); // add subs - var socket = engine; + var socket = engine!; subs.add(util.on(socket, 'data', ondata)); subs.add(util.on(socket, 'ping', onping)); // subs.add(util.on(socket, 'pong', onpong)); @@ -259,8 +223,7 @@ class Manager extends EventEmitter { /// @api private /// void onping([_]) { - lastPing = DateTime.now().millisecondsSinceEpoch; - emitAll('ping'); + emit('ping'); } /// @@ -297,7 +260,7 @@ class Manager extends EventEmitter { /// void onerror(err) { _logger.fine('error $err'); - emitAll('error', err); + emit('error', err); } /// @@ -309,24 +272,9 @@ class Manager extends EventEmitter { Socket socket(String nsp, Map opts) { var socket = nsps[nsp]; - var onConnecting = ([_]) { - if (!connecting.contains(socket)) { - connecting.add(socket); - } - }; - if (socket == null) { socket = Socket(this, nsp, opts); nsps[nsp] = socket; - socket.on('connecting', onConnecting); - socket.on('connect', (_) { - socket!.id = generateId(nsp); - }); - - if (autoConnect) { - // manually call here since connecting event is fired before listening - onConnecting(); - } } return socket; @@ -338,8 +286,16 @@ class Manager extends EventEmitter { /// @param {Socket} socket /// void destroy(socket) { - connecting.remove(socket); - if (connecting.isNotEmpty) return; + final nsps = this.nsps.keys; + + for (var nsp in nsps) { + final socket = this.nsps[nsp]; + + if (socket!.active) { + _logger.fine('socket $nsp is still active, skipping close'); + return; + } + } close(); } @@ -352,9 +308,6 @@ class Manager extends EventEmitter { /// void packet(Map packet) { _logger.fine('writing packet $packet'); - if (packet.containsKey('query') && packet['type'] == 0) { - packet['nsp'] += '''?${packet['query']}'''; - } // if (encoding != true) { // encode, then write to engine with result @@ -362,7 +315,7 @@ class Manager extends EventEmitter { var encodedPackets = encoder.encode(packet); for (var i = 0; i < encodedPackets.length; i++) { - engine.write(encodedPackets[i], packet['options']); + engine!.write(encodedPackets[i], packet['options']); } // } else { // add packet to the queue @@ -370,19 +323,6 @@ class Manager extends EventEmitter { // } } - /// - /// If packet buffer is non-empty, begins encoding the - /// next packet in line. - /// - /// @api private - /// - void processPacketQueue() { - if (packetBuffer.isNotEmpty && encoding != true) { - var pack = packetBuffer.removeAt(0); - packet(pack); - } - } - /// /// Clean up transport subscriptions and packet buffer. /// @@ -397,10 +337,6 @@ class Manager extends EventEmitter { sub.destroy(); } - packetBuffer = []; - encoding = false; - lastPing = null; - decoder.destroy(); } @@ -422,7 +358,7 @@ class Manager extends EventEmitter { } backoff!.reset(); readyState = 'closed'; - engine.close(); + engine?.close(); } /// @@ -454,7 +390,7 @@ class Manager extends EventEmitter { if (backoff!.attempts >= reconnectionAttempts!) { _logger.fine('reconnect failed'); backoff!.reset(); - emitAll('reconnect_failed'); + emit('reconnect_failed'); reconnecting = false; } else { var delay = backoff!.duration; @@ -465,8 +401,7 @@ class Manager extends EventEmitter { if (skipReconnect!) return; _logger.fine('attempting reconnect'); - emitAll('reconnect_attempt', backoff!.attempts); - emitAll('reconnecting', backoff!.attempts); + emit('reconnect_attempt', backoff!.attempts); // check again for the case socket closed in above events if (skipReconnect!) return; @@ -476,7 +411,7 @@ class Manager extends EventEmitter { _logger.fine('reconnect attempt error'); reconnecting = false; reconnect(); - emitAll('reconnect_error', err['data']); + emit('reconnect_error', err['data']); } else { _logger.fine('reconnect success'); onreconnect(); @@ -498,8 +433,7 @@ class Manager extends EventEmitter { var attempt = backoff!.attempts; reconnecting = false; backoff!.reset(); - updateSocketIds(); - emitAll('reconnect', attempt); + emit('reconnect', attempt); } } diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 80001b3..56c2d21 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -59,9 +59,9 @@ class Socket extends EventEmitter { List sendBuffer = []; List receiveBuffer = []; String? query; - Map? auth; + dynamic? auth; List? subs; - Map? flags; + Map flags = {}; String? id; Socket(this.io, this.nsp, this.opts) { @@ -84,10 +84,16 @@ class Socket extends EventEmitter { subs = [ util.on(io, 'open', onopen), util.on(io, 'packet', onpacket), + util.on(io, 'error', onerror), util.on(io, 'close', onclose) ]; } + /// Whether the Socket will try to reconnect when its Manager connects or reconnects + bool get active { + return subs != null; + } + /// /// "Opens" the socket. /// @@ -95,11 +101,12 @@ class Socket extends EventEmitter { Socket open() => connect(); Socket connect() { - if (connected == true) return this; + if (connected) return this; subEvents(); - io.open(); // ensure open + if (!io.reconnecting) { + io.open(); // ensure open + } if ('open' == io.readyState) onopen(); - emit('connecting'); return this; } @@ -147,7 +154,7 @@ class Socket extends EventEmitter { var packet = { 'type': EVENT, 'data': sendData, - 'options': {'compress': flags?.isNotEmpty == true && flags!['compress']} + 'options': {'compress': flags.isNotEmpty == true && flags['compress']} }; // event ack callback @@ -156,13 +163,21 @@ class Socket extends EventEmitter { acks['$ids'] = ack; packet['id'] = '${ids++}'; } - - if (connected == true) { + final isTransportWritable = io.engine != null && + io.engine!.transport != null && + io.engine!.transport!.writable == true; + + final discardPacket = + flags['volatile'] != null && (!isTransportWritable || !connected); + if (discardPacket) { + _logger + .fine('discard packet as the transport is not currently writable'); + } else if (connected) { this.packet(packet); } else { sendBuffer.add(packet); } - flags = null; + flags = {}; } } @@ -193,12 +208,25 @@ class Socket extends EventEmitter { // } if (auth != null) { - packet({'type': CONNECT, 'data': auth}); + if (auth is Function) { + auth((data) { + packet({'type': CONNECT, 'data': data}); + }); + } else { + packet({'type': CONNECT, 'data': auth}); + } } else { packet({'type': CONNECT}); } } + /// Called upon engine or manager `error` + void onerror(err) { + if (!connected) { + emit('connect_error', err); + } + } + /// /// Called upon engine `close`. /// @@ -223,7 +251,13 @@ class Socket extends EventEmitter { switch (packet['type']) { case CONNECT: - onconnect(); + if (packet['data'] != null && packet['data']['sid'] != null) { + final id = packet['data']['sid']; + onconnect(id); + } else { + emit('connect_error', + 'It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, but they are not compatible (more information here: https://socket.io/docs/v3/migrating-from-2-x-to-3-0/)'); + } break; case EVENT: @@ -324,7 +358,8 @@ class Socket extends EventEmitter { /// Called upon server connect. /// /// @api private - void onconnect() { + void onconnect(id) { + this.id = id; connected = true; disconnected = false; emit('connect'); @@ -371,10 +406,12 @@ class Socket extends EventEmitter { /// @api private. void destroy() { - if (subs?.isNotEmpty == true) { + final _subs = subs; + if (_subs != null && _subs.isNotEmpty) { // clean subscriptions to avoid reconnections - for (var i = 0; i < subs!.length; i++) { - subs![i].destroy(); + + for (var i = 0; i < _subs.length; i++) { + _subs[i].destroy(); } subs = null; } @@ -422,8 +459,7 @@ class Socket extends EventEmitter { /// @return {Socket} self /// @api public Socket compress(compress) { - flags = flags ??= {}; - flags!['compress'] = compress; + flags['compress'] = compress; return this; } } diff --git a/pubspec.yaml b/pubspec.yaml index 1754a89..c83f09a 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: socket_io_client description: Dartlang port of socket.io-client for web, flutter, dartvm to use -version: 2.0.0-beta.3-nullsafety.0 +version: 2.0.0-beta.4-nullsafety.0 homepage: https://www.zkoss.org repository: https://github.com/rikulo/socket.io-client-dart issue_tracker: https://github.com/rikulo/socket.io-client-dart/issues