Skip to content

Commit

Permalink
fix: retry on network errors in MySQL connector
Browse files Browse the repository at this point in the history
Resume database queries on network errors.
Select queries with multiple rows resume from the last row received.
CDC continues from its last position.
  • Loading branch information
abcpro1 committed Aug 31, 2023
1 parent de3d0c1 commit 9068b0e
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 58 deletions.
21 changes: 16 additions & 5 deletions dozer-ingestion/src/connectors/mysql/binlog.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use super::{
connection::Conn,
conversion::{IntoField, IntoFields, IntoJsonValue},
schema::{ColumnDefinition, TableDefinition},
};
use crate::{
connectors::mysql::connection::is_network_failure,
errors::{ConnectorError, MySQLConnectorError},
ingestion::Ingestor,
};
Expand All @@ -13,7 +15,7 @@ use dozer_types::{
};
use dozer_types::{json_types::JsonValue, types::Field};
use futures::StreamExt;
use mysql_async::{binlog::EventFlags, prelude::Queryable, BinlogStream, Conn, Pool};
use mysql_async::{binlog::EventFlags, BinlogStream, Pool};
use mysql_common::{
binlog::{
self,
Expand Down Expand Up @@ -99,8 +101,7 @@ impl<'a, 'b, 'c, 'd, 'e> BinlogIngestor<'a, 'b, 'c, 'd, 'e> {

impl BinlogIngestor<'_, '_, '_, '_, '_> {
async fn connect(&self) -> Result<Conn, MySQLConnectorError> {
self.conn_pool
.get_conn()
Conn::new(self.conn_pool.clone())
.await
.map_err(|err| MySQLConnectorError::ConnectionFailure(self.conn_url.clone(), err))
}
Expand Down Expand Up @@ -138,15 +139,25 @@ impl BinlogIngestor<'_, '_, '_, '_, '_> {
let mut seq_no = 0;
let mut table_cache = BinlogTableCache::new(self.tables);

'binlog_read: while let Some(event) = self.binlog_stream.as_mut().unwrap().next().await {
'binlog_read: while let Some(result) = self.binlog_stream.as_mut().unwrap().next().await {
match self.local_stop_position {
Some(stop_position) if self.next_position.position >= stop_position => {
break 'binlog_read;
}
_ => {}
}

let binlog_event = event.map_err(MySQLConnectorError::BinlogReadError)?;
let binlog_event = match result {
Ok(event) => event,
Err(err) => {
if is_network_failure(&err) {
self.open_binlog().await?;
continue 'binlog_read;
} else {
Err(MySQLConnectorError::BinlogReadError(err))?
}
}
};

let is_artificial = binlog_event
.header()
Expand Down
231 changes: 231 additions & 0 deletions dozer-ingestion/src/connectors/mysql/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use crate::retry_on_network_failure;
use futures::{Stream, StreamExt};
use mysql_async::{prelude::Queryable, BinlogRequest, BinlogStream, Params, Pool};
use mysql_common::{prelude::FromRow, Row};
use std::{
pin::Pin,
task::{ready, Poll},
};
use tokio::sync::mpsc::Receiver;

#[derive(Debug)]
pub struct Conn {
pool: mysql_async::Pool,
inner: mysql_async::Conn,
}

impl Conn {
pub async fn new(pool: mysql_async::Pool) -> Result<Conn, mysql_async::Error> {
let conn = new_mysql_connection(&pool).await?;
Ok(Conn { pool, inner: conn })
}

pub async fn exec_first<'a: 'b, 'b, T, P>(
&'a mut self,
query: &str,
params: P,
) -> Result<Option<T>, mysql_async::Error>
where
P: Into<Params> + Send + Copy + 'b,
T: FromRow + Send + 'static,
{
retry_on_network_failure!(
"query",
self.inner.exec_first(query, params).await,
is_network_failure,
self.reconnect().await?
)
}

pub fn exec_iter(&mut self, query: String, params: Vec<mysql_common::Value>) -> QueryResult {
exec_iter_impl(self.pool.clone(), query, params)
}

#[allow(unused)]
pub async fn exec_drop<'a: 'b, 'b, P>(
&'a mut self,
query: &str,
params: P,
) -> Result<(), mysql_async::Error>
where
P: Into<Params> + Send + Copy + 'b,
{
retry_on_network_failure!(
"query",
self.inner.exec_drop(query, params).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn query_drop(&mut self, query: &str) -> Result<(), mysql_async::Error> {
retry_on_network_failure!(
"query",
self.inner.query_drop(query).await,
is_network_failure,
self.reconnect().await?
)
}

pub async fn get_binlog_stream(
self,
request: BinlogRequest<'_>,
) -> Result<BinlogStream, mysql_async::Error> {
let mut inner = self.inner;
retry_on_network_failure!(
"get_binlog_stream",
inner.get_binlog_stream(request.clone()).await,
is_network_failure,
inner = new_mysql_connection(&self.pool).await?
)
}

async fn reconnect(&mut self) -> Result<(), mysql_async::Error> {
self.inner = new_mysql_connection(&self.pool).await?;
Ok(())
}
}

async fn new_mysql_connection(pool: &Pool) -> Result<mysql_async::Conn, mysql_async::Error> {
retry_on_network_failure!("connect", pool.get_conn().await, is_network_failure, ())
}

pub fn is_network_failure(err: &mysql_async::Error) -> bool {
use mysql_async::DriverError::*;
use mysql_async::Error::*;
match err {
Driver(err) => match err {
ConnectionClosed => true,
_ => false,
},
Io(_) => true,
_ => false,
}
}

fn add_query_offset(query: &str, offset: u64) -> String {
assert!(query.trim_start().get(0..7).map(|s| s.to_uppercase() == "SELECT ").unwrap_or(false));

if offset == 0 {
query.into()
} else {
format!("{query} OFFSET {offset}")
}
}

fn exec_iter_impl(pool: Pool, query: String, params: Vec<mysql_common::Value>) -> QueryResult {
// this is basically a generator/coroutine using a channel to communicate the results
let (sender, receiver) = tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let mut cursor_position: u64 = 0;
'main: loop {
let mut conn = match new_mysql_connection(&pool).await {
Ok(conn) => conn,
Err(err) => {
let _ = sender.send(Err(err)).await;
break;
}
};
let mut rows = match retry_on_network_failure!(
"query",
conn.exec_iter(add_query_offset(&query, cursor_position), &params)
.await,
is_network_failure,
continue 'main
) {
Ok(rows) => rows,
Err(err) => {
let _ = sender.send(Err(err)).await;
break;
}
};
loop {
let result = retry_on_network_failure!(
"query",
rows.next().await,
is_network_failure,
continue 'main
);
let stop = result.is_err() || result.as_ref().unwrap().is_none();
if sender.send(result).await.is_err() {
break;
}
if stop {
break;
}
cursor_position += 1;
}
break;
}
});

let stream = QueryResult::new(receiver);
stream
}

pub struct QueryResult {
receiver: Receiver<Result<Option<Row>, mysql_async::Error>>,
}

impl QueryResult {
pub fn new(receiver: Receiver<Result<Option<Row>, mysql_async::Error>>) -> Self {
Self { receiver }
}

pub async fn next(&mut self) -> Option<Result<Row, mysql_async::Error>> {
StreamExt::next(self).await
}

pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>, mysql_async::Error>
where
F: FnMut(Row) -> U,
{
let mut acc = Vec::new();
while let Some(result) = self.next().await {
let row = result?;
acc.push(fun(mysql_async::from_row(row)));
}
Ok(acc)
}

pub async fn reduce<T, F, U>(
&mut self,
mut init: U,
mut fun: F,
) -> Result<U, mysql_async::Error>
where
F: FnMut(U, T) -> U,
T: FromRow + Send + 'static,
{
while let Some(result) = self.next().await {
let row = result?;
init = fun(init, mysql_async::from_row(row));
}
Ok(init)
}
}

impl Stream for QueryResult {
type Item = Result<Row, mysql_async::Error>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match ready!(self.as_mut().receiver.poll_recv(cx)) {
Some(result) => match result {
Ok(Some(row)) => Poll::Ready(Some(Ok(row))),
Ok(None) => Poll::Ready(None),
Err(err) => Poll::Ready(Some(Err(err))),
},
None => Poll::Ready(None),
}
}
}

impl Drop for QueryResult {
fn drop(&mut self) {
self.receiver.close()
}
}
Loading

0 comments on commit 9068b0e

Please sign in to comment.