Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce partitioner interface #592

Merged
merged 13 commits into from
Jun 30, 2023
11 changes: 10 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,18 @@ impl<C: ClientContext> Client<C> {
native_config: NativeClientConfig,
rd_kafka_type: RDKafkaType,
context: C,
) -> KafkaResult<Client<C>> {
Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
}

/// Creates a new `Client` given a configuration, a client type and a context.
pub(crate) fn new_context_arc(
config: &ClientConfig,
native_config: NativeClientConfig,
rd_kafka_type: RDKafkaType,
context: Arc<C>,
) -> KafkaResult<Client<C>> {
let mut err_buf = ErrBuf::new();
let context = Arc::new(context);
unsafe {
rdsys::rd_kafka_conf_set_opaque(
native_config.ptr(),
Expand Down
136 changes: 105 additions & 31 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
//! acknowledge messages quickly enough. If this error is returned, the caller
//! should wait and try again.

use std::ffi::CString;
use std::ffi::{CStr, CString};
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::slice;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
Expand All @@ -60,15 +63,19 @@ use crate::consumer::ConsumerGroupMetadata;
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::log::{trace, warn};
use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
use crate::producer::{DefaultProducerContext, Producer, ProducerContext, PurgeConfig};
use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, Timeout};

pub use crate::message::DeliveryResult;

use super::NoCustomPartitioner;

/// Callback that gets called from librdkafka every time a message succeeds or fails to be
/// delivered.
unsafe extern "C" fn delivery_cb<C: ProducerContext>(
unsafe extern "C" fn delivery_cb<Part: Partitioner, C: ProducerContext<Part>>(
_client: *mut RDKafka,
msg: *const RDKafkaMessage,
opaque: *mut c_void,
Expand Down Expand Up @@ -206,23 +213,78 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
}
}

/// C-ABI trampoline registered with librdkafka via
/// `rd_kafka_topic_conf_set_partitioner_cb`; it forwards each partitioning
/// decision to the custom [`Partitioner`] held by the producer context.
///
/// SAFETY: librdkafka invokes this with a valid `topic` handle and with
/// `rkt_opaque` set (via `rd_kafka_topic_conf_set_opaque`) to a pointer to
/// the producer context `C`. `keydata`, when non-null, must point to
/// `keylen` readable bytes for the duration of the call.
unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>(
    topic: *const RDKafkaTopic,
    keydata: *const c_void,
    keylen: usize,
    partition_cnt: i32,
    rkt_opaque: *mut c_void,
    _msg_opaque: *mut c_void,
) -> i32 {
    // The topic name is converted without UTF-8 validation; assumes
    // librdkafka only hands back valid UTF-8 topic names — TODO confirm.
    let topic_name = CStr::from_ptr(rdsys::rd_kafka_topic_name(topic));
    let topic_name = str::from_utf8_unchecked(topic_name.to_bytes());

    // Closure the partitioner can use to skip partitions that librdkafka
    // currently reports as unavailable.
    let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;

    // A null key pointer means the record was produced without a key.
    let key = if keydata.is_null() {
        None
    } else {
        Some(slice::from_raw_parts(keydata as *const u8, keylen))
    };

    // NOTE(review): this materializes a `&mut C` from the shared opaque
    // pointer; if librdkafka may call this callback concurrently, a shared
    // `&C` would be sounder — confirm whether `get_custom_partitioner`
    // actually requires `&mut self`.
    let producer_context = &mut *(rkt_opaque as *mut C);

    // This callback is only registered when a custom partitioner was
    // configured, so a missing partitioner here is a programming error.
    producer_context
        .get_custom_partitioner()
        .expect("custom partitioner is not set")
        .partition(topic_name, key, partition_cnt, is_partition_available)
}

impl FromClientConfig for BaseProducer<DefaultProducerContext> {
    /// Creates a new `BaseProducer` starting from a configuration.
    fn from_config(config: &ClientConfig) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
        // Delegate to the context-aware constructor with the no-op default
        // context (which implies no custom partitioner is installed).
        BaseProducer::from_config_and_context(config, DefaultProducerContext)
    }
}

impl<C> FromClientConfigAndContext<C> for BaseProducer<C>
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
C: ProducerContext,
Part: Partitioner,
C: ProducerContext<Part>,
{
/// Creates a new `BaseProducer` starting from a configuration and a
/// context.
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseProducer<C>> {
///
/// SAFETY: Raw pointer to custom partitioner is used as opaque.
    /// It comes from a reference to a field in the producer context, so it stays valid for as long as the context is valid.
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<BaseProducer<C, Part>> {
let native_config = config.create_native_config()?;
unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<C>)) };
let client = Client::new(
let context = Arc::new(context);

if context.get_custom_partitioner().is_some() {
let default_topic_config =
unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) };
unsafe {
rdsys::rd_kafka_topic_conf_set_opaque(
default_topic_config,
Arc::as_ptr(&context) as *mut c_void,
)
};
unsafe {
rdsys::rd_kafka_topic_conf_set_partitioner_cb(
default_topic_config,
Some(partitioner_cb::<Part, C>),
)
}
}

unsafe {
rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<Part, C>))
};
let client = Client::new_context_arc(
config,
native_config,
RDKafkaType::RD_KAFKA_PRODUCER,
Expand Down Expand Up @@ -270,20 +332,27 @@ where
/// ```
///
/// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
pub struct BaseProducer<C = DefaultProducerContext>
///
pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
C: ProducerContext,
Part: Partitioner,
C: ProducerContext<Part>,
{
client: Client<C>,
_partitioner: PhantomData<Part>,
}

impl<C> BaseProducer<C>
impl<C, Part> BaseProducer<C, Part>
where
C: ProducerContext,
Part: Partitioner,
C: ProducerContext<Part>,
{
/// Creates a base producer starting from a Client.
fn from_client(client: Client<C>) -> BaseProducer<C> {
BaseProducer { client }
fn from_client(client: Client<C>) -> BaseProducer<C, Part> {
BaseProducer {
client,
_partitioner: PhantomData,
}
}

/// Polls the producer, returning the number of events served.
Expand Down Expand Up @@ -374,9 +443,10 @@ where
}
}

impl<C> Producer<C> for BaseProducer<C>
impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where
C: ProducerContext,
Part: Partitioner,
C: ProducerContext<Part>,
{
fn client(&self) -> &Client<C> {
&self.client
Expand Down Expand Up @@ -480,9 +550,9 @@ where
}
}

impl<C> Drop for BaseProducer<C>
impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
C: ProducerContext,
C: ProducerContext<Part>,
{
fn drop(&mut self) {
self.purge(PurgeConfig::default().queue().inflight());
Expand All @@ -502,29 +572,30 @@ where
/// queued events, such as delivery notifications. The thread will be
/// automatically stopped when the producer is dropped.
#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C>
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
where
C: ProducerContext + 'static,
C: ProducerContext<Part> + 'static,
{
producer: Arc<BaseProducer<C>>,
producer: Arc<BaseProducer<C, Part>>,
should_stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}

impl FromClientConfig for ThreadedProducer<DefaultProducerContext> {
impl FromClientConfig for ThreadedProducer<DefaultProducerContext, NoCustomPartitioner> {
fn from_config(config: &ClientConfig) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
ThreadedProducer::from_config_and_context(config, DefaultProducerContext)
}
}

impl<C> FromClientConfigAndContext<C> for ThreadedProducer<C>
impl<C, Part> FromClientConfigAndContext<C> for ThreadedProducer<C, Part>
where
C: ProducerContext + 'static,
Part: Partitioner + Send + Sync + 'static,
C: ProducerContext<Part> + 'static,
{
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<ThreadedProducer<C>> {
) -> KafkaResult<ThreadedProducer<C, Part>> {
let producer = Arc::new(BaseProducer::from_config_and_context(config, context)?);
let should_stop = Arc::new(AtomicBool::new(false));
let thread = {
Expand Down Expand Up @@ -558,9 +629,10 @@ where
}
}

impl<C> ThreadedProducer<C>
impl<C, Part> ThreadedProducer<C, Part>
where
C: ProducerContext + 'static,
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
/// Sends a message to Kafka.
///
Expand All @@ -587,9 +659,10 @@ where
}
}

impl<C> Producer<C> for ThreadedProducer<C>
impl<C, Part> Producer<C, Part> for ThreadedProducer<C, Part>
where
C: ProducerContext + 'static,
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn client(&self) -> &Client<C> {
self.producer.client()
Expand Down Expand Up @@ -634,9 +707,10 @@ where
}
}

impl<C> Drop for ThreadedProducer<C>
impl<C, Part> Drop for ThreadedProducer<C, Part>
where
C: ProducerContext + 'static,
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn drop(&mut self) {
trace!("Destroy ThreadedProducer");
Expand Down
21 changes: 15 additions & 6 deletions src/producer/future_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ use crate::consumer::ConsumerGroupMetadata;
use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
use crate::message::{Message, OwnedHeaders, OwnedMessage, Timestamp, ToBytes};
use crate::producer::{
BaseRecord, DeliveryResult, Producer, ProducerContext, PurgeConfig, ThreadedProducer,
BaseRecord, DeliveryResult, NoCustomPartitioner, Producer, ProducerContext, PurgeConfig,
ThreadedProducer,
};
use crate::statistics::Statistics;
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{AsyncRuntime, DefaultRuntime, IntoOpaque, Timeout};

use super::Partitioner;

//
// ********** FUTURE PRODUCER **********
//
Expand Down Expand Up @@ -167,7 +170,11 @@ impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
}
}

impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
impl<C, Part> ProducerContext<Part> for FutureProducerContext<C>
where
C: ClientContext + 'static,
Part: Partitioner,
{
type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;

fn delivery(
Expand Down Expand Up @@ -195,11 +202,12 @@ impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
/// underlying producer. The internal polling thread will be terminated when the
/// `FutureProducer` goes out of scope.
#[must_use = "Producer polling thread will stop immediately if unused"]
pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime>
pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime, Part = NoCustomPartitioner>
where
Part: Partitioner,
C: ClientContext + 'static,
{
producer: Arc<ThreadedProducer<FutureProducerContext<C>>>,
producer: Arc<ThreadedProducer<FutureProducerContext<C>, Part>>,
_runtime: PhantomData<R>,
}

Expand Down Expand Up @@ -363,10 +371,11 @@ where
}
}

impl<C, R> Producer<FutureProducerContext<C>> for FutureProducer<C, R>
impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
where
C: ClientContext + 'static,
R: AsyncRuntime,
Part: Partitioner,
{
fn client(&self) -> &Client<FutureProducerContext<C>> {
self.producer.client()
Expand Down Expand Up @@ -421,7 +430,7 @@ mod tests {
struct TestContext;

impl ClientContext for TestContext {}
impl ProducerContext for TestContext {
impl ProducerContext<NoCustomPartitioner> for TestContext {
type DeliveryOpaque = Box<i32>;

fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
Expand Down
Loading