diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs deleted file mode 100644 index 8960edd8171fb..0000000000000 --- a/src/sinks/redis.rs +++ /dev/null @@ -1,680 +0,0 @@ -use std::task::{Context, Poll}; - -use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream, FutureExt, SinkExt, StreamExt}; -use redis::{aio::ConnectionManager, RedisError, RedisResult}; -use snafu::{ResultExt, Snafu}; -use tokio_util::codec::Encoder as _; -use tower::{Service, ServiceBuilder}; -use vector_common::internal_event::{ - ByteSize, BytesSent, InternalEventHandle, Protocol, Registered, -}; -use vector_config::configurable_component; -use vector_core::EstimatedJsonEncodedSizeOf; - -use crate::{ - codecs::{Encoder, EncodingConfig, Transformer}, - config::{self, AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::Event, - internal_events::TemplateRenderingError, - sinks::util::{ - batch::BatchConfig, - retries::{RetryAction, RetryLogic}, - sink::Response, - BatchSink, Concurrency, EncodedEvent, EncodedLength, ServiceBuilderExt, SinkBatchSettings, - TowerRequestConfig, VecBuffer, - }, - template::{Template, TemplateParseError}, -}; - -#[derive(Debug, Snafu)] -enum RedisSinkError { - #[snafu(display("Creating Redis producer failed: {}", source))] - RedisCreateFailed { source: RedisError }, - #[snafu(display("Invalid key template: {}", source))] - KeyTemplate { source: TemplateParseError }, -} - -/// Redis data type to store messages in. -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative)] -#[derivative(Default)] -#[serde(rename_all = "lowercase")] -pub enum DataTypeConfig { - /// The Redis `list` type. - /// - /// This resembles a deque, where messages can be popped and pushed from either end. - /// - /// This is the default. - #[derivative(Default)] - List, - - /// The Redis `channel` type. - /// - /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving. - Channel, -} - -/// List-specific options. -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -pub struct ListOption { - /// The method to use for pushing messages into a `list`. - method: Method, -} - -#[derive(Clone, Copy, Debug, Derivative)] -#[derivative(Default)] -pub enum DataType { - /// The Redis `list` type. - /// - /// This resembles a deque, where messages can be popped and pushed from either end. - #[derivative(Default)] - List(Method), - - /// The Redis `channel` type. - /// - /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving. - Channel, -} - -/// Method for pushing messages into a `list`. -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] -#[derivative(Default)] -#[serde(rename_all = "lowercase")] -pub enum Method { - /// Use the `rpush` method. - /// - /// This pushes messages onto the tail of the list. - /// - /// This is the default. - #[derivative(Default)] - RPush, - - /// Use the `lpush` method. - /// - /// This pushes messages onto the head of the list. - LPush, -} - -#[derive(Clone, Copy, Debug, Default)] -pub struct RedisDefaultBatchSettings; - -impl SinkBatchSettings for RedisDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} - -/// Configuration for the `redis` sink. -#[configurable_component(sink("redis", "Publish observability data to Redis."))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct RedisSinkConfig { - #[configurable(derived)] - encoding: EncodingConfig, - - #[configurable(derived)] - #[serde(default)] - data_type: DataTypeConfig, - - #[configurable(derived)] - #[serde(alias = "list")] - list_option: Option, - - /// The URL of the Redis endpoint to connect to. - /// - /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be - /// `redis` or `rediss` for connections secured via TLS. - #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))] - #[serde(alias = "url")] - endpoint: String, - - /// The Redis key to publish messages to. - #[configurable(validation(length(min = 1)))] - #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))] - key: Template, - - #[configurable(derived)] - #[serde(default)] - batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - request: TowerRequestConfig, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -impl GenerateConfig for RedisSinkConfig { - fn generate_config() -> toml::Value { - toml::from_str( - r#" - url = "redis://127.0.0.1:6379/0" - key = "vector" - data_type = "list" - list.method = "lpush" - encoding.codec = "json" - batch.max_events = 1 - "#, - ) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "redis")] -impl SinkConfig for RedisSinkConfig { - async fn build( - &self, - _cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - if self.key.is_empty() { - return Err("`key` cannot be empty.".into()); - } - let conn = self.build_client().await.context(RedisCreateFailedSnafu)?; - let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed(); - let sink = self.new(conn)?; - Ok((sink, healthcheck)) - } - - fn input(&self) -> Input { - Input::new(self.encoding.config().input_type() & config::DataType::Log) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -impl RedisSinkConfig { - pub fn new(&self, conn: ConnectionManager) -> crate::Result { - let request = self.request.unwrap_with(&TowerRequestConfig { - concurrency: Concurrency::Fixed(1), - ..Default::default() - }); - - let key = self.key.clone(); - - let transformer = self.encoding.transformer(); - let serializer = self.encoding.build()?; - let mut encoder = Encoder::<()>::new(serializer); - - let method = self.list_option.map(|option| option.method); - - let data_type = match self.data_type { - DataTypeConfig::Channel => DataType::Channel, - DataTypeConfig::List => DataType::List(method.unwrap_or_default()), - }; - - let batch = self.batch.into_batch_settings()?; - - let buffer = VecBuffer::new(batch.size); - - let redis = RedisSink { - conn, - data_type, - bytes_sent: register!(BytesSent::from(Protocol::TCP)), - }; - - let svc = ServiceBuilder::new() - .settings(request, RedisRetryLogic) - .service(redis); - - let sink = BatchSink::new(svc, buffer, batch.timeout) - .with_flat_map(move |event| { - // Errors are handled by `Encoder`. - stream::iter(encode_event(event, &key, &transformer, &mut encoder)).map(Ok) - }) - .sink_map_err(|error| error!(message = "Sink failed to flush.", %error)); - - #[allow(deprecated)] - Ok(super::VectorSink::from_event_sink(sink)) - } - - async fn build_client(&self) -> RedisResult { - trace!("Open Redis client."); - let client = redis::Client::open(self.endpoint.as_str())?; - trace!("Open Redis client success."); - trace!("Get Redis connection."); - let conn = client.get_tokio_connection_manager().await; - trace!("Get Redis connection success."); - conn - } - - async fn healthcheck(mut conn: ConnectionManager) -> crate::Result<()> { - redis::cmd("PING") - .query_async(&mut conn) - .await - .map_err(Into::into) - } -} - -#[derive(Debug, Clone)] -struct RedisKvEntry { - key: String, - value: Bytes, -} - -impl EncodedLength for RedisKvEntry { - fn encoded_length(&self) -> usize { - self.value.len() - } -} - -fn encode_event( - mut event: Event, - key: &Template, - transformer: &Transformer, - encoder: &mut Encoder<()>, -) -> Option> { - let key = key - .render_string(&event) - .map_err(|error| { - emit!(TemplateRenderingError { - error, - field: Some("key"), - drop_event: true, - }); - }) - .ok()?; - - let event_byte_size = event.estimated_json_encoded_size_of(); - - transformer.transform(&mut event); - - let mut bytes = BytesMut::new(); - - // Errors are handled by `Encoder`. - encoder.encode(event, &mut bytes).ok()?; - - let byte_size = bytes.len(); - let value = bytes.freeze(); - - let event = EncodedEvent::new(RedisKvEntry { key, value }, byte_size, event_byte_size); - Some(event) -} - -type RedisPipeResult = RedisResult>; - -impl Response for Vec { - fn is_successful(&self) -> bool { - self.iter().all(|x| *x) - } -} - -#[derive(Debug, Clone)] -struct RedisRetryLogic; - -impl RetryLogic for RedisRetryLogic { - type Error = RedisError; - type Response = Vec; - - fn is_retriable_error(&self, _error: &Self::Error) -> bool { - true - } - - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - if response.is_successful() { - return RetryAction::Successful; - } - RetryAction::Retry("Sending data to redis failed.".into()) - } -} - -#[derive(Clone)] -pub struct RedisSink { - conn: ConnectionManager, - data_type: DataType, - bytes_sent: Registered, -} - -impl Service> for RedisSink { - type Response = Vec; - type Error = RedisError; - type Future = BoxFuture<'static, RedisPipeResult>; - - // Emission of an internal event in case of errors is handled upstream by the caller. - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - // Emission of internal events for errors and dropped events is handled upstream by the caller. - fn call(&mut self, kvs: Vec) -> Self::Future { - let count = kvs.len(); - let mut byte_size = 0; - - let mut conn = self.conn.clone(); - let mut pipe = redis::pipe(); - - for kv in kvs { - byte_size += kv.encoded_length(); - match self.data_type { - DataType::List(method) => match method { - Method::LPush => { - if count > 1 { - pipe.atomic().lpush(kv.key, kv.value.as_ref()); - } else { - pipe.lpush(kv.key, kv.value.as_ref()); - } - } - Method::RPush => { - if count > 1 { - pipe.atomic().rpush(kv.key, kv.value.as_ref()); - } else { - pipe.rpush(kv.key, kv.value.as_ref()); - } - } - }, - DataType::Channel => { - if count > 1 { - pipe.atomic().publish(kv.key, kv.value.as_ref()); - } else { - pipe.publish(kv.key, kv.value.as_ref()); - } - } - } - } - - let bytes_sent = self.bytes_sent.clone(); - Box::pin(async move { - let result: RedisPipeResult = pipe.query_async(&mut conn).await; - if let Ok(res) = &result { - if res.is_successful() { - bytes_sent.emit(ByteSize(byte_size)); - } else { - warn!("Batch sending was not all successful and will be retried.") - } - } - result - }) - } -} - -#[cfg(test)] -mod tests { - use std::{collections::HashMap, convert::TryFrom}; - - use codecs::{JsonSerializerConfig, TextSerializerConfig}; - use vector_core::event::LogEvent; - - use super::*; - use crate::config::log_schema; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[test] - fn redis_event_json() { - let msg = "hello_world".to_owned(); - let mut evt = LogEvent::from(msg.clone()); - evt.insert("key", "value"); - let result = encode_event( - evt.into(), - &Template::try_from("key").unwrap(), - &Default::default(), - &mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()), - ) - .unwrap() - .item - .value; - let map: HashMap = serde_json::from_slice(&result[..]).unwrap(); - assert_eq!(msg, map[&log_schema().message_key().unwrap().to_string()]); - } - - #[test] - fn redis_event_text() { - let msg = "hello_world".to_owned(); - let evt = LogEvent::from(msg.clone()); - let event = encode_event( - evt.into(), - &Template::try_from("key").unwrap(), - &Default::default(), - &mut Encoder::<()>::new(TextSerializerConfig::default().build().into()), - ) - .unwrap() - .item - .value; - assert_eq!(event, Vec::from(msg)); - } - - #[test] - fn redis_encode_event() { - let msg = "hello_world"; - let mut evt = LogEvent::from(msg); - evt.insert("key", "value"); - - let result = encode_event( - evt.into(), - &Template::try_from("key").unwrap(), - &Transformer::new(None, Some(vec!["key".into()]), None).unwrap(), - &mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()), - ) - .unwrap() - .item - .value; - - let map: HashMap = serde_json::from_slice(&result[..]).unwrap(); - assert!(!map.contains_key("key")); - } -} - -#[cfg(feature = "redis-integration-tests")] -#[cfg(test)] -mod integration_tests { - use codecs::JsonSerializerConfig; - use futures::stream; - use rand::Rng; - use redis::AsyncCommands; - use vector_core::event::LogEvent; - - use super::*; - use crate::test_util::{ - components::{assert_sink_compliance, SINK_TAGS}, - random_lines_with_stream, random_string, trace_init, - }; - - fn redis_server() -> String { - std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/0".to_owned()) - } - - #[tokio::test] - async fn redis_sink_list_lpush() { - trace_init(); - - let key = Template::try_from(format!("test-{}", random_string(10))) - .expect("should not fail to create key template"); - debug!("Test key name: {}.", key); - let mut rng = rand::thread_rng(); - let num_events = rng.gen_range(10000..20000); - debug!("Test events num: {}.", num_events); - - let cnf = RedisSinkConfig { - endpoint: redis_server(), - key: key.clone(), - encoding: JsonSerializerConfig::default().into(), - data_type: DataTypeConfig::List, - list_option: Some(ListOption { - method: Method::LPush, - }), - batch: BatchConfig::default(), - request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), - ..Default::default() - }, - acknowledgements: Default::default(), - }; - - let mut events: Vec = Vec::new(); - for i in 0..num_events { - let s: String = i.to_string(); - let e = LogEvent::from(s); - events.push(e.into()); - } - let input = stream::iter(events.clone().into_iter().map(Into::into)); - - // Publish events. - let cnf2 = cnf.clone(); - assert_sink_compliance(&SINK_TAGS, async move { - let conn = cnf2.build_client().await.unwrap(); - cnf2.new(conn).unwrap().run(input).await - }) - .await - .expect("Running sink failed"); - - let mut conn = cnf.build_client().await.unwrap(); - - let key_exists: bool = conn.exists(key.clone().to_string()).await.unwrap(); - debug!("Test key: {} exists: {}.", key, key_exists); - assert!(key_exists); - let llen: usize = conn.llen(key.clone().to_string()).await.unwrap(); - debug!("Test key: {} len: {}.", key, llen); - assert_eq!(llen, num_events); - - for i in 0..num_events { - let e = events.get(i).unwrap().as_log(); - let s = serde_json::to_string(e).unwrap_or_default(); - let payload: (String, String) = - conn.brpop(key.clone().to_string(), 2000).await.unwrap(); - let val = payload.1; - assert_eq!(val, s); - } - } - - #[tokio::test] - async fn redis_sink_list_rpush() { - trace_init(); - - let key = Template::try_from(format!("test-{}", random_string(10))) - .expect("should not fail to create key template"); - debug!("Test key name: {}.", key); - let mut rng = rand::thread_rng(); - let num_events = rng.gen_range(10000..20000); - debug!("Test events num: {}.", num_events); - - let cnf = RedisSinkConfig { - endpoint: redis_server(), - key: key.clone(), - encoding: JsonSerializerConfig::default().into(), - data_type: DataTypeConfig::List, - list_option: Some(ListOption { - method: Method::RPush, - }), - batch: BatchConfig::default(), - request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), - ..Default::default() - }, - acknowledgements: Default::default(), - }; - - let mut events: Vec = Vec::new(); - for i in 0..num_events { - let s: String = i.to_string(); - let e = LogEvent::from(s); - events.push(e.into()); - } - let input = stream::iter(events.clone().into_iter().map(Into::into)); - - // Publish events. - let cnf2 = cnf.clone(); - assert_sink_compliance(&SINK_TAGS, async move { - let conn = cnf2.build_client().await.unwrap(); - cnf2.new(conn).unwrap().run(input).await - }) - .await - .expect("Running sink failed"); - - let mut conn = cnf.build_client().await.unwrap(); - - let key_exists: bool = conn.exists(key.to_string()).await.unwrap(); - debug!("Test key: {} exists: {}.", key, key_exists); - assert!(key_exists); - let llen: usize = conn.llen(key.clone().to_string()).await.unwrap(); - debug!("Test key: {} len: {}.", key, llen); - assert_eq!(llen, num_events); - - for i in 0..num_events { - let e = events.get(i).unwrap().as_log(); - let s = serde_json::to_string(e).unwrap_or_default(); - let payload: (String, String) = - conn.blpop(key.clone().to_string(), 2000).await.unwrap(); - let val = payload.1; - assert_eq!(val, s); - } - } - - #[tokio::test] - async fn redis_sink_channel() { - trace_init(); - - let key = Template::try_from(format!("test-{}", random_string(10))) - .expect("should not fail to create key template"); - debug!("Test key name: {}.", key); - let mut rng = rand::thread_rng(); - let num_events = rng.gen_range(10000..20000); - debug!("Test events num: {}.", num_events); - - let client = redis::Client::open(redis_server()).unwrap(); - debug!("Get Redis async connection."); - let conn = client - .get_async_connection() - .await - .expect("Failed to get Redis async connection."); - debug!("Get Redis async connection success."); - let mut pubsub_conn = conn.into_pubsub(); - debug!("Subscribe channel:{}.", key); - pubsub_conn - .subscribe(key.clone().to_string()) - .await - .unwrap_or_else(|_| panic!("Failed to subscribe channel:{}.", key)); - debug!("Subscribed to channel:{}.", key); - let mut pubsub_stream = pubsub_conn.on_message(); - - let cnf = RedisSinkConfig { - endpoint: redis_server(), - key: key.clone(), - encoding: JsonSerializerConfig::default().into(), - data_type: DataTypeConfig::Channel, - list_option: None, - batch: BatchConfig::default(), - request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), - ..Default::default() - }, - acknowledgements: Default::default(), - }; - - // Publish events. - assert_sink_compliance(&SINK_TAGS, async move { - let conn = cnf.build_client().await.unwrap(); - let sink = cnf.new(conn).unwrap(); - let (_input, events) = random_lines_with_stream(100, num_events, None); - sink.run(events).await - }) - .await - .expect("Running sink failed"); - - // Receive events. - let mut received_msg_num = 0; - loop { - let _msg = pubsub_stream.next().await.unwrap(); - received_msg_num += 1; - debug!("Received msg num:{}.", received_msg_num); - if received_msg_num == num_events { - assert_eq!(received_msg_num, num_events); - break; - } - } - } -} diff --git a/src/sinks/redis/config.rs b/src/sinks/redis/config.rs new file mode 100644 index 0000000000000..0504cda3ceb30 --- /dev/null +++ b/src/sinks/redis/config.rs @@ -0,0 +1,162 @@ +use redis::{aio::ConnectionManager, RedisResult}; +use snafu::prelude::*; + +use crate::sinks::prelude::*; + +use super::{sink::RedisSink, RedisCreateFailedSnafu}; + +/// Redis data type to store messages in. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative)] +#[derivative(Default)] +#[serde(rename_all = "lowercase")] +pub enum DataTypeConfig { + /// The Redis `list` type. + /// + /// This resembles a deque, where messages can be popped and pushed from either end. + /// + /// This is the default. + #[derivative(Default)] + List, + + /// The Redis `channel` type. + /// + /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving. + Channel, +} + +/// List-specific options. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] +pub struct ListOption { + /// The method to use for pushing messages into a `list`. + pub(super) method: Method, +} + +/// Method for pushing messages into a `list`. +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] +#[derivative(Default)] +#[serde(rename_all = "lowercase")] +pub enum Method { + /// Use the `rpush` method. + /// + /// This pushes messages onto the tail of the list. + /// + /// This is the default. + #[derivative(Default)] + RPush, + + /// Use the `lpush` method. + /// + /// This pushes messages onto the head of the list. + LPush, +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct RedisDefaultBatchSettings; + +impl SinkBatchSettings for RedisDefaultBatchSettings { + const MAX_EVENTS: Option = Some(1); + const MAX_BYTES: Option = None; + const TIMEOUT_SECS: f64 = 1.0; +} + +/// Configuration for the `redis` sink. +#[configurable_component(sink("redis", "Publish observability data to Redis."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct RedisSinkConfig { + #[configurable(derived)] + pub(super) encoding: EncodingConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) data_type: DataTypeConfig, + + #[configurable(derived)] + #[serde(alias = "list")] + pub(super) list_option: Option, + + /// The URL of the Redis endpoint to connect to. + /// + /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be + /// `redis` or `rediss` for connections secured via TLS. + #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))] + #[serde(alias = "url")] + pub(super) endpoint: String, + + /// The Redis key to publish messages to. + #[configurable(validation(length(min = 1)))] + #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))] + pub(super) key: Template, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub(super) acknowledgements: AcknowledgementsConfig, +} + +impl GenerateConfig for RedisSinkConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#" + url = "redis://127.0.0.1:6379/0" + key = "vector" + data_type = "list" + list.method = "lpush" + encoding.codec = "json" + batch.max_events = 1 + "#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "redis")] +impl SinkConfig for RedisSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + if self.key.is_empty() { + return Err("`key` cannot be empty.".into()); + } + let conn = self.build_client().await.context(RedisCreateFailedSnafu)?; + let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed(); + let sink = RedisSink::new(self, conn)?; + Ok((super::VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.config().input_type() & DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl RedisSinkConfig { + pub(super) async fn build_client(&self) -> RedisResult { + let client = redis::Client::open(self.endpoint.as_str())?; + client.get_tokio_connection_manager().await + } + + async fn healthcheck(mut conn: ConnectionManager) -> crate::Result<()> { + redis::cmd("PING") + .query_async(&mut conn) + .await + .map_err(Into::into) + } +} diff --git a/src/sinks/redis/integration_tests.rs b/src/sinks/redis/integration_tests.rs new file mode 100644 index 0000000000000..577210555361c --- /dev/null +++ b/src/sinks/redis/integration_tests.rs @@ -0,0 +1,292 @@ +use codecs::JsonSerializerConfig; +use futures::stream; +use rand::Rng; +use redis::AsyncCommands; +use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::LogEvent, +}; + +use super::config::{DataTypeConfig, ListOption, Method, RedisSinkConfig}; +use crate::{ + sinks::prelude::*, + test_util::{ + components::{ + assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS, + SINK_TAGS, + }, + random_lines_with_stream, random_string, trace_init, + }, +}; + +fn redis_server() -> String { + std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/0".to_owned()) +} + +#[tokio::test] +async fn redis_sink_list_lpush() { + trace_init(); + + let key = Template::try_from(format!("test-{}", random_string(10))) + .expect("should not fail to create key template"); + debug!("Test key name: {}.", key); + let mut rng = rand::thread_rng(); + let num_events = rng.gen_range(10000..20000); + debug!("Test events num: {}.", num_events); + + let cnf = RedisSinkConfig { + endpoint: redis_server(), + key: key.clone(), + encoding: JsonSerializerConfig::default().into(), + data_type: DataTypeConfig::List, + list_option: Some(ListOption { + method: Method::LPush, + }), + batch: BatchConfig::default(), + request: TowerRequestConfig { + rate_limit_num: Some(u64::MAX), + ..Default::default() + }, + acknowledgements: Default::default(), + }; + + let mut events: Vec = Vec::new(); + for i in 0..num_events { + let s: String = i.to_string(); + let e = LogEvent::from(s); + events.push(e.into()); + } + let input = stream::iter(events.clone().into_iter().map(Into::into)); + + // Publish events. + let cnf2 = cnf.clone(); + assert_sink_compliance(&SINK_TAGS, async move { + // let conn = cnf2.build_client().await.unwrap(); + let cx = SinkContext::default(); + let (sink, _healthcheck) = cnf2.build(cx).await.unwrap(); + sink.run(input).await + }) + .await + .expect("Running sink failed"); + + let mut conn = cnf.build_client().await.unwrap(); + + let key_exists: bool = conn.exists(key.clone().to_string()).await.unwrap(); + debug!("Test key: {} exists: {}.", key, key_exists); + assert!(key_exists); + let llen: usize = conn.llen(key.clone().to_string()).await.unwrap(); + debug!("Test key: {} len: {}.", key, llen); + assert_eq!(llen, num_events); + + for i in 0..num_events { + let e = events.get(i).unwrap().as_log(); + let s = serde_json::to_string(e).unwrap_or_default(); + let payload: (String, String) = conn.brpop(key.clone().to_string(), 2000).await.unwrap(); + let val = payload.1; + assert_eq!(val, s); + } +} + +#[tokio::test] +async fn redis_sink_list_rpush() { + trace_init(); + + let key = Template::try_from(format!("test-{}", random_string(10))) + .expect("should not fail to create key template"); + debug!("Test key name: {}.", key); + let mut rng = rand::thread_rng(); + let num_events = rng.gen_range(10000..20000); + debug!("Test events num: {}.", num_events); + + let cnf = RedisSinkConfig { + endpoint: redis_server(), + key: key.clone(), + encoding: JsonSerializerConfig::default().into(), + data_type: DataTypeConfig::List, + list_option: Some(ListOption { + method: Method::RPush, + }), + batch: BatchConfig::default(), + request: TowerRequestConfig { + rate_limit_num: Some(u64::MAX), + ..Default::default() + }, + acknowledgements: Default::default(), + }; + + let mut events: Vec = Vec::new(); + for i in 0..num_events { + let s: String = i.to_string(); + let e = LogEvent::from(s); + events.push(e.into()); + } + let input = stream::iter(events.clone().into_iter().map(Into::into)); + + // Publish events. + let cnf2 = cnf.clone(); + assert_sink_compliance(&SINK_TAGS, async move { + // let conn = cnf2.build_client().await.unwrap(); + let cx = SinkContext::default(); + let (sink, _healthcheck) = cnf2.build(cx).await.unwrap(); + sink.run(input).await + }) + .await + .expect("Running sink failed"); + + let mut conn = cnf.build_client().await.unwrap(); + + let key_exists: bool = conn.exists(key.to_string()).await.unwrap(); + debug!("Test key: {} exists: {}.", key, key_exists); + assert!(key_exists); + let llen: usize = conn.llen(key.clone().to_string()).await.unwrap(); + debug!("Test key: {} len: {}.", key, llen); + assert_eq!(llen, num_events); + + for i in 0..num_events { + let e = events.get(i).unwrap().as_log(); + let s = serde_json::to_string(e).unwrap_or_default(); + let payload: (String, String) = conn.blpop(key.clone().to_string(), 2000).await.unwrap(); + let val = payload.1; + assert_eq!(val, s); + } +} + +#[tokio::test] +async fn redis_sink_channel() { + trace_init(); + + let key = Template::try_from(format!("test-{}", random_string(10))) + .expect("should not fail to create key template"); + debug!("Test key name: {}.", key); + let mut rng = rand::thread_rng(); + let num_events = rng.gen_range(10000..20000); + debug!("Test events num: {}.", num_events); + + let client = redis::Client::open(redis_server()).unwrap(); + debug!("Get Redis async connection."); + let conn = client + .get_async_connection() + .await + .expect("Failed to get Redis async connection."); + debug!("Get Redis async connection success."); + let mut pubsub_conn = conn.into_pubsub(); + debug!("Subscribe channel:{}.", key); + pubsub_conn + .subscribe(key.clone().to_string()) + .await + .unwrap_or_else(|_| panic!("Failed to subscribe channel:{}.", key)); + debug!("Subscribed to channel:{}.", key); + let mut pubsub_stream = pubsub_conn.on_message(); + + let cnf = RedisSinkConfig { + endpoint: redis_server(), + key: key.clone(), + encoding: JsonSerializerConfig::default().into(), + data_type: DataTypeConfig::Channel, + list_option: None, + batch: BatchConfig::default(), + request: TowerRequestConfig { + rate_limit_num: Some(u64::MAX), + ..Default::default() + }, + acknowledgements: Default::default(), + }; + + // Publish events. + assert_sink_compliance(&SINK_TAGS, async move { + let cx = SinkContext::default(); + let (sink, _healthcheck) = cnf.build(cx).await.unwrap(); // Box::new(RedisSink::new(&cnf, conn).unwrap()); + let (_input, events) = random_lines_with_stream(100, num_events, None); + sink.run(events).await + }) + .await + .expect("Running sink failed"); + + // Receive events. + let mut received_msg_num = 0; + loop { + let _msg = pubsub_stream.next().await.unwrap(); + received_msg_num += 1; + debug!("Received msg num:{}.", received_msg_num); + if received_msg_num == num_events { + assert_eq!(received_msg_num, num_events); + break; + } + } +} + +#[tokio::test] +async fn redis_sink_channel_data_volume_tags() { + trace_init(); + + // We need to configure Vector to emit the service and source tags. + // The default is to not emit these. + init_telemetry( + Telemetry { + tags: Tags { + emit_service: true, + emit_source: true, + }, + }, + true, + ); + + let key = Template::try_from(format!("test-{}", random_string(10))) + .expect("should not fail to create key template"); + debug!("Test key name: {}.", key); + let mut rng = rand::thread_rng(); + let num_events = rng.gen_range(10000..20000); + debug!("Test events num: {}.", num_events); + + let client = redis::Client::open(redis_server()).unwrap(); + debug!("Get Redis async connection."); + let conn = client + .get_async_connection() + .await + .expect("Failed to get Redis async connection."); + debug!("Get Redis async connection success."); + let mut pubsub_conn = conn.into_pubsub(); + debug!("Subscribe channel:{}.", key); + pubsub_conn + .subscribe(key.clone().to_string()) + .await + .unwrap_or_else(|_| panic!("Failed to subscribe channel:{}.", key)); + debug!("Subscribed to channel:{}.", key); + let mut pubsub_stream = pubsub_conn.on_message(); + + let cnf = RedisSinkConfig { + endpoint: redis_server(), + key: key.clone(), + encoding: JsonSerializerConfig::default().into(), + data_type: DataTypeConfig::Channel, + list_option: None, + batch: BatchConfig::default(), + request: TowerRequestConfig { + rate_limit_num: Some(u64::MAX), + ..Default::default() + }, + acknowledgements: Default::default(), + }; + + // Publish events. + assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move { + let cx = SinkContext::default(); + let (sink, _healthcheck) = cnf.build(cx).await.unwrap(); // Box::new(RedisSink::new(&cnf, conn).unwrap()); + let (_input, events) = random_lines_with_stream(100, num_events, None); + sink.run(events).await + }) + .await + .expect("Running sink failed"); + + // Receive events. + let mut received_msg_num = 0; + loop { + let _msg = pubsub_stream.next().await.unwrap(); + received_msg_num += 1; + debug!("Received msg num:{}.", received_msg_num); + if received_msg_num == num_events { + assert_eq!(received_msg_num, num_events); + break; + } + } +} diff --git a/src/sinks/redis/mod.rs b/src/sinks/redis/mod.rs new file mode 100644 index 0000000000000..7ea22c1603f37 --- /dev/null +++ b/src/sinks/redis/mod.rs @@ -0,0 +1,114 @@ +//! `redis` sink. +//! +//! Writes data to [redis](https://redis.io/). +mod config; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; + +#[cfg(feature = "redis-integration-tests")] +#[cfg(test)] +mod integration_tests; + +use bytes::Bytes; +use redis::RedisError; +use snafu::Snafu; + +use crate::sinks::prelude::*; + +use self::config::Method; + +use super::util::EncodedLength; + +#[derive(Debug, Snafu)] +pub(super) enum RedisSinkError { + #[snafu(display("Creating Redis producer failed: {}", source))] + RedisCreateFailed { source: RedisError }, + #[snafu(display("Invalid key template: {}", source))] + KeyTemplate { source: TemplateParseError }, + #[snafu(display("Error sending query: {}", source))] + SendError { source: RedisError }, +} + +#[derive(Clone, Copy, Debug, Derivative)] +#[derivative(Default)] +pub enum DataType { + /// The Redis `list` type. + /// + /// This resembles a deque, where messages can be popped and pushed from either end. + #[derivative(Default)] + List(Method), + + /// The Redis `channel` type. + /// + /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving. + Channel, +} + +/// Wrapper for an `Event` that also stored the rendered key. +pub(super) struct RedisEvent { + event: Event, + key: String, +} + +impl Finalizable for RedisEvent { + fn take_finalizers(&mut self) -> EventFinalizers { + self.event.take_finalizers() + } +} + +impl ByteSizeOf for RedisEvent { + fn allocated_bytes(&self) -> usize { + self.event.allocated_bytes() + } +} + +impl GetEventCountTags for RedisEvent { + fn get_tags(&self) -> TaggedEventsSent { + self.event.get_tags() + } +} + +impl EstimatedJsonEncodedSizeOf for RedisEvent { + fn estimated_json_encoded_size_of(&self) -> JsonSize { + self.event.estimated_json_encoded_size_of() + } +} + +#[derive(Clone)] +pub(super) struct RedisRequest { + request: Vec, + finalizers: EventFinalizers, + metadata: RequestMetadata, +} + +impl Finalizable for RedisRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for RedisRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +#[derive(Debug, Clone)] +pub(super) struct RedisKvEntry { + key: String, + value: Bytes, +} + +impl EncodedLength for RedisKvEntry { + fn encoded_length(&self) -> usize { + self.value.len() + } +} diff --git a/src/sinks/redis/request_builder.rs b/src/sinks/redis/request_builder.rs new file mode 100644 index 0000000000000..f38a1c7de7a81 --- /dev/null +++ b/src/sinks/redis/request_builder.rs @@ -0,0 +1,72 @@ +use bytes::BytesMut; +use tokio_util::codec::Encoder as _; +use vector_core::config::telemetry; + +use crate::sinks::{prelude::*, util::EncodedLength}; + +use super::{RedisEvent, RedisKvEntry, RedisRequest}; + +pub(super) fn encode_event( + mut event: Event, + key: String, + transformer: &Transformer, + encoder: &mut Encoder<()>, + byte_size: &mut GroupedCountByteSize, +) -> Option { + transformer.transform(&mut event); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + let mut bytes = BytesMut::new(); + + // Errors are handled by `Encoder`. + encoder.encode(event, &mut bytes).ok()?; + + let value = bytes.freeze(); + + let event = RedisKvEntry { key, value }; + Some(event) +} + +fn encode_events( + events: Vec, + transformer: &Transformer, + encoder: &mut Encoder<()>, +) -> EncodeResult> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let request = events + .into_iter() + .filter_map(|event| { + encode_event(event.event, event.key, transformer, encoder, &mut byte_size) + }) + .collect::>(); + + let uncompressed_byte_size = request.iter().map(|event| event.encoded_length()).sum(); + + EncodeResult { + payload: request, + uncompressed_byte_size, + transformed_json_size: byte_size, + compressed_byte_size: None, + } +} + +/// Builds the request to be sent to Redis. +/// The `[RequestBuilder]` trait doesn't work since the encoded event is not just `Byte`s. +/// This function allows us to accept a list of `Event`s and return a list of key -> encoded +/// event objects. +pub(super) fn request_builder( + mut events: Vec, + transformer: &Transformer, + encoder: &mut Encoder<()>, +) -> RedisRequest { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + let encoded = encode_events(events, transformer, encoder); + let metadata = builder.build(&encoded); + + RedisRequest { + request: encoded.into_payload(), + finalizers, + metadata, + } +} diff --git a/src/sinks/redis/service.rs b/src/sinks/redis/service.rs new file mode 100644 index 0000000000000..52d4a215eaa40 --- /dev/null +++ b/src/sinks/redis/service.rs @@ -0,0 +1,103 @@ +use std::task::{Context, Poll}; + +use redis::aio::ConnectionManager; + +use crate::sinks::prelude::*; + +use super::{config::Method, RedisRequest, RedisSinkError}; + +#[derive(Clone)] +pub struct RedisService { + pub(super) conn: ConnectionManager, + pub(super) data_type: super::DataType, +} + +impl Service for RedisService { + type Response = RedisResponse; + type Error = RedisSinkError; + type Future = BoxFuture<'static, Result>; + + // Emission of an internal event in case of errors is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + // Emission of internal events for errors and dropped events is handled upstream by the caller. + fn call(&mut self, kvs: RedisRequest) -> Self::Future { + let count = kvs.request.len(); + + let mut conn = self.conn.clone(); + let mut pipe = redis::pipe(); + + for kv in kvs.request { + match self.data_type { + super::DataType::List(method) => match method { + Method::LPush => { + if count > 1 { + pipe.atomic().lpush(kv.key, kv.value.as_ref()); + } else { + pipe.lpush(kv.key, kv.value.as_ref()); + } + } + Method::RPush => { + if count > 1 { + pipe.atomic().rpush(kv.key, kv.value.as_ref()); + } else { + pipe.rpush(kv.key, kv.value.as_ref()); + } + } + }, + super::DataType::Channel => { + if count > 1 { + pipe.atomic().publish(kv.key, kv.value.as_ref()); + } else { + pipe.publish(kv.key, kv.value.as_ref()); + } + } + } + } + + let byte_size = kvs.metadata.events_byte_size(); + + Box::pin(async move { + match pipe.query_async(&mut conn).await { + Ok(event_status) => Ok(RedisResponse { + event_status, + events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(), + byte_size, + }), + Err(error) => Err(RedisSinkError::SendError { source: error }), + } + }) + } +} + +pub struct RedisResponse { + pub event_status: Vec, + pub events_byte_size: GroupedCountByteSize, + pub byte_size: usize, +} + +impl RedisResponse { + pub(super) fn is_successful(&self) -> bool { + self.event_status.iter().all(|x| *x) + } +} + +impl DriverResponse for RedisResponse { + fn event_status(&self) -> EventStatus { + if self.is_successful() { + EventStatus::Delivered + } else { + EventStatus::Errored + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.byte_size) + } +} diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs new file mode 100644 index 0000000000000..a9dcad68682ff --- /dev/null +++ b/src/sinks/redis/sink.rs @@ -0,0 +1,127 @@ +use std::future; + +use redis::{aio::ConnectionManager, RedisError}; + +use crate::sinks::{ + prelude::*, + util::{retries::RetryAction, Concurrency}, +}; + +use super::{ + config::{DataTypeConfig, RedisSinkConfig}, + request_builder::request_builder, + service::{RedisResponse, RedisService}, + RedisEvent, +}; + +pub(super) struct RedisSink { + request: TowerRequestConfig, + encoder: crate::codecs::Encoder<()>, + transformer: crate::codecs::Transformer, + conn: ConnectionManager, + data_type: super::DataType, + key: Template, + batcher_settings: BatcherSettings, +} + +impl RedisSink { + pub(super) fn new(config: &RedisSinkConfig, conn: ConnectionManager) -> crate::Result { + let method = config.list_option.map(|option| option.method); + let data_type = match config.data_type { + DataTypeConfig::Channel => super::DataType::Channel, + DataTypeConfig::List => super::DataType::List(method.unwrap_or_default()), + }; + + let batcher_settings = config.batch.validate()?.into_batcher_settings()?; + let transformer = config.encoding.transformer(); + let serializer = config.encoding.build()?; + let encoder = Encoder::<()>::new(serializer); + let key = config.key.clone(); + let request = config.request; + + Ok(RedisSink { + request, + batcher_settings, + transformer, + encoder, + conn, + data_type, + key, + }) + } + + /// Transforms an event into a `Redis` event by rendering the template field used to + /// determine the key. + /// Returns `None` if there is an error whilst rendering. An error event is also emitted. + fn make_redis_event(&self, event: Event) -> Option { + let key = self + .key + .render_string(&event) + .map_err(|error| { + emit!(TemplateRenderingError { + error, + field: Some("key"), + drop_event: true, + }); + }) + .ok()?; + + Some(RedisEvent { event, key }) + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let request = self.request.unwrap_with(&TowerRequestConfig { + concurrency: Concurrency::Fixed(1), + ..Default::default() + }); + + let service = RedisService { + conn: self.conn.clone(), + data_type: self.data_type, + }; + + let service = ServiceBuilder::new() + .settings(request, RedisRetryLogic) + .service(service); + + let mut encoder = self.encoder.clone(); + let transformer = self.transformer.clone(); + let batcher_settings = self.batcher_settings.into_byte_size_config(); + + input + .filter_map(|event| future::ready(self.make_redis_event(event))) + .batched(batcher_settings) + .map(|events| request_builder(events, &transformer, &mut encoder)) + .into_driver(service) + .protocol("redis") + .run() + .await + } +} + +#[async_trait] +impl StreamSink for RedisSink { + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} + +#[derive(Debug, Clone)] +pub(super) struct RedisRetryLogic; + +impl RetryLogic for RedisRetryLogic { + type Error = RedisError; + type Response = RedisResponse; + + fn is_retriable_error(&self, _error: &Self::Error) -> bool { + true + } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if response.is_successful() { + RetryAction::Successful + } else { + RetryAction::Retry("Sending data to redis failed.".into()) + } + } +} diff --git a/src/sinks/redis/tests.rs b/src/sinks/redis/tests.rs new file mode 100644 index 0000000000000..c9d148b4d823d --- /dev/null +++ b/src/sinks/redis/tests.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; + +use codecs::{JsonSerializerConfig, TextSerializerConfig}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::event::LogEvent; + +use super::{config::RedisSinkConfig, request_builder::encode_event}; +use crate::{ + codecs::{Encoder, Transformer}, + config::log_schema, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[test] +fn redis_event_json() { + let msg = "hello_world".to_owned(); + let mut byte_size = GroupedCountByteSize::new_untagged(); + let mut evt = LogEvent::from(msg.clone()); + evt.insert("key", "value"); + let result = encode_event( + evt.into(), + "key".to_string(), + &Default::default(), + &mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()), + &mut byte_size, + ) + .unwrap() + .value; + let map: HashMap = serde_json::from_slice(&result[..]).unwrap(); + assert_eq!(msg, map[&log_schema().message_key().unwrap().to_string()]); +} + +#[test] +fn redis_event_text() { + let msg = "hello_world".to_owned(); + let evt = LogEvent::from(msg.clone()); + let mut byte_size = GroupedCountByteSize::new_untagged(); + let event = encode_event( + evt.into(), + "key".to_string(), + &Default::default(), + &mut Encoder::<()>::new(TextSerializerConfig::default().build().into()), + &mut byte_size, + ) + .unwrap() + .value; + assert_eq!(event, Vec::from(msg)); +} + +#[test] +fn redis_encode_event() { + let msg = "hello_world"; + let mut evt = LogEvent::from(msg); + let mut byte_size = GroupedCountByteSize::new_untagged(); + evt.insert("key", "value"); + + let result = encode_event( + evt.into(), + "key".to_string(), + &Transformer::new(None, Some(vec!["key".into()]), None).unwrap(), + &mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()), + &mut byte_size, + ) + .unwrap() + .value; + + let map: HashMap = serde_json::from_slice(&result[..]).unwrap(); + assert!(!map.contains_key("key")); +}