Skip to content

Commit

Permalink
[Kafka External Stream] bugfix: respect data format for count optimiza…
Browse files Browse the repository at this point in the history
…tion (#762)
  • Loading branch information
zliang-min authored Jun 7, 2024
1 parent 4e51fe2 commit c5c67a9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ Kafka::Kafka(

calculateDataFormat(storage);

/// For the raw format, one_message_per_row defaults to true, because if multiple rows are combined together,
/// it could be hard to separate them again.
if (data_format == "RawBLOB" && !settings->isChanged("one_message_per_row"))
settings->set("one_message_per_row", true);

/// Only optimize trivial count for some specific formats and settings.
support_count_optimization = data_format == "RawBLOB" ||
data_format == "ProtobufSingle" ||
(data_format == "Avro" && (!settings->format_schema.value.empty() || !settings->kafka_schema_registry_url.value.empty())) ||
settings->one_message_per_row.value;

cacheVirtualColumnNamesAndTypes();

rd_kafka_conf_set_log_cb(conf.get(), &Kafka::onLog);
Expand Down Expand Up @@ -447,6 +458,9 @@ std::vector<Int32> getShardsToQuery(const String & shards_exp, Int32 parition_co

std::optional<UInt64> Kafka::totalRows(const Settings & settings_ref)
{
if (!support_count_optimization)
return {};

auto consumer = getConsumer();
auto topic = RdKafka::Topic(*consumer->getHandle(), topicName());
auto shards_to_query = getShardsToQuery(settings_ref.shards.value, topic.getPartitionCount());
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ExternalStream/Kafka/Kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class Kafka final : public StorageExternalStreamImpl
std::vector<Int32> shards_from_settings;
fs::path broker_ca_file;

bool support_count_optimization = false;

ConfPtr conf;
UInt64 poll_timeout_ms = 0;

Expand Down

0 comments on commit c5c67a9

Please sign in to comment.