fix(iceberg): Make sure iceberg table has been loaded (#16593)
Signed-off-by: Xuanwo <[email protected]>
Xuanwo authored Oct 11, 2024
1 parent dff4617 commit 1be1360
Showing 2 changed files with 8 additions and 14 deletions.
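
What changed: `IcebergTable::read_data` used to unwrap the lazily initialized inner table with `.get().expect("iceberg table must have been loaded")`, which panics whenever the pipeline is built before anything has loaded the table. The fix derives `Clone` for `IcebergTable`, hands the whole wrapper (`self.clone()`) to each `IcebergTableSource`, and makes the async `table()` accessor public, so the source fetches or initializes the underlying `iceberg::table::Table` on first use instead of assuming it is already there.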
src/query/storages/iceberg/src/table.rs (15 changes: 4 additions, 11 deletions)
@@ -47,6 +47,7 @@ use crate::IcebergCatalog;
 pub const ICEBERG_ENGINE: &str = "ICEBERG";
 
 /// accessor wrapper as a table
+#[derive(Clone)]
 pub struct IcebergTable {
     info: TableInfo,
     ctl: IcebergCatalog,
@@ -141,7 +142,8 @@ impl IcebergTable {
         })
     }
 
-    async fn table(&self) -> Result<&iceberg::table::Table> {
+    /// Fetch or init the iceberg table
+    pub async fn table(&self) -> Result<&iceberg::table::Table> {
         self.table
             .get_or_try_init(|| async {
                 let table =
@@ -164,23 +166,14 @@
         plan: &DataSourcePlan,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
-        let table = self
-            .table
-            .get()
-            .expect("iceberg table must have been loaded");
         let parts_len = plan.parts.len();
         let max_threads = ctx.get_settings().get_max_threads()? as usize;
         let max_threads = std::cmp::min(parts_len, max_threads);
 
         let output_schema = Arc::new(DataSchema::from(plan.schema()));
         pipeline.add_source(
             |output| {
-                IcebergTableSource::create(
-                    ctx.clone(),
-                    output,
-                    output_schema.clone(),
-                    table.clone(),
-                )
+                IcebergTableSource::create(ctx.clone(), output, output_schema.clone(), self.clone())
             },
             max_threads.max(1),
         )
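
The new `table()` accessor relies on an async once-cell: the first caller initializes the value, later callers get the cached reference. A minimal sketch of that pattern, assuming `tokio::sync::OnceCell` with stand-in `LazyTable`/`String`/`std::io::Error` types (the actual field type and the catalog-backed load in this crate differ):

use tokio::sync::OnceCell;

struct LazyTable {
    // Stand-in for the cached iceberg::table::Table.
    table: OnceCell<String>,
}

impl LazyTable {
    // Old shape of the access in read_data: panics if nothing loaded the table first.
    fn table_or_panic(&self) -> &String {
        self.table.get().expect("iceberg table must have been loaded")
    }

    // Shape of the now-public accessor: load on first call, reuse the cached value after.
    async fn table(&self) -> Result<&String, std::io::Error> {
        self.table
            .get_or_try_init(|| async {
                // Hypothetical load; the real code fetches the table through the catalog.
                Ok::<_, std::io::Error>("loaded table".to_string())
            })
            .await
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let lazy = LazyTable { table: OnceCell::new() };
    println!("{}", lazy.table().await?); // first call triggers the load
    println!("{}", lazy.table().await?); // second call reuses the cached table
    println!("{}", lazy.table_or_panic()); // safe here only because the cell is already set
    Ok(())
}

With this shape nothing has to preload the table before `read_data` runs; the first call to `table()` pays the load cost and every later call reuses the cached table.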
src/query/storages/iceberg/src/table_source.rs (7 changes: 4 additions, 3 deletions)
@@ -35,10 +35,11 @@ use futures::StreamExt;
 use iceberg::scan::ArrowRecordBatchStream;
 
 use crate::partition::IcebergPartInfo;
+use crate::IcebergTable;
 
 pub struct IcebergTableSource {
     // Source processor related fields.
-    table: iceberg::table::Table,
+    table: IcebergTable,
     output: Arc<OutputPort>,
     scan_progress: Arc<Progress>,
 
@@ -57,7 +58,7 @@ impl IcebergTableSource {
         ctx: Arc<dyn TableContext>,
         output: Arc<OutputPort>,
         output_schema: DataSchemaRef,
-        table: iceberg::table::Table,
+        table: IcebergTable,
     ) -> Result<ProcessorPtr> {
         let scan_progress = ctx.get_scan_progress();
         Ok(ProcessorPtr::create(Box::new(IcebergTableSource {
@@ -135,7 +136,7 @@ impl Processor for IcebergTableSource {
         } else if let Some(part) = self.ctx.get_partition() {
             let part = IcebergPartInfo::from_part(&part)?;
             // TODO: enable row filter?
-            let reader = self.table.reader_builder().build();
+            let reader = self.table.table().await?.reader_builder().build();
             // TODO: don't use stream here.
             let stream = reader
                 .read(Box::pin(stream::iter([Ok(part.to_task())])))
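
On the consumer side, `IcebergTableSource` now holds the clonable `IcebergTable` wrapper rather than an already resolved `iceberg::table::Table`, and resolves it with `self.table.table().await?` right before building the reader. Because this code path is already async, the lazy load fits in without restructuring: the first partition a source reads triggers the fetch if the table was never preloaded, later partitions reuse the cached handle, and building the pipeline can no longer panic on an unloaded table.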
