diff --git a/src/ReactMqttClient.php b/src/ReactMqttClient.php index b2adf6c..f56b191 100644 --- a/src/ReactMqttClient.php +++ b/src/ReactMqttClient.php @@ -180,6 +180,10 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim $connection = new DefaultConnection(); } + if ($connection->isCleanSession()) { + $this->cleanPreviousSession(); + } + if ($connection->getClientID() === '') { $connection = $connection->withClientID($this->identifierGenerator->generateClientID()); } @@ -561,6 +565,8 @@ private function handleClose() $this->loop->cancelTimer($timer); } + $this->timer = []; + $connection = $this->connection; $this->isConnecting = false; @@ -673,4 +679,23 @@ private function finishFlow(ReactFlow $flow) $flow->getDeferred()->reject($result); } } + + /** + * Cleans previous session by rejecting all pending flows. + */ + private function cleanPreviousSession() + { + $error = new \RuntimeException('Connection has been closed.'); + + foreach ($this->receivingFlows as $receivingFlow) { + $receivingFlow->getDeferred()->reject($error); + } + + foreach ($this->sendingFlows as $sendingFlow) { + $sendingFlow->getDeferred()->reject($error); + } + + $this->receivingFlows = []; + $this->sendingFlows = []; + } }