diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18fe8e4211..4d6962c38b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 ### New features
 * add collect postprocessor
 * add support for access tokens for aws connectors
+* add support for brotli compression
 
 ### Fixes
 * fix reconnect issue in HTTP client
diff --git a/Cargo.lock b/Cargo.lock
index 4fd093dc55..6294c12679 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -110,6 +110,21 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "alloc-no-stdlib"
+version = "2.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
+
+[[package]]
+name = "alloc-stdlib"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
+dependencies = [
+ "alloc-no-stdlib",
+]
+
 [[package]]
 name = "allocator-api2"
 version = "0.2.16"
@@ -1088,6 +1103,27 @@ dependencies = [
  "serde_with",
 ]
 
+[[package]]
+name = "brotli"
+version = "5.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+ "brotli-decompressor",
+]
+
+[[package]]
+name = "brotli-decompressor"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6221fe77a248b9117d431ad93761222e1cf8ff282d9d1d5d9f53d6299a1cf76"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+]
+
 [[package]]
 name = "bstr"
 version = "1.9.1"
@@ -6638,6 +6674,7 @@ name = "tremor-interceptor"
 version = "0.13.0-rc.22"
 dependencies = [
  "anyhow",
+ "brotli",
  "byteorder",
  "bytes",
  "libflate",
diff --git a/tremor-interceptor/Cargo.toml b/tremor-interceptor/Cargo.toml
index c1034e05be..fe3edc0f65 100644
--- a/tremor-interceptor/Cargo.toml
+++ b/tremor-interceptor/Cargo.toml
@@ -18,11 +18,6 @@ tremor-config = { path = "../tremor-config", version = "0.13.0-rc.22" }
 tremor-common = { path = "../tremor-common", version = "0.13.0-rc.22" }
 log = "0.4"
 serde = { version = "1", features = ["derive"] }
-libflate = "2"
-xz2 = "0.1"
-lz4 = "1"
-snap = "1"
-zstd = "0.13"
 byteorder = "1"
 value-trait = "0.8"
 rand = "0.8"
@@ -31,5 +26,13 @@ memchr = "2.6"
 anyhow = { version = "1.0", default-features = true }
 thiserror = { version = "1.0", default-features = false }
 
+# Compression
+brotli = { version = "5", default-features = false, features = ["std"] }
+xz2 = "0.1"
+lz4 = "1"
+snap = "1"
+zstd = "0.13"
+libflate = "2"
+
 [dev-dependencies]
 proptest = "1.4"
diff --git a/tremor-interceptor/src/postprocessor/compress.rs b/tremor-interceptor/src/postprocessor/compress.rs
index d92f9cadcd..8eef6a37bc 100644
--- a/tremor-interceptor/src/postprocessor/compress.rs
+++ b/tremor-interceptor/src/postprocessor/compress.rs
@@ -22,10 +22,11 @@
 //! |----------|------------------------------------------------------------------------------------|
 //! | `gzip` | `GZip` |
 //! | `zlib` | `ZLib` |
-//! | `xz` | `Xz2` level 9 (default) |
+//! | `xz2` | `Xz2` level 9 (default) |
 //! | `snappy` | `Snappy` |
 //! | `lz4` | `Lz` level 4 compression (default) |
 //! | `zstd` | [`Zstandard`](https://datatracker.ietf.org/doc/html/rfc8878) (defaults to level 0) |
+//! | `br` | `Brotli` |
 //!
 //! Example configuration:
 //!
@@ -60,7 +61,7 @@
 
 use super::Postprocessor;
 use std::{
-    io::Write,
+    io::{Cursor, Write},
     str::{self, FromStr},
 };
 use tremor_value::Value;
@@ -74,6 +75,7 @@ enum Algorithm {
     Zstd,
     Snappy,
     Lz4,
+    Brotli,
 }
 
 /// Compression postprocessor errors
@@ -97,6 +99,9 @@ pub enum Error {
     /// Zstd Error
     #[error(transparent)]
     Zstd(#[from] ZstdError),
+    /// IO Error
+    #[error(transparent)]
+    Io(#[from] std::io::Error),
 }
 
 impl FromStr for Algorithm {
@@ -109,6 +114,7 @@ impl FromStr for Algorithm {
             "snappy" => Ok(Algorithm::Snappy),
             "lz4" => Ok(Algorithm::Lz4),
             "zstd" => Ok(Algorithm::Zstd),
+            "br" => Ok(Algorithm::Brotli),
             other => Err(Error::UnknownAlgorithm(other.to_string())),
         }
     }
@@ -134,6 +140,7 @@ impl Algorithm {
                 Algorithm::Zstd => Box::<Zstd>::default(),
                 Algorithm::Snappy => Box::<Snappy>::default(),
                 Algorithm::Lz4 => Box::<Lz4>::default(),
+                Algorithm::Brotli => Box::<Brotli>::default(),
             };
             Ok(codec)
         }
@@ -161,6 +168,33 @@ impl Postprocessor for Gzip {
     }
 }
 
+/// Brotli compression postprocessor, registered under the name `br`.
+#[derive(Default)]
+struct Brotli {
+    /// Encoder parameters handed to `brotli::BrotliCompress` on every call; the derived
+    /// `Default` leaves them at the crate's default settings.
+    params: brotli::enc::BrotliEncoderParams,
+}
+impl Postprocessor for Brotli {
+    fn name(&self) -> &str {
+        "br"
+    }
+
+    /// Compresses `data` with Brotli and returns the result as a single output chunk.
+    fn process(
+        &mut self,
+        _ingres_ns: u64,
+        _egress_ns: u64,
+        data: &[u8],
+    ) -> anyhow::Result<Vec<Vec<u8>>> {
+        // Heuristic because it's nice to avoid some allocations
+        let mut res = Vec::with_capacity(data.len() / 10);
+        let mut c = Cursor::new(data);
+        brotli::BrotliCompress(&mut c, &mut res, &self.params)?;
+        Ok(vec![res])
+    }
+}
+
 #[derive(Default)]
 struct Zlib {}
 impl Postprocessor for Zlib {
diff --git a/tremor-interceptor/src/preprocessor/decompress.rs b/tremor-interceptor/src/preprocessor/decompress.rs
index 849e06f5b6..760b29769d 100644
--- a/tremor-interceptor/src/preprocessor/decompress.rs
+++ b/tremor-interceptor/src/preprocessor/decompress.rs
@@ -45,9 +45,17 @@
 //!## zlib
 //!
 //!Decompress Zlib (deflate) compressed payload.
+//!
+//!## br
+//!
+//!Decompress Brotli compressed payload. This is not supported for the `autodetect` mode.
+//!
+//!## autodetect
+//!
+//!Try to autodetect the compression algorithm based on the magic bytes of the compressed data.
 
 use super::prelude::*;
-use std::io::{self, Read};
+use std::io::{self, Cursor, Read};
 
 #[derive(Clone, Default, Debug)]
 struct Gzip {}
@@ -70,6 +78,30 @@ impl Preprocessor for Gzip {
     }
 }
 
+/// Brotli decompression preprocessor, registered under the name `br`.
+#[derive(Clone, Default, Debug)]
+struct Brotli {}
+impl Preprocessor for Brotli {
+    fn name(&self) -> &str {
+        "br"
+    }
+
+    /// Decompresses `data` with Brotli and returns it as one chunk paired with `meta`.
+    fn process(
+        &mut self,
+        _ingest_ns: &mut u64,
+        data: &[u8],
+        meta: Value<'static>,
+    ) -> anyhow::Result<Vec<(Vec<u8>, Value<'static>)>> {
+        // Decompressed output is normally larger than its compressed input, so reserve at
+        // least the input size up front instead of a fraction of it.
+        let mut res = Vec::with_capacity(data.len());
+        let mut c = Cursor::new(data);
+        brotli::BrotliDecompress(&mut c, &mut res)?;
+        Ok(vec![(res, meta)])
+    }
+}
+
 #[derive(Clone, Default, Debug)]
 struct Zlib {}
 impl Preprocessor for Zlib {
@@ -251,6 +283,7 @@ impl Decompress {
             Some("snappy") => Box::<Snappy>::default(),
             Some("lz4") => Box::<Lz4>::default(),
             Some("zstd") => Box::<Zstd>::default(),
+            Some("br") => Box::<Brotli>::default(),
             Some("autodetect") | None => Box::<Fingerprinted>::default(),
             Some(other) => {
                 return Err(super::Error::InvalidConfig(
@@ -282,7 +315,7 @@ mod test {
     use crate::postprocessor::{self as post, Postprocessor};
     use tremor_value::literal;
 
-    fn decode_magic(data: &[u8]) -> &'static str {
+    fn decode_magic(data: &[u8], algo: &str) -> &'static str {
         match data.get(0..6) {
             Some(&[0x1f, 0x8b, _, _, _, _]) => "gzip",
             Some(&[0x78, _, _, _, _, _]) => "zlib",
@@ -290,6 +323,8 @@ mod test {
             Some(b"sNaPpY" | &[0xff, 0x6, 0x0, 0x0, _, _]) => "snappy",
             Some(&[0x04, 0x22, 0x4d, 0x18, _, _]) => "lz4",
             Some(&[0x28, 0xb5, 0x2f, 0xfd, _, _]) => "zstd",
+            // br does not have magic bytes, so fall back to the requested algorithm
+            _ if algo == "br" => "br",
             _ => "fail/unknown",
         }
     }
@@ -307,7 +342,7 @@ mod test {
         let ext = &r?[0];
         let ext = ext.as_slice();
         // Assert actual encoded form is as expected ( magic code only )
-        assert_eq!(algo, decode_magic(ext));
+        assert_eq!(algo, decode_magic(ext, algo));
 
         let r = pre.process(&mut ingest_ns, ext, Value::object());
         let out = &r?[0].0;
@@ -351,6 +386,13 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn test_br() -> anyhow::Result<()> {
+        let int = "snot".as_bytes();
+        assert_simple_symmetric(int, "br")?;
+        Ok(())
+    }
+
     #[test]
     fn test_gzip_fingerprinted() -> anyhow::Result<()> {
         let int = "snot".as_bytes();