Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::sink::VectorSink;

use super::sink::S3RequestOptions;
use super::sink::{NewS3Sink, S3RequestOptions};
use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
Expand All @@ -20,7 +20,6 @@ use crate::{
config::{S3Options, S3RetryLogic},
partitioner::S3KeyPartitioner,
service::S3Service,
sink::S3Sink,
},
util::{
BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
Expand Down Expand Up @@ -199,7 +198,7 @@ impl S3SinkConfig {
.service(service);

// Configure our partitioning/batching.
let batch_settings = self.batch.into_batcher_settings()?;
let batcher_settings = self.batch.into_batcher_settings()?;
let key_prefix = self.key_prefix.clone().try_into()?;
let ssekms_key_id = self
.options
Expand All @@ -212,19 +211,27 @@ impl S3SinkConfig {

let transformer = self.encoding.transformer();
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);
let encoder = Encoder::<Framer>::new(framer.clone(), serializer.clone());

let request_options = S3RequestOptions {
bucket: self.bucket.clone(),
api_options: self.options.clone(),
filename_extension: self.filename_extension.clone(),
filename_time_format: self.filename_time_format.clone(),
filename_append_uuid: self.filename_append_uuid,
encoder: (transformer, encoder),
encoder: (transformer.clone(), encoder),
compression: self.compression,
};

let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
let sink = NewS3Sink {
service,
partitioner,
transformer,
framer,
serializer,
batcher_settings,
options: request_options,
};

Ok(VectorSink::from_event_streamsink(sink))
}
Expand Down
176 changes: 170 additions & 6 deletions src/sinks/aws_s3/sink.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,192 @@
use std::io;
use std::{fmt, io, num::NonZeroUsize, sync::Arc};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use codecs::encoding::Framer;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use tokio_util::codec::Encoder as _;
use tower::Service;
use uuid::Uuid;
use vector_common::request_metadata::RequestMetadata;
use vector_core::event::Finalizable;
use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
use vector_core::{
event::Finalizable,
partition::Partitioner,
stream::{BatcherSettings, DriverResponse},
ByteSizeOf,
};

use crate::{
codecs::{Encoder, Transformer},
event::Event,
sinks::{
s3_common::{
config::S3Options,
partitioner::S3PartitionKey,
partitioner::{S3KeyPartitioner, S3PartitionKey},
service::{S3Metadata, S3Request},
},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, SinkBuilderExt,
},
},
};

/// Stream-based S3 sink: bundles every component that `run_inner` needs to turn
/// incoming events into `S3Request`s and hand them to a service.
pub struct NewS3Sink<Svc> {
/// Service the built requests are driven into (`into_driver(service)`).
pub service: Svc,
/// Computes the partition key from each event's original (untransformed) form.
pub partitioner: S3KeyPartitioner,
/// Applied to a clone of each event before that clone is serialized.
pub transformer: Transformer,
/// Writes the delimiter between encoded events when a payload is assembled.
pub framer: Framer,
/// Encodes each event individually, before batching takes place.
pub serializer: codecs::encoding::Serializer,
/// Settings handed to `batched_partitioned`.
pub batcher_settings: BatcherSettings,
/// Request options; `build_request` is called on them once per batch.
pub options: S3RequestOptions,
}

/// An event paired with the bytes produced by serializing it.
struct EncodedEvent {
/// The event as received; the transformer is run on a clone, not on this value.
inner: Event,
/// Serialized form of the transformed clone of `inner`.
encoded: BytesMut,
}

// Hack: `ByteSizeOf` is reused here so that the length of the encoded bytes is
// what gets reported as the size of an `EncodedEvent`.
impl ByteSizeOf for EncodedEvent {
    /// Total size of the item: the number of encoded bytes.
    fn size_of(&self) -> usize {
        self.encoded.len()
    }

    /// Allocated size of the item: likewise the number of encoded bytes.
    fn allocated_bytes(&self) -> usize {
        self.encoded.len()
    }
}

struct WrappedPartitioner(S3KeyPartitioner);

impl Partitioner for WrappedPartitioner {
type Item = EncodedEvent;
type Key = Option<S3PartitionKey>;

fn partition(&self, item: &Self::Item) -> Self::Key {
self.0.partition(&item.inner)
}
}

impl<Svc> NewS3Sink<Svc>
where
Svc: Service<S3Request> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Response: DriverResponse + Send + 'static,
Svc::Error: fmt::Debug + Into<crate::Error> + Send,
{
/// Runs the sink over `input` until the driver finishes.
///
/// Pipeline, in order:
/// 1. each event is cloned, transformed and serialized on its own;
/// 2. the encoded events are batched per partition key;
/// 3. batches whose key is `None` are filtered out of the stream;
/// 4. each remaining batch is joined into one payload (batch prefix, events
///    separated by the frame delimiter, batch suffix) and turned into a request
///    via `options.build_request`;
/// 5. the requests are driven into `service`.
///
/// Returns whatever the driver's `run()` resolves to.
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
// Take the sink apart; `framer` and `options` go behind `Arc` so every
// invocation of the request-building closure can hold its own handle.
let transformer = self.transformer;
let mut serializer = self.serializer;
let partitioner = WrappedPartitioner(self.partitioner);
let service = self.service;
let framer = Arc::new(self.framer);
let batcher_settings = self.batcher_settings;
let options = Arc::new(self.options);

// Only consulted for `batch_prefix()` / `batch_suffix()` further down.
let combined_encoder = Arc::new(Encoder::<Framer>::new(
framer.as_ref().clone(),
serializer.clone(),
));

// Handed to `concurrent_map` as its limit argument — presumably the cap on
// request-building futures in flight; TODO confirm.
let builder_limit = NonZeroUsize::new(64);

input
.map(|event| {
// Transform and serialize a clone so the untouched event can be kept in
// `EncodedEvent::inner`.
let mut to_encode = event.clone();
transformer.transform(&mut to_encode);

let mut encoded = BytesMut::new();
// NOTE(review): `unwrap` panics if the serializer returns an error.
serializer.encode(to_encode, &mut encoded).unwrap();

EncodedEvent {
inner: event,
encoded,
}
})
.batched_partitioned(partitioner, batcher_settings)
// Batches with a `None` partition key do not continue past this point.
.filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
.concurrent_map(builder_limit, move |(partition_key, encoded_events)| {
let framer = Arc::clone(&framer);
let combined_encoder = Arc::clone(&combined_encoder);
let options = Arc::clone(&options);

Box::pin(async move {
// This is silly because we really just need the prefix, delimiter, and suffix. Oh well.
let mut framer = framer.as_ref().clone();

// Split the batch into the original events and their encoded bytes, recording
// each event's encoded length in `grouped_sizes` along the way.
let mut grouped_sizes = GroupedCountByteSize::new_tagged();
let mut events = Vec::with_capacity(encoded_events.len());
let mut encoded = Vec::with_capacity(encoded_events.len());
for e in encoded_events {
grouped_sizes.add_event(&e.inner, e.encoded.len().into());
events.push(e.inner);
encoded.push(e.encoded);
}

// TODO: this doesn't include framing, is that right?
let events_encoded_size = encoded.iter().map(BytesMut::len).sum::<usize>();

let finalizers = events.take_finalizers();
// The metadata's `s3_key` starts out as the partition's key prefix.
let s3_key_prefix = partition_key.key_prefix.clone();

let metadata = S3Metadata {
partition_key,
s3_key: s3_key_prefix,
finalizers,
};

// TODO: not doing compression yet
let mut payload = BytesMut::new();
payload.extend_from_slice(combined_encoder.batch_prefix());
// Counts down so the delimiter is written between events but not after the
// last one.
let mut remaining = encoded.len();
for buf in encoded {
payload.extend_from_slice(buf.as_ref());
remaining -= 1;
if remaining > 0 {
// write the frame delimiter
framer.encode((), &mut payload).expect("framing to bytes");
}
}
payload.extend_from_slice(combined_encoder.batch_suffix());

let request_metadata = RequestMetadata::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a fringe benefit of this would be that we don't need the RequestMetadataBuilder anymore ? I added that last year and it was kind of annoying, like if we have all the information we need in one place there are definite perks to that.

events.len(),
events_encoded_size,
payload.len(),
// TODO: same since no compression yet
payload.len(),
// TODO: just using encoded size here, not sure if we still need to estimate?
grouped_sizes,
);

// The payload is passed on as-is, marked uncompressed.
options.build_request(
metadata,
request_metadata,
EncodeResult::uncompressed(payload.freeze()),
)
})
})
.into_driver(service)
.run()
.await
}
}

#[async_trait::async_trait]
impl<Svc> vector_core::sink::StreamSink<Event> for NewS3Sink<Svc>
where
    Svc: Service<S3Request> + Send + 'static,
    Svc::Future: Send + 'static,
    Svc::Response: DriverResponse + Send + 'static,
    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and processes `input` by delegating to `run_inner`,
    /// returning its result unchanged.
    // `self` is only moved into `run_inner`, never mutated, so the binding is
    // deliberately not declared `mut`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}

#[derive(Clone)]
pub struct S3RequestOptions {
pub bucket: String,
Expand Down
16 changes: 10 additions & 6 deletions src/sinks/s3_common/partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ impl S3KeyPartitioner {
) -> Self {
Self(key_prefix_template, ssekms_key_id_template)
}
}

impl Partitioner for S3KeyPartitioner {
type Item = Event;
type Key = Option<S3PartitionKey>;

fn partition(&self, item: &Self::Item) -> Self::Key {
pub fn partition(&self, item: &Event) -> Option<S3PartitionKey> {
let key_prefix = self
.0
.render_string(item)
Expand Down Expand Up @@ -56,3 +51,12 @@ impl Partitioner for S3KeyPartitioner {
})
}
}

impl Partitioner for S3KeyPartitioner {
type Item = Event;
type Key = Option<S3PartitionKey>;

fn partition(&self, item: &Self::Item) -> Self::Key {
self.partition(item)
}
}