refactor: Serialize iceberg table inside engine options to reduce catalog requests #17049

Merged · 11 commits · Dec 23, 2024
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -307,10 +307,10 @@ http = "1"
humantime = "2.1.0"
hyper = "1"
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
-iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
-iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
-iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
-iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
+iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
+iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
+iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
+iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" }
indexmap = "2.0.0"
indicatif = "0.17.5"
itertools = "0.13.0"
2 changes: 1 addition & 1 deletion src/query/storages/iceberg/Cargo.toml
@@ -29,7 +29,7 @@ iceberg-catalog-glue = { workspace = true }
iceberg-catalog-hms = { workspace = true }
iceberg-catalog-rest = { workspace = true }
serde = { workspace = true }
-tokio = { workspace = true }
+serde_json = { workspace = true }
typetag = { workspace = true }

[lints]
138 changes: 93 additions & 45 deletions src/query/storages/iceberg/src/table.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use std::any::Any;
+use std::collections::BTreeMap;
+use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
@@ -40,7 +42,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_pipeline_core::Pipeline;
use databend_storages_common_table_meta::table::ChangeType;
use futures::TryStreamExt;
-use tokio::sync::OnceCell;
+use iceberg::io::FileIOBuilder;

use crate::partition::IcebergPartInfo;
use crate::predicate::PredicateBuilder;
@@ -53,28 +55,15 @@ pub const ICEBERG_ENGINE: &str = "ICEBERG";
#[derive(Clone)]
pub struct IcebergTable {
info: TableInfo,
-ctl: IcebergCatalog,
-database_name: String,
-table_name: String,
-
-table: OnceCell<iceberg::table::Table>,
+pub table: iceberg::table::Table,
}

impl IcebergTable {
/// Create an `IcebergTable` from a `TableInfo` whose engine options carry the serialized table.
#[async_backtrace::framed]
pub fn try_create(info: TableInfo) -> Result<Box<dyn Table>> {
-let ctl = IcebergCatalog::try_create(info.catalog_info.clone())?;
-let (db_name, table_name) = info.desc.as_str().rsplit_once('.').ok_or_else(|| {
-ErrorCode::BadArguments(format!("Iceberg table desc {} is invalid", &info.desc))
-})?;
-Ok(Box::new(Self {
-info: info.clone(),
-ctl,
-database_name: db_name.to_string(),
-table_name: table_name.to_string(),
-table: OnceCell::new(),
-}))
+let table = Self::parse_engine_options(&info.meta.engine_options)?;
+Ok(Box::new(Self { info, table }))
}

pub fn description() -> StorageDescription {
@@ -111,6 +100,88 @@ impl IcebergTable {
TableSchema::try_from(&arrow_schema)
}

+/// build_engine_options generates `engine_options` from an [`iceberg::table::Table`] so that
+/// we can distribute it across nodes and rebuild the table without loading it from the catalog again.
+///
+/// We never persist the `engine_options` to storage, so it's safe to change the implementation,
+/// as long as [`build_engine_options`] and [`parse_engine_options`] are updated together.
+pub fn build_engine_options(table: &iceberg::table::Table) -> Result<BTreeMap<String, String>> {
+let (file_io_scheme, file_io_props) = table.file_io().clone().into_props();
+let file_io_props = serde_json::to_string(&file_io_props)?;
+let metadata_location = table
+.metadata_location()
+.map(|v| v.to_string())
+.unwrap_or_default();
+let metadata = serde_json::to_string(table.metadata())?;
+let identifier = serde_json::to_string(table.identifier())?;
+
+Ok(BTreeMap::from_iter([
+("iceberg.file_io.scheme".to_string(), file_io_scheme),
+("iceberg.file_io.props".to_string(), file_io_props),
+("iceberg.metadata_location".to_string(), metadata_location),
+("iceberg.metadata".to_string(), metadata),
+("iceberg.identifier".to_string(), identifier),
+]))
+}

+/// parse_engine_options parses the `engine_options` [`BTreeMap`] back into an [`iceberg::table::Table`].
+///
+/// See [`build_engine_options`] for more information.
+pub fn parse_engine_options(
+options: &BTreeMap<String, String>,
+) -> Result<iceberg::table::Table> {
+let file_io_scheme = options.get("iceberg.file_io.scheme").ok_or_else(|| {
+ErrorCode::ReadTableDataError(
+"Rebuild iceberg table failed: Missing iceberg.file_io.scheme",
+)
+})?;
+
+let file_io_props: HashMap<String, String> =
+serde_json::from_str(options.get("iceberg.file_io.props").ok_or_else(|| {
+ErrorCode::ReadTableDataError(
+"Rebuild iceberg table failed: Missing iceberg.file_io.props",
+)
+})?)?;
+
+let metadata_location = options
+.get("iceberg.metadata_location")
+.map(|s| s.to_string())
+.unwrap_or_default();
+
+let metadata: iceberg::spec::TableMetadata =
+serde_json::from_str(options.get("iceberg.metadata").ok_or_else(|| {
+ErrorCode::ReadTableDataError(
+"Rebuild iceberg table failed: Missing iceberg.metadata",
+)
+})?)?;
+
+let identifier: iceberg::TableIdent =
+serde_json::from_str(options.get("iceberg.identifier").ok_or_else(|| {
+ErrorCode::ReadTableDataError(
+"Rebuild iceberg table failed: Missing iceberg.identifier",
+)
+})?)?;
+
+let file_io = FileIOBuilder::new(file_io_scheme)
+.with_props(file_io_props)
+.build()
+.map_err(|err| {
+ErrorCode::ReadTableDataError(format!(
+"Rebuild iceberg table file io failed: {err:?}"
+))
+})?;
+
+iceberg::table::Table::builder()
+.identifier(identifier)
+.metadata(metadata)
+.metadata_location(metadata_location)
+.file_io(file_io)
+.build()
+.map_err(|err| {
+ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}"))
+})
+}

/// Create an `IcebergTable` by loading the table from the iceberg catalog.
#[async_backtrace::framed]
pub async fn try_create_from_iceberg_catalog(
@@ -121,6 +192,8 @@
let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?;
let table_schema = Self::get_schema(&table)?;

+let engine_options = Self::build_engine_options(&table)?;
+
// construct table info
let info = TableInfo {
ident: TableIdent::new(0, 0),
@@ -129,38 +202,15 @@
meta: TableMeta {
schema: Arc::new(table_schema),
engine: "iceberg".to_string(),
+engine_options,
created_on: Utc::now(),
..Default::default()
},
catalog_info: ctl.info(),
..Default::default()
};

-Ok(Self {
-info,
-ctl,
-database_name: database_name.to_string(),
-table_name: table_name.to_string(),
-table: OnceCell::new_with(Some(table)),
-})
-}
-
-/// Fetch or init the iceberg table
-pub async fn table(&self) -> Result<&iceberg::table::Table> {
-self.table
-.get_or_try_init(|| async {
-let table =
-Self::load_iceberg_table(&self.ctl, &self.database_name, &self.table_name)
-.await
-.map_err(|err| {
-ErrorCode::ReadTableDataError(format!(
-"Iceberg catalog load failed: {err:?}"
-))
-})?;
-
-Ok(table)
-})
-.await
+Ok(Self { info, table })
}

pub fn do_read_data(
@@ -189,9 +239,7 @@
_: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
) -> Result<(PartStatistics, Partitions)> {
-let table = self.table().await?;
-
-let mut scan = table.scan();
+let mut scan = self.table.scan();

if let Some(push_downs) = &push_downs {
if let Some(projection) = &push_downs.projection {
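Taken together, `build_engine_options` and `parse_engine_options` form a serialize/parse pair: the node that talks to the catalog flattens the loaded table into five string entries (`iceberg.file_io.scheme`, `iceberg.file_io.props`, `iceberg.metadata_location`, `iceberg.metadata`, `iceberg.identifier`) inside `TableMeta.engine_options`, and any other node can rebuild the same table from those entries without touching the catalog. A minimal sketch of the round trip, assuming the crate re-exports `IcebergTable` and that `table` was already loaded from a catalog (illustrative only, not code from this PR):

```rust
use databend_common_exception::Result;
// Assumed re-export for illustration; the type is defined in
// src/query/storages/iceberg/src/table.rs in this PR.
use databend_common_storages_iceberg::IcebergTable;

/// Round trip: serialize on the coordinator, rebuild on a worker.
fn roundtrip(table: &iceberg::table::Table) -> Result<iceberg::table::Table> {
    // Coordinator side: flatten the table into plain string options.
    let opts = IcebergTable::build_engine_options(table)?;

    // `opts` travels between nodes inside TableMeta.engine_options.

    // Worker side: rebuild an equivalent table from the options alone,
    // with no request to the iceberg catalog.
    IcebergTable::parse_engine_options(&opts)
}
```

Because the serialized form is never persisted, the key set and encoding can evolve freely between releases, as the doc comment notes, so long as both helpers change together.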
10 changes: 8 additions & 2 deletions src/query/storages/iceberg/src/table_source.rs
@@ -135,11 +137,17 @@ impl Processor for IcebergTableSource {
// And we should try to build another stream (in next event loop).
} else if let Some(part) = self.ctx.get_partition() {
let part = IcebergPartInfo::from_part(&part)?;
-// TODO: enable row filter?
-let reader = self.table.table().await?.reader_builder().build();
+let reader = self
+.table
+.table
+.reader_builder()
+.with_batch_size(self.ctx.get_settings().get_parquet_max_block_size()? as usize)
+.with_row_group_filtering_enabled(true)
+.build();
// TODO: don't use stream here.
let stream = reader
.read(Box::pin(stream::iter([Ok(part.to_task())])))
.await
.map_err(|err| ErrorCode::Internal(format!("iceberg data stream read: {err:?}")))?;
self.stream = Some(stream);
} else {
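Because the processor now holds the rebuilt table directly, the reader is constructed without awaiting a lazy cell, and two read-path knobs are set: batches are capped at the session's `parquet_max_block_size`, and row-group filtering lets the parquet reader skip row groups that cannot match the pushed-down predicate. A stripped-down sketch of this per-partition path, assuming the iceberg-rust 0.3 API pinned by this PR (`iceberg::Result`, `FileScanTask`, and the builder methods shown in the diff):

```rust
use futures::{stream, TryStreamExt};
use iceberg::scan::FileScanTask;

/// Read all record batches for one scan task (one partition).
async fn read_one_task(
    table: &iceberg::table::Table,
    task: FileScanTask, // what IcebergPartInfo carries via part.to_task()
    max_block_size: usize,
) -> iceberg::Result<Vec<arrow_array::RecordBatch>> {
    let reader = table
        .reader_builder()
        // Cap each arrow batch at the session's max block size.
        .with_batch_size(max_block_size)
        // Skip parquet row groups that cannot match the predicate.
        .with_row_group_filtering_enabled(true)
        .build();

    // The reader consumes a stream of scan tasks and yields record batches;
    // here one task is wrapped into a single-element stream.
    reader
        .read(Box::pin(stream::iter([Ok(task)])))
        .await?
        .try_collect()
        .await
}
```

The diff's remaining TODO still applies: each partition spins up its own stream, and collecting the batches into a `Vec` as above is only for illustration.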