From fe5873f3c44c8cd9ef1eed9ac475cdd1b1d29641 Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 30 Mar 2023 12:43:10 +0800 Subject: [PATCH 01/29] add TzOffset as File Sink configuration --- src/sinks/file/tz_offset.rs | 55 +++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/sinks/file/tz_offset.rs diff --git a/src/sinks/file/tz_offset.rs b/src/sinks/file/tz_offset.rs new file mode 100644 index 0000000000000..de6dc5e17df90 --- /dev/null +++ b/src/sinks/file/tz_offset.rs @@ -0,0 +1,55 @@ +use std::convert::{ + TryFrom, Into +}; + +use std::default::Default; + +use vector_config::configurable_component; + +use chrono::{ + DateTime, + FixedOffset, + ParseError +}; + +/// handle tz offset configuration +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(try_from = "String", into = "String")] +pub struct TzOffset(FixedOffset); + +impl TzOffset { + pub fn offset(&self) -> FixedOffset { + self.0 + } +} + +impl TryFrom for TzOffset { + type Error = ParseError; + + fn try_from(value: String) -> Result { + let dt = DateTime::parse_from_str( + format!("2000-01-01 00:00:00 {}", value).as_str(), + "%Y-%m-%d %H:%M:%S %z" + )?; + Ok(TzOffset(*dt.offset())) + } +} + +impl Into for TzOffset { + fn into(self) -> String { + self.0.to_string() + } +} + +impl Default for TzOffset { + fn default() -> TzOffset { + TzOffset(FixedOffset::east_opt(0).unwrap()) + } +} + +impl std::fmt::Display for TzOffset { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} From 797d9faeb5400744ef8e710c7d47a11056355c58 Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 30 Mar 2023 12:44:12 +0800 Subject: [PATCH 02/29] integrate TzOffset into File Sink --- src/sinks/file/mod.rs | 18 +++++++++++++- src/template.rs | 56 ++++++++++++++++++++++++++++++------------- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 0620ac5971851..c0077ddcb8f5c 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -13,6 +13,7 @@ use futures::{ stream::{BoxStream, StreamExt}, FutureExt, }; + use serde_with::serde_as; use tokio::{ fs::{self, File}, @@ -35,8 +36,10 @@ use crate::{ template::Template, }; mod bytes_path; +mod tz_offset; use bytes_path::BytesPath; +use tz_offset::TzOffset; /// Configuration for the `file` sink. #[serde_as] @@ -80,6 +83,13 @@ pub struct FileSinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub tz_offset: TzOffset, } impl GenerateConfig for FileSinkConfig { @@ -90,6 +100,7 @@ impl GenerateConfig for FileSinkConfig { encoding: (None::, TextSerializerConfig::default()).into(), compression: Default::default(), acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }) .unwrap() } @@ -207,7 +218,7 @@ impl FileSink { let encoder = Encoder::::new(framer, serializer); Ok(Self { - path: config.path.clone(), + path: config.path.clone().with_tz_offset(config.tz_offset.offset()), transformer, encoder, idle_timeout: config.idle_timeout, @@ -452,6 +463,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }; let (input, _events) = random_lines_with_stream(100, 64, None); @@ -474,6 +486,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Gzip, acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -496,6 +509,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Zstd, acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -523,6 +537,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }; let (mut input, _events) = random_events_with_stream(32, 8, None); @@ -600,6 +615,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), + tz_offset: TzOffset::default(), }; let (mut input, _events) = random_lines_with_stream(10, 64, None); diff --git a/src/template.rs b/src/template.rs index 7a5e1f116548b..1458980db09b1 100644 --- a/src/template.rs +++ b/src/template.rs @@ -4,7 +4,7 @@ use std::{borrow::Cow, convert::TryFrom, fmt, hash::Hash, path::PathBuf}; use bytes::Bytes; use chrono::{ format::{strftime::StrftimeItems, Item}, - Utc, + Utc, FixedOffset, }; use lookup::lookup_v2::parse_target_path; use lookup::PathPrefix; @@ -64,6 +64,9 @@ pub struct Template { #[serde(skip)] reserve_size: usize, + + #[serde(skip)] + tz_offset: Option, } impl TryFrom<&str> for Template { @@ -112,11 +115,14 @@ impl TryFrom> for Template { }) .sum(); + let tz_offset = FixedOffset::east_opt(0); + Template { parts, src: src.into_owned(), is_static, reserve_size, + tz_offset, } }) } @@ -138,6 +144,11 @@ impl fmt::Display for Template { impl ConfigurableString for Template {} impl Template { + /// set tz offset + pub fn with_tz_offset(mut self, tz_offset: FixedOffset) -> Self { + self.tz_offset = Some(tz_offset); + self + } /// Renders the given template with data from the event. pub fn render<'a>( &self, @@ -164,7 +175,7 @@ impl Template { for part in &self.parts { match part { Part::Literal(lit) => out.push_str(lit), - Part::Strftime(items) => out.push_str(&render_timestamp(items, event)), + Part::Strftime(items) => out.push_str(&render_timestamp(items, event, self.tz_offset.unwrap())), Part::Reference(key) => { out.push_str( &match event { @@ -341,22 +352,33 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>) -> String { +fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: FixedOffset) -> String { match event { - EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { - log.get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }), - EventRef::Metric(metric) => metric.timestamp(), - EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| { - trace - .get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }), - } - .unwrap_or_else(Utc::now) + EventRef::Log(log) => { + log_schema().timestamp_key().and_then(|timestamp_key| { + log.get((PathPrefix::Event, timestamp_key)) + .and_then(Value::as_timestamp) + .copied() + }) + .unwrap_or_else(Utc::now) + .with_timezone(&tz_offset) + }, + EventRef::Metric(metric) => { + metric.timestamp() + .unwrap_or_else(Utc::now) + .with_timezone(&tz_offset) + }, + EventRef::Trace(trace) => { + log_schema().timestamp_key().and_then(|timestamp_key| { + trace + .get((PathPrefix::Event, timestamp_key)) + .and_then(Value::as_timestamp) + .copied() + }) + .unwrap_or_else(Utc::now) + .with_timezone(&tz_offset) + }, + } .format_with_items(items.as_items()) .to_string() } From b24aa2d901451b201777a3c61de223704a2e113c Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 30 Mar 2023 14:27:54 +0800 Subject: [PATCH 03/29] apply tz offset to all log event in render_timestamp. --- src/template.rs | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/src/template.rs b/src/template.rs index 1458980db09b1..5bfa0174ca863 100644 --- a/src/template.rs +++ b/src/template.rs @@ -354,31 +354,21 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: FixedOffset) -> String { match event { - EventRef::Log(log) => { - log_schema().timestamp_key().and_then(|timestamp_key| { - log.get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }) - .unwrap_or_else(Utc::now) - .with_timezone(&tz_offset) - }, - EventRef::Metric(metric) => { - metric.timestamp() - .unwrap_or_else(Utc::now) - .with_timezone(&tz_offset) - }, - EventRef::Trace(trace) => { - log_schema().timestamp_key().and_then(|timestamp_key| { - trace - .get((PathPrefix::Event, timestamp_key)) - .and_then(Value::as_timestamp) - .copied() - }) - .unwrap_or_else(Utc::now) - .with_timezone(&tz_offset) - }, - } + EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { + log.get((PathPrefix::Event, timestamp_key)) + .and_then(Value::as_timestamp) + .copied() + }), + EventRef::Metric(metric) => metric.timestamp(), + EventRef::Trace(trace) => log_schema().timestamp_key().and_then(|timestamp_key| { + trace + .get((PathPrefix::Event, timestamp_key)) + .and_then(Value::as_timestamp) + .copied() + }), + } + .unwrap_or_else(Utc::now) + .with_timezone(&tz_offset) .format_with_items(items.as_items()) .to_string() } From 1540a70549778122308a889a4d9c84d469736c3a Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 31 Mar 2023 17:16:57 +0800 Subject: [PATCH 04/29] added TzOffset tests --- src/sinks/file/tz_offset.rs | 61 +++++++++++++++++++++++++++++++------ src/template.rs | 27 +++++++++++++++- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/src/sinks/file/tz_offset.rs b/src/sinks/file/tz_offset.rs index de6dc5e17df90..b569257702e27 100644 --- a/src/sinks/file/tz_offset.rs +++ b/src/sinks/file/tz_offset.rs @@ -1,16 +1,11 @@ -use std::convert::{ - TryFrom, Into +use chrono::{DateTime, FixedOffset, ParseError}; +use std::{ + convert::{TryFrom, Into}, + default::Default, }; -use std::default::Default; - use vector_config::configurable_component; -use chrono::{ - DateTime, - FixedOffset, - ParseError -}; /// handle tz offset configuration #[configurable_component] @@ -36,6 +31,18 @@ impl TryFrom for TzOffset { } } +impl TryFrom<&str> for TzOffset { + type Error = ParseError; + + fn try_from(value: &str) -> Result { + let dt = DateTime::parse_from_str( + format!("2000-01-01 00:00:00 {}", value).as_str(), + "%Y-%m-%d %H:%M:%S %z" + )?; + Ok(TzOffset(*dt.offset())) + } +} + impl Into for TzOffset { fn into(self) -> String { self.0.to_string() @@ -53,3 +60,39 @@ impl std::fmt::Display for TzOffset { write!(f, "{}", self.0) } } + +#[cfg(test)] +mod tests { + use chrono::FixedOffset; + use super::*; + + #[test] + fn parse_str() { + let tz_offset = TzOffset::try_from("+08:00").unwrap(); + + assert_eq!( + tz_offset.to_string(), + "+08:00".to_string() + ); + } + + #[test] + fn parse_string() { + let tz_offset = TzOffset::try_from("+08:00".to_string()).unwrap(); + + assert_eq!( + tz_offset.to_string(), + "+08:00".to_string() + ); + } + + #[test] + fn check_offset() { + let fixed_offset = FixedOffset::east_opt(28800).unwrap(); + let tz_offset = TzOffset::try_from("+08:00".to_string()).unwrap(); + assert_eq!( + tz_offset.offset(), + fixed_offset + ); + } +} diff --git a/src/template.rs b/src/template.rs index 5bfa0174ca863..00a4d87c502f7 100644 --- a/src/template.rs +++ b/src/template.rs @@ -375,7 +375,7 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Fixe #[cfg(test)] mod tests { - use chrono::TimeZone; + use chrono::{TimeZone, FixedOffset}; use lookup::metadata_path; use vector_core::metric_tags; @@ -674,6 +674,31 @@ mod tests { ); } + #[test] + fn render_log_with_tz_offset() { + let ts = Utc + .ymd(2001, 2, 3) + .and_hms_opt(4, 5, 6) + .expect("invalid timestamp"); + + let template = Template::try_from("vector-%Y-%m-%d-%H.log").unwrap(); + let mut event = Event::Log(LogEvent::from("hello world")); + event.as_mut_log().insert( + ( + lookup::PathPrefix::Event, + log_schema().timestamp_key().unwrap(), + ), + ts, + ); + + // +08:00 tz offset + let offset = FixedOffset::east_opt(28800).unwrap(); + assert_eq!( + Ok(Bytes::from("vector-2001-02-03-12.log")), + template.with_tz_offset(offset).render(&event) + ); + } + fn sample_metric() -> Metric { Metric::new( "a-counter", From edb2682e3ac7d5052d4a595b2c307132fd63a28f Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 3 Apr 2023 19:13:31 +0800 Subject: [PATCH 05/29] adding chrono-tz for parsing timezones --- Cargo.lock | 1 + Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 01e2588679b1d..55f3a637700aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9346,6 +9346,7 @@ dependencies = [ "bytes 1.4.0", "bytesize", "chrono", + "chrono-tz", "cidr-utils", "clap 4.1.14", "codecs", diff --git a/Cargo.toml b/Cargo.toml index 5f584f015eb29..3d460a8d5ec21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -241,6 +241,7 @@ bollard = { version = "0.14.0", default-features = false, features = ["ssl", "ch bytes = { version = "1.4.0", default-features = false, features = ["serde"] } bytesize = { version = "1.2.0", default-features = false } chrono = { version = "0.4.22", default-features = false, features = ["serde"] } +chrono-tz = { version = "0.8.1", default-features = false } cidr-utils = { version = "0.5.10", default-features = false } clap = { version = "4.1.14", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.0", default-features = false } From 5454523458fed31bd98596fadc64c49c98068378 Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 3 Apr 2023 19:15:14 +0800 Subject: [PATCH 06/29] rename tz_offset to path_tz. timezones are safer than offsets --- src/sinks/file/path_tz.rs | 96 ++++++++++++++++++++++++++++++++++++ src/sinks/file/tz_offset.rs | 98 ------------------------------------- 2 files changed, 96 insertions(+), 98 deletions(-) create mode 100644 src/sinks/file/path_tz.rs delete mode 100644 src/sinks/file/tz_offset.rs diff --git a/src/sinks/file/path_tz.rs b/src/sinks/file/path_tz.rs new file mode 100644 index 0000000000000..dc23998c9b365 --- /dev/null +++ b/src/sinks/file/path_tz.rs @@ -0,0 +1,96 @@ +//use chrono::{DateTime, FixedOffset, ParseError}; +use chrono_tz::{UTC, Tz, ParseError}; +use std::{ + convert::{TryFrom, Into}, + default::Default, +}; + +use vector_config::configurable_component; + + +/// handle tz offset configuration +#[configurable_component] +#[derive(Clone, Debug, Eq, PartialEq)] +#[serde(try_from = "String", into = "String")] +pub struct PathTz(Tz); + +impl PathTz { + pub fn timezone(&self) -> Tz { + self.0 + } +} + +impl TryFrom for PathTz { + type Error = ParseError; + + fn try_from(value: String) -> Result { + let tz: Tz = value.parse()?; + Ok(PathTz(tz)) + } +} + +impl TryFrom<&str> for PathTz { + type Error = ParseError; + + fn try_from(value: &str) -> Result { + let tz: Tz = value.parse()?; + Ok(PathTz(tz)) + } +} + +impl Into for PathTz { + fn into(self) -> String { + self.0.to_string() + } +} + +impl Default for PathTz { + fn default() -> PathTz { + PathTz(UTC) + } +} + +// impl std::fmt::Display for TzOffset { +// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +// write!(f, "{}", self.0) +// } +// } + +#[cfg(test)] +mod tests { + use chrono_tz::{UTC, Tz, Asia::Singapore}; + use super::*; + + #[test] + fn parse_str() { + let path_tz = PathTz::try_from("Asia/Singapore").unwrap(); + let tz: Tz = path_tz.timezone(); + + assert_eq!( + tz, + Singapore + ); + } + + #[test] + fn parse_string() { + let path_tz = PathTz::try_from("Asia/Singapore".to_string()).unwrap(); + let tz: Tz = path_tz.timezone(); + + assert_eq!( + tz, + Singapore + ); + } + + #[test] + fn utc_timezone() { + let path_tz = PathTz::try_from("UTC").unwrap(); + let tz: Tz = path_tz.timezone(); + + assert_eq!( + tz, + UTC + ); + } +} diff --git a/src/sinks/file/tz_offset.rs b/src/sinks/file/tz_offset.rs deleted file mode 100644 index b569257702e27..0000000000000 --- a/src/sinks/file/tz_offset.rs +++ /dev/null @@ -1,98 +0,0 @@ -use chrono::{DateTime, FixedOffset, ParseError}; -use std::{ - convert::{TryFrom, Into}, - default::Default, -}; - -use vector_config::configurable_component; - - -/// handle tz offset configuration -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -#[serde(try_from = "String", into = "String")] -pub struct TzOffset(FixedOffset); - -impl TzOffset { - pub fn offset(&self) -> FixedOffset { - self.0 - } -} - -impl TryFrom for TzOffset { - type Error = ParseError; - - fn try_from(value: String) -> Result { - let dt = DateTime::parse_from_str( - format!("2000-01-01 00:00:00 {}", value).as_str(), - "%Y-%m-%d %H:%M:%S %z" - )?; - Ok(TzOffset(*dt.offset())) - } -} - -impl TryFrom<&str> for TzOffset { - type Error = ParseError; - - fn try_from(value: &str) -> Result { - let dt = DateTime::parse_from_str( - format!("2000-01-01 00:00:00 {}", value).as_str(), - "%Y-%m-%d %H:%M:%S %z" - )?; - Ok(TzOffset(*dt.offset())) - } -} - -impl Into for TzOffset { - fn into(self) -> String { - self.0.to_string() - } -} - -impl Default for TzOffset { - fn default() -> TzOffset { - TzOffset(FixedOffset::east_opt(0).unwrap()) - } -} - -impl std::fmt::Display for TzOffset { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -#[cfg(test)] -mod tests { - use chrono::FixedOffset; - use super::*; - - #[test] - fn parse_str() { - let tz_offset = TzOffset::try_from("+08:00").unwrap(); - - assert_eq!( - tz_offset.to_string(), - "+08:00".to_string() - ); - } - - #[test] - fn parse_string() { - let tz_offset = TzOffset::try_from("+08:00".to_string()).unwrap(); - - assert_eq!( - tz_offset.to_string(), - "+08:00".to_string() - ); - } - - #[test] - fn check_offset() { - let fixed_offset = FixedOffset::east_opt(28800).unwrap(); - let tz_offset = TzOffset::try_from("+08:00".to_string()).unwrap(); - assert_eq!( - tz_offset.offset(), - fixed_offset - ); - } -} From 535f5772995cc475b79ef3ff51360b42f8ff83d9 Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 3 Apr 2023 19:16:31 +0800 Subject: [PATCH 07/29] update tz_offset references to path_tz --- src/sinks/file/mod.rs | 20 ++++++++++---------- src/template.rs | 29 ++++++++++++++--------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index c0077ddcb8f5c..45c8e50f8b577 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -36,10 +36,10 @@ use crate::{ template::Template, }; mod bytes_path; -mod tz_offset; +mod path_tz; use bytes_path::BytesPath; -use tz_offset::TzOffset; +use path_tz::PathTz; /// Configuration for the `file` sink. #[serde_as] @@ -89,7 +89,7 @@ pub struct FileSinkConfig { default, skip_serializing_if = "crate::serde::skip_serializing_if_default" )] - pub tz_offset: TzOffset, + pub path_tz: PathTz, } impl GenerateConfig for FileSinkConfig { @@ -100,7 +100,7 @@ impl GenerateConfig for FileSinkConfig { encoding: (None::, TextSerializerConfig::default()).into(), compression: Default::default(), acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }) .unwrap() } @@ -218,7 +218,7 @@ impl FileSink { let encoder = Encoder::::new(framer, serializer); Ok(Self { - path: config.path.clone().with_tz_offset(config.tz_offset.offset()), + path: config.path.clone().with_path_tz(config.path_tz.timezone()), transformer, encoder, idle_timeout: config.idle_timeout, @@ -463,7 +463,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }; let (input, _events) = random_lines_with_stream(100, 64, None); @@ -486,7 +486,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Gzip, acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -509,7 +509,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Zstd, acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -537,7 +537,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }; let (mut input, _events) = random_events_with_stream(32, 8, None); @@ -615,7 +615,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - tz_offset: TzOffset::default(), + path_tz: PathTz::default(), }; let (mut input, _events) = random_lines_with_stream(10, 64, None); diff --git a/src/template.rs b/src/template.rs index 00a4d87c502f7..f904b7b06aca1 100644 --- a/src/template.rs +++ b/src/template.rs @@ -4,8 +4,10 @@ use std::{borrow::Cow, convert::TryFrom, fmt, hash::Hash, path::PathBuf}; use bytes::Bytes; use chrono::{ format::{strftime::StrftimeItems, Item}, - Utc, FixedOffset, + Utc, }; + +use chrono_tz::{UTC, Tz}; use lookup::lookup_v2::parse_target_path; use lookup::PathPrefix; use once_cell::sync::Lazy; @@ -66,7 +68,7 @@ pub struct Template { reserve_size: usize, #[serde(skip)] - tz_offset: Option, + path_tz: Option, } impl TryFrom<&str> for Template { @@ -115,14 +117,12 @@ impl TryFrom> for Template { }) .sum(); - let tz_offset = FixedOffset::east_opt(0); - Template { parts, src: src.into_owned(), is_static, reserve_size, - tz_offset, + path_tz: Some(UTC), } }) } @@ -145,8 +145,8 @@ impl ConfigurableString for Template {} impl Template { /// set tz offset - pub fn with_tz_offset(mut self, tz_offset: FixedOffset) -> Self { - self.tz_offset = Some(tz_offset); + pub fn with_path_tz(mut self, path_tz: Tz) -> Self { + self.path_tz = Some(path_tz); self } /// Renders the given template with data from the event. @@ -175,7 +175,7 @@ impl Template { for part in &self.parts { match part { Part::Literal(lit) => out.push_str(lit), - Part::Strftime(items) => out.push_str(&render_timestamp(items, event, self.tz_offset.unwrap())), + Part::Strftime(items) => out.push_str(&render_timestamp(items, event, self.path_tz.unwrap())), Part::Reference(key) => { out.push_str( &match event { @@ -352,7 +352,7 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: FixedOffset) -> String { +fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, path_tz: Tz) -> String { match event { EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { log.get((PathPrefix::Event, timestamp_key)) @@ -368,14 +368,14 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Fixe }), } .unwrap_or_else(Utc::now) - .with_timezone(&tz_offset) + .with_timezone(&path_tz) .format_with_items(items.as_items()) .to_string() } #[cfg(test)] mod tests { - use chrono::{TimeZone, FixedOffset}; + use chrono::{Utc, TimeZone}; use lookup::metadata_path; use vector_core::metric_tags; @@ -675,7 +675,7 @@ mod tests { } #[test] - fn render_log_with_tz_offset() { + fn render_log_with_path_tz() { let ts = Utc .ymd(2001, 2, 3) .and_hms_opt(4, 5, 6) @@ -691,11 +691,10 @@ mod tests { ts, ); - // +08:00 tz offset - let offset = FixedOffset::east_opt(28800).unwrap(); + let tz = "Asia/Singapore".parse().unwrap(); assert_eq!( Ok(Bytes::from("vector-2001-02-03-12.log")), - template.with_tz_offset(offset).render(&event) + template.with_path_tz(tz).render(&event) ); } From 3495afa37216ec03165791a92477affa818fad29 Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 5 Apr 2023 11:07:07 +0800 Subject: [PATCH 08/29] cargo fmt --- src/sinks/file/path_tz.rs | 22 ++++++---------------- src/template.rs | 8 +++++--- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/src/sinks/file/path_tz.rs b/src/sinks/file/path_tz.rs index dc23998c9b365..48a6dd7ef87f0 100644 --- a/src/sinks/file/path_tz.rs +++ b/src/sinks/file/path_tz.rs @@ -1,13 +1,12 @@ //use chrono::{DateTime, FixedOffset, ParseError}; -use chrono_tz::{UTC, Tz, ParseError}; +use chrono_tz::{ParseError, Tz, UTC}; use std::{ - convert::{TryFrom, Into}, + convert::{Into, TryFrom}, default::Default, }; use vector_config::configurable_component; - /// handle tz offset configuration #[configurable_component] #[derive(Clone, Debug, Eq, PartialEq)] @@ -58,18 +57,15 @@ impl Default for PathTz { #[cfg(test)] mod tests { - use chrono_tz::{UTC, Tz, Asia::Singapore}; use super::*; + use chrono_tz::{Asia::Singapore, Tz, UTC}; #[test] fn parse_str() { let path_tz = PathTz::try_from("Asia/Singapore").unwrap(); let tz: Tz = path_tz.timezone(); - assert_eq!( - tz, - Singapore - ); + assert_eq!(tz, Singapore); } #[test] @@ -77,10 +73,7 @@ mod tests { let path_tz = PathTz::try_from("Asia/Singapore".to_string()).unwrap(); let tz: Tz = path_tz.timezone(); - assert_eq!( - tz, - Singapore - ); + assert_eq!(tz, Singapore); } #[test] @@ -88,9 +81,6 @@ mod tests { let path_tz = PathTz::try_from("UTC").unwrap(); let tz: Tz = path_tz.timezone(); - assert_eq!( - tz, - UTC - ); + assert_eq!(tz, UTC); } } diff --git a/src/template.rs b/src/template.rs index f904b7b06aca1..9377313433516 100644 --- a/src/template.rs +++ b/src/template.rs @@ -7,7 +7,7 @@ use chrono::{ Utc, }; -use chrono_tz::{UTC, Tz}; +use chrono_tz::{Tz, UTC}; use lookup::lookup_v2::parse_target_path; use lookup::PathPrefix; use once_cell::sync::Lazy; @@ -175,7 +175,9 @@ impl Template { for part in &self.parts { match part { Part::Literal(lit) => out.push_str(lit), - Part::Strftime(items) => out.push_str(&render_timestamp(items, event, self.path_tz.unwrap())), + Part::Strftime(items) => { + out.push_str(&render_timestamp(items, event, self.path_tz.unwrap())) + } Part::Reference(key) => { out.push_str( &match event { @@ -375,7 +377,7 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, path_tz: Tz) -> #[cfg(test)] mod tests { - use chrono::{Utc, TimeZone}; + use chrono::{TimeZone, Utc}; use lookup::metadata_path; use vector_core::metric_tags; From 7ff862416d5761688800c3eab507eb16bb588121 Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 10 Apr 2023 14:12:20 +0800 Subject: [PATCH 09/29] remove unnecessary commented out code. fmt and generate-component-docs --- src/sinks/file/mod.rs | 1 + src/sinks/file/path_tz.rs | 9 +-------- src/template.rs | 5 +---- website/cue/reference/components/sinks/base/file.cue | 11 +++++++++++ 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index a0076f3d6485f..f65527b01b2fc 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -85,6 +85,7 @@ pub struct FileSinkConfig { pub acknowledgements: AcknowledgementsConfig, #[configurable(derived)] + #[configurable(metadata(docs::examples = "Asia/Singapore"))] #[serde( default, skip_serializing_if = "crate::serde::skip_serializing_if_default" diff --git a/src/sinks/file/path_tz.rs b/src/sinks/file/path_tz.rs index 48a6dd7ef87f0..197b967cfdab8 100644 --- a/src/sinks/file/path_tz.rs +++ b/src/sinks/file/path_tz.rs @@ -1,13 +1,12 @@ -//use chrono::{DateTime, FixedOffset, ParseError}; use chrono_tz::{ParseError, Tz, UTC}; use std::{ convert::{Into, TryFrom}, default::Default, }; - use vector_config::configurable_component; /// handle tz offset configuration +/// Defaults to UTC #[configurable_component] #[derive(Clone, Debug, Eq, PartialEq)] #[serde(try_from = "String", into = "String")] @@ -49,12 +48,6 @@ impl Default for PathTz { } } -// impl std::fmt::Display for TzOffset { -// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { -// write!(f, "{}", self.0) -// } -// } - #[cfg(test)] mod tests { use super::*; diff --git a/src/template.rs b/src/template.rs index 416811fe1b287..842766cfa189a 100644 --- a/src/template.rs +++ b/src/template.rs @@ -678,10 +678,7 @@ mod tests { #[test] fn render_log_with_path_tz() { - let ts = Utc - .ymd(2001, 2, 3) - .and_hms_opt(4, 5, 6) - .expect("invalid timestamp"); + let ts = Utc.with_ymd_and_hms(2001, 2, 3, 4, 5, 6).unwrap(); let template = Template::try_from("vector-%Y-%m-%d-%H.log").unwrap(); let mut event = Event::Log(LogEvent::from("hello world")); diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index 383b9d2086f37..537b8ca1ea9c6 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -243,4 +243,15 @@ base: components: sinks: file: configuration: { syntax: "template" } } + path_tz: { + description: """ + handle tz offset configuration + Defaults to UTC + """ + required: false + type: string: { + default: "UTC" + examples: ["Asia/Singapore"] + } + } } From e37fbfa135543348a86c137ff2cd692afd417a39 Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 12 Apr 2023 15:19:08 +0800 Subject: [PATCH 10/29] clippy suggestions and remove TryFrom<&str> - serde handles converting to String --- src/sinks/file/path_tz.rs | 29 ++++++----------------------- src/template.rs | 2 +- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/src/sinks/file/path_tz.rs b/src/sinks/file/path_tz.rs index 197b967cfdab8..7c6767171ee70 100644 --- a/src/sinks/file/path_tz.rs +++ b/src/sinks/file/path_tz.rs @@ -1,6 +1,6 @@ use chrono_tz::{ParseError, Tz, UTC}; use std::{ - convert::{Into, TryFrom}, + convert::{From, TryFrom}, default::Default, }; use vector_config::configurable_component; @@ -13,7 +13,7 @@ use vector_config::configurable_component; pub struct PathTz(Tz); impl PathTz { - pub fn timezone(&self) -> Tz { + pub const fn timezone(&self) -> Tz { self.0 } } @@ -27,18 +27,9 @@ impl TryFrom for PathTz { } } -impl TryFrom<&str> for PathTz { - type Error = ParseError; - - fn try_from(value: &str) -> Result { - let tz: Tz = value.parse()?; - Ok(PathTz(tz)) - } -} - -impl Into for PathTz { - fn into(self) -> String { - self.0.to_string() +impl From for String { + fn from(path_tz: PathTz) -> String { + path_tz.0.to_string() } } @@ -53,14 +44,6 @@ mod tests { use super::*; use chrono_tz::{Asia::Singapore, Tz, UTC}; - #[test] - fn parse_str() { - let path_tz = PathTz::try_from("Asia/Singapore").unwrap(); - let tz: Tz = path_tz.timezone(); - - assert_eq!(tz, Singapore); - } - #[test] fn parse_string() { let path_tz = PathTz::try_from("Asia/Singapore".to_string()).unwrap(); @@ -71,7 +54,7 @@ mod tests { #[test] fn utc_timezone() { - let path_tz = PathTz::try_from("UTC").unwrap(); + let path_tz = PathTz::try_from("UTC".to_string()).unwrap(); let tz: Tz = path_tz.timezone(); assert_eq!(tz, UTC); diff --git a/src/template.rs b/src/template.rs index 842766cfa189a..1d06b25a61c23 100644 --- a/src/template.rs +++ b/src/template.rs @@ -145,7 +145,7 @@ impl ConfigurableString for Template {} impl Template { /// set tz offset - pub fn with_path_tz(mut self, path_tz: Tz) -> Self { + pub const fn with_path_tz(mut self, path_tz: Tz) -> Self { self.path_tz = Some(path_tz); self } From 86a8a829bf4257a018a2a6bc10b694c6e9ce3336 Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 26 Apr 2023 16:22:05 +0800 Subject: [PATCH 11/29] rename Template config option `path_tz` to `timezone` --- src/template.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/template.rs b/src/template.rs index 1d06b25a61c23..cacd85b8115ad 100644 --- a/src/template.rs +++ b/src/template.rs @@ -68,7 +68,7 @@ pub struct Template { reserve_size: usize, #[serde(skip)] - path_tz: Option, + timezone: Option, } impl TryFrom<&str> for Template { @@ -122,7 +122,7 @@ impl TryFrom> for Template { src: src.into_owned(), is_static, reserve_size, - path_tz: Some(UTC), + timezone: Some(UTC), } }) } @@ -145,8 +145,8 @@ impl ConfigurableString for Template {} impl Template { /// set tz offset - pub const fn with_path_tz(mut self, path_tz: Tz) -> Self { - self.path_tz = Some(path_tz); + pub const fn with_timezone(mut self, timezone: Tz) -> Self { + self.timezone = Some(timezone); self } /// Renders the given template with data from the event. @@ -176,7 +176,7 @@ impl Template { match part { Part::Literal(lit) => out.push_str(lit), Part::Strftime(items) => { - out.push_str(&render_timestamp(items, event, self.path_tz.unwrap())) + out.push_str(&render_timestamp(items, event, self.timezone.unwrap())) } Part::Reference(key) => { out.push_str( @@ -354,7 +354,7 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, path_tz: Tz) -> String { +fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, timezone: Tz) -> String { match event { EventRef::Log(log) => log_schema().timestamp_key().and_then(|timestamp_key| { log.get((PathPrefix::Event, timestamp_key)) @@ -370,7 +370,7 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, path_tz: Tz) -> }), } .unwrap_or_else(Utc::now) - .with_timezone(&path_tz) + .with_timezone(&timezone) .format_with_items(items.as_items()) .to_string() } @@ -677,7 +677,7 @@ mod tests { } #[test] - fn render_log_with_path_tz() { + fn render_log_with_timezone() { let ts = Utc.with_ymd_and_hms(2001, 2, 3, 4, 5, 6).unwrap(); let template = Template::try_from("vector-%Y-%m-%d-%H.log").unwrap(); @@ -693,7 +693,7 @@ mod tests { let tz = "Asia/Singapore".parse().unwrap(); assert_eq!( Ok(Bytes::from("vector-2001-02-03-12.log")), - template.with_path_tz(tz).render(&event) + template.with_timezone(tz).render(&event) ); } From ad76c173f2145864a1a21bec813fb0613ad28e26 Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 26 Apr 2023 16:23:28 +0800 Subject: [PATCH 12/29] move `path_tz.rs` to `src/config` preparing for applying the same to `aws_s3` sink for filename timezone --- src/config/mod.rs | 2 ++ src/{sinks/file => config}/path_tz.rs | 0 src/sinks/file/mod.rs | 4 +--- 3 files changed, 3 insertions(+), 3 deletions(-) rename src/{sinks/file => config}/path_tz.rs (100%) diff --git a/src/config/mod.rs b/src/config/mod.rs index bd9f6c283eaf1..79d76ba88e312 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -29,6 +29,7 @@ pub mod format; mod graph; mod id; mod loading; +pub mod path_tz; pub mod provider; pub mod schema; mod secret; @@ -50,6 +51,7 @@ pub use loading::{ load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets, load_from_str, load_source_from_paths, merge_path_lists, process_paths, CONFIG_PATHS, }; +pub use path_tz::PathTz; pub use provider::ProviderConfig; pub use secret::SecretBackend; pub use sink::{SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter}; diff --git a/src/sinks/file/path_tz.rs b/src/config/path_tz.rs similarity index 100% rename from src/sinks/file/path_tz.rs rename to src/config/path_tz.rs diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index f65527b01b2fc..d2d6a429fd636 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -28,7 +28,7 @@ use vector_core::{ use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, + config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, PathTz, SinkConfig, SinkContext}, event::{Event, EventStatus, Finalizable}, expiring_hash_map::ExpiringHashMap, internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, @@ -36,10 +36,8 @@ use crate::{ template::Template, }; mod bytes_path; -mod path_tz; use bytes_path::BytesPath; -use path_tz::PathTz; /// Configuration for the `file` sink. #[serde_as] From 5e8a2a626fe8698972eea8a34b8973b986efcc50 Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 28 Apr 2023 12:06:05 +0800 Subject: [PATCH 13/29] update doc configuration description for path_tz --- src/config/path_tz.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/config/path_tz.rs b/src/config/path_tz.rs index 7c6767171ee70..84ab330cee2cc 100644 --- a/src/config/path_tz.rs +++ b/src/config/path_tz.rs @@ -5,8 +5,7 @@ use std::{ }; use vector_config::configurable_component; -/// handle tz offset configuration -/// Defaults to UTC +/// Configure timezone #[configurable_component] #[derive(Clone, Debug, Eq, PartialEq)] #[serde(try_from = "String", into = "String")] From 21106bc3bef4b5192211740d8df69c965aca5ab2 Mon Sep 17 00:00:00 2001 From: kates Date: Tue, 2 May 2023 14:51:42 +0800 Subject: [PATCH 14/29] fix wrong method name --- src/sinks/file/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index d2d6a429fd636..b3b48139b854c 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -217,7 +217,7 @@ impl FileSink { let encoder = Encoder::::new(framer, serializer); Ok(Self { - path: config.path.clone().with_path_tz(config.path_tz.timezone()), + path: config.path.clone().with_timezone(config.path_tz.timezone()), transformer, encoder, idle_timeout: config.idle_timeout, From 5e8ae7e229fc609513a2cd5fe3ea10797f80275a Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 7 Sep 2023 14:23:03 +0800 Subject: [PATCH 15/29] AWS and GCS filename timezone support * remove custom tz config * use VRL's timezone config * pass around SinkContext --- Cargo.toml | 2 +- src/config/mod.rs | 2 -- src/config/path_tz.rs | 61 ---------------------------------- src/sinks/aws_s3/config.rs | 17 ++++++++-- src/sinks/aws_s3/sink.rs | 18 ++++++++-- src/sinks/file/mod.rs | 58 +++++++++++++++++++++++--------- src/sinks/gcp/cloud_storage.rs | 37 ++++++++++++++++----- 7 files changed, 103 insertions(+), 92 deletions(-) delete mode 100644 src/config/path_tz.rs diff --git a/Cargo.toml b/Cargo.toml index 6a08aa56796d4..8eb2ffae12b1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -253,7 +253,7 @@ bollard = { version = "0.14.0", default-features = false, features = ["ssl", "ch bytes = { version = "1.4.0", default-features = false, features = ["serde"] } bytesize = { version = "1.3.0", default-features = false } chrono = { version = "0.4.27", default-features = false, features = ["serde"] } -chrono-tz = { version = "0.8.1", default-features = false } +chrono-tz = { version = "0.8.3", default-features = false } cidr-utils = { version = "0.5.10", default-features = false } clap = { version = "4.4.2", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.4", default-features = false } diff --git a/src/config/mod.rs b/src/config/mod.rs index 82d989f9728d6..86b7f7dde7238 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -30,7 +30,6 @@ pub mod format; mod graph; mod id; mod loading; -pub mod path_tz; pub mod provider; pub mod schema; mod secret; @@ -53,7 +52,6 @@ pub use loading::{ load_from_str, load_source_from_paths, merge_path_lists, process_paths, COLLECTOR, CONFIG_PATHS, }; -pub use path_tz::PathTz; pub use provider::ProviderConfig; pub use secret::SecretBackend; pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter}; diff --git a/src/config/path_tz.rs b/src/config/path_tz.rs deleted file mode 100644 index 84ab330cee2cc..0000000000000 --- a/src/config/path_tz.rs +++ /dev/null @@ -1,61 +0,0 @@ -use chrono_tz::{ParseError, Tz, UTC}; -use std::{ - convert::{From, TryFrom}, - default::Default, -}; -use vector_config::configurable_component; - -/// Configure timezone -#[configurable_component] -#[derive(Clone, Debug, Eq, PartialEq)] -#[serde(try_from = "String", into = "String")] -pub struct PathTz(Tz); - -impl PathTz { - pub const fn timezone(&self) -> Tz { - self.0 - } -} - -impl TryFrom for PathTz { - type Error = ParseError; - - fn try_from(value: String) -> Result { - let tz: Tz = value.parse()?; - Ok(PathTz(tz)) - } -} - -impl From for String { - fn from(path_tz: PathTz) -> String { - path_tz.0.to_string() - } -} - -impl Default for PathTz { - fn default() -> PathTz { - PathTz(UTC) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use chrono_tz::{Asia::Singapore, Tz, UTC}; - - #[test] - fn parse_string() { - let path_tz = PathTz::try_from("Asia/Singapore".to_string()).unwrap(); - let tz: Tz = path_tz.timezone(); - - assert_eq!(tz, Singapore); - } - - #[test] - fn utc_timezone() { - let path_tz = PathTz::try_from("UTC".to_string()).unwrap(); - let tz: Tz = path_tz.timezone(); - - assert_eq!(tz, UTC); - } -} diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 6123ee737d22f..eeb0d3be5202a 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -32,6 +32,8 @@ use crate::{ tls::TlsConfig, }; +use vrl::compiler::TimeZone; + /// Configuration for the `aws_s3` sink. #[configurable_component(sink( "aws_s3", @@ -136,6 +138,10 @@ pub struct S3SinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub timezone: Option, } pub(super) fn default_key_prefix() -> String { @@ -163,6 +169,7 @@ impl GenerateConfig for S3SinkConfig { tls: Some(TlsConfig::default()), auth: AwsAuthentication::default(), acknowledgements: Default::default(), + timezone: Default::default(), }) .unwrap() } @@ -174,7 +181,7 @@ impl SinkConfig for S3SinkConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let service = self.create_service(&cx.proxy).await?; let healthcheck = self.build_healthcheck(service.client())?; - let sink = self.build_processor(service)?; + let sink = self.build_processor(service, cx)?; Ok((sink, healthcheck)) } @@ -188,7 +195,7 @@ impl SinkConfig for S3SinkConfig { } impl S3SinkConfig { - pub fn build_processor(&self, service: S3Service) -> crate::Result { + pub fn build_processor(&self, service: S3Service, cx: SinkContext) -> crate::Result { // Build our S3 client/service, which is what we'll ultimately feed // requests into in order to ship files to S3. We build this here in // order to configure the client/service with retries, concurrency @@ -214,6 +221,11 @@ impl S3SinkConfig { let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); + let timezone = match self.timezone { + Some(tz) => tz, + None => cx.globals.timezone(), + }; + let request_options = S3RequestOptions { bucket: self.bucket.clone(), api_options: self.options.clone(), @@ -222,6 +234,7 @@ impl S3SinkConfig { filename_append_uuid: self.filename_append_uuid, encoder: (transformer, encoder), compression: self.compression, + filename_timezone: timezone, }; let sink = S3Sink::new(service, request_options, partitioner, batch_settings); diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index c9849cf834e13..3145e618af09b 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -2,10 +2,12 @@ use std::io; use bytes::Bytes; use chrono::Utc; +use chrono_tz::UTC; use codecs::encoding::Framer; use uuid::Uuid; use vector_common::request_metadata::RequestMetadata; use vector_core::event::Finalizable; +use vrl::compiler::TimeZone; use crate::{ codecs::{Encoder, Transformer}, @@ -21,6 +23,7 @@ use crate::{ RequestBuilder, }, }, + template::Template, }; #[derive(Clone)] @@ -32,6 +35,7 @@ pub struct S3RequestOptions { pub api_options: S3Options, pub encoder: (Transformer, Encoder), pub compression: Compression, + pub filename_timezone: TimeZone, } impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { @@ -58,7 +62,12 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { let builder = RequestMetadataBuilder::from_events(&events); let finalizers = events.take_finalizers(); - let s3_key_prefix = partition_key.key_prefix.clone(); + let tz = match self.filename_timezone { + TimeZone::Local => UTC, + TimeZone::Named(tz) => tz + }; + + let s3_key_prefix = Template::try_from(partition_key.key_prefix.clone()).unwrap().with_timezone(tz).to_string(); let metadata = S3Metadata { partition_key, @@ -76,7 +85,12 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { payload: EncodeResult, ) -> Self::Request { let filename = { - let formatted_ts = Utc::now().format(self.filename_time_format.as_str()); + let tz = match self.filename_timezone { + TimeZone::Local => UTC, + TimeZone::Named(tz) => tz + }; + + let formatted_ts = Utc::now().with_timezone(&tz).format(self.filename_time_format.as_str()); self.filename_append_uuid .then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().hyphenated())) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index c7b81a73d52ac..7a543109ed075 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -4,6 +4,7 @@ use std::time::{Duration, Instant}; use async_compression::tokio::write::{GzipEncoder, ZstdEncoder}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; +use chrono_tz::UTC; use codecs::{ encoding::{Framer, FramingConfig}, TextSerializerConfig, @@ -28,16 +29,18 @@ use vector_core::{ use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, PathTz, SinkConfig, SinkContext}, + config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, event::{Event, EventStatus, Finalizable}, expiring_hash_map::ExpiringHashMap, internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, sinks::util::StreamSink, template::Template, }; + mod bytes_path; use bytes_path::BytesPath; +use vrl::compiler::TimeZone; /// Configuration for the `file` sink. #[serde_as] @@ -84,12 +87,8 @@ pub struct FileSinkConfig { pub acknowledgements: AcknowledgementsConfig, #[configurable(derived)] - #[configurable(metadata(docs::examples = "Asia/Singapore"))] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub path_tz: PathTz, + #[serde(default)] + pub timezone: Option, } impl GenerateConfig for FileSinkConfig { @@ -100,7 +99,7 @@ impl GenerateConfig for FileSinkConfig { encoding: (None::, TextSerializerConfig::default()).into(), compression: Default::default(), acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }) .unwrap() } @@ -184,9 +183,32 @@ impl OutFile { impl SinkConfig for FileSinkConfig { async fn build( &self, - _cx: SinkContext, + cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = FileSink::new(self)?; + let timezone = match self.timezone { + Some(tz) => { + Some(tz) + }, + None => { + match cx.globals.timezone { + Some(tz) => { + Some(tz) + }, + None => { + Some(TimeZone::default()) + } + } + } + }; + let c = Self { + path: self.path.clone(), + idle_timeout: self.idle_timeout, + encoding: self.encoding.clone(), + compression: self.compression, + acknowledgements: self.acknowledgements, + timezone: timezone, + }; + let sink = FileSink::new(&c)?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), @@ -217,9 +239,13 @@ impl FileSink { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); + let timezone = match config.timezone.unwrap() { + TimeZone::Local => UTC, + TimeZone::Named(tz) => tz + }; Ok(Self { - path: config.path.clone().with_timezone(config.path_tz.timezone()), + path: config.path.clone().with_timezone(timezone), transformer, encoder, idle_timeout: config.idle_timeout, @@ -464,7 +490,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }; let (input, _events) = random_lines_with_stream(100, 64, None); @@ -487,7 +513,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Gzip, acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -510,7 +536,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::Zstd, acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }; let (input, _) = random_lines_with_stream(100, 64, None); @@ -538,7 +564,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }; let (mut input, _events) = random_events_with_stream(32, 8, None); @@ -617,7 +643,7 @@ mod tests { encoding: (None::, TextSerializerConfig::default()).into(), compression: Compression::None, acknowledgements: Default::default(), - path_tz: PathTz::default(), + timezone: Default::default(), }; let (mut input, _events) = random_lines_with_stream(10, 64, None); diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 45b051a87ca3d..a05a5b14dfb81 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; use chrono::Utc; +use chrono_tz::UTC; use codecs::encoding::Framer; use http::header::{HeaderName, HeaderValue}; use http::Uri; @@ -13,6 +14,7 @@ use uuid::Uuid; use vector_common::request_metadata::RequestMetadata; use vector_config::configurable_component; use vector_core::event::{EventFinalizers, Finalizable}; +use vrl::compiler::TimeZone; use crate::sinks::util::metadata::RequestMetadataBuilder; use crate::{ @@ -164,6 +166,11 @@ pub struct GcsSinkConfig { skip_serializing_if = "crate::serde::skip_serializing_if_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub timezone: Option, + } fn default_time_format() -> String { @@ -188,6 +195,7 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { auth: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } } @@ -218,7 +226,7 @@ impl SinkConfig for GcsSinkConfig { auth.clone(), )?; auth.spawn_regenerate_token(); - let sink = self.build_sink(client, base_url, auth)?; + let sink = self.build_sink(client, base_url, auth, cx)?; Ok((sink, healthcheck)) } @@ -238,6 +246,7 @@ impl GcsSinkConfig { client: HttpClient, base_url: String, auth: GcpAuthenticator, + cx: SinkContext, ) -> crate::Result { let request = self.request.unwrap_with(&TowerRequestConfig { rate_limit_num: Some(1000), @@ -254,7 +263,7 @@ impl GcsSinkConfig { .settings(request, GcsRetryLogic) .service(GcsService::new(client, base_url, auth)); - let request_settings = RequestSettings::new(self)?; + let request_settings = RequestSettings::new(self, cx)?; let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); @@ -284,6 +293,7 @@ struct RequestSettings { append_uuid: bool, encoder: (Transformer, Encoder), compression: Compression, + timezone: TimeZone } impl RequestBuilder<(String, Vec)> for RequestSettings { @@ -322,7 +332,12 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { let (key, finalizers) = gcp_metadata; // TODO: pull the seconds from the last event let filename = { - let seconds = Utc::now().format(&self.time_format); + let tz = match self.timezone { + TimeZone::Local => UTC, + TimeZone::Named(tz) => tz + }; + + let seconds = Utc::now().with_timezone(&tz).format(&self.time_format); if self.append_uuid { let uuid = Uuid::new_v4(); @@ -352,7 +367,7 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { } impl RequestSettings { - fn new(config: &GcsSinkConfig) -> crate::Result { + fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); @@ -382,6 +397,10 @@ impl RequestSettings { .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; + let timezone = match config.timezone { + Some(tz) => tz, + None => cx.globals.timezone() + }; Ok(Self { acl, content_type, @@ -393,6 +412,7 @@ impl RequestSettings { append_uuid, compression: config.compression, encoder: (transformer, encoder), + timezone: timezone, }) } } @@ -440,7 +460,7 @@ mod tests { let config = default_config((None::, JsonSerializerConfig::default()).into()); let sink = config - .build_sink(client, mock_endpoint.to_string(), GcpAuthenticator::None) + .build_sink(client, mock_endpoint.to_string(), GcpAuthenticator::None, context) .expect("failed to build sink"); let event = Event::Log(LogEvent::from("simple message")); @@ -468,11 +488,12 @@ mod tests { assert_eq!(key, "key: value"); } - fn request_settings(sink_config: &GcsSinkConfig) -> RequestSettings { - RequestSettings::new(sink_config).expect("Could not create request settings") + fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { + RequestSettings::new(sink_config, context).expect("Could not create request settings") } fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest { + let context = SinkContext::default(); let sink_config = GcsSinkConfig { key_prefix: Some("key/".into()), filename_time_format: "date".into(), @@ -497,7 +518,7 @@ mod tests { let mut byte_size = GroupedCountByteSize::new_untagged(); byte_size.add_event(&log, log.estimated_json_encoded_size_of()); - let request_settings = request_settings(&sink_config); + let request_settings = request_settings(&sink_config, context); let (metadata, metadata_request_builder, _events) = request_settings.split_input((key, vec![log])); let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); From 9e4530f2ca3a61ef75493ecf3d9ef634a53c6b9a Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 11 Sep 2023 17:46:27 +0800 Subject: [PATCH 16/29] use TzOffset to pass down to request builders. VRL's TimeZone can't be hash derived --- src/sinks/aws_s3/config.rs | 13 +++++-- src/sinks/aws_s3/sink.rs | 31 ++++++++-------- src/sinks/file/mod.rs | 35 ++++++++----------- src/sinks/gcp/cloud_storage.rs | 29 +++++++++------ src/template.rs | 35 ++++++++++++------- .../components/sinks/base/aws_s3.cue | 11 ++++++ .../reference/components/sinks/base/file.cue | 14 ++++---- .../sinks/base/gcp_cloud_storage.cue | 11 ++++++ 8 files changed, 111 insertions(+), 68 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index eeb0d3be5202a..94b761bf1b613 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,6 +1,7 @@ use std::convert::TryInto; use aws_sdk_s3::Client as S3Client; +use chrono::{Utc, Offset}; use codecs::{ encoding::{Framer, FramingConfig}, TextSerializerConfig, @@ -222,8 +223,14 @@ impl S3SinkConfig { let encoder = Encoder::::new(framer, serializer); let timezone = match self.timezone { - Some(tz) => tz, - None => cx.globals.timezone(), + Some(tz) => Some(tz), + None => cx.globals.timezone, + }; + + let offset = match timezone { + Some(TimeZone::Local) => Some(*Utc::now().with_timezone(&chrono::Local).offset()), + Some(TimeZone::Named(tz)) => Some(Utc::now().with_timezone(&tz).offset().fix()), + None => None }; let request_options = S3RequestOptions { @@ -234,7 +241,7 @@ impl S3SinkConfig { filename_append_uuid: self.filename_append_uuid, encoder: (transformer, encoder), compression: self.compression, - filename_timezone: timezone, + filename_tz_offset: offset, }; let sink = S3Sink::new(service, request_options, partitioner, batch_settings); diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 3145e618af09b..840e931e7e4ad 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,13 +1,11 @@ use std::io; use bytes::Bytes; -use chrono::Utc; -use chrono_tz::UTC; +use chrono::{Utc, FixedOffset}; use codecs::encoding::Framer; use uuid::Uuid; use vector_common::request_metadata::RequestMetadata; use vector_core::event::Finalizable; -use vrl::compiler::TimeZone; use crate::{ codecs::{Encoder, Transformer}, @@ -35,7 +33,7 @@ pub struct S3RequestOptions { pub api_options: S3Options, pub encoder: (Transformer, Encoder), pub compression: Compression, - pub filename_timezone: TimeZone, + pub filename_tz_offset: Option, } impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { @@ -62,12 +60,11 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { let builder = RequestMetadataBuilder::from_events(&events); let finalizers = events.take_finalizers(); - let tz = match self.filename_timezone { - TimeZone::Local => UTC, - TimeZone::Named(tz) => tz - }; - let s3_key_prefix = Template::try_from(partition_key.key_prefix.clone()).unwrap().with_timezone(tz).to_string(); + let s3_key_prefix = Template::try_from(partition_key.key_prefix.clone()) + .unwrap() + .with_tz_offset(self.filename_tz_offset) + .to_string(); let metadata = S3Metadata { partition_key, @@ -85,13 +82,19 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { payload: EncodeResult, ) -> Self::Request { let filename = { - let tz = match self.filename_timezone { - TimeZone::Local => UTC, - TimeZone::Named(tz) => tz + let formatted_ts = match self.filename_tz_offset { + Some(offset) => { + Utc::now() + .with_timezone(&offset) + .format(self.filename_time_format.as_str()) + }, + None => { + Utc::now() + .with_timezone(&chrono::Utc) + .format(self.filename_time_format.as_str()) + } }; - let formatted_ts = Utc::now().with_timezone(&tz).format(self.filename_time_format.as_str()); - self.filename_append_uuid .then(|| format!("{}-{}", formatted_ts, Uuid::new_v4().hyphenated())) .unwrap_or_else(|| formatted_ts.to_string()) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 7a543109ed075..a6271ec7dbede 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -4,7 +4,7 @@ use std::time::{Duration, Instant}; use async_compression::tokio::write::{GzipEncoder, ZstdEncoder}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use chrono_tz::UTC; +use chrono::{Utc, Offset}; use codecs::{ encoding::{Framer, FramingConfig}, TextSerializerConfig, @@ -186,21 +186,10 @@ impl SinkConfig for FileSinkConfig { cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { let timezone = match self.timezone { - Some(tz) => { - Some(tz) - }, - None => { - match cx.globals.timezone { - Some(tz) => { - Some(tz) - }, - None => { - Some(TimeZone::default()) - } - } - } + Some(tz) => Some(tz), + None => cx.globals.timezone, }; - let c = Self { + let config = Self { path: self.path.clone(), idle_timeout: self.idle_timeout, encoding: self.encoding.clone(), @@ -208,7 +197,7 @@ impl SinkConfig for FileSinkConfig { acknowledgements: self.acknowledgements, timezone: timezone, }; - let sink = FileSink::new(&c)?; + let sink = FileSink::new(&config)?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), @@ -239,13 +228,19 @@ impl FileSink { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); - let timezone = match config.timezone.unwrap() { - TimeZone::Local => UTC, - TimeZone::Named(tz) => tz + + let offset = match config.timezone { + Some(TimeZone::Local) => { + Some(*Utc::now().with_timezone(&chrono::Local).offset()) + }, + Some(TimeZone::Named(tz)) => { + Some(Utc::now().with_timezone(&tz).offset().fix()) + }, + None => None }; Ok(Self { - path: config.path.clone().with_timezone(timezone), + path: config.path.clone().with_tz_offset(offset), transformer, encoder, idle_timeout: config.idle_timeout, diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index a05a5b14dfb81..a72a363fb09fc 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -1,8 +1,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; -use chrono::Utc; -use chrono_tz::UTC; +use chrono::{Utc, Offset, FixedOffset}; use codecs::encoding::Framer; use http::header::{HeaderName, HeaderValue}; use http::Uri; @@ -293,7 +292,7 @@ struct RequestSettings { append_uuid: bool, encoder: (Transformer, Encoder), compression: Compression, - timezone: TimeZone + tz_offset: Option } impl RequestBuilder<(String, Vec)> for RequestSettings { @@ -332,13 +331,11 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { let (key, finalizers) = gcp_metadata; // TODO: pull the seconds from the last event let filename = { - let tz = match self.timezone { - TimeZone::Local => UTC, - TimeZone::Named(tz) => tz + let seconds = match self.tz_offset { + Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format), + None => Utc::now().with_timezone(&chrono::Utc).format(&self.time_format) }; - let seconds = Utc::now().with_timezone(&tz).format(&self.time_format); - if self.append_uuid { let uuid = Uuid::new_v4(); format!("{}-{}", seconds, uuid.hyphenated()) @@ -398,8 +395,18 @@ impl RequestSettings { let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; let timezone = match config.timezone { - Some(tz) => tz, - None => cx.globals.timezone() + Some(tz) => Some(tz), + None => cx.globals.timezone + }; + + let offset = match timezone { + Some(TimeZone::Local) => { + Some(*Utc::now().with_timezone(&chrono::Local).offset()) + }, + Some(TimeZone::Named(tz)) => { + Some(Utc::now().with_timezone(&tz).offset().fix()) + }, + None => None }; Ok(Self { acl, @@ -412,7 +419,7 @@ impl RequestSettings { append_uuid, compression: config.compression, encoder: (transformer, encoder), - timezone: timezone, + tz_offset: offset, }) } } diff --git a/src/template.rs b/src/template.rs index eb8180e09f73a..5d2ffd79e6576 100644 --- a/src/template.rs +++ b/src/template.rs @@ -4,10 +4,9 @@ use std::{borrow::Cow, convert::TryFrom, fmt, hash::Hash, path::PathBuf}; use bytes::Bytes; use chrono::{ format::{strftime::StrftimeItems, Item}, - Utc, + Utc, FixedOffset, }; -use chrono_tz::{Tz, UTC}; use lookup::lookup_v2::parse_target_path; use once_cell::sync::Lazy; use regex::Regex; @@ -67,7 +66,7 @@ pub struct Template { reserve_size: usize, #[serde(skip)] - timezone: Option, + tz_offset: Option, } impl TryFrom<&str> for Template { @@ -121,7 +120,7 @@ impl TryFrom> for Template { src: src.into_owned(), is_static, reserve_size, - timezone: Some(UTC), + tz_offset: None, } }) } @@ -144,8 +143,8 @@ impl ConfigurableString for Template {} impl Template { /// set tz offset - pub const fn with_timezone(mut self, timezone: Tz) -> Self { - self.timezone = Some(timezone); + pub const fn with_tz_offset(mut self, tz_offset: Option) -> Self { + self.tz_offset = tz_offset; self } /// Renders the given template with data from the event. @@ -175,7 +174,7 @@ impl Template { match part { Part::Literal(lit) => out.push_str(lit), Part::Strftime(items) => { - out.push_str(&render_timestamp(items, event, self.timezone.unwrap())) + out.push_str(&render_timestamp(items, event, self.tz_offset)) } Part::Reference(key) => { out.push_str( @@ -357,8 +356,8 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, timezone: Tz) -> String { - match event { +fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Option) -> String { + let timestamp = match event { EventRef::Log(log) => log_schema() .timestamp_key_target_path() .and_then(|timestamp_key| { @@ -378,10 +377,20 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, timezone: Tz) - }) } } - .unwrap_or_else(Utc::now) - .with_timezone(&timezone) - .format_with_items(items.as_items()) - .to_string() + .unwrap_or_else(Utc::now); + + match tz_offset { + Some(offset) => { + timestamp.with_timezone(&offset) + .format_with_items(items.as_items()) + .to_string() + }, + None => { + timestamp.with_timezone(&chrono::Utc) + .format_with_items(items.as_items()) + .to_string() + } + } } #[cfg(test)] diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 802bf4111a340..de95bcf047045 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -885,6 +885,17 @@ base: components: sinks: aws_s3: configuration: { } } } + timezone: { + description: """ + Timezone reference. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + """ + required: false + type: string: examples: ["local", "America/New_York", "EST5EDT"] + } tls: { description: "TLS configuration." required: false diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index 71a700779949e..ebddab0c5b5f0 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -307,15 +307,15 @@ base: components: sinks: file: configuration: { syntax: "template" } } - path_tz: { + timezone: { description: """ - handle tz offset configuration - Defaults to UTC + Timezone reference. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones """ required: false - type: string: { - default: "UTC" - examples: ["Asia/Singapore"] - } + type: string: examples: ["local", "America/New_York", "EST5EDT"] } } diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index dc3e43c5aeb8e..a5e4285597004 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -661,6 +661,17 @@ base: components: sinks: gcp_cloud_storage: configuration: { """ } } + timezone: { + description: """ + Timezone reference. + + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + + [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + """ + required: false + type: string: examples: ["local", "America/New_York", "EST5EDT"] + } tls: { description: "TLS configuration." required: false From 03dda4c50276edc216f7e18fc350f143c48c6424 Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 14 Sep 2023 21:24:48 +0800 Subject: [PATCH 17/29] make key_prefix timezone aware and use Option `or` syntax --- src/sinks/aws_s3/config.rs | 26 ++++++++++++++------------ src/sinks/aws_s3/sink.rs | 7 +------ src/sinks/file/mod.rs | 7 ++----- src/sinks/gcp/cloud_storage.rs | 5 +---- src/template.rs | 3 ++- 5 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 94b761bf1b613..af24f16703b57 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -33,7 +33,7 @@ use crate::{ tls::TlsConfig, }; -use vrl::compiler::TimeZone; +use vector_common::TimeZone; /// Configuration for the `aws_s3` sink. #[configurable_component(sink( @@ -206,9 +206,20 @@ impl S3SinkConfig { .settings(request_limits, S3RetryLogic) .service(service); + let timezone = self.timezone.or(cx.globals.timezone); + + let offset = match timezone { + Some(TimeZone::Local) => Some(*Utc::now().with_timezone(&chrono::Local).offset()), + Some(TimeZone::Named(tz)) => Some(Utc::now().with_timezone(&tz).offset().fix()), + None => None + }; + // Configure our partitioning/batching. let batch_settings = self.batch.into_batcher_settings()?; - let key_prefix = self.key_prefix.clone().try_into()?; + + let key_prefix = Template::try_from(self.key_prefix.clone())? + .with_tz_offset(offset).try_into()?; + let ssekms_key_id = self .options .ssekms_key_id @@ -216,22 +227,13 @@ impl S3SinkConfig { .cloned() .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str())) .transpose()?; + let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id); let transformer = self.encoding.transformer(); let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); - let timezone = match self.timezone { - Some(tz) => Some(tz), - None => cx.globals.timezone, - }; - - let offset = match timezone { - Some(TimeZone::Local) => Some(*Utc::now().with_timezone(&chrono::Local).offset()), - Some(TimeZone::Named(tz)) => Some(Utc::now().with_timezone(&tz).offset().fix()), - None => None - }; let request_options = S3RequestOptions { bucket: self.bucket.clone(), diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 840e931e7e4ad..e415cea052e72 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -21,7 +21,6 @@ use crate::{ RequestBuilder, }, }, - template::Template, }; #[derive(Clone)] @@ -60,11 +59,7 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { let builder = RequestMetadataBuilder::from_events(&events); let finalizers = events.take_finalizers(); - - let s3_key_prefix = Template::try_from(partition_key.key_prefix.clone()) - .unwrap() - .with_tz_offset(self.filename_tz_offset) - .to_string(); + let s3_key_prefix = partition_key.key_prefix.clone(); let metadata = S3Metadata { partition_key, diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index a6271ec7dbede..b617627a091d3 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -185,17 +185,14 @@ impl SinkConfig for FileSinkConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let timezone = match self.timezone { - Some(tz) => Some(tz), - None => cx.globals.timezone, - }; + let timezone = self.timezone.or(cx.globals.timezone); let config = Self { path: self.path.clone(), idle_timeout: self.idle_timeout, encoding: self.encoding.clone(), compression: self.compression, acknowledgements: self.acknowledgements, - timezone: timezone, + timezone, }; let sink = FileSink::new(&config)?; Ok(( diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index a72a363fb09fc..159bf48d246aa 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -394,10 +394,7 @@ impl RequestSettings { .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; - let timezone = match config.timezone { - Some(tz) => Some(tz), - None => cx.globals.timezone - }; + let timezone = config.timezone.or(cx.globals.timezone); let offset = match timezone { Some(TimeZone::Local) => { diff --git a/src/template.rs b/src/template.rs index 5d2ffd79e6576..1ae9f9ca682d0 100644 --- a/src/template.rs +++ b/src/template.rs @@ -701,9 +701,10 @@ mod tests { ); let tz = "Asia/Singapore".parse().unwrap(); + let offset = Some(Utc::now().with_timezone(&tz).offset().fix()); assert_eq!( Ok(Bytes::from("vector-2001-02-03-12.log")), - template.with_timezone(tz).render(&event) + template.with_tz_offset(offset).render(&event) ); } From 2ab18ea09c154955d10dd129905b9e429bd2e4ac Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 6 Oct 2023 14:26:43 +0800 Subject: [PATCH 18/29] move tz to offset conversion codes to sink util --- src/sinks/aws_s3/config.rs | 11 ++--------- src/sinks/file/mod.rs | 15 +++------------ src/sinks/gcp/cloud_storage.rs | 15 +++------------ src/sinks/util/mod.rs | 9 +++++++++ 4 files changed, 17 insertions(+), 33 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index af24f16703b57..9af94a0fdcefd 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,7 +1,6 @@ use std::convert::TryInto; use aws_sdk_s3::Client as S3Client; -use chrono::{Utc, Offset}; use codecs::{ encoding::{Framer, FramingConfig}, TextSerializerConfig, @@ -25,7 +24,7 @@ use crate::{ }, util::{ BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, - TowerRequestConfig, + TowerRequestConfig, timezone_to_offset, }, Healthcheck, }, @@ -206,13 +205,7 @@ impl S3SinkConfig { .settings(request_limits, S3RetryLogic) .service(service); - let timezone = self.timezone.or(cx.globals.timezone); - - let offset = match timezone { - Some(TimeZone::Local) => Some(*Utc::now().with_timezone(&chrono::Local).offset()), - Some(TimeZone::Named(tz)) => Some(Utc::now().with_timezone(&tz).offset().fix()), - None => None - }; + let offset = self.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); // Configure our partitioning/batching. let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index b617627a091d3..60d2773d9a227 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -4,7 +4,6 @@ use std::time::{Duration, Instant}; use async_compression::tokio::write::{GzipEncoder, ZstdEncoder}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use chrono::{Utc, Offset}; use codecs::{ encoding::{Framer, FramingConfig}, TextSerializerConfig, @@ -33,14 +32,14 @@ use crate::{ event::{Event, EventStatus, Finalizable}, expiring_hash_map::ExpiringHashMap, internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, - sinks::util::StreamSink, + sinks::util::{StreamSink, timezone_to_offset}, template::Template, }; mod bytes_path; use bytes_path::BytesPath; -use vrl::compiler::TimeZone; +use vector_common::TimeZone; /// Configuration for the `file` sink. #[serde_as] @@ -226,15 +225,7 @@ impl FileSink { let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); - let offset = match config.timezone { - Some(TimeZone::Local) => { - Some(*Utc::now().with_timezone(&chrono::Local).offset()) - }, - Some(TimeZone::Named(tz)) => { - Some(Utc::now().with_timezone(&tz).offset().fix()) - }, - None => None - }; + let offset = config.timezone.and_then(timezone_to_offset); Ok(Self { path: config.path.clone().with_tz_offset(offset), diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 159bf48d246aa..5e7c061d9251f 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; -use chrono::{Utc, Offset, FixedOffset}; +use chrono::{Utc, FixedOffset}; use codecs::encoding::Framer; use http::header::{HeaderName, HeaderValue}; use http::Uri; @@ -34,7 +34,7 @@ use crate::{ util::{ batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt, - TowerRequestConfig, + TowerRequestConfig, timezone_to_offset }, Healthcheck, VectorSink, }, @@ -394,17 +394,8 @@ impl RequestSettings { .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; - let timezone = config.timezone.or(cx.globals.timezone); + let offset = config.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); - let offset = match timezone { - Some(TimeZone::Local) => { - Some(*Utc::now().with_timezone(&chrono::Local).offset()) - }, - Some(TimeZone::Named(tz)) => { - Some(Utc::now().with_timezone(&tz).offset().fix()) - }, - None => None - }; Ok(Self { acl, content_type, diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 1fcae4842cd7f..b5647738556fe 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -54,6 +54,8 @@ pub use uri::UriSerde; use vector_common::json_size::JsonSize; use crate::event::EventFinalizers; +use vector_common::TimeZone; +use chrono::{Utc, Offset, FixedOffset}; #[derive(Debug, Snafu)] enum SinkBuildError { @@ -133,3 +135,10 @@ impl ElementCount for Vec { self.len() } } + +pub fn timezone_to_offset(tz: TimeZone) -> Option { + match tz { + TimeZone::Local => Some(*Utc::now().with_timezone(&chrono::Local).offset()), + TimeZone::Named(tz) => Some(Utc::now().with_timezone(&tz).offset().fix()), + } +} From 672354621912e069b3006d86d0365b36f87f7072 Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 6 Oct 2023 14:53:24 +0800 Subject: [PATCH 19/29] remove empty line --- src/sinks/file/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 60d2773d9a227..bcbbde404de25 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -13,7 +13,6 @@ use futures::{ stream::{BoxStream, StreamExt}, FutureExt, }; - use serde_with::serde_as; use tokio::{ fs::{self, File}, From 0af758a965e1deac3dd5a1c13944db9ebfa1c2cb Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 18 Oct 2023 16:39:21 +0800 Subject: [PATCH 20/29] update timezone docs in vector-config --- lib/vector-config/src/external/datetime.rs | 4 ++-- website/cue/reference/components/sinks/base/aws_s3.cue | 4 ++-- website/cue/reference/components/sinks/base/file.cue | 4 ++-- .../cue/reference/components/sinks/base/gcp_cloud_storage.cue | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/vector-config/src/external/datetime.rs b/lib/vector-config/src/external/datetime.rs index 45575f2846564..6735cfa2e239c 100644 --- a/lib/vector-config/src/external/datetime.rs +++ b/lib/vector-config/src/external/datetime.rs @@ -20,8 +20,8 @@ impl Configurable for TimeZone { fn metadata() -> Metadata { let mut metadata = Metadata::default(); - metadata.set_title("Timezone reference."); - metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + metadata.set_title("Timezone to use for any date specifiers in template strings."); + metadata.set_description(r#"This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones"#); metadata.add_custom_attribute(CustomAttribute::kv( diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index d313a52e2cd58..a59b55c3b8057 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -918,9 +918,9 @@ base: components: sinks: aws_s3: configuration: { } timezone: { description: """ - Timezone reference. + Timezone to use for any date specifiers in template strings. - This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones """ diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index 7dea8b428d304..fb085a7f607c0 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -335,9 +335,9 @@ base: components: sinks: file: configuration: { } timezone: { description: """ - Timezone reference. + Timezone to use for any date specifiers in template strings. - This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones """ diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index 2eb688eb1d9f9..cdf93ff35a3e2 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -694,9 +694,9 @@ base: components: sinks: gcp_cloud_storage: configuration: { } timezone: { description: """ - Timezone reference. + Timezone to use for any date specifiers in template strings. - This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. + This can refer to any valid timezone as defined in the [TZ database][tzdb], or "local" which refers to the system local timezone. It will default to the [globally configured timezone](https://vector.dev/docs/reference/configuration/global-options/#timezone). [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones """ From 6667f6cca8d5675bcd85b9f13c9852f398f2f487 Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 23 Oct 2023 10:09:32 +0800 Subject: [PATCH 21/29] get timezone and convert to offset in one go in FileSink --- src/sinks/file/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index bcbbde404de25..05c6bb07e740f 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -183,16 +183,15 @@ impl SinkConfig for FileSinkConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let timezone = self.timezone.or(cx.globals.timezone); let config = Self { path: self.path.clone(), idle_timeout: self.idle_timeout, encoding: self.encoding.clone(), compression: self.compression, acknowledgements: self.acknowledgements, - timezone, + timezone: self.timezone, }; - let sink = FileSink::new(&config)?; + let sink = FileSink::new(&config, cx)?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), @@ -219,12 +218,12 @@ pub struct FileSink { } impl FileSink { - pub fn new(config: &FileSinkConfig) -> crate::Result { + pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); - let offset = config.timezone.and_then(timezone_to_offset); + let offset = config.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); Ok(Self { path: config.path.clone().with_tz_offset(offset), From a6f95d554543315d03f5f37ab87bf969137c01ba Mon Sep 17 00:00:00 2001 From: kates Date: Wed, 25 Oct 2023 12:36:00 +0800 Subject: [PATCH 22/29] just pass the sinkconfig directly --- src/sinks/file/mod.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 05c6bb07e740f..cbe2cc03bdfc4 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -183,15 +183,7 @@ impl SinkConfig for FileSinkConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let config = Self { - path: self.path.clone(), - idle_timeout: self.idle_timeout, - encoding: self.encoding.clone(), - compression: self.compression, - acknowledgements: self.acknowledgements, - timezone: self.timezone, - }; - let sink = FileSink::new(&config, cx)?; + let sink = FileSink::new(self, cx)?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), From f25f6b71b1948e9c29c31fbceea044af7dbab50f Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 26 Oct 2023 12:32:48 +0800 Subject: [PATCH 23/29] vector_common to vector_lib --- src/sinks/aws_s3/config.rs | 6 +++--- src/sinks/aws_s3/mod.rs | 2 +- src/sinks/file/mod.rs | 3 +-- src/sinks/gcp/cloud_storage.rs | 8 +++++--- src/sinks/util/mod.rs | 6 ++++-- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 08b1702f56b16..5e1aa8bd1fd7d 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -7,7 +7,9 @@ use codecs::{ }; use tower::ServiceBuilder; use vector_config::configurable_component; -use vector_lib::sink::VectorSink; +use vector_lib::{ + sink::VectorSink, TimeZone +}; use super::sink::S3RequestOptions; use crate::{ @@ -32,8 +34,6 @@ use crate::{ tls::TlsConfig, }; -use vector_common::TimeZone; - /// Configuration for the `aws_s3` sink. #[configurable_component(sink( "aws_s3", diff --git a/src/sinks/aws_s3/mod.rs b/src/sinks/aws_s3/mod.rs index fd3a3418b4716..3027de1bb6287 100644 --- a/src/sinks/aws_s3/mod.rs +++ b/src/sinks/aws_s3/mod.rs @@ -3,4 +3,4 @@ mod sink; mod integration_tests; -pub use self::config::S3SinkConfig; +pub use config::S3SinkConfig; diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 4ac73dfb477ed..beebbb216d336 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -22,7 +22,7 @@ use tokio_util::codec::Encoder as _; use vector_config::configurable_component; use vector_lib::{ internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered}, - EstimatedJsonEncodedSizeOf, + EstimatedJsonEncodedSizeOf, TimeZone, }; use crate::{ @@ -38,7 +38,6 @@ use crate::{ mod bytes_path; use bytes_path::BytesPath; -use vector_common::TimeZone; /// Configuration for the `file` sink. #[serde_as] diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 587ae0d90a414..627260ccd9f59 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -11,9 +11,11 @@ use snafu::Snafu; use tower::ServiceBuilder; use uuid::Uuid; use vector_config::configurable_component; -use vector_core::event::{EventFinalizers, Finalizable}; -use vector_lib::request_metadata::RequestMetadata; -use vrl::compiler::TimeZone; +use vector_lib::event::{EventFinalizers, Finalizable}; +use vector_lib::{ + request_metadata::RequestMetadata, + TimeZone, +}; use crate::sinks::util::metadata::RequestMetadataBuilder; use crate::{ diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 6aac18f0b235d..13fe97461715b 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -52,10 +52,12 @@ pub use service::{ pub use sink::{BatchSink, PartitionBatchSink, StreamSink}; use snafu::Snafu; pub use uri::UriSerde; -use vector_lib::json_size::JsonSize; +use vector_lib::{ + json_size::JsonSize, + TimeZone, +}; use crate::event::EventFinalizers; -use vector_common::TimeZone; use chrono::{Utc, Offset, FixedOffset}; #[derive(Debug, Snafu)] From d6d579d508544cace86c1b1dc5e693c61fed8f30 Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 27 Oct 2023 12:19:22 +0800 Subject: [PATCH 24/29] configurable_component is in vector_lib now --- src/sinks/aws_s3/config.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 5e1aa8bd1fd7d..204c911ba2913 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -6,9 +6,11 @@ use codecs::{ TextSerializerConfig, }; use tower::ServiceBuilder; -use vector_config::configurable_component; +//use vector_config::configurable_component; use vector_lib::{ - sink::VectorSink, TimeZone + configurable::configurable_component, + sink::VectorSink, + TimeZone, }; use super::sink::S3RequestOptions; From 0707b0cc5b5bc11b4ae5385010e76bd098936b9c Mon Sep 17 00:00:00 2001 From: kates Date: Mon, 6 Nov 2023 11:47:40 +0800 Subject: [PATCH 25/29] lookup to vector_lib --- src/template.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/template.rs b/src/template.rs index e2c65781075f9..c92f3f4aff8fc 100644 --- a/src/template.rs +++ b/src/template.rs @@ -394,13 +394,8 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Opti #[cfg(test)] mod tests { -<<<<<<< HEAD use chrono::{TimeZone, Utc}; - use lookup::metadata_path; -======= - use chrono::TimeZone; use vector_lib::lookup::{metadata_path, PathPrefix}; ->>>>>>> master use vector_lib::metric_tags; use super::*; From 0b59e29659d6b623aa5ba652dc8c15a736a4e0ae Mon Sep 17 00:00:00 2001 From: kates Date: Thu, 9 Nov 2023 16:55:31 +0800 Subject: [PATCH 26/29] fix aws s3 integration test. pass the context to build_processor in tests --- src/sinks/aws_s3/integration_tests.rs | 20 +++++++++++--------- src/template.rs | 7 ++++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index 27c53c1214429..01c78dae08523 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -61,7 +61,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() { config.key_prefix = "test-prefix".to_string(); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -95,7 +95,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() { config.key_prefix = "test-prefix/".to_string(); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -132,7 +132,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() { config.options.ssekms_key_id = Some("alias/aws/s3".to_string()); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -170,7 +170,7 @@ async fn s3_rotate_files_after_the_buffer_size_is_reached() { }; let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, _events) = random_lines_with_stream(100, 30, None); @@ -229,7 +229,7 @@ async fn s3_gzip() { let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -274,7 +274,7 @@ async fn s3_zstd() { let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -336,7 +336,7 @@ async fn s3_insert_message_into_object_lock() { let config = config(&bucket, 1000000); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, events, receiver) = make_events_batch(100, 10); run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; @@ -368,7 +368,7 @@ async fn acknowledges_failures() { config.bucket = format!("BREAK{}IT", config.bucket); let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (_lines, events, receiver) = make_events_batch(1, 1); run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await; @@ -434,11 +434,12 @@ async fn s3_flush_on_exhaustion() { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } }; let prefix = config.key_prefix.clone(); let service = config.create_service(&cx.globals.proxy).await.unwrap(); - let sink = config.build_processor(service).unwrap(); + let sink = config.build_processor(service, cx).unwrap(); let (lines, _events) = random_lines_with_stream(100, 2, None); // only generate two events (less than batch size) @@ -517,6 +518,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { tls: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + timezone: Default::default(), } } diff --git a/src/template.rs b/src/template.rs index c92f3f4aff8fc..c1c662c04183c 100644 --- a/src/template.rs +++ b/src/template.rs @@ -394,7 +394,8 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Opti #[cfg(test)] mod tests { - use chrono::{TimeZone, Utc}; + use chrono::{TimeZone, Utc, Offset}; + use chrono_tz::Tz; use vector_lib::lookup::{metadata_path, PathPrefix}; use vector_lib::metric_tags; @@ -684,13 +685,13 @@ mod tests { let mut event = Event::Log(LogEvent::from("hello world")); event.as_mut_log().insert( ( - lookup::PathPrefix::Event, + PathPrefix::Event, log_schema().timestamp_key().unwrap(), ), ts, ); - let tz = "Asia/Singapore".parse().unwrap(); + let tz: Tz = "Asia/Singapore".parse().unwrap(); let offset = Some(Utc::now().with_timezone(&tz).offset().fix()); assert_eq!( Ok(Bytes::from("vector-2001-02-03-12.log")), From 25ca1788a9f5e137ef0ac82aa4b108e984fffd71 Mon Sep 17 00:00:00 2001 From: kates Date: Fri, 10 Nov 2023 12:41:27 +0800 Subject: [PATCH 27/29] formatting --- src/sinks/aws_s3/config.rs | 19 +++++++++++++------ src/sinks/aws_s3/sink.rs | 18 +++++++----------- src/sinks/file/mod.rs | 7 +++++-- src/sinks/gcp/cloud_storage.rs | 30 ++++++++++++++++++------------ src/sinks/util/mod.rs | 7 ++----- src/template.rs | 33 ++++++++++++++++----------------- 6 files changed, 61 insertions(+), 53 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 02d189d7abd3b..cb6cc4bdf54ec 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -24,8 +24,8 @@ use crate::{ sink::S3Sink, }, util::{ - BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt, - TowerRequestConfig, timezone_to_offset, + timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, + ServiceBuilderExt, TowerRequestConfig, }, Healthcheck, }, @@ -194,7 +194,11 @@ impl SinkConfig for S3SinkConfig { } impl S3SinkConfig { - pub fn build_processor(&self, service: S3Service, cx: SinkContext) -> crate::Result { + pub fn build_processor( + &self, + service: S3Service, + cx: SinkContext, + ) -> crate::Result { // Build our S3 client/service, which is what we'll ultimately feed // requests into in order to ship files to S3. We build this here in // order to configure the client/service with retries, concurrency @@ -204,13 +208,17 @@ impl S3SinkConfig { .settings(request_limits, S3RetryLogic) .service(service); - let offset = self.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); + let offset = self + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); // Configure our partitioning/batching. let batch_settings = self.batch.into_batcher_settings()?; let key_prefix = Template::try_from(self.key_prefix.clone())? - .with_tz_offset(offset).try_into()?; + .with_tz_offset(offset) + .try_into()?; let ssekms_key_id = self .options @@ -226,7 +234,6 @@ impl S3SinkConfig { let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; let encoder = Encoder::::new(framer, serializer); - let request_options = S3RequestOptions { bucket: self.bucket.clone(), api_options: self.options.clone(), diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 31a6548ccf464..a4fc542437106 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -1,7 +1,7 @@ use std::io; use bytes::Bytes; -use chrono::{Utc, FixedOffset}; +use chrono::{FixedOffset, Utc}; use uuid::Uuid; use vector_lib::codecs::encoding::Framer; use vector_lib::event::Finalizable; @@ -78,16 +78,12 @@ impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { ) -> Self::Request { let filename = { let formatted_ts = match self.filename_tz_offset { - Some(offset) => { - Utc::now() - .with_timezone(&offset) - .format(self.filename_time_format.as_str()) - }, - None => { - Utc::now() - .with_timezone(&chrono::Utc) - .format(self.filename_time_format.as_str()) - } + Some(offset) => Utc::now() + .with_timezone(&offset) + .format(self.filename_time_format.as_str()), + None => Utc::now() + .with_timezone(&chrono::Utc) + .format(self.filename_time_format.as_str()), }; self.filename_append_uuid diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 61c813caced70..8898d861784a8 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -33,7 +33,7 @@ use crate::{ internal_events::{ FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError, }, - sinks::util::{StreamSink, timezone_to_offset}, + sinks::util::{timezone_to_offset, StreamSink}, template::Template, }; @@ -222,7 +222,10 @@ impl FileSink { let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::::new(framer, serializer); - let offset = config.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); + let offset = config + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); Ok(Self { path: config.path.clone().with_tz_offset(offset), diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index b186c03a26c0f..bb06e6a18ceba 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, convert::TryFrom, io}; use bytes::Bytes; -use chrono::{Utc, FixedOffset}; +use chrono::{FixedOffset, Utc}; use http::header::{HeaderName, HeaderValue}; use http::Uri; use indoc::indoc; @@ -12,10 +12,7 @@ use uuid::Uuid; use vector_lib::codecs::encoding::Framer; use vector_lib::configurable::configurable_component; use vector_lib::event::{EventFinalizers, Finalizable}; -use vector_lib::{ - request_metadata::RequestMetadata, - TimeZone, -}; +use vector_lib::{request_metadata::RequestMetadata, TimeZone}; use crate::sinks::util::metadata::RequestMetadataBuilder; use crate::{ @@ -35,8 +32,8 @@ use crate::{ }, util::{ batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult, - BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt, - TowerRequestConfig, timezone_to_offset + timezone_to_offset, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, + ServiceBuilderExt, TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -171,7 +168,6 @@ pub struct GcsSinkConfig { #[configurable(derived)] #[serde(default)] pub timezone: Option, - } fn default_time_format() -> String { @@ -294,7 +290,7 @@ struct RequestSettings { append_uuid: bool, encoder: (Transformer, Encoder), compression: Compression, - tz_offset: Option + tz_offset: Option, } impl RequestBuilder<(String, Vec)> for RequestSettings { @@ -335,7 +331,9 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { let filename = { let seconds = match self.tz_offset { Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format), - None => Utc::now().with_timezone(&chrono::Utc).format(&self.time_format) + None => Utc::now() + .with_timezone(&chrono::Utc) + .format(&self.time_format), }; if self.append_uuid { @@ -396,7 +394,10 @@ impl RequestSettings { .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; - let offset = config.timezone.or(cx.globals.timezone).and_then(timezone_to_offset); + let offset = config + .timezone + .or(cx.globals.timezone) + .and_then(timezone_to_offset); Ok(Self { acl, @@ -459,7 +460,12 @@ mod tests { let config = default_config((None::, JsonSerializerConfig::default()).into()); let sink = config - .build_sink(client, mock_endpoint.to_string(), GcpAuthenticator::None, context) + .build_sink( + client, + mock_endpoint.to_string(), + GcpAuthenticator::None, + context, + ) .expect("failed to build sink"); let event = Event::Log(LogEvent::from("simple message")); diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 13fe97461715b..23474327cdfc5 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -52,13 +52,10 @@ pub use service::{ pub use sink::{BatchSink, PartitionBatchSink, StreamSink}; use snafu::Snafu; pub use uri::UriSerde; -use vector_lib::{ - json_size::JsonSize, - TimeZone, -}; +use vector_lib::{json_size::JsonSize, TimeZone}; use crate::event::EventFinalizers; -use chrono::{Utc, Offset, FixedOffset}; +use chrono::{FixedOffset, Offset, Utc}; #[derive(Debug, Snafu)] enum SinkBuildError { diff --git a/src/template.rs b/src/template.rs index c1c662c04183c..136a2bd3d2547 100644 --- a/src/template.rs +++ b/src/template.rs @@ -4,7 +4,7 @@ use std::{borrow::Cow, convert::TryFrom, fmt, hash::Hash, path::PathBuf}; use bytes::Bytes; use chrono::{ format::{strftime::StrftimeItems, Item}, - Utc, FixedOffset, + FixedOffset, Utc, }; use once_cell::sync::Lazy; use regex::Regex; @@ -355,7 +355,11 @@ fn render_metric_field<'a>(key: &str, metric: &'a Metric) -> Option<&'a str> { } } -fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Option) -> String { +fn render_timestamp( + items: &ParsedStrftime, + event: EventRef<'_>, + tz_offset: Option, +) -> String { let timestamp = match event { EventRef::Log(log) => log_schema() .timestamp_key_target_path() @@ -379,22 +383,20 @@ fn render_timestamp(items: &ParsedStrftime, event: EventRef<'_>, tz_offset: Opti .unwrap_or_else(Utc::now); match tz_offset { - Some(offset) => { - timestamp.with_timezone(&offset) - .format_with_items(items.as_items()) - .to_string() - }, - None => { - timestamp.with_timezone(&chrono::Utc) - .format_with_items(items.as_items()) - .to_string() - } + Some(offset) => timestamp + .with_timezone(&offset) + .format_with_items(items.as_items()) + .to_string(), + None => timestamp + .with_timezone(&chrono::Utc) + .format_with_items(items.as_items()) + .to_string(), } } #[cfg(test)] mod tests { - use chrono::{TimeZone, Utc, Offset}; + use chrono::{Offset, TimeZone, Utc}; use chrono_tz::Tz; use vector_lib::lookup::{metadata_path, PathPrefix}; use vector_lib::metric_tags; @@ -684,10 +686,7 @@ mod tests { let template = Template::try_from("vector-%Y-%m-%d-%H.log").unwrap(); let mut event = Event::Log(LogEvent::from("hello world")); event.as_mut_log().insert( - ( - PathPrefix::Event, - log_schema().timestamp_key().unwrap(), - ), + (PathPrefix::Event, log_schema().timestamp_key().unwrap()), ts, ); From 0baabb8787421ccb73dcda7b38cdaf37aec2b60c Mon Sep 17 00:00:00 2001 From: kates Date: Sun, 12 Nov 2023 19:41:37 +0800 Subject: [PATCH 28/29] add sinkcontext to FileSink in file tests --- src/sinks/file/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 8898d861784a8..5392ec57647ec 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -642,7 +642,7 @@ mod tests { let sink_handle = tokio::spawn(async move { assert_sink_compliance(&FILE_SINK_TAGS, async move { - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, SinkContext::default()).unwrap(); VectorSink::from_event_streamsink(sink) .run(Box::pin(rx.map(Into::into))) .await @@ -686,7 +686,7 @@ mod tests { async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator + Send) { assert_sink_compliance(&FILE_SINK_TAGS, async move { - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, SinkContext::default()).unwrap(); VectorSink::from_event_streamsink(sink) .run(Box::pin(stream::iter(events.map(Into::into)))) .await From a36cf741c8705afa7bd7b6351061202f6b5fa979 Mon Sep 17 00:00:00 2001 From: kates Date: Tue, 14 Nov 2023 15:37:01 +0800 Subject: [PATCH 29/29] key_prefix is expected to be a template. no need for into --- src/sinks/aws_s3/config.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index cb6cc4bdf54ec..4aad4417734a8 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -1,5 +1,3 @@ -use std::convert::TryInto; - use aws_sdk_s3::Client as S3Client; use tower::ServiceBuilder; use vector_lib::codecs::{ @@ -216,9 +214,7 @@ impl S3SinkConfig { // Configure our partitioning/batching. let batch_settings = self.batch.into_batcher_settings()?; - let key_prefix = Template::try_from(self.key_prefix.clone())? - .with_tz_offset(offset) - .try_into()?; + let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset); let ssekms_key_id = self .options