Skip to content

Commit

Permalink
chore: update proto version and enable grpc gzip (#18)
Browse files Browse the repository at this point in the history
* chore: update proto version and enable grpc gzip

* chore: change grpc compression encoding from gzip to zstd

* chore: add compression encoding for request/response

* chore: use derive_builder to reduce boilerplate code

* chore: minor update

Co-authored-by: fys <[email protected]>

---------

Co-authored-by: shuiyisong <[email protected]>
Co-authored-by: fys <[email protected]>
  • Loading branch information
3 people authored May 22, 2024
1 parent 3ce31a1 commit 2e6b0c5
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 45 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ dashmap = "5.4"
enum_dispatch = "0.3"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", tag = "v0.4.1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", tag = "v0.7.0" }
parking_lot = "0.12"
prost = "0.12"
rand = "0.8"
snafu = "0.7"
tokio = { version = "1", features = ["rt", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { version = "0.10", features = ["tls", "tls-roots"] }
tonic = { version = "0.11", features = ["tls", "tls-roots", "gzip", "zstd"] }
tower = "0.4"
derive_builder = "0.20"

[build-dependencies]
tonic-build = "0.9"
Expand Down
9 changes: 6 additions & 3 deletions examples/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{
ChannelConfig, ChannelManager, Client, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
};

#[tokio::main]
Expand All @@ -31,14 +31,17 @@ async fn main() {
.map(|s| s == "1")
.unwrap_or(false);

let builder = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.compression(greptimedb_ingester::Compression::Gzip);
let grpc_client = if greptimedb_secure {
let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default());

let channel_manager = ChannelManager::with_tls_config(channel_config)
.expect("Failed to create channel manager");
Client::with_manager_and_urls(channel_manager, vec![&greptimedb_endpoint])
builder.channel_manager(channel_manager).build()
} else {
Client::with_urls(vec![&greptimedb_endpoint])
builder.build()
};

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
Expand Down
6 changes: 4 additions & 2 deletions examples/stream_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use derive_new::new;

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::{Client, Database, DEFAULT_SCHEMA_NAME};
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};

#[tokio::main]
async fn main() {
Expand All @@ -25,7 +25,9 @@ async fn main() {
let greptimedb_dbname =
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());

let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
let grpc_client = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.build();

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

Expand Down
135 changes: 110 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use crate::api::v1::HealthCheckRequest;
use crate::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::OptionExt;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
use crate::{error, Result};
use derive_builder::Builder;

const MAX_MESSAGE_SIZE: usize = 512 * 1024 * 1024;

Expand All @@ -36,22 +38,78 @@ pub struct Client {
inner: Arc<Inner>,
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct ClientBuilder {
channel_manager: ChannelManager,
load_balance: Loadbalancer,
compression: Compression,
peers: Vec<String>,
}

impl ClientBuilder {
pub fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
self.channel_manager = channel_manager;
self
}

pub fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
self.load_balance = load_balance;
self
}

pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}

pub fn peers<U, A>(mut self, peers: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
self.peers = normalize_urls(peers);
self
}

pub fn build(self) -> Client {
let inner = InnerBuilder::default()
.channel_manager(self.channel_manager)
.load_balance(self.load_balance)
.compression(self.compression)
.peers(self.peers)
.build()
.unwrap();
Client {
inner: Arc::new(inner),
}
}
}

#[derive(Debug, Clone, Default)]
pub enum Compression {
#[default]
Gzip,
Zstd,
None,
}

#[derive(Debug, Default, Builder)]
struct Inner {
channel_manager: ChannelManager,
#[builder(setter(custom))]
peers: Arc<RwLock<Vec<String>>>,
load_balance: Loadbalancer,
compression: Compression,
}

impl Inner {
fn with_manager(channel_manager: ChannelManager) -> Self {
Self {
channel_manager,
peers: Default::default(),
load_balance: Default::default(),
}
impl InnerBuilder {
pub fn peers(&mut self, peers: Vec<String>) -> &mut Self {
self.peers = Some(Arc::new(RwLock::new(peers)));
self
}
}

impl Inner {
fn set_peers(&self, peers: Vec<String>) {
let mut guard = self.peers.write();
*guard = peers;
Expand All @@ -64,50 +122,55 @@ impl Inner {
}

impl Client {
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn new() -> Self {
Default::default()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager(channel_manager: ChannelManager) -> Self {
let inner = Arc::new(Inner::with_manager(channel_manager));
Self { inner }
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.build()
.unwrap();
Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_urls<U, A>(urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
Self::with_manager_and_urls(ChannelManager::new(), urls)
ClientBuilder::default().peers(urls).build()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager_and_urls<U, A>(channel_manager: ChannelManager, urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let inner = Inner::with_manager(channel_manager);
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
inner.set_peers(urls);
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.peers(normalize_urls(urls))
.build()
.unwrap();

Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn start<U, A>(&self, urls: A)
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
let urls: Vec<String> = normalize_urls(urls);

self.inner.set_peers(urls);
}
Expand All @@ -127,8 +190,19 @@ impl Client {

pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
let client =
GreptimeDatabaseClient::new(channel).max_decoding_message_size(MAX_MESSAGE_SIZE);
let mut client = GreptimeDatabaseClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
match self.inner.compression {
Compression::Gzip => {
client = client.send_compressed(CompressionEncoding::Gzip);
}
Compression::Zstd => {
client = client.send_compressed(CompressionEncoding::Zstd);
}
Compression::None => {}
}
Ok(DatabaseClient { inner: client })
}

Expand All @@ -140,6 +214,17 @@ impl Client {
}
}

fn normalize_urls<U, A>(urls: A) -> Vec<String>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
urls.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect()
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
16 changes: 6 additions & 10 deletions src/helpers/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ define_value_fn!(time_second_value, i64, TimeSecondValue);
define_value_fn!(time_millisecond_value, i64, TimeMillisecondValue);
define_value_fn!(time_microsecond_value, i64, TimeMicrosecondValue);
define_value_fn!(time_nanosecond_value, i64, TimeNanosecondValue);
define_value_fn!(interval_year_month_value, i32, IntervalYearMonthValues);
define_value_fn!(interval_day_time_value, i64, IntervalDayTimeValues);
define_value_fn!(duration_second_value, i64, DurationSecondValue);
define_value_fn!(duration_millisecond_value, i64, DurationMillisecondValue);
define_value_fn!(duration_microsecond_value, i64, DurationMicrosecondValue);
define_value_fn!(duration_nanosecond_value, i64, DurationNanosecondValue);
define_value_fn!(interval_year_month_value, i32, IntervalYearMonthValue);
define_value_fn!(interval_day_time_value, i64, IntervalDayTimeValue);

#[inline]
pub fn interval_month_day_nano_value(
Expand All @@ -81,13 +77,13 @@ pub fn interval_month_day_nano_value(
nanoseconds: i64,
) -> crate::api::v1::Value {
crate::api::v1::Value {
value_data: Some(
crate::api::v1::value::ValueData::IntervalMonthDayNanoValues(IntervalMonthDayNano {
value_data: Some(crate::api::v1::value::ValueData::IntervalMonthDayNanoValue(
IntervalMonthDayNano {
months,
days,
nanoseconds,
}),
),
},
)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod load_balance;
mod stream_insert;

pub use self::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
pub use self::client::Client;
pub use self::client::{Client, ClientBuilder, Compression};
pub use self::database::Database;
pub use self::error::{Error, Result};
pub use self::stream_insert::StreamInserter;
Expand Down
4 changes: 2 additions & 2 deletions src/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait LoadBalance {
}

#[enum_dispatch(LoadBalance)]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Loadbalancer {
Random,
}
Expand All @@ -32,7 +32,7 @@ impl Default for Loadbalancer {
}
}

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct Random;

impl LoadBalance for Random {
Expand Down

0 comments on commit 2e6b0c5

Please sign in to comment.