74 changes: 74 additions & 0 deletions src/sinks/aws_s3/integration_tests.rs
@@ -406,6 +406,80 @@ async fn s3_healthchecks_invalid_bucket() {
    .is_err());
}

#[tokio::test]
async fn s3_flush_on_exhaustion() {
    let cx = SinkContext::new_test();

    let bucket = uuid::Uuid::new_v4().to_string();
    create_bucket(&bucket, false).await;

    // batch size of ten events, timeout of ten seconds
    let config = {
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);
        batch.timeout_secs = Some(10.0);

        S3SinkConfig {
            bucket: bucket.to_string(),
            key_prefix: random_string(10) + "/date=%F",
            filename_time_format: default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: RegionOrEndpoint::with_both("minio", s3_address()),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            batch,
            request: TowerRequestConfig::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
        }
    };
    let prefix = config.key_prefix.clone();
    let service = config.create_service(&cx.globals.proxy).await.unwrap();
    let sink = config.build_processor(service).unwrap();

    let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size)
Review comment (Collaborator): Small nit, an actual assertion of this invariant ("less than batch size") could help guard against it being accidentally broken.
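A minimal sketch of the assertion this comment suggests, assuming it would sit just after the lines are generated; the literal 10 mirrors batch.max_events above, and reading the limit from the config instead would work just as well:

    // Guard the test invariant: the input must stay below the batch size,
    // otherwise the flush could come from the batch filling up rather than
    // from stream exhaustion, and the test would pass for the wrong reason.
    assert!(
        lines.len() < 10,
        "test requires fewer events than batch.max_events"
    );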


    let events = lines.clone().into_iter().enumerate().map(|(i, line)| {
        let mut e = LogEvent::from(line);
        let i = if i < 10 {
Review comment (Collaborator): Won't i always be less than 10 since we only generate two events? (See the sketch after the closure below.)

            1
        } else if i < 20 {
            2
        } else {
            3
        };
        e.insert("i", i.to_string());
        Event::from(e)
    });
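A minimal sketch of the simplification the collaborator's question points at, assuming the test keeps generating exactly two events: with only indices 0 and 1, the else branches are unreachable and the mapping collapses to a constant.

    // Hypothetical simplification: random_lines_with_stream(100, 2, None)
    // yields two events, so i is always 0 or 1 and the if/else chain
    // always produces 1.
    let events = lines.clone().into_iter().map(|line| {
        let mut e = LogEvent::from(line);
        e.insert("i", "1".to_string());
        Event::from(e)
    });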

    // Here, we validate that the s3 sink flushes when its source stream is
    // exhausted by giving it fewer inputs than the batch size and verifying
    // that the output for the in-flight batch is flushed. By timing out after
    // 3 seconds against a flush period of ten seconds, we verify that the
    // flush is triggered *at stream completion* and not by periodic flushing.
    assert!(tokio::time::timeout(
        Duration::from_secs(3),
        run_and_assert_sink_compliance(sink, stream::iter(events), &AWS_SINK_TAGS)
    )
    .await
    .is_ok());

    let keys = get_keys(&bucket, prefix).await;
    assert_eq!(keys.len(), 1);

    let mut response_lines: Vec<String> = Vec::new();
    let mut key_stream = stream::iter(keys);
    while let Some(key) = key_stream.next().await {
        let obj = get_object(&bucket, key).await;
        response_lines.append(&mut get_lines(obj).await);
    }

    assert_eq!(lines, response_lines); // if all events are received, and lines.len() < batch size, then a flush was performed.
}

async fn client() -> S3Client {
    let auth = AwsAuthentication::test_auth();
    let region = RegionOrEndpoint::with_both("minio", s3_address());
…