diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index f72c68c964007..74ce254793ece 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -9,42 +9,8 @@ use crate::{ sinks::{prelude::*, util::UriSerde}, }; -/// Loki-specific compression. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum ExtendedCompression { - /// Snappy compression. - /// - /// This implies sending push requests as Protocol Buffers. - #[serde(rename = "snappy")] - Snappy, -} - -/// Compression configuration. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[serde(untagged)] -pub enum CompressionConfigAdapter { - /// Basic compression. - Original(Compression), - - /// Loki-specific compression. - Extended(ExtendedCompression), -} - -impl CompressionConfigAdapter { - pub const fn content_encoding(self) -> Option<&'static str> { - match self { - CompressionConfigAdapter::Original(compression) => compression.content_encoding(), - CompressionConfigAdapter::Extended(_) => Some("snappy"), - } - } -} - -impl Default for CompressionConfigAdapter { - fn default() -> Self { - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) - } +const fn default_compression() -> Compression { + Compression::Snappy } fn default_loki_path() -> String { @@ -106,9 +72,10 @@ pub struct LokiConfig { #[serde(default = "crate::serde::default_true")] pub remove_timestamp: bool, - #[configurable(derived)] - #[serde(default)] - pub compression: CompressionConfigAdapter, + /// Compression configuration. + /// Snappy compression implies sending push requests as Protocol Buffers. + #[serde(default = "default_compression")] + pub compression: Compression, #[configurable(derived)] #[serde(default)] diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index edcc762042fba..2a4c33280732f 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -5,7 +5,6 @@ use http::StatusCode; use snafu::Snafu; use tracing::Instrument; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::{ http::{Auth, HttpClient}, sinks::{prelude::*, util::UriSerde}, @@ -60,7 +59,7 @@ impl DriverResponse for LokiResponse { #[derive(Clone)] pub struct LokiRequest { - pub compression: CompressionConfigAdapter, + pub compression: Compression, pub finalizers: EventFinalizers, pub payload: Bytes, pub tenant_id: Option, @@ -113,10 +112,8 @@ impl Service for LokiService { fn call(&mut self, request: LokiRequest) -> Self::Future { let content_type = match request.compression { - CompressionConfigAdapter::Original(_) => "application/json", - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - "application/x-protobuf" - } + Compression::Snappy => "application/x-protobuf", + _ => "application/json", }; let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index a9c1a1cc5b092..ac5fe5fe98baf 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -12,7 +12,6 @@ use super::{ event::{LokiBatchEncoder, LokiEvent, LokiRecord, PartitionKey}, service::{LokiRequest, LokiRetryLogic, LokiService}, }; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::sinks::loki::event::LokiBatchEncoding; use crate::{ http::{get_http_scheme_from_uri, HttpClient}, @@ -65,7 +64,7 @@ impl Partitioner for RecordPartitioner { #[derive(Clone)] pub struct LokiRequestBuilder { - compression: CompressionConfigAdapter, + compression: Compression, encoder: LokiBatchEncoder, } @@ -92,9 +91,12 @@ impl RequestBuilder<(PartitionKey, Vec)> for LokiRequestBuilder { type Error = RequestBuildError; fn compression(&self) -> Compression { - match self.compression { - CompressionConfigAdapter::Original(compression) => compression, - CompressionConfigAdapter::Extended(_) => Compression::None, + if self.compression == Compression::Snappy { + // Snappy compression is applied after converting the batch to protobuf so + // we need to handle this separately. + Compression::None + } else { + self.compression } } @@ -415,10 +417,8 @@ impl LokiSink { let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let batch_encoder = match config.compression { - CompressionConfigAdapter::Original(_) => LokiBatchEncoder(LokiBatchEncoding::Json), - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - LokiBatchEncoder(LokiBatchEncoding::Protobuf) - } + Compression::Snappy => LokiBatchEncoder(LokiBatchEncoding::Protobuf), + _ => LokiBatchEncoder(LokiBatchEncoding::Json), }; Ok(Self { diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index c44bc390a0c1a..6b381045937ed 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -110,8 +110,11 @@ base: components: sinks: loki: configuration: { } } compression: { - description: "Compression configuration." - required: false + description: """ + Compression configuration. + Snappy compression implies sending push requests as Protocol Buffers. + """ + required: false type: string: { default: "snappy" enum: { @@ -122,9 +125,9 @@ base: components: sinks: loki: configuration: { """ none: "No compression." snappy: """ - Snappy compression. + [Snappy][snappy] compression. - This implies sending push requests as Protocol Buffers. + [snappy]: https://github.com/google/snappy/blob/main/docs/README.md """ zlib: """ [Zlib][zlib] compression.