diff --git a/dozer-ingestion/src/connectors/delta_lake/connector.rs b/dozer-ingestion/src/connectors/delta_lake/connector.rs index e92bbea820..404cfeaf39 100644 --- a/dozer-ingestion/src/connectors/delta_lake/connector.rs +++ b/dozer-ingestion/src/connectors/delta_lake/connector.rs @@ -2,8 +2,8 @@ use crate::connectors::delta_lake::reader::DeltaLakeReader; use crate::connectors::delta_lake::schema_helper::SchemaHelper; use crate::connectors::delta_lake::ConnectorResult; use crate::connectors::{ - table_name, ConnectorMeta, ConnectorStart, ListOrFilterColumns, SourceSchemaResult, - TableIdentifier, TableInfo, TableToIngest, + table_name, Connector, ListOrFilterColumns, SourceSchemaResult, TableIdentifier, TableInfo, + TableToIngest, }; use crate::errors::ConnectorError; use crate::ingestion::Ingestor; @@ -22,7 +22,7 @@ impl DeltaLakeConnector { } #[async_trait] -impl ConnectorMeta for DeltaLakeConnector { +impl Connector for DeltaLakeConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -107,10 +107,7 @@ impl ConnectorMeta for DeltaLakeConnector { let schema_helper = SchemaHelper::new(self.config.clone()); schema_helper.get_schemas(&table_infos).await } -} -#[async_trait(?Send)] -impl ConnectorStart for DeltaLakeConnector { async fn start(&self, ingestor: &Ingestor, tables: Vec) -> ConnectorResult<()> { let reader = DeltaLakeReader::new(self.config.clone()); reader.read(&tables, ingestor).await diff --git a/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs b/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs index c685de9434..691fe211ff 100644 --- a/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs +++ b/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs @@ -1,5 +1,5 @@ use crate::connectors::delta_lake::DeltaLakeConnector; -use crate::connectors::ConnectorMeta; +use crate::connectors::Connector; use crate::test_util::create_runtime_and_spawn_connector_all_tables; use dozer_types::ingestion_types::IngestionMessage; use dozer_types::ingestion_types::{DeltaLakeConfig, DeltaTable}; diff --git a/dozer-ingestion/src/connectors/dozer/connector.rs b/dozer-ingestion/src/connectors/dozer/connector.rs index 7065b1f1fb..b6e43af654 100644 --- a/dozer-ingestion/src/connectors/dozer/connector.rs +++ b/dozer-ingestion/src/connectors/dozer/connector.rs @@ -25,8 +25,8 @@ use tonic::{async_trait, transport::Channel}; use crate::{ connectors::{ - warn_dropped_primary_index, CdcType, ConnectorMeta, ConnectorStart, SourceSchema, - SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, + warn_dropped_primary_index, CdcType, Connector, SourceSchema, SourceSchemaResult, + TableIdentifier, TableInfo, TableToIngest, }, errors::{ConnectorError, NestedDozerConnectorError}, ingestion::Ingestor, @@ -38,7 +38,7 @@ pub struct NestedDozerConnector { } #[async_trait] -impl ConnectorMeta for NestedDozerConnector { +impl Connector for NestedDozerConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -123,10 +123,7 @@ impl ConnectorMeta for NestedDozerConnector { Ok(schemas) } -} -#[async_trait(?Send)] -impl ConnectorStart for NestedDozerConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/ethereum/log/connector.rs b/dozer-ingestion/src/connectors/ethereum/log/connector.rs index 8c101ce018..c1d1e89bfa 100644 --- a/dozer-ingestion/src/connectors/ethereum/log/connector.rs +++ b/dozer-ingestion/src/connectors/ethereum/log/connector.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::{str::FromStr, sync::Arc}; use crate::connectors::{ - table_name, CdcType, ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, - TableIdentifier, TableToIngest, + table_name, CdcType, Connector, SourceSchema, SourceSchemaResult, TableIdentifier, + TableToIngest, }; use crate::ingestion::Ingestor; use crate::{connectors::TableInfo, errors::ConnectorError}; @@ -123,7 +123,7 @@ impl EthLogConnector { } #[async_trait] -impl ConnectorMeta for EthLogConnector { +impl Connector for EthLogConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -232,10 +232,7 @@ impl ConnectorMeta for EthLogConnector { Ok(result) } -} -#[async_trait(?Send)] -impl ConnectorStart for EthLogConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/ethereum/log/tests/helper.rs b/dozer-ingestion/src/connectors/ethereum/log/tests/helper.rs index c0d5f85d64..0391c51517 100644 --- a/dozer-ingestion/src/connectors/ethereum/log/tests/helper.rs +++ b/dozer-ingestion/src/connectors/ethereum/log/tests/helper.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use crate::{ connectors::{ ethereum::{helper, EthLogConnector}, - ConnectorMeta, TableInfo, + Connector, TableInfo, }, errors::ConnectorError, test_util::spawn_connector, diff --git a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs index adaebb7238..08a3bf8834 100644 --- a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs +++ b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs @@ -1,8 +1,8 @@ use super::super::helper as conn_helper; use super::helper::{self, get_block_traces, map_trace_to_ops}; use crate::connectors::{ - table_name, CdcType, ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, - TableIdentifier, TableToIngest, + table_name, CdcType, Connector, SourceSchema, SourceSchemaResult, TableIdentifier, + TableToIngest, }; use crate::{connectors::TableInfo, errors::ConnectorError, ingestion::Ingestor}; use dozer_types::ingestion_types::{EthTraceConfig, IngestionMessage}; @@ -25,7 +25,7 @@ impl EthTraceConnector { } #[async_trait] -impl ConnectorMeta for EthTraceConnector { +impl Connector for EthTraceConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -91,10 +91,7 @@ impl ConnectorMeta for EthTraceConnector { CdcType::Nothing, ))]) } -} -#[async_trait(?Send)] -impl ConnectorStart for EthTraceConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/ethereum/trace/tests.rs b/dozer-ingestion/src/connectors/ethereum/trace/tests.rs index 05d0bf37d1..4167eff88e 100644 --- a/dozer-ingestion/src/connectors/ethereum/trace/tests.rs +++ b/dozer-ingestion/src/connectors/ethereum/trace/tests.rs @@ -9,7 +9,7 @@ use dozer_types::{ use crate::{ connectors::{ ethereum::{helper, trace::helper::get_block_traces, EthTraceConnector}, - ConnectorMeta, + Connector, }, test_util::{create_test_runtime, spawn_connector}, }; diff --git a/dozer-ingestion/src/connectors/grpc/connector.rs b/dozer-ingestion/src/connectors/grpc/connector.rs index 78060f02fc..95dd13103e 100644 --- a/dozer-ingestion/src/connectors/grpc/connector.rs +++ b/dozer-ingestion/src/connectors/grpc/connector.rs @@ -4,8 +4,7 @@ use std::path::Path; use super::adapter::{GrpcIngestor, IngestAdapter}; use super::ingest::IngestorServiceImpl; use crate::connectors::{ - table_name, ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, TableIdentifier, - TableToIngest, + table_name, Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableToIngest, }; use crate::{connectors::TableInfo, errors::ConnectorError, ingestion::Ingestor}; use dozer_types::grpc_types::ingest::ingest_service_server::IngestServiceServer; @@ -119,7 +118,7 @@ impl GrpcConnector { } #[async_trait] -impl ConnectorMeta for GrpcConnector +impl Connector for GrpcConnector where T: IngestAdapter, { @@ -217,10 +216,7 @@ where Ok(result) } -} -#[async_trait(?Send)] -impl ConnectorStart for GrpcConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/kafka/connector.rs b/dozer-ingestion/src/connectors/kafka/connector.rs index 735948ef95..862398de86 100644 --- a/dozer-ingestion/src/connectors/kafka/connector.rs +++ b/dozer-ingestion/src/connectors/kafka/connector.rs @@ -2,7 +2,7 @@ use rdkafka::consumer::BaseConsumer; use rdkafka::ClientConfig; use crate::connectors::{ - ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, TableIdentifier, TableToIngest, + Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableToIngest, }; use crate::ingestion::Ingestor; use crate::{connectors::TableInfo, errors::ConnectorError}; @@ -42,7 +42,7 @@ impl KafkaConnector { } #[async_trait] -impl ConnectorMeta for KafkaConnector { +impl Connector for KafkaConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -126,10 +126,7 @@ impl ConnectorMeta for KafkaConnector { .map(Ok) .collect()) } -} -#[async_trait(?Send)] -impl ConnectorStart for KafkaConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs index d94672b12b..192a6bbd1a 100644 --- a/dozer-ingestion/src/connectors/mod.rs +++ b/dozer-ingestion/src/connectors/mod.rs @@ -86,7 +86,7 @@ impl SourceSchema { pub type SourceSchemaResult = Result; #[async_trait] -pub trait ConnectorMeta { +pub trait Connector: Send + Sync + Debug { /// Returns all the external types and their corresponding Dozer types. /// If the external type is not supported, None should be returned. fn types_mapping() -> Vec<(String, Option)> @@ -132,12 +132,7 @@ pub trait ConnectorMeta { .collect::, _>>()?; Ok((table_infos, schemas)) } -} -/// We split `Connector` trait into two because snowflake's (using odbc) `start` future is `!Send`. -/// Once we switch to a better client, we should merge them. -#[async_trait(?Send)] -pub trait ConnectorStart { /// Starts outputting data from `tables` to `ingestor`. This method should never return unless there is an unrecoverable error. async fn start( &self, @@ -146,10 +141,6 @@ pub trait ConnectorStart { ) -> Result<(), ConnectorError>; } -pub trait Connector: ConnectorMeta + ConnectorStart + Send + Sync + Debug {} - -impl Connector for T {} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] /// Unique identifier of a source table. A source table must have a `name`, optionally under a `schema` scope. pub struct TableIdentifier { diff --git a/dozer-ingestion/src/connectors/mongodb/mod.rs b/dozer-ingestion/src/connectors/mongodb/mod.rs index ba09aa76f3..40b61af9e6 100644 --- a/dozer-ingestion/src/connectors/mongodb/mod.rs +++ b/dozer-ingestion/src/connectors/mongodb/mod.rs @@ -20,8 +20,7 @@ use dozer_types::{ }; use super::{ - ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, + Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, }; #[derive(Error, Debug)] @@ -454,7 +453,7 @@ impl MongodbConnector { } #[async_trait] -impl ConnectorMeta for MongodbConnector { +impl Connector for MongodbConnector { async fn validate_connection(&self) -> Result<(), ConnectorError> { let client = self.client().await?; let server_info = self.identify_server(&client).await?; @@ -588,10 +587,7 @@ impl ConnectorMeta for MongodbConnector { } Ok(()) } -} -#[async_trait(?Send)] -impl ConnectorStart for MongodbConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/mysql/connector.rs b/dozer-ingestion/src/connectors/mysql/connector.rs index 18345bab58..d9d12a6e00 100644 --- a/dozer-ingestion/src/connectors/mysql/connector.rs +++ b/dozer-ingestion/src/connectors/mysql/connector.rs @@ -7,8 +7,8 @@ use super::{ }; use crate::{ connectors::{ - CdcType, ConnectorMeta, ConnectorStart, SourceSchema, SourceSchemaResult, TableIdentifier, - TableInfo, TableToIngest, + CdcType, Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, + TableToIngest, }, errors::MySQLConnectorError, }; @@ -43,7 +43,7 @@ impl MySQLConnector { } #[async_trait] -impl ConnectorMeta for MySQLConnector { +impl Connector for MySQLConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -186,10 +186,7 @@ impl ConnectorMeta for MySQLConnector { Ok(schemas) } -} -#[async_trait(?Send)] -impl ConnectorStart for MySQLConnector { async fn start( &self, ingestor: &Ingestor, @@ -405,7 +402,7 @@ mod tests { connection::Conn, tests::{create_test_table, mariadb_test_config, mysql_test_config, TestConfig}, }, - CdcType, ConnectorMeta, SourceSchema, TableIdentifier, + CdcType, Connector, SourceSchema, TableIdentifier, }, ingestion::{IngestionIterator, Ingestor}, }; diff --git a/dozer-ingestion/src/connectors/object_store/connector.rs b/dozer-ingestion/src/connectors/object_store/connector.rs index 5c3876468c..0ce241f3cb 100644 --- a/dozer-ingestion/src/connectors/object_store/connector.rs +++ b/dozer-ingestion/src/connectors/object_store/connector.rs @@ -8,8 +8,7 @@ use tonic::async_trait; use crate::connectors::object_store::adapters::DozerObjectStore; use crate::connectors::object_store::schema_mapper; use crate::connectors::{ - ConnectorMeta, ConnectorStart, ListOrFilterColumns, SourceSchemaResult, TableIdentifier, - TableInfo, TableToIngest, + Connector, ListOrFilterColumns, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, }; use crate::errors::{ConnectorError, ObjectStoreConnectorError}; use crate::ingestion::Ingestor; @@ -36,7 +35,7 @@ impl ObjectStoreConnector { } #[async_trait] -impl ConnectorMeta for ObjectStoreConnector { +impl Connector for ObjectStoreConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -99,10 +98,7 @@ impl ConnectorMeta for ObjectStoreConnector { .collect::>(); schema_mapper::get_schema(&self.config, &list_or_filter_columns).await } -} -#[async_trait(?Send)] -impl ConnectorStart for ObjectStoreConnector { async fn start(&self, ingestor: &Ingestor, tables: Vec) -> ConnectorResult<()> { let (sender, mut receiver) = channel::, ObjectStoreConnectorError>>(100); // todo: increase buffer siz diff --git a/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs b/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs index 40f0e6c372..e0fa4948d6 100644 --- a/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs +++ b/dozer-ingestion/src/connectors/object_store/tests/local_storage_tests.rs @@ -1,5 +1,5 @@ use crate::connectors::object_store::connector::ObjectStoreConnector; -use crate::connectors::ConnectorMeta; +use crate::connectors::Connector; use crate::test_util::create_runtime_and_spawn_connector_all_tables; use dozer_types::ingestion_types::IngestionMessage; use dozer_types::ingestion_types::LocalDetails; diff --git a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs index 3276e73ffa..0117e120a1 100644 --- a/dozer-ingestion/src/connectors/postgres/connector.rs +++ b/dozer-ingestion/src/connectors/postgres/connector.rs @@ -1,8 +1,7 @@ use crate::connectors::postgres::connection::validator::validate_connection; use crate::connectors::postgres::iterator::PostgresIterator; use crate::connectors::{ - ConnectorMeta, ConnectorStart, ListOrFilterColumns, SourceSchemaResult, TableIdentifier, - TableInfo, TableToIngest, + Connector, ListOrFilterColumns, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, }; use crate::errors::ConnectorError; use crate::ingestion::Ingestor; @@ -82,7 +81,7 @@ impl PostgresConnector { } #[async_trait] -impl ConnectorMeta for PostgresConnector { +impl Connector for PostgresConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -162,10 +161,7 @@ impl ConnectorMeta for PostgresConnector { .await .map_err(Into::into) } -} -#[async_trait(?Send)] -impl ConnectorStart for PostgresConnector { async fn start( &self, ingestor: &Ingestor, diff --git a/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs b/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs index 5bed613296..6c3d4d04b3 100644 --- a/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs +++ b/dozer-ingestion/src/connectors/snowflake/connector/snowflake.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::connectors::snowflake::connection::client::Client; use crate::connectors::{ - ConnectorMeta, ConnectorStart, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, + Connector, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, }; use crate::errors::ConnectorError; use crate::ingestion::Ingestor; @@ -16,8 +16,6 @@ use dozer_types::log::{info, warn}; use crate::connectors::snowflake::schema_helper::SchemaHelper; -use tokio::time; - use crate::errors::{SnowflakeError, SnowflakeStreamError}; #[derive(Debug)] @@ -30,10 +28,18 @@ impl SnowflakeConnector { pub fn new(name: String, config: SnowflakeConfig) -> Self { Self { name, config } } + + async fn get_schemas_async( + &self, + table_names: Option>, + ) -> Result>, ConnectorError> { + let config = self.config.clone(); + spawn_blocking(move || SchemaHelper::get_schema(&config, table_names.as_deref())).await + } } #[async_trait] -impl ConnectorMeta for SnowflakeConnector { +impl Connector for SnowflakeConnector { fn types_mapping() -> Vec<(String, Option)> where Self: Sized, @@ -42,11 +48,11 @@ impl ConnectorMeta for SnowflakeConnector { } async fn validate_connection(&self) -> Result<(), ConnectorError> { - SchemaHelper::get_schema(&self.config, None).map(|_| ()) + self.get_schemas_async(None).await.map(|_| ()) } async fn list_tables(&self) -> Result, ConnectorError> { - let schemas = SchemaHelper::get_schema(&self.config, None)?; + let schemas = self.get_schemas_async(None).await?; let mut tables = vec![]; for schema in schemas { tables.push(TableIdentifier::from_table_name(schema?.0)); @@ -59,7 +65,7 @@ impl ConnectorMeta for SnowflakeConnector { .iter() .map(|table| table.name.clone()) .collect::>(); - let schemas = SchemaHelper::get_schema(&self.config, Some(&table_names))?; + let schemas = self.get_schemas_async(Some(table_names)).await?; for schema in schemas { schema?; } @@ -74,7 +80,7 @@ impl ConnectorMeta for SnowflakeConnector { .iter() .map(|table| table.name.clone()) .collect::>(); - let schemas = SchemaHelper::get_schema(&self.config, Some(&table_names))?; + let schemas = self.get_schemas_async(Some(table_names)).await?; let mut result = vec![]; for schema in schemas { let (name, schema) = schema?; @@ -102,33 +108,38 @@ impl ConnectorMeta for SnowflakeConnector { .iter() .map(|table_info| table_info.name.clone()) .collect::>(); - Ok(SchemaHelper::get_schema(&self.config, Some(&table_names))? + Ok(self + .get_schemas_async(Some(table_names)) + .await? .into_iter() .map(|schema_result| schema_result.map(|(_, schema)| schema)) .collect()) } -} -#[async_trait(?Send)] -impl ConnectorStart for SnowflakeConnector { async fn start( &self, ingestor: &Ingestor, tables: Vec, ) -> Result<(), ConnectorError> { - run(self.name.clone(), self.config.clone(), tables, ingestor).await + spawn_blocking({ + let name = self.name.clone(); + let config = self.config.clone(); + let ingestor = ingestor.clone(); + move || run(name, config, tables, ingestor) + }) + .await } } -async fn run( +fn run( name: String, config: SnowflakeConfig, tables: Vec, - ingestor: &Ingestor, + ingestor: Ingestor, ) -> Result<(), ConnectorError> { // SNAPSHOT part - run it when stream table doesn't exist let stream_client = Client::new(&config); - let mut interval = time::interval(Duration::from_secs(5)); + let interval = Duration::from_secs(5); let mut consumer = StreamConsumer::new(); let mut iteration = 0; @@ -162,13 +173,28 @@ async fn run( info!("[{}][{}] Reading from changes stream", name, table.name); - consumer - .consume_stream(&stream_client, &table.name, ingestor, idx, iteration) - .await?; + consumer.consume_stream(&stream_client, &table.name, &ingestor, idx, iteration)?; - interval.tick().await; + std::thread::sleep(interval); } iteration += 1; } } + +async fn spawn_blocking(f: F) -> T +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + tokio::task::spawn_blocking(f) + .await + .unwrap_or_else(|join_err| { + let msg = format!("{join_err}"); + if join_err.is_panic() { + panic!("{msg}; panic: {:?}", join_err.into_panic()) + } else { + panic!("{msg}") + } + }) +} diff --git a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs index 7231e8ac5d..1ee743c91f 100644 --- a/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs +++ b/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs @@ -111,7 +111,7 @@ impl StreamConsumer { } } - pub async fn consume_stream( + pub fn consume_stream( &mut self, client: &Client, table_name: &str, @@ -149,12 +149,11 @@ impl StreamConsumer { for (idx, row) in iterator.enumerate() { let op = Self::get_operation(row, action_idx, used_columns_for_schema)?; ingestor - .handle_message(IngestionMessage::OperationEvent { + .blocking_handle_message(IngestionMessage::OperationEvent { table_index, op, id: Some(OpIdentifier::new(iteration, idx as u64)), }) - .await .map_err(|_| ConnectorError::IngestorError)?; } } diff --git a/dozer-ingestion/src/connectors/snowflake/tests.rs b/dozer-ingestion/src/connectors/snowflake/tests.rs index 25ae802dd5..8e43bb623e 100644 --- a/dozer-ingestion/src/connectors/snowflake/tests.rs +++ b/dozer-ingestion/src/connectors/snowflake/tests.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::connectors::snowflake::connector::SnowflakeConnector; use crate::connectors::snowflake::test_utils::remove_streams; -use crate::connectors::{get_connector, ConnectorMeta, TableIdentifier}; +use crate::connectors::{get_connector, Connector, TableIdentifier}; use dozer_types::types::FieldType::{ Binary, Boolean, Date, Decimal, Float, Int, String, Timestamp, diff --git a/dozer-ingestion/src/ingestion/ingestor.rs b/dozer-ingestion/src/ingestion/ingestor.rs index 46a8bde510..f889e11279 100644 --- a/dozer-ingestion/src/ingestion/ingestor.rs +++ b/dozer-ingestion/src/ingestion/ingestor.rs @@ -49,6 +49,13 @@ impl Ingestor { ) -> Result<(), SendError> { self.sender.send(message).await } + + pub fn blocking_handle_message( + &self, + message: IngestionMessage, + ) -> Result<(), SendError> { + self.sender.blocking_send(message) + } } #[cfg(test)] diff --git a/dozer-ingestion/tests/test_suite/basic.rs b/dozer-ingestion/tests/test_suite/basic.rs index 938fb7995a..775e986d48 100644 --- a/dozer-ingestion/tests/test_suite/basic.rs +++ b/dozer-ingestion/tests/test_suite/basic.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use dozer_ingestion::{ - connectors::{CdcType, ConnectorMeta, SourceSchema, TableIdentifier}, + connectors::{CdcType, Connector, SourceSchema, TableIdentifier}, test_util::spawn_connector, }; use dozer_types::{