diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 2d7c7fd5b49d7..edd6a2f1c2631 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -209,6 +209,13 @@ pub struct JournaldConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + /// Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor]. + /// + /// [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields + /// [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html + #[serde(default = "crate::serde::default_false")] + emit_cursor: bool, } const fn default_batch_size() -> usize { @@ -308,6 +315,7 @@ impl Default for JournaldConfig { acknowledgements: Default::default(), remap_priority: false, log_namespace: None, + emit_cursor: false, } } } @@ -377,6 +385,7 @@ impl SourceConfig for JournaldConfig { acknowledgements, starter, log_namespace, + emit_cursor: self.emit_cursor, } .run_shutdown(cx.shutdown), )) @@ -404,6 +413,7 @@ struct JournaldSource { acknowledgements: bool, starter: StartJournalctl, log_namespace: LogNamespace, + emit_cursor: bool, } impl JournaldSource { @@ -554,7 +564,11 @@ impl<'a> Batch<'a> { Some(Ok(bytes)) => { match decode_record(&bytes, self.source.remap_priority) { Ok(mut record) => { - if let Some(tmp) = record.remove(CURSOR) { + if self.source.emit_cursor { + if let Some(tmp) = record.get(CURSOR) { + self.cursor = Some(tmp.clone()); + } + } else if let Some(tmp) = record.remove(CURSOR) { self.cursor = Some(tmp); } @@ -1089,13 +1103,14 @@ mod tests { async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec { let include_matches = create_unit_matches(iunits.to_vec()); let exclude_matches = create_unit_matches(xunits.to_vec()); - run_journal(include_matches, exclude_matches, cursor).await + run_journal(include_matches, exclude_matches, cursor, false).await } async fn run_journal( include_matches: Matches, exclude_matches: Matches, checkpoint: Option<&str>, + emit_cursor: bool, ) -> Vec { assert_source_compliance(&["protocol"], async move { let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered); @@ -1128,6 +1143,7 @@ mod tests { data_dir: Some(tempdir), remap_priority: true, acknowledgements: false.into(), + emit_cursor, ..Default::default() }; let source = config.build(cx).await.unwrap(); @@ -1207,10 +1223,18 @@ mod tests { ); } + #[tokio::test] + async fn emits_cursor() { + let received = run_journal(Matches::new(), Matches::new(), None, true).await; + assert_eq!(cursor(&received[0]), Value::Bytes("1".into())); + assert_eq!(cursor(&received[3]), Value::Bytes("4".into())); + assert_eq!(cursor(&received[7]), Value::Bytes("8".into())); + } + #[tokio::test] async fn includes_matches() { let matches = create_matches(vec![("PRIORITY", "ERR")]); - let received = run_journal(matches, HashMap::new(), None).await; + let received = run_journal(matches, HashMap::new(), None, false).await; assert_eq!(received.len(), 2); assert_eq!( message(&received[0]), @@ -1227,7 +1251,7 @@ mod tests { #[tokio::test] async fn includes_kernel() { let matches = create_matches(vec![("_TRANSPORT", "kernel")]); - let received = run_journal(matches, HashMap::new(), None).await; + let received = run_journal(matches, HashMap::new(), None, false).await; assert_eq!(received.len(), 1); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000)); assert_eq!(message(&received[0]), Value::Bytes("audit log".into())); @@ -1236,7 +1260,7 @@ mod tests { #[tokio::test] async fn excludes_matches() { let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]); - let received = run_journal(HashMap::new(), matches, None).await; + let received = run_journal(HashMap::new(), matches, None, false).await; assert_eq!(received.len(), 5); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000)); assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000)); @@ -1515,6 +1539,10 @@ mod tests { event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone() } + fn cursor(event: &Event) -> Value { + event.as_log()[CURSOR].clone() + } + fn value_ts(secs: i64, usecs: u32) -> Value { Value::Timestamp( chrono::Utc diff --git a/website/cue/reference/components/sources/base/journald.cue b/website/cue/reference/components/sources/base/journald.cue index 321ab7442c522..0224f4d213041 100644 --- a/website/cue/reference/components/sources/base/journald.cue +++ b/website/cue/reference/components/sources/base/journald.cue @@ -49,6 +49,16 @@ base: components: sources: journald: configuration: { required: false type: string: examples: ["/var/lib/vector"] } + emit_cursor: { + description: """ + Whether to emit the [__CURSOR field][cursor]. See also [sd_journal_get_cursor][get_cursor]. + + [cursor]: https://www.freedesktop.org/software/systemd/man/latest/systemd.journal-fields.html#Address%20Fields + [get_cursor]: https://www.freedesktop.org/software/systemd/man/latest/sd_journal_get_cursor.html + """ + required: false + type: bool: default: false + } exclude_matches: { description: """ A list of sets of field/value pairs that, if any are present in a journal entry,