Skip to content

Commit

Permalink
feat(cli): cache connection inside client
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Apr 10, 2024
1 parent f1b456f commit e3246af
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
6 changes: 6 additions & 0 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ impl APIClient {
Ok(client)
}

pub fn reuse(&self) -> Self {
let mut client = self.clone();
client.session_state = Arc::new(Mutex::new(SessionState::default()));
client
}

async fn from_dsn(dsn: &str) -> Result<Self> {
let u = Url::parse(dsn)?;
let mut client = Self::default();
Expand Down
32 changes: 19 additions & 13 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use dyn_clone::DynClone;
Expand Down Expand Up @@ -44,7 +44,7 @@ static VERSION: Lazy<String> = Lazy::new(|| {
pub struct Client {
dsn: String,
name: String,
conn: Option<Box<dyn Connection>>,
conn: Arc<Mutex<Option<Box<dyn Connection>>>>,
}

impl Client {
Expand All @@ -53,7 +53,7 @@ impl Client {
Self {
dsn,
name,
conn: None,
conn: Arc::new(Mutex::new(None)),
}
}

Expand All @@ -63,26 +63,30 @@ impl Client {
}

pub async fn get_conn(&self) -> Result<Box<dyn Connection>> {
if let Some(conn) = &self.conn {
return Ok(conn.clone());
if let Some(conn) = self.conn.lock().unwrap().as_ref() {
return Ok(conn.reuse());
}

let u = Url::parse(&self.dsn)?;
match u.scheme() {
let conn: Box<dyn Connection> = match u.scheme() {
"databend" | "databend+http" | "databend+https" => {
let conn = RestAPIConnection::try_create(&self.dsn, self.name.clone()).await?;
Ok(Box::new(conn))
Box::new(conn) as _
}
#[cfg(feature = "flight-sql")]
"databend+flight" | "databend+grpc" => {
let conn = FlightSQLConnection::try_create(&self.dsn, self.name.clone()).await?;
Ok(Box::new(conn))
Box::new(conn) as _
}
_ => Err(Error::Parsing(format!(
"Unsupported scheme: {}",
u.scheme()
))),
}
_ => {
return Err(Error::Parsing(format!(
"Unsupported scheme: {}",
u.scheme()
)))
}
};
*self.conn.lock().unwrap() = Some(conn.reuse());
Ok(conn)
}
}

Expand All @@ -99,6 +103,8 @@ pub type Reader = Box<dyn AsyncRead + Send + Sync + Unpin + 'static>;

#[async_trait]
pub trait Connection: DynClone + Send + Sync {
fn reuse(&self) -> Box<dyn Connection>;

async fn info(&self) -> ConnectionInfo;

async fn version(&self) -> Result<String> {
Expand Down
4 changes: 4 additions & 0 deletions driver/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub struct FlightSQLConnection {

#[async_trait]
impl Connection for FlightSQLConnection {
fn reuse(&self) -> Box<dyn Connection> {
Box::new(self.clone())
}

async fn info(&self) -> ConnectionInfo {
ConnectionInfo {
handler: "FlightSQL".to_string(),
Expand Down
6 changes: 6 additions & 0 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ pub struct RestAPIConnection {

#[async_trait]
impl Connection for RestAPIConnection {
fn reuse(&self) -> Box<dyn Connection> {
Box::new(RestAPIConnection {
client: self.client.reuse(),
})
}

async fn info(&self) -> ConnectionInfo {
ConnectionInfo {
handler: "RestAPI".to_string(),
Expand Down

0 comments on commit e3246af

Please sign in to comment.