Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ pub struct JournaldConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// Whether to emit the __CURSOR field
Comment thread
sproberts92 marked this conversation as resolved.
Outdated
#[serde(default = "crate::serde::default_false")]
emit_cursor: bool,
}

const fn default_batch_size() -> usize {
Expand Down Expand Up @@ -308,6 +312,7 @@ impl Default for JournaldConfig {
acknowledgements: Default::default(),
remap_priority: false,
log_namespace: None,
emit_cursor: false,
}
}
}
Expand Down Expand Up @@ -377,6 +382,7 @@ impl SourceConfig for JournaldConfig {
acknowledgements,
starter,
log_namespace,
emit_cursor: self.emit_cursor,
}
.run_shutdown(cx.shutdown),
))
Expand Down Expand Up @@ -404,6 +410,7 @@ struct JournaldSource {
acknowledgements: bool,
starter: StartJournalctl,
log_namespace: LogNamespace,
emit_cursor: bool,
}

impl JournaldSource {
Expand Down Expand Up @@ -554,7 +561,11 @@ impl<'a> Batch<'a> {
Some(Ok(bytes)) => {
match decode_record(&bytes, self.source.remap_priority) {
Ok(mut record) => {
if let Some(tmp) = record.remove(CURSOR) {
if self.source.emit_cursor {
if let Some(tmp) = record.get(CURSOR) {
self.cursor = Some(tmp.clone());
}
} else if let Some(tmp) = record.remove(CURSOR) {
self.cursor = Some(tmp);
}

Expand Down Expand Up @@ -1089,13 +1100,14 @@ mod tests {
async fn run_with_units(iunits: &[&str], xunits: &[&str], cursor: Option<&str>) -> Vec<Event> {
let include_matches = create_unit_matches(iunits.to_vec());
let exclude_matches = create_unit_matches(xunits.to_vec());
run_journal(include_matches, exclude_matches, cursor).await
run_journal(include_matches, exclude_matches, cursor, false).await
}

async fn run_journal(
include_matches: Matches,
exclude_matches: Matches,
checkpoint: Option<&str>,
emit_cursor: bool,
) -> Vec<Event> {
assert_source_compliance(&["protocol"], async move {
let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
Expand Down Expand Up @@ -1128,6 +1140,7 @@ mod tests {
data_dir: Some(tempdir),
remap_priority: true,
acknowledgements: false.into(),
emit_cursor,
..Default::default()
};
let source = config.build(cx).await.unwrap();
Expand Down Expand Up @@ -1207,10 +1220,18 @@ mod tests {
);
}

#[tokio::test]
async fn emits_cursor() {
let received = run_journal(Matches::new(), Matches::new(), None, true).await;
assert_eq!(cursor(&received[0]), Value::Bytes("1".into()));
assert_eq!(cursor(&received[3]), Value::Bytes("4".into()));
assert_eq!(cursor(&received[7]), Value::Bytes("8".into()));
}

#[tokio::test]
async fn includes_matches() {
let matches = create_matches(vec![("PRIORITY", "ERR")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 2);
assert_eq!(
message(&received[0]),
Expand All @@ -1227,7 +1248,7 @@ mod tests {
#[tokio::test]
async fn includes_kernel() {
let matches = create_matches(vec![("_TRANSPORT", "kernel")]);
let received = run_journal(matches, HashMap::new(), None).await;
let received = run_journal(matches, HashMap::new(), None, false).await;
assert_eq!(received.len(), 1);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140006000));
assert_eq!(message(&received[0]), Value::Bytes("audit log".into()));
Expand All @@ -1236,7 +1257,7 @@ mod tests {
#[tokio::test]
async fn excludes_matches() {
let matches = create_matches(vec![("PRIORITY", "INFO"), ("PRIORITY", "DEBUG")]);
let received = run_journal(HashMap::new(), matches, None).await;
let received = run_journal(HashMap::new(), matches, None, false).await;
assert_eq!(received.len(), 5);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140003000));
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140004000));
Expand Down Expand Up @@ -1515,6 +1536,10 @@ mod tests {
event.as_log()[log_schema().timestamp_key().unwrap().to_string()].clone()
}

fn cursor(event: &Event) -> Value {
event.as_log()[CURSOR].clone()
}

fn value_ts(secs: i64, usecs: u32) -> Value {
Value::Timestamp(
chrono::Utc
Expand Down