Skip to content

Commit

Permalink
feat: iceberg table engine. (#13835)
Browse files Browse the repository at this point in the history
* feat: iceberg table engine.

* feat: iceberg table engine.

* refactor: use table options.

* show table options when 'show create table.'

* fix clippy

* rebase
  • Loading branch information
youngsofun authored Dec 4, 2023
1 parent 4f1bfcd commit 6bc7124
Show file tree
Hide file tree
Showing 33 changed files with 258 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ pub enum Engine {
Fuse,
View,
Random,
Iceberg,
}

impl Display for Engine {
Expand All @@ -629,6 +630,7 @@ impl Display for Engine {
Engine::Fuse => write!(f, "FUSE"),
Engine::View => write!(f, "VIEW"),
Engine::Random => write!(f, "RANDOM"),
Engine::Iceberg => write!(f, "ICEBERG"),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2936,6 +2936,7 @@ pub fn engine(i: Input) -> IResult<Engine> {
value(Engine::Fuse, rule! { FUSE }),
value(Engine::View, rule! { VIEW }),
value(Engine::Random, rule! { RANDOM }),
value(Engine::Iceberg, rule! { ICEBERG }),
));

map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ use storages_common_table_meta::meta::Versioned;
use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
use storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
use storages_common_table_meta::table::OPT_KEY_COMMENT;
use storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
use storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use storages_common_table_meta::table::OPT_KEY_ENGINE;
use storages_common_table_meta::table::OPT_KEY_LOCATION;
use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
Expand Down Expand Up @@ -436,6 +438,11 @@ pub static CREATE_TABLE_OPTIONS: Lazy<HashSet<&'static str>> = Lazy::new(|| {

r.insert(OPT_KEY_ENGINE);

r.insert(OPT_KEY_ENGINE);

r.insert(OPT_KEY_LOCATION);
r.insert(OPT_KEY_CONNECTION_NAME);

r.insert("transient");
r
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl ShowCreateTableInterpreter {
.get_hide_options_in_show_create_table()
.unwrap_or(false);

if !hide_options_in_show_create_table {
if !hide_options_in_show_create_table || engine == "ICEBERG" {
table_create_sql.push_str({
let mut opts = table_info.options().iter().collect::<Vec<_>>();
opts.sort_by_key(|(k, _)| *k);
Expand Down
19 changes: 15 additions & 4 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use common_storage::MergeStatus;
use common_storage::StageFileInfo;
use common_storage::StorageMetrics;
use common_storages_fuse::TableContext;
use common_storages_iceberg::IcebergTable;
use common_storages_parquet::Parquet2Table;
use common_storages_parquet::ParquetRSTable;
use common_storages_result_cache::ResultScan;
Expand All @@ -97,6 +98,7 @@ use crate::sessions::QueryContextShared;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sql::binder::get_storage_params_from_options;
use crate::storages::Table;

const MYSQL_VERSION: &str = "8.0.26";
Expand Down Expand Up @@ -684,9 +686,7 @@ impl TableContext for QueryContext {
}
}
async fn get_connection(&self, name: &str) -> Result<UserDefinedConnection> {
let user_mgr = UserApiProvider::instance();
let tenant = self.get_tenant();
user_mgr.get_connection(&tenant, name).await
self.shared.get_connection(name).await
}

/// Fetch a Table by db and table name.
Expand All @@ -703,7 +703,18 @@ impl TableContext for QueryContext {
database: &str,
table: &str,
) -> Result<Arc<dyn Table>> {
self.shared.get_table(catalog, database, table).await
let table = self.shared.get_table(catalog, database, table).await?;
// A better place to do this would be the QueryContextShared::get_table_to_cache() method,
// but there is no way to access dyn TableContext there.
let table: Arc<dyn Table> = if table.engine() == "ICEBERG" {
let sp = get_storage_params_from_options(self, table.options()).await?;
let mut info = table.get_table_info().to_owned();
info.meta.storage_params = Some(sp);
IcebergTable::try_create(info.to_owned())?.into()
} else {
table
};
Ok(table)
}

#[async_backtrace::framed]
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserDefinedConnection;
use common_meta_app::principal::UserInfo;
use common_pipeline_core::InputError;
use common_settings::Settings;
use common_storage::CopyStatus;
use common_storage::DataOperator;
use common_storage::MergeStatus;
use common_storage::StorageMetrics;
use common_users::UserApiProvider;
use dashmap::DashMap;
use parking_lot::Mutex;
use parking_lot::RwLock;
Expand Down Expand Up @@ -401,6 +403,12 @@ impl QueryContextShared {
let status = self.status.read();
status.clone()
}

/// Looks up the user-defined connection called `name` for the current tenant.
///
/// The tenant is taken from `self.get_tenant()` and the lookup is delegated to
/// `UserApiProvider::instance()`; the `Result` it produces is returned as-is.
pub async fn get_connection(&self, name: &str) -> Result<UserDefinedConnection> {
    UserApiProvider::instance()
        .get_connection(&self.get_tenant(), name)
        .await
}
}

impl Drop for QueryContextShared {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl AsyncSource for InferSchemaSource {
)?;
FileLocation::Uri(uri)
};
let (stage_info, path) = resolve_file_location(&self.ctx, &file_location).await?;
let (stage_info, path) = resolve_file_location(self.ctx.as_ref(), &file_location).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl AsyncSource for InspectParquetSource {
}
self.is_finished = true;
let uri = self.uri.strip_prefix('@').unwrap().to_string();
let (stage_info, path) = resolve_stage_location(&self.ctx, &uri).await?;
let (stage_info, path) = resolve_stage_location(self.ctx.as_ref(), &uri).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl AsyncSource for ListStagesSource {
self.is_finished = true;

let (stage_info, path) =
resolve_stage_location(&self.ctx, &self.args_parsed.location).await?;
resolve_stage_location(self.ctx.as_ref(), &self.args_parsed.location).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
21 changes: 11 additions & 10 deletions src/query/service/tests/it/storages/testdata/engines_table.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
---------- TABLE INFO ------------
DB.Table: 'system'.'engines', Table: engines-table_id:1, ver:0, Engine: SystemEngines
-------- TABLE CONTENTS ----------
+----------+-------------------------------+
| Column 0 | Column 1 |
+----------+-------------------------------+
| 'FUSE' | 'FUSE Storage Engine' |
| 'MEMORY' | 'MEMORY Storage Engine' |
| 'NULL' | 'NULL Storage Engine' |
| 'RANDOM' | 'RANDOM Storage Engine' |
| 'STREAM' | 'STREAM STORAGE Engine' |
| 'VIEW' | 'VIEW STORAGE (LOGICAL VIEW)' |
+----------+-------------------------------+
+-----------+-------------------------------+
| Column 0 | Column 1 |
+-----------+-------------------------------+
| 'FUSE' | 'FUSE Storage Engine' |
| 'ICEBERG' | 'ICEBERG STORAGE Engine' |
| 'MEMORY' | 'MEMORY Storage Engine' |
| 'NULL' | 'NULL Storage Engine' |
| 'RANDOM' | 'RANDOM Storage Engine' |
| 'STREAM' | 'STREAM STORAGE Engine' |
| 'VIEW' | 'VIEW STORAGE (LOGICAL VIEW)' |
+-----------+-------------------------------+


1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common-pipeline-transforms = { path = "../pipeline/transforms" }
common-profile = { path = "../profile" }
common-settings = { path = "../settings" }
common-storage = { path = "../../common/storage" }
common-storages-iceberg = { path = "../storages/iceberg" }
common-storages-parquet = { path = "../storages/parquet" }
common-storages-result-cache = { path = "../storages/result_cache" }
common-storages-stage = { path = "../storages/stage" }
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/copy_into_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<'a> Binder {
}
}?;

let (mut stage_info, path) = resolve_file_location(&self.ctx, &stmt.dst).await?;
let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), &stmt.dst).await?;
self.apply_copy_into_location_options(stmt, &mut stage_info)
.await?;

Expand Down
8 changes: 4 additions & 4 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<'a> Binder {
let validation_mode = ValidationMode::from_str(stmt.validation_mode.as_str())
.map_err(ErrorCode::SyntaxException)?;

let (mut stage_info, path) = resolve_file_location(&self.ctx, location).await?;
let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), location).await?;
self.apply_copy_into_table_options(stmt, &mut stage_info)
.await?;
let files_info = StageFilesInfo {
Expand Down Expand Up @@ -218,7 +218,7 @@ impl<'a> Binder {
attachment: StageAttachment,
) -> Result<(StageInfo, StageFilesInfo)> {
let (mut stage_info, path) =
resolve_stage_location(&self.ctx, &attachment.location[1..]).await?;
resolve_stage_location(self.ctx.as_ref(), &attachment.location[1..]).await?;

if let Some(ref options) = attachment.file_format_options {
stage_info.file_format_params = FileFormatOptionsAst {
Expand Down Expand Up @@ -527,7 +527,7 @@ fn check_transform_query(
/// - @internal/abc => (internal, "/stage/internal/abc")
#[async_backtrace::framed]
pub async fn resolve_stage_location(
ctx: &Arc<dyn TableContext>,
ctx: &dyn TableContext,
location: &str,
) -> Result<(StageInfo, String)> {
// my_named_stage/abc/
Expand All @@ -550,7 +550,7 @@ pub async fn resolve_stage_location(

#[async_backtrace::framed]
pub async fn resolve_file_location(
ctx: &Arc<dyn TableContext>,
ctx: &dyn TableContext,
location: &FileLocation,
) -> Result<(StageInfo, String)> {
match location.clone() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn parse_catalog_url(
};

let mut location = UriLocation::from_uri(uri, "".to_string(), options)?;
let (sp, _) = parse_uri_location(&mut location, Some(ctx)).await?;
let (sp, _) = parse_uri_location(&mut location, Some(ctx.as_ref())).await?;

Ok(Some(sp))
}
5 changes: 3 additions & 2 deletions src/query/sql/src/planner/binder/ddl/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Binder {
location: &str,
pattern: &str,
) -> Result<Plan> {
let (stage, path) = resolve_stage_location(&self.ctx, location).await?;
let (stage, path) = resolve_stage_location(self.ctx.as_ref(), location).await?;
let plan_node = RemoveStagePlan {
path,
stage,
Expand Down Expand Up @@ -82,7 +82,8 @@ impl Binder {
connection: uri.connection.clone(),
};

let (stage_storage, path) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (stage_storage, path) =
parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

if !path.ends_with('/') {
return Err(ErrorCode::SyntaxException(
Expand Down
38 changes: 29 additions & 9 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use common_expression::TableSchemaRefExt;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::storage::StorageParams;
use common_storage::DataOperator;
use common_storages_iceberg::IcebergTable;
use common_storages_view::view_table::QUERY;
use common_storages_view::view_table::VIEW_ENGINE;
use log::debug;
Expand All @@ -80,7 +81,8 @@ use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
use storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
use storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION;

use crate::binder::location::parse_uri_location;
use crate::binder::get_storage_params_from_options;
use crate::binder::parse_uri_location;
use crate::binder::scalar::ScalarBinder;
use crate::binder::Binder;
use crate::binder::ColumnBindingBuilder;
Expand Down Expand Up @@ -418,16 +420,16 @@ impl Binder {
)?;
}

let (storage_params, part_prefix) = match uri_location {
Some(uri) => {
let (mut storage_params, part_prefix) = match (uri_location, engine) {
(Some(uri), Engine::Fuse) => {
let mut uri = UriLocation {
protocol: uri.protocol.clone(),
name: uri.name.clone(),
path: uri.path.clone(),
part_prefix: uri.part_prefix.clone(),
connection: uri.connection.clone(),
};
let (sp, _) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (sp, _) = parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

// create a temporary op to check if params is correct
DataOperator::try_create(&sp).await?;
Expand All @@ -441,7 +443,11 @@ impl Binder {

(Some(sp), fp)
}
None => (None, "".to_string()),
(Some(uri), _) => Err(ErrorCode::BadArguments(format!(
"Incorrect CREATE query: CREATE TABLE with external location is only supported for FUSE engine, but got {:?} for {:?}",
engine, uri
)))?,
_ => (None, "".to_string()),
};

// If table is TRANSIENT, set a flag in table option
Expand Down Expand Up @@ -495,9 +501,23 @@ impl Binder {
Self::validate_create_table_schema(&source_schema)?;
(source_schema, source_comments)
}
_ => Err(ErrorCode::BadArguments(
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT..",
))?,
_ => {
if engine == Engine::Iceberg {
let sp = get_storage_params_from_options(self.ctx.as_ref(), &options).await?;
let dop = DataOperator::try_new(&sp)?;
let table = IcebergTable::load_iceberg_table(dop).await?;
let table_schema = IcebergTable::get_schema(&table).await?;
// The first version of the iceberg table engine does not need to persist storage_params,
// since they are derived from the table's location and connection options each time the table is loaded.
// We still set them here in case that design changes.
storage_params = Some(sp);
(Arc::new(table_schema), vec![])
} else {
Err(ErrorCode::BadArguments(
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT or ICEBERG table engine",
))?
}
}
};

// for fuse engine, we will insert database_id, so if we check it in execute phase,
Expand Down Expand Up @@ -646,7 +666,7 @@ impl Binder {

let mut uri = stmt.uri_location.clone();
uri.path = root;
let (sp, _) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (sp, _) = parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

// create a temporary op to check if params is correct
DataOperator::try_create(&sp).await?;
Expand Down
Loading

0 comments on commit 6bc7124

Please sign in to comment.