Skip to content

Commit 65cd680

Browse files
committed
Add exceptionCallback in CommonConfig
1 parent 7031e1b commit 65cd680

File tree

7 files changed

+43
-4
lines changed

7 files changed

+43
-4
lines changed

doc/consumer.en.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Class `longlang\phpkafka\Consumer\ConsumerConfig`
3636
| groupHeartbeat | Group heartbeat intervals. (unit: second) | `3` |
3737
| autoCreateTopic | Auto create topic. | `true` |
3838
| partitionAssignmentStrategy | Consumer partition assignment strategy. Optional: Range-`longlang\phpkafka\Consumer\Assignor\RangeAssignor`, RoundRobin-`\longlang\phpkafka\Consumer\Assignor\RoundRobinAssignor`, Sticky-`\longlang\phpkafka\Consumer\Assignor\StickyAssignor`. |
39+
| exceptionCallback | This callback is called when an exception that cannot be thrown by the `recv()` coroutine is encountered. Format: `function(\Exception $e){}` | `null` |
3940

4041
## Asynchronous (callback)
4142

doc/consumer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
| groupHeartbeat | 分组心跳时间间隔,单位:秒 | `3` |
3737
| autoCreateTopic | 自动创建主题 | `true` |
3838
| partitionAssignmentStrategy | 消费者分区分配策略,可选:范围分配-`longlang\phpkafka\Consumer\Assignor\RangeAssignor`、轮询分配-`\longlang\phpkafka\Consumer\Assignor\RoundRobinAssignor`、粘性分配-`\longlang\phpkafka\Consumer\Assignor\StickyAssignor` | `longlang\phpkafka\Consumer\Assignor\RangeAssignor` |
39+
| exceptionCallback | 遇到无法在`recv()`协程抛出的异常时,调用此回调。格式:`function(\Exception $e){}` | `null` |
3940

4041
## 异步消费(回调)
4142

doc/producer.en.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Class `longlang\phpkafka\Producer\ProducerConfig`
2525
| producerEpoch | producer Epoch | `-1` |
2626
| partitionLeaderEpoch | partition Leader Epoch | `-1` |
2727
| autoCreateTopic | auto create topic | `true` |
28+
| exceptionCallback | This callback is called when an exception that cannot be thrown by the `recv()` coroutine is encountered. Format: `function(\Exception $e){}` | `null` |
2829

2930
## Send a single message
3031

doc/producer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
| producerEpoch | 生产者 Epoch | `-1` |
2626
| partitionLeaderEpoch | 分区 Leader Epoch | `-1` |
2727
| autoCreateTopic | 自动创建主题 | `true` |
28+
| exceptionCallback | 遇到无法在`recv()`协程抛出的异常时,调用此回调。格式:`function(\Exception $e){}` | `null` |
2829

2930
## 发送单个消息
3031

src/Client/SwooleClient.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,17 @@ private function startRecvCo()
137137
$length = Int32::unpack($data);
138138
$data = $this->socket->recv($length);
139139
$correlationId = Int32::unpack($data);
140-
if (!isset($this->recvChannels[$correlationId])) {
141-
continue;
140+
if (isset($this->recvChannels[$correlationId])) {
141+
$this->recvChannels[$correlationId]->push($data);
142142
}
143143
} catch (Exception $e) {
144-
$data = $e;
144+
$callback = $this->getConfig()->getExceptionCallback();
145+
if ($callback) {
146+
$callback($e);
147+
} else {
148+
throw $e;
149+
}
145150
}
146-
$this->recvChannels[$correlationId]->push($data);
147151
}
148152
});
149153
}

src/Config/CommonConfig.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ class CommonConfig extends AbstractConfig
4747
*/
4848
protected $updateBrokers = true;
4949

50+
/**
51+
* Exception callback in SwooleClient.
52+
*
53+
* @var callable|null
54+
*/
55+
protected $exceptionCallback = null;
56+
5057
public function getConnectTimeout(): float
5158
{
5259
return $this->connectTimeout;
@@ -152,4 +159,16 @@ public function setUpdateBrokers(bool $updateBrokers): self
152159

153160
return $this;
154161
}
162+
163+
public function getExceptionCallback(): ?callable
164+
{
165+
return $this->exceptionCallback;
166+
}
167+
168+
public function setExceptionCallback(?callable $exceptionCallback): self
169+
{
170+
$this->exceptionCallback = $exceptionCallback;
171+
172+
return $this;
173+
}
155174
}

tests/Client/SwooleClientTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace longlang\phpkafka\Test\Client;
66

7+
use Exception;
78
use longlang\phpkafka\Client\ClientInterface;
89
use longlang\phpkafka\Client\SwooleClient;
910
use longlang\phpkafka\Test\TestUtil;
@@ -63,4 +64,15 @@ public function testClose(ClientInterface $client)
6364

6465
return parent::testClose($client);
6566
}
67+
68+
public function testExceptionCallback()
69+
{
70+
$client = $this->testClient();
71+
$exception = null;
72+
$client->getConfig()->setExceptionCallback(function (Exception $e) use (&$exception) {
73+
$exception = $e;
74+
});
75+
$client->close();
76+
$this->assertInstanceOf(Exception::class, $exception);
77+
}
6678
}

0 commit comments

Comments
 (0)