feat: support message key for Kafka external stream #434

Merged
merged 4 commits on Jan 9, 2024
Changes from 1 commit
3 changes: 2 additions & 1 deletion src/Storages/ExternalStream/Kafka/Kafka.h
@@ -59,13 +59,14 @@ class Kafka final : public StorageExternalStreamImpl
String data_format;
const std::unique_ptr<klog::KafkaWALAuth> auth_info;
ExternalStreamCounterPtr external_stream_counter;
Poco::Logger * log;

NamesAndTypesList virtual_column_names_and_types;

std::mutex shards_mutex;
int32_t shards = 0;

ASTPtr message_key_ast;

Poco::Logger * log;
};
}
109 changes: 64 additions & 45 deletions src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -30,9 +30,8 @@ ExpressionActionsPtr buildExpression(const Block & header, const ASTPtr & expr_ast)
{
assert(expr_ast);

ASTPtr query = expr_ast;
auto syntax_result = TreeRewriter(context).analyze(query, header.getNamesAndTypesList());
return ExpressionAnalyzer(query, syntax_result, context).getActions(false);
auto syntax_result = TreeRewriter(context).analyze(const_cast<ASTPtr &>(expr_ast), header.getNamesAndTypesList());
return ExpressionAnalyzer(expr_ast, syntax_result, context).getActions(false);
}
}

@@ -216,12 +215,7 @@ KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_pa
message_key_expr = buildExpression(header, message_key_ast, context);
const auto & sample_block = message_key_expr->getSampleBlock();
/// The last column is the key column, the others are required columns (to be used to calculate the key value).
message_key_column_pos = message_key_expr->getResultPositions().back();
auto message_key_column_name = sample_block.getColumnsWithTypeAndName().back().name;
/// If the key column already exists in the header, which means the message key is
/// one of the columns defined in the stream, in this case, it should not delete the column.
/// Otherwise, the message key is a computed column, it should be deleted from the block.
delete_message_key_column = !header.has(message_key_column_name);
message_key_column_name = sample_block.getColumnsWithTypeAndName().back().name;
}

if (one_message_per_row)
@@ -277,20 +271,20 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len)
{
StringRef key = message_key_expr ? keys_for_current_batch[current_batch_row++] : "";

/// Data at pos (which is in the WriteBuffer) will be overwritten, thus it must be copied to the message.
/// The copied data will be freed in the delivery callback
/// (when it's confirmed that the message was either sent successfully or failed).
char * payload = new char[len];
memcpy(payload, pos, len);
/// Data at pos (which is in the WriteBuffer) will be overwritten, thus it must be kept somewhere else (in `batch_payload`).
nlog::ByteVector payload {len};
payload.resize(len); /// set the size to the right value
memcpy(payload.data(), pos, len);

current_batch.push_back(rd_kafka_message_t{
.partition = next_partition,
.payload = const_cast<void *>(static_cast<const void *>(payload)),
.payload = payload.data(),
.len = len,
.key = const_cast<void *>(static_cast<const void *>(key.data)),
Collaborator:

do we need all of this cast ?

Collaborator:

Same for payload

Collaborator (Author):

The difference here is that `key.data` has the type `const char *`; if we remove the `const_cast`, it will report "Cannot initialize a member subobject of type 'void *' with an lvalue of type 'const char *'". But the `static_cast` can be removed.
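
For illustration only (not code from this PR), a minimal sketch of the point above, assuming nothing beyond librdkafka's `rd_kafka_message_t`: its `key` member is a plain `void *`, so the constness of `key.data` must be cast away, while the `char *` to `void *` conversion itself is implicit, which is why the `const_cast` is needed but the `static_cast` is not:

```cpp
#include <librdkafka/rdkafka.h>
#include <cstddef>

/// Hypothetical helper, only to show the cast requirement.
void set_key(rd_kafka_message_t & msg, const char * key_data, size_t key_len)
{
    // msg.key = key_data;                  // error: cannot assign 'const char *' to 'void *'
    msg.key = const_cast<char *>(key_data); // const_cast suffices; char * converts to void * implicitly
    msg.key_len = key_len;
}
```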

.key_len = key.size,
});

batch_payload.push_back(std::move(payload));
++state.outstandings;
}

@@ -302,28 +296,39 @@ void KafkaSink::consume(Chunk chunk)
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto blocks = partitioner->shard(std::move(block), partition_cnt);

current_batch.clear();

/// If the expression added a new column that holds the key values, we need to erase that column from the block,
/// but erasing it would lead to a heap-use-after-free error when rd_kafka_produce_batch tries to read the keys.
/// Thus, keep a reference to the key columns here to keep them alive.
std::vector<ColumnPtr> key_columns;
if (message_key_expr)
{
key_columns.reserve(blocks.size());

std::vector<StringRef> keys;
keys_for_current_batch.swap(keys);
if (!keys_for_current_batch.empty())
{
std::vector<StringRef> keys;
keys_for_current_batch.swap(keys);
}
keys_for_current_batch.reserve(chunk.rows());
current_batch_row = 0;
}
if (!current_batch.empty())
{
std::vector<rd_kafka_message_t> batch;
current_batch.swap(batch);
}
if (!batch_payload.empty())
{
std::vector<nlog::ByteVector> payload;
batch_payload.swap(payload);
}

/// When one_message_per_row is set to true, one Kafka message will be generated for each row.
/// Otherwise, all rows in the same block will be in the same Kafka message.
if (one_message_per_row)
{
current_batch.reserve(chunk.rows());
batch_payload.reserve(chunk.rows());
}
else
{
current_batch.reserve(blocks.size());
batch_payload.reserve(blocks.size());
}

for (auto & block_with_shard : blocks)
{
@@ -335,35 +340,54 @@
continue;
}

/// Compute and collect message keys and removed the key column from the block after executing the expression
/// Compute and collect message keys.
message_key_expr->execute(block_with_shard.block);
auto message_key_column {block_with_shard.block.getByPosition(message_key_column_pos).column};
key_columns.push_back(message_key_column);

// Collect all the message keys for creating the messages for the block.
auto message_key_column {block_with_shard.block.getByName(message_key_column_name).column};
size_t rows {message_key_column->size()};
for (size_t i = 0; i < rows; ++i)
keys_for_current_batch.push_back(message_key_column->getDataAt(i));

if (delete_message_key_column)
block_with_shard.block.erase(message_key_column_pos);
/// After `message_key_expr->execute`, the columns in `block_with_shard.block` could be out of order.
/// We have to make sure the column order in `block_with_shard.block` exactly matches the order in the header,
/// otherwise the output format writer will panic.
Block blk;
blk.reserve(getHeader().columns());
for (const auto & col : getHeader())
blk.insert(std::move(block_with_shard.block.getByName(col.name)));

writer->write(block_with_shard.block);
writer->write(blk);
}

/// With `wb->setAutoFlush()`, all messages for the chunk are guaranteed to have been generated at this point.
auto n = rd_kafka_produce_batch(
rd_kafka_produce_batch(
topic.get(),
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK,
RD_KAFKA_MSG_F_FREE | RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK,
current_batch.data(),
current_batch.size());

if (static_cast<size_t>(n) != current_batch.size())
/// Find the first error and throw it.
for (const auto & msg : current_batch)
if (msg.err)
throw Exception(klog::mapErrorCode(msg.err), rd_kafka_err2str(msg.err));
rd_kafka_resp_err_t err {RD_KAFKA_RESP_ERR_NO_ERROR};
for (size_t i = 0; i < current_batch.size(); ++i)
if (current_batch[i].err)
err = current_batch[i].err;
else
batch_payload[i].release(); /// payloads of messages successfully handled by rd_kafka_produce_batch will be freed by librdkafka

/// Clean up all the bookkeeping for the batch.
std::vector<rd_kafka_message_t> batch;
current_batch.swap(batch);

std::vector<nlog::ByteVector> payload;
batch_payload.swap(payload);

if (!keys_for_current_batch.empty())
{
std::vector<StringRef> keys;
keys_for_current_batch.swap(keys);
}

if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
throw Exception(klog::mapErrorCode(err), rd_kafka_err2str(err));
}

void KafkaSink::onFinish()
@@ -398,11 +422,6 @@ void KafkaSink::onMessageDelivery(rd_kafka_t * /* producer */, const rd_kafka_me

void KafkaSink::onMessageDelivery(const rd_kafka_message_t * msg)
{
/// Can't rely on librdkafka freeing up the payloads with RD_KAFKA_MSG_F_FREE, it will throw:
/// AddressSanitizer: alloc-dealloc-mismatch (operator new [] vs free)
char * payload = static_cast<char *>(msg->payload);
delete[] payload;

if (msg->err)
{
state.last_error_code.store(msg->err);
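
A side note on the payload ownership hand-off that `consume()` and the delivery callback now rely on: each serialized row is copied into a buffer owned by the sink, the batch is submitted with `RD_KAFKA_MSG_F_FREE`, and ownership is released only for messages librdkafka actually accepted; rejected messages keep their payloads owned (and freed) by the sink. The sketch below illustrates the pattern under assumptions that are not from this PR: the `produce_rows` helper and its `std::string` rows are hypothetical, and buffers are allocated with `malloc` so that librdkafka's `free(3)` matches the allocator (the role `nlog::ByteVector` plays in the real code).

```cpp
#include <librdkafka/rdkafka.h>

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <new>
#include <string>
#include <vector>

/// Hypothetical sketch of the ownership pattern: copy each row into a malloc'ed buffer,
/// hand the batch to librdkafka with RD_KAFKA_MSG_F_FREE, and give up ownership only
/// for messages that were accepted; rejected ones are freed here.
void produce_rows(rd_kafka_topic_t * topic, const std::vector<std::string> & rows, int32_t partition)
{
    std::vector<rd_kafka_message_t> batch;
    std::vector<char *> payloads; /// owned by this function until librdkafka accepts the message
    batch.reserve(rows.size());
    payloads.reserve(rows.size());

    for (const auto & row : rows)
    {
        /// The source buffer will be reused, so the bytes must be copied out first.
        char * payload = static_cast<char *>(std::malloc(row.size()));
        if (!payload)
            throw std::bad_alloc();
        std::memcpy(payload, row.data(), row.size());
        payloads.push_back(payload);

        batch.push_back(rd_kafka_message_t{
            .partition = partition,
            .payload = payload,
            .len = row.size(),
        });
    }

    rd_kafka_produce_batch(
        topic,
        RD_KAFKA_PARTITION_UA,
        RD_KAFKA_MSG_F_FREE | RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK,
        batch.data(),
        static_cast<int>(batch.size()));

    for (size_t i = 0; i < batch.size(); ++i)
    {
        if (batch[i].err)
            std::free(payloads[i]); /// not accepted: librdkafka never took ownership
        /// accepted messages are freed by librdkafka after delivery (RD_KAFKA_MSG_F_FREE)
    }
}
```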
14 changes: 7 additions & 7 deletions src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -75,7 +75,7 @@ class KafkaSink final : public SinkToStorage
/// allows to reset the state after each checkpoint
void resetState() { state.reset(); }

static const int POLL_TIMEOUT_MS = 500;
static const int POLL_TIMEOUT_MS {500};

Int32 partition_cnt {0};
bool one_message_per_row {false};
@@ -90,21 +90,21 @@
std::unique_ptr<KafkaStream::ChunkSharder> partitioner;

ExpressionActionsPtr message_key_expr;
bool delete_message_key_column {false};
size_t message_key_column_pos {0};
String message_key_column_name;

/// For constructing the message batch
std::vector<rd_kafka_message_t> current_batch;
std::vector<nlog::ByteVector> batch_payload;
std::vector<StringRef> keys_for_current_batch;
size_t current_batch_row {0};
Int32 next_partition {0};

struct State
{
std::atomic_size_t outstandings = 0;
std::atomic_size_t acked = 0;
std::atomic_size_t error_count = 0;
std::atomic_int last_error_code = 0;
std::atomic_size_t outstandings {0};
std::atomic_size_t acked {0};
std::atomic_size_t error_count {0};
std::atomic_int last_error_code {0};

void reset();
};