Skip to content

Commit

Permalink
fix external stream consumer reuse issue (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
qijun-niu-timeplus authored Nov 24, 2023
1 parent 5e60e5a commit 81775ca
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/KafkaLog/KafkaWALPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ KafkaWALSimpleConsumerPtr KafkaWALPool::getOrCreateStreamingExternal(const Strin
for (const auto & consumer : consumers.second)
{
const auto & consumer_settings = consumer->getSettings();
if (consumer.use_count() == 1 && consumer_settings.fetch_wait_max_ms == fetch_wait_max_ms)
if (consumer.use_count() == 1 && consumer_settings.fetch_wait_max_ms == fetch_wait_max_ms && consumer_settings.auth == auth)
{
LOG_INFO(log, "Reusing external Kafka consume with settings={}", consumer_settings.string());
return consumer;
Expand Down
8 changes: 8 additions & 0 deletions src/KafkaLog/KafkaWALSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ struct KafkaWALAuth
std::string username;
std::string password;
std::string ssl_ca_cert_file;

bool operator==(const KafkaWALAuth & o) const
{
return security_protocol == o.security_protocol
&& username == o.username
&& password == o.password
&& ssl_ca_cert_file == o.ssl_ca_cert_file;
}
};

struct KafkaWALSettings
Expand Down

0 comments on commit 81775ca

Please sign in to comment.