Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,12 @@ libc = "0.2.147"
similar-asserts = "1.4.2"
proptest = "1.2"
quickcheck = "1.0.3"
lookup = { package = "vector-lookup", path = "lib/vector-lookup", features = ["test"] }
reqwest = { version = "0.11", features = ["json"] }
tempfile = "3.6.0"
test-generator = "0.3.1"
tokio-test = "0.4.2"
tokio = { version = "1.30.0", features = ["test-util"] }
tokio-test = "0.4.2"
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
wiremock = "0.5.19"
Expand Down
3 changes: 3 additions & 0 deletions lib/vector-lookup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ serde = { version = "1.0.183", default-features = false, features = ["derive", "
vector-config = { path = "../vector-config" }
vector-config-macros = { path = "../vector-config-macros" }
vrl.workspace = true

[features]
test = []
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ impl<'a> TargetPath<'a> for &'a ConfigTargetPath {
&self.0.path
}
}

#[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigTargetPath {
fn from(path: &str) -> Self {
ConfigTargetPath::try_from(path.to_string()).unwrap()
}
}
44 changes: 25 additions & 19 deletions src/transforms/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{future::ready, num::NonZeroUsize, pin::Pin};

use bytes::Bytes;
use futures::{Stream, StreamExt};
use lookup::lookup_v2::ConfigTargetPath;
use lru::LruCache;
use vector_config::configurable_component;
use vector_core::config::{clone_input_definitions, LogNamespace};
Expand Down Expand Up @@ -43,7 +44,7 @@ pub enum FieldMatchConfig {
docs::examples = "field1",
docs::examples = "parent.child_field"
))]
Vec<String>,
Vec<ConfigTargetPath>,
),

/// Matches events using all fields except for the ignored ones.
Expand All @@ -55,7 +56,7 @@ pub enum FieldMatchConfig {
docs::examples = "host",
docs::examples = "hostname"
))]
Vec<String>,
Vec<ConfigTargetPath>,
),
}

Expand Down Expand Up @@ -102,16 +103,16 @@ fn default_cache_config() -> CacheConfig {
// These aren't great defaults in that case, but hard-coding isn't much better since the
// structure can vary significantly. This should probably either become a required field
// in the future, or maybe the "semantic meaning" can be utilized here.
fn default_match_fields() -> Vec<String> {
fn default_match_fields() -> Vec<ConfigTargetPath> {
let mut fields = Vec::new();
if let Some(message_key) = log_schema().message_key() {
fields.push(message_key.to_string());
if let Some(message_key) = log_schema().message_key_target_path() {
fields.push(ConfigTargetPath(message_key.clone()));
}
if let Some(host_key) = log_schema().host_key() {
fields.push(host_key.to_string());
if let Some(host_key) = log_schema().host_key_target_path() {
fields.push(ConfigTargetPath(host_key.clone()));
}
if let Some(timestamp_key) = log_schema().timestamp_key() {
fields.push(timestamp_key.to_string());
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
fields.push(ConfigTargetPath(timestamp_key.clone()));
}
fields
}
Expand Down Expand Up @@ -197,7 +198,7 @@ type TypeId = u8;
#[derive(PartialEq, Eq, Hash)]
enum CacheEntry {
Match(Vec<Option<(TypeId, Bytes)>>),
Ignore(Vec<(String, TypeId, Bytes)>),
Ignore(Vec<(ConfigTargetPath, TypeId, Bytes)>),
Comment thread
pront marked this conversation as resolved.
Outdated
}

/// Assigns a unique number to each of the types supported by Event::Value.
Expand Down Expand Up @@ -244,7 +245,7 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
FieldMatchConfig::MatchFields(fields) => {
let mut entry = Vec::new();
for field_name in fields.iter() {
if let Some(value) = event.as_log().get(field_name.as_str()) {
if let Some(value) = event.as_log().get(field_name) {
entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
} else {
entry.push(None);
Expand All @@ -257,12 +258,10 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {

if let Some(all_fields) = event.as_log().all_fields() {
Comment thread
pront marked this conversation as resolved.
Outdated
for (field_name, value) in all_fields {
if !fields.contains(&field_name) {
entry.push((
field_name,
type_id_for_value(value),
value.coerce_to_bytes(),
));
if let Ok(path) = ConfigTargetPath::try_from(field_name) {
Comment thread
pront marked this conversation as resolved.
Outdated
if !fields.contains(&path) {
entry.push((path, type_id_for_value(value), value.coerce_to_bytes()));
}
}
}
}
Expand All @@ -289,6 +288,7 @@ impl TaskTransform<Event> for Dedupe {
mod tests {
use std::{collections::BTreeMap, sync::Arc};

use lookup::lookup_v2::ConfigTargetPath;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use vector_common::config::ComponentKey;
Expand All @@ -309,7 +309,10 @@ mod tests {
crate::test_util::test_generate_config::<DedupeConfig>();
}

fn make_match_transform_config(num_events: usize, fields: Vec<String>) -> DedupeConfig {
fn make_match_transform_config(
num_events: usize,
fields: Vec<ConfigTargetPath>,
) -> DedupeConfig {
DedupeConfig {
cache: CacheConfig {
num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
Expand All @@ -318,7 +321,10 @@ mod tests {
}
}

fn make_ignore_transform_config(num_events: usize, given_fields: Vec<String>) -> DedupeConfig {
fn make_ignore_transform_config(
num_events: usize,
given_fields: Vec<ConfigTargetPath>,
) -> DedupeConfig {
// "message" and "timestamp" are added automatically to all Events
let mut fields = vec!["message".into(), "timestamp".into()];
fields.extend(given_fields);
Expand Down