Skip to content

Commit

Permalink
#21 Rewrite interface (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneudecker authored Mar 21, 2024
1 parent aa3614a commit 7db20d1
Show file tree
Hide file tree
Showing 27 changed files with 556 additions and 240 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@
"cinttypes": "cpp",
"typeindex": "cpp",
"variant": "cpp"
}
},
"editor.formatOnSave": true,
"editor.defaultFormatter": "REditorSupport.r",
}
9 changes: 6 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
Package: kafka
Type: Package
Title: Kafka Client for R
Version: 0.0.1
Version: 0.1.0
Author: Andreas Neudecker<[email protected]>
Maintainer: Andreas Neudecker<[email protected]>
Description: This package is a wrapper for the librdkafka C/C++ library. It can be used to produce and consume messages from Kafka.
License: MIT + file LICENSE
Imports: Rcpp (>= 1.0.10), methods
Imports:
methods,
R6,
Rcpp (>= 1.0.10),
LinkingTo: Rcpp
SystemRequirements: librdkafka - Apache Kafka C driver library
RoxygenNote: 7.2.3
RoxygenNote: 7.3.1
Encoding: UTF-8
Suggests:
testthat (>= 3.0.0)
Expand Down
10 changes: 6 additions & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
S3method(print,KafkaMessage)
export(Consumer)
export(Producer)
export(content)
export(has_error)
export(is_empty)
export(key)
export(new_kafka_message)
export(result_error_code)
export(result_error_message)
export(result_has_error)
export(result_message)
importFrom(R6,R6Class)
importFrom(Rcpp,evalCpp)
importFrom(methods,new)
useDynLib(kafka, .registration=TRUE)
1 change: 1 addition & 0 deletions R/00-NAMESPACE.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#' @useDynLib kafka, .registration=TRUE
#' @importFrom Rcpp evalCpp
#' @importFrom methods new
#' @importFrom R6 R6Class
NULL
69 changes: 62 additions & 7 deletions R/Consumer.R
Original file line number Diff line number Diff line change
@@ -1,10 +1,65 @@
#' Consumer
#'
#' @param config (list)
#' Consumer reference class
#'
#' @export
Consumer <- function(config) {
consumer_rcpp_module <- Rcpp::Module("consumer_module")
Consumer <- R6Class("Consumer",
public = list(
#' @description
#' Create a new consumer instance
#'
#' @param config (`list()`)\cr
#' list of config values. These are passed without modification to
#' librdkafka
initialize = function(config) {
consumer_rcpp_module <- Rcpp::Module("consumer_module")
private$consumer <- new(consumer_rcpp_module$Consumer, config)
},
#' @description
#' Subscribe to topic
#'
#' @param topic (`character`)\cr
#' kafka topic
subscribe = function(topic) {
private$consumer$subscribe(topic)
},
#' @description
#' Consume single message
#'
#' @param timeout (`integer`)\cr
#' timeout in ms.
consume = function(timeout = 1000) {
result <- private$consumer$consume(timeout)

message <- new_kafka_message(
value = result$payload_data,
key = result$key
)

new(consumer_rcpp_module$Consumer, config)
}
new_kafka_result(
message,
result$error_code,
result$error_message
)
},
#' @description
#' Commit current offset
#'
#' @param async (`logical`)\cr
#' commit async
commit = function(async = FALSE) {
private$consumer$commit(async)
},
#' @description
#' Unsubscribe from current topic
unsubscribe = function() {
private$consumer$unsubscribe()
},
#' @description
#' Close consumer
close = function() {
private$consumer$close()
}
),
private = list(
consumer = NULL
)
)
63 changes: 0 additions & 63 deletions R/KafkaMessage.R

This file was deleted.

76 changes: 76 additions & 0 deletions R/Message.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#' Create KafkaMessage object
#'
#' @param key (`character`) message key (optional)
#' @param value (`character`) message value
#' @export
new_kafka_message <- function(key = NULL, value) {
structure(
list(
key = key,
value = value
),
class = "KafkaMessage"
)
}

#' Extract message from result
#'
#' @param result (`KafkaResult`) result from consumer
#'
#' @export
result_message <- function(result) {
result$message
}

#' Check if result has errors
#'
#' @param result (`KafkaResult`) result from consumer
#'
#' @export
result_has_error <- function(result) {
!is.null(result$error_code)
}

#' Extract error code from result
#'
#' @param result (`KafkaResult`) result from consumer
#'
#' @export
result_error_code <- function(result) {
result$error_code
}

#' Extract error message from result
#'
#' @param result (`KafkaResult`) result from consumer
#'
#' @export
result_error_message <- function(result) {
result$error_message
}

#' Print kafka message
#'
#' @param x (`KafkaMessage`) message
#' @param ... additional args (not used)
#'
#' @export
print.KafkaMessage <- function(x, ...) {
if (!is.null(x$key)) {
cat(sprintf("Key: %s\n", x$key))
}
cat(sprintf("Value: %s\n", x$value))
invisible(x)
}


new_kafka_result <- function(message, error_code, error_message) {
structure(
list(
message = message,
error_code = error_code,
error_message = error_message
),
class = "KafkaResult"
)
}
69 changes: 46 additions & 23 deletions R/Producer.R
Original file line number Diff line number Diff line change
@@ -1,25 +1,48 @@
#' Create Kafka Producer
#'
#' @details This function creates an instance of a Kafka Producer based on a list with configuration options. The following methods are currently implemented: ...
#'
#' @examples
#' \dontrun{
#' config <- list(
#' "bootstrap.servers" = "my-boostrap-server.com"
#' )
#'
#' producer <- Producer(config)
#'
#' producer$produce(topic = "test-topic", message = "Hello World!")
#'
#' producer$flush(1000)
#' }
#' @param config list with configuration options accepted by librdkafka. See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for details.
#' @title Producer reference class
#'
#' @description
#' Wrapper for RdKafka::Producer
#'
#' @return A kafka producer object. See details for currently available methods
#' @export
Producer <- function(config) {
producer_rcpp_module <- Rcpp::Module("producer_module")

new(producer_rcpp_module$Producer, config)
}
Producer <- R6Class("Producer",
public = list(
#' @description
#' Create a new consumer instance
#'
#' @param config (`list()`)\cr
#' list of config values. These are passed without modification to
#' librdkafka
initialize = function(config) {
producer_rcpp_module <- Rcpp::Module("producer_module")
private$producer <- new(producer_rcpp_module$Producer, config)
},
#' @description
#' Produce message
#'
#' @param topic (`character`)\cr
#' kafka topic
#' @param value (`character`)\cr
#' value of message
#' @param key (`character`)\cr
#' key of message (optional)
produce = function(topic, value, key = NULL) {
if (!is.null(key)) {
private$producer$produce_with_key(topic, key, value)
} else {
private$producer$produce(topic, value)
}
},
#' @description
#' Flush producer, i.e. send all messages currently in the
#' queue
#'
#' @param timeout (`integer`)\cr
#' timeout in ms.
flush = function(timeout = 1000) {
private$producer$flush(timeout)
}
),
private = list(
producer = NULL
)
)
Loading

0 comments on commit 7db20d1

Please sign in to comment.