Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
796def9
Add initial version of Cached
StephenWakely May 25, 2023
b205354
Remove the deadlock from writing to the cache after reading
StephenWakely May 26, 2023
d0c36ed
Insert source and service tag into events
StephenWakely May 26, 2023
ec666b0
Add bytesize count grouped by source and service
StephenWakely May 30, 2023
3e06791
Add EventCountTags trait
StephenWakely May 30, 2023
3a5fe02
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely May 31, 2023
15ea497
Fix merge
StephenWakely May 31, 2023
d66654c
Fix compile errors
StephenWakely May 31, 2023
d1c7df6
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 1, 2023
c895653
Set source and service tags for most other sinks
StephenWakely Jun 1, 2023
d28a809
Register the events with a trait rather than a Fn
StephenWakely Jun 2, 2023
4ade9be
These tests are round trip
StephenWakely Jun 2, 2023
3632fa2
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 5, 2023
4d9c5df
Clippy
StephenWakely Jun 5, 2023
c9640d9
Add event count tags for loki sink
StephenWakely Jun 5, 2023
a758704
RegisterEvent inherits form RegisterInternalEvent
StephenWakely Jun 6, 2023
5cbfee4
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 6, 2023
9eab386
Added telemetry options
StephenWakely Jun 7, 2023
ee50bf5
Only collect configured tags
StephenWakely Jun 8, 2023
9fd7854
Added tests and clippy.
StephenWakely Jun 8, 2023
01d8ad1
Little tidy
StephenWakely Jun 9, 2023
87ca474
TaggedEventsSent doesn't need Output
StephenWakely Jun 9, 2023
ebd10c8
Spelling
StephenWakely Jun 9, 2023
b30a4bb
Remove default impl of take_metadata
StephenWakely Jun 9, 2023
8b74470
Outer event should get tags from inner event type
StephenWakely Jun 9, 2023
b11c7a6
Driver should not consume the metadata
StephenWakely Jun 9, 2023
3327904
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 9, 2023
7a43045
Tags should be an associated type of RegisterEvent
StephenWakely Jun 9, 2023
f43af1b
Feedback from Bruce
StephenWakely Jun 15, 2023
b141f30
Merge remote-tracking branch 'origin' into stephen/cached_events
StephenWakely Jun 19, 2023
8ac7020
Set source tag to be Arc<ComponentKey>
StephenWakely Jun 19, 2023
243ec8b
Replace take_metadata with metadata_mut
StephenWakely Jun 21, 2023
574cbae
Add test for telemetry tags to Kafka sink
StephenWakely Jun 21, 2023
73131e6
Fix datadog integration test
StephenWakely Jun 21, 2023
4dee27c
Use Derivative to replace clone with no bounds
StephenWakely Jun 23, 2023
f1af663
Spelling
StephenWakely Jun 23, 2023
8f0cc1b
Feedback from Bruce
StephenWakely Jun 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ impl DriverResponse for BasicResponse {
EventStatus::Delivered
}

fn events_sent(&self) -> CountByteSize {
fn events_sent(&self) -> RequestCountByteSize {
// (events count, byte size)
CountByteSize(1, self.byte_size)
CountByteSize(1, self.byte_size).into()
}
}
```
Expand Down
62 changes: 62 additions & 0 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,65 @@ impl From<Output> for EventsSent {
Self { output: output.0 }
}
}

crate::registered_event!(
TaggedEventsSent {
output: Option<SharedString>,
source: Option<String>,
service: Option<String>,
} => {
events: Counter = if let Some(output) = &self.output {
register_counter!("component_sent_events_total", "output" => output.clone(),
"source" => self.source.clone().unwrap_or("-".to_string()),
"service" => self.service.clone().unwrap_or("-".to_string()))
} else {
register_counter!("component_sent_events_total",
"source" => self.source.clone().unwrap_or("-".to_string()),
"service" => self.service.clone().unwrap_or("-".to_string()))
},
events_out: Counter = if let Some(output) = &self.output {
register_counter!("events_out_total", "output" => output.clone())
} else {
register_counter!("events_out_total")
},
Comment on lines +78 to +83

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too bad the macro doesn't let you make one call to make_tags and reuse it for both of these counters. Good thing it's just a setup function.

event_bytes: Counter = if let Some(output) = &self.output {
register_counter!("component_sent_event_bytes_total",
"output" => output.clone(),
"source" => self.source.clone().unwrap_or("-".to_string()),
"service" => self.service.clone().unwrap_or("-".to_string()))
} else {
register_counter!("component_sent_event_bytes_total",
"source" => self.source.clone().unwrap_or("-".to_string()),
"service" => self.service.clone().unwrap_or("-".to_string()))
},
output: Option<SharedString> = self.output,
}

fn emit(&self, data: CountByteSize) {
let CountByteSize(count, byte_size) = data;

match &self.output {
Some(output) => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size, output = %output);
}
None => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size);
}
}

self.events.increment(count as u64);
self.events_out.increment(count as u64);
self.event_bytes.increment(byte_size as u64);
}
);

impl TaggedEventsSent {
#[must_use]
pub fn new(source: Option<String>, service: Option<String>, output: Output) -> Self {
Self {
output: output.0,
source: source.map(|source| source.to_string()),
service: service.map(|service| service.to_string()),
}
}
}
60 changes: 58 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ mod events_sent;
mod prelude;
pub mod service;

use std::{
collections::BTreeMap,
ops::{Add, AddAssign},
sync::{Arc, RwLock},
};

pub use metrics::SharedString;

pub use bytes_received::BytesReceived;
pub use bytes_sent::BytesSent;
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
pub use events_received::EventsReceived;
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use events_sent::{EventsSent, TaggedEventsSent, DEFAULT_OUTPUT};
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

Expand Down Expand Up @@ -107,9 +113,24 @@ pub struct ByteSize(pub usize);
pub struct Count(pub usize);

/// Holds the tuple `(count_of_events, size_of_events_in_bytes)`.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug)]
pub struct CountByteSize(pub usize, pub usize);

impl AddAssign for CountByteSize {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting here because the struct name didn't actually change in this PR, but: it's kind of unfortunate we never changed this to CountJsonByteSize or CountJsonSize.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like an easy enough find and replace fix?

fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0;
self.1 += rhs.1;
}
}

impl Add<CountByteSize> for CountByteSize {
type Output = CountByteSize;

fn add(self, rhs: CountByteSize) -> Self::Output {
CountByteSize(self.0 + rhs.0, self.1 + rhs.1)
}
}

// Wrapper types used to hold parameters for registering events

pub struct Output(pub Option<SharedString>);
Expand Down Expand Up @@ -224,3 +245,38 @@ macro_rules! registered_event {
}
};
}

#[derive(Clone)]
pub struct Cached<Tags, Event, Register> {
cache: Arc<RwLock<BTreeMap<Tags, Event>>>,
register: Register,
}

impl<Tags, Event, Register, Data> Cached<Tags, Event, Register>
where
Data: Sized,
Register: Fn(&Tags) -> Event,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This register function should likely be co-located with the event and not the cache. Could it be made a trait on the event type, potentially generated by a macro (maybe the same registered_event thing)?

Event: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
{
pub fn new(register: Register) -> Self {
Self {
cache: Arc::new(RwLock::new(BTreeMap::new())),
register,
}
}

pub fn emit(&self, tags: &Tags, value: Data) {
let read = self.cache.read().unwrap();
if let Some(event) = read.get(tags) {
event.emit(value);
} else {
let event = (self.register)(tags);
event.emit(value);

// Ensure the read lock is dropped so we can write.
drop(read);
self.cache.write().unwrap().insert(tags.clone(), event);
}
}
}
81 changes: 73 additions & 8 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,79 @@
use std::collections::HashMap;
use std::ops::Add;

use crate::internal_event::CountByteSize;

pub type EventCountTags = (Option<String>, Option<String>);

pub trait GetEventCountTags {
fn get_tags(&self) -> EventCountTags;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to see some doc text on what these are about. Also, could the EventCountTags just take a reference to the strings bound to the lifetime of &self to avoid unneeded cloning?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes absolutely. I'll update the PR description to indicate what I still thinks need doing.

}

/// Struct that keeps track of the estimated json size of a given
/// batch of events by source and service.
#[derive(Clone, Debug, Default)]
pub struct RequestCountByteSize {
sizes: HashMap<EventCountTags, CountByteSize>,
}

impl RequestCountByteSize {
pub fn sizes(&self) -> &HashMap<EventCountTags, CountByteSize> {
&self.sizes
}

pub fn add_event<E>(&mut self, event: &E, json_size: usize)
where
E: GetEventCountTags,
{
let size = CountByteSize(1, json_size);
let tags = event.get_tags();

match self.sizes.get_mut(&tags) {
Some(current) => {
*current += size;
}
None => {
self.sizes.insert(tags, size);
}
}
}
}

impl From<CountByteSize> for RequestCountByteSize {
fn from(value: CountByteSize) -> Self {
let mut sizes = HashMap::new();
sizes.insert((None, None), value);

Self { sizes }
}
}

impl<'a> Add<&'a RequestCountByteSize> for RequestCountByteSize {
type Output = RequestCountByteSize;

fn add(mut self, other: &'a Self::Output) -> Self::Output {
for (key, value) in &other.sizes {
match self.sizes.get_mut(&key) {
Some(size) => *size += *value,
None => {
self.sizes.insert(key.clone(), *value);
}
}
}

Self { sizes: self.sizes }
}
}

/// Metadata for batch requests.
#[derive(Clone, Copy, Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct RequestMetadata {
/// Number of events represented by this batch request.
event_count: usize,
/// Size, in bytes, of the in-memory representation of all events in this batch request.
events_byte_size: usize,
/// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request.
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: RequestCountByteSize,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a little weird that we have this field which might be grouped, might not, while the others are never grouped.

Not really sure, as I type this comment, what a better solution would be but it does feel like it adds on more complexity to what was already (admittedly, given that I designed it) an already somewhat opaque type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is unfortunate primarily because this struct can no longer be Copy,

  • which means we now have to have a take_metadata function on the MetaDescriptive trait so that we don't have to clone the object,
  • which then means that we have to be very careful we don't access the metadata twice on that object because the second time the real values will no longer be there.

A very elusive bug could creep in.

I'm all ears for a better solution.

/// Uncompressed size, in bytes, of the encoded events in this batch request.
request_encoded_size: usize,
/// On-the-wire size, in bytes, of the batch request itself after compression, etc.
Expand All @@ -25,7 +90,7 @@ impl RequestMetadata {
events_byte_size: usize,
request_encoded_size: usize,
request_wire_size: usize,
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: RequestCountByteSize,
) -> Self {
Self {
event_count,
Expand All @@ -47,8 +112,8 @@ impl RequestMetadata {
}

#[must_use]
pub const fn events_estimated_json_encoded_byte_size(&self) -> usize {
self.events_estimated_json_encoded_byte_size
pub fn events_estimated_json_encoded_byte_size(&self) -> &RequestCountByteSize {
&self.events_estimated_json_encoded_byte_size
}

#[must_use]
Expand All @@ -64,7 +129,7 @@ impl RequestMetadata {
/// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
#[must_use]
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0);
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, Default::default());

for metadata in metadata_iter {
metadata_sum = metadata_sum + &metadata;
Expand All @@ -82,7 +147,7 @@ impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
event_count: self.event_count + other.event_count,
events_byte_size: self.events_byte_size + other.events_byte_size,
events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size
+ other.events_estimated_json_encoded_byte_size,
+ &other.events_estimated_json_encoded_byte_size,
request_encoded_size: self.request_encoded_size + other.request_encoded_size,
request_wire_size: self.request_wire_size + other.request_wire_size,
}
Expand All @@ -92,5 +157,5 @@ impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
/// Objects implementing this trait have metadata that describes the request.
pub trait MetaDescriptive {
/// Returns the `RequestMetadata` associated with this object.
fn get_metadata(&self) -> RequestMetadata;
fn get_metadata(&self) -> &RequestMetadata;
}
14 changes: 13 additions & 1 deletion lib/vector-core/src/event/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use std::{
};

use chrono::{DateTime, Utc};
use vector_common::EventDataEq;
use vector_common::{
request_metadata::{EventCountTags, GetEventCountTags},
EventDataEq,
};
use vector_config::configurable_component;

use crate::{
Expand Down Expand Up @@ -476,6 +479,15 @@ impl Finalizable for Metric {
}
}

impl GetEventCountTags for Metric {
fn get_tags(&self) -> EventCountTags {
let source = self.metadata().source_id().map(|output| output.to_string());
// Metrics do not contain a service field.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an intentional choice? I don't see a real-life reason why metrics couldn't be associated with a service.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently only Log events can have schemas. Once we expand it so that a meaning can point to a metric tag then we will be able to extract the service here.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double check that we will be emitting the service tag for logs, metrics, and traces. If we need to follow up on metrics and traces we can do so, but I think we do need to do that before calling this project "finished".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I didn't consider the service tag to be part of the schema - just thought we'd add a metric tag of service: foo


(source, None)
}
}

/// Metric kind.
///
/// Metrics can be either absolute of incremental. Absolute metrics represent a sort of "last write wins" scenario,
Expand Down
20 changes: 19 additions & 1 deletion lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ pub use r#ref::{EventMutRef, EventRef};
use serde::{Deserialize, Serialize};
pub use trace::TraceEvent;
use vector_buffers::EventCount;
use vector_common::{finalization, EventDataEq};
use vector_common::{
finalization,
request_metadata::{EventCountTags, GetEventCountTags},
EventDataEq,
};
pub use vrl::value::Value;
#[cfg(feature = "vrl")]
pub use vrl_target::{TargetEvents, VrlTarget};
Expand Down Expand Up @@ -90,6 +94,20 @@ impl Finalizable for Event {
}
}

impl GetEventCountTags for Event {
fn get_tags(&self) -> EventCountTags {
let source = self.metadata().source_id().map(|id| id.to_string());
let service = if let Event::Log(log) = self {
log.get_by_meaning("service")
.map(|service| service.to_string())
} else {
None
};

(source, service)
}
}

impl Event {
/// Return self as a `LogEvent`
///
Expand Down
Loading