Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ derivative.workspace = true
derive_more = { version = "2.0.1", optional = true, features = ["from", "display"] }
dyn-clone = { version = "1", default-features = false }
flate2.workspace = true
futures.workspace = true
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
metrics.workspace = true
opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
ordered-float.workspace = true
prost.workspace = true
Expand All @@ -46,11 +48,13 @@ tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
vrl.workspace = true
vector-common = { path = "../vector-common", default-features = false }
vector-common-macros.workspace = true
vector-config = { path = "../vector-config", default-features = false }
vector-config-macros = { path = "../vector-config-macros", default-features = false }
vector-core = { path = "../vector-core", default-features = false, features = ["vrl"] }
vector-vrl-functions.workspace = true
toml = { version = "0.9.8", optional = true }

[dev-dependencies]
futures.workspace = true
indoc.workspace = true
Expand All @@ -66,3 +70,4 @@ vrl.workspace = true
arrow = []
opentelemetry = ["dep:opentelemetry-proto"]
syslog = ["dep:syslog_loose", "dep:strum", "dep:derive_more", "dep:serde-aux", "dep:toml"]
test = []
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use serde::{Deserialize, Serialize};
use vector_lib::{
codecs::decoding::{DeserializerConfig, FramingConfig},
config::LogNamespace,
};
use vector_core::config::LogNamespace;

use crate::codecs::Decoder;
use crate::decoding::{Decoder, DeserializerConfig, FramingConfig};

/// Config used to build a `Decoder`.
#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down Expand Up @@ -43,7 +40,7 @@ impl DecodingConfig {
}

/// Builds a `Decoder` from the provided configuration.
pub fn build(&self) -> vector_lib::Result<Decoder> {
pub fn build(&self) -> vector_common::Result<Decoder> {
// Build the framer.
let framer = self.framing.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use bytes::{Bytes, BytesMut};
use smallvec::SmallVec;
use vector_lib::{
codecs::decoding::{
BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
format::Deserializer as _,
},
config::LogNamespace,
};
use vector_common::internal_event::emit;
use vector_core::{config::LogNamespace, event::Event};

use crate::{
event::Event,
decoding::format::Deserializer as _,
decoding::{
BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
},
internal_events::{DecoderDeserializeError, DecoderFramingError},
};

type DecodedFrame = (SmallVec<[Event; 1]>, usize);

/// A decoder that can decode structured events from a byte stream / byte
/// messages.
#[derive(Clone)]
Expand Down Expand Up @@ -60,9 +60,9 @@ impl Decoder {
fn handle_framing_result(
&mut self,
frame: Result<Option<Bytes>, BoxedFramingError>,
) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
) -> Result<Option<DecodedFrame>, Error> {
let frame = frame.map_err(|error| {
emit!(DecoderFramingError { error: &error });
emit(DecoderFramingError { error: &error });
Error::FramingError(error)
})?;

Expand All @@ -72,22 +72,22 @@ impl Decoder {
}

/// Parses a frame using the included deserializer, and handles any errors by logging.
pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
pub fn deserializer_parse(&self, frame: Bytes) -> Result<DecodedFrame, Error> {
let byte_size = frame.len();

// Parse structured events from the byte frame.
self.deserializer
.parse(frame, self.log_namespace)
.map(|events| (events, byte_size))
.map_err(|error| {
emit!(DecoderDeserializeError { error: &error });
emit(DecoderDeserializeError { error: &error });
Error::ParsingError(error)
})
}
}

impl tokio_util::codec::Decoder for Decoder {
type Item = (SmallVec<[Event; 1]>, usize);
type Item = DecodedFrame;
type Error = Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -106,13 +106,13 @@ mod tests {
use bytes::Bytes;
use futures::{StreamExt, stream};
use tokio_util::{codec::FramedRead, io::StreamReader};
use vector_lib::codecs::{
JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
decoding::{Deserializer, Framer},
};
use vrl::value::Value;

use super::Decoder;
use crate::{
JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
decoding::{Deserializer, Framer},
};

#[tokio::test]
async fn framed_read_recover_from_error() {
Expand Down
4 changes: 4 additions & 0 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! A collection of support structures that are used in the process of decoding
//! bytes into events.

mod config;
mod decoder;
mod error;
pub mod format;
pub mod framing;

use std::fmt::Debug;

use bytes::{Bytes, BytesMut};
pub use config::DecodingConfig;
pub use decoder::Decoder;
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::codecs::{Encoder, EncoderKind, Transformer};
use vector_lib::{
codecs::{
CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
},
configurable::configurable_component,
use vector_config::configurable_component;

use super::{Encoder, EncoderKind, Transformer};
use crate::encoding::{
CharacterDelimitedEncoder, Framer, FramingConfig, LengthDelimitedEncoder,
NewlineDelimitedEncoder, Serializer, SerializerConfig,
};

#[cfg(feature = "codecs-opentelemetry")]
use vector_lib::codecs::BytesEncoder;
#[cfg(feature = "opentelemetry")]
use crate::encoding::BytesEncoder;

/// Encoding configuration.
#[configurable_component]
Expand Down Expand Up @@ -43,7 +42,7 @@ impl EncodingConfig {
}

/// Build the `Serializer` for this config.
pub fn build(&self) -> crate::Result<Serializer> {
pub fn build(&self) -> vector_common::Result<Serializer> {
self.encoding.build()
}
}
Expand Down Expand Up @@ -100,7 +99,7 @@ impl EncodingConfigWithFraming {
}

/// Build the `Framer` and `Serializer` for this config.
pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
pub fn build(&self, sink_type: SinkType) -> vector_common::Result<(Framer, Serializer)> {
let framer = self.framing.as_ref().map(|framing| framing.build());
let serializer = self.encoding.build()?;

Expand Down Expand Up @@ -132,17 +131,20 @@ impl EncodingConfigWithFraming {
| Serializer::RawMessage(_)
| Serializer::Text(_),
) => NewlineDelimitedEncoder::default().into(),
#[cfg(feature = "codecs-syslog")]
#[cfg(feature = "syslog")]
(None, Serializer::Syslog(_)) => NewlineDelimitedEncoder::default().into(),
#[cfg(feature = "codecs-opentelemetry")]
#[cfg(feature = "opentelemetry")]
(None, Serializer::Otlp(_)) => BytesEncoder.into(),
};

Ok((framer, serializer))
}

/// Build the `Transformer` and `EncoderKind` for this config.
pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> {
pub fn build_encoder(
&self,
sink_type: SinkType,
) -> vector_common::Result<(Transformer, EncoderKind)> {
let (framer, serializer) = self.build(sink_type)?;
let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
Ok((self.transformer(), encoder))
Expand Down Expand Up @@ -172,10 +174,10 @@ where

#[cfg(test)]
mod test {
use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};
use lookup::lookup_v2::{ConfigValuePath, parse_value_path};

use super::*;
use crate::codecs::encoding::TimestampFormat;
use crate::encoding::TimestampFormat;

#[test]
fn deserialize_encoding_config() {
Expand Down
Loading
Loading