Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
da4652f
ci: speed up tests by only running sqs tests
wochinge Aug 9, 2023
cd20a13
chore: prepare split in sqs and sns
wochinge Aug 9, 2023
2b7ef02
chore: wrap up split for foundation
wochinge Aug 9, 2023
6941cd0
refactor: separate config
wochinge Aug 9, 2023
0ac39cb
refactor: implement separate publisher
wochinge Aug 9, 2023
f9e7148
refactor: drop no longer needed message builder
wochinge Aug 9, 2023
891e119
refactor: extract error type
wochinge Aug 10, 2023
8311045
style: name variable correctly
wochinge Aug 10, 2023
8c4addd
feat: add sns 🎉
wochinge Aug 10, 2023
5b42a29
refactor: abstract retry logic
wochinge Aug 10, 2023
b8bdc9c
chore: remove sqs from shared modules
wochinge Aug 10, 2023
829db54
chore: cleanup
wochinge Aug 10, 2023
36b0a59
chore: drop temporary changes
wochinge Aug 10, 2023
e76af5f
chore: update mod to include sns module
wochinge Aug 10, 2023
6be88d4
refactor: move healthcheck out of config into separate function
wochinge Aug 11, 2023
07e1938
refactor: simplify request builder setup
wochinge Aug 11, 2023
14c2b67
refactor: message id
ArzelaAscoIi Aug 14, 2023
0ed0b62
nit: make functions just visibile in sub module
ArzelaAscoIi Aug 15, 2023
33c7c6f
fix: add pub(super)
ArzelaAscoIi Aug 15, 2023
d3b5a6a
fix: sub methods attributes pub(super)
ArzelaAscoIi Aug 15, 2023
89c00c3
Update Cargo.toml
ArzelaAscoIi Aug 16, 2023
4a641a3
Update src/sinks/aws_s_s/mod.rs
ArzelaAscoIi Aug 16, 2023
129313c
Update src/sinks/aws_s_s/retry.rs
ArzelaAscoIi Aug 16, 2023
efdf844
Update src/sinks/mod.rs
ArzelaAscoIi Aug 16, 2023
e7fe94c
Update src/sinks/aws_s_s/sns/config.rs
ArzelaAscoIi Aug 16, 2023
a7c7cf7
chore: first bunch of comments
ArzelaAscoIi Aug 16, 2023
8f3be55
Merge branch 'feat/sns' of github.com:wochinge/vector into feat/sns
ArzelaAscoIi Aug 16, 2023
69890db
chore: second bunch of comments
ArzelaAscoIi Aug 16, 2023
6a51b9e
chore: enable all integration tests
ArzelaAscoIi Aug 17, 2023
345d147
fix: move test
ArzelaAscoIi Aug 17, 2023
5308c0e
fix: dead_code warning
ArzelaAscoIi Aug 17, 2023
4df0786
fix: dead code warning
ArzelaAscoIi Aug 17, 2023
a2223eb
docs: autogenerated aws_sns docs
ArzelaAscoIi Aug 17, 2023
8dbb7be
fix: move region serde to downstream impl
ArzelaAscoIi Aug 17, 2023
185fe20
update licenses
neuronull Aug 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ metrics-tracing-context = { version = "0.14.0", default-features = false }
# depending on a fork to circumvent https://github.com/awslabs/aws-sdk-rust/issues/749
aws-sdk-s3 = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sqs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-sns = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-cloudwatchlogs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
aws-sdk-elasticsearch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
Expand Down Expand Up @@ -612,6 +613,7 @@ sinks-logs = [
"sinks-aws_kinesis_streams",
"sinks-aws_s3",
"sinks-aws_sqs",
"sinks-aws_sns",
"sinks-axiom",
"sinks-azure_blob",
"sinks-azure_monitor_logs",
Expand Down Expand Up @@ -671,6 +673,7 @@ sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-elasticsearch"]
sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"]
sinks-azure_monitor_logs = []
Expand Down Expand Up @@ -779,6 +782,7 @@ aws-integration-tests = [
"aws-kinesis-streams-integration-tests",
"aws-s3-integration-tests",
"aws-sqs-integration-tests",
"aws-sns-integration-tests",
]

azure-integration-tests = [
Expand All @@ -792,7 +796,8 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "dep:aws-sdk-elasticsearch", "sinks-elasticsearch"]
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"]
aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"]
aws-sqs-integration-tests = ["sinks-aws_sqs"]
aws-sns-integration-tests = ["sinks-aws_sns"]
axiom-integration-tests = ["sinks-axiom"]
azure-blob-integration-tests = ["sinks-azure_blob"]
chronicle-integration-tests = ["sinks-gcp"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS R
aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-sqs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-sso,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
aws-sdk-sts,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <aws-sdk-rust@amazon.com>, Russell Cohen <rcoh@amazon.com>"
Expand Down
2 changes: 1 addition & 1 deletion scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
mock-localstack:
image: docker.io/localstack/localstack-full:0.11.6
environment:
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs,sns
mock-watchlogs:
image: docker.io/luciofranco/mockwatchlogs:latest
mock-ecs:
Expand Down
1 change: 1 addition & 0 deletions scripts/integration/aws/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ env:
KINESIS_ADDRESS: http://mock-localstack:4566
S3_ADDRESS: http://mock-localstack:4566
SQS_ADDRESS: http://mock-localstack:4566
SNS_ADDRESS: http://mock-localstack:4566
WATCHLOGS_ADDRESS: http://mock-watchlogs:6000

matrix:
Expand Down
14 changes: 14 additions & 0 deletions src/sinks/aws_s_s/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
use aws_sdk_sqs::types::SdkError;

#[async_trait::async_trait]
pub(super) trait Client<R>
where
R: std::fmt::Debug + std::fmt::Display + std::error::Error,
{
async fn send_message(
&self,
entry: SendMessageEntry,
byte_size: usize,
) -> Result<SendMessageResponse, SdkError<R>>;
}
99 changes: 99 additions & 0 deletions src/sinks/aws_s_s/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::convert::TryFrom;

use snafu::{ResultExt, Snafu};

use vector_config::configurable_component;

use crate::{
aws::AwsAuthentication,
codecs::EncodingConfig,
config::AcknowledgementsConfig,
sinks::util::TowerRequestConfig,
template::{Template, TemplateParseError},
tls::TlsConfig,
};

#[derive(Debug, Snafu)]
pub(super) enum BuildError {
#[snafu(display("`message_group_id` should be defined for FIFO queue."))]
MessageGroupIdMissing,
#[snafu(display("`message_group_id` is not allowed with non-FIFO queue."))]
MessageGroupIdNotAllowed,
#[snafu(display("invalid topic template: {}", source))]
TopicTemplate { source: TemplateParseError },
#[snafu(display("invalid message_deduplication_id template: {}", source))]
MessageDeduplicationIdTemplate { source: TemplateParseError },
}

/// Base Configuration `aws_s_s` for sns and sqs sink.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub(super) struct BaseSSSinkConfig {
#[configurable(derived)]
pub(super) encoding: EncodingConfig,

/// The tag that specifies that a message belongs to a specific message group.
///
/// Can be applied only to FIFO queues.
#[configurable(metadata(docs::examples = "vector"))]
#[configurable(metadata(docs::examples = "vector-%Y-%m-%d"))]
pub(super) message_group_id: Option<String>,

/// The message deduplication ID value to allow AWS to identify duplicate messages.
///
/// This value is a template which should result in a unique string for each event. See the [AWS
/// documentation][deduplication_id_docs] for more about how AWS does message deduplication.
///
/// [deduplication_id_docs]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
#[configurable(metadata(docs::examples = "{{ transaction_id }}"))]
pub(super) message_deduplication_id: Option<String>,

#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig,

#[configurable(derived)]
pub(super) tls: Option<TlsConfig>,

/// The ARN of an [IAM role][iam_role] to assume at startup.
///
/// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
#[configurable(deprecated)]
#[configurable(metadata(docs::hidden))]
pub(super) assume_role: Option<String>,

#[configurable(derived)]
#[serde(default)]
pub(super) auth: AwsAuthentication,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub(super) acknowledgements: AcknowledgementsConfig,
}

pub(super) fn message_group_id(
message_group_id: Option<String>,
fifo: bool,
) -> crate::Result<Option<Template>> {
match (message_group_id.as_ref(), fifo) {
(Some(value), true) => Ok(Some(
Template::try_from(value.clone()).context(TopicTemplateSnafu)?,
)),
(Some(_), false) => Err(Box::new(BuildError::MessageGroupIdNotAllowed)),
(None, true) => Err(Box::new(BuildError::MessageGroupIdMissing)),
(None, false) => Ok(None),
}
}
pub(super) fn message_deduplication_id(
message_deduplication_id: Option<String>,
) -> crate::Result<Option<Template>> {
Ok(message_deduplication_id
.clone()
.map(Template::try_from)
.transpose()?)
}
12 changes: 12 additions & 0 deletions src/sinks/aws_s_s/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod client;
mod config;
mod request_builder;
mod retry;
mod service;
mod sink;

#[cfg(feature = "sinks-aws_sqs")]
mod sqs;

#[cfg(feature = "sinks-aws_sns")]
mod sns;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_core::ByteSizeOf;

use super::config::SqsSinkConfig;
use crate::codecs::EncodingConfig;
use crate::{
codecs::{Encoder, Transformer},
event::{Event, EventFinalizers, Finalizable},
Expand All @@ -15,37 +15,39 @@ use crate::{
};

#[derive(Clone)]
pub struct SqsMetadata {
pub finalizers: EventFinalizers,
pub message_group_id: Option<String>,
pub message_deduplication_id: Option<String>,
pub(super) struct SSMetadata {
pub(super) finalizers: EventFinalizers,
pub(super) message_group_id: Option<String>,
pub(super) message_deduplication_id: Option<String>,
}

#[derive(Clone)]
pub(crate) struct SqsRequestBuilder {
pub(super) struct SSRequestBuilder {
encoder: (Transformer, Encoder<()>),
message_group_id: Option<Template>,
message_deduplication_id: Option<Template>,
queue_url: String,
}

impl SqsRequestBuilder {
pub fn new(config: SqsSinkConfig) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
impl SSRequestBuilder {
pub(super) fn new(
message_group_id: Option<Template>,
message_deduplication_id: Option<Template>,
encoding_config: EncodingConfig,
) -> crate::Result<Self> {
let transformer = encoding_config.transformer();
let serializer = encoding_config.build()?;
let encoder = Encoder::<()>::new(serializer);

Ok(Self {
encoder: (transformer, encoder),
message_group_id: config.message_group_id()?,
message_deduplication_id: config.message_deduplication_id()?,
queue_url: config.queue_url,
message_group_id,
message_deduplication_id,
})
}
}

impl RequestBuilder<Event> for SqsRequestBuilder {
type Metadata = SqsMetadata;
impl RequestBuilder<Event> for SSRequestBuilder {
type Metadata = SSMetadata;
type Events = Event;
type Encoder = (Transformer, Encoder<()>);
type Payload = Bytes;
Expand Down Expand Up @@ -95,17 +97,17 @@ impl RequestBuilder<Event> for SqsRequestBuilder {

let builder = RequestMetadataBuilder::from_event(&event);

let sqs_metadata = SqsMetadata {
let metadata = SSMetadata {
finalizers: event.take_finalizers(),
message_group_id,
message_deduplication_id,
};
(sqs_metadata, builder, event)
(metadata, builder, event)
}

fn build_request(
&self,
sqs_metadata: Self::Metadata,
client_metadata: Self::Metadata,
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
Expand All @@ -114,23 +116,21 @@ impl RequestBuilder<Event> for SqsRequestBuilder {

SendMessageEntry {
message_body,
message_group_id: sqs_metadata.message_group_id,
message_deduplication_id: sqs_metadata.message_deduplication_id,
queue_url: self.queue_url.clone(),
finalizers: sqs_metadata.finalizers,
message_group_id: client_metadata.message_group_id,
message_deduplication_id: client_metadata.message_deduplication_id,
finalizers: client_metadata.finalizers,
metadata,
}
}
}

#[derive(Debug, Clone)]
pub(crate) struct SendMessageEntry {
pub message_body: String,
pub message_group_id: Option<String>,
pub message_deduplication_id: Option<String>,
pub queue_url: String,
finalizers: EventFinalizers,
pub metadata: RequestMetadata,
pub(super) struct SendMessageEntry {
pub(super) message_body: String,
pub(super) message_group_id: Option<String>,
pub(super) message_deduplication_id: Option<String>,
pub(super) finalizers: EventFinalizers,
pub(super) metadata: RequestMetadata,
}

impl ByteSizeOf for SendMessageEntry {
Expand Down
44 changes: 44 additions & 0 deletions src/sinks/aws_s_s/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use aws_sdk_sqs::types::SdkError;
use std::marker::PhantomData;

use super::service::SendMessageResponse;
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};

#[derive(Debug)]
pub(super) struct SSRetryLogic<E> {
_phantom: PhantomData<fn() -> E>,
}

impl<E> SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
pub(super) fn new() -> SSRetryLogic<E> {
Self {
_phantom: PhantomData,
}
}
}

impl<E> RetryLogic for SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
type Error = SdkError<E>;
type Response = SendMessageResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
is_retriable_error(error)
}
}

impl<E> Clone for SSRetryLogic<E>
where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
fn clone(&self) -> SSRetryLogic<E> {
SSRetryLogic {
_phantom: PhantomData,
}
}
}
Loading