Skip to content

Commit

Permalink
fix: Kafka external stream parseFormat crash (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min authored Jun 3, 2024
1 parent e795216 commit e6cecc4
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 14 deletions.
14 changes: 2 additions & 12 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void *
void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
{
assert(format_executor);
assert(convert_non_virtual_to_physical_action);

ReadBufferFromMemory buffer(static_cast<const char *>(kmessage->payload), kmessage->len);
auto new_rows = format_executor->execute(buffer);
Expand All @@ -196,9 +195,7 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
if (!new_rows)
return;

auto result_block = non_virtual_header.cloneWithColumns(format_executor->getResultColumns());
convert_non_virtual_to_physical_action->execute(result_block);

auto result_block = physical_header.cloneWithColumns(format_executor->getResultColumns());
MutableColumns new_data(result_block.mutateColumns());

if (!request_virtual_columns)
Expand Down Expand Up @@ -271,20 +268,13 @@ void KafkaSource::initFormatExecutor()
kafka.getFormatSettings(query_context));

format_executor = std::make_unique<StreamingFormatExecutor>(
non_virtual_header,
physical_header,
std::move(input_format),
[this](const MutableColumns &, Exception & ex) -> size_t
{
format_error = ex.what();
return 0;
});

auto converting_dag = ActionsDAG::makeConvertingActions(
non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(),
physical_header.cloneEmpty().getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);

convert_non_virtual_to_physical_action = std::make_shared<ExpressionActions>(std::move(converting_dag));
}

void KafkaSource::calculateColumnPositions()
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/ExternalStream/Kafka/KafkaSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class KafkaSource final : public Streaming::ISource
Block physical_header;
Chunk header_chunk;

std::shared_ptr<ExpressionActions> convert_non_virtual_to_physical_action = nullptr;

std::unique_ptr<StreamingFormatExecutor> format_executor;
ReadBufferFromMemory read_buffer;

Expand Down

0 comments on commit e6cecc4

Please sign in to comment.