diff --git a/src/Broker.php b/src/Broker.php index 1750977..6f45c4c 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -168,7 +168,12 @@ public function getClientByBrokerId(int $brokerId): ClientInterface } $config = $this->config; - if (!isset($this->clients[$brokerId])) { + if (isset($this->clients[$brokerId])) { + $client = $this->clients[$brokerId]; + if (!$client->getSocket()->isConnected()) { + $client->connect(); + } + } else { $clientClass = KafkaUtil::getClientClass($config->getClient()); /** @var ClientInterface $client */ @@ -177,7 +182,7 @@ public function getClientByBrokerId(int $brokerId): ClientInterface $this->clients[$brokerId] = $client; } - return $this->clients[$brokerId]; + return $client; } /** diff --git a/src/Socket/StreamSocket.php b/src/Socket/StreamSocket.php index 32a989c..bfca2f4 100644 --- a/src/Socket/StreamSocket.php +++ b/src/Socket/StreamSocket.php @@ -121,11 +121,13 @@ public function send(string $data, ?float $timeout = null): int $writable = $this->select([$this->socket], $timeout, false); if (false === $writable) { + $this->close(); throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream'); } if (0 === $writable) { $res = $this->getMetaData(); + $this->close(); if (!empty($res['timed_out'])) { throw new SocketException('Timed out writing ' . $bytesToWrite . ' bytes to stream after writing ' . $bytesWritten . ' bytes'); } @@ -142,6 +144,7 @@ public function send(string $data, ?float $timeout = null): int } if (-1 === $wrote || false === $wrote) { + $this->close(); throw new SocketException('Could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes'); } @@ -150,6 +153,7 @@ public function send(string $data, ?float $timeout = null): int ++$failedAttempts; if ($failedAttempts > $this->config->getMaxWriteAttempts()) { + $this->close(); throw new SocketException('After ' . $failedAttempts . ' attempts could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes'); } } else { @@ -181,6 +185,7 @@ public function recv(int $length, ?float $timeout = null): string if (0 === $readable) { // select timeout $res = $this->getMetaData(); + $this->close(); if (!empty($res['timed_out'])) { throw new SocketException(sprintf('Timed out reading %d bytes from stream', $length)); @@ -204,6 +209,7 @@ public function recv(int $length, ?float $timeout = null): string // Otherwise wait for bytes $readable = $this->select([$this->socket], $timeout); if (1 !== $readable) { + $this->close(); throw new SocketException(sprintf('Timed out while reading %d bytes from stream, %d bytes are still needed', $length, $remainingBytes)); } diff --git a/src/Socket/SwooleSocket.php b/src/Socket/SwooleSocket.php index 7c8aab7..b7b2f75 100644 --- a/src/Socket/SwooleSocket.php +++ b/src/Socket/SwooleSocket.php @@ -77,7 +77,7 @@ public function getConfig(): CommonConfig public function isConnected(): bool { - return null !== $this->socket; + return $this->socket && $this->socket->isConnected(); } public function connect(): void @@ -85,7 +85,7 @@ public function connect(): void $config = $this->config; $client = new Client($this->getClientType()); $client->set($this->getClientConfig()); - if ($client->connect($this->host, $this->port)) { + if ($client->connect($this->host, $this->port, $config->getConnectTimeout())) { $this->socket = $client; } else { throw new ConnectionException(sprintf('Could not connect to tcp://%s:%s (%s [%d])', $this->host, $this->port, $client->errMsg, $client->errCode)); @@ -96,7 +96,6 @@ public function close(): bool { if ($this->socket) { $this->socket->close(); - $this->socket = null; $this->receivedBuffer = ''; return true; @@ -109,6 +108,7 @@ public function send(string $data, ?float $timeout = null): int { $result = $this->socket->send($data); if (false === $result) { + $this->close(); throw new SocketException(sprintf('Could not write data to stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode)); } @@ -125,6 +125,7 @@ public function recv(int $length, ?float $timeout = null): string while ($this->socket && !isset($this->receivedBuffer[$length - 1]) && (-1 == $timeout || $leftTime > 0)) { $buffer = $this->socket->recv($timeout); if ('' === $buffer || false === $buffer) { + $this->close(); throw new SocketException(sprintf('Could not recv data from stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode)); } $this->receivedBuffer .= $buffer; @@ -140,7 +141,8 @@ public function recv(int $length, ?float $timeout = null): string return $result; } - if ($this->socket) { + if ($this->socket->isConnected()) { + $this->close(); throw new SocketException('Could not recv data from stream'); }