From 53388eabd5b8d092117782d8b2cfcc41fbb41485 Mon Sep 17 00:00:00 2001 From: Samuel Roberts Date: Thu, 19 Oct 2023 16:44:58 +1100 Subject: [PATCH 1/3] Add emit_cursor option to journald source --- src/sources/journald.rs | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 2d7c7fd5b49d7..6d4e87312f262 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -209,6 +209,10 @@ pub struct JournaldConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + /// Whether to emit the __CURSOR field + #[serde(default = "crate::serde::default_false")] + emit_cursor: bool, } const fn default_batch_size() -> usize { @@ -308,6 +312,7 @@ impl Default for JournaldConfig { acknowledgements: Default::default(), remap_priority: false, log_namespace: None, + emit_cursor: false, } } } @@ -377,6 +382,7 @@ impl SourceConfig for JournaldConfig { acknowledgements, starter, log_namespace, + emit_cursor: self.emit_cursor, } .run_shutdown(cx.shutdown), )) @@ -404,6 +410,7 @@ struct JournaldSource { acknowledgements: bool, starter: StartJournalctl, log_namespace: LogNamespace, + emit_cursor: bool, } impl JournaldSource { @@ -554,7 +561,11 @@ impl<'a> Batch<'a> { Some(Ok(bytes)) => { match decode_record(&bytes, self.source.remap_priority) { Ok(mut record) => { - if let Some(tmp) = record.remove(CURSOR) { + if self.source.emit_cursor { + if let Some(tmp) = record.get(CURSOR) { + self.cursor = Some(tmp.clone()); + } + } else if let Some(tmp) = record.remove(CURSOR) { self.cursor = Some(tmp); } @@ -1089,13 +1100,14 @@ mod tests { async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec { let include_matches = create_unit_matches(iunits.to_vec()); let exclude_matches = create_unit_matches(xunits.to_vec()); - run_journal(include_matches, exclude_matches, cursor).await + run_journal(include_matches, exclude_matches, cursor, false).await } async fn run_journal( include_matches: Matches, exclude_matches: Matches, checkpoint: Option<&str>, + emit_cursor: bool, ) -> Vec { assert_source_compliance(&["protocol"], async move { let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); @@ -1128,6 +1140,7 @@ mod tests { data_dir: Some(tempdir), remap_priority: true, acknowledgements: false.into(), + emit_cursor, ..Default::default() }; let source = config.build(cx).await.unwrap(); @@ -1207,10 +1220,18 @@ mod tests { ); } + #[tokio::test] + async fn emits_cursor() { + let received = run_journal(Matches::new(), Matches::new(), None, true).await; + assert_eq!(cursor(&received[0]), Value::Bytes("1".into())); + assert_eq!(cursor(&received[3]), Value::Bytes("4".into())); + assert_eq!(cursor(&received[7]), Value::Bytes("8".into())); + } + #[tokio::test] async fn includes_matches() { let matches = create_matches(vec![("PRIORITY", "ERR")]); - let received = run_journal(matches, HashMap::new(), None).await; + let received = run_journal(matches, HashMap::new(), None, false).await; assert_eq!(received.len(), 2); assert_eq!( message(&received[0]), @@ -1227,7 +1248,7 @@ mod tests { #[tokio::test] async fn includes_kernel() { let matches = create_matches(vec![("_TRANSPORT", "kernel")]); - let received = run_journal(matches, HashMap::new(), None).await; + let received = run_journal(matches, HashMap::new(), None, false).await; assert_eq!(received.len(), 1); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000)); assert_eq!(message(&received[0]), Value::Bytes("audit log".into())); @@ -1236,7 +1257,7 @@ mod tests { #[tokio::test] async fn excludes_matches() { let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]); - let received = run_journal(HashMap::new(), matches, None).await; + let received = run_journal(HashMap::new(), matches, None, false).await; assert_eq!(received.len(), 5); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000)); assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000)); @@ -1515,6 +1536,10 @@ mod tests { event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone() } + fn cursor(event: &Event) -> Value { + event.as_log()[CURSOR].clone() + } + fn value_ts(secs: i64, usecs: u32) -> Value { Value::Timestamp( chrono::Utc From 3662a26edb062ac8e332b290e6a74437f4c1080d Mon Sep 17 00:00:00 2001 From: Samuel Roberts Date: Sat, 21 Oct 2023 02:14:30 +1100 Subject: [PATCH 2/3] Update documentation --- src/sources/journald.rs | 5 ++++- .../cue/reference/components/sources/base/journald.cue | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 6d4e87312f262..9e8bff27cd920 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -210,7 +210,10 @@ pub struct JournaldConfig { #[serde(default)] log_namespace: Option, - /// Whether to emit the __CURSOR field + /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_curor]. + /// + /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields + /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html #[serde(default = "crate::serde::default_false")] emit_cursor: bool, } diff --git a/website/cue/reference/components/sources/base/journald.cue b/website/cue/reference/components/sources/base/journald.cue index 321ab7442c522..80b875fc7d0cb 100644 --- a/website/cue/reference/components/sources/base/journald.cue +++ b/website/cue/reference/components/sources/base/journald.cue @@ -49,6 +49,16 @@ base: components: sources: journald: configuration: { required: false type: string: examples: ["/var/lib/vector"] } + emit_cursor: { + description: """ + Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_curor]. + + [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields + [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html + """ + required: false + type: bool: default: false + } exclude_matches: { description: """ A list of sets of field/value pairs that, if any are present in a journal entry, From e2966db5c296c147e48e9d3cc4f71ca34b44698a Mon Sep 17 00:00:00 2001 From: Samuel Roberts Date: Sat, 21 Oct 2023 02:20:07 +1100 Subject: [PATCH 3/3] Fix typo --- src/sources/journald.rs | 2 +- website/cue/reference/components/sources/base/journald.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 9e8bff27cd920..edd6a2f1c2631 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -210,7 +210,7 @@ pub struct JournaldConfig { #[serde(default)] log_namespace: Option, - /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_curor]. + /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor]. /// /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html diff --git a/website/cue/reference/components/sources/base/journald.cue b/website/cue/reference/components/sources/base/journald.cue index 80b875fc7d0cb..0224f4d213041 100644 --- a/website/cue/reference/components/sources/base/journald.cue +++ b/website/cue/reference/components/sources/base/journald.cue @@ -51,7 +51,7 @@ base: components: sources: journald: configuration: { } emit_cursor: { description: """ - Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_curor]. + Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor]. [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html