Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ public function getClientByBrokerId(int $brokerId): ClientInterface
}

$config = $this->config;
if (!isset($this->clients[$brokerId])) {
if (isset($this->clients[$brokerId])) {
$client = $this->clients[$brokerId];
if (!$client->getSocket()->isConnected()) {
$client->connect();
}
} else {
$clientClass = KafkaUtil::getClientClass($config->getClient());

/** @var ClientInterface $client */
Expand All @@ -177,7 +182,7 @@ public function getClientByBrokerId(int $brokerId): ClientInterface
$this->clients[$brokerId] = $client;
}

return $this->clients[$brokerId];
return $client;
}

/**
Expand Down
6 changes: 6 additions & 0 deletions src/Socket/StreamSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ public function send(string $data, ?float $timeout = null): int
$writable = $this->select([$this->socket], $timeout, false);

if (false === $writable) {
$this->close();
throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream');
}

if (0 === $writable) {
$res = $this->getMetaData();
$this->close();
if (!empty($res['timed_out'])) {
throw new SocketException('Timed out writing ' . $bytesToWrite . ' bytes to stream after writing ' . $bytesWritten . ' bytes');
}
Expand All @@ -142,6 +144,7 @@ public function send(string $data, ?float $timeout = null): int
}

if (-1 === $wrote || false === $wrote) {
$this->close();
throw new SocketException('Could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}

Expand All @@ -150,6 +153,7 @@ public function send(string $data, ?float $timeout = null): int
++$failedAttempts;

if ($failedAttempts > $this->config->getMaxWriteAttempts()) {
$this->close();
throw new SocketException('After ' . $failedAttempts . ' attempts could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
}
} else {
Expand Down Expand Up @@ -181,6 +185,7 @@ public function recv(int $length, ?float $timeout = null): string

if (0 === $readable) { // select timeout
$res = $this->getMetaData();
$this->close();

if (!empty($res['timed_out'])) {
throw new SocketException(sprintf('Timed out reading %d bytes from stream', $length));
Expand All @@ -204,6 +209,7 @@ public function recv(int $length, ?float $timeout = null): string
// Otherwise wait for bytes
$readable = $this->select([$this->socket], $timeout);
if (1 !== $readable) {
$this->close();
throw new SocketException(sprintf('Timed out while reading %d bytes from stream, %d bytes are still needed', $length, $remainingBytes));
}

Expand Down
10 changes: 6 additions & 4 deletions src/Socket/SwooleSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ public function getConfig(): CommonConfig

public function isConnected(): bool
{
return null !== $this->socket;
return $this->socket && $this->socket->isConnected();
}

public function connect(): void
{
$config = $this->config;
$client = new Client($this->getClientType());
$client->set($this->getClientConfig());
if ($client->connect($this->host, $this->port)) {
if ($client->connect($this->host, $this->port, $config->getConnectTimeout())) {
$this->socket = $client;
} else {
throw new ConnectionException(sprintf('Could not connect to tcp://%s:%s (%s [%d])', $this->host, $this->port, $client->errMsg, $client->errCode));
Expand All @@ -96,7 +96,6 @@ public function close(): bool
{
if ($this->socket) {
$this->socket->close();
$this->socket = null;
$this->receivedBuffer = '';

return true;
Expand All @@ -109,6 +108,7 @@ public function send(string $data, ?float $timeout = null): int
{
$result = $this->socket->send($data);
if (false === $result) {
$this->close();
throw new SocketException(sprintf('Could not write data to stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode));
}

Expand All @@ -125,6 +125,7 @@ public function recv(int $length, ?float $timeout = null): string
while ($this->socket && !isset($this->receivedBuffer[$length - 1]) && (-1 == $timeout || $leftTime > 0)) {
$buffer = $this->socket->recv($timeout);
if ('' === $buffer || false === $buffer) {
$this->close();
throw new SocketException(sprintf('Could not recv data from stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode));
}
$this->receivedBuffer .= $buffer;
Expand All @@ -140,7 +141,8 @@ public function recv(int $length, ?float $timeout = null): string
return $result;
}

if ($this->socket) {
if ($this->socket->isConnected()) {
$this->close();
throw new SocketException('Could not recv data from stream');
}

Expand Down