Skip to content

Commit

Permalink
Cleanup timers on close and reject pending flows on connect
Browse files Browse the repository at this point in the history
  • Loading branch information
valga authored and binsoul committed Aug 20, 2017
1 parent 091bee2 commit 6a80fea
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/ReactMqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim
$connection = new DefaultConnection();
}

if ($connection->isCleanSession()) {
$this->cleanPreviousSession();
}

if ($connection->getClientID() === '') {
$connection = $connection->withClientID($this->identifierGenerator->generateClientID());
}
Expand Down Expand Up @@ -561,6 +565,8 @@ private function handleClose()
$this->loop->cancelTimer($timer);
}

$this->timer = [];

$connection = $this->connection;

$this->isConnecting = false;
Expand Down Expand Up @@ -673,4 +679,23 @@ private function finishFlow(ReactFlow $flow)
$flow->getDeferred()->reject($result);
}
}

/**
* Cleans previous session by rejecting all pending flows.
*/
private function cleanPreviousSession()
{
$error = new \RuntimeException('Connection has been closed.');

foreach ($this->receivingFlows as $receivingFlow) {
$receivingFlow->getDeferred()->reject($error);
}

foreach ($this->sendingFlows as $sendingFlow) {
$sendingFlow->getDeferred()->reject($error);
}

$this->receivingFlows = [];
$this->sendingFlows = [];
}
}

0 comments on commit 6a80fea

Please sign in to comment.