diff --git a/datafusion-postgres/src/pg_catalog/catalog_info.rs b/datafusion-postgres/src/pg_catalog/catalog_info.rs index cf576e1..673cd81 100644 --- a/datafusion-postgres/src/pg_catalog/catalog_info.rs +++ b/datafusion-postgres/src/pg_catalog/catalog_info.rs @@ -10,11 +10,14 @@ use datafusion::{ /// Define the interface for retrieve catalog data for pg_catalog tables #[async_trait] pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static { - fn catalog_names(&self) -> Result, DataFusionError>; + async fn catalog_names(&self) -> Result, DataFusionError>; - fn schema_names(&self, catalog_name: &str) -> Result>, DataFusionError>; + async fn schema_names( + &self, + catalog_name: &str, + ) -> Result>, DataFusionError>; - fn table_names( + async fn table_names( &self, catalog_name: &str, schema_name: &str, @@ -37,15 +40,18 @@ pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static { #[async_trait] impl CatalogInfo for Arc { - fn catalog_names(&self) -> Result, DataFusionError> { + async fn catalog_names(&self) -> Result, DataFusionError> { Ok(CatalogProviderList::catalog_names(self.as_ref())) } - fn schema_names(&self, catalog_name: &str) -> Result>, DataFusionError> { + async fn schema_names( + &self, + catalog_name: &str, + ) -> Result>, DataFusionError> { Ok(self.catalog(catalog_name).map(|c| c.schema_names())) } - fn table_names( + async fn table_names( &self, catalog_name: &str, schema_name: &str, diff --git a/datafusion-postgres/src/pg_catalog/pg_attribute.rs b/datafusion-postgres/src/pg_catalog/pg_attribute.rs index 9727f84..4911f31 100644 --- a/datafusion-postgres/src/pg_catalog/pg_attribute.rs +++ b/datafusion-postgres/src/pg_catalog/pg_attribute.rs @@ -105,11 +105,13 @@ impl PgAttributeTable { // original one in case that schemas or tables were dropped. let mut swap_cache = HashMap::new(); - for catalog_name in this.catalog_list.catalog_names()? { - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { + for catalog_name in this.catalog_list.catalog_names().await? { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? { for schema_name in schema_names { - if let Some(table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name)? + if let Some(table_names) = this + .catalog_list + .table_names(&catalog_name, &schema_name) + .await? { // Process all tables in this schema for table_name in table_names { diff --git a/datafusion-postgres/src/pg_catalog/pg_class.rs b/datafusion-postgres/src/pg_catalog/pg_class.rs index 6e76920..ba5a71c 100644 --- a/datafusion-postgres/src/pg_catalog/pg_class.rs +++ b/datafusion-postgres/src/pg_catalog/pg_class.rs @@ -117,7 +117,7 @@ impl PgClassTable { let mut swap_cache = HashMap::new(); // Iterate through all catalogs and schemas - for catalog_name in this.catalog_list.catalog_names()? { + for catalog_name in this.catalog_list.catalog_names().await? { let cache_key = OidCacheKey::Catalog(catalog_name.clone()); let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) { *oid @@ -126,7 +126,7 @@ impl PgClassTable { }; swap_cache.insert(cache_key, catalog_oid); - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? { for schema_name in schema_names { let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone()); let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) { @@ -140,8 +140,10 @@ impl PgClassTable { // (In a full implementation, this would go in pg_namespace) // Now process all tables in this schema - if let Some(table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name)? + if let Some(table_names) = this + .catalog_list + .table_names(&catalog_name, &schema_name) + .await? { for table_name in table_names { let cache_key = OidCacheKey::Table( diff --git a/datafusion-postgres/src/pg_catalog/pg_database.rs b/datafusion-postgres/src/pg_catalog/pg_database.rs index 6b6071c..8598e3d 100644 --- a/datafusion-postgres/src/pg_catalog/pg_database.rs +++ b/datafusion-postgres/src/pg_catalog/pg_database.rs @@ -80,7 +80,7 @@ impl PgDatabaseTable { let mut oid_cache = this.oid_cache.write().await; // Add a record for each catalog (treating catalogs as "databases") - for catalog_name in this.catalog_list.catalog_names()? { + for catalog_name in this.catalog_list.catalog_names().await? { let cache_key = OidCacheKey::Catalog(catalog_name.clone()); let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) { *oid diff --git a/datafusion-postgres/src/pg_catalog/pg_namespace.rs b/datafusion-postgres/src/pg_catalog/pg_namespace.rs index fd10523..fa18677 100644 --- a/datafusion-postgres/src/pg_catalog/pg_namespace.rs +++ b/datafusion-postgres/src/pg_catalog/pg_namespace.rs @@ -62,8 +62,8 @@ impl PgNamespaceTable { let mut oid_cache = this.oid_cache.write().await; // Now add all schemas from DataFusion catalogs - for catalog_name in this.catalog_list.catalog_names()? { - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { + for catalog_name in this.catalog_list.catalog_names().await? { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? { for schema_name in schema_names { let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone()); let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) { diff --git a/datafusion-postgres/src/pg_catalog/pg_tables.rs b/datafusion-postgres/src/pg_catalog/pg_tables.rs index 2d4ce7f..ec1c77c 100644 --- a/datafusion-postgres/src/pg_catalog/pg_tables.rs +++ b/datafusion-postgres/src/pg_catalog/pg_tables.rs @@ -49,11 +49,15 @@ impl PgTablesTable { let mut row_security = Vec::new(); // Iterate through all catalogs and schemas - for catalog_name in this.catalog_list.catalog_names()? { - if let Some(catalog_schema_names) = this.catalog_list.schema_names(&catalog_name)? { + for catalog_name in this.catalog_list.catalog_names().await? { + if let Some(catalog_schema_names) = + this.catalog_list.schema_names(&catalog_name).await? + { for schema_name in catalog_schema_names { - if let Some(catalog_table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name)? + if let Some(catalog_table_names) = this + .catalog_list + .table_names(&catalog_name, &schema_name) + .await? { // Now process all tables in this schema for table_name in catalog_table_names {