Skip to content

Commit d776938

Browse files
authored
Optimize socket close and reconnect (#58)
* Fix connectTimeout in SwooleSocket * Optimize socket close
1 parent a92be69 commit d776938

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

src/Broker.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,12 @@ public function getClientByBrokerId(int $brokerId): ClientInterface
168168
}
169169

170170
$config = $this->config;
171-
if (!isset($this->clients[$brokerId])) {
171+
if (isset($this->clients[$brokerId])) {
172+
$client = $this->clients[$brokerId];
173+
if (!$client->getSocket()->isConnected()) {
174+
$client->connect();
175+
}
176+
} else {
172177
$clientClass = KafkaUtil::getClientClass($config->getClient());
173178

174179
/** @var ClientInterface $client */
@@ -177,7 +182,7 @@ public function getClientByBrokerId(int $brokerId): ClientInterface
177182
$this->clients[$brokerId] = $client;
178183
}
179184

180-
return $this->clients[$brokerId];
185+
return $client;
181186
}
182187

183188
/**

src/Socket/StreamSocket.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,13 @@ public function send(string $data, ?float $timeout = null): int
121121
$writable = $this->select([$this->socket], $timeout, false);
122122

123123
if (false === $writable) {
124+
$this->close();
124125
throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream');
125126
}
126127

127128
if (0 === $writable) {
128129
$res = $this->getMetaData();
130+
$this->close();
129131
if (!empty($res['timed_out'])) {
130132
throw new SocketException('Timed out writing ' . $bytesToWrite . ' bytes to stream after writing ' . $bytesWritten . ' bytes');
131133
}
@@ -142,6 +144,7 @@ public function send(string $data, ?float $timeout = null): int
142144
}
143145

144146
if (-1 === $wrote || false === $wrote) {
147+
$this->close();
145148
throw new SocketException('Could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
146149
}
147150

@@ -150,6 +153,7 @@ public function send(string $data, ?float $timeout = null): int
150153
++$failedAttempts;
151154

152155
if ($failedAttempts > $this->config->getMaxWriteAttempts()) {
156+
$this->close();
153157
throw new SocketException('After ' . $failedAttempts . ' attempts could not write ' . \strlen($data) . ' bytes to stream, completed writing only ' . $bytesWritten . ' bytes');
154158
}
155159
} else {
@@ -181,6 +185,7 @@ public function recv(int $length, ?float $timeout = null): string
181185

182186
if (0 === $readable) { // select timeout
183187
$res = $this->getMetaData();
188+
$this->close();
184189

185190
if (!empty($res['timed_out'])) {
186191
throw new SocketException(sprintf('Timed out reading %d bytes from stream', $length));
@@ -204,6 +209,7 @@ public function recv(int $length, ?float $timeout = null): string
204209
// Otherwise wait for bytes
205210
$readable = $this->select([$this->socket], $timeout);
206211
if (1 !== $readable) {
212+
$this->close();
207213
throw new SocketException(sprintf('Timed out while reading %d bytes from stream, %d bytes are still needed', $length, $remainingBytes));
208214
}
209215

src/Socket/SwooleSocket.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,15 @@ public function getConfig(): CommonConfig
7777

7878
public function isConnected(): bool
7979
{
80-
return null !== $this->socket;
80+
return $this->socket && $this->socket->isConnected();
8181
}
8282

8383
public function connect(): void
8484
{
8585
$config = $this->config;
8686
$client = new Client($this->getClientType());
8787
$client->set($this->getClientConfig());
88-
if ($client->connect($this->host, $this->port)) {
88+
if ($client->connect($this->host, $this->port, $config->getConnectTimeout())) {
8989
$this->socket = $client;
9090
} else {
9191
throw new ConnectionException(sprintf('Could not connect to tcp://%s:%s (%s [%d])', $this->host, $this->port, $client->errMsg, $client->errCode));
@@ -96,7 +96,6 @@ public function close(): bool
9696
{
9797
if ($this->socket) {
9898
$this->socket->close();
99-
$this->socket = null;
10099
$this->receivedBuffer = '';
101100

102101
return true;
@@ -109,6 +108,7 @@ public function send(string $data, ?float $timeout = null): int
109108
{
110109
$result = $this->socket->send($data);
111110
if (false === $result) {
111+
$this->close();
112112
throw new SocketException(sprintf('Could not write data to stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode));
113113
}
114114

@@ -125,6 +125,7 @@ public function recv(int $length, ?float $timeout = null): string
125125
while ($this->socket && !isset($this->receivedBuffer[$length - 1]) && (-1 == $timeout || $leftTime > 0)) {
126126
$buffer = $this->socket->recv($timeout);
127127
if ('' === $buffer || false === $buffer) {
128+
$this->close();
128129
throw new SocketException(sprintf('Could not recv data from stream, %s [%d]', $this->socket->errMsg, $this->socket->errCode));
129130
}
130131
$this->receivedBuffer .= $buffer;
@@ -140,7 +141,8 @@ public function recv(int $length, ?float $timeout = null): string
140141
return $result;
141142
}
142143

143-
if ($this->socket) {
144+
if ($this->socket->isConnected()) {
145+
$this->close();
144146
throw new SocketException('Could not recv data from stream');
145147
}
146148

0 commit comments

Comments
 (0)