feat: rows streaming and hints (#19)
* feat: row streaming insert

* feat: write with hint
fengjiachun authored Jan 22, 2025
1 parent 2e6b0c5 commit 84eb006
Showing 5 changed files with 116 additions and 106 deletions.
6 changes: 4 additions & 2 deletions examples/ingest.rs
@@ -47,7 +47,9 @@ async fn main() {
let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

let records = weather_records();
let result = client.row_insert(to_insert_request(records)).await;
let result = client
.row_insert_with_hint(to_insert_requests(records), "ttl=1d")
.await;
match result {
Ok(rows) => {
println!("Rows written: {rows}");
@@ -96,7 +98,7 @@ fn weather_schema() -> Vec<ColumnSchema> {
/// - `temperature`: a value field of f32
/// - `humidity`: a value field of i32
///
fn to_insert_request(records: Vec<WeatherRecord>) -> RowInsertRequests {
fn to_insert_requests(records: Vec<WeatherRecord>) -> RowInsertRequests {
let rows = records
.into_iter()
.map(|record| Row {
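A minimal sketch of the updated call path in this example, for readers skimming the truncated hunk: `db` stands for the `Database` handle built earlier in the file, and the `RowInsertRequests` value would come from a helper like `to_insert_requests` above; the "ttl=1d" hint mirrors the value used in the diff and is illustrative only.

use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::{Database, Result};

// Sketch only: forwards the hint alongside a row-based insert.
// The hint string is sent to the server as gRPC metadata (see src/database.rs below).
async fn ingest_with_ttl(db: &Database, requests: RowInsertRequests) -> Result<u32> {
    db.row_insert_with_hint(requests, "ttl=1d").await
}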
111 changes: 36 additions & 75 deletions examples/stream_ingest.rs
@@ -15,6 +15,10 @@
use derive_new::new;

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::{field, tag, timestamp};
use greptimedb_ingester::helpers::values::{
f32_value, i32_value, string_value, timestamp_millisecond_value,
};
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};

#[tokio::main]
@@ -31,17 +35,17 @@ async fn main() {

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

let stream_inserter = client.streaming_inserter().unwrap();
let stream_inserter = client.streaming_inserter(1024, Some("ttl=7d")).unwrap();

if let Err(e) = stream_inserter
.insert(vec![to_insert_request(weather_records_1())])
.row_insert(to_insert_requests(weather_records_1()))
.await
{
eprintln!("Error: {e}");
}

if let Err(e) = stream_inserter
.insert(vec![to_insert_request(weather_records_2())])
.row_insert(to_insert_requests(weather_records_2()))
.await
{
eprintln!("Error: {e}");
@@ -89,6 +93,15 @@ fn weather_records_2() -> Vec<WeatherRecord> {
]
}

fn weather_schema() -> Vec<ColumnSchema> {
vec![
timestamp("ts", ColumnDataType::TimestampMillisecond),
tag("collector", ColumnDataType::String),
field("temperature", ColumnDataType::Float32),
field("humidity", ColumnDataType::Int32),
]
}

/// This function generates some random data and bundles it into
/// `RowInsertRequests`.
///
@@ -99,78 +112,26 @@ fn weather_records_2() -> Vec<WeatherRecord> {
/// - `temperature`: a value field of f32
/// - `humidity`: a value field of i32
///
fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
// convert records into columns
let rows = records.len();

// transpose records into columns
let (timestamp_millis, collectors, temp, humidity) = records.into_iter().fold(
(
Vec::with_capacity(rows),
Vec::with_capacity(rows),
Vec::with_capacity(rows),
Vec::with_capacity(rows),
),
|mut acc, rec| {
acc.0.push(rec.timestamp_millis);
acc.1.push(rec.collector);
acc.2.push(rec.temperature);
acc.3.push(rec.humidity);

acc
},
);

let columns = vec![
// timestamp column: `ts`
Column {
column_name: "ts".to_owned(),
values: Some(column::Values {
timestamp_millisecond_values: timestamp_millis,
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
// tag column: collectors
Column {
column_name: "collector".to_owned(),
values: Some(column::Values {
string_values: collectors.into_iter().collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
// field column: temperature
Column {
column_name: "temperature".to_owned(),
values: Some(column::Values {
f32_values: temp,
..Default::default()
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float32 as i32,
..Default::default()
},
// field column: humidity
Column {
column_name: "humidity".to_owned(),
values: Some(column::Values {
i32_values: humidity,
..Default::default()
fn to_insert_requests(records: Vec<WeatherRecord>) -> RowInsertRequests {
let rows = records
.into_iter()
.map(|record| Row {
values: vec![
timestamp_millisecond_value(record.timestamp_millis),
string_value(record.collector),
f32_value(record.temperature),
i32_value(record.humidity),
],
})
.collect();

RowInsertRequests {
inserts: vec![RowInsertRequest {
table_name: "weather_demo".to_owned(),
rows: Some(Rows {
schema: weather_schema(),
rows,
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
];

InsertRequest {
table_name: "weather_demo".to_owned(),
columns,
row_count: rows as u32,
}],
}
}
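The streaming example above only shows two writes; here is a hedged sketch of the full lifecycle, assuming a pre-built `Database` handle and pre-assembled batches (the 1024 channel size and the "ttl=7d" hint simply mirror the values used in this example):

use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::{Database, Result};

// Sketch only: queue several row batches onto one gRPC client stream,
// then close the stream to learn how many rows were written in total.
async fn stream_batches(db: &Database, batches: Vec<RowInsertRequests>) -> Result<u32> {
    let inserter = db.streaming_inserter(1024, Some("ttl=7d"))?;
    for batch in batches {
        inserter.row_insert(batch).await?; // buffered via the inserter's channel
    }
    inserter.finish().await
}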
60 changes: 40 additions & 20 deletions src/database.rs
@@ -20,10 +20,10 @@ use crate::api::v1::{
};
use crate::stream_insert::StreamInserter;

use snafu::OptionExt;

use crate::error::IllegalDatabaseResponseSnafu;
use crate::error::{IllegalDatabaseResponseSnafu, InvalidAsciiSnafu};
use crate::{Client, Result};
use snafu::OptionExt;
use tonic::metadata::MetadataValue;

const DEFAULT_STREAMING_INSERTER_BUFFER_SIZE: usize = 1024;

@@ -45,7 +45,7 @@ impl Database {
///
/// - the name of database when using GreptimeDB standalone or cluster
/// - the name provided by GreptimeCloud or other multi-tenant GreptimeDB
/// environment
/// environment
pub fn new_with_dbname(dbname: impl Into<String>, client: Client) -> Self {
Self {
dbname: dbname.into(),
@@ -74,49 +74,69 @@ impl Database {
/// Write insert requests to GreptimeDB and get rows written
#[deprecated(note = "Use row_insert instead.")]
pub async fn insert(&self, requests: Vec<InsertRequest>) -> Result<u32> {
self.handle(Request::Inserts(InsertRequests { inserts: requests }))
self.handle(Request::Inserts(InsertRequests { inserts: requests }), None)
.await
}

/// Write Row based insert requests to GreptimeDB and get rows written
pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<u32> {
self.handle(Request::RowInserts(requests)).await
self.handle(Request::RowInserts(requests), None).await
}

/// Write Row based insert requests with hint to GreptimeDB and get rows written
pub async fn row_insert_with_hint(
&self,
requests: RowInsertRequests,
hint: &str,
) -> Result<u32> {
self.handle(Request::RowInserts(requests), Some(hint)).await
}

/// Initialise a streaming insert handle, using default buffer size `1024`
pub fn streaming_inserter(&self) -> Result<StreamInserter> {
self.streaming_inserter_with_channel_size(DEFAULT_STREAMING_INSERTER_BUFFER_SIZE)
pub fn default_streaming_inserter(&self) -> Result<StreamInserter> {
self.streaming_inserter(DEFAULT_STREAMING_INSERTER_BUFFER_SIZE, None)
}

/// Initialise a stream insert handle using custom buffer size
///
/// The stream insert mechanism uses gRPC client streaming to reduce latency
/// for each write. It is recommended if you have a batch of inserts and do
/// not need intermediate results.
pub fn streaming_inserter_with_channel_size(
/// Initialise a streaming insert handle, using custom buffer size and hint
pub fn streaming_inserter(
&self,
channel_size: usize,
hint: Option<&str>,
) -> Result<StreamInserter> {
let client = self.client.make_database_client()?.inner;
let hint = hint
.map(|value| {
MetadataValue::try_from(value).map_err(|_| InvalidAsciiSnafu { value }.build())
})
.transpose()?;

let stream_inserter = StreamInserter::new(
StreamInserter::new(
client,
self.dbname().to_string(),
self.auth_header.clone(),
channel_size,
);

Ok(stream_inserter)
hint,
)
}

/// Issue a delete to database
pub async fn delete(&self, request: DeleteRequests) -> Result<u32> {
self.handle(Request::Deletes(request)).await
self.handle(Request::Deletes(request), None).await
}

async fn handle(&self, request: Request) -> Result<u32> {
async fn handle(&self, request: Request, hint: Option<&str>) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request);
let mut request = tonic::Request::new(request);
if let Some(hint) = hint {
let hint = MetadataValue::try_from(hint).map_err(|_| {
InvalidAsciiSnafu {
value: hint.to_string(),
}
.build()
})?;
request.metadata_mut().insert("x-greptime-hints", hint);
}
let response = client
.handle(request)
.await?
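Because the hint travels as a tonic metadata value, it must be ASCII; otherwise the call fails client-side with the new `InvalidAscii` variant (added in src/error.rs below). A sketch of handling that case, assuming the crate re-exports `Error` at its root next to `Result`; if it does not, the path would need adjusting:

use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::{Database, Error};

// Sketch only: a non-ASCII hint is rejected before any request is sent.
async fn insert_with_hint(db: &Database, requests: RowInsertRequests, hint: &str) {
    match db.row_insert_with_hint(requests, hint).await {
        Ok(rows) => println!("Rows written: {rows}"),
        Err(Error::InvalidAscii { value, .. }) => eprintln!("hint must be ASCII, got: {value}"),
        Err(e) => eprintln!("Error: {e}"),
    }
}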
7 changes: 7 additions & 0 deletions src/error.rs
@@ -53,6 +53,13 @@ pub enum Error {

#[snafu(display("Failed to send request with streaming: {}", err_msg))]
ClientStreaming { err_msg: String, location: Location },

#[snafu(display("Failed to parse ascii string: {}", value))]
InvalidAscii {
value: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
38 changes: 29 additions & 9 deletions src/stream_insert.rs
@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::Result;
use crate::error::{self, IllegalDatabaseResponseSnafu};
use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::{greptime_database_client::GreptimeDatabaseClient, InsertRequest};
use greptime_proto::v1::{
greptime_database_client::GreptimeDatabaseClient, InsertRequest, RowInsertRequests,
};
use greptime_proto::v1::{
greptime_response, AffectedRows, AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequests,
RequestHeader,
@@ -22,12 +26,10 @@ use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::{Ascii, MetadataValue};
use tonic::transport::Channel;
use tonic::{Response, Status};

use crate::error::Result;
use crate::error::{self, IllegalDatabaseResponseSnafu};

/// A structure that provides some methods for streaming data insert.
///
/// [`StreamInserter`] cannot be constructed via the `StreamInserter::new` method.
@@ -57,23 +59,29 @@ impl StreamInserter {
dbname: String,
auth_header: Option<AuthHeader>,
channel_size: usize,
) -> StreamInserter {
let (send, recv) = tokio::sync::mpsc::channel(channel_size);
hint: Option<MetadataValue<Ascii>>,
) -> Result<StreamInserter> {
let (send, recv) = mpsc::channel(channel_size);

let join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>> =
tokio::spawn(async move {
let recv_stream = ReceiverStream::new(recv);
client.handle_requests(recv_stream).await
let mut request = tonic::Request::new(recv_stream);
if let Some(hint) = hint {
request.metadata_mut().insert("x-greptime-hints", hint);
}
client.handle_requests(request).await
});

StreamInserter {
Ok(StreamInserter {
sender: send,
auth_header,
dbname,
join,
}
})
}

#[deprecated(note = "Use row_insert instead.")]
pub async fn insert(&self, requests: Vec<InsertRequest>) -> Result<()> {
let inserts = InsertRequests { inserts: requests };
let request = self.to_rpc_request(Request::Inserts(inserts));
@@ -86,6 +94,18 @@ impl StreamInserter {
})
}

/// Write Row based insert requests to GreptimeDB with streaming
pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<()> {
let request = self.to_rpc_request(Request::RowInserts(requests));

self.sender.send(request).await.map_err(|e| {
error::ClientStreamingSnafu {
err_msg: e.to_string(),
}
.build()
})
}

pub async fn finish(self) -> Result<u32> {
drop(self.sender);

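The constructor change above attaches the hint once, to the single `tonic::Request` that wraps the whole client stream, rather than to each message. A standalone sketch of that pattern using the same tonic/tokio-stream APIs the file already imports; the function name is illustrative only:

use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::{Ascii, MetadataValue};

// Sketch only: wrap a receiver as a streaming request body and set the
// hint header once for the lifetime of the stream.
fn streaming_request_with_hint<T>(
    recv: tokio::sync::mpsc::Receiver<T>,
    hint: Option<MetadataValue<Ascii>>,
) -> tonic::Request<ReceiverStream<T>> {
    let mut request = tonic::Request::new(ReceiverStream::new(recv));
    if let Some(hint) = hint {
        request.metadata_mut().insert("x-greptime-hints", hint);
    }
    request
}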
