Skip to content

Commit e1e235a

Browse files
authored
change default error callback (#12)
1 parent 671a7f9 commit e1e235a

File tree

2 files changed

+4
-37
lines changed

2 files changed

+4
-37
lines changed

src/Callback/KafkaErrorCallback.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ final class KafkaErrorCallback
2222
*/
2323
public function __invoke($kafka, int $errorCode, string $reason)
2424
{
25-
if (RD_KAFKA_RESP_ERR__TRANSPORT === $errorCode) {
25+
// non fatal errors are retried by librdkafka
26+
if (RD_KAFKA_RESP_ERR__FATAL !== $errorCode) {
2627
return;
2728
}
2829

tests/Unit/Callback/KafkaErrorCallbackTest.php

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,51 +11,17 @@
1111
*/
1212
class KafkaErrorCallbackTest extends TestCase
1313
{
14-
15-
public function getConsumerMock()
16-
{
17-
return $this->getMockBuilder(RdKafkaConsumer::class)
18-
->disableOriginalConstructor()
19-
->onlyMethods(['unsubscribe', 'getSubscription'])
20-
->getMock();
21-
}
22-
2314
public function testInvokeWithBrokerException()
2415
{
2516
self::expectException('Jobcloud\Kafka\Exception\KafkaBrokerException');
26-
27-
$consumerMock = $this->getConsumerMock();
28-
29-
$consumerMock
30-
->expects(self::any())
31-
->method('unsubscribe')
32-
->willReturn(null);
33-
34-
$consumerMock
35-
->expects(self::any())
36-
->method('getSubscription')
37-
->willReturn([]);
38-
3917
$callback = new KafkaErrorCallback();
40-
call_user_func($callback, $consumerMock, 1, 'error');
18+
call_user_func($callback, null, RD_KAFKA_RESP_ERR__FATAL, 'error');
4119
}
4220

4321
public function testInvokeWithAcceptableError()
4422
{
45-
$consumerMock = $this->getConsumerMock();
46-
47-
$consumerMock
48-
->expects(self::any())
49-
->method('unsubscribe')
50-
->willReturn(null);
51-
52-
$consumerMock
53-
->expects(self::any())
54-
->method('getSubscription')
55-
->willReturn([]);
56-
5723
$callback = new KafkaErrorCallback();
58-
$result = call_user_func($callback, $consumerMock, RD_KAFKA_RESP_ERR__TRANSPORT, 'error');
24+
$result = call_user_func($callback, null, RD_KAFKA_RESP_ERR__TRANSPORT, 'error');
5925

6026
self::assertNull($result);
6127
}

0 commit comments

Comments
 (0)