feat: support message key for Kafka external stream #434

Merged 4 commits on Jan 9, 2024

zliang-min (Collaborator) commented Dec 21, 2023

PR checklist:

  • Did you run ClangFormat ?
  • Did you separate headers to a different section in existing community code base ?
  • Did you surround proton: starts/ends for new code in existing community code base ?

Please write user-readable short description of the changes:

closes #433

Added a new external stream setting, message_key, which is an expression that returns a string value. The value returned by the expression is used as the message key for each row. Examples:

-- use a column
CREATE EXTERNAL STREAM example_one (
  one string,
  two int32
) SETTINGS type='kafka',...,message_key='one'

-- use a complex expression
CREATE EXTERNAL STREAM example_two (
  one string,
  two int32
) SETTINGS type='kafka',...,message_key='split_by_string(\',\', one)[1]'

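message_key can be used together with sharding_expr. When both are set, sharding_expr takes priority in choosing the partition (exactly what a Kafka client does: an explicitly chosen partition overrides key-based partitioning).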
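The priority rule can be pictured with a small sketch. This is not the code from this PR: choosePartition is a hypothetical helper, std::hash stands in for whatever hash the client library actually applies to keys, and -1 is used to mean "no partition assigned" (the same value librdkafka gives RD_KAFKA_PARTITION_UA).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <string_view>

// Hypothetical helper, not part of Proton or librdkafka.
// Picks a partition for one message:
//   1. an explicit partition (e.g. one computed from sharding_expr) wins;
//   2. otherwise a non-empty key is hashed onto a partition;
//   3. otherwise -1 is returned, meaning "let the client library decide".
inline int32_t choosePartition(std::optional<int32_t> explicit_partition, std::string_view key, int32_t partition_count)
{
    assert(partition_count > 0);

    if (explicit_partition)
        return *explicit_partition;

    if (!key.empty())
    {
        // std::hash is an assumption made for illustration only.
        auto hashed = std::hash<std::string_view>{}(key);
        return static_cast<int32_t>(hashed % static_cast<std::size_t>(partition_count));
    }

    return -1;
}
```

With this shape, choosePartition(3, "k", 8) returns 3 regardless of the key, while choosePartition(std::nullopt, "k", 8) always maps the same key to the same partition.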

Together with this change, a couple more changes are worth mentioning:

  • The random sharding logic has been simplified; it now relies on librdkafka for partitioning.
  • Messages are now sent to Kafka with rd_kafka_produce_batch, which is expected to be more efficient (at the least, it removes the overhead of calling rd_kafka_produce for each message). This should benefit one_message_per_row even more.
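A rough sketch of the batching idea, under stated assumptions: Message is a simplified stand-in for the fields of rd_kafka_message_t used here, ProduceBatchFn is a stand-in for a batch-produce call (the real rd_kafka_produce_batch also takes a topic handle, a partition, and message flags), and BatchBuffer is a hypothetical class, not the KafkaSink from this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for the message struct handed to the client library.
struct Message
{
    int32_t partition;
    const void * payload;
    std::size_t len;
    const void * key;
    std::size_t key_len;
};

// Stand-in for a batch-produce call; returns how many messages were accepted.
using ProduceBatchFn = std::function<int(const Message *, int)>;

// Hypothetical buffer that collects rows and sends them in one call.
class BatchBuffer
{
public:
    explicit BatchBuffer(ProduceBatchFn produce_) : produce(std::move(produce_)) {}

    void add(std::string key, std::string payload)
    {
        rows.emplace_back(std::move(key), std::move(payload));
    }

    // Builds the message array and hands it over with a single produce call,
    // instead of one call per row.
    int flush()
    {
        std::vector<Message> batch;
        batch.reserve(rows.size());
        for (const auto & [key, payload] : rows)
            batch.push_back({-1, payload.data(), payload.size(), key.data(), key.size()});

        int accepted = batch.empty() ? 0 : produce(batch.data(), static_cast<int>(batch.size()));
        assert(accepted <= static_cast<int>(batch.size()));

        rows.clear();
        return accepted;
    }

private:
    ProduceBatchFn produce;
    std::vector<std::pair<std::string, std::string>> rows;
};
```

The message array is only built at flush time, so the pointers in each Message stay valid for the duration of the produce call even if the row vector reallocated while rows were being added.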

@@ -304,8 +287,8 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len)
     .partition = next_partition,
     .payload = const_cast<void *>(static_cast<const void *>(payload)),
     .len = len,
-    .key = const_cast<void *>(static_cast<const void *>(key.data())),
-    .key_len = key.size(),
+    .key = const_cast<void *>(static_cast<const void *>(key.data)),

Collaborator:

Do we need all of these casts?

Collaborator:

Same for payload.

Collaborator (Author):

The difference here is that key.data has type const char *. If we remove the const_cast, the compiler reports: Cannot initialize a member subobject of type 'void *' with an lvalue of type 'const char *'. But the static_cast can be removed.
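For reference, a minimal sketch of one way to drop the static_cast. RawMessage is a hypothetical stand-in with non-const void * fields, similar in shape to the key fields being initialized in the diff; it is not the librdkafka struct. Note that const_cast can only change cv-qualification, so the cast target here is char * rather than void *.

```cpp
#include <cstddef>
#include <string_view>

// Hypothetical stand-in for a C struct whose fields are non-const void *.
struct RawMessage
{
    void * key;
    std::size_t key_len;
};

inline RawMessage makeMessage(std::string_view key)
{
    const char * data = key.data();

    // void * p = data;                      // rejected: would silently drop const
    // void * p = const_cast<void *>(data);  // rejected: const_cast cannot turn char into void

    // const_cast<char *> removes const; char * then converts to void * implicitly.
    // The receiver must not write through this pointer.
    return RawMessage{const_cast<char *>(data), key.size()};
}
```

Whether this reads better than the const_cast plus static_cast pair is a style call; both end up with the same pointer value.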

@chenziliang chenziliang merged commit 81ee54d into develop Jan 9, 2024
5 checks passed
Labels: enhancement, feature
Linked issue: Support message keys for Kafka external stream
2 participants