diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index e8f7e7a24ec7e..4eac8568ef480 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -47,6 +47,7 @@ use crate::IcebergCatalog; pub const ICEBERG_ENGINE: &str = "ICEBERG"; /// accessor wrapper as a table +#[derive(Clone)] pub struct IcebergTable { info: TableInfo, ctl: IcebergCatalog, @@ -141,7 +142,8 @@ impl IcebergTable { }) } - async fn table(&self) -> Result<&iceberg::table::Table> { + /// Fetch or init the iceberg table + pub async fn table(&self) -> Result<&iceberg::table::Table> { self.table .get_or_try_init(|| async { let table = @@ -164,10 +166,6 @@ impl IcebergTable { plan: &DataSourcePlan, pipeline: &mut Pipeline, ) -> Result<()> { - let table = self - .table - .get() - .expect("iceberg table must have been loaded"); let parts_len = plan.parts.len(); let max_threads = ctx.get_settings().get_max_threads()? as usize; let max_threads = std::cmp::min(parts_len, max_threads); @@ -175,12 +173,7 @@ impl IcebergTable { let output_schema = Arc::new(DataSchema::from(plan.schema())); pipeline.add_source( |output| { - IcebergTableSource::create( - ctx.clone(), - output, - output_schema.clone(), - table.clone(), - ) + IcebergTableSource::create(ctx.clone(), output, output_schema.clone(), self.clone()) }, max_threads.max(1), ) diff --git a/src/query/storages/iceberg/src/table_source.rs b/src/query/storages/iceberg/src/table_source.rs index 27dcd8c70de43..ef3d55a48f4ec 100644 --- a/src/query/storages/iceberg/src/table_source.rs +++ b/src/query/storages/iceberg/src/table_source.rs @@ -35,10 +35,11 @@ use futures::StreamExt; use iceberg::scan::ArrowRecordBatchStream; use crate::partition::IcebergPartInfo; +use crate::IcebergTable; pub struct IcebergTableSource { // Source processor related fields. - table: iceberg::table::Table, + table: IcebergTable, output: Arc, scan_progress: Arc, @@ -57,7 +58,7 @@ impl IcebergTableSource { ctx: Arc, output: Arc, output_schema: DataSchemaRef, - table: iceberg::table::Table, + table: IcebergTable, ) -> Result { let scan_progress = ctx.get_scan_progress(); Ok(ProcessorPtr::create(Box::new(IcebergTableSource { @@ -135,7 +136,7 @@ impl Processor for IcebergTableSource { } else if let Some(part) = self.ctx.get_partition() { let part = IcebergPartInfo::from_part(&part)?; // TODO: enable row filter? - let reader = self.table.reader_builder().build(); + let reader = self.table.table().await?.reader_builder().build(); // TODO: don't use stream here. let stream = reader .read(Box::pin(stream::iter([Ok(part.to_task())])))