
feat: support message key for Kafka external stream (#434)
zliang-min authored Jan 9, 2024
1 parent b74a0e5 commit 81ee54d
Showing 11 changed files with 420 additions and 276 deletions.
47 changes: 25 additions & 22 deletions src/KafkaLog/KafkaWALCommon.cpp
@@ -194,41 +194,44 @@ nlog::RecordPtr kafkaMsgToRecord(rd_kafka_message_t * msg, const nlog::SchemaCon
     return record;
 }
 
-DescribeResult describeTopic(const String & name, struct rd_kafka_s * rk, Poco::Logger * log)
+DescribeResult describeTopic(rd_kafka_topic_t * rkt, struct rd_kafka_s * rk, Poco::Logger * log)
 {
-    std::shared_ptr<rd_kafka_topic_t> topic_handle{rd_kafka_topic_new(rk, name.c_str(), nullptr), rd_kafka_topic_destroy};
-
-    if (!topic_handle)
-    {
-        LOG_ERROR(log, "Failed to describe topic, can't create topic handle");
-        return {.err = DB::ErrorCodes::UNKNOWN_EXCEPTION};
-    }
-
     const struct rd_kafka_metadata * metadata = nullptr;
 
-    auto err = rd_kafka_metadata(rk, 0, topic_handle.get(), &metadata, 5000);
+    auto err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
     if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
     {
         LOG_ERROR(log, "Failed to describe topic, error={}", rd_kafka_err2str(err));
         return {.err = mapErrorCode(err)};
     }
 
-    for (int32_t i = 0; i < metadata->topic_cnt; ++i)
+    if (metadata->topic_cnt < 1)
     {
-        if (name == metadata->topics[i].topic)
-        {
-            auto partition_cnt = metadata->topics[i].partition_cnt;
-            rd_kafka_metadata_destroy(metadata);
-
-            if (partition_cnt > 0)
-                return {.err = DB::ErrorCodes::OK, .partitions = partition_cnt};
-            else
-                return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};
-        }
+        rd_kafka_metadata_destroy(metadata);
+        return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};
     }
 
-    rd_kafka_metadata_destroy(metadata);
-    return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};
+    assert(metadata->topic_cnt == 1);
+
+    auto partition_cnt = metadata->topics[0].partition_cnt;
+    rd_kafka_metadata_destroy(metadata);
+
+    if (partition_cnt > 0)
+        return {.err = DB::ErrorCodes::OK, .partitions = partition_cnt};
+    else
+        return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};
+}
+
+DescribeResult describeTopic(const String & name, struct rd_kafka_s * rk, Poco::Logger * log)
+{
+    std::shared_ptr<rd_kafka_topic_t> topic_handle{rd_kafka_topic_new(rk, name.c_str(), nullptr), rd_kafka_topic_destroy};
+
+    if (!topic_handle)
+    {
+        LOG_ERROR(log, "Failed to describe topic, can't create topic handle");
+        return {.err = DB::ErrorCodes::UNKNOWN_EXCEPTION};
+    }
+
+    return describeTopic(topic_handle.get(), rk, log);
 }

std::vector<int64_t> getOffsetsForTimestamps(
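The refactor above splits describeTopic into two overloads: the new one takes an existing rd_kafka_topic_t handle, and the name-based one now simply creates a temporary handle and delegates. The payoff is that a caller which already owns a topic handle (as a producer sink does) can describe the topic without constructing a throwaway handle. A minimal usage sketch of both call paths, assuming the functions live in the klog namespace like the rest of src/KafkaLog, with a made-up topic name:

#include <KafkaLog/KafkaWALCommon.h>
#include <librdkafka/rdkafka.h>
#include <Common/logger_useful.h>

/// Illustrative only; error handling trimmed, "orders" is a hypothetical topic.
void describeBothWays(rd_kafka_t * rk, Poco::Logger * log)
{
    /// Name-based overload: creates and destroys a temporary topic handle internally.
    auto by_name = klog::describeTopic("orders", rk, log);
    if (by_name.err == DB::ErrorCodes::OK)
        LOG_INFO(log, "topic has {} partitions", by_name.partitions);

    /// Handle-based overload: reuses a handle the caller already owns,
    /// so no extra handle needs to be created just for the metadata call.
    rd_kafka_topic_t * rkt = rd_kafka_topic_new(rk, "orders", nullptr);
    auto by_handle = klog::describeTopic(rkt, rk, log);
    if (by_handle.err == DB::ErrorCodes::OK)
        LOG_INFO(log, "topic has {} partitions", by_handle.partitions);
    rd_kafka_topic_destroy(rkt);
}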
1 change: 1 addition & 0 deletions src/KafkaLog/KafkaWALCommon.h
@@ -47,6 +47,7 @@ initRdKafkaTopicHandle(const std::string & topic, KConfParams & params, rd_kafka
 nlog::RecordPtr kafkaMsgToRecord(rd_kafka_message_t * msg, const nlog::SchemaContext & schema_ctx, bool copy_topic = false);
 
 DescribeResult describeTopic(const String & name, struct rd_kafka_s * rk, Poco::Logger * log);
+DescribeResult describeTopic(rd_kafka_topic_t * rkt, struct rd_kafka_s * rk, Poco::Logger * log);
 
 std::vector<int64_t> getOffsetsForTimestamps(
     struct rd_kafka_s * rd_handle,
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/ExternalStreamSettings.h
@@ -19,6 +19,7 @@ class ASTStorage;
     M(String, ssl_ca_cert_file, "", "The path of the SSL CA cert file", 0) \
     M(String, properties, "", "Semicolon-separated key-value pairs for configuring the Kafka client used by the external stream. A key-value pair is separated by an equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note that not all properties are supported; please check the documentation for the supported ones.", 0) \
     M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \
+    M(String, message_key, "", "An expression which will be evaluated on each row of data returned by the query to compute a string which will be used as the message key.", 0) \
     M(Bool, one_message_per_row, false, "If set to true, when sending data to the Kafka external stream with a row-based data format like `JSONEachRow`, it will produce one message per row.", 0) \
     /* those are log related settings */ \
     M(String, log_files, "", "A comma-separated list of log files", 0) \
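The properties setting above takes semicolon-separated key=value pairs. klog::parseProperties, which consumes it, is not part of this diff; below is a minimal stand-alone sketch of that parsing shape, using the boost string algorithms this commit adds to Kafka.cpp's includes (the function name and the map return type are assumptions, not the actual klog API):

#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <map>
#include <string>
#include <vector>

/// Hypothetical stand-in for klog::parseProperties: splits
/// "client.id=my-client-id;group.id=my-group-id" into key-value pairs.
std::map<std::string, std::string> parsePropertiesSketch(const std::string & input)
{
    std::map<std::string, std::string> result;
    std::vector<std::string> pairs;
    boost::split(pairs, input, boost::is_any_of(";"));
    for (const auto & pair : pairs)
    {
        if (pair.empty())
            continue;
        auto eq = pair.find('=');
        if (eq == std::string::npos)
            continue; /// a real implementation would likely reject malformed pairs
        result.emplace(pair.substr(0, eq), pair.substr(eq + 1));
    }
    return result;
}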
71 changes: 59 additions & 12 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
@@ -1,18 +1,22 @@
-#include "Kafka.h"
-#include "KafkaSink.h"
-#include "KafkaSource.h"
-
 #include <DataTypes/DataTypeDateTime64.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <Interpreters/Context.h>
+#include <Interpreters/ExpressionAnalyzer.h>
+#include <Interpreters/TreeRewriter.h>
 #include <KafkaLog/KafkaWALPool.h>
+#include <Parsers/ExpressionListParsers.h>
 #include <Storages/ExternalStream/ExternalStreamTypes.h>
+#include <Storages/ExternalStream/Kafka/Kafka.h>
+#include <Storages/ExternalStream/Kafka/KafkaSink.h>
+#include <Storages/ExternalStream/Kafka/KafkaSource.h>
 #include <Storages/IStorage.h>
 #include <Storages/SelectQueryInfo.h>
 #include <Common/ProtonCommon.h>
 #include <Common/logger_useful.h>
+#include <Parsers/ASTFunction.h>
 
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/predicate.hpp>
 #include <boost/algorithm/string/split.hpp>
 
 #include <ranges>
@@ -26,19 +30,20 @@ extern const int INVALID_SETTING_VALUE;
 extern const int RESOURCE_NOT_FOUND;
 }
 
-Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_)
+Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context)
     : StorageExternalStreamImpl(std::move(settings_))
     , storage_id(storage->getStorageID())
-    , log(&Poco::Logger::get("External-" + settings->topic.value))
+    , data_format(StorageExternalStreamImpl::dataFormat())
     , engine_args(engine_args_)
-    , data_format(StorageExternalStreamImpl::dataFormat())
+    , kafka_properties(klog::parseProperties(settings->properties.value))
     , auth_info(std::make_unique<klog::KafkaWALAuth>(
           settings->security_protocol.value,
           settings->username.value,
           settings->password.value,
           settings->sasl_mechanism.value,
           settings->ssl_ca_cert_file.value))
     , external_stream_counter(external_stream_counter_)
+    , logger(&Poco::Logger::get("External-" + settings->topic.value))
 {
     assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA);
     assert(external_stream_counter);
@@ -49,7 +54,15 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting
     if (settings->topic.value.empty())
         throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Empty `topic` setting for {} external stream", settings->type.value);
 
-    kafka_properties = klog::parseProperties(settings->properties.value);
+    if (!settings->message_key.value.empty())
+    {
+        validateMessageKey(settings->message_key.value, storage, context);
+
+        /// When message_key is set, each row should be sent as one message, it doesn't make any sense otherwise.
+        if (settings->isChanged("one_message_per_row") && !settings->one_message_per_row)
+            throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "`one_message_per_row` cannot be set to `false` when `message_key` is set");
+        settings->set("one_message_per_row", true);
+    }
 
     calculateDataFormat(storage);
 
@@ -60,6 +73,16 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting
     validate();
 }
 
+bool Kafka::hasCustomShardingExpr() const {
+    if (engine_args.empty())
+        return false;
+
+    if (auto * shard_func = shardingExprAst()->as<ASTFunction>())
+        return !boost::iequals(shard_func->name, "rand");
+
+    return true;
+}
+
 Pipe Kafka::read(
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
@@ -76,7 +99,7 @@
     {
         shards_to_query = parseShards(context->getSettingsRef().shards.value);
        validate(shards_to_query);
-        LOG_INFO(log, "reading from [{}] partitions for topic={}", fmt::join(shards_to_query, ","), settings->topic.value);
+        LOG_INFO(logger, "reading from [{}] partitions for topic={}", fmt::join(shards_to_query, ","), settings->topic.value);
     }
     else
     {
@@ -120,11 +143,11 @@ Pipe Kafka::read(
     assert(offsets.size() == shards_to_query.size());
     for (auto [shard, offset] : std::ranges::views::zip(shards_to_query, offsets))
         pipes.emplace_back(
-            std::make_shared<KafkaSource>(this, header, storage_snapshot, context, shard, offset, max_block_size, log, external_stream_counter));
+            std::make_shared<KafkaSource>(this, header, storage_snapshot, context, shard, offset, max_block_size, logger, external_stream_counter));
     }
 
     LOG_INFO(
-        log,
+        logger,
         "Starting reading {} streams by seeking to {} in dedicated resource group",
         pipes.size(),
         query_info.seek_to_info->getSeekTo());
@@ -228,6 +251,30 @@ std::vector<int32_t> Kafka::parseShards(const std::string & shards_setting)
     return specified_shards;
 }
 
+void Kafka::validateMessageKey(const String & message_key_, IStorage * storage, const ContextPtr & context)
+{
+    const auto & key = message_key_.c_str();
+    Tokens tokens(key, key + message_key_.size(), 0);
+    IParser::Pos pos(tokens, 0);
+    Expected expected;
+    ParserExpression p_id;
+    if (!p_id.parse(pos, message_key_ast, expected))
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key was not a valid expression, parse failed at {}, expected {}", expected.max_parsed_pos, fmt::join(expected.variants, ", "));
+
+    if (!pos->isEnd())
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must be a single expression, got extra characters: {}", expected.max_parsed_pos);
+
+    auto syntax_result = TreeRewriter(context).analyze(message_key_ast, storage->getInMemoryMetadata().getColumns().getAllPhysical());
+    auto analyzer = ExpressionAnalyzer(message_key_ast, syntax_result, context).getActions(true);
+    const auto & block = analyzer->getSampleBlock();
+    if (block.columns() != 1)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key expression must return exactly one column");
+
+    auto type_id = block.getByPosition(0).type->getTypeId();
+    if (type_id != TypeIndex::String && type_id != TypeIndex::FixedString)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must have type of string");
+}
+
 /// Validate the topic still exists, specified partitions are still valid etc
 void Kafka::validate(const std::vector<int32_t> & shards_to_query)
 {
@@ -267,6 +314,6 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr
 {
     /// always validate before actual use
     validate();
-    return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), context, shards, log);
+    return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger);
 }
 }
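KafkaSink itself is among the files not shown on this page, so how it consumes message_key_ast is not visible here. A plausible shape, given validateMessageKey above: build ExpressionActions from the validated AST once, execute them on each outgoing block to append a string key column, then attach each row's key to its message via librdkafka's RD_KAFKA_V_KEY. Everything below (the function, serializeRow, and the producev call site) is an illustrative sketch, not the commit's actual sink code:

#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/IAST.h>
#include <librdkafka/rdkafka.h>

/// Hypothetical: stands in for however the sink serializes one row of the block.
std::string serializeRow(const DB::Block & block, size_t row);

/// Sketch: produce one keyed message per row (one_message_per_row is forced on
/// whenever message_key is set, so per-row production is the only mode here).
void produceWithKeys(
    DB::Block block,
    const DB::ASTPtr & message_key_ast,
    const DB::ExpressionActionsPtr & key_actions, /// built once from message_key_ast
    rd_kafka_t * rk,
    rd_kafka_topic_t * rkt)
{
    key_actions->execute(block); /// appends the computed key column to the block
    const auto & key_column = block.getByName(message_key_ast->getColumnName()).column;

    for (size_t row = 0; row < block.rows(); ++row)
    {
        auto key = key_column->getDataAt(row); /// StringRef into the key column
        auto payload = serializeRow(block, row);
        /// Error handling omitted; RD_KAFKA_V_KEY is how librdkafka attaches a key.
        rd_kafka_producev(
            rk,
            RD_KAFKA_V_RKT(rkt),
            RD_KAFKA_V_KEY(key.data, key.size),
            RD_KAFKA_V_VALUE(payload.data(), payload.size()),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
            RD_KAFKA_V_END);
    }
}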
20 changes: 10 additions & 10 deletions src/Storages/ExternalStream/Kafka/Kafka.h
@@ -15,7 +15,7 @@ class IStorage;
 class Kafka final : public StorageExternalStreamImpl
 {
 public:
-    Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_);
+    Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context);
     ~Kafka() override = default;
 
     void startup() override { }
@@ -41,32 +41,32 @@ class Kafka final : public StorageExternalStreamImpl
     const String & topic() const { return settings->topic.value; }
     const klog::KConfParams & properties() const { return kafka_properties; }
     const klog::KafkaWALAuth & auth() const noexcept { return *auth_info; }
-    bool hasCustomShardingExpr() const { return !engine_args.empty(); }
     const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
+    bool hasCustomShardingExpr() const;
     klog::KafkaWALSimpleConsumerPtr getConsumer(int32_t fetch_wait_max_ms = 200) const;
 
 private:
     void calculateDataFormat(const IStorage * storage);
     void cacheVirtualColumnNamesAndTypes();
     std::vector<Int64> getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector<int32_t> & shards_to_query) const;
+    void validateMessageKey(const String & message_key, IStorage * storage, const ContextPtr & context);
     void validate(const std::vector<int32_t> & shards_to_query = {});
 
     static std::vector<int32_t> parseShards(const std::string & shards_setting);
 
 private:
     StorageID storage_id;
-    ASTs engine_args;
-    klog::KConfParams kafka_properties;
     String data_format;
 
-    Poco::Logger * log;
-    const std::unique_ptr<klog::KafkaWALAuth> auth_info;
-    ExternalStreamCounterPtr external_stream_counter;
-
     NamesAndTypesList virtual_column_names_and_types;
+    klog::KConfParams kafka_properties;
+    const ASTs engine_args;
+    const std::unique_ptr<klog::KafkaWALAuth> auth_info;
 
     std::mutex shards_mutex;
     int32_t shards = 0;
 
+    ExternalStreamCounterPtr external_stream_counter;
+    ASTPtr message_key_ast;
+
+    Poco::Logger * logger;
 };
 }
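One behavioral detail worth noting from this header change: hasCustomShardingExpr() used to be true whenever any engine argument was present, but the new out-of-line definition (in Kafka.cpp above) additionally treats a bare rand() call as not custom, since rand() just means "any partition" and needs no per-row evaluation. An illustrative restatement of that rule on a free-standing AST, with example expressions in the comments (the function name is made up):

#include <Parsers/ASTFunction.h>
#include <Parsers/IAST.h>
#include <boost/algorithm/string/predicate.hpp>

/// Mirrors the logic of Kafka::hasCustomShardingExpr() for illustration.
bool isCustomSharding(const DB::ASTPtr & sharding_expr)
{
    if (!sharding_expr)
        return false; /// no sharding engine argument at all

    /// rand() (case-insensitive) selects a random partition, so it is not
    /// "custom"; any other function expression, e.g. to_int(key) % 8, is.
    if (const auto * func = sharding_expr->as<DB::ASTFunction>())
        return !boost::iequals(func->name, "rand");

    return true; /// a non-function expression, e.g. a bare column reference
}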