diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs
index a86d647fb3c6c..4b5c082b7aa91 100644
--- a/src/sinks/aws_s3/integration_tests.rs
+++ b/src/sinks/aws_s3/integration_tests.rs
@@ -406,6 +406,80 @@ async fn s3_healthchecks_invalid_bucket() {
         .is_err());
 }
 
+#[tokio::test]
+async fn s3_flush_on_exhaustion() {
+    let cx = SinkContext::new_test();
+
+    let bucket = uuid::Uuid::new_v4().to_string();
+    create_bucket(&bucket, false).await;
+
+    // batch size of ten events, timeout of ten seconds
+    let config = {
+        let mut batch = BatchConfig::default();
+        batch.max_events = Some(10);
+        batch.timeout_secs = Some(10.0);
+
+        S3SinkConfig {
+            bucket: bucket.to_string(),
+            key_prefix: random_string(10) + "/date=%F",
+            filename_time_format: default_filename_time_format(),
+            filename_append_uuid: true,
+            filename_extension: None,
+            options: S3Options::default(),
+            region: RegionOrEndpoint::with_both("minio", s3_address()),
+            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
+            compression: Compression::None,
+            batch,
+            request: TowerRequestConfig::default(),
+            tls: Default::default(),
+            auth: Default::default(),
+            acknowledgements: Default::default(),
+        }
+    };
+    let prefix = config.key_prefix.clone();
+    let service = config.create_service(&cx.globals.proxy).await.unwrap();
+    let sink = config.build_processor(service).unwrap();
+
+    let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (fewer than the batch size)
+
+    let events = lines.clone().into_iter().enumerate().map(|(i, line)| {
+        let mut e = LogEvent::from(line);
+        let i = if i < 10 {
+            1
+        } else if i < 20 {
+            2
+        } else {
+            3
+        };
+        e.insert("i", i.to_string());
+        Event::from(e)
+    });
+
+    // Here, we validate that the s3 sink flushes when its source stream is exhausted
+    // by giving it a number of inputs less than the batch size, verifying that the
+    // outputs for the in-flight batch are flushed. By timing out in 3 seconds with a
+    // flush period of ten seconds, we verify that the flush is triggered *at stream
+    // completion* and not because of periodic flushing.
+    assert!(tokio::time::timeout(
+        Duration::from_secs(3),
+        run_and_assert_sink_compliance(sink, stream::iter(events), &AWS_SINK_TAGS)
+    )
+    .await
+    .is_ok());
+
+    let keys = get_keys(&bucket, prefix).await;
+    assert_eq!(keys.len(), 1);
+
+    let mut response_lines: Vec<String> = Vec::new();
+    let mut key_stream = stream::iter(keys);
+    while let Some(key) = key_stream.next().await {
+        let obj = get_object(&bucket, key).await;
+        response_lines.append(&mut get_lines(obj).await);
+    }
+
+    assert_eq!(lines, response_lines); // if all events are received, and lines.len() < batch size, then a flush was performed.
+}
+
 async fn client() -> S3Client {
     let auth = AwsAuthentication::test_auth();
     let region = RegionOrEndpoint::with_both("minio", s3_address());
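
The test above pins down a property that is easy to get wrong in a stream-driven batcher: a partial batch must be flushed when the input stream ends, not only when the batch fills or the linger timer fires. The sketch below is a minimal, self-contained illustration of that behavior; it is not Vector's batching implementation, and the names `batch_stream`, `max_events`, and `linger` are illustrative only.

use std::time::Duration;

use futures::{stream, Stream, StreamExt};
use tokio::time::timeout;

// Collect items into batches of up to `max_events`, waiting at most `linger`
// between items. A partial batch is emitted immediately when the input stream
// is exhausted, which is the behavior the test above verifies.
// (Illustrative sketch; not Vector's actual batching code.)
async fn batch_stream<T>(
    mut input: impl Stream<Item = T> + Unpin,
    max_events: usize,
    linger: Duration,
) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    loop {
        match timeout(linger, input.next()).await {
            // An item arrived: buffer it, and flush if the batch is now full.
            Ok(Some(item)) => {
                current.push(item);
                if current.len() == max_events {
                    batches.push(std::mem::take(&mut current));
                }
            }
            // Stream exhausted: flush the in-flight partial batch and stop.
            Ok(None) => {
                if !current.is_empty() {
                    batches.push(std::mem::take(&mut current));
                }
                return batches;
            }
            // Linger elapsed with no new item: flush whatever accumulated.
            Err(_elapsed) => {
                if !current.is_empty() {
                    batches.push(std::mem::take(&mut current));
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Two events against a batch size of ten, mirroring the test: with a
    // ten-second linger, the only way they can appear within three seconds
    // is the end-of-stream flush.
    let batches = batch_stream(stream::iter(1..=2), 10, Duration::from_secs(10)).await;
    assert_eq!(batches, vec![vec![1, 2]]);
    println!("flushed {} batch(es) on exhaustion", batches.len());
}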