Commit 4919a82

[C++/Python] Fix bugs that were not exposed by broken C++ CI before (#11557)
Fixes #11551

### Motivation

The C++ client currently has several bugs, and some tests cannot pass:

1. #10601 changed the behavior of the admin API that gets partition metadata, but the C++ implementation relies on the original behavior to create topics automatically. As a result, every test that uses HTTP lookup fails:
   - AuthPluginTest.testTlsDetectHttps
   - AuthPluginToken.testTokenWithHttpUrl
   - BasicEndToEndTest.testHandlerReconnectionLogic
   - BasicEndToEndTest.testV2TopicHttp
   - ClientDeduplicationTest.testProducerDeduplication
2. After #11029 and #11486, the implementation iterates more than once even when there is only one valid resolved IP address:
   - ClientTest.testConnectTimeout

In addition, there is a long-standing flaky test: ClientTest.testLookupThrottling.

The Python tests are also broken. Because they run only after all C++ tests have passed, these failures were never exposed either:

1. Some tests in `pulsar_test.py` might hit a `Timeout` error when creating producers or consumers.
2. Some tests in `schema_test.py` fail because comparisons between two `ComplexRecord`s fail.

Since #10309, the CI test of the C++ client can never fail (this will be fixed by #11575), so no PR touching the C++ or Python client has actually been verified, even when CI passed. Before #11575 is merged, we need to fix all existing bugs in the C++ client.

### Modifications

Corresponding to the test groups above, this PR makes the following modifications:

1. Add the `?checkAllowAutoCreation=true` URL suffix so that HTTP lookup can create topics automatically.
2. When iterating through a resolved IP list, advance the iterator first, then start the connection timer and try to connect to the next IP.

For the flaky `testLookupThrottling`, this PR adds a `client.close()` at the end of the test and fixes the `ClientImpl::close` implementation. Before this PR, if a client had no producers or consumers, `close()` would not call `shutdown()` to close the connection pool and executors; `shutdown()` was called only when the `Client` instance was destructed. This PR calls `handleClose` instead of invoking the callback directly. It also changes the log level of this test to debug.

This PR also fixes the Python tests that failed with timeouts: some were caused by incorrect class imports, others by a `client` that was never closed.

For the Python schema tests: in Python 2, `self.__ne__(other)` is not equivalent to `not self.__eq__(other)` when the default `__eq__` implementation is overridden. If a `Record` object has a field whose type is also `Record`, `Record.__ne__` is called (see https://github.com/apache/pulsar/blob/ddb5fb0e062c2fe0967efce2a443a31f9cd12c07/pulsar-client-cpp/python/pulsar/schema/definition.py#L138-L139), but it just uses the default implementation to check for inequality, so the custom `__eq__` method is never called. Therefore, this PR implements `Record.__ne__` explicitly in terms of `Record.__eq__` so that the comparison works in Python 2.

### Verifying this change

This change can only be verified by checking the workflow output.
1 parent c4a2572 commit 4919a82

8 files changed, +60 -28 lines changed

pulsar-client-cpp/lib/ClientConnection.cc

+27-12
@@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       numOfPendingLookupRequest_(0),
       isTlsAllowInsecureConnection_(false) {
+    LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
         boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
@@ -433,21 +434,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
         }
     } else if (endpointIterator != tcp::resolver::iterator()) {
+        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
         // The connection failed. Try the next endpoint in the list.
-        boost::system::error_code err;
-        socket_->close(err); // ignore the error of close
-        if (err) {
+        boost::system::error_code closeError;
+        socket_->close(closeError); // ignore the error of close
+        if (closeError) {
             LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
         }
         connectTimeoutTask_->stop();
-        connectTimeoutTask_->start();
-        tcp::endpoint endpoint = *endpointIterator;
-        socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
-                                                   std::placeholders::_1, ++endpointIterator));
+        ++endpointIterator;
+        if (endpointIterator != tcp::resolver::iterator()) {
+            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
+            connectTimeoutTask_->start();
+            tcp::endpoint endpoint = *endpointIterator;
+            socket_->async_connect(endpoint,
+                                   std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+                                             std::placeholders::_1, ++endpointIterator));
+        } else {
+            close();
+        }
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
         close();
-        return;
     }
 }
@@ -512,7 +520,7 @@ void ClientConnection::tcpConnectAsync() {
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
+    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
     tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
     resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
                                               std::placeholders::_1, std::placeholders::_2));
@@ -531,12 +539,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
             if (state_ != TcpConnected) {
                 LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
                                      << " ms, close the socket");
-                PeriodicTask::ErrorCode ignoredError;
-                socket_->close(ignoredError);
+                PeriodicTask::ErrorCode err;
+                socket_->close(err);
+                if (err) {
+                    LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+                }
             }
             connectTimeoutTask_->stop();
         });
 
+    LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
     connectTimeoutTask_->start();
     if (endpointIterator != tcp::resolver::iterator()) {
         LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
@@ -1455,7 +1467,10 @@ void ClientConnection::close() {
     }
 
     if (tlsSocket_) {
-        tlsSocket_->lowest_layer().close();
+        tlsSocket_->lowest_layer().close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
+        }
     }
 
     if (executor_) {
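
The retry order used in `handleTcpConnected` above (advance the iterator, check for the end of the list, then connect) can be illustrated with a minimal Python sketch. This is not the C++ client's code: `connect_first_reachable`, `try_connect`, and the endpoint string are hypothetical names chosen for illustration, and the synchronous loop stands in for the asynchronous Boost.Asio callbacks.

```python
from typing import Callable, List, Optional, Sequence


def connect_first_reachable(
    endpoints: Sequence[str],
    try_connect: Callable[[str], bool],
) -> Optional[str]:
    """Try each resolved endpoint at most once; return the first that connects."""
    index = 0
    while index < len(endpoints):
        endpoint = endpoints[index]
        if try_connect(endpoint):
            return endpoint
        # The attempt failed: advance first, and only then decide (in the
        # loop condition) whether another endpoint is left to try.
        index += 1
    # Every endpoint failed; the caller would close the connection here.
    return None


attempted: List[str] = []


def always_fail(endpoint: str) -> bool:
    # Hypothetical connector that records each attempt and never succeeds.
    attempted.append(endpoint)
    return False


result = connect_first_reachable(["10.0.0.1:6650"], always_fail)
```

With a single resolved endpoint that fails, the sketch records exactly one attempt and then gives up, which is the case the commit message associates with `ClientTest.testConnectTimeout`.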

pulsar-client-cpp/lib/ClientImpl.cc

+1-1
@@ -514,7 +514,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     }
 
     if (*numberOfOpenHandlers == 0 && callback) {
-        callback(ResultOk);
+        handleClose(ResultOk, numberOfOpenHandlers, callback);
     }
 }
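
A rough Python sketch of the idea behind this one-line change: the zero-handler path goes through the same close handler as every other path, so the shutdown step is not skipped. `SketchClient` and all of its members are hypothetical. The assumption that the close handler ends by shutting down shared resources comes from the commit message, not from code shown in this diff.

```python
from typing import Callable, List, Optional


class SketchClient:
    """Hypothetical stand-in for a client that owns handlers and shared resources."""

    def __init__(self, open_handlers: int = 0) -> None:
        self.open_handlers = open_handlers
        self.shutdown_called = False

    def shutdown(self) -> None:
        # Stands in for closing the connection pool and executors.
        self.shutdown_called = True

    def _handle_close(self, result: str, callback: Optional[Callable[[str], None]]) -> None:
        # Single exit path: release shared resources, then notify the caller.
        self.shutdown()
        if callback is not None:
            callback(result)

    def close_async(self, callback: Optional[Callable[[str], None]] = None) -> None:
        # Pretend every open handler closes synchronously and successfully.
        self.open_handlers = 0
        # Even with zero handlers, go through _handle_close instead of
        # calling the callback directly, so shutdown() is never skipped.
        self._handle_close("ResultOk", callback)


results: List[str] = []
idle_client = SketchClient(open_handlers=0)
idle_client.close_async(results.append)
```

After `close_async` returns, `idle_client.shutdown_called` is true even though the client never had a producer or consumer.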
pulsar-client-cpp/lib/ConnectionPool.cc

+1-1
@@ -46,7 +46,7 @@ void ConnectionPool::close() {
     if (poolConnections_) {
         for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
             ClientConnectionPtr cnx = cnxIt->second.lock();
-            if (cnx && !cnx->isClosed()) {
+            if (cnx) {
                 cnx->close();
             }
         }

pulsar-client-cpp/lib/HTTPLookupService.cc

+1
@@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
                           << '/' << PARTITION_METHOD_NAME;
     }
 
+    completeUrlStream << "?checkAllowAutoCreation=true";
     executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
                                                  shared_from_this(), promise, completeUrlStream.str(),
                                                  PartitionMetaData));
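
For illustration, the resulting lookup URL can be sketched in Python. Only the `checkAllowAutoCreation=true` query parameter comes from the diff. The helper name `partition_metadata_url`, the admin base URL, and the path layout are assumptions made for this example and may not match what `HTTPLookupService` builds.

```python
from urllib.parse import urlencode


def partition_metadata_url(admin_url, tenant, namespace, topic):
    """Build a partition-metadata URL that opts in to topic auto-creation."""
    # Assumed path layout for a persistent v2 topic (not taken from the diff).
    path = "/admin/v2/persistent/{}/{}/{}/partitions".format(tenant, namespace, topic)
    # The query parameter added by this commit.
    query = urlencode({"checkAllowAutoCreation": "true"})
    return admin_url.rstrip("/") + path + "?" + query


url = partition_metadata_url("http://localhost:8080", "public", "default", "my-topic")
```

The C++ change appends the suffix as a literal string. Using `urlencode` here is a choice of this sketch that keeps the example safe if more parameters are added later.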

pulsar-client-cpp/python/pulsar/schema/definition.py

+3
@@ -139,6 +139,9 @@ def __eq__(self, other):
                 return False
         return True
 
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
     def __str__(self):
         return str(self.__dict__)
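
A small self-contained sketch of why the explicit `__ne__` matters for nested records. `Inner` and `Outer` are hypothetical classes, not part of `pulsar.schema`. The sketch assumes, as the commit message describes, that the field comparison inside `__eq__` uses `!=`.

```python
class Inner(object):
    """Hypothetical record-like class with value-based equality."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return isinstance(other, Inner) and self.value == other.value

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        return not self.__eq__(other)


class Outer(object):
    """Hypothetical record whose only field is another record."""

    def __init__(self, inner):
        self.inner = inner

    def __eq__(self, other):
        if not isinstance(other, Outer):
            return False
        # The field comparison uses !=, which dispatches to Inner.__ne__.
        if self.inner != other.inner:
            return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)


same = Outer(Inner(1)) == Outer(Inner(1))
different = Outer(Inner(1)) != Outer(Inner(2))
```

Python 3 already falls back to negating `__eq__` when `__ne__` is missing, so the explicit method changes behavior only under Python 2. There, without `Inner.__ne__`, the `!=` inside `Outer.__eq__` would compare object identity and two equal records would be reported as different.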
pulsar-client-cpp/python/pulsar_test.py

+22-12
@@ -23,14 +23,15 @@
 from unittest import TestCase, main
 import time
 import os
+import pulsar
 import uuid
 from datetime import timedelta
 from pulsar import Client, MessageId, \
     CompressionType, ConsumerType, PartitionsRoutingMode, \
     AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
     CryptoKeyReader
 
-from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
+from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
 from schema_test import *
@@ -155,6 +156,7 @@ def test_producer_send(self):
         consumer.acknowledge(msg)
         print('receive from {}'.format(msg.message_id()))
         self.assertEqual(msg_id, msg.message_id())
+        client.close()
 
     def test_producer_consumer(self):
         client = Client(self.serviceUrl)
@@ -292,7 +294,7 @@ def test_message_properties(self):
                                     subscription_name='my-subscription',
                                     schema=pulsar.schema.StringSchema())
         producer = client.create_producer(topic=topic,
-                                          schema=StringSchema())
+                                          schema=pulsar.schema.StringSchema())
         producer.send('hello',
                       properties={
                           'a': '1',
@@ -319,10 +321,11 @@ def test_tls_auth(self):
                         tls_allow_insecure_connection=False,
                         authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
 
-        consumer = client.subscribe('my-python-topic-tls-auth',
+        topic = 'my-python-topic-tls-auth-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -346,10 +349,11 @@ def test_tls_auth2(self):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-2',
+        topic = 'my-python-topic-tls-auth-2-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-2')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -392,10 +396,11 @@ def test_tls_auth3(self):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-3',
+        topic = 'my-python-topic-tls-auth-3-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-3')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -583,6 +588,8 @@ def test_producer_sequence_after_reconnection(self):
             producer.send(b'hello-%d' % i)
             self.assertEqual(producer.last_sequence_id(), i)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
@@ -630,6 +637,8 @@ def test_producer_deduplication(self):
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
@@ -820,10 +829,11 @@ def test_reader_has_message_available(self):
 
     def test_seek(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-seek',
+        topic = 'my-python-topic-seek-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-seek')
+        producer = client.create_producer(topic)
 
         for i in range(100):
             if i > 0:
@@ -858,7 +868,7 @@ def test_seek(self):
         self.assertEqual(msg.data(), b'hello-42')
 
         # repeat with reader
-        reader = client.create_reader('my-python-topic-seek', MessageId.latest)
+        reader = client.create_reader(topic, MessageId.latest)
         with self.assertRaises(pulsar.Timeout):
             reader.read_next(100)
@@ -1157,7 +1167,7 @@ def test_connect_timeout(self):
         try:
             producer = client.create_producer('test_connect_timeout')
             self.fail('create_producer should not succeed')
-        except ConnectError as expected:
+        except pulsar.ConnectError as expected:
             print('expected error: {} when create producer'.format(expected))
         t2 = time.time()
         self.assertGreater(t2 - t1, 1.0)
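
Several hunks above replace fixed topic names with names suffixed by `str(time.time())`, presumably so that state left behind by an earlier run cannot affect the test. A minimal sketch of that pattern as a helper follows. `unique_topic` is a hypothetical name, and the extra `uuid` suffix is an addition of this sketch rather than something the diff does.

```python
import time
import uuid


def unique_topic(prefix):
    """Return a topic name that is very unlikely to repeat across test runs."""
    # The diff appends str(time.time()); the uuid suffix is an extra guard
    # (an assumption of this sketch) against two calls in the same clock tick.
    return '{}-{}-{}'.format(prefix, time.time(), uuid.uuid4().hex[:8])


first = unique_topic('my-python-topic-seek')
second = unique_topic('my-python-topic-seek')
```

A test would compute the name once and pass the same value to `subscribe`, `create_producer`, and `create_reader`, as the `test_seek` hunks do with their `topic` variable.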

pulsar-client-cpp/tests/BasicEndToEndTest.cc

+3
@@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     std::string topicName = "testLookupThrottling";
     ClientConfiguration config;
     config.setConcurrentLookupRequest(0);
+    config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
     Client client(lookupUrl, config);
 
     Producer producer;
@@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     Consumer consumer1;
     result = client.subscribe(topicName, "my-sub-name", consumer1);
     ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+    client.close();
 }
 
 TEST(BasicEndToEndTest, testNonExistingTopic) {

pulsar-client-cpp/tests/CustomLoggerTest.cc

+2-2
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 2);
+        ASSERT_EQ(logLines.size(), 3);
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 2);
+    ASSERT_EQ(logLines.size(), 3);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
