Skip to content

Commit

Permalink
Merge branch 'felangel-dart2-upgrade'
Browse files Browse the repository at this point in the history
fix #7
  • Loading branch information
jumperchen committed Nov 20, 2018
2 parents ee427db + 5f29b3b commit 8c7e7a6
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 454 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Files and directories created by pub
.dart_tool
.packages
.pub/
build/
Expand Down
8 changes: 4 additions & 4 deletions lib/socket_io_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ export 'package:socket_io_client/src/socket.dart';
// Protocol version
final protocol = Parser.protocol;


final Map<String, dynamic> cache = {};

final Logger _logger = new Logger('socket_io_client');


/**
* Looks up an existing `Manager` for multiplexing.
* If the user summons:
Expand All @@ -50,8 +48,10 @@ _lookup(uri, opts) {
var id = '${parsed.scheme}://${parsed.host}:${parsed.port}';
var path = parsed.path;
var sameNamespace = cache.containsKey(id) && cache[id].nsps.containsKey(path);
var newConnection = opts['forceNew'] == true || opts['force new connection'] == true
|| false == opts['multiplex'] || sameNamespace;
var newConnection = opts['forceNew'] == true ||
opts['force new connection'] == true ||
false == opts['multiplex'] ||
sameNamespace;

var io;

Expand Down
5 changes: 2 additions & 3 deletions lib/src/engine/parseqs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
*
* Copyright (C) 2017 Potix Corporation. All Rights Reserved.
*/
import 'dart:html';
encode(Map obj) {
var str = '';

Expand All @@ -28,12 +27,12 @@ encode(Map obj) {
* @param {String} qs
* @api private
*/
decode(qs){
decode(qs) {
var qry = {};
var pairs = qs.split('&');
for (var i = 0, l = pairs.length; i < l; i++) {
var pair = pairs[i].split('=');
qry[Uri.decodeComponent(pair[0])] = Uri.decodeComponent(pair[1]);
}
return qry;
}
}
108 changes: 60 additions & 48 deletions lib/src/engine/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class Socket extends EventEmitter {
bool supportsBinary;
bool upgrading;


Socket(String uri, Map opts) {
opts = opts ?? {};

Expand All @@ -76,8 +75,7 @@ class Socket extends EventEmitter {
opts['hostname'] = this.uri.host;
opts['secure'] = this.uri.scheme == 'https' || this.uri.scheme == 'wss';
opts['port'] = this.uri.port;
if (this.uri.hasQuery)
opts['query'] = this.uri.query;
if (this.uri.hasQuery) opts['query'] = this.uri.query;
} else if (opts.containsKey('host')) {
opts['hostname'] = Uri.parse(opts['host']).host;
}
Expand All @@ -90,8 +88,12 @@ class Socket extends EventEmitter {
}

this.agent = opts['agent'] ?? false;
this.hostname = opts['hostname'] ?? (window.location.hostname ?? 'localhost');
this.port = opts['port'] ?? (window.location.port.isNotEmpty ? int.parse(window.location.port) : (this.secure ? 443 : 80));
this.hostname =
opts['hostname'] ?? (window.location.hostname ?? 'localhost');
this.port = opts['port'] ??
(window.location.port.isNotEmpty
? int.parse(window.location.port)
: (this.secure ? 443 : 80));
var query = opts['query'] ?? {};
if (query is String)
this.query = decode(query);
Expand All @@ -100,7 +102,10 @@ class Socket extends EventEmitter {
}

this.upgrade = opts['upgrade'] != false;
this.path = (opts['path'] ?? '/engine.io').toString().replaceFirst(new RegExp(r'\/$'), '') + '/';
this.path = (opts['path'] ?? '/engine.io')
.toString()
.replaceFirst(new RegExp(r'\/$'), '') +
'/';
this.forceJSONP = opts['forceJSONP'] == true;
this.jsonp = opts['jsonp'] != false;
this.forceBase64 = opts['forceBase64'] == true;
Expand All @@ -117,8 +122,10 @@ class Socket extends EventEmitter {
this.binaryType = null;
this.onlyBinaryUpgrades = opts['onlyBinaryUpgrades'];

if (!opts.containsKey('perMessageDeflate') || opts['perMessageDeflate'] == true) {
this.perMessageDeflate = opts['perMessageDeflate'] is Map ? opts['perMessageDeflate'] : {};
if (!opts.containsKey('perMessageDeflate') ||
opts['perMessageDeflate'] == true) {
this.perMessageDeflate =
opts['perMessageDeflate'] is Map ? opts['perMessageDeflate'] : {};
if (!this.perMessageDeflate.containsKey('threshold'))
this.perMessageDeflate['threshold'] = 1024;
}
Expand Down Expand Up @@ -160,7 +167,6 @@ class Socket extends EventEmitter {

static bool priorWebsocketSuccess = false;


/**
* Protocol version.
*
Expand Down Expand Up @@ -213,8 +219,8 @@ class Socket extends EventEmitter {
'jsonp': options['jsonp'] ?? this.jsonp,
'forceBase64': options['forceBase64'] ?? this.forceBase64,
'enablesXDR': options['enablesXDR'] ?? this.enablesXDR,
'timestampRequests': options['timestampRequests'] ??
this.timestampRequests,
'timestampRequests':
options['timestampRequests'] ?? this.timestampRequests,
'timestampParam': options['timestampParam'] ?? this.timestampParam,
'policyPort': options['policyPort'] ?? this.policyPort,
// 'pfx: options.pfx || this.pfx,
Expand All @@ -224,8 +230,8 @@ class Socket extends EventEmitter {
// 'ca: options.ca || this.ca,
// 'ciphers: options.ciphers || this.ciphers,
// 'rejectUnauthorized: options.rejectUnauthorized || this.rejectUnauthorized,
'perMessageDeflate': options['perMessageDeflate'] ??
this.perMessageDeflate,
'perMessageDeflate':
options['perMessageDeflate'] ?? this.perMessageDeflate,
// 'extraHeaders: options['extraHeaders ?? this.extraHeaders,
// 'forceNode: options.forceNode || this.forceNode,
// 'localAddress: options.localAddress || this.localAddress,
Expand All @@ -243,12 +249,12 @@ class Socket extends EventEmitter {
*/
open() {
var transport;
if (this.rememberUpgrade != null && priorWebsocketSuccess &&
if (this.rememberUpgrade != null &&
priorWebsocketSuccess &&
this.transports.contains('websocket')) {
transport = 'websocket';
} else if (0 == this.transports.length) {
// Emit error on next tick so it can be listened to
var self = this;
Timer.run(() => emit('error', 'No transports available'));
return;
} else {
Expand Down Expand Up @@ -276,7 +282,6 @@ class Socket extends EventEmitter {
*/
setTransport(transport) {
_logger.fine('setting transport ${transport?.name}');
var self = this;

if (this.transport != null) {
_logger.fine('clearing existing transport ${this.transport?.name}');
Expand All @@ -287,10 +292,11 @@ class Socket extends EventEmitter {
this.transport = transport;

// set up transport listeners
transport..on('drain', (_) => onDrain())..on(
'packet', (packet) => onPacket(packet))..on(
'error', (e) => onError(e))..on(
'close', (_) => onClose('transport close'));
transport
..on('drain', (_) => onDrain())
..on('packet', (packet) => onPacket(packet))
..on('error', (e) => onError(e))
..on('close', (_) => onClose('transport close'));
}

/**
Expand All @@ -301,22 +307,23 @@ class Socket extends EventEmitter {
*/
probe(name) {
_logger.fine('probing transport "$name"');
var transport = this.createTransport(name, { 'probe': true});
var transport = this.createTransport(name, {'probe': true});
var failed = false;
var self = this;
var cleanup;
priorWebsocketSuccess = false;

var onTransportOpen = (_) {
if (onlyBinaryUpgrades == true) {
var upgradeLosesBinary = this.supportsBinary == false &&
transport.supportsBinary;
var upgradeLosesBinary =
this.supportsBinary == false && transport.supportsBinary;
failed = failed || upgradeLosesBinary;
}
if (failed) return;

_logger.fine('probe transport "$name" opened');
transport.send([{ 'type': 'ping', 'data': 'probe'}]);
transport.send([
{'type': 'ping', 'data': 'probe'}
]);
transport.once('packet', (msg) {
if (failed) return;
if ('pong' == msg['type'] && 'probe' == msg['data']) {
Expand All @@ -336,7 +343,9 @@ class Socket extends EventEmitter {
cleanup();

setTransport(transport);
transport.send([{ 'type': 'upgrade'}]);
transport.send([
{'type': 'upgrade'}
]);
emit('upgrade', transport);
transport = null;
upgrading = false;
Expand Down Expand Up @@ -368,10 +377,10 @@ class Socket extends EventEmitter {
final oldTransport = transport;
freezeTransport();

_logger.fine(
'probe transport "$name" failed because of error: $err');
_logger.fine('probe transport "$name" failed because of error: $err');

emit('upgradeError', {'error': 'probe error: $err', 'transport': oldTransport.name});
emit('upgradeError',
{'error': 'probe error: $err', 'transport': oldTransport.name});
};

var onTransportClose = (_) => onerror('transport closed');
Expand Down Expand Up @@ -420,7 +429,8 @@ class Socket extends EventEmitter {

// we check for `readyState` in case an `open`
// listener already closed the socket
if ('open' == this.readyState && this.upgrade == true &&
if ('open' == this.readyState &&
this.upgrade == true &&
this.transport is PollingTransport) {
_logger.fine('starting upgrade probes');
for (var i = 0, l = this.upgrades.length; i < l; i++) {
Expand All @@ -435,12 +445,12 @@ class Socket extends EventEmitter {
* @api private
*/
onPacket(Map packet) {
if ('opening' == this.readyState || 'open' == this.readyState ||
if ('opening' == this.readyState ||
'open' == this.readyState ||
'closing' == this.readyState) {
var type = packet['type'];
var data = packet['data'];
_logger.fine(
'socket receive: type "$type", data "$data"');
_logger.fine('socket receive: type "$type", data "$data"');

this.emit('packet', packet);

Expand All @@ -449,7 +459,7 @@ class Socket extends EventEmitter {

switch (type) {
case 'open':
this.onHandshake(JSON.decode(data ?? 'null'));
this.onHandshake(json.decode(data ?? 'null'));
break;

case 'pong':
Expand All @@ -467,8 +477,8 @@ class Socket extends EventEmitter {
break;
}
} else {
_logger.fine(
'packet received with socket readyState "${this.readyState}"');
_logger
.fine('packet received with socket readyState "${this.readyState}"');
}
}

Expand Down Expand Up @@ -502,8 +512,9 @@ class Socket extends EventEmitter {
*/
onHeartbeat(timeout) {
this.pingTimeoutTimer?.cancel();
this.pingTimeoutTimer = new Timer(new Duration(
milliseconds: timeout ?? (pingInterval + pingTimeout)), () {
this.pingTimeoutTimer = new Timer(
new Duration(milliseconds: timeout ?? (pingInterval + pingTimeout)),
() {
if ('closed' == readyState) return;
onClose('ping timeout');
});
Expand All @@ -518,7 +529,8 @@ class Socket extends EventEmitter {
setPing() {
pingIntervalTimer?.cancel();
pingIntervalTimer = new Timer(new Duration(milliseconds: pingInterval), () {
_logger.fine('writing ping packet - expecting pong within ${pingTimeout}ms');
_logger
.fine('writing ping packet - expecting pong within ${pingTimeout}ms');
ping();
onHeartbeat(pingTimeout);
});
Expand Down Expand Up @@ -559,8 +571,10 @@ class Socket extends EventEmitter {
* @api private
*/
flush() {
if ('closed' != this.readyState && this.transport.writable == true &&
this.upgrading != true && this.writeBuffer.isNotEmpty) {
if ('closed' != this.readyState &&
this.transport.writable == true &&
this.upgrading != true &&
this.writeBuffer.isNotEmpty) {
_logger.fine('flushing ${this.writeBuffer.length} packets in socket');
this.transport.send(this.writeBuffer);
// keep track of current length of writeBuffer
Expand Down Expand Up @@ -640,8 +654,6 @@ class Socket extends EventEmitter {
if ('opening' == this.readyState || 'open' == this.readyState) {
this.readyState = 'closing';

var self = this;

if (this.writeBuffer.isNotEmpty) {
this.once('drain', (_) {
if (this.upgrading == true) {
Expand Down Expand Up @@ -678,12 +690,11 @@ class Socket extends EventEmitter {
* @api private
*/
onClose(reason, [desc]) {
if ('opening' == this.readyState || 'open' == this.readyState ||
if ('opening' == this.readyState ||
'open' == this.readyState ||
'closing' == this.readyState) {
_logger.fine('socket close with reason: "$reason"');

var self = this;

// clear timers
this.pingIntervalTimer?.cancel();
this.pingTimeoutTimer?.cancel();
Expand Down Expand Up @@ -720,5 +731,6 @@ class Socket extends EventEmitter {
* @api private
*
*/
filterUpgrades(List upgrades) => transports.where((_) => upgrades.contains(_)).toList();
}
filterUpgrades(List upgrades) =>
transports.where((_) => upgrades.contains(_)).toList();
}
Loading

0 comments on commit 8c7e7a6

Please sign in to comment.