Skip to content

Commit

Permalink
Publish message with key
Browse files Browse the repository at this point in the history
  • Loading branch information
aneudecker committed Dec 22, 2023
1 parent d79c36d commit 90d8c2f
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 0 deletions.
54 changes: 54 additions & 0 deletions R/KafkaMessage.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,57 @@ is_empty <- function(msg) {
content <- function(msg) {
msg$payload_data
}

#' Print Message
#'
#' @param x (a KafkaMessage)
#'
#' @export
print.KafkaMessage <- function(x) {
if ("payload_data" %in% names(x)) {
if ("key" %in% names(x)) {
cat ("Key: ", x$key, "\n")
}
cat("Payload: ", x$payload_data, "\n")
} else {
cat("This is an empty message\n")
}
}

#' Check for message errors
#'
#' @param x (a KafkaMessage)
#'
#' @export
has_error <- function(x) {
UseMethod("has_error")
}

has_error.KafkaMessage <- function(x) {
"error" %in% names(x) || !("payload_data" %in% names(x))
}

#' Get message content
#'
#' @param x (a KafkaMessage)
#'
#' @export
content <- function(x) {
UseMethod("content")
}

content.KafkaMessage <- function(x) {
x$payload_data
}

#' Get message key
#'
#' @param x (a KafkaMessage)
#'
#' @export
key <- function(x) {
UseMethod("key")
}
key.KafkaMessage <- function(x) {
x$key
}
15 changes: 15 additions & 0 deletions R/Message.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,19 @@ has_error <- function(x) {

has_error.KafkaMessage <- function(x) {
"error" %in% names(x) || !("payload_data" %in% names(x))
}

content <- function(x) {
UseMethod("content")
}

content.KafkaMessage <- function(x) {
x$payload_data
}

key <- function(x) {
UseMethod("key")
}
key.KafkaMessage <- function(x) {
x$key
}
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ Start cluster:
docker-compose up -d
```

Save configuration in .Renviron

```sh
echo TOPIC="test-topic" > .Renviron
echo BROKERS="localhost:9093" >> .Renviron
```

Create a new topic

```sh
Expand Down
5 changes: 5 additions & 0 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class Consumer
payload_size = msg->len();

result["payload_data"] = std::string(static_cast<const char *>(payload), payload_size);

if (msg->key()) {
result["key"] = *msg->key();
}

break;
case RdKafka::ERR__TIMED_OUT:
break;
Expand Down
29 changes: 29 additions & 0 deletions src/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,34 @@ class Producer
}
}


void produce_with_key(const std::string topic, const std::string key, const std::string message)
{
if (!producer)
{
Rcpp::stop("Kafka producer is not initialized.");
}

RdKafka::ErrorCode err = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
/* Value */
const_cast<char *>(message.data()), message.size(),
/* Key */
const_cast<char *>(key.data()), key.size(),
/* Timestamp (defaults to now) */
0,
/* Message headers, if any */
NULL,
/* Per-message opaque value passed to
* delivery report */
NULL);

if (err != RdKafka::ERR_NO_ERROR)
{
Rcpp::stop("Failed to produce message: " + RdKafka::err2str(err));
}
}

void flush(const int timeout_ms)
{
RdKafka::ErrorCode err = producer->flush(timeout_ms);
Expand All @@ -69,5 +97,6 @@ RCPP_MODULE(producer_module)
class_<Producer>("Producer")
.constructor<Rcpp::List>()
.method("produce", &Producer::produce)
.method("produce_with_key", &Producer::produce_with_key)
.method("flush", &Producer::flush);
}
28 changes: 28 additions & 0 deletions tests/testthat/test-roundtrip.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,31 @@ test_that("Roundtrip", {
consumer$unsubscribe()
consumer$close()
})


test_that("Roundtrip with key", {
config_producer <- list(
"bootstrap.servers" = brokers
)
config_consumer <- list(
"bootstrap.servers" = brokers,
"auto.offset.reset" = "latest",
"group.id" = paste0("test-consumer-", paste0(sample(letters, 6, TRUE), collapse = ""))
)

# start consuming to determine current group offset
consumer <- Consumer(config_consumer)
consumer$subscribe(topic)
consumer$consume(6000)

producer <- Producer(config_producer)
producer$produce_with_key(topic, "Key", "Simple message")

msg <- consumer$consume(10000)

expect_equal(content(msg), "Simple message")
expect_equal(key(msg), "Key")

consumer$unsubscribe()
consumer$close()
})

0 comments on commit 90d8c2f

Please sign in to comment.