Skip to content

Commit

Permalink
fix: retry on network errors in Postgres connector
Browse files Browse the repository at this point in the history
Similarly to the MySQL connector, select queries resume from the last row received.
The CDC resumes from the position where it was stopped.
  • Loading branch information
abcpro1 committed Aug 31, 2023
1 parent 9068b0e commit 3d9fa2d
Show file tree
Hide file tree
Showing 19 changed files with 509 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mysql_async = { version = "0.32.2", default-features = false, features = ["defau
mysql_common = { version = "0.30", default-features = false, features = ["chrono", "rust_decimal"] }
chrono = "0.4.26"
geozero = { version = "0.10.0", default-features = false, features = ["with-wkb"] }
bytes = "1.4.0"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/postgres/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod helper;
mod tables_validator;
pub mod validator;
pub mod client;
255 changes: 255 additions & 0 deletions dozer-ingestion/src/connectors/postgres/connection/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};

use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::stream::BoxStream;
use futures::Stream;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Config, CopyBothDuplex, Row, SimpleQueryMessage, Statement, ToStatement};

use super::helper;
use crate::connectors::postgres::connection::helper::is_network_failure;
use crate::errors::PostgresConnectorError;
use crate::retry_on_network_failure;

#[derive(Debug)]
pub struct Client {
config: tokio_postgres::Config,
inner: tokio_postgres::Client,
}

impl Client {
pub fn new(config: Config, client: tokio_postgres::Client) -> Self {
Self {
config,
inner: client,
}
}

pub fn config(&self) -> &Config {
&self.config
}

pub async fn prepare(&mut self, query: &str) -> Result<Statement, tokio_postgres::Error> {
retry_on_network_failure!(
"prepare",
self.inner.prepare(query).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn simple_query(
&mut self,
query: &str,
) -> Result<Vec<SimpleQueryMessage>, tokio_postgres::Error> {
retry_on_network_failure!(
"simple_query",
self.inner.simple_query(query).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn query_one<T>(
&mut self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Row, tokio_postgres::Error>
where
T: ?Sized + ToStatement,
{
retry_on_network_failure!(
"query_one",
self.inner.query_one(statement, params).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn query<T>(
&mut self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, tokio_postgres::Error>
where
T: ?Sized + ToStatement,
{
retry_on_network_failure!(
"query",
self.inner.query(statement, params).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn query_raw(
&mut self,
query: String,
params: Vec<String>,
) -> Result<BoxStream<'static, Result<Row, tokio_postgres::Error>>, tokio_postgres::Error> {
let client = Self::connect(self.config.clone()).await?;
let row_stream = RowStream::new(client, query, params).await?;
Ok(Box::pin(row_stream))
}

pub async fn copy_both_simple<T>(
&mut self,
query: &str,
) -> Result<CopyBothDuplex<T>, tokio_postgres::Error>
where
T: bytes::Buf + 'static + Send,
{
retry_on_network_failure!(
"copy_both_simple",
self.inner.copy_both_simple(query).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn batch_execute(&mut self, query: &str) -> Result<(), tokio_postgres::Error> {
retry_on_network_failure!(
"batch_execute",
self.inner.batch_execute(query).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn reconnect(&mut self) -> Result<(), tokio_postgres::Error> {
let new_client = Self::connect(self.config.clone()).await?;
self.inner = new_client.inner;
Ok(())
}

pub async fn connect(config: Config) -> Result<Client, tokio_postgres::Error> {
let client = match helper::connect(config).await {
Ok(client) => client,
Err(PostgresConnectorError::ConnectionFailure(err)) => return Err(err),
Err(err) => panic!("unexpected error {err}"),
};
Ok(client)
}

async fn query_raw_internal(
&mut self,
statement: Statement,
params: Vec<String>,
) -> Result<tokio_postgres::RowStream, tokio_postgres::Error> {
retry_on_network_failure!(
"query_raw",
self.inner.query_raw(&statement, &params).await,
is_network_failure,
self.reconnect().await?
)
}
}

pub struct RowStream {
client: Arc<Mutex<Client>>,
query: String,
query_params: Vec<String>,
cursor_position: u64,
inner: Pin<Box<tokio_postgres::RowStream>>,
pending_resume:
Option<BoxFuture<'static, Result<(Client, tokio_postgres::RowStream), tokio_postgres::Error>>>,
}

impl RowStream {
pub async fn new(
mut client: Client,
query: String,
params: Vec<String>,
) -> Result<Self, tokio_postgres::Error> {
let statement = client.prepare(&query).await?;
let inner = client.query_raw_internal(statement, params.clone()).await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
query,
query_params: params,
cursor_position: 0,
inner: Box::pin(inner),
pending_resume: None,
})
}

fn resume(
&mut self,
) -> BoxFuture<'static, Result<(Client, tokio_postgres::RowStream), tokio_postgres::Error>>
{
async fn resume_async(
client: Arc<Mutex<Client>>,
query: String,
params: Vec<String>,
offset: u64,
) -> Result<(Client, tokio_postgres::RowStream), tokio_postgres::Error> {
let config = client.lock().await.config().clone();
// reconnect
let mut client = Client::connect(config).await?;
// send query with offset
let statement = client.prepare(&add_query_offset(&query, offset)).await?;
let row_stream = client.query_raw_internal(statement, params).await?;
Ok((client, row_stream))
}

Box::pin(resume_async(
self.client.clone(),
self.query.clone(),
self.query_params.clone(),
self.cursor_position,
))
}
}

impl Stream for RowStream {
type Item = Result<Row, tokio_postgres::Error>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

loop {
if let Some(resume) = this.pending_resume.as_mut() {
match ready!(resume.as_mut().poll(cx)) {
Ok((client, inner)) => {
this.pending_resume = None;
this.client = Arc::new(Mutex::new(client));
this.inner = Box::pin(inner);
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
}

match ready!(this.inner.as_mut().poll_next(cx)) {
Some(Ok(row)) => {
this.cursor_position += 1;
return Poll::Ready(Some(Ok(row)));
}
Some(Err(err)) => {
if is_network_failure(&err) {
this.pending_resume = Some(this.resume());
continue;
} else {
return Poll::Ready(Some(Err(err)));
}
}
None => return Poll::Ready(None),
}
}
}
}

fn add_query_offset(query: &str, offset: u64) -> String {
assert!(query.trim_start().get(0..7).map(|s| s.to_uppercase() == "SELECT ").unwrap_or(false));

if offset == 0 {
query.into()
} else {
format!("{query} OFFSET {offset}")
}
}
57 changes: 46 additions & 11 deletions dozer-ingestion/src/connectors/postgres/connection/helper.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use super::client::Client;
use crate::errors::ConnectorError::WrongConnectionConfiguration;
use crate::errors::PostgresConnectorError::InvalidSslError;
use crate::errors::{ConnectorError, PostgresConnectorError};
use crate::retry_on_network_failure;
use dozer_types::log::{debug, error};
use dozer_types::models::connection::ConnectionConfig;
use rustls::client::{ServerCertVerified, ServerCertVerifier};
use rustls::{Certificate, Error, ServerName};
use std::sync::Arc;
use std::time::SystemTime;
use tokio_postgres::config::SslMode;
use tokio_postgres::{Client, NoTls};
use tokio_postgres::{Connection, NoTls, Socket};

pub fn map_connection_config(
auth_details: &ConnectionConfig,
Expand Down Expand Up @@ -66,8 +68,7 @@ pub async fn connect(config: tokio_postgres::Config) -> Result<Client, PostgresC
match config.get_ssl_mode() {
SslMode::Disable => {
// tokio-postgres::Config::SslMode::Disable + NoTLS connection
let (client, connection) = config
.connect(NoTls)
let (client, connection) = connect_helper(config, NoTls)
.await
.map_err(PostgresConnectorError::ConnectionFailure)?;
tokio::spawn(async move {
Expand All @@ -82,10 +83,12 @@ pub async fn connect(config: tokio_postgres::Config) -> Result<Client, PostgresC
rustls_config
.dangerous()
.set_certificate_verifier(Arc::new(AcceptAllVerifier {}));
let (client, connection) = config
.connect(tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config))
.await
.map_err(PostgresConnectorError::ConnectionFailure)?;
let (client, connection) = connect_helper(
config,
tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config),
)
.await
.map_err(PostgresConnectorError::ConnectionFailure)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
Expand All @@ -96,10 +99,12 @@ pub async fn connect(config: tokio_postgres::Config) -> Result<Client, PostgresC
// SslMode::Allow => unimplemented!(),
SslMode::Require => {
// tokio-postgres::Config::SslMode::Require + TLS connection with verification
let (client, connection) = config
.connect(tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config))
.await
.map_err(PostgresConnectorError::ConnectionFailure)?;
let (client, connection) = connect_helper(
config,
tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config),
)
.await
.map_err(PostgresConnectorError::ConnectionFailure)?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
Expand All @@ -110,3 +115,33 @@ pub async fn connect(config: tokio_postgres::Config) -> Result<Client, PostgresC
ssl_mode => Err(InvalidSslError(ssl_mode)),
}
}

async fn connect_helper<T>(
config: tokio_postgres::Config,
tls: T,
) -> Result<(Client, Connection<Socket, T::Stream>), tokio_postgres::Error>
where
T: tokio_postgres::tls::MakeTlsConnect<Socket> + Clone,
{
retry_on_network_failure!(
"connect",
config.connect(tls.clone()).await,
is_network_failure,
()
)
.map(|(client, connection)| (Client::new(config, client), connection))
}

pub fn is_network_failure(err: &tokio_postgres::Error) -> bool {
let err_str = err.to_string();
if err_str.starts_with("error communicating with the server")
|| err_str.starts_with("error performing TLS handshake")
|| err_str.starts_with("connection closed")
|| err_str.starts_with("error connecting to server")
|| err_str.starts_with("timeout waiting for server")
{
true
} else {
false
}
}
Loading

0 comments on commit 3d9fa2d

Please sign in to comment.