From 0bc6bb5108ec15eb2a8dc0a4c4ef23fcbce55fb2 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 9 Nov 2023 14:18:01 +0000 Subject: [PATCH 1/3] Update to use the global list of compression algorithms Signed-off-by: Stephen Wakely --- src/sinks/loki/config.rs | 42 ++----------------- src/sinks/loki/service.rs | 9 ++-- src/sinks/loki/sink.rs | 14 ++----- .../reference/components/sinks/base/loki.cue | 12 ++++-- 4 files changed, 19 insertions(+), 58 deletions(-) diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index f72c68c964007..bdded3c2cca35 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -9,42 +9,8 @@ use crate::{ sinks::{prelude::*, util::UriSerde}, }; -/// Loki-specific compression. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum ExtendedCompression { - /// Snappy compression. - /// - /// This implies sending push requests as Protocol Buffers. - #[serde(rename = "snappy")] - Snappy, -} - -/// Compression configuration. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[serde(untagged)] -pub enum CompressionConfigAdapter { - /// Basic compression. - Original(Compression), - - /// Loki-specific compression. - Extended(ExtendedCompression), -} - -impl CompressionConfigAdapter { - pub const fn content_encoding(self) -> Option<&'static str> { - match self { - CompressionConfigAdapter::Original(compression) => compression.content_encoding(), - CompressionConfigAdapter::Extended(_) => Some("snappy"), - } - } -} - -impl Default for CompressionConfigAdapter { - fn default() -> Self { - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) - } +const fn default_compression() -> Compression { + Compression::Snappy } fn default_loki_path() -> String { @@ -107,8 +73,8 @@ pub struct LokiConfig { pub remove_timestamp: bool, #[configurable(derived)] - #[serde(default)] - pub compression: CompressionConfigAdapter, + #[serde(default = "default_compression")] + pub compression: Compression, #[configurable(derived)] #[serde(default)] diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index edcc762042fba..2a4c33280732f 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -5,7 +5,6 @@ use http::StatusCode; use snafu::Snafu; use tracing::Instrument; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::{ http::{Auth, HttpClient}, sinks::{prelude::*, util::UriSerde}, @@ -60,7 +59,7 @@ impl DriverResponse for LokiResponse { #[derive(Clone)] pub struct LokiRequest { - pub compression: CompressionConfigAdapter, + pub compression: Compression, pub finalizers: EventFinalizers, pub payload: Bytes, pub tenant_id: Option, @@ -113,10 +112,8 @@ impl Service for LokiService { fn call(&mut self, request: LokiRequest) -> Self::Future { let content_type = match request.compression { - CompressionConfigAdapter::Original(_) => "application/json", - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - "application/x-protobuf" - } + Compression::Snappy => "application/x-protobuf", + _ => "application/json", }; let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index a9c1a1cc5b092..e96ec4b5520ca 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -12,7 +12,6 @@ use super::{ event::{LokiBatchEncoder, LokiEvent, LokiRecord, PartitionKey}, service::{LokiRequest, LokiRetryLogic, LokiService}, }; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::sinks::loki::event::LokiBatchEncoding; use crate::{ http::{get_http_scheme_from_uri, HttpClient}, @@ -65,7 +64,7 @@ impl Partitioner for RecordPartitioner { #[derive(Clone)] pub struct LokiRequestBuilder { - compression: CompressionConfigAdapter, + compression: Compression, encoder: LokiBatchEncoder, } @@ -92,10 +91,7 @@ impl RequestBuilder<(PartitionKey, Vec)> for LokiRequestBuilder { type Error = RequestBuildError; fn compression(&self) -> Compression { - match self.compression { - CompressionConfigAdapter::Original(compression) => compression, - CompressionConfigAdapter::Extended(_) => Compression::None, - } + self.compression } fn encoder(&self) -> &Self::Encoder { @@ -415,10 +411,8 @@ impl LokiSink { let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let batch_encoder = match config.compression { - CompressionConfigAdapter::Original(_) => LokiBatchEncoder(LokiBatchEncoding::Json), - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - LokiBatchEncoder(LokiBatchEncoding::Protobuf) - } + Compression::Snappy => LokiBatchEncoder(LokiBatchEncoding::Protobuf), + _ => LokiBatchEncoder(LokiBatchEncoding::Json), }; Ok(Self { diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index c44bc390a0c1a..7b2480cba5256 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -110,8 +110,12 @@ base: components: sinks: loki: configuration: { } } compression: { - description: "Compression configuration." - required: false + description: """ + Compression configuration. + + All compression algorithms use the default compression level unless otherwise specified. + """ + required: false type: string: { default: "snappy" enum: { @@ -122,9 +126,9 @@ base: components: sinks: loki: configuration: { """ none: "No compression." snappy: """ - Snappy compression. + [Snappy][snappy] compression. - This implies sending push requests as Protocol Buffers. + [snappy]: https://github.com/google/snappy/blob/main/docs/README.md """ zlib: """ [Zlib][zlib] compression. From 9b44c2debcd8a0c90d6dec003247f71e00c34741 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 9 Nov 2023 14:48:01 +0000 Subject: [PATCH 2/3] Mention snappy compression using protocol buffers Signed-off-by: Stephen Wakely --- src/sinks/loki/config.rs | 3 ++- website/cue/reference/components/sinks/base/loki.cue | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index bdded3c2cca35..74ce254793ece 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -72,7 +72,8 @@ pub struct LokiConfig { #[serde(default = "crate::serde::default_true")] pub remove_timestamp: bool, - #[configurable(derived)] + /// Compression configuration. + /// Snappy compression implies sending push requests as Protocol Buffers. #[serde(default = "default_compression")] pub compression: Compression, diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index 7b2480cba5256..6b381045937ed 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -112,8 +112,7 @@ base: components: sinks: loki: configuration: { compression: { description: """ Compression configuration. - - All compression algorithms use the default compression level unless otherwise specified. + Snappy compression implies sending push requests as Protocol Buffers. """ required: false type: string: { From 852943804f354687c9a4bf5c7aca7c50f616811f Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 9 Nov 2023 17:12:58 +0000 Subject: [PATCH 3/3] Snappy is compressed separately Signed-off-by: Stephen Wakely --- src/sinks/loki/sink.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index e96ec4b5520ca..ac5fe5fe98baf 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -91,7 +91,13 @@ impl RequestBuilder<(PartitionKey, Vec)> for LokiRequestBuilder { type Error = RequestBuildError; fn compression(&self) -> Compression { - self.compression + if self.compression == Compression::Snappy { + // Snappy compression is applied after converting the batch to protobuf so + // we need to handle this separately. + Compression::None + } else { + self.compression + } } fn encoder(&self) -> &Self::Encoder {