Skip to content

Commit

Permalink
Fix #172 socket id's not synced
Browse files Browse the repository at this point in the history
  • Loading branch information
jumperchen committed Apr 28, 2021
1 parent ad5b665 commit b582852
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 113 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 2.0.0-beta.4-nullsafety.0

**New Feature:**

* [#177](https://github.com/rikulo/socket.io-client-dart/pull/177) Send credentials with the auth option

**Bug fix:**

* [#172] (https://github.com/rikulo/socket.io-client-dart/issues/172) socket id's not synced

## 2.0.0-beta.3-nullsafety.0

**New Feature:**
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,4 @@ If you are new to Git or GitHub, please read [this guide](https://help.github.co
- Thanks [@bruce3x](https://github.com/bruce3x) for https://github.com/rikulo/socket.io-client-dart/issues/25
- Thanks [@Kavantix](https://github.com/Kavantix) for https://github.com/rikulo/socket.io-client-dart/issues/26
- Thanks [@luandnguyen](https://github.com/luandnguyen) for https://github.com/rikulo/socket.io-client-dart/issues/59
- Thanks [@jorgefspereira](https://github.com/jorgefspereira) for https://github.com/rikulo/socket.io-client-dart/pull/177
124 changes: 29 additions & 95 deletions lib/src/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,9 @@ class Manager extends EventEmitter {
_Backoff? backoff;
String readyState = 'closed';
late String uri;
List connecting = [];
num? lastPing;
bool encoding = false;
List packetBuffer = [];
bool reconnecting = false;

late engine_socket.Socket engine;
engine_socket.Socket? engine;
Encoder encoder = Encoder();
Decoder decoder = Decoder();
late bool autoConnect;
Expand All @@ -98,41 +94,6 @@ class Manager extends EventEmitter {
if (autoConnect) open();
}

///
/// Propagate given event to sockets and emit on `this`
///
/// @api private
///
void emitAll(String event, [data]) {
emit(event, data);
for (var nsp in nsps.keys) {
nsps[nsp]!.emit(event, data);
}
}

///
/// Update `socket.id` of all sockets
///
/// @api private
///
void updateSocketIds() {
for (var nsp in nsps.keys) {
nsps[nsp]!.id = generateId(nsp);
}
}

///
/// generate `socket.id` for the given `nsp`
///
/// @param {String} nsp
/// @return {String}
/// @api private
///
String generateId(String nsp) {
if (nsp.startsWith('/')) nsp = nsp.substring(1);
return (nsp.isEmpty ? '' : (nsp + '#')) + (engine.id ?? '');
}

num? get randomizationFactor => _randomizationFactor;
set randomizationFactor(num? v) {
_randomizationFactor = v;
Expand Down Expand Up @@ -182,12 +143,12 @@ class Manager extends EventEmitter {

_logger.fine('opening $uri');
engine = engine_socket.Socket(uri, options);
var socket = engine;
var socket = engine!;
readyState = 'opening';
skipReconnect = false;

// emit `open`
var openSub = util.on(socket, 'open', (_) {
var openSubDestroy = util.on(socket, 'open', (_) {
onopen();
if (callback != null) callback();
});
Expand All @@ -197,7 +158,7 @@ class Manager extends EventEmitter {
_logger.fine('connect_error');
cleanup();
readyState = 'closed';
emitAll('connect_error', data);
super.emit('error', data);
if (callback != null) {
callback({'error': 'Connection error', 'data': data});
} else {
Expand All @@ -210,19 +171,22 @@ class Manager extends EventEmitter {
if (timeout != null) {
_logger.fine('connect attempt will timeout after $timeout');

if (timeout == 0) {
openSubDestroy
.destroy(); // prevents a race condition with the 'open' event
}
// set timer
var timer = Timer(Duration(milliseconds: timeout!.toInt()), () {
_logger.fine('connect attempt timed out after $timeout');
openSub.destroy();
openSubDestroy.destroy();
socket.close();
socket.emit('error', 'timeout');
emitAll('connect_timeout', timeout);
});

subs.add(Destroyable(() => timer.cancel()));
}

subs.add(openSub);
subs.add(openSubDestroy);
subs.add(errorSub);

return this;
Expand All @@ -244,7 +208,7 @@ class Manager extends EventEmitter {
emit('open');

// add subs
var socket = engine;
var socket = engine!;
subs.add(util.on(socket, 'data', ondata));
subs.add(util.on(socket, 'ping', onping));
// subs.add(util.on(socket, 'pong', onpong));
Expand All @@ -259,8 +223,7 @@ class Manager extends EventEmitter {
/// @api private
///
void onping([_]) {
lastPing = DateTime.now().millisecondsSinceEpoch;
emitAll('ping');
emit('ping');
}

///
Expand Down Expand Up @@ -297,7 +260,7 @@ class Manager extends EventEmitter {
///
void onerror(err) {
_logger.fine('error $err');
emitAll('error', err);
emit('error', err);
}

///
Expand All @@ -309,24 +272,9 @@ class Manager extends EventEmitter {
Socket socket(String nsp, Map opts) {
var socket = nsps[nsp];

var onConnecting = ([_]) {
if (!connecting.contains(socket)) {
connecting.add(socket);
}
};

if (socket == null) {
socket = Socket(this, nsp, opts);
nsps[nsp] = socket;
socket.on('connecting', onConnecting);
socket.on('connect', (_) {
socket!.id = generateId(nsp);
});

if (autoConnect) {
// manually call here since connecting event is fired before listening
onConnecting();
}
}

return socket;
Expand All @@ -338,8 +286,16 @@ class Manager extends EventEmitter {
/// @param {Socket} socket
///
void destroy(socket) {
connecting.remove(socket);
if (connecting.isNotEmpty) return;
final nsps = this.nsps.keys;

for (var nsp in nsps) {
final socket = this.nsps[nsp];

if (socket!.active) {
_logger.fine('socket $nsp is still active, skipping close');
return;
}
}

close();
}
Expand All @@ -352,37 +308,21 @@ class Manager extends EventEmitter {
///
void packet(Map packet) {
_logger.fine('writing packet $packet');
if (packet.containsKey('query') && packet['type'] == 0) {
packet['nsp'] += '''?${packet['query']}''';
}

// if (encoding != true) {
// encode, then write to engine with result
// encoding = true;
var encodedPackets = encoder.encode(packet);

for (var i = 0; i < encodedPackets.length; i++) {
engine.write(encodedPackets[i], packet['options']);
engine!.write(encodedPackets[i], packet['options']);
}
// } else {
// add packet to the queue
// packetBuffer.add(packet);
// }
}

///
/// If packet buffer is non-empty, begins encoding the
/// next packet in line.
///
/// @api private
///
void processPacketQueue() {
if (packetBuffer.isNotEmpty && encoding != true) {
var pack = packetBuffer.removeAt(0);
packet(pack);
}
}

///
/// Clean up transport subscriptions and packet buffer.
///
Expand All @@ -397,10 +337,6 @@ class Manager extends EventEmitter {
sub.destroy();
}

packetBuffer = [];
encoding = false;
lastPing = null;

decoder.destroy();
}

Expand All @@ -422,7 +358,7 @@ class Manager extends EventEmitter {
}
backoff!.reset();
readyState = 'closed';
engine.close();
engine?.close();
}

///
Expand Down Expand Up @@ -454,7 +390,7 @@ class Manager extends EventEmitter {
if (backoff!.attempts >= reconnectionAttempts!) {
_logger.fine('reconnect failed');
backoff!.reset();
emitAll('reconnect_failed');
emit('reconnect_failed');
reconnecting = false;
} else {
var delay = backoff!.duration;
Expand All @@ -465,8 +401,7 @@ class Manager extends EventEmitter {
if (skipReconnect!) return;

_logger.fine('attempting reconnect');
emitAll('reconnect_attempt', backoff!.attempts);
emitAll('reconnecting', backoff!.attempts);
emit('reconnect_attempt', backoff!.attempts);

// check again for the case socket closed in above events
if (skipReconnect!) return;
Expand All @@ -476,7 +411,7 @@ class Manager extends EventEmitter {
_logger.fine('reconnect attempt error');
reconnecting = false;
reconnect();
emitAll('reconnect_error', err['data']);
emit('reconnect_error', err['data']);
} else {
_logger.fine('reconnect success');
onreconnect();
Expand All @@ -498,8 +433,7 @@ class Manager extends EventEmitter {
var attempt = backoff!.attempts;
reconnecting = false;
backoff!.reset();
updateSocketIds();
emitAll('reconnect', attempt);
emit('reconnect', attempt);
}
}

Expand Down
Loading

0 comments on commit b582852

Please sign in to comment.