Skip to content

feat: iceberg table engine. #13835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ pub enum Engine {
Fuse,
View,
Random,
Iceberg,
}

impl Display for Engine {
Expand All @@ -629,6 +630,7 @@ impl Display for Engine {
Engine::Fuse => write!(f, "FUSE"),
Engine::View => write!(f, "VIEW"),
Engine::Random => write!(f, "RANDOM"),
Engine::Iceberg => write!(f, "ICEBERG"),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2936,6 +2936,7 @@ pub fn engine(i: Input) -> IResult<Engine> {
value(Engine::Fuse, rule! { FUSE }),
value(Engine::View, rule! { VIEW }),
value(Engine::Random, rule! { RANDOM }),
value(Engine::Iceberg, rule! { ICEBERG }),
));

map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ use storages_common_table_meta::meta::Versioned;
use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
use storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
use storages_common_table_meta::table::OPT_KEY_COMMENT;
use storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
use storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use storages_common_table_meta::table::OPT_KEY_ENGINE;
use storages_common_table_meta::table::OPT_KEY_LOCATION;
use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
Expand Down Expand Up @@ -436,6 +438,11 @@ pub static CREATE_TABLE_OPTIONS: Lazy<HashSet<&'static str>> = Lazy::new(|| {

r.insert(OPT_KEY_ENGINE);

r.insert(OPT_KEY_ENGINE);

r.insert(OPT_KEY_LOCATION);
r.insert(OPT_KEY_CONNECTION_NAME);

r.insert("transient");
r
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl ShowCreateTableInterpreter {
.get_hide_options_in_show_create_table()
.unwrap_or(false);

if !hide_options_in_show_create_table {
if !hide_options_in_show_create_table || engine == "ICEBERG" {
table_create_sql.push_str({
let mut opts = table_info.options().iter().collect::<Vec<_>>();
opts.sort_by_key(|(k, _)| *k);
Expand Down
19 changes: 15 additions & 4 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use common_storage::MergeStatus;
use common_storage::StageFileInfo;
use common_storage::StorageMetrics;
use common_storages_fuse::TableContext;
use common_storages_iceberg::IcebergTable;
use common_storages_parquet::Parquet2Table;
use common_storages_parquet::ParquetRSTable;
use common_storages_result_cache::ResultScan;
Expand All @@ -97,6 +98,7 @@ use crate::sessions::QueryContextShared;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sql::binder::get_storage_params_from_options;
use crate::storages::Table;

const MYSQL_VERSION: &str = "8.0.26";
Expand Down Expand Up @@ -684,9 +686,7 @@ impl TableContext for QueryContext {
}
}
async fn get_connection(&self, name: &str) -> Result<UserDefinedConnection> {
let user_mgr = UserApiProvider::instance();
let tenant = self.get_tenant();
user_mgr.get_connection(&tenant, name).await
self.shared.get_connection(name).await
}

/// Fetch a Table by db and table name.
Expand All @@ -703,7 +703,18 @@ impl TableContext for QueryContext {
database: &str,
table: &str,
) -> Result<Arc<dyn Table>> {
self.shared.get_table(catalog, database, table).await
let table = self.shared.get_table(catalog, database, table).await?;
// the better place to do this is in the QueryContextShared::get_table_to_cache() method,
// but there is no way to access dyn TableContext.
let table: Arc<dyn Table> = if table.engine() == "ICEBERG" {
let sp = get_storage_params_from_options(self, table.options()).await?;
let mut info = table.get_table_info().to_owned();
info.meta.storage_params = Some(sp);
IcebergTable::try_create(info.to_owned())?.into()
} else {
table
};
Ok(table)
}

#[async_backtrace::framed]
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserDefinedConnection;
use common_meta_app::principal::UserInfo;
use common_pipeline_core::InputError;
use common_settings::Settings;
use common_storage::CopyStatus;
use common_storage::DataOperator;
use common_storage::MergeStatus;
use common_storage::StorageMetrics;
use common_users::UserApiProvider;
use dashmap::DashMap;
use parking_lot::Mutex;
use parking_lot::RwLock;
Expand Down Expand Up @@ -401,6 +403,12 @@ impl QueryContextShared {
let status = self.status.read();
status.clone()
}

pub async fn get_connection(&self, name: &str) -> Result<UserDefinedConnection> {
let user_mgr = UserApiProvider::instance();
let tenant = self.get_tenant();
user_mgr.get_connection(&tenant, name).await
}
}

impl Drop for QueryContextShared {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl AsyncSource for InferSchemaSource {
)?;
FileLocation::Uri(uri)
};
let (stage_info, path) = resolve_file_location(&self.ctx, &file_location).await?;
let (stage_info, path) = resolve_file_location(self.ctx.as_ref(), &file_location).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl AsyncSource for InspectParquetSource {
}
self.is_finished = true;
let uri = self.uri.strip_prefix('@').unwrap().to_string();
let (stage_info, path) = resolve_stage_location(&self.ctx, &uri).await?;
let (stage_info, path) = resolve_stage_location(self.ctx.as_ref(), &uri).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl AsyncSource for ListStagesSource {
self.is_finished = true;

let (stage_info, path) =
resolve_stage_location(&self.ctx, &self.args_parsed.location).await?;
resolve_stage_location(self.ctx.as_ref(), &self.args_parsed.location).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
21 changes: 11 additions & 10 deletions src/query/service/tests/it/storages/testdata/engines_table.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
---------- TABLE INFO ------------
DB.Table: 'system'.'engines', Table: engines-table_id:1, ver:0, Engine: SystemEngines
-------- TABLE CONTENTS ----------
+----------+-------------------------------+
| Column 0 | Column 1 |
+----------+-------------------------------+
| 'FUSE' | 'FUSE Storage Engine' |
| 'MEMORY' | 'MEMORY Storage Engine' |
| 'NULL' | 'NULL Storage Engine' |
| 'RANDOM' | 'RANDOM Storage Engine' |
| 'STREAM' | 'STREAM STORAGE Engine' |
| 'VIEW' | 'VIEW STORAGE (LOGICAL VIEW)' |
+----------+-------------------------------+
+-----------+-------------------------------+
| Column 0 | Column 1 |
+-----------+-------------------------------+
| 'FUSE' | 'FUSE Storage Engine' |
| 'ICEBERG' | 'ICEBERG STORAGE Engine' |
| 'MEMORY' | 'MEMORY Storage Engine' |
| 'NULL' | 'NULL Storage Engine' |
| 'RANDOM' | 'RANDOM Storage Engine' |
| 'STREAM' | 'STREAM STORAGE Engine' |
| 'VIEW' | 'VIEW STORAGE (LOGICAL VIEW)' |
+-----------+-------------------------------+


1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common-pipeline-transforms = { path = "../pipeline/transforms" }
common-profile = { path = "../profile" }
common-settings = { path = "../settings" }
common-storage = { path = "../../common/storage" }
common-storages-iceberg = { path = "../storages/iceberg" }
common-storages-parquet = { path = "../storages/parquet" }
common-storages-result-cache = { path = "../storages/result_cache" }
common-storages-stage = { path = "../storages/stage" }
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/copy_into_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<'a> Binder {
}
}?;

let (mut stage_info, path) = resolve_file_location(&self.ctx, &stmt.dst).await?;
let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), &stmt.dst).await?;
self.apply_copy_into_location_options(stmt, &mut stage_info)
.await?;

Expand Down
8 changes: 4 additions & 4 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<'a> Binder {
let validation_mode = ValidationMode::from_str(stmt.validation_mode.as_str())
.map_err(ErrorCode::SyntaxException)?;

let (mut stage_info, path) = resolve_file_location(&self.ctx, location).await?;
let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), location).await?;
self.apply_copy_into_table_options(stmt, &mut stage_info)
.await?;
let files_info = StageFilesInfo {
Expand Down Expand Up @@ -218,7 +218,7 @@ impl<'a> Binder {
attachment: StageAttachment,
) -> Result<(StageInfo, StageFilesInfo)> {
let (mut stage_info, path) =
resolve_stage_location(&self.ctx, &attachment.location[1..]).await?;
resolve_stage_location(self.ctx.as_ref(), &attachment.location[1..]).await?;

if let Some(ref options) = attachment.file_format_options {
stage_info.file_format_params = FileFormatOptionsAst {
Expand Down Expand Up @@ -527,7 +527,7 @@ fn check_transform_query(
/// - @internal/abc => (internal, "/stage/internal/abc")
#[async_backtrace::framed]
pub async fn resolve_stage_location(
ctx: &Arc<dyn TableContext>,
ctx: &dyn TableContext,
location: &str,
) -> Result<(StageInfo, String)> {
// my_named_stage/abc/
Expand All @@ -550,7 +550,7 @@ pub async fn resolve_stage_location(

#[async_backtrace::framed]
pub async fn resolve_file_location(
ctx: &Arc<dyn TableContext>,
ctx: &dyn TableContext,
location: &FileLocation,
) -> Result<(StageInfo, String)> {
match location.clone() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn parse_catalog_url(
};

let mut location = UriLocation::from_uri(uri, "".to_string(), options)?;
let (sp, _) = parse_uri_location(&mut location, Some(ctx)).await?;
let (sp, _) = parse_uri_location(&mut location, Some(ctx.as_ref())).await?;

Ok(Some(sp))
}
5 changes: 3 additions & 2 deletions src/query/sql/src/planner/binder/ddl/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Binder {
location: &str,
pattern: &str,
) -> Result<Plan> {
let (stage, path) = resolve_stage_location(&self.ctx, location).await?;
let (stage, path) = resolve_stage_location(self.ctx.as_ref(), location).await?;
let plan_node = RemoveStagePlan {
path,
stage,
Expand Down Expand Up @@ -82,7 +82,8 @@ impl Binder {
connection: uri.connection.clone(),
};

let (stage_storage, path) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (stage_storage, path) =
parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

if !path.ends_with('/') {
return Err(ErrorCode::SyntaxException(
Expand Down
38 changes: 29 additions & 9 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use common_expression::TableSchemaRefExt;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::storage::StorageParams;
use common_storage::DataOperator;
use common_storages_iceberg::IcebergTable;
use common_storages_view::view_table::QUERY;
use common_storages_view::view_table::VIEW_ENGINE;
use log::debug;
Expand All @@ -80,7 +81,8 @@ use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
use storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
use storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION;

use crate::binder::location::parse_uri_location;
use crate::binder::get_storage_params_from_options;
use crate::binder::parse_uri_location;
use crate::binder::scalar::ScalarBinder;
use crate::binder::Binder;
use crate::binder::ColumnBindingBuilder;
Expand Down Expand Up @@ -418,16 +420,16 @@ impl Binder {
)?;
}

let (storage_params, part_prefix) = match uri_location {
Some(uri) => {
let (mut storage_params, part_prefix) = match (uri_location, engine) {
(Some(uri), Engine::Fuse) => {
let mut uri = UriLocation {
protocol: uri.protocol.clone(),
name: uri.name.clone(),
path: uri.path.clone(),
part_prefix: uri.part_prefix.clone(),
connection: uri.connection.clone(),
};
let (sp, _) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (sp, _) = parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

// create a temporary op to check if params is correct
DataOperator::try_create(&sp).await?;
Expand All @@ -441,7 +443,11 @@ impl Binder {

(Some(sp), fp)
}
None => (None, "".to_string()),
(Some(uri), _) => Err(ErrorCode::BadArguments(format!(
"Incorrect CREATE query: CREATE TABLE with external location is only supported for FUSE engine, but got {:?} for {:?}",
engine, uri
)))?,
_ => (None, "".to_string()),
};

// If table is TRANSIENT, set a flag in table option
Expand Down Expand Up @@ -495,9 +501,23 @@ impl Binder {
Self::validate_create_table_schema(&source_schema)?;
(source_schema, source_comments)
}
_ => Err(ErrorCode::BadArguments(
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT..",
))?,
_ => {
if engine == Engine::Iceberg {
let sp = get_storage_params_from_options(self.ctx.as_ref(), &options).await?;
let dop = DataOperator::try_new(&sp)?;
let table = IcebergTable::load_iceberg_table(dop).await?;
let table_schema = IcebergTable::get_schema(&table).await?;
// the first version of current iceberg table do not need to persist the storage_params,
// since we get it from table options location and connection when load table each time.
// we do this in case we change this idea.
storage_params = Some(sp);
(Arc::new(table_schema), vec![])
} else {
Err(ErrorCode::BadArguments(
"Incorrect CREATE query: required list of column descriptions or AS section or SELECT or ICEBERG table engine",
))?
}
}
};

// for fuse engine, we will insert database_id, so if we check it in execute phase,
Expand Down Expand Up @@ -646,7 +666,7 @@ impl Binder {

let mut uri = stmt.uri_location.clone();
uri.path = root;
let (sp, _) = parse_uri_location(&mut uri, Some(&self.ctx)).await?;
let (sp, _) = parse_uri_location(&mut uri, Some(self.ctx.as_ref())).await?;

// create a temporary op to check if params is correct
DataOperator::try_create(&sp).await?;
Expand Down
Loading