Skip to content

Commit

Permalink
Update to react/socket 0.8
Browse files Browse the repository at this point in the history
  • Loading branch information
binsoul committed May 16, 2017
1 parent 07cf48d commit 027016f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"php": "~5.6|~7.0",
"binsoul/net-mqtt": "~0.2",
"react/promise": "~2.0",
"react/socket-client": "0.5.x"
"react/socket": "~0.8"
},
"require-dev": {
"phpunit/phpunit": "~4.0||~5.0",
Expand Down
26 changes: 13 additions & 13 deletions src/ReactMqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
use React\EventLoop\LoopInterface;
use React\Promise\ExtendedPromiseInterface;
use React\Promise\RejectedPromise;
use React\SocketClient\ConnectorInterface;
use React\Stream\Stream;
use React\Socket\ConnectorInterface;
use React\Stream\DuplexStreamInterface;

/**
* Connects to a MQTT broker and subscribes to topics or publishes messages.
Expand All @@ -52,7 +52,7 @@ class ReactMqttClient extends EventEmitter
private $connector;
/** @var LoopInterface */
private $loop;
/** @var Stream */
/** @var DuplexStreamInterface */
private $stream;
/** @var StreamParser */
private $parser;
Expand Down Expand Up @@ -147,7 +147,7 @@ public function isConnected()
/**
* Returns the underlying stream or null if the client is not connected.
*
* @return Stream|null
* @return DuplexStreamInterface|null
*/
public function getStream()
{
Expand Down Expand Up @@ -187,7 +187,7 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim
$deferred = new Deferred();

$this->establishConnection($this->host, $this->port, $timeout)
->then(function (Stream $stream) use ($connection, $deferred, $timeout) {
->then(function (DuplexStreamInterface $stream) use ($connection, $deferred, $timeout) {
$this->stream = $stream;

$this->emit('open', [$connection, $this]);
Expand Down Expand Up @@ -377,19 +377,15 @@ function () use ($deferred, $timeout) {
}
);

$this->connector->create($host, $port)
$this->connector->connect($host.':'.$port)
->always(function () use ($timer) {
$this->loop->cancelTimer($timer);
})
->then(function (Stream $stream) use ($deferred, $timeout) {
->then(function (DuplexStreamInterface $stream) use ($deferred) {
$stream->on('data', function ($data) {
$this->handleReceive($data);
});

$stream->getBuffer()->on('full-drain', function () {
$this->handleSend();
});

$stream->on('close', function () {
$this->handleClose();
});
Expand Down Expand Up @@ -467,6 +463,8 @@ private function handleReceive($data)
if ($flowCount > count($this->receivingFlows)) {
$this->receivingFlows = array_values($this->receivingFlows);
}

$this->handleSend();
}

/**
Expand Down Expand Up @@ -603,11 +601,12 @@ private function startFlow(Flow $flow, $isSilent = false)
$internalFlow = new ReactFlow($flow, $deferred, $packet, $isSilent);

if ($packet !== null) {
if ($this->stream->getBuffer()->listening) {
if ($this->writtenFlow !== null) {
$this->sendingFlows[] = $internalFlow;
} else {
$this->stream->write($packet);
$this->writtenFlow = $internalFlow;
$this->handleSend();
}
} else {
$this->loop->nextTick(function () use ($internalFlow) {
Expand Down Expand Up @@ -635,11 +634,12 @@ private function continueFlow(ReactFlow $flow, Packet $packet)
}

if ($response !== null) {
if ($this->stream->getBuffer()->listening) {
if ($this->writtenFlow !== null) {
$this->sendingFlows[] = $flow;
} else {
$this->stream->write($response);
$this->writtenFlow = $flow;
$this->handleSend();
}
} elseif ($flow->isFinished()) {
$this->loop->nextTick(function () use ($flow) {
Expand Down
11 changes: 3 additions & 8 deletions tests/Integration/ReactMqttClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
use React\EventLoop\Factory as EventLoopFactory;
use React\Dns\Resolver\Factory as DNSResolverFactory;
use React\EventLoop\LoopInterface;
use React\SocketClient\DnsConnector;
use React\SocketClient\SecureConnector;
use React\SocketClient\TcpConnector;
use React\Socket\Connector;

/**
* Tests the ReactMqttClient class.
Expand All @@ -40,7 +38,7 @@ class ReactMqttClientTest extends \PHPUnit_Framework_TestCase
*
* @var string
*/
const HOSTNAME = 'iot.eclipse.org';
const HOSTNAME = 'tls://iot.eclipse.org';

/**
* Port.
Expand Down Expand Up @@ -170,10 +168,7 @@ private function log($message, $clientName = '')
*/
private function buildClient($name = '', $isPrimary = true)
{
$connector = new DnsConnector(new TcpConnector($this->loop), $this->resolver);
if (self::SECURE) {
$connector = new SecureConnector($connector, $this->loop);
}
$connector = new Connector($this->loop, ['timeout' => false]);

$client = new ReactMqttClient($connector, $this->loop);
if ($isPrimary) {
Expand Down

0 comments on commit 027016f

Please sign in to comment.