diff --git a/Cargo.lock b/Cargo.lock index e6711221ff..d819623420 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2666,6 +2666,7 @@ version = "0.1.35" dependencies = [ "base64 0.21.0", "bson", + "bytes", "chrono", "criterion", "crossbeam", diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 22936e8e03..fc1a07078e 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -50,6 +50,7 @@ mysql_async = { version = "0.32.2", default-features = false, features = ["defau mysql_common = { version = "0.30", default-features = false, features = ["chrono", "rust_decimal"] } chrono = "0.4.26" geozero = { version = "0.10.0", default-features = false, features = ["with-wkb"] } +bytes = "1.4.0" [dev-dependencies] criterion = { version = "0.4.0", features = ["html_reports"] } diff --git a/dozer-ingestion/src/connectors/postgres/connection.rs b/dozer-ingestion/src/connectors/postgres/connection.rs index f47ab9f48e..538903f6d6 100644 --- a/dozer-ingestion/src/connectors/postgres/connection.rs +++ b/dozer-ingestion/src/connectors/postgres/connection.rs @@ -1,3 +1,4 @@ pub mod helper; mod tables_validator; pub mod validator; +pub mod client; diff --git a/dozer-ingestion/src/connectors/postgres/connection/client.rs b/dozer-ingestion/src/connectors/postgres/connection/client.rs new file mode 100644 index 0000000000..f5556dcf53 --- /dev/null +++ b/dozer-ingestion/src/connectors/postgres/connection/client.rs @@ -0,0 +1,255 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Poll}; + +use futures::future::BoxFuture; +use futures::lock::Mutex; +use futures::stream::BoxStream; +use futures::Stream; +use tokio_postgres::types::ToSql; +use tokio_postgres::{Config, CopyBothDuplex, Row, SimpleQueryMessage, Statement, ToStatement}; + +use super::helper; +use crate::connectors::postgres::connection::helper::is_network_failure; +use crate::errors::PostgresConnectorError; +use crate::retry_on_network_failure; + +#[derive(Debug)] +pub struct Client { + config: tokio_postgres::Config, + inner: tokio_postgres::Client, +} + +impl Client { + pub fn new(config: Config, client: tokio_postgres::Client) -> Self { + Self { + config, + inner: client, + } + } + + pub fn config(&self) -> &Config { + &self.config + } + + pub async fn prepare(&mut self, query: &str) -> Result { + retry_on_network_failure!( + "prepare", + self.inner.prepare(query).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn simple_query( + &mut self, + query: &str, + ) -> Result, tokio_postgres::Error> { + retry_on_network_failure!( + "simple_query", + self.inner.simple_query(query).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn query_one( + &mut self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> Result + where + T: ?Sized + ToStatement, + { + retry_on_network_failure!( + "query_one", + self.inner.query_one(statement, params).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn query( + &mut self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> Result, tokio_postgres::Error> + where + T: ?Sized + ToStatement, + { + retry_on_network_failure!( + "query", + self.inner.query(statement, params).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn query_raw( + &mut self, + query: String, + params: Vec, + ) -> Result>, tokio_postgres::Error> { + let client = Self::connect(self.config.clone()).await?; + let row_stream = RowStream::new(client, query, params).await?; + Ok(Box::pin(row_stream)) + } + + pub async fn copy_both_simple( + &mut self, + query: &str, + ) -> Result, tokio_postgres::Error> + where + T: bytes::Buf + 'static + Send, + { + retry_on_network_failure!( + "copy_both_simple", + self.inner.copy_both_simple(query).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn batch_execute(&mut self, query: &str) -> Result<(), tokio_postgres::Error> { + retry_on_network_failure!( + "batch_execute", + self.inner.batch_execute(query).await, + is_network_failure, + self.reconnect().await? + ) + } + + pub async fn reconnect(&mut self) -> Result<(), tokio_postgres::Error> { + let new_client = Self::connect(self.config.clone()).await?; + self.inner = new_client.inner; + Ok(()) + } + + pub async fn connect(config: Config) -> Result { + let client = match helper::connect(config).await { + Ok(client) => client, + Err(PostgresConnectorError::ConnectionFailure(err)) => return Err(err), + Err(err) => panic!("unexpected error {err}"), + }; + Ok(client) + } + + async fn query_raw_internal( + &mut self, + statement: Statement, + params: Vec, + ) -> Result { + retry_on_network_failure!( + "query_raw", + self.inner.query_raw(&statement, ¶ms).await, + is_network_failure, + self.reconnect().await? + ) + } +} + +pub struct RowStream { + client: Arc>, + query: String, + query_params: Vec, + cursor_position: u64, + inner: Pin>, + pending_resume: + Option>>, +} + +impl RowStream { + pub async fn new( + mut client: Client, + query: String, + params: Vec, + ) -> Result { + let statement = client.prepare(&query).await?; + let inner = client.query_raw_internal(statement, params.clone()).await?; + Ok(Self { + client: Arc::new(Mutex::new(client)), + query, + query_params: params, + cursor_position: 0, + inner: Box::pin(inner), + pending_resume: None, + }) + } + + fn resume( + &mut self, + ) -> BoxFuture<'static, Result<(Client, tokio_postgres::RowStream), tokio_postgres::Error>> + { + async fn resume_async( + client: Arc>, + query: String, + params: Vec, + offset: u64, + ) -> Result<(Client, tokio_postgres::RowStream), tokio_postgres::Error> { + let config = client.lock().await.config().clone(); + // reconnect + let mut client = Client::connect(config).await?; + // send query with offset + let statement = client.prepare(&add_query_offset(&query, offset)).await?; + let row_stream = client.query_raw_internal(statement, params).await?; + Ok((client, row_stream)) + } + + Box::pin(resume_async( + self.client.clone(), + self.query.clone(), + self.query_params.clone(), + self.cursor_position, + )) + } +} + +impl Stream for RowStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + loop { + if let Some(resume) = this.pending_resume.as_mut() { + match ready!(resume.as_mut().poll(cx)) { + Ok((client, inner)) => { + this.pending_resume = None; + this.client = Arc::new(Mutex::new(client)); + this.inner = Box::pin(inner); + } + Err(err) => return Poll::Ready(Some(Err(err))), + } + } + + match ready!(this.inner.as_mut().poll_next(cx)) { + Some(Ok(row)) => { + this.cursor_position += 1; + return Poll::Ready(Some(Ok(row))); + } + Some(Err(err)) => { + if is_network_failure(&err) { + this.pending_resume = Some(this.resume()); + continue; + } else { + return Poll::Ready(Some(Err(err))); + } + } + None => return Poll::Ready(None), + } + } + } +} + +fn add_query_offset(query: &str, offset: u64) -> String { + assert!(query.trim_start().get(0..7).map(|s| s.to_uppercase() == "SELECT ").unwrap_or(false)); + + if offset == 0 { + query.into() + } else { + format!("{query} OFFSET {offset}") + } +} diff --git a/dozer-ingestion/src/connectors/postgres/connection/helper.rs b/dozer-ingestion/src/connectors/postgres/connection/helper.rs index 9ad69e52b1..b4eb8d4b07 100644 --- a/dozer-ingestion/src/connectors/postgres/connection/helper.rs +++ b/dozer-ingestion/src/connectors/postgres/connection/helper.rs @@ -1,6 +1,8 @@ +use super::client::Client; use crate::errors::ConnectorError::WrongConnectionConfiguration; use crate::errors::PostgresConnectorError::InvalidSslError; use crate::errors::{ConnectorError, PostgresConnectorError}; +use crate::retry_on_network_failure; use dozer_types::log::{debug, error}; use dozer_types::models::connection::ConnectionConfig; use rustls::client::{ServerCertVerified, ServerCertVerifier}; @@ -8,7 +10,7 @@ use rustls::{Certificate, Error, ServerName}; use std::sync::Arc; use std::time::SystemTime; use tokio_postgres::config::SslMode; -use tokio_postgres::{Client, NoTls}; +use tokio_postgres::{Connection, NoTls, Socket}; pub fn map_connection_config( auth_details: &ConnectionConfig, @@ -66,8 +68,7 @@ pub async fn connect(config: tokio_postgres::Config) -> Result { // tokio-postgres::Config::SslMode::Disable + NoTLS connection - let (client, connection) = config - .connect(NoTls) + let (client, connection) = connect_helper(config, NoTls) .await .map_err(PostgresConnectorError::ConnectionFailure)?; tokio::spawn(async move { @@ -82,10 +83,12 @@ pub async fn connect(config: tokio_postgres::Config) -> Result Result unimplemented!(), SslMode::Require => { // tokio-postgres::Config::SslMode::Require + TLS connection with verification - let (client, connection) = config - .connect(tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config)) - .await - .map_err(PostgresConnectorError::ConnectionFailure)?; + let (client, connection) = connect_helper( + config, + tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config), + ) + .await + .map_err(PostgresConnectorError::ConnectionFailure)?; tokio::spawn(async move { if let Err(e) = connection.await { error!("Postgres connection error: {}", e); @@ -110,3 +115,33 @@ pub async fn connect(config: tokio_postgres::Config) -> Result Err(InvalidSslError(ssl_mode)), } } + +async fn connect_helper( + config: tokio_postgres::Config, + tls: T, +) -> Result<(Client, Connection), tokio_postgres::Error> +where + T: tokio_postgres::tls::MakeTlsConnect + Clone, +{ + retry_on_network_failure!( + "connect", + config.connect(tls.clone()).await, + is_network_failure, + () + ) + .map(|(client, connection)| (Client::new(config, client), connection)) +} + +pub fn is_network_failure(err: &tokio_postgres::Error) -> bool { + let err_str = err.to_string(); + if err_str.starts_with("error communicating with the server") + || err_str.starts_with("error performing TLS handshake") + || err_str.starts_with("connection closed") + || err_str.starts_with("error connecting to server") + || err_str.starts_with("timeout waiting for server") + { + true + } else { + false + } +} diff --git a/dozer-ingestion/src/connectors/postgres/connection/tables_validator.rs b/dozer-ingestion/src/connectors/postgres/connection/tables_validator.rs index f0c603f854..0394ca8470 100644 --- a/dozer-ingestion/src/connectors/postgres/connection/tables_validator.rs +++ b/dozer-ingestion/src/connectors/postgres/connection/tables_validator.rs @@ -1,5 +1,3 @@ -use tokio_postgres::Client; - use crate::connectors::postgres::schema::helper::DEFAULT_SCHEMA_NAME; use crate::connectors::ListOrFilterColumns; use crate::errors::PostgresConnectorError::{ColumnsNotFound, InvalidQueryError, TablesNotFound}; @@ -10,6 +8,8 @@ use std::collections::HashMap; use crate::errors::PostgresConnectorError; use crate::errors::PostgresSchemaError::TableTypeNotFound; +use super::client::Client; + pub struct TablesValidator<'a> { tables: HashMap<(String, String), &'a ListOrFilterColumns>, tables_identifiers: Vec, @@ -40,7 +40,7 @@ impl<'a> TablesValidator<'a> { async fn fetch_tables( &self, - client: &Client, + client: &mut Client, ) -> Result>, PostgresConnectorError> { let result = client .query( @@ -66,7 +66,7 @@ impl<'a> TablesValidator<'a> { async fn fetch_columns( &self, - client: &Client, + client: &mut Client, ) -> Result { let tables_columns = client .query( @@ -100,7 +100,7 @@ impl<'a> TablesValidator<'a> { async fn fetch_data( &self, - client: &Client, + client: &mut Client, ) -> Result<(PostgresTablesWithTypes, PostgresTablesColumns), PostgresConnectorError> { let tables = self.fetch_tables(client).await?; let columns = self.fetch_columns(client).await?; @@ -108,7 +108,7 @@ impl<'a> TablesValidator<'a> { Ok((tables, columns)) } - pub async fn validate(&self, client: &Client) -> Result<(), PostgresConnectorError> { + pub async fn validate(&self, client: &mut Client) -> Result<(), PostgresConnectorError> { let (tables, tables_columns) = self.fetch_data(client).await?; let missing_columns = self.find_missing_columns(tables_columns)?; diff --git a/dozer-ingestion/src/connectors/postgres/connection/validator.rs b/dozer-ingestion/src/connectors/postgres/connection/validator.rs index bfbff6837b..f83e2fe6e6 100644 --- a/dozer-ingestion/src/connectors/postgres/connection/validator.rs +++ b/dozer-ingestion/src/connectors/postgres/connection/validator.rs @@ -8,12 +8,12 @@ use crate::errors::PostgresConnectorError::{ WALLevelIsNotCorrect, }; +use super::client::Client; use crate::connectors::postgres::connection::tables_validator::TablesValidator; use crate::errors::PostgresConnectorError; use dozer_types::indicatif::ProgressStyle; use postgres_types::PgLsn; use regex::Regex; -use tokio_postgres::Client; pub enum Validations { Details, @@ -46,23 +46,23 @@ pub async fn validate_connection( ); pb.set_message("Validating connection to source"); - let client = super::helper::connect(config).await?; + let mut client = super::helper::connect(config).await?; for validation_type in validations_order { match validation_type { - Validations::Details => validate_details(&client).await?, - Validations::User => validate_user(&client).await?, + Validations::Details => validate_details(&mut client).await?, + Validations::User => validate_user(&mut client).await?, Validations::Tables => { if let Some(tables_info) = &tables { - validate_tables(&client, tables_info).await?; + validate_tables(&mut client, tables_info).await?; } } - Validations::WALLevel => validate_wal_level(&client).await?, + Validations::WALLevel => validate_wal_level(&mut client).await?, Validations::Slot => { if let Some(replication_details) = &replication_info { - validate_slot(&client, replication_details, tables).await?; + validate_slot(&mut client, replication_details, tables).await?; } else { - validate_limit_of_replications(&client).await?; + validate_limit_of_replications(&mut client).await?; } } } @@ -75,7 +75,7 @@ pub async fn validate_connection( Ok(()) } -async fn validate_details(client: &Client) -> Result<(), PostgresConnectorError> { +async fn validate_details(client: &mut Client) -> Result<(), PostgresConnectorError> { client .simple_query("SELECT version()") .await @@ -84,7 +84,7 @@ async fn validate_details(client: &Client) -> Result<(), PostgresConnectorError> Ok(()) } -async fn validate_user(client: &Client) -> Result<(), PostgresConnectorError> { +async fn validate_user(client: &mut Client) -> Result<(), PostgresConnectorError> { client .query_one( " @@ -114,7 +114,7 @@ async fn validate_user(client: &Client) -> Result<(), PostgresConnectorError> { }) } -async fn validate_wal_level(client: &Client) -> Result<(), PostgresConnectorError> { +async fn validate_wal_level(client: &mut Client) -> Result<(), PostgresConnectorError> { let result = client .query_one("SHOW wal_level", &[]) .await @@ -164,7 +164,7 @@ fn validate_columns_names( } async fn validate_tables( - client: &Client, + client: &mut Client, table_info: &Vec, ) -> Result<(), PostgresConnectorError> { validate_tables_names(table_info)?; @@ -177,7 +177,7 @@ async fn validate_tables( } pub async fn validate_slot( - client: &Client, + client: &mut Client, replication_info: &ReplicationSlotInfo, tables: Option<&Vec>, ) -> Result<(), PostgresConnectorError> { @@ -230,7 +230,7 @@ pub async fn validate_slot( Ok(()) } -async fn validate_limit_of_replications(client: &Client) -> Result<(), PostgresConnectorError> { +async fn validate_limit_of_replications(client: &mut Client) -> Result<(), PostgresConnectorError> { let slots_limit_result = client .query_one("SHOW max_replication_slots", &[]) .await @@ -345,7 +345,7 @@ mod tests { async fn test_connector_validation_connection_requested_tables_not_exist() { run_connector_test("postgres", |app_config| async move { let config = get_config(app_config); - let client = connect(config.clone()).await.unwrap(); + let mut client = connect(config.clone()).await.unwrap(); client .simple_query("DROP TABLE IF EXISTS not_existing") @@ -386,7 +386,7 @@ mod tests { async fn test_connector_validation_connection_requested_columns_not_exist() { run_connector_test("postgres", |app_config| async move { let config = get_config(app_config); - let client = connect(config.clone()).await.unwrap(); + let mut client = connect(config.clone()).await.unwrap(); client .simple_query("CREATE TABLE IF NOT EXISTS existing(column_1 serial PRIMARY KEY, column_2 serial);") @@ -492,7 +492,7 @@ mod tests { async fn test_connector_validation_connection_valid_number_of_replication_slots() { run_connector_test("postgres", |app_config| async move { let config = get_config(app_config); - let client = connect(config.clone()).await.unwrap(); + let mut client = connect(config.clone()).await.unwrap(); let slots_limit_result = client .query_one("SHOW max_replication_slots", &[]) @@ -534,7 +534,7 @@ mod tests { async fn test_connector_validation_connection_not_any_replication_slot_availble() { run_connector_test("postgres", |app_config| async move { let config = get_config(app_config); - let client = connect(config.clone()).await.unwrap(); + let mut client = connect(config.clone()).await.unwrap(); let slots_limit_result = client .query_one("SHOW max_replication_slots", &[]) @@ -622,7 +622,7 @@ mod tests { #[serial] async fn test_connector_return_error_on_view_in_table_validation() { run_connector_test("postgres", |app_config| async move { - let client = get_client(app_config.clone()).await; + let mut client = get_client(app_config.clone()).await; let mut rng = rand::thread_rng(); @@ -635,10 +635,10 @@ mod tests { client.create_view(&schema, &table_name, &view_name).await; let config = get_config(app_config); - let pg_client = connect(config).await.unwrap(); + let mut pg_client = connect(config).await.unwrap(); let result = validate_tables( - &pg_client, + &mut pg_client, &vec![ListOrFilterColumns { name: table_name, schema: Some(schema.clone()), @@ -650,7 +650,7 @@ mod tests { assert!(result.is_ok()); let result = validate_tables( - &pg_client, + &mut pg_client, &vec![ListOrFilterColumns { name: view_name, schema: Some(schema), diff --git a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs index db8938febd..4ce582388f 100644 --- a/dozer-ingestion/src/connectors/postgres/connector.rs +++ b/dozer-ingestion/src/connectors/postgres/connector.rs @@ -15,8 +15,9 @@ use crate::connectors::postgres::schema::helper::{SchemaHelper, DEFAULT_SCHEMA_N use crate::errors::ConnectorError::PostgresConnectorError; use crate::errors::PostgresConnectorError::{CreatePublicationError, DropPublicationError}; use tokio_postgres::config::ReplicationMode; -use tokio_postgres::{Client, Config}; +use tokio_postgres::Config; +use super::connection::client::Client; use super::connection::helper; #[derive(Clone, Debug)] @@ -220,7 +221,7 @@ impl PostgresConnector { pub async fn create_publication( &self, - client: Client, + mut client: Client, table_identifiers: Option<&[TableIdentifier]>, ) -> Result<(), ConnectorError> { let publication_name = self.get_publication_name(); diff --git a/dozer-ingestion/src/connectors/postgres/iterator.rs b/dozer-ingestion/src/connectors/postgres/iterator.rs index da2b3fb2e5..744655128d 100644 --- a/dozer-ingestion/src/connectors/postgres/iterator.rs +++ b/dozer-ingestion/src/connectors/postgres/iterator.rs @@ -103,7 +103,7 @@ impl<'a> PostgresIteratorHandler<'a> { pub async fn start(&mut self) -> Result<(), ConnectorError> { let details = Arc::clone(&self.details); let replication_conn_config = details.replication_conn_config.to_owned(); - let client = helper::connect(replication_conn_config) + let mut client = helper::connect(replication_conn_config) .await .map_err(ConnectorError::PostgresConnectorError)?; @@ -113,20 +113,20 @@ impl<'a> PostgresIteratorHandler<'a> { // - When publication tables changes // We clear inactive replication slots before starting replication - ReplicationSlotHelper::clear_inactive_slots(&client, REPLICATION_SLOT_PREFIX) + ReplicationSlotHelper::clear_inactive_slots(&mut client, REPLICATION_SLOT_PREFIX) .await .map_err(ConnectorError::PostgresConnectorError)?; if self.lsn.is_none() { debug!("\nCreating Slot...."); let slot_exist = - ReplicationSlotHelper::replication_slot_exists(&client, &details.slot_name) + ReplicationSlotHelper::replication_slot_exists(&mut client, &details.slot_name) .await .map_err(ConnectorError::PostgresConnectorError)?; if slot_exist { // We dont have lsn, so we need to drop replication slot and start from scratch - ReplicationSlotHelper::drop_replication_slot(&client, &details.slot_name) + ReplicationSlotHelper::drop_replication_slot(&mut client, &details.slot_name) .await .map_err(InvalidQueryError)?; } @@ -140,7 +140,7 @@ impl<'a> PostgresIteratorHandler<'a> { })?; let replication_slot_lsn = - ReplicationSlotHelper::create_replication_slot(&client, &details.slot_name).await?; + ReplicationSlotHelper::create_replication_slot(&mut client, &details.slot_name).await?; if let Some(lsn) = replication_slot_lsn { let parsed_lsn = PgLsn::from_str(&lsn).map_err(|_| LsnParseError(lsn.to_string()))?; diff --git a/dozer-ingestion/src/connectors/postgres/replication_slot_helper.rs b/dozer-ingestion/src/connectors/postgres/replication_slot_helper.rs index ca9895f1c3..169c69b8df 100644 --- a/dozer-ingestion/src/connectors/postgres/replication_slot_helper.rs +++ b/dozer-ingestion/src/connectors/postgres/replication_slot_helper.rs @@ -1,14 +1,15 @@ +use super::connection::client::Client; use crate::errors::ConnectorError::UnexpectedQueryMessageError; use crate::errors::PostgresConnectorError::{FetchReplicationSlotError, InvalidQueryError}; use crate::errors::{ConnectorError, PostgresConnectorError}; use dozer_types::log::debug; -use tokio_postgres::{Client, Error, SimpleQueryMessage}; +use tokio_postgres::{Error, SimpleQueryMessage}; pub struct ReplicationSlotHelper {} impl ReplicationSlotHelper { pub async fn drop_replication_slot( - client: &Client, + client: &mut Client, slot_name: &str, ) -> Result, Error> { let res = client @@ -23,7 +24,7 @@ impl ReplicationSlotHelper { } pub async fn create_replication_slot( - client: &Client, + client: &mut Client, slot_name: &str, ) -> Result, ConnectorError> { let create_replication_slot_query = @@ -48,7 +49,7 @@ impl ReplicationSlotHelper { } pub async fn replication_slot_exists( - client: &Client, + client: &mut Client, slot_name: &str, ) -> Result { let replication_slot_info_query = @@ -66,7 +67,7 @@ impl ReplicationSlotHelper { } pub async fn clear_inactive_slots( - client: &Client, + client: &mut Client, slot_name_prefix: &str, ) -> Result<(), PostgresConnectorError> { let inactive_slots_query = format!( @@ -121,14 +122,14 @@ mod tests { let mut config = get_config(app_config); config.replication_mode(ReplicationMode::Logical); - let client = connect(config).await.unwrap(); + let mut client = connect(config).await.unwrap(); client .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") .await .unwrap(); - let actual = ReplicationSlotHelper::create_replication_slot(&client, "test").await; + let actual = ReplicationSlotHelper::create_replication_slot(&mut client, "test").await; assert!(actual.is_ok()); @@ -155,7 +156,7 @@ mod tests { let mut config = get_config(app_config); config.replication_mode(ReplicationMode::Logical); - let client = connect(config).await.unwrap(); + let mut client = connect(config).await.unwrap(); client .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") @@ -170,7 +171,8 @@ mod tests { .await .expect("failed"); - let actual = ReplicationSlotHelper::create_replication_slot(&client, slot_name).await; + let actual = + ReplicationSlotHelper::create_replication_slot(&mut client, slot_name).await; assert!(actual.is_err()); @@ -205,7 +207,7 @@ mod tests { let mut config = get_config(app_config); config.replication_mode(ReplicationMode::Logical); - let client = connect(config).await.unwrap(); + let mut client = connect(config).await.unwrap(); client .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") @@ -220,7 +222,7 @@ mod tests { .await .expect("failed"); - let actual = ReplicationSlotHelper::drop_replication_slot(&client, slot_name).await; + let actual = ReplicationSlotHelper::drop_replication_slot(&mut client, slot_name).await; assert!(actual.is_ok()); }) @@ -236,14 +238,14 @@ mod tests { let mut config = get_config(app_config); config.replication_mode(ReplicationMode::Logical); - let client = connect(config).await.unwrap(); + let mut client = connect(config).await.unwrap(); client .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") .await .unwrap(); - let actual = ReplicationSlotHelper::drop_replication_slot(&client, slot_name).await; + let actual = ReplicationSlotHelper::drop_replication_slot(&mut client, slot_name).await; assert!(actual.is_err()); diff --git a/dozer-ingestion/src/connectors/postgres/replicator.rs b/dozer-ingestion/src/connectors/postgres/replicator.rs index aebda87dc4..dd6e358bfc 100644 --- a/dozer-ingestion/src/connectors/postgres/replicator.rs +++ b/dozer-ingestion/src/connectors/postgres/replicator.rs @@ -1,4 +1,5 @@ -use crate::connectors::postgres::connection::helper; +use crate::connectors::postgres::connection::client::Client; +use crate::connectors::postgres::connection::helper::{self, is_network_failure}; use crate::connectors::postgres::xlog_mapper::XlogMapper; use crate::errors::ConnectorError; use crate::errors::ConnectorError::PostgresConnectorError; @@ -15,8 +16,8 @@ use postgres_protocol::message::backend::ReplicationMessage::*; use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage}; use postgres_types::PgLsn; +use std::pin::Pin; use std::time::SystemTime; -use tokio_postgres::replication::LogicalReplicationStream; use tokio_postgres::Error; use super::schema::helper::PostgresTableInfo; @@ -42,7 +43,7 @@ pub struct CDCHandler<'a> { impl<'a> CDCHandler<'a> { pub async fn start(&mut self, tables: Vec) -> Result<(), ConnectorError> { let replication_conn_config = self.replication_conn_config.clone(); - let client: tokio_postgres::Client = helper::connect(replication_conn_config).await?; + let client: Client = helper::connect(replication_conn_config).await?; info!( "[{}] Starting Replication: {:?}, {:?}", @@ -56,20 +57,15 @@ impl<'a> CDCHandler<'a> { r#"("proto_version" '1', "publication_names" '{publication_name}')"#, publication_name = self.publication_name ); - let query = format!( - r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#, - self.slot_name, lsn, options - ); self.offset_lsn = u64::from(lsn); self.last_commit_lsn = u64::from(lsn); - let copy_stream = client - .copy_both_simple::(&query) - .await - .map_err(|e| ConnectorError::InternalError(Box::new(e)))?; + let mut stream = + LogicalReplicationStream::new(client, self.slot_name.clone(), lsn, options) + .await + .map_err(|e| ConnectorError::InternalError(Box::new(e)))?; - let stream = LogicalReplicationStream::new(copy_stream); let tables_columns = tables .into_iter() .enumerate() @@ -79,7 +75,6 @@ impl<'a> CDCHandler<'a> { .collect(); let mut mapper = XlogMapper::new(tables_columns); - tokio::pin!(stream); loop { let message = stream.next().await; if let Some(Ok(PrimaryKeepAlive(ref k))) = message { @@ -92,7 +87,6 @@ impl<'a> CDCHandler<'a> { .unwrap() .as_millis(); stream - .as_mut() .standby_status_update( PgLsn::from(self.last_commit_lsn), PgLsn::from(self.last_commit_lsn), @@ -159,3 +153,108 @@ impl<'a> CDCHandler<'a> { } } } + +pub struct LogicalReplicationStream { + client: Client, + slot_name: String, + resume_lsn: PgLsn, + options: String, + inner: Pin>, +} + +impl LogicalReplicationStream { + pub async fn new( + mut client: Client, + slot_name: String, + lsn: PgLsn, + options: String, + ) -> Result { + let inner = + Box::pin(Self::open_replication_stream(&mut client, &slot_name, lsn, &options).await?); + Ok(Self { + client, + slot_name, + resume_lsn: lsn, + options, + inner, + }) + } + + pub async fn next( + &mut self, + ) -> Option, tokio_postgres::Error>> { + loop { + let result = self.inner.next().await; + match result.as_ref() { + Some(Err(err)) if is_network_failure(err) => { + if let Err(err) = self.resume().await { + return Some(Err(err)); + } + continue; + } + Some(Ok(XLogData(body))) => self.resume_lsn = body.wal_end().into(), + _ => {} + } + return result; + } + } + + pub async fn standby_status_update( + &mut self, + write_lsn: PgLsn, + flush_lsn: PgLsn, + apply_lsn: PgLsn, + ts: i64, + reply: u8, + ) -> Result<(), Error> { + loop { + match self + .inner + .as_mut() + .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply) + .await + { + Err(err) if is_network_failure(&err) => { + self.resume().await?; + continue; + } + _ => {} + } + break Ok(()); + } + } + + async fn resume(&mut self) -> Result<(), tokio_postgres::Error> { + self.client.reconnect().await?; + + let stream = Self::open_replication_stream( + &mut self.client, + &self.slot_name, + self.resume_lsn, + &self.options, + ) + .await?; + + self.inner = Box::pin(stream); + + Ok(()) + } + + async fn open_replication_stream( + client: &mut Client, + slot_name: &str, + lsn: PgLsn, + options: &str, + ) -> Result { + let query = format!( + r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#, + slot_name, lsn, options + ); + + let copy_stream = client.copy_both_simple::(&query).await?; + + Ok(tokio_postgres::replication::LogicalReplicationStream::new( + copy_stream, + )) + } +} diff --git a/dozer-ingestion/src/connectors/postgres/schema/helper.rs b/dozer-ingestion/src/connectors/postgres/schema/helper.rs index 07a79f0d2e..d369062477 100644 --- a/dozer-ingestion/src/connectors/postgres/schema/helper.rs +++ b/dozer-ingestion/src/connectors/postgres/schema/helper.rs @@ -153,7 +153,7 @@ impl SchemaHelper { tables: Option<&[ListOrFilterColumns]>, ) -> Result { let mut tables_columns_map: HashMap> = HashMap::new(); - let client = helper::connect(self.conn_config.clone()).await?; + let mut client = helper::connect(self.conn_config.clone()).await?; let query = if let Some(tables) = tables { tables.iter().for_each(|t| { if let Some(columns) = t.columns.clone() { diff --git a/dozer-ingestion/src/connectors/postgres/schema/tests.rs b/dozer-ingestion/src/connectors/postgres/schema/tests.rs index c06e1e9ecc..655601a861 100644 --- a/dozer-ingestion/src/connectors/postgres/schema/tests.rs +++ b/dozer-ingestion/src/connectors/postgres/schema/tests.rs @@ -24,7 +24,7 @@ where #[serial] async fn test_connector_get_tables() { run_connector_test("postgres", |app_config| async move { - let client = get_client(app_config).await; + let mut client = get_client(app_config).await; let mut rng = rand::thread_rng(); @@ -59,7 +59,7 @@ async fn test_connector_get_tables() { #[serial] async fn test_connector_get_schema_with_selected_columns() { run_connector_test("postgres", |app_config| async move { - let client = get_client(app_config).await; + let mut client = get_client(app_config).await; let mut rng = rand::thread_rng(); @@ -94,7 +94,7 @@ async fn test_connector_get_schema_with_selected_columns() { #[serial] async fn test_connector_get_schema_without_selected_columns() { run_connector_test("postgres", |app_config| async move { - let client = get_client(app_config).await; + let mut client = get_client(app_config).await; let mut rng = rand::thread_rng(); @@ -134,7 +134,7 @@ async fn test_connector_get_schema_without_selected_columns() { #[serial] async fn test_connector_view_cannot_be_used() { run_connector_test("postgres", |app_config| async move { - let client = get_client(app_config).await; + let mut client = get_client(app_config).await; let mut rng = rand::thread_rng(); diff --git a/dozer-ingestion/src/connectors/postgres/snapshotter.rs b/dozer-ingestion/src/connectors/postgres/snapshotter.rs index 485f819fd3..573ce5b9db 100644 --- a/dozer-ingestion/src/connectors/postgres/snapshotter.rs +++ b/dozer-ingestion/src/connectors/postgres/snapshotter.rs @@ -43,7 +43,7 @@ impl<'a> PostgresSnapshotter<'a> { conn_config: tokio_postgres::Config, sender: Sender>, ) -> Result<(), ConnectorError> { - let client_plain = connection_helper::connect(conn_config) + let mut client_plain = connection_helper::connect(conn_config) .await .map_err(PostgresConnectorError)?; @@ -63,7 +63,7 @@ impl<'a> PostgresSnapshotter<'a> { let empty_vec: Vec = Vec::new(); let row_stream = client_plain - .query_raw(&stmt, empty_vec) + .query_raw(query, empty_vec) .await .map_err(|e| PostgresConnectorError(InvalidQueryError(e)))?; tokio::pin!(row_stream); @@ -176,7 +176,7 @@ mod tests { .as_ref() .unwrap(); - let test_client = TestPostgresClient::new(config).await; + let mut test_client = TestPostgresClient::new(config).await; let mut rng = rand::thread_rng(); let table_name = format!("test_table_{}", rng.gen::()); @@ -234,7 +234,7 @@ mod tests { .as_ref() .unwrap(); - let test_client = TestPostgresClient::new(config).await; + let mut test_client = TestPostgresClient::new(config).await; let mut rng = rand::thread_rng(); let table_name = format!("test_table_{}", rng.gen::()); diff --git a/dozer-ingestion/src/connectors/postgres/test_utils.rs b/dozer-ingestion/src/connectors/postgres/test_utils.rs index 7704ad7a11..8cdb343fba 100644 --- a/dozer-ingestion/src/connectors/postgres/test_utils.rs +++ b/dozer-ingestion/src/connectors/postgres/test_utils.rs @@ -5,7 +5,9 @@ use std::error::Error; use crate::connectors::postgres::replication_slot_helper::ReplicationSlotHelper; use dozer_types::models::config::Config; use std::str::FromStr; -use tokio_postgres::{error::DbError, Client, Error as PostgresError, SimpleQueryMessage}; +use tokio_postgres::{error::DbError, Error as PostgresError, SimpleQueryMessage}; + +use super::connection::client::Client; pub async fn get_client(app_config: Config) -> TestPostgresClient { let config = app_config @@ -19,24 +21,24 @@ pub async fn get_client(app_config: Config) -> TestPostgresClient { TestPostgresClient::new(config).await } -pub async fn create_slot(client_ref: &Client, slot_name: &str) -> PgLsn { - client_ref +pub async fn create_slot(client_mut: &mut Client, slot_name: &str) -> PgLsn { + client_mut .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") .await .unwrap(); - let created_lsn = ReplicationSlotHelper::create_replication_slot(client_ref, slot_name) + let created_lsn = ReplicationSlotHelper::create_replication_slot(client_mut, slot_name) .await .unwrap() .unwrap(); - client_ref.simple_query("COMMIT;").await.unwrap(); + client_mut.simple_query("COMMIT;").await.unwrap(); PgLsn::from_str(&created_lsn).unwrap() } pub async fn retry_drop_active_slot( e: PostgresError, - client_ref: &Client, + client_mut: &mut Client, slot_name: &str, ) -> Result, PostgresError> { match e.source() { @@ -46,12 +48,12 @@ pub async fn retry_drop_active_slot( let err = db_error.to_string(); let parts = err.rsplit_once(' ').unwrap(); - client_ref + client_mut .simple_query(format!("select pg_terminate_backend('{}');", parts.1).as_ref()) .await .unwrap(); - ReplicationSlotHelper::drop_replication_slot(client_ref, slot_name).await + ReplicationSlotHelper::drop_replication_slot(client_mut, slot_name).await } _ => Err(e), }, diff --git a/dozer-ingestion/src/connectors/postgres/tests/client.rs b/dozer-ingestion/src/connectors/postgres/tests/client.rs index ab5870ed8c..1abc40987f 100644 --- a/dozer-ingestion/src/connectors/postgres/tests/client.rs +++ b/dozer-ingestion/src/connectors/postgres/tests/client.rs @@ -1,8 +1,8 @@ +use crate::connectors::postgres::connection::client::Client; use crate::connectors::postgres::connection::helper::{connect, map_connection_config}; use dozer_types::models::connection::ConnectionConfig; use dozer_types::rust_decimal::Decimal; use std::fmt::Write; -use tokio_postgres::Client; pub struct TestPostgresClient { client: Client, @@ -30,11 +30,11 @@ impl TestPostgresClient { } } - pub async fn execute_query(&self, query: &str) { + pub async fn execute_query(&mut self, query: &str) { self.client.query(query, &[]).await.unwrap(); } - pub async fn create_simple_table(&self, schema: &str, table_name: &str) { + pub async fn create_simple_table(&mut self, schema: &str, table_name: &str) { self.execute_query(&format!( "CREATE TABLE {schema}.{table_name} ( @@ -47,7 +47,7 @@ impl TestPostgresClient { .await; } - pub async fn create_view(&self, schema: &str, table_name: &str, view_name: &str) { + pub async fn create_view(&mut self, schema: &str, table_name: &str, view_name: &str) { self.execute_query(&format!( "CREATE VIEW {schema}.{view_name} AS SELECT id, name @@ -56,22 +56,22 @@ impl TestPostgresClient { .await; } - pub async fn drop_schema(&self, schema: &str) { + pub async fn drop_schema(&mut self, schema: &str) { self.execute_query(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE")) .await; } - pub async fn drop_table(&self, schema: &str, table_name: &str) { + pub async fn drop_table(&mut self, schema: &str, table_name: &str) { self.execute_query(&format!("DROP TABLE IF EXISTS {schema}.{table_name}")) .await; } - pub async fn create_schema(&self, schema: &str) { + pub async fn create_schema(&mut self, schema: &str) { self.drop_schema(schema).await; self.execute_query(&format!("CREATE SCHEMA {schema}")).await; } - pub async fn insert_rows(&self, table_name: &str, count: u64, offset: Option) { + pub async fn insert_rows(&mut self, table_name: &str, count: u64, offset: Option) { let offset = offset.map_or(0, |o| o); let mut buf = String::new(); for i in 0..count { diff --git a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs index 0613b113ca..b46d9a99cf 100644 --- a/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs +++ b/dozer-ingestion/src/connectors/postgres/tests/continue_replication_tests.rs @@ -49,17 +49,17 @@ mod tests { connector.create_publication(client, None).await.unwrap(); // Creating slot - let client = helper::connect(replication_conn_config.clone()) + let mut client = helper::connect(replication_conn_config.clone()) .await .unwrap(); let slot_name = connector.get_slot_name(); - let _parsed_lsn = create_slot(&client, &slot_name).await; + let _parsed_lsn = create_slot(&mut client, &slot_name).await; // let result = connector // .can_start_from((u64::from(parsed_lsn), 0)) // .unwrap(); - ReplicationSlotHelper::drop_replication_slot(&client, &slot_name) + ReplicationSlotHelper::drop_replication_slot(&mut client, &slot_name) .await .unwrap(); // assert!( @@ -83,7 +83,7 @@ mod tests { .as_ref() .unwrap(); - let test_client = TestPostgresClient::new(config).await; + let mut test_client = TestPostgresClient::new(config).await; let mut rng = rand::thread_rng(); let table_name = format!("test_table_{}", rng.gen::()); let connector_name = format!("pg_connector_{}", rng.gen::()); @@ -114,12 +114,12 @@ mod tests { .unwrap(); // Creating slot - let client = helper::connect(replication_conn_config.clone()) + let mut client = helper::connect(replication_conn_config.clone()) .await .unwrap(); let slot_name = connector.get_slot_name(); - let _parsed_lsn = create_slot(&client, &slot_name).await; + let _parsed_lsn = create_slot(&mut client, &slot_name).await; // let config = IngestionConfig::default(); // let (ingestor, mut iterator) = Ingestor::initialize_channel(config); @@ -166,9 +166,9 @@ mod tests { // } // } - if let Err(e) = ReplicationSlotHelper::drop_replication_slot(&client, &slot_name).await + if let Err(e) = ReplicationSlotHelper::drop_replication_slot(&mut client, &slot_name).await { - retry_drop_active_slot(e, &client, &slot_name) + retry_drop_active_slot(e, &mut client, &slot_name) .await .unwrap(); } diff --git a/dozer-ingestion/src/test_util.rs b/dozer-ingestion/src/test_util.rs index d44f8e69cf..4d0c07d724 100644 --- a/dozer-ingestion/src/test_util.rs +++ b/dozer-ingestion/src/test_util.rs @@ -20,7 +20,7 @@ async fn warm_up(app_config: &Config) { .port(replenished_config.port as u16) .ssl_mode(replenished_config.sslmode); - let client = TestPostgresClient::new_with_postgres_config(config).await; + let mut client = TestPostgresClient::new_with_postgres_config(config).await; client .execute_query(&format!( "DROP DATABASE IF EXISTS {}", diff --git a/dozer-ingestion/tests/test_suite/connectors/postgres.rs b/dozer-ingestion/tests/test_suite/connectors/postgres.rs index be6ef7de7a..393f145c8e 100644 --- a/dozer-ingestion/tests/test_suite/connectors/postgres.rs +++ b/dozer-ingestion/tests/test_suite/connectors/postgres.rs @@ -1,5 +1,5 @@ use dozer_ingestion::connectors::postgres::{ - connection::helper::connect, + connection::{client::Client, helper::connect}, connector::{PostgresConfig, PostgresConnector}, }; use dozer_types::types::Field; @@ -31,7 +31,7 @@ impl DataReadyConnectorTest for PostgresConnectorTest { type Connector = PostgresConnector; async fn new() -> (Self, Self::Connector) { - let (client, connector_test, connector) = create_postgres_server().await; + let (mut client, connector_test, connector) = create_postgres_server().await; client .batch_execute(&create_table_with_all_supported_data_types("test_table")) .await @@ -51,7 +51,7 @@ impl InsertOnlyConnectorTest for PostgresConnectorTest { schema: FieldsAndPk, records: Vec>, ) -> Option<(Self, Self::Connector, FieldsAndPk)> { - let (client, mut connector_test, connector) = create_postgres_server().await; + let (mut client, mut connector_test, connector) = create_postgres_server().await; let (actual_schema, _) = schema_to_sql(schema.clone()); @@ -87,7 +87,7 @@ impl InsertOnlyConnectorTest for PostgresConnectorTest { #[async_trait] impl CudConnectorTest for PostgresConnectorTest { async fn start_cud(&self, operations: Vec) { - let client = connect(self.config.clone()).await.unwrap(); + let mut client = connect(self.config.clone()).await.unwrap(); let schema_name = self.schema_name.clone(); let table_name = self.table_name.clone(); let schema = self.schema.clone(); @@ -107,11 +107,7 @@ impl CudConnectorTest for PostgresConnectorTest { } } -async fn create_postgres_server() -> ( - tokio_postgres::Client, - PostgresConnectorTest, - PostgresConnector, -) { +async fn create_postgres_server() -> (Client, PostgresConnectorTest, PostgresConnector) { let host = "localhost"; let port = 5432; let user = "postgres";