Skip to content

Commit c8315c1

Browse files
committed
feat: introduce zstd compression (#3185)
Introduces the ZSTD Compression algorithm for Fluvio Compression Crate. Related: #2268
1 parent 961e256 commit c8315c1

File tree

18 files changed

+126
-16
lines changed

18 files changed

+126
-16
lines changed

.github/workflows/ci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,6 @@ jobs:
10321032
max_attempts: 3
10331033
command: make cli-fluvio-smoke
10341034

1035-
10361035
# test smdk
10371036
- name: Download artifact - smdk
10381037
if: matrix.test == 'smdk'

Cargo.lock

+24-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ fluvio = { version = "0.19.0", path = "crates/fluvio" }
138138
fluvio-auth = { path = "crates/fluvio-auth" }
139139
fluvio-channel = { path = "crates/fluvio-channel" }
140140
fluvio-cli-common = { path = "crates/fluvio-cli-common"}
141-
fluvio-compression = { version = "0.2", path = "crates/fluvio-compression" }
141+
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
142142
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
143143
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
144144
fluvio-controlplane-metadata = { version = "0.22.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }

crates/fluvio-cli/src/client/produce/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ mod stats;
44
pub use cmd::ProduceOpt;
55

66
mod cmd {
7-
87
use std::sync::Arc;
98
use std::io::{BufReader, BufRead};
109
use std::collections::BTreeMap;
@@ -91,7 +90,7 @@ mod cmd {
9190
pub raw: bool,
9291

9392
/// Compression algorithm to use when sending records.
94-
/// Supported values: none, gzip, snappy and lz4.
93+
/// Supported values: none, gzip, snappy, zstd and lz4.
9594
#[arg(long)]
9695
pub compression: Option<Compression>,
9796

crates/fluvio-compression/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluvio-compression"
3-
version = "0.2.5"
3+
version = "0.3.0"
44
edition = "2021"
55
license = "Apache-2.0"
66
authors = ["Fluvio Contributors <[email protected]>"]
@@ -19,4 +19,4 @@ snap = { version = "1" }
1919
serde = { workspace = true, features = ['derive'] }
2020
lz4_flex = { version = "0.10.0", default-features = false, features = ["safe-decode", "safe-encode", "frame"] }
2121
thiserror = { workspace = true }
22-
22+
zstd = { version = "0.12.3+zstd.1.5.2", features = ['wasm'], default-features = false }

crates/fluvio-compression/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ Library with handlers to compress and uncompress data in the fluvio protocol.
44

55
In fluvio, compression is done in producer side, then consumers and SPU when it is using SmartModules, uncompress the data using the compression information that is in the attributes of the batch.
66

7-
Currently, the supported compressions codecs are None (default), Gzip, Snappy and LZ4.
7+
Currently, the supported compressions codecs are None (default), Gzip, Snappy, Zstd and LZ4.

crates/fluvio-compression/src/lib.rs

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod error;
66
mod gzip;
77
mod snappy;
88
mod lz4;
9+
mod zstd;
910

1011
pub use error::CompressionError;
1112
use serde::{Serialize, Deserialize};
@@ -21,6 +22,7 @@ pub enum Compression {
2122
Gzip = 1,
2223
Snappy = 2,
2324
Lz4 = 3,
25+
Zstd = 4,
2426
}
2527

2628
impl TryFrom<i8> for Compression {
@@ -31,6 +33,7 @@ impl TryFrom<i8> for Compression {
3133
1 => Ok(Compression::Gzip),
3234
2 => Ok(Compression::Snappy),
3335
3 => Ok(Compression::Lz4),
36+
4 => Ok(Compression::Zstd),
3437
_ => Err(CompressionError::UnknownCompressionFormat(format!(
3538
"i8 representation: {v}"
3639
))),
@@ -47,6 +50,7 @@ impl FromStr for Compression {
4750
"gzip" => Ok(Compression::Gzip),
4851
"snappy" => Ok(Compression::Snappy),
4952
"lz4" => Ok(Compression::Lz4),
53+
"zstd" => Ok(Compression::Zstd),
5054
_ => Err(CompressionError::UnknownCompressionFormat(s.into())),
5155
}
5256
}
@@ -60,6 +64,7 @@ impl Compression {
6064
Compression::Gzip => gzip::compress(src),
6165
Compression::Snappy => snappy::compress(src),
6266
Compression::Lz4 => lz4::compress(src),
67+
Compression::Zstd => zstd::compress(src),
6368
}
6469
}
6570

@@ -79,6 +84,10 @@ impl Compression {
7984
let output = lz4::uncompress(src)?;
8085
Ok(Some(output))
8186
}
87+
Compression::Zstd => {
88+
let output = zstd::uncompress(src)?;
89+
Ok(Some(output))
90+
}
8291
}
8392
}
8493
}
@@ -89,6 +98,7 @@ impl std::fmt::Display for Compression {
8998
Compression::Gzip => write!(f, "gzip"),
9099
Compression::Snappy => write!(f, "snappy"),
91100
Compression::Lz4 => write!(f, "lz4"),
101+
Compression::Zstd => write!(f, "zstd"),
92102
}
93103
}
94104
}

crates/fluvio-compression/src/zstd.rs

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::io::{Read, Write};
2+
3+
use bytes::{BufMut, Bytes, BytesMut};
4+
use zstd::{Decoder, Encoder};
5+
6+
use crate::error::CompressionError;
7+
8+
pub fn compress(src: &[u8]) -> Result<Bytes, CompressionError> {
9+
let mut encoder = Encoder::new(BytesMut::new().writer(), 1)?;
10+
encoder.write_all(src)?;
11+
Ok(encoder.finish()?.into_inner().freeze())
12+
}
13+
14+
pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
15+
let mut decoder = Decoder::new(src)?;
16+
let mut buffer: Vec<u8> = Vec::new();
17+
decoder.read_to_end(&mut buffer)?;
18+
Ok(buffer)
19+
}
20+
21+
#[cfg(test)]
22+
mod tests {
23+
use bytes::Buf;
24+
use super::*;
25+
26+
#[test]
27+
fn test_compress_decompress() {
28+
let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
29+
let compressed = compress(text.as_bytes()).unwrap();
30+
31+
assert!(compressed.len() < text.as_bytes().len());
32+
33+
let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap();
34+
35+
assert_eq!(uncompressed, text);
36+
}
37+
}

crates/fluvio-connector-package/src/config/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ mod tests {
417417
.expect_err("This yaml should error");
418418
#[cfg(unix)]
419419
assert_eq!(
420-
"unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`",
420+
"unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`, `zstd`",
421421
format!("{connector_cfg}")
422422
);
423423

crates/fluvio-controlplane-metadata/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "fluvio-controlplane-metadata"
33
edition = "2021"
4-
version = "0.22.1"
4+
version = "0.22.2"
55
authors = ["Fluvio Contributors <[email protected]>"]
66
description = "Metadata definition for Fluvio control plane"
77
repository = "https://github.com/infinyon/fluvio"

crates/fluvio-controlplane-metadata/src/topic/spec.rs

+4
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,8 @@ pub enum CompressionAlgorithm {
619619
#[default]
620620
#[fluvio(tag = 4)]
621621
Any,
622+
#[fluvio(tag = 5)]
623+
Zstd,
622624
}
623625

624626
#[derive(Debug, thiserror::Error)]
@@ -635,6 +637,7 @@ impl std::str::FromStr for CompressionAlgorithm {
635637
"snappy" => Ok(CompressionAlgorithm::Snappy),
636638
"lz4" => Ok(CompressionAlgorithm::Lz4),
637639
"any" => Ok(CompressionAlgorithm::Any),
640+
"zstd" => Ok(CompressionAlgorithm::Zstd),
638641
_ => Err(InvalidCompressionAlgorithm),
639642
}
640643
}
@@ -647,6 +650,7 @@ impl std::fmt::Display for CompressionAlgorithm {
647650
Self::Snappy => write!(f, "snappy"),
648651
Self::Lz4 => write!(f, "lz4"),
649652
Self::Any => write!(f, "any"),
653+
Self::Zstd => write!(f, "zstd"),
650654
}
651655
}
652656
}

crates/fluvio-spu/src/services/public/produce_handler.rs

+1
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ fn validate_records<R: BatchRecords>(
259259
CompressionAlgorithm::Gzip => batch_compression == Compression::Gzip,
260260
CompressionAlgorithm::Snappy => batch_compression == Compression::Snappy,
261261
CompressionAlgorithm::Lz4 => batch_compression == Compression::Lz4,
262+
CompressionAlgorithm::Zstd => batch_compression == Compression::Zstd,
262263
}
263264
}) {
264265
Ok(())

crates/fluvio/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluvio"
3-
version = "0.19.0"
3+
version = "0.19.1"
44
edition = "2021"
55
license = "Apache-2.0"
66
authors = ["Fluvio Contributors <[email protected]>"]

crates/fluvio/src/producer/memory_batch.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ impl MemoryBatch {
7676
(self.current_size_uncompressed as f32
7777
* match self.compression {
7878
Compression::None => 1.0,
79-
Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5,
79+
Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
80+
0.5
81+
}
8082
}) as usize
8183
+ Batch::<RawRecords>::default().write_size(0)
8284
}

crates/fluvio/src/producer/mod.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ use fluvio_types::event::StickyEvent;
1414
mod accumulator;
1515
mod config;
1616
mod error;
17-
pub mod event;
1817
mod output;
1918
mod record;
2019
mod partitioning;
2120
mod partition_producer;
2221
mod memory_batch;
2322

23+
pub mod event;
24+
2425
pub use fluvio_protocol::record::{RecordKey, RecordData};
2526

2627
use crate::FluvioError;
@@ -343,6 +344,12 @@ impl TopicProducer {
343344
format!("Compression in the producer ({compression_config}) does not match with topic level compression (lz4)"),
344345
)).into()),
345346
},
347+
CompressionAlgorithm::Zstd => match config.compression {
348+
Some(Compression::Zstd) | None => Compression::Zstd,
349+
Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration(
350+
format!("Compression in the producer ({compression_config}) does not match with topic level compression (zstd)" ),
351+
)).into()),
352+
},
346353
CompressionAlgorithm::None => match config.compression {
347354
Some(Compression::None) | None => Compression::None,
348355
Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration(

k8-util/helm/fluvio-sys/templates/crd_partition.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ spec:
5353
- Gzip
5454
- Snappy
5555
- Lz4
56+
- Zstd
5657
status:
5758
type: object
5859
x-kubernetes-preserve-unknown-fields: true

k8-util/helm/fluvio-sys/templates/crd_topic.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ spec:
8080
- Gzip
8181
- Snappy
8282
- Lz4
83+
- Zstd
8384
storage:
8485
type: object
8586
properties:

tests/cli/fluvio_smoke_tests/e2e-basic.bats

+29
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ setup_file() {
6060
export TOPIC_NAME_12
6161
debug_msg "Topic name: $TOPIC_NAME_12"
6262

63+
TOPIC_NAME_13=$(random_string)
64+
export TOPIC_NAME_13
65+
debug_msg "Topic name: $TOPIC_NAME_13"
66+
6367
MESSAGE="$(random_string 7)"
6468
export MESSAGE
6569
debug_msg "$MESSAGE"
@@ -79,6 +83,9 @@ setup_file() {
7983
LZ4_MESSAGE="$MESSAGE-LZ4"
8084
export LZ4_MESSAGE
8185

86+
ZSTD_MESSAGE="$MESSAGE-ZSTD"
87+
export ZSTD_MESSAGE
88+
8289
LINGER_MESSAGE="$MESSAGE-LINGER"
8390
export LINGER_MESSAGE
8491

@@ -129,6 +136,11 @@ teardown_file() {
129136
assert_success
130137
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_12"
131138
assert_success
139+
140+
if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then
141+
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_13" --compression-type zstd
142+
assert_success
143+
fi
132144
}
133145

134146
# Produce message
@@ -145,6 +157,12 @@ teardown_file() {
145157
assert_success
146158
run bash -c 'echo -e "$LZ4_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_6" --compression lz4'
147159
assert_success
160+
161+
if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then
162+
run bash -c 'echo -e "$ZSTD_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_13" --compression zstd'
163+
assert_success
164+
fi
165+
148166
run bash -c 'echo -e "$LINGER_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_7" --linger 0s'
149167
assert_success
150168
run bash -c 'echo -e "$BATCH_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_8" --batch-size 100'
@@ -211,6 +229,17 @@ teardown_file() {
211229
assert_success
212230
}
213231

232+
@test "Consume zstd message" {
233+
if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" || "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]]; then
234+
skip "don't run on stable version"
235+
fi
236+
237+
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_13" -B -d
238+
239+
assert_output --partial "$ZSTD_MESSAGE"
240+
assert_success
241+
}
242+
214243
@test "ReadCommitted Consume ReadCommitted message" {
215244
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_9" -B -d --isolation read_committed
216245

0 commit comments

Comments
 (0)