Skip to content

Commit

Permalink
Add support for brotli compression and decompression
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 23, 2024
1 parent efd2001 commit 709e637
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### New features
* add collect postprocessor
* add support for access tokens for aws connectors
* add support for brotli compression and decompression

### Fixes
* fix reconnect issue in HTTP client
Expand Down
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions tremor-interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ tremor-config = { path = "../tremor-config", version = "0.13.0-rc.22" }
tremor-common = { path = "../tremor-common", version = "0.13.0-rc.22" }
log = "0.4"
serde = { version = "1", features = ["derive"] }
libflate = "2"
xz2 = "0.1"
lz4 = "1"
snap = "1"
zstd = "0.13"
byteorder = "1"
value-trait = "0.8"
rand = "0.8"
Expand All @@ -31,5 +26,13 @@ memchr = "2.6"
anyhow = { version = "1.0", default-features = true }
thiserror = { version = "1.0", default-features = false }

# Compression
brotli = { version = "5", default-features = false, features = ["std"] }
xz2 = "0.1"
lz4 = "1"
snap = "1"
zstd = "0.13"
libflate = "2"

[dev-dependencies]
proptest = "1.4"
34 changes: 32 additions & 2 deletions tremor-interceptor/src/postprocessor/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
//! |----------|------------------------------------------------------------------------------------|
//! | `gzip` | `GZip` |
//! | `zlib` | `ZLib` |
//! | `xz` | `Xz2` level 9 (default) |
//! | `xz2` | `Xz2` level 9 (default) |
//! | `snappy` | `Snappy` |
//! | `lz4` | `Lz` level 4 compression (default) |
//! | `zstd` | [`Zstandard`](https://datatracker.ietf.org/doc/html/rfc8878) (defaults to level 0) |
//! | `br` | `Brotli` |
//!
//! Example configuration:
//!
Expand Down Expand Up @@ -60,7 +61,7 @@

use super::Postprocessor;
use std::{
io::Write,
io::{Cursor, Write},
str::{self, FromStr},
};
use tremor_value::Value;
Expand All @@ -74,6 +75,7 @@ enum Algorithm {
Zstd,
Snappy,
Lz4,
Brotli,
}

/// Compression postprocessor errors
Expand All @@ -97,6 +99,9 @@ pub enum Error {
/// Zstd Error
#[error(transparent)]
Zstd(#[from] ZstdError),
/// IO Error
#[error(transparent)]
Io(#[from] std::io::Error),
}

impl FromStr for Algorithm {
Expand All @@ -109,6 +114,7 @@ impl FromStr for Algorithm {
"snappy" => Ok(Algorithm::Snappy),
"lz4" => Ok(Algorithm::Lz4),
"zstd" => Ok(Algorithm::Zstd),
"br" => Ok(Algorithm::Brotli),
other => Err(Error::UnknownAlgorithm(other.to_string())),
}
}
Expand All @@ -134,6 +140,7 @@ impl Algorithm {
Algorithm::Zstd => Box::<Zstd>::default(),
Algorithm::Snappy => Box::<Snappy>::default(),
Algorithm::Lz4 => Box::<Lz4>::default(),
Algorithm::Brotli => Box::<Brotli>::default(),
};
Ok(codec)
}
Expand Down Expand Up @@ -161,6 +168,29 @@ impl Postprocessor for Gzip {
}
}

/// Brotli (`br`) compression postprocessor.
///
/// Built through `Default`, so the encoder runs with the `brotli` crate's
/// default parameters.
#[derive(Default)]
struct Brotli {
/// Encoder parameters handed to `brotli::BrotliCompress` on every call to `process`.
params: brotli::enc::BrotliEncoderParams,
}
impl Postprocessor for Brotli {
/// Returns `"br"`, the same identifier `Algorithm::from_str` accepts for Brotli.
fn name(&self) -> &str {
"br"
}

Check warning on line 178 in tremor-interceptor/src/postprocessor/compress.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/compress.rs#L176-L178

Added lines #L176 - L178 were not covered by tests

/// Compresses `data` into a single Brotli stream.
///
/// The whole input is fed through `brotli::BrotliCompress` using the
/// parameters stored on `self`; the timestamps are ignored. The result is
/// returned as a one-element vector holding the compressed bytes.
///
/// # Errors
///
/// Propagates any I/O error reported by the encoder.
fn process(
    &mut self,
    _ingres_ns: u64,
    _egress_ns: u64,
    data: &[u8],
) -> anyhow::Result<Vec<Vec<u8>>> {
    // Sizing heuristic: reserve a tenth of the input length up front to
    // save a few reallocations while the encoder writes.
    let mut compressed = Vec::with_capacity(data.len() / 10);
    brotli::BrotliCompress(&mut Cursor::new(data), &mut compressed, &self.params)?;
    Ok(vec![compressed])
}
}

#[derive(Default)]
struct Zlib {}
impl Postprocessor for Zlib {
Expand Down
51 changes: 47 additions & 4 deletions tremor-interceptor/src/preprocessor/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,17 @@
//!## zlib
//!
//!Decompress Zlib (deflate) compressed payload.
//!
//!## br
//!
//!Decompress Brotli compressed payload. Brotli streams carry no magic bytes, so `br` must be selected explicitly; it is not supported for the `autodetect` mode.
//!
//!## autodetect
//!
//! Try to autodetect the compression algorithm based on the magic bytes of the compressed data.

use super::prelude::*;
use std::io::{self, Read};
use std::io::{self, Cursor, Read};

#[derive(Clone, Default, Debug)]
struct Gzip {}
Expand All @@ -70,6 +78,26 @@ impl Preprocessor for Gzip {
}
}

/// Brotli (`br`) decompression preprocessor.
///
/// Carries no state. It is only used when `br` is configured explicitly;
/// the `autodetect` mode does not cover Brotli.
#[derive(Clone, Default, Debug)]
struct Brotli {}
impl Preprocessor for Brotli {
/// Returns `"br"`, the same identifier the `Decompress` configuration matches for Brotli.
fn name(&self) -> &str {
"br"
}

Check warning on line 86 in tremor-interceptor/src/preprocessor/decompress.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/preprocessor/decompress.rs#L84-L86

Added lines #L84 - L86 were not covered by tests

/// Decompresses one complete Brotli stream.
///
/// `data` is decoded in full by `brotli::BrotliDecompress`. The decoded
/// bytes are returned as a single entry paired with the unchanged `meta`;
/// `_ingest_ns` is not used.
///
/// # Errors
///
/// Propagates any I/O error reported by the decoder, e.g. when `data` is
/// not a valid Brotli stream.
fn process(
    &mut self,
    _ingest_ns: &mut u64,
    data: &[u8],
    meta: Value<'static>,
) -> anyhow::Result<Vec<(Vec<u8>, Value<'static>)>> {
    // Decompressed output is normally at least as large as the compressed
    // input, so reserve the input length instead of the compressor's
    // "a tenth of the input" heuristic, which always forced regrowth here.
    let mut res = Vec::with_capacity(data.len());
    let mut c = Cursor::new(data);
    brotli::BrotliDecompress(&mut c, &mut res)?;
    Ok(vec![(res, meta)])
}
}

#[derive(Clone, Default, Debug)]
struct Zlib {}
impl Preprocessor for Zlib {
Expand Down Expand Up @@ -251,6 +279,7 @@ impl Decompress {
Some("snappy") => Box::<Snappy>::default(),
Some("lz4") => Box::<Lz4>::default(),
Some("zstd") => Box::<Zstd>::default(),
Some("br") => Box::<Brotli>::default(),
Some("autodetect") | None => Box::<Fingerprinted>::default(),
Some(other) => {
return Err(super::Error::InvalidConfig(
Expand Down Expand Up @@ -282,15 +311,22 @@ mod test {
use crate::postprocessor::{self as post, Postprocessor};
use tremor_value::literal;

/// Identifies the compression format of `data` from its leading magic bytes.
///
/// Test helper: the first six bytes are compared against the known
/// signatures and the matching algorithm name is returned. Input shorter
/// than six bytes, or with an unrecognised prefix, falls through to the
/// fallback arms: `"br"` when `algo` is `"br"`, otherwise `"fail/unknown"`.
fn decode_magic(data: &[u8], algo: &str) -> &'static str {
    match data.get(0..6) {
        Some(&[0x1f, 0x8b, _, _, _, _]) => "gzip",
        Some(&[0x78, _, _, _, _, _]) => "zlib",
        Some(&[0xfd, b'7', b'z', _, _, _]) => "xz2",
        Some(b"sNaPpY" | &[0xff, 0x6, 0x0, 0x0, _, _]) => "snappy",
        Some(&[0x04, 0x22, 0x4d, 0x18, _, _]) => "lz4",
        Some(&[0x28, 0xb5, 0x2f, 0xfd, _, _]) => "zstd",
        // br does not have magic bytes, so an unrecognised prefix is accepted
        // as Brotli only when Brotli is the algorithm under test.
        _ if algo == "br" => "br",
        _ => "fail/unknown",
    }
} // Assert pre and post processors have a sensible default() ctor

Expand All @@ -307,7 +343,7 @@ mod test {
let ext = &r?[0];
let ext = ext.as_slice();
// Assert actual encoded form is as expected ( magic code only )
assert_eq!(algo, decode_magic(ext));
assert_eq!(algo, decode_magic(ext, algo));

let r = pre.process(&mut ingest_ns, ext, Value::object());
let out = &r?[0].0;
Expand Down Expand Up @@ -351,6 +387,13 @@ mod test {
Ok(())
}

/// Round-trips a small payload through the `br` postprocessor and
/// preprocessor via `assert_simple_symmetric`.
#[test]
fn test_br() -> anyhow::Result<()> {
    let payload: &[u8] = b"snot";
    assert_simple_symmetric(payload, "br")?;
    Ok(())
}

#[test]
fn test_gzip_fingerprinted() -> anyhow::Result<()> {
let int = "snot".as_bytes();
Expand Down

0 comments on commit 709e637

Please sign in to comment.