Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/vector-common/src/internal_event/cached_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::BTreeMap,
collections::HashMap,
hash::Hash,
sync::{Arc, RwLock},
};

Expand All @@ -22,7 +23,7 @@ pub struct RegisteredEventCache<T, Event: RegisterTaggedInternalEvent> {
fixed_tags: T,
cache: Arc<
RwLock<
BTreeMap<
HashMap<
Comment on lines -25 to +26
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a motivation to prefer one over the other? I don't disagree with this change, just wondering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. I was experimenting with removing the Ord and then just didn't change this back when I realised I needed the Ord anyway. I can't think of any clear reason why one would be better than the other.

<Event as RegisterTaggedInternalEvent>::Tags,
<Event as RegisterInternalEvent>::Handle,
>,
Expand All @@ -48,7 +49,7 @@ impl<Event, EventHandle, Data, Tags, FixedTags> RegisteredEventCache<FixedTags,
where
Data: Sized,
EventHandle: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
Tags: Clone + Eq + Hash,
FixedTags: Clone,
Event: RegisterInternalEvent<Handle = EventHandle>
+ RegisterTaggedInternalEvent<Tags = Tags, Fixed = FixedTags>,
Expand Down
22 changes: 14 additions & 8 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use metrics::{register_counter, Counter};
use tracing::trace;

use crate::{config::ComponentKey, request_metadata::EventCountTags};
use crate::config::ComponentKey;

use super::{CountByteSize, OptionalTag, Output, SharedString};

Expand Down Expand Up @@ -91,19 +91,25 @@ crate::registered_event!(
self.event_bytes.increment(byte_size.get() as u64);
}

fn register(_fixed: (), tags: EventCountTags) {
super::register(TaggedEventsSent::new(
tags,
))
fn register(_fixed: (), tags: TaggedEventsSent) {
super::register(tags)
}
);

impl TaggedEventsSent {
#[must_use]
pub fn new(tags: EventCountTags) -> Self {
pub fn new_empty() -> Self {
Self {
source: tags.source,
service: tags.service,
source: OptionalTag::Specified(None),
service: OptionalTag::Specified(None),
}
}

#[must_use]
pub fn new_unspecified() -> Self {
Self {
source: OptionalTag::Ignored,
service: OptionalTag::Ignored,
}
}
}
4 changes: 2 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,15 @@ impl From<Protocol> for SharedString {
macro_rules! registered_event {
// A registered event struct with no fields (zero-sized type).
($event:ident => $($tail:tt)*) => {
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct $event;

$crate::registered_event!(=> $event $($tail)*);
};

// A normal registered event struct.
($event:ident { $( $field:ident: $type:ty, )* } => $($tail:tt)*) => {
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct $event {
$( pub $field: $type, )*
}
Expand Down
62 changes: 20 additions & 42 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,18 @@
use std::collections::HashMap;
use std::ops::Add;
use std::{collections::HashMap, sync::Arc};

use crate::{
config::ComponentKey,
internal_event::{
CountByteSize, InternalEventHandle, OptionalTag, RegisterTaggedInternalEvent,
RegisteredEventCache,
CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache,
TaggedEventsSent,
},
json_size::JsonSize,
};

/// Tags that are used to group the events within a batch for emitting telemetry.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventCountTags {
pub source: OptionalTag<Arc<ComponentKey>>,
pub service: OptionalTag<String>,
}
Comment on lines -14 to -18
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider making this a type alias instead? i.e. pub type EventCountTags = TaggedEventsSent;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did and did have that type alias until just before pushing the PR. I'm not sure I like type aliases that just rename the type, since it's just an extra level of indirection to work out what is going on.

Happy to put it back in if you feel strongly about it.


impl EventCountTags {
#[must_use]
pub fn new_empty() -> Self {
Self {
source: OptionalTag::Specified(None),
service: OptionalTag::Specified(None),
}
}

#[must_use]
pub fn new_unspecified() -> Self {
Self {
source: OptionalTag::Ignored,
service: OptionalTag::Ignored,
}
}
}

/// Must be implemented by events to get the tags that will be attached to
/// the `component_sent_event_*` emitted metrics.
pub trait GetEventCountTags {
fn get_tags(&self) -> EventCountTags;
fn get_tags(&self) -> TaggedEventsSent;
}

/// Keeps track of the estimated json size of a given batch of events by
Expand All @@ -48,7 +22,7 @@ pub enum GroupedCountByteSize {
/// When we need to keep track of the events by certain tags we use this
/// variant.
Tagged {
sizes: HashMap<EventCountTags, CountByteSize>,
sizes: HashMap<TaggedEventsSent, CountByteSize>,
},
/// If we don't need to track the events by certain tags we can use
/// this variant to avoid allocating a `HashMap`,
Expand Down Expand Up @@ -86,7 +60,7 @@ impl GroupedCountByteSize {
/// Returns `None` if we are not tracking by tags.
#[must_use]
#[cfg(test)]
pub fn sizes(&self) -> Option<&HashMap<EventCountTags, CountByteSize>> {
pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
match self {
Self::Tagged { sizes } => Some(sizes),
Self::Untagged { .. } => None,
Expand Down Expand Up @@ -131,7 +105,7 @@ impl GroupedCountByteSize {
/// Emits our counts to a `RegisteredEvent` cached event.
pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
where
T: RegisterTaggedInternalEvent<Tags = EventCountTags, Fixed = (), Handle = H>,
T: RegisterTaggedInternalEvent<Tags = TaggedEventsSent, Fixed = (), Handle = H>,
H: InternalEventHandle<Data = CountByteSize>,
{
match self {
Expand All @@ -141,7 +115,7 @@ impl GroupedCountByteSize {
}
}
GroupedCountByteSize::Untagged { size } => {
event_cache.emit(&EventCountTags::new_unspecified(), *size);
event_cache.emit(&TaggedEventsSent::new_unspecified(), *size);
}
}
}
Expand Down Expand Up @@ -177,21 +151,21 @@ impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {

// The following two scenarios shouldn't really occur in practice, but are provided for completeness.
(Self::Tagged { mut sizes }, Self::Untagged { size }) => {
match sizes.get_mut(&EventCountTags::new_empty()) {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += *size,
None => {
sizes.insert(EventCountTags::new_empty(), *size);
sizes.insert(TaggedEventsSent::new_empty(), *size);
}
}

Self::Tagged { sizes }
}
(Self::Untagged { size }, Self::Tagged { sizes }) => {
let mut sizes = sizes.clone();
match sizes.get_mut(&EventCountTags::new_empty()) {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += size,
None => {
sizes.insert(EventCountTags::new_empty(), size);
sizes.insert(TaggedEventsSent::new_empty(), size);
}
}

Expand Down Expand Up @@ -307,6 +281,10 @@ pub trait MetaDescriptive {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::{config::ComponentKey, internal_event::OptionalTag};

use super::*;

struct DummyEvent {
Expand All @@ -315,8 +293,8 @@ mod tests {
}

impl GetEventCountTags for DummyEvent {
fn get_tags(&self) -> EventCountTags {
EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
TaggedEventsSent {
source: self.source.clone(),
service: self.service.clone(),
}
Expand Down Expand Up @@ -380,14 +358,14 @@ mod tests {
assert_eq!(
vec![
(
EventCountTags {
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("cabbage".to_string()).into()
},
CountByteSize(2, JsonSize::new(78))
),
(
EventCountTags {
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("tomato".to_string()).into()
},
Expand Down
8 changes: 4 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use lookup::lookup_v2::TargetPath;
use lookup::PathPrefix;
use serde::{Deserialize, Serialize, Serializer};
use vector_common::{
internal_event::OptionalTag,
internal_event::{OptionalTag, TaggedEventsSent},
json_size::{JsonSize, NonZeroJsonSize},
request_metadata::{EventCountTags, GetEventCountTags},
request_metadata::GetEventCountTags,
EventDataEq,
};

Expand Down Expand Up @@ -215,7 +215,7 @@ impl EstimatedJsonEncodedSizeOf for LogEvent {
}

impl GetEventCountTags for LogEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
let source = if telemetry().tags().emit_source {
self.metadata().source_id().cloned().into()
} else {
Expand All @@ -230,7 +230,7 @@ impl GetEventCountTags for LogEvent {
OptionalTag::Ignored
};

EventCountTags { source, service }
TaggedEventsSent { source, service }
}
}

Expand Down
8 changes: 4 additions & 4 deletions lib/vector-core/src/event/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::{

use chrono::{DateTime, Utc};
use vector_common::{
internal_event::OptionalTag,
internal_event::{OptionalTag, TaggedEventsSent},
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
request_metadata::GetEventCountTags,
EventDataEq,
};
use vector_config::configurable_component;
Expand Down Expand Up @@ -483,7 +483,7 @@ impl Finalizable for Metric {
}

impl GetEventCountTags for Metric {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
let source = if telemetry().tags().emit_source {
self.metadata().source_id().cloned().into()
} else {
Expand All @@ -500,7 +500,7 @@ impl GetEventCountTags for Metric {
OptionalTag::Ignored
};

EventCountTags { source, service }
TaggedEventsSent { source, service }
}
}

Expand Down
9 changes: 3 additions & 6 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ use serde::{Deserialize, Serialize};
pub use trace::TraceEvent;
use vector_buffers::EventCount;
use vector_common::{
config::ComponentKey,
finalization,
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
EventDataEq,
config::ComponentKey, finalization, internal_event::TaggedEventsSent, json_size::JsonSize,
request_metadata::GetEventCountTags, EventDataEq,
};
pub use vrl::value::Value;
#[cfg(feature = "vrl")]
Expand Down Expand Up @@ -97,7 +94,7 @@ impl Finalizable for Event {
}

impl GetEventCountTags for Event {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
match self {
Event::Log(log) => log.get_tags(),
Event::Metric(metric) => metric.get_tags(),
Expand Down
5 changes: 2 additions & 3 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use lookup::lookup_v2::TargetPath;
use serde::{Deserialize, Serialize};
use vector_buffers::EventCount;
use vector_common::{
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};

Expand Down Expand Up @@ -149,7 +148,7 @@ impl AsMut<LogEvent> for TraceEvent {
}

impl GetEventCountTags for TraceEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.0.get_tags()
}
}
5 changes: 3 additions & 2 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::{io, io::Write};
use serde::Serialize;
use vector_buffers::EventCount;
use vector_common::{
internal_event::TaggedEventsSent,
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags, GroupedCountByteSize},
request_metadata::{GetEventCountTags, GroupedCountByteSize},
};
use vector_core::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};

Expand Down Expand Up @@ -51,7 +52,7 @@ impl EventCount for ProcessedEvent {
}

impl GetEventCountTags for ProcessedEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.log.get_tags()
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/loki/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub struct LokiRecord {
pub event: LokiEvent,
pub json_byte_size: JsonSize,
pub finalizers: EventFinalizers,
pub event_count_tags: EventCountTags,
pub event_count_tags: TaggedEventsSent,
}

impl ByteSizeOf for LokiRecord {
Expand Down Expand Up @@ -191,7 +191,7 @@ impl Finalizable for LokiRecord {
}

impl GetEventCountTags for LokiRecord {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.event_count_tags.clone()
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/sinks/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ pub use tower::{Service, ServiceBuilder};
pub use vector_buffers::EventCount;
pub use vector_common::{
finalization::{EventFinalizers, EventStatus, Finalizable},
internal_event::CountByteSize,
internal_event::{CountByteSize, TaggedEventsSent},
json_size::JsonSize,
request_metadata::{
EventCountTags, GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata,
},
request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata},
};
pub use vector_config::configurable_component;
pub use vector_core::{
Expand Down
Loading