Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/tutorials/sinks/1_basic_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the
sink's behaviour.

```rust
#[configurable_component(sink("basic", "Basic sink."))]
#[configurable_component(sink("basic"))]
#[derive(Clone, Debug)]
/// A basic sink that dumps its output to stdout.
pub struct BasicConfig {
Expand Down
49 changes: 36 additions & 13 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
http::HttpClient,
internal_events::SinkRequestBuildError,
};
use vector_core::config::telemetry;
use bytes::Bytes;
```

Expand Down Expand Up @@ -81,12 +82,12 @@ struct BasicEncoder;
The Encoder must implement the [`Encoder`][encoder] trait:

```rust
impl Encoder<Event> for BasicEncoder {
impl encoding::Encoder<Event> for BasicEncoder {
fn encode_input(
&self,
input: Event,
writer: &mut dyn std::io::Write,
) -> std::io::Result<usize> {
) -> std::io::Result<(usize, GroupedCountByteSize)> {
}
}
```
Expand All @@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if each
event is processed in some way prior to encoding.

[`encode_input`][encoder_encode_input] serializes the event to a String and
writes these bytes:
writes these bytes. The function also creates a [`GroupedCountByteSize`]
[grouped_count_byte_size] object. This object tracks the size of the event
that is sent by the sink, optionally grouped by the source and service that
originated the event if Vector has been configured to do so. It is necessary to
calculate the sizes in this function since the encode function sometimes drops
fields from the event prior to encoding. We need the size to be calculated after
these fields have been dropped.

```rust
fn encode_input(
&self,
input: Event,
writer: &mut dyn std::io::Write,
) -> std::io::Result<usize> {
) -> std::io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&input, input.estimated_json_encoded_size_of());

let event = serde_json::to_string(&input).unwrap();
write_all(writer, 1, event.as_bytes()).map(|()| event.len())
write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))
}
```

Expand Down Expand Up @@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields:

```rust
impl MetaDescriptive for BasicRequest {
fn get_metadata(&self) -> RequestMetadata {
self.metadata
fn get_metadata(&self) -> &RequestMetadata {
&self.metadata
}

fn metadata_mut(&mut self) -> &mut RequestMetadata {
&mut self.metadata
}
}

Expand Down Expand Up @@ -249,7 +263,7 @@ when sending the event to an `amqp` server.
mut input: Event,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = input.take_finalizers();
let metadata_builder = RequestMetadataBuilder::from_events(&input);
let metadata_builder = RequestMetadataBuilder::from_event(&input);
(finalizers, metadata_builder, input)
}
```
Expand Down Expand Up @@ -338,7 +352,12 @@ that will be invoked to send the actual data.
match client.call(req).await {
Ok(response) => {
if response.status().is_success() {
Ok(BasicResponse { byte_size })
Ok(BasicResponse {
byte_size,
json_size: request
.metadata
.into_events_estimated_json_encoded_byte_size(),
})
} else {
Err("received error response")
}
Expand All @@ -359,18 +378,21 @@ The return from our service must be an object that implements the
```rust
struct BasicResponse {
byte_size: usize,
json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
fn event_status(&self) -> EventStatus {
EventStatus::Delivered
}

fn events_sent(&self) -> RequestCountByteSize {
// (events count, byte size)
CountByteSize(1, self.byte_size).into()
fn events_sent(&self) -> &GroupedCountByteSize {
&self.json_size
}
}

fn bytes_sent(&self) -> Option<usize> {
Some(self.byte_size)
}}
```

Vector calls the methods in this trait to determine if the event was delivered successfully.
Expand Down Expand Up @@ -492,3 +514,4 @@ BODY:
[sinkbuilder_ext_into_driver]: https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver
[stream_filter_map]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map
[driver]: https://rust-doc.vector.dev/vector_core/stream/struct.driver
[grouped_count_byte_size]: https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize