Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/newline_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct NewlineDelimitedDecoderOptions {
/// consider setting the maximum length to a reasonably large value as a safety net. This
/// ensures that processing is not actually unbounded.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

impl NewlineDelimitedDecoderOptions {
Expand Down
22 changes: 14 additions & 8 deletions src/codecs/decoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use crate::{
/// messages.
#[derive(Clone)]
pub struct Decoder {
framer: Framer,
deserializer: Deserializer,
log_namespace: LogNamespace,
/// The framer being used.
pub framer: Framer,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes generally look good, but I'm missing what is special about the aws_s3 source that required these to be public where they weren't before. What necessitated this change (same with newline_delimited.rs above)?

Copy link
Member Author

@fuchsnj fuchsnj Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Decoder, the S3 source uses the decoder in a somewhat unusual way because it also allows a "multiline" config. Framing and deserialization are done as separate steps, with the "multiline" aggregation running between them, so the source code needs access to the decoder's individual parts (`framer`, `deserializer`, and `log_namespace`).

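For newline_delimited.rs, this is the first source that uses newline-delimited framing as its default, so it has to construct `NewlineDelimitedDecoderOptions` directly (which requires `max_length` to be public). That default was kept for backwards compatibility, since newline-delimited framing was previously hard-coded.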

I could have added methods to access / modify these values instead of making them public, but since these types are simple data containers, it doesn't matter too much.

/// The deserializer being used.
pub deserializer: Deserializer,
/// The `log_namespace` being used.
pub log_namespace: LogNamespace,
}

impl Default for Decoder {
Expand Down Expand Up @@ -61,16 +64,19 @@ impl Decoder {
Error::FramingError(error)
})?;

let frame = match frame {
Some(frame) => frame,
_ => return Ok(None),
};
frame
.map(|frame| self.deserializer_parse(frame))
.transpose()
}

/// Parses a frame using the included deserializer, and handles any errors by logging.
pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
let byte_size = frame.len();

// Parse structured events from the byte frame.
self.deserializer
.parse(frame, self.log_namespace)
.map(|events| Some((events, byte_size)))
.map(|events| (events, byte_size))
.map_err(|error| {
emit!(DecoderDeserializeError { error: &error });
Error::ParsingError(error)
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::sink::S3RequestOptions;
use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
sinks::{
s3_common::{
self,
Expand Down Expand Up @@ -177,7 +174,7 @@ impl SinkConfig for S3SinkConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type() & DataType::Log)
Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down
34 changes: 30 additions & 4 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ use std::{convert::TryInto, io::ErrorKind};

use async_compression::tokio::bufread;
use aws_sdk_s3::types::ByteStream;
use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions};
use codecs::BytesDeserializerConfig;
use futures::{stream, stream::StreamExt, TryStreamExt};
use lookup::owned_value_path;
use snafu::Snafu;
use tokio_util::io::StreamReader;
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
use vector_core::config::{DataType, LegacyKey, LogNamespace};
use vector_core::config::{LegacyKey, LogNamespace};

use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
use crate::config::DataType;
use crate::{
aws::{auth::AwsAuthentication, create_client, RegionOrEndpoint},
common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
config::{Output, ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext},
line_agg,
serde::bool_or_struct,
serde::{bool_or_struct, default_decoding},
tls::TlsConfig,
};

Expand Down Expand Up @@ -69,7 +72,8 @@ enum Strategy {
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
#[serde(flatten)]
Expand Down Expand Up @@ -113,6 +117,23 @@ pub struct AwsS3Config {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default = "default_framing")]
#[derivative(Default(value = "default_framing()"))]
pub framing: FramingConfig,

#[configurable(derived)]
#[serde(default = "default_decoding")]
#[derivative(Default(value = "default_decoding()"))]
pub decoding: DeserializerConfig,
}

/// Returns the framing used when the configuration does not specify one.
///
/// This is newline-delimited framing with no maximum length. It is kept for
/// backwards compatibility: it used to be the only (hardcoded) option.
const fn default_framing() -> FramingConfig {
    let newline_delimited = NewlineDelimitedDecoderOptions { max_length: None };
    FramingConfig::NewlineDelimited { newline_delimited }
}

impl_generate_config_from_default!(AwsS3Config);
Expand All @@ -131,7 +152,7 @@ impl SourceConfig for AwsS3Config {

match self.strategy {
Strategy::Sqs => Ok(Box::pin(
self.create_sqs_ingestor(multiline_config, &cx.proxy)
self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
.await?
.run(cx, self.acknowledgements, log_namespace),
)),
Expand Down Expand Up @@ -198,6 +219,7 @@ impl AwsS3Config {
&self,
multiline: Option<line_agg::Config>,
proxy: &ProxyConfig,
log_namespace: LogNamespace,
) -> crate::Result<sqs::Ingestor> {
let region = self
.region
Expand All @@ -219,6 +241,9 @@ impl AwsS3Config {
)
.await?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();

match self.sqs {
Some(ref sqs) => {
let sqs_client = create_client::<SqsClientBuilder>(
Expand All @@ -238,6 +263,7 @@ impl AwsS3Config {
sqs.clone(),
self.compression,
multiline,
decoder,
)
.await?;

Expand Down
Loading