diff --git a/Cargo.lock b/Cargo.lock index 391d78773fb7c..f12099842b7e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -982,6 +982,31 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sns" +version = "0.24.0" +source = "git+https://github.com/vectordotdev/aws-sdk-rust?rev=3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670#3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes 1.4.0", + "http", + "regex", + "tokio-stream", + "tower", +] + [[package]] name = "aws-sdk-sqs" version = "0.24.0" @@ -9492,6 +9517,7 @@ dependencies = [ "aws-sdk-firehose", "aws-sdk-kinesis", "aws-sdk-s3", + "aws-sdk-sns", "aws-sdk-sqs", "aws-sigv4", "aws-smithy-async", diff --git a/Cargo.toml b/Cargo.toml index 531cc7ba0afe4..a11fd043d53fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ metrics-tracing-context = { version = "0.14.0", default-features = false } # depending on a fork to circumvent https://github.com/awslabs/aws-sdk-rust/issues/749 aws-sdk-s3 = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } aws-sdk-sqs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } +aws-sdk-sns = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } aws-sdk-cloudwatch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } aws-sdk-cloudwatchlogs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } aws-sdk-elasticsearch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true } @@ -612,6 +613,7 @@ sinks-logs = [ "sinks-aws_kinesis_streams", "sinks-aws_s3", "sinks-aws_sqs", + "sinks-aws_sns", "sinks-axiom", "sinks-azure_blob", "sinks-azure_monitor_logs", @@ -671,6 +673,7 @@ sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"] sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"] sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"] sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] +sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-elasticsearch"] sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"] sinks-azure_monitor_logs = [] @@ -779,6 +782,7 @@ aws-integration-tests = [ "aws-kinesis-streams-integration-tests", "aws-s3-integration-tests", "aws-sqs-integration-tests", + "aws-sns-integration-tests", ] azure-integration-tests = [ @@ -792,7 +796,8 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"] aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "dep:aws-sdk-elasticsearch", "sinks-elasticsearch"] aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"] aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"] -aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"] +aws-sqs-integration-tests = ["sinks-aws_sqs"] +aws-sns-integration-tests = ["sinks-aws_sns"] axiom-integration-tests = ["sinks-axiom"] azure-blob-integration-tests = ["sinks-azure_blob"] chronicle-integration-tests = ["sinks-gcp"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index b6d5a876eae7e..8ad918b9800e7 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -53,6 +53,7 @@ aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS R aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " +aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-sqs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-sso,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-sts,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " diff --git a/scripts/integration/aws/compose.yaml b/scripts/integration/aws/compose.yaml index 07ed1fde4311d..e635c31f12a2f 100644 --- a/scripts/integration/aws/compose.yaml +++ b/scripts/integration/aws/compose.yaml @@ -6,7 +6,7 @@ services: mock-localstack: image: docker.io/localstack/localstack-full:0.11.6 environment: - - SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs + - SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs,sns mock-watchlogs: image: docker.io/luciofranco/mockwatchlogs:latest mock-ecs: diff --git a/scripts/integration/aws/test.yaml b/scripts/integration/aws/test.yaml index 6e554fca22c1a..466eb9cfd172e 100644 --- a/scripts/integration/aws/test.yaml +++ b/scripts/integration/aws/test.yaml @@ -13,6 +13,7 @@ env: KINESIS_ADDRESS: http://mock-localstack:4566 S3_ADDRESS: http://mock-localstack:4566 SQS_ADDRESS: http://mock-localstack:4566 + SNS_ADDRESS: http://mock-localstack:4566 WATCHLOGS_ADDRESS: http://mock-watchlogs:6000 matrix: diff --git a/src/sinks/aws_s_s/client.rs b/src/sinks/aws_s_s/client.rs new file mode 100644 index 0000000000000..88c86f5f9631b --- /dev/null +++ b/src/sinks/aws_s_s/client.rs @@ -0,0 +1,14 @@ +use super::{request_builder::SendMessageEntry, service::SendMessageResponse}; +use aws_sdk_sqs::types::SdkError; + +#[async_trait::async_trait] +pub(super) trait Client +where + R: std::fmt::Debug + std::fmt::Display + std::error::Error, +{ + async fn send_message( + &self, + entry: SendMessageEntry, + byte_size: usize, + ) -> Result>; +} diff --git a/src/sinks/aws_s_s/config.rs b/src/sinks/aws_s_s/config.rs new file mode 100644 index 0000000000000..aaf0a112d2f6b --- /dev/null +++ b/src/sinks/aws_s_s/config.rs @@ -0,0 +1,99 @@ +use std::convert::TryFrom; + +use snafu::{ResultExt, Snafu}; + +use vector_config::configurable_component; + +use crate::{ + aws::AwsAuthentication, + codecs::EncodingConfig, + config::AcknowledgementsConfig, + sinks::util::TowerRequestConfig, + template::{Template, TemplateParseError}, + tls::TlsConfig, +}; + +#[derive(Debug, Snafu)] +pub(super) enum BuildError { + #[snafu(display("`message_group_id` should be defined for FIFO queue."))] + MessageGroupIdMissing, + #[snafu(display("`message_group_id` is not allowed with non-FIFO queue."))] + MessageGroupIdNotAllowed, + #[snafu(display("invalid topic template: {}", source))] + TopicTemplate { source: TemplateParseError }, + #[snafu(display("invalid message_deduplication_id template: {}", source))] + MessageDeduplicationIdTemplate { source: TemplateParseError }, +} + +/// Base Configuration `aws_s_s` for sns and sqs sink. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub(super) struct BaseSSSinkConfig { + #[configurable(derived)] + pub(super) encoding: EncodingConfig, + + /// The tag that specifies that a message belongs to a specific message group. + /// + /// Can be applied only to FIFO queues. + #[configurable(metadata(docs::examples = "vector"))] + #[configurable(metadata(docs::examples = "vector-%Y-%m-%d"))] + pub(super) message_group_id: Option, + + /// The message deduplication ID value to allow AWS to identify duplicate messages. + /// + /// This value is a template which should result in a unique string for each event. See the [AWS + /// documentation][deduplication_id_docs] for more about how AWS does message deduplication. + /// + /// [deduplication_id_docs]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html + #[configurable(metadata(docs::examples = "{{ transaction_id }}"))] + pub(super) message_deduplication_id: Option, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + pub(super) tls: Option, + + /// The ARN of an [IAM role][iam_role] to assume at startup. + /// + /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html + #[configurable(deprecated)] + #[configurable(metadata(docs::hidden))] + pub(super) assume_role: Option, + + #[configurable(derived)] + #[serde(default)] + pub(super) auth: AwsAuthentication, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub(super) acknowledgements: AcknowledgementsConfig, +} + +pub(super) fn message_group_id( + message_group_id: Option, + fifo: bool, +) -> crate::Result> { + match (message_group_id.as_ref(), fifo) { + (Some(value), true) => Ok(Some( + Template::try_from(value.clone()).context(TopicTemplateSnafu)?, + )), + (Some(_), false) => Err(Box::new(BuildError::MessageGroupIdNotAllowed)), + (None, true) => Err(Box::new(BuildError::MessageGroupIdMissing)), + (None, false) => Ok(None), + } +} +pub(super) fn message_deduplication_id( + message_deduplication_id: Option, +) -> crate::Result> { + Ok(message_deduplication_id + .clone() + .map(Template::try_from) + .transpose()?) +} diff --git a/src/sinks/aws_s_s/mod.rs b/src/sinks/aws_s_s/mod.rs new file mode 100644 index 0000000000000..98f14b44f9bde --- /dev/null +++ b/src/sinks/aws_s_s/mod.rs @@ -0,0 +1,12 @@ +mod client; +mod config; +mod request_builder; +mod retry; +mod service; +mod sink; + +#[cfg(feature = "sinks-aws_sqs")] +mod sqs; + +#[cfg(feature = "sinks-aws_sns")] +mod sns; diff --git a/src/sinks/aws_sqs/request_builder.rs b/src/sinks/aws_s_s/request_builder.rs similarity index 72% rename from src/sinks/aws_sqs/request_builder.rs rename to src/sinks/aws_s_s/request_builder.rs index 34f54b5c1c0a2..fb977244e932c 100644 --- a/src/sinks/aws_sqs/request_builder.rs +++ b/src/sinks/aws_s_s/request_builder.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; use vector_core::ByteSizeOf; -use super::config::SqsSinkConfig; +use crate::codecs::EncodingConfig; use crate::{ codecs::{Encoder, Transformer}, event::{Event, EventFinalizers, Finalizable}, @@ -15,37 +15,39 @@ use crate::{ }; #[derive(Clone)] -pub struct SqsMetadata { - pub finalizers: EventFinalizers, - pub message_group_id: Option, - pub message_deduplication_id: Option, +pub(super) struct SSMetadata { + pub(super) finalizers: EventFinalizers, + pub(super) message_group_id: Option, + pub(super) message_deduplication_id: Option, } #[derive(Clone)] -pub(crate) struct SqsRequestBuilder { +pub(super) struct SSRequestBuilder { encoder: (Transformer, Encoder<()>), message_group_id: Option