diff --git a/src/client.rs b/src/client.rs index 0e23f4b3b..c8c31ea3b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -221,9 +221,18 @@ impl Client { native_config: NativeClientConfig, rd_kafka_type: RDKafkaType, context: C, + ) -> KafkaResult> { + Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context)) + } + + /// Creates a new `Client` given a configuration, a client type and a context. + pub(crate) fn new_context_arc( + config: &ClientConfig, + native_config: NativeClientConfig, + rd_kafka_type: RDKafkaType, + context: Arc, ) -> KafkaResult> { let mut err_buf = ErrBuf::new(); - let context = Arc::new(context); unsafe { rdsys::rd_kafka_conf_set_opaque( native_config.ptr(), diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index d966d4e33..dddaf0cdb 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -41,10 +41,13 @@ //! acknowledge messages quickly enough. If this error is returned, the caller //! should wait and try again. -use std::ffi::CString; +use std::ffi::{CStr, CString}; +use std::marker::PhantomData; use std::mem; use std::os::raw::c_void; use std::ptr; +use std::slice; +use std::str; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::{self, JoinHandle}; @@ -60,15 +63,19 @@ use crate::consumer::ConsumerGroupMetadata; use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError}; use crate::log::{trace, warn}; use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes}; -use crate::producer::{DefaultProducerContext, Producer, ProducerContext, PurgeConfig}; +use crate::producer::{ + DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig, +}; use crate::topic_partition_list::TopicPartitionList; use crate::util::{IntoOpaque, Timeout}; pub use crate::message::DeliveryResult; +use super::NoCustomPartitioner; + /// Callback that gets called from librdkafka every time a message succeeds or fails to be /// delivered. -unsafe extern "C" fn delivery_cb( +unsafe extern "C" fn delivery_cb>( _client: *mut RDKafka, msg: *const RDKafkaMessage, opaque: *mut c_void, @@ -206,6 +213,33 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> { } } +unsafe extern "C" fn partitioner_cb>( + topic: *const RDKafkaTopic, + keydata: *const c_void, + keylen: usize, + partition_cnt: i32, + rkt_opaque: *mut c_void, + _msg_opaque: *mut c_void, +) -> i32 { + let topic_name = CStr::from_ptr(rdsys::rd_kafka_topic_name(topic)); + let topic_name = str::from_utf8_unchecked(topic_name.to_bytes()); + + let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1; + + let key = if keydata.is_null() { + None + } else { + Some(slice::from_raw_parts(keydata as *const u8, keylen)) + }; + + let producer_context = &mut *(rkt_opaque as *mut C); + + producer_context + .get_custom_partitioner() + .expect("custom partitioner is not set") + .partition(topic_name, key, partition_cnt, is_partition_available) +} + impl FromClientConfig for BaseProducer { /// Creates a new `BaseProducer` starting from a configuration. fn from_config(config: &ClientConfig) -> KafkaResult> { @@ -213,16 +247,44 @@ impl FromClientConfig for BaseProducer { } } -impl FromClientConfigAndContext for BaseProducer +impl FromClientConfigAndContext for BaseProducer where - C: ProducerContext, + Part: Partitioner, + C: ProducerContext, { /// Creates a new `BaseProducer` starting from a configuration and a /// context. - fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult> { + /// + /// SAFETY: Raw pointer to custom partitioner is used as opaque. + /// It's comes from reference to field in producer context so it's valid as the context is valid. + fn from_config_and_context( + config: &ClientConfig, + context: C, + ) -> KafkaResult> { let native_config = config.create_native_config()?; - unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::)) }; - let client = Client::new( + let context = Arc::new(context); + + if context.get_custom_partitioner().is_some() { + let default_topic_config = + unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) }; + unsafe { + rdsys::rd_kafka_topic_conf_set_opaque( + default_topic_config, + Arc::as_ptr(&context) as *mut c_void, + ) + }; + unsafe { + rdsys::rd_kafka_topic_conf_set_partitioner_cb( + default_topic_config, + Some(partitioner_cb::), + ) + } + } + + unsafe { + rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::)) + }; + let client = Client::new_context_arc( config, native_config, RDKafkaType::RD_KAFKA_PRODUCER, @@ -270,20 +332,27 @@ where /// ``` /// /// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/ -pub struct BaseProducer +/// +pub struct BaseProducer where - C: ProducerContext, + Part: Partitioner, + C: ProducerContext, { client: Client, + _partitioner: PhantomData, } -impl BaseProducer +impl BaseProducer where - C: ProducerContext, + Part: Partitioner, + C: ProducerContext, { /// Creates a base producer starting from a Client. - fn from_client(client: Client) -> BaseProducer { - BaseProducer { client } + fn from_client(client: Client) -> BaseProducer { + BaseProducer { + client, + _partitioner: PhantomData, + } } /// Polls the producer, returning the number of events served. @@ -374,9 +443,10 @@ where } } -impl Producer for BaseProducer +impl Producer for BaseProducer where - C: ProducerContext, + Part: Partitioner, + C: ProducerContext, { fn client(&self) -> &Client { &self.client @@ -480,9 +550,9 @@ where } } -impl Drop for BaseProducer +impl Drop for BaseProducer where - C: ProducerContext, + C: ProducerContext, { fn drop(&mut self) { self.purge(PurgeConfig::default().queue().inflight()); @@ -502,29 +572,30 @@ where /// queued events, such as delivery notifications. The thread will be /// automatically stopped when the producer is dropped. #[must_use = "The threaded producer will stop immediately if unused"] -pub struct ThreadedProducer +pub struct ThreadedProducer where - C: ProducerContext + 'static, + C: ProducerContext + 'static, { - producer: Arc>, + producer: Arc>, should_stop: Arc, handle: Option>, } -impl FromClientConfig for ThreadedProducer { +impl FromClientConfig for ThreadedProducer { fn from_config(config: &ClientConfig) -> KafkaResult> { ThreadedProducer::from_config_and_context(config, DefaultProducerContext) } } -impl FromClientConfigAndContext for ThreadedProducer +impl FromClientConfigAndContext for ThreadedProducer where - C: ProducerContext + 'static, + Part: Partitioner + Send + Sync + 'static, + C: ProducerContext + 'static, { fn from_config_and_context( config: &ClientConfig, context: C, - ) -> KafkaResult> { + ) -> KafkaResult> { let producer = Arc::new(BaseProducer::from_config_and_context(config, context)?); let should_stop = Arc::new(AtomicBool::new(false)); let thread = { @@ -558,9 +629,10 @@ where } } -impl ThreadedProducer +impl ThreadedProducer where - C: ProducerContext + 'static, + Part: Partitioner, + C: ProducerContext + 'static, { /// Sends a message to Kafka. /// @@ -587,9 +659,10 @@ where } } -impl Producer for ThreadedProducer +impl Producer for ThreadedProducer where - C: ProducerContext + 'static, + Part: Partitioner, + C: ProducerContext + 'static, { fn client(&self) -> &Client { self.producer.client() @@ -634,9 +707,10 @@ where } } -impl Drop for ThreadedProducer +impl Drop for ThreadedProducer where - C: ProducerContext + 'static, + Part: Partitioner, + C: ProducerContext + 'static, { fn drop(&mut self) { trace!("Destroy ThreadedProducer"); diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index d7f71739b..0769a16a8 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -20,12 +20,15 @@ use crate::consumer::ConsumerGroupMetadata; use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes}; use crate::producer::{ - BaseRecord, DeliveryResult, Producer, ProducerContext, PurgeConfig, ThreadedProducer, + BaseRecord, DeliveryResult, NoCustomPartitioner, Producer, ProducerContext, PurgeConfig, + ThreadedProducer, }; use crate::statistics::Statistics; use crate::topic_partition_list::TopicPartitionList; use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout}; +use super::Partitioner; + // // ********** FUTURE PRODUCER ********** // @@ -167,7 +170,11 @@ impl ClientContext for FutureProducerContext { } } -impl ProducerContext for FutureProducerContext { +impl ProducerContext for FutureProducerContext +where + C: ClientContext + 'static, + Part: Partitioner, +{ type DeliveryOpaque = Box>; fn delivery( @@ -195,11 +202,12 @@ impl ProducerContext for FutureProducerContext { /// underlying producer. The internal polling thread will be terminated when the /// `FutureProducer` goes out of scope. #[must_use = "Producer polling thread will stop immediately if unused"] -pub struct FutureProducer +pub struct FutureProducer where + Part: Partitioner, C: ClientContext + 'static, { - producer: Arc>>, + producer: Arc, Part>>, _runtime: PhantomData, } @@ -363,10 +371,11 @@ where } } -impl Producer> for FutureProducer +impl Producer, Part> for FutureProducer where C: ClientContext + 'static, R: AsyncRuntime, + Part: Partitioner, { fn client(&self) -> &Client> { self.producer.client() @@ -421,7 +430,7 @@ mod tests { struct TestContext; impl ClientContext for TestContext {} - impl ProducerContext for TestContext { + impl ProducerContext for TestContext { type DeliveryOpaque = Box; fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) { diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 5bcf6b86b..88c82fd55 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -183,13 +183,14 @@ pub use self::future_producer::{DeliveryFuture, FutureProducer, FutureRecord}; /// /// This user-defined object can be used to provide custom callbacks for /// producer events. Refer to the list of methods to check which callbacks can -/// be specified. +/// be specified. It can also specify custom partitioner to register and to be +/// used for deciding to which partition write message into. /// /// In particular, it can be used to specify the `delivery` callback that will /// be called when the acknowledgement for a delivered message is received. /// /// See also the [`ClientContext`] trait. -pub trait ProducerContext: ClientContext { +pub trait ProducerContext: ClientContext { /// A `DeliveryOpaque` is a user-defined structure that will be passed to /// the producer when producing a message, and returned to the `delivery` /// method once the message has been delivered, or failed to. @@ -199,6 +200,58 @@ pub trait ProducerContext: ClientContext { /// failed to). The `DeliveryOpaque` will be the one provided by the user /// when calling send. fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque); + + /// This method is called when creating producer in order to optionally register custom partitioner. + /// If custom partitioner is not used then `partitioner` configuration property is used (or its default). + /// + /// sticky.partitioning.linger.ms must be 0 to run custom partitioner for messages with null key. + /// See https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196 + fn get_custom_partitioner(&self) -> Option<&Part> { + None + } +} + +/// Unassigned partition. +/// See RD_KAFKA_PARTITION_UA from librdkafka. +pub const PARTITION_UA: i32 = -1; + +/// Trait allowing to customize the partitioning of messages. +pub trait Partitioner { + /// Return partition to use for `topic_name`. + /// `topic_name` is the name of a topic to which a message is being produced. + /// `partition_cnt` is the number of partitions for this topic. + /// `key` is an optional key of the message. + /// `is_partition_available` is a function that can be called to check if a partition has an active leader broker. + /// + /// It may be called in any thread at any time, + /// It may be called multiple times for the same message/key. + /// MUST NOT block or execute for prolonged periods of time. + /// MUST return a value between 0 and partition_cnt-1, or the + /// special RD_KAFKA_PARTITION_UA value if partitioning could not be performed. + /// See documentation for rd_kafka_topic_conf_set_partitioner_cb from librdkafka for more info. + fn partition( + &self, + topic_name: &str, + key: Option<&[u8]>, + partition_cnt: i32, + is_partition_available: impl Fn(i32) -> bool, + ) -> i32; +} + +/// Placeholder used when no custom partitioner is needed. +#[derive(Clone)] +pub struct NoCustomPartitioner {} + +impl Partitioner for NoCustomPartitioner { + fn partition( + &self, + _topic_name: &str, + _key: Option<&[u8]>, + _partition_cnt: i32, + _is_paritition_available: impl Fn(i32) -> bool, + ) -> i32 { + panic!("NoCustomPartitioner should not be called"); + } } /// An inert producer context that can be used when customizations are not @@ -207,16 +260,17 @@ pub trait ProducerContext: ClientContext { pub struct DefaultProducerContext; impl ClientContext for DefaultProducerContext {} -impl ProducerContext for DefaultProducerContext { +impl ProducerContext for DefaultProducerContext { type DeliveryOpaque = (); fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {} } /// Common trait for all producers. -pub trait Producer +pub trait Producer where - C: ProducerContext, + Part: Partitioner, + C: ProducerContext, { /// Returns the [`Client`] underlying this producer. fn client(&self) -> &Client; diff --git a/tests/test_low_producers.rs b/tests/test_low_producers.rs index 142e6e11f..bf5c9ef2a 100644 --- a/tests/test_low_producers.rs +++ b/tests/test_low_producers.rs @@ -13,7 +13,8 @@ use rdkafka::config::ClientConfig; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::message::{Header, Headers, Message, OwnedHeaders, OwnedMessage}; use rdkafka::producer::{ - BaseProducer, BaseRecord, DeliveryResult, Producer, ProducerContext, ThreadedProducer, + BaseProducer, BaseRecord, DeliveryResult, NoCustomPartitioner, Partitioner, Producer, + ProducerContext, ThreadedProducer, }; use rdkafka::types::RDKafkaRespErr; use rdkafka::util::current_time_millis; @@ -46,9 +47,10 @@ impl ProducerContext for PrintingContext { type TestProducerDeliveryResult = (OwnedMessage, Option, usize); #[derive(Clone)] -struct CollectingContext { +struct CollectingContext { stats: Arc>>, results: Arc>>, + partitioner: Option, } impl CollectingContext { @@ -56,11 +58,22 @@ impl CollectingContext { CollectingContext { stats: Arc::new(Mutex::new(Vec::new())), results: Arc::new(Mutex::new(Vec::new())), + partitioner: None, } } } -impl ClientContext for CollectingContext { +impl CollectingContext { + fn new_with_custom_partitioner(partitioner: Part) -> CollectingContext { + CollectingContext { + stats: Arc::new(Mutex::new(Vec::new())), + results: Arc::new(Mutex::new(Vec::new())), + partitioner: Some(partitioner), + } + } +} + +impl ClientContext for CollectingContext { // Access and use all stats. fn stats(&self, stats: Statistics) { let mut stats_vec = self.stats.lock().unwrap(); @@ -68,7 +81,7 @@ impl ClientContext for CollectingContext { } } -impl ProducerContext for CollectingContext { +impl ProducerContext for CollectingContext { type DeliveryOpaque = usize; fn delivery(&self, delivery_result: &DeliveryResult, delivery_opaque: Self::DeliveryOpaque) { @@ -80,6 +93,52 @@ impl ProducerContext for CollectingContext { } } } + + fn get_custom_partitioner(&self) -> Option<&Part> { + match &self.partitioner { + None => None, + Some(p) => Some(&p), + } + } +} + +// Partitioner sending all messages to single, defined partition. +#[derive(Clone)] +pub struct FixedPartitioner { + partition: i32, +} + +impl FixedPartitioner { + fn new(partition: i32) -> Self { + Self { partition } + } +} + +impl Partitioner for FixedPartitioner { + fn partition( + &self, + _topic_name: &str, + _key: Option<&[u8]>, + _partition_cnt: i32, + _is_paritition_available: impl Fn(i32) -> bool, + ) -> i32 { + self.partition + } +} + +#[derive(Clone)] +pub struct PanicPartitioner {} + +impl Partitioner for PanicPartitioner { + fn partition( + &self, + _topic_name: &str, + _key: Option<&[u8]>, + _partition_cnt: i32, + _is_paritition_available: impl Fn(i32) -> bool, + ) -> i32 { + panic!("partition() panic"); + } } fn default_config(config_overrides: HashMap<&str, &str>) -> ClientConfig { @@ -98,26 +157,32 @@ fn base_producer(config_overrides: HashMap<&str, &str>) -> BaseProducer( +fn base_producer_with_context>( context: C, config_overrides: HashMap<&str, &str>, -) -> BaseProducer { +) -> BaseProducer { default_config(config_overrides) - .create_with_context::>(context) + .create_with_context::>(context) .unwrap() } #[allow(dead_code)] -fn threaded_producer(config_overrides: HashMap<&str, &str>) -> ThreadedProducer { +fn threaded_producer( + config_overrides: HashMap<&str, &str>, +) -> ThreadedProducer { threaded_producer_with_context(PrintingContext { _n: 123 }, config_overrides) } -fn threaded_producer_with_context( +fn threaded_producer_with_context( context: C, config_overrides: HashMap<&str, &str>, -) -> ThreadedProducer { +) -> ThreadedProducer +where + Part: Partitioner + Send + Sync + 'static, + C: ProducerContext, +{ default_config(config_overrides) - .create_with_context::>(context) + .create_with_context::>(context) .unwrap() } @@ -406,3 +471,82 @@ fn test_fatal_errors() { )) ) } + +#[test] +fn test_register_custom_partitioner_linger_non_zero_key_null() { + // Custom partitioner is not used when sticky.partitioning.linger.ms > 0 and key is null. + // https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196 + let context = CollectingContext::new_with_custom_partitioner(PanicPartitioner {}); + let mut config_overrides = HashMap::new(); + config_overrides.insert("sticky.partitioning.linger.ms", "10"); + let producer = base_producer_with_context(context.clone(), config_overrides); + + producer + .send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload("")) + .unwrap(); + producer.flush(Duration::from_secs(10)).unwrap(); + + let delivery_results = context.results.lock().unwrap(); + + assert_eq!(delivery_results.len(), 1); + + for &(_, ref error, _) in &(*delivery_results) { + assert_eq!(*error, None); + } +} + +#[test] +fn test_custom_partitioner_base_producer() { + let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2)); + let producer = base_producer_with_context(context.clone(), HashMap::new()); + let topic_name = rand_test_topic(); + + let results_count = (0..10) + .map(|id| { + producer.send( + BaseRecord::with_opaque_to(&topic_name, id) + .payload("") + .key(""), + ) + }) + .filter(|r| r.is_ok()) + .count(); + + assert_eq!(results_count, 10); + producer.flush(Duration::from_secs(10)).unwrap(); + + let delivery_results = context.results.lock().unwrap(); + + for &(ref message, ref error, _) in &(*delivery_results) { + assert_eq!(error, &None); + assert_eq!(message.partition(), 2); + } +} + +#[test] +fn test_custom_partitioner_threaded_producer() { + let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2)); + let producer = threaded_producer_with_context(context.clone(), HashMap::new()); + let topic_name = rand_test_topic(); + + let results_count = (0..10) + .map(|id| { + producer.send( + BaseRecord::with_opaque_to(&topic_name, id) + .payload("") + .key(""), + ) + }) + .filter(|r| r.is_ok()) + .count(); + + assert_eq!(results_count, 10); + producer.flush(Duration::from_secs(10)).unwrap(); + + let delivery_results = context.results.lock().unwrap(); + + for &(ref message, ref error, _) in &(*delivery_results) { + assert_eq!(error, &None); + assert_eq!(message.partition(), 2); + } +}