diff --git a/Cargo.lock b/Cargo.lock index 3a02082211586..136fe6d8e6223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9915,6 +9915,7 @@ dependencies = [ "bytes 1.5.0", "bytesize", "chrono", + "chrono-tz", "cidr-utils", "clap 4.4.8", "colored", diff --git a/Cargo.toml b/Cargo.toml index b429feefe5343..d92e02860bf38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -242,6 +242,7 @@ bollard = { version = "0.15.0", default-features = false, features = ["ssl", "ch bytes = { version = "1.5.0", default-features = false, features = ["serde"] } bytesize = { version = "1.3.0", default-features = false } chrono = { version = "0.4.31", default-features = false, features = ["serde"] } +chrono-tz = { version = "0.8.3", default-features = false } cidr-utils = { version = "0.5.11", default-features = false } clap = { version = "4.4.8", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.4", default-features = false } diff --git a/lib/vector-config/src/external/datetime.rs b/lib/vector-config/src/external/datetime.rs index 45575f2846564..6735cfa2e239c 100644 --- a/lib/vector-config/src/external/datetime.rs +++ b/lib/vector-config/src/external/datetime.rs @@ -20,8 +20,8 @@ impl Configurable for TimeZone { fn metadata() -> Metadata { let mut metadata = Metadata::default(); - metadata.set_title("Timezone reference."); - metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + metadata.set_title("Timezone to use for any date specifiers in template strings."); + metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones"#); metadata.add_custom_attribute(CustomAttribute::kv( diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 2e8a497952359..4aad4417734a8 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,5 +1,3 @@ -use std::convert::TryInto; - use aws_sdk_s3::Client as S3Client; use tower::ServiceBuilder; use vector_lib::codecs::{ @@ -8,6 +6,7 @@ use vector_lib::codecs::{ }; use vector_lib::configurable::configurable_component; use vector_lib::sink::VectorSink; +use vector_lib::TimeZone; use super::sink::S3RequestOptions; use crate::{ @@ -23,8 +22,8 @@ use crate::{ sink::S3Sink, }, util::{ - BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, - TowerRequestConfig, + timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, + ServiceBuilderExt, TowerRequestConfig, }, Healthcheck, }, @@ -136,6 +135,10 @@ pub struct S3SinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub timezone: Option, } pub(super) fn default_key_prefix() -> String { @@ -163,6 +166,7 @@ impl GenerateConfig for S3SinkConfig { tls: Some(TlsConfig::default()), auth: AwsAuthentication::default(), acknowledgements: Default::default(), + timezone: Default::default(), }) .unwrap() } @@ -174,7 +178,7 @@ impl SinkConfig for S3SinkConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let service = self.create_service(&cx.proxy).await?; let healthcheck = self.build_healthcheck(service.client())?; - let sink = self.build_processor(service)?; + let sink = self.build_processor(service, cx)?; Ok((sink, healthcheck)) } @@ -188,7 +192,11 @@ impl SinkConfig for S3SinkConfig { } impl S3SinkConfig { - pub fn build_processor(&self, service: S3Service) -> crate::Result { + pub fn build_processor( + &self, + service: S3Service, + cx: SinkContext, + ) -> crate::Result { // Build our S3 client/service, which is what we'll ultimately feed // requests into in order to ship files to S3. We build this here in // order to configure the client/service with retries, concurrency @@ -198,9 +206,16 @@ impl S3SinkConfig { .settings(request_limits, S3RetryLogic) .service(service); + let offset = self + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); + // Configure our partitioning/batching. let batch_settings = self.batch.into_batcher_settings()?; - let key_prefix = self.key_prefix.clone().try_into()?; + + let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset); + let ssekms_key_id = self .options .ssekms_key_id @@ -208,6 +223,7 @@ impl S3SinkConfig { .cloned() .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str())) .transpose()?; + let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id); let transformer = self.encoding.transformer(); @@ -222,6 +238,7 @@ impl S3SinkConfig { filename_append_uuid: self.filename_append_uuid, encoder: (transformer, encoder), compression: self.compression, + filename_tz_offset: offset, }; let sink = S3Sink::new(service, request_options, partitioner, batch_settings); diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index 27c53c1214429..01c78dae08523 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -61,7 +61,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() { config.key_prefix = "test-prefix".to_string(); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -95,7 +95,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() { config.key_prefix = "test-prefix/".to_string(); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -132,7 +132,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() { config.options.ssekms_key_id = Some("alias/aws/s3".to_string()); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -170,7 +170,7 @@ async fn s3_rotate_files_after_the_buffer_size_is_reached() { }; let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, _events) = random_lines_with_stream(100, 30, None); @@ -229,7 +229,7 @@ async fn s3_gzip() { let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -274,7 +274,7 @@ async fn s3_zstd() { let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -336,7 +336,7 @@ async fn s3_insert_message_into_object_lock() { let config = config(&bucket, 1000000); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -368,7 +368,7 @@ async fn acknowledges_failures() { config.bucket = format!("BREAK{}IT", config.bucket); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (_lines, events, receiver) = make_events_batch(1, 1); run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await; @@ -434,11 +434,12 @@ async fn s3_flush_on_exhaustion() { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } }; let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size) @@ -517,6 +518,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } } diff --git a/src/sinks/aws_s3/mod.rs b/src/sinks/aws_s3/mod.rs index fd3a3418b4716..3027de1bb6287 100644 --- a/src/sinks/aws_s3/mod.rs +++ b/src/sinks/aws_s3/mod.rs @@ -3,4 +3,4 @@ mod sink; mod integration_tests; -pub use self::config::S3SinkConfig; +pub use config::S3SinkConfig; diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 553908064b680..a4fc542437106 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,7 +1,7 @@ use std::io; use bytes::Bytes; -use chrono::Utc; +use chrono::{FixedOffset, Utc}; use uuid::Uuid; use vector_lib::codecs::encoding::Framer; use vector_lib::event::Finalizable; @@ -32,6 +32,7 @@ pub struct S3RequestOptions { pub api_options: S3Options, pub encoder: (Transformer, Encoder), pub compression: Compression, + pub filename_tz_offset: Option, } impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { @@ -76,7 +77,14 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { payload: EncodeResult, ) -> Self::Request { let filename = { - let formatted_ts = Utc::now().format(self.filename_time_format.as_str()); + let formatted_ts = match self.filename_tz_offset { + Some(offset) => Utc::now() + .with_timezone(&offset) + .format(self.filename_time_format.as_str()), + None => Utc::now() + .with_timezone(&chrono::Utc) + .format(self.filename_time_format.as_str()), + }; self.filename_append_uuid .then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().hyphenated())) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 05609edc8e25a..5392ec57647ec 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -22,7 +22,7 @@ use vector_lib::codecs::{ use vector_lib::configurable::configurable_component; use vector_lib::{ internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered}, - EstimatedJsonEncodedSizeOf, + EstimatedJsonEncodedSizeOf, TimeZone, }; use crate::{ @@ -33,9 +33,10 @@ use crate::{ internal_events::{ FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError, }, - sinks::util::StreamSink, + sinks::util::{timezone_to_offset, StreamSink}, template::Template, }; + mod bytes_path; use bytes_path::BytesPath; @@ -84,6 +85,10 @@ pub struct FileSinkConfig { )] pub acknowledgements: AcknowledgementsConfig, + #[configurable(derived)] + #[serde(default)] + pub timezone: Option, + #[configurable(derived)] #[serde(default)] internal_metrics: FileInternalMetricsConfig, @@ -97,6 +102,7 @@ impl GenerateConfig for FileSinkConfig { encoding: (None::, TextSerializerConfig::default()).into(), compression: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }) .unwrap() @@ -181,9 +187,9 @@ impl OutFile { impl SinkConfig for FileSinkConfig { async fn build( &self, - _cx: SinkContext, + cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = FileSink::new(self)?; + let sink = FileSink::new(self, cx)?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), @@ -211,13 +217,18 @@ pub struct FileSink { } impl FileSink { - pub fn new(config: &FileSinkConfig) -> crate::Result { + pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); + let offset = config + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); + Ok(Self { - path: config.path.clone(), + path: config.path.clone().with_tz_offset(offset), transformer, encoder, idle_timeout: config.idle_timeout, @@ -464,6 +475,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }; @@ -487,6 +499,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Gzip, acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }; @@ -510,6 +523,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Zstd, acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }; @@ -538,6 +552,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }; @@ -617,6 +632,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + timezone: Default::default(), internal_metrics: Default::default(), }; @@ -626,7 +642,7 @@ mod tests { let sink_handle = tokio::spawn(async move { assert_sink_compliance(&FILE_SINK_TAGS, async move { - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, SinkContext::default()).unwrap(); VectorSink::from_event_streamsink(sink) .run(Box::pin(rx.map(Into::into))) .await @@ -670,7 +686,7 @@ mod tests { async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator + Send) { assert_sink_compliance(&FILE_SINK_TAGS, async move { - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, SinkContext::default()).unwrap(); VectorSink::from_event_streamsink(sink) .run(Box::pin(stream::iter(events.map(Into::into)))) .await diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index af1f0cd2ea41c..bb06e6a18ceba 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; -use chrono::Utc; +use chrono::{FixedOffset, Utc}; use http::header::{HeaderName, HeaderValue}; use http::Uri; use indoc::indoc; @@ -12,7 +12,7 @@ use uuid::Uuid; use vector_lib::codecs::encoding::Framer; use vector_lib::configurable::configurable_component; use vector_lib::event::{EventFinalizers, Finalizable}; -use vector_lib::request_metadata::RequestMetadata; +use vector_lib::{request_metadata::RequestMetadata, TimeZone}; use crate::sinks::util::metadata::RequestMetadataBuilder; use crate::{ @@ -32,8 +32,8 @@ use crate::{ }, util::{ batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult, - BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt, - TowerRequestConfig, + timezone_to_offset, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, + ServiceBuilderExt, TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -164,6 +164,10 @@ pub struct GcsSinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub timezone: Option, } fn default_time_format() -> String { @@ -188,6 +192,7 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { auth: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } } @@ -218,7 +223,7 @@ impl SinkConfig for GcsSinkConfig { auth.clone(), )?; auth.spawn_regenerate_token(); - let sink = self.build_sink(client, base_url, auth)?; + let sink = self.build_sink(client, base_url, auth, cx)?; Ok((sink, healthcheck)) } @@ -238,6 +243,7 @@ impl GcsSinkConfig { client: HttpClient, base_url: String, auth: GcpAuthenticator, + cx: SinkContext, ) -> crate::Result { let request = self.request.unwrap_with(&TowerRequestConfig { rate_limit_num: Some(1000), @@ -254,7 +260,7 @@ impl GcsSinkConfig { .settings(request, GcsRetryLogic) .service(GcsService::new(client, base_url, auth)); - let request_settings = RequestSettings::new(self)?; + let request_settings = RequestSettings::new(self, cx)?; let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); @@ -284,6 +290,7 @@ struct RequestSettings { append_uuid: bool, encoder: (Transformer, Encoder), compression: Compression, + tz_offset: Option, } impl RequestBuilder<(String, Vec)> for RequestSettings { @@ -322,7 +329,12 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { let (key, finalizers) = gcp_metadata; // TODO: pull the seconds from the last event let filename = { - let seconds = Utc::now().format(&self.time_format); + let seconds = match self.tz_offset { + Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format), + None => Utc::now() + .with_timezone(&chrono::Utc) + .format(&self.time_format), + }; if self.append_uuid { let uuid = Uuid::new_v4(); @@ -352,7 +364,7 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { } impl RequestSettings { - fn new(config: &GcsSinkConfig) -> crate::Result { + fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); @@ -382,6 +394,11 @@ impl RequestSettings { .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; + let offset = config + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); + Ok(Self { acl, content_type, @@ -393,6 +410,7 @@ impl RequestSettings { append_uuid, compression: config.compression, encoder: (transformer, encoder), + tz_offset: offset, }) } } @@ -442,7 +460,12 @@ mod tests { let config = default_config((None::, JsonSerializerConfig::default()).into()); let sink = config - .build_sink(client, mock_endpoint.to_string(), GcpAuthenticator::None) + .build_sink( + client, + mock_endpoint.to_string(), + GcpAuthenticator::None, + context, + ) .expect("failed to build sink"); let event = Event::Log(LogEvent::from("simple message")); @@ -470,11 +493,12 @@ mod tests { assert_eq!(key, "key: value"); } - fn request_settings(sink_config: &GcsSinkConfig) -> RequestSettings { - RequestSettings::new(sink_config).expect("Could not create request settings") + fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { + RequestSettings::new(sink_config, context).expect("Could not create request settings") } fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest { + let context = SinkContext::default(); let sink_config = GcsSinkConfig { key_prefix: Some("key/".into()), filename_time_format: "date".into(), @@ -499,7 +523,7 @@ mod tests { let mut byte_size = GroupedCountByteSize::new_untagged(); byte_size.add_event(&log, log.estimated_json_encoded_size_of()); - let request_settings = request_settings(&sink_config); + let request_settings = request_settings(&sink_config, context); let (metadata, metadata_request_builder, _events) = request_settings.split_input((key, vec![log])); let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index f0a233b7037e9..23474327cdfc5 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -52,9 +52,10 @@ pub use service::{ pub use sink::{BatchSink, PartitionBatchSink, StreamSink}; use snafu::Snafu; pub use uri::UriSerde; -use vector_lib::json_size::JsonSize; +use vector_lib::{json_size::JsonSize, TimeZone}; use crate::event::EventFinalizers; +use chrono::{FixedOffset, Offset, Utc}; #[derive(Debug, Snafu)] enum SinkBuildError { @@ -134,3 +135,10 @@ impl ElementCount for Vec { self.len() } } + +pub fn timezone_to_offset(tz: TimeZone) -> Option { + match tz { + TimeZone::Local => Some(*Utc::now().with_timezone(&chrono::Local).offset()), + TimeZone::Named(tz) => Some(Utc::now().with_timezone(&tz).offset().fix()), + } +} diff --git a/src/template.rs b/src/template.rs index 3eccf14712684..136a2bd3d2547 100644 --- a/src/template.rs +++ b/src/template.rs @@ -4,7 +4,7 @@ use std::{borrow::Cow, convert::TryFrom, fmt, hash::Hash, path::PathBuf}; use bytes::Bytes; use chrono::{ format::{strftime::StrftimeItems, Item}, - Utc, + FixedOffset, Utc, }; use once_cell::sync::Lazy; use regex::Regex; @@ -63,6 +63,9 @@ pub struct Template { #[serde(skip)] reserve_size: usize, + + #[serde(skip)] + tz_offset: Option, } impl TryFrom<&str> for Template { @@ -116,6 +119,7 @@ impl TryFrom> for Template { src: src.into_owned(), is_static, reserve_size, + tz_offset: None, } }) } @@ -137,6 +141,11 @@ impl fmt::Display for Template { impl ConfigurableString for Template {} impl Template { + /// set tz offset + pub const fn with_tz_offset(mut self, tz_offset: Option) -> Self { + self.tz_offset = tz_offset; + self + } /// Renders the given template with data from the event. pub fn render<'a>( &self, @@ -163,7 +172,9 @@ impl Template { for part in &self.parts { match part { Part::Literal(lit) => out.push_str(lit), - Part::Strftime(items) => out.push_str(&render_timestamp(items, event)), + Part::Strftime(items) => { + out.push_str(&render_timestamp(items, event, self.tz_offset)) + } Part::Reference(key) => { out.push_str( &match event { @@ -344,8 +355,12 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>) -> String { - match event { +fn render_timestamp( + items: &ParsedStrftime, + event: EventRef<'_>, + tz_offset: Option, +) -> String { + let timestamp = match event { EventRef::Log(log) => log_schema() .timestamp_key_target_path() .and_then(|timestamp_key| { @@ -365,14 +380,24 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>) -> String { }) } } - .unwrap_or_else(Utc::now) - .format_with_items(items.as_items()) - .to_string() + .unwrap_or_else(Utc::now); + + match tz_offset { + Some(offset) => timestamp + .with_timezone(&offset) + .format_with_items(items.as_items()) + .to_string(), + None => timestamp + .with_timezone(&chrono::Utc) + .format_with_items(items.as_items()) + .to_string(), + } } #[cfg(test)] mod tests { - use chrono::TimeZone; + use chrono::{Offset, TimeZone, Utc}; + use chrono_tz::Tz; use vector_lib::lookup::{metadata_path, PathPrefix}; use vector_lib::metric_tags; @@ -654,6 +679,25 @@ mod tests { ); } + #[test] + fn render_log_with_timezone() { + let ts = Utc.with_ymd_and_hms(2001, 2, 3, 4, 5, 6).unwrap(); + + let template = Template::try_from("vector-%Y-%m-%d-%H.log").unwrap(); + let mut event = Event::Log(LogEvent::from("hello world")); + event.as_mut_log().insert( + (PathPrefix::Event, log_schema().timestamp_key().unwrap()), + ts, + ); + + let tz: Tz = "Asia/Singapore".parse().unwrap(); + let offset = Some(Utc::now().with_timezone(&tz).offset().fix()); + assert_eq!( + Ok(Bytes::from("vector-2001-02-03-12.log")), + template.with_tz_offset(offset).render(&event) + ); + } + fn sample_metric() -> Metric { Metric::new( "a-counter", diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 55ad8f643d934..3c92f678061e6 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -925,6 +925,17 @@ base: components: sinks: aws_s3: configuration: { } } } + timezone: { + description: """ + Timezone to use for any date specifiers in template strings. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + """ + required: false + type: string: examples: ["local", "America/New_York", "EST5EDT"] + } tls: { description: "TLS configuration." required: false diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index 00837da33d4d1..9d2085c704e2a 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -351,4 +351,15 @@ base: components: sinks: file: configuration: { syntax: "template" } } + timezone: { + description: """ + Timezone to use for any date specifiers in template strings. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + """ + required: false + type: string: examples: ["local", "America/New_York", "EST5EDT"] + } } diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index 8bcf9922c23a0..c6b5791eba0ea 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -701,6 +701,17 @@ base: components: sinks: gcp_cloud_storage: configuration: { """ } } + timezone: { + description: """ + Timezone to use for any date specifiers in template strings. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + """ + required: false + type: string: examples: ["local", "America/New_York", "EST5EDT"] + } tls: { description: "TLS configuration." required: false