diff --git a/lib/codecs/src/decoding/framing/newline_delimited.rs b/lib/codecs/src/decoding/framing/newline_delimited.rs index 1ca8e68815c60..cc96d08d7ab40 100644 --- a/lib/codecs/src/decoding/framing/newline_delimited.rs +++ b/lib/codecs/src/decoding/framing/newline_delimited.rs @@ -34,7 +34,7 @@ pub struct NewlineDelimitedDecoderOptions { /// consider setting the maximum length to a reasonably large value as a safety net. This /// ensures that processing is not actually unbounded. #[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")] - max_length: Option, + pub max_length: Option, } impl NewlineDelimitedDecoderOptions { diff --git a/src/codecs/decoding/decoder.rs b/src/codecs/decoding/decoder.rs index bca8c9267e075..76b06f5a59f70 100644 --- a/src/codecs/decoding/decoder.rs +++ b/src/codecs/decoding/decoder.rs @@ -15,9 +15,12 @@ use crate::{ /// messages. #[derive(Clone)] pub struct Decoder { - framer: Framer, - deserializer: Deserializer, - log_namespace: LogNamespace, + /// The framer being used. + pub framer: Framer, + /// The deserializer being used. + pub deserializer: Deserializer, + /// The `log_namespace` being used. + pub log_namespace: LogNamespace, } impl Default for Decoder { @@ -61,16 +64,19 @@ impl Decoder { Error::FramingError(error) })?; - let frame = match frame { - Some(frame) => frame, - _ => return Ok(None), - }; + frame + .map(|frame| self.deserializer_parse(frame)) + .transpose() + } + /// Parses a frame using the included deserializer, and handles any errors by logging. + pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> { let byte_size = frame.len(); + // Parse structured events from the byte frame. self.deserializer .parse(frame, self.log_namespace) - .map(|events| Some((events, byte_size))) + .map(|events| (events, byte_size)) .map_err(|error| { emit!(DecoderDeserializeError { error: &error }); Error::ParsingError(error) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 175ee91a0b53d..38f22220d293a 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -13,10 +13,7 @@ use super::sink::S3RequestOptions; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, codecs::{Encoder, EncodingConfigWithFraming, SinkType}, - config::{ - AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, - SinkContext, - }, + config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, sinks::{ s3_common::{ self, @@ -177,7 +174,7 @@ impl SinkConfig for S3SinkConfig { } fn input(&self) -> Input { - Input::new(self.encoding.config().1.input_type() & DataType::Log) + Input::new(self.encoding.config().1.input_type()) } fn acknowledgements(&self) -> &AcknowledgementsConfig { diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index ecbc83f582787..21c104607a1b1 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -2,6 +2,7 @@ use std::{convert::TryInto, io::ErrorKind}; use async_compression::tokio::bufread; use aws_sdk_s3::types::ByteStream; +use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}; use codecs::BytesDeserializerConfig; use futures::{stream, stream::StreamExt, TryStreamExt}; use lookup::owned_value_path; @@ -9,15 +10,17 @@ use snafu::Snafu; use tokio_util::io::StreamReader; use value::{kind::Collection, Kind}; use vector_config::configurable_component; -use vector_core::config::{DataType, LegacyKey, LogNamespace}; +use vector_core::config::{LegacyKey, LogNamespace}; use super::util::MultilineConfig; +use crate::codecs::DecodingConfig; +use crate::config::DataType; use crate::{ aws::{auth::AwsAuthentication, create_client, RegionOrEndpoint}, common::{s3::S3ClientBuilder, sqs::SqsClientBuilder}, config::{Output, ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext}, line_agg, - serde::bool_or_struct, + serde::{bool_or_struct, default_decoding}, tls::TlsConfig, }; @@ -69,7 +72,8 @@ enum Strategy { // // Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies: #[configurable_component(source("aws_s3", "Collect logs from AWS S3."))] -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] #[serde(default, deny_unknown_fields)] pub struct AwsS3Config { #[serde(flatten)] @@ -113,6 +117,23 @@ pub struct AwsS3Config { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + #[configurable(derived)] + #[serde(default = "default_framing")] + #[derivative(Default(value = "default_framing()"))] + pub framing: FramingConfig, + + #[configurable(derived)] + #[serde(default = "default_decoding")] + #[derivative(Default(value = "default_decoding()"))] + pub decoding: DeserializerConfig, +} + +const fn default_framing() -> FramingConfig { + // This is used for backwards compatibility. It used to be the only (hardcoded) option. + FramingConfig::NewlineDelimited { + newline_delimited: NewlineDelimitedDecoderOptions { max_length: None }, + } } impl_generate_config_from_default!(AwsS3Config); @@ -131,7 +152,7 @@ impl SourceConfig for AwsS3Config { match self.strategy { Strategy::Sqs => Ok(Box::pin( - self.create_sqs_ingestor(multiline_config, &cx.proxy) + self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace) .await? .run(cx, self.acknowledgements, log_namespace), )), @@ -198,6 +219,7 @@ impl AwsS3Config { &self, multiline: Option, proxy: &ProxyConfig, + log_namespace: LogNamespace, ) -> crate::Result { let region = self .region @@ -219,6 +241,9 @@ impl AwsS3Config { ) .await?; + let decoder = + DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build(); + match self.sqs { Some(ref sqs) => { let sqs_client = create_client::( @@ -238,6 +263,7 @@ impl AwsS3Config { sqs.clone(), self.compression, multiline, + decoder, ) .await?; diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 490f5b9b8d0c0..4adae407ce9d2 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::{future::ready, num::NonZeroUsize, panic, sync::Arc}; use aws_sdk_s3::{error::GetObjectError, Client as S3Client}; @@ -10,12 +11,13 @@ use aws_sdk_sqs::{ use aws_smithy_client::SdkError; use aws_types::region::Region; use bytes::Bytes; -use chrono::{TimeZone, Utc}; -use codecs::{decoding::FramingError, BytesDeserializer, CharacterDelimitedDecoder}; +use chrono::{DateTime, TimeZone, Utc}; +use codecs::decoding::FramingError; use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; use once_cell::sync::Lazy; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; +use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; use tokio::{pin, select}; use tokio_util::codec::FramedRead; @@ -25,6 +27,8 @@ use vector_common::internal_event::{ }; use vector_config::configurable_component; +use crate::codecs::Decoder; +use crate::event::{Event, LogEvent}; use crate::{ config::{SourceAcknowledgementsConfig, SourceContext}, event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf}, @@ -42,6 +46,7 @@ use crate::{ }; use lookup::{metadata_path, path, PathPrefix}; use vector_core::config::{log_schema, LegacyKey, LogNamespace}; +use vector_core::event::MaybeAsLogMut; static SUPPORTED_S3_EVENT_VERSION: Lazy = Lazy::new(|| semver::VersionReq::parse("~2").unwrap()); @@ -195,6 +200,7 @@ pub struct State { client_concurrency: usize, visibility_timeout_secs: i32, delete_message: bool, + decoder: Decoder, } pub(super) struct Ingestor { @@ -209,6 +215,7 @@ impl Ingestor { config: Config, compression: super::Compression, multiline: Option, + decoder: Decoder, ) -> Result { let state = Arc::new(State { region, @@ -227,6 +234,7 @@ impl Ingestor { .unwrap_or_else(crate::num_threads), visibility_timeout_secs: config.visibility_timeout_secs as i32, delete_message: config.delete_message, + decoder, }); Ok(Ingestor { state }) @@ -340,7 +348,6 @@ impl IngestorProcess { .message_id .clone() .unwrap_or_else(|| "".to_owned()); - match self.handle_sqs_message(message).await { Ok(()) => { emit!(SqsMessageProcessingSucceeded { @@ -398,7 +405,7 @@ impl IngestorProcess { } async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> { - let s3_event: Event = serde_json::from_str(message.body.unwrap_or_default().as_ref()) + let s3_event: SqsEvent = serde_json::from_str(message.body.unwrap_or_default().as_ref()) .context(InvalidSqsMessageSnafu { message_id: message .message_id @@ -407,11 +414,11 @@ impl IngestorProcess { })?; match s3_event { - Event::TestEvent(_s3_test_event) => { + SqsEvent::TestEvent(_s3_test_event) => { debug!(?message.message_id, message = "Found S3 Test Event."); Ok(()) } - Event::Event(s3_event) => self.handle_s3_event(s3_event).await, + SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await, } } @@ -504,7 +511,7 @@ impl IngestorProcess { let bytes_received = self.bytes_received.clone(); let events_received = self.events_received.clone(); let lines: Box + Send + Unpin> = Box::new( - FramedRead::new(object_reader, CharacterDelimitedDecoder::new(b'\n')) + FramedRead::new(object_reader, self.state.decoder.framer.clone()) .map(|res| { res.map(|bytes| { bytes_received.emit(ByteSize(bytes.len())); @@ -519,7 +526,7 @@ impl IngestorProcess { .map(|r| r.expect("validated by take_while")), ); - let lines = match &self.state.multiline { + let lines: Box + Send + Unpin> = match &self.state.multiline { Some(config) => Box::new( LineAgg::new( lines.map(|line| ((), line, ())), @@ -530,77 +537,34 @@ impl IngestorProcess { None => lines, }; - let mut stream = lines.map(|line| { - let deserializer = BytesDeserializer::new(); - let mut log = deserializer - .parse_single(line, log_namespace) - .with_batch_notifier_option(&batch); - - log_namespace.insert_source_metadata( - AwsS3Config::NAME, - &mut log, - Some(LegacyKey::Overwrite(path!("bucket"))), - path!("bucket"), - Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()), - ); - log_namespace.insert_source_metadata( - AwsS3Config::NAME, - &mut log, - Some(LegacyKey::Overwrite(path!("object"))), - path!("object"), - Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()), - ); - log_namespace.insert_source_metadata( - AwsS3Config::NAME, - &mut log, - Some(LegacyKey::Overwrite(path!("region"))), - path!("region"), - Bytes::from(s3_event.aws_region.as_bytes().to_vec()), - ); - - if let Some(metadata) = &metadata { - for (key, value) in metadata { - log_namespace.insert_source_metadata( - AwsS3Config::NAME, - &mut log, - Some(LegacyKey::Overwrite(key.as_str())), - path!("metadata", key.as_str()), - value.clone(), - ); + let mut stream = lines.flat_map(|line| { + let events = match self.state.decoder.deserializer_parse(line) { + Ok((events, _events_size)) => events, + Err(_error) => { + // Error is handled by `codecs::Decoder`, no further handling + // is needed here. + SmallVec::new() } - } - - log_namespace.insert_vector_metadata( - &mut log, - Some(log_schema().source_type_key()), - path!("source_type"), - Bytes::from_static(AwsS3Config::NAME.as_bytes()), - ); - - // This handles the transition from the original timestamp logic. Originally the - // `timestamp_key` was populated by the `last_modified` time on the object, falling - // back to calling `now()`. - match log_namespace { - LogNamespace::Vector => { - if let Some(timestamp) = timestamp { - log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp); - } + }; - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); - } - LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - log.try_insert( - (PathPrefix::Event, timestamp_key), - timestamp.unwrap_or_else(Utc::now), + let events = events + .into_iter() + .map(|mut event: Event| { + event = event.with_batch_notifier_option(&batch); + if let Some(log_event) = event.maybe_as_log_mut() { + handle_single_log( + log_event, + log_namespace, + &s3_event, + &metadata, + timestamp, ); } - } - }; - - events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of())); - - log + events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of())); + event + }) + .collect::>(); + futures::stream::iter(events) }); let send_error = match self.out.send_event_stream(&mut stream).await { @@ -635,15 +599,18 @@ impl IngestorProcess { } else { match receiver { None => Ok(()), - Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(()), - BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement), - BatchStatus::Rejected => { - // Sinks are responsible for emitting ComponentEventsDropped. - // Failed events cannot be retried, so continue to delete the SQS source message. - Ok(()) + Some(receiver) => { + let result = receiver.await; + match result { + BatchStatus::Delivered => Ok(()), + BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement), + BatchStatus::Rejected => { + // Sinks are responsible for emitting ComponentEventsDropped. + // Failed events cannot be retried, so continue to delete the SQS source message. + Ok(()) + } } - }, + } } } } @@ -675,10 +642,81 @@ impl IngestorProcess { } } +fn handle_single_log( + log: &mut LogEvent, + log_namespace: LogNamespace, + s3_event: &S3EventRecord, + metadata: &Option>, + timestamp: Option>, +) { + log_namespace.insert_source_metadata( + AwsS3Config::NAME, + log, + Some(LegacyKey::Overwrite(path!("bucket"))), + path!("bucket"), + Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()), + ); + + log_namespace.insert_source_metadata( + AwsS3Config::NAME, + log, + Some(LegacyKey::Overwrite(path!("object"))), + path!("object"), + Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()), + ); + log_namespace.insert_source_metadata( + AwsS3Config::NAME, + log, + Some(LegacyKey::Overwrite(path!("region"))), + path!("region"), + Bytes::from(s3_event.aws_region.as_bytes().to_vec()), + ); + + if let Some(metadata) = metadata { + for (key, value) in metadata { + log_namespace.insert_source_metadata( + AwsS3Config::NAME, + log, + Some(LegacyKey::Overwrite(key.as_str())), + path!("metadata", key.as_str()), + value.clone(), + ); + } + } + + log_namespace.insert_vector_metadata( + log, + Some(log_schema().source_type_key()), + path!("source_type"), + Bytes::from_static(AwsS3Config::NAME.as_bytes()), + ); + + // This handles the transition from the original timestamp logic. Originally the + // `timestamp_key` was populated by the `last_modified` time on the object, falling + // back to calling `now()`. + match log_namespace { + LogNamespace::Vector => { + if let Some(timestamp) = timestamp { + log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp); + } + + log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + } + LogNamespace::Legacy => { + if let Some(timestamp_key) = log_schema().timestamp_key() { + log.try_insert( + (PathPrefix::Event, timestamp_key), + timestamp.unwrap_or_else(Utc::now), + ); + } + } + }; +} + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] -enum Event { +enum SqsEvent { Event(S3Event), TestEvent(S3TestEvent), } diff --git a/website/cue/reference/components/sources/base/aws_s3.cue b/website/cue/reference/components/sources/base/aws_s3.cue index 1b70165eb3ccd..8311de4c71911 100644 --- a/website/cue/reference/components/sources/base/aws_s3.cue +++ b/website/cue/reference/components/sources/base/aws_s3.cue @@ -135,11 +135,151 @@ base: components: sources: aws_s3: configuration: { } } } + decoding: { + description: "Configures how events are decoded from raw bytes." + required: false + type: object: options: codec: { + description: "The codec to use for decoding events." + required: false + type: string: { + default: "bytes" + enum: { + bytes: "Uses the raw bytes as-is." + gelf: """ + Decodes the raw bytes as a [GELF][gelf] message. + + [gelf]: https://docs.graylog.org/docs/gelf + """ + json: """ + Decodes the raw bytes as [JSON][json]. + + [json]: https://www.json.org/ + """ + native: """ + Decodes the raw bytes as Vector’s [native Protocol Buffers format][vector_native_protobuf]. + + This codec is **[experimental][experimental]**. + + [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + native_json: """ + Decodes the raw bytes as Vector’s [native JSON format][vector_native_json]. + + This codec is **[experimental][experimental]**. + + [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + syslog: """ + Decodes the raw bytes as a Syslog message. + + Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the + [RFC 5424][rfc5424]-style format ("new" style, includes structured data). + + [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt + [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt + """ + } + } + } + } endpoint: { description: "Custom endpoint for use with AWS-compatible services." required: false type: string: examples: ["http://127.0.0.0:5000/path/to/service"] } + framing: { + description: """ + Framing configuration. + + Framing handles how events are separated when encoded in a raw byte form, where each event is + a frame that must be prefixed, or delimited, in a way that marks where an event begins and + ends within the byte stream. + """ + required: false + type: object: options: { + character_delimited: { + description: "Options for the character delimited decoder." + relevant_when: "method = \"character_delimited\"" + required: true + type: object: options: { + delimiter: { + description: "The character that delimits byte sequences." + required: true + type: uint: {} + } + max_length: { + description: """ + The maximum length of the byte buffer. + + This length does *not* include the trailing delimiter. + + By default, there is no maximum length enforced. If events are malformed, this can lead to + additional resource usage as events continue to be buffered in memory, and can potentially + lead to memory exhaustion in extreme cases. + + If there is a risk of processing malformed data, such as logs with user-controlled input, + consider setting the maximum length to a reasonably large value as a safety net. This + ensures that processing is not actually unbounded. + """ + required: false + type: uint: {} + } + } + } + method: { + description: "The framing method." + required: false + type: string: { + default: "newline_delimited" + enum: { + bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." + character_delimited: "Byte frames which are delimited by a chosen character." + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." + octet_counting: """ + Byte frames according to the [octet counting][octet_counting] format. + + [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1 + """ + } + } + } + newline_delimited: { + description: "Options for the newline delimited decoder." + relevant_when: "method = \"newline_delimited\"" + required: false + type: object: options: max_length: { + description: """ + The maximum length of the byte buffer. + + This length does *not* include the trailing delimiter. + + By default, there is no maximum length enforced. If events are malformed, this can lead to + additional resource usage as events continue to be buffered in memory, and can potentially + lead to memory exhaustion in extreme cases. + + If there is a risk of processing malformed data, such as logs with user-controlled input, + consider setting the maximum length to a reasonably large value as a safety net. This + ensures that processing is not actually unbounded. + """ + required: false + type: uint: {} + } + } + octet_counting: { + description: "Options for the octet counting decoder." + relevant_when: "method = \"octet_counting\"" + required: false + type: object: options: max_length: { + description: "The maximum length of the byte buffer." + required: false + type: uint: {} + } + } + } + } multiline: { description: """ Multiline aggregation configuration.