Skip to content

Commit 57481f4

Browse files
authored
feat(query): add iceberg table functions (#17626)
* feat(query): add iceberg table functions * feat(query): add iceberg table functions * feat(query): add iceberg table functions * feat(query): add iceberg table functions
1 parent be1f233 commit 57481f4

File tree

13 files changed

+513
-129
lines changed

13 files changed

+513
-129
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/table_args.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp::Ordering;
1516
use std::collections::HashMap;
1617

1718
use databend_common_exception::ErrorCode;
1819
use databend_common_exception::Result;
20+
use databend_common_expression::types::NumberScalar;
1921
use databend_common_expression::Scalar;
2022
use log::debug;
2123

@@ -99,3 +101,108 @@ impl TableArgs {
99101
}
100102
}
101103
}
104+
105+
pub fn string_value(value: &Scalar) -> Result<String> {
106+
match value {
107+
Scalar::String(val) => Ok(val.clone()),
108+
_ => Err(ErrorCode::BadArguments(format!(
109+
"invalid value {value} expect to be string literal."
110+
))),
111+
}
112+
}
113+
114+
pub fn bool_value(value: &Scalar) -> Result<bool> {
115+
match value {
116+
Scalar::Boolean(val) => Ok(*val),
117+
_ => Err(ErrorCode::BadArguments(format!(
118+
"invalid value {value} expect to be boolean literal."
119+
))),
120+
}
121+
}
122+
123+
pub fn string_literal(val: &str) -> Scalar {
124+
Scalar::String(val.to_string())
125+
}
126+
127+
pub fn bool_literal(val: bool) -> Scalar {
128+
Scalar::Boolean(val)
129+
}
130+
131+
pub fn u64_literal(val: u64) -> Scalar {
132+
Scalar::Number(NumberScalar::UInt64(val))
133+
}
134+
135+
/// Orders two scalars, placing null values after every non-null value.
///
/// Two nulls compare as equal; two non-null values fall back to the
/// scalar's own `cmp` implementation.
pub fn cmp_with_null(v1: &Scalar, v2: &Scalar) -> Ordering {
    let left_is_null = v1.is_null();
    let right_is_null = v2.is_null();

    if left_is_null && right_is_null {
        Ordering::Equal
    } else if left_is_null {
        // A null on the left sorts after a concrete value on the right.
        Ordering::Greater
    } else if right_is_null {
        Ordering::Less
    } else {
        v1.cmp(v2)
    }
}
143+
144+
/// Parses the single positional string argument of a sequence table function.
///
/// The arity check is delegated to `TableArgs::expect_all_positioned` with
/// an expected count of `Some(1)` (presumably "exactly one" — confirm against
/// that method).
///
/// # Errors
///
/// Propagates the arity error from `expect_all_positioned`, or the
/// `BadArguments` error from `string_value` when the argument is not a string.
pub fn parse_sequence_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
    let positioned = table_args.expect_all_positioned(func_name, Some(1))?;
    string_value(&positioned[0])
}
149+
150+
/// Parses `(<database>, <table>)` from two positional string arguments.
///
/// # Errors
///
/// Propagates the error from `expect_all_positioned` (called with an expected
/// count of `Some(2)`), or the `BadArguments` error from `string_value` for
/// the first argument that is not a string.
pub fn parse_db_tb_args(table_args: &TableArgs, func_name: &str) -> Result<(String, String)> {
    let positioned = table_args.expect_all_positioned(func_name, Some(2))?;
    // Tuple fields are evaluated left to right, so the database argument is
    // validated before the table argument.
    Ok((string_value(&positioned[0])?, string_value(&positioned[1])?))
}
156+
157+
pub fn parse_db_tb_opt_args(
158+
table_args: &TableArgs,
159+
func_name: &str,
160+
) -> Result<(String, String, Option<String>)> {
161+
let args = table_args.expect_all_positioned(func_name, None)?;
162+
match args.len() {
163+
3 => {
164+
let arg1 = string_value(&args[0])?;
165+
let arg2 = string_value(&args[1])?;
166+
let arg3 = string_value(&args[2])?;
167+
Ok((arg1, arg2, Some(arg3)))
168+
}
169+
2 => {
170+
let arg1 = string_value(&args[0])?;
171+
let arg2 = string_value(&args[1])?;
172+
Ok((arg1, arg2, None))
173+
}
174+
_ => Err(ErrorCode::BadArguments(format!(
175+
"expecting <database>, <table_name> and <opt_arg> (as string literals), but got {:?}",
176+
args
177+
))),
178+
}
179+
}
180+
181+
pub fn parse_opt_opt_args(
182+
table_args: &TableArgs,
183+
func_name: &str,
184+
) -> Result<(Option<String>, Option<String>)> {
185+
let args = table_args.expect_all_positioned(func_name, None)?;
186+
match args.len() {
187+
2 => {
188+
let arg1 = string_value(&args[0])?;
189+
let arg2 = string_value(&args[1])?;
190+
Ok((Some(arg1), Some(arg2)))
191+
}
192+
1 => {
193+
let arg1 = string_value(&args[0])?;
194+
Ok((Some(arg1), None))
195+
}
196+
0 => Ok((None, None)),
197+
_ => Err(ErrorCode::BadArguments(format!(
198+
"expecting <opt_arg1> and <opt_arg2> (as string literals), but got {:?}",
199+
args
200+
))),
201+
}
202+
}
203+
204+
/// Parses the single positional string argument of a table function.
///
/// NOTE(review): despite the `db_tb_col` name, this reads exactly one
/// argument and returns it as one `String`; confirm with callers how that
/// value is interpreted.
///
/// # Errors
///
/// Propagates the error from `expect_all_positioned` (called with an expected
/// count of `Some(1)`), or the `BadArguments` error from `string_value` when
/// the argument is not a string.
pub fn parse_db_tb_col_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
    let positioned = table_args.expect_all_positioned(func_name, Some(1))?;
    string_value(&positioned[0])
}

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::table_functions::FuseVacuumTemporaryTable;
3232
use databend_common_storages_fuse::table_functions::HilbertClusteringInfoFunc;
3333
use databend_common_storages_fuse::table_functions::SetCacheCapacity;
3434
use databend_common_storages_fuse::table_functions::TableFunctionTemplate;
35+
use databend_common_storages_iceberg::IcebergInspectTable;
3536
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
3637
use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUC_ID_END;
3738
use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUNC_ID_BEGIN;
@@ -354,6 +355,16 @@ impl TableFunctionFactory {
354355
(next_id(), Arc::new(ShowRoles::create)),
355356
);
356357

358+
creators.insert(
359+
"iceberg_snapshot".to_string(),
360+
(next_id(), Arc::new(IcebergInspectTable::create)),
361+
);
362+
363+
creators.insert(
364+
"iceberg_manifest".to_string(),
365+
(next_id(), Arc::new(IcebergInspectTable::create)),
366+
);
367+
357368
TableFunctionFactory {
358369
creators: RwLock::new(creators),
359370
}

src/query/sql/src/planner/plans/scan.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,12 @@ impl Operator for Scan {
236236
}
237237
if let Some(col_stat) = v.clone() {
238238
// Columns missing either `min` or `max` statistics are skipped.
239-
let min = col_stat.min.unwrap();
240-
let max = col_stat.max.unwrap();
239+
let Some(min) = col_stat.min.clone() else {
240+
continue;
241+
};
242+
let Some(max) = col_stat.max.clone() else {
243+
continue;
244+
};
241245
// ndv could be `None`, we will use `num_rows - null_count` as ndv instead.
242246
//
243247
// NOTE: don't touch the original num_rows, since it will be used in other places.

src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use chrono::Utc;
1818
use databend_common_catalog::catalog::CATALOG_DEFAULT;
1919
use databend_common_catalog::plan::DataSourcePlan;
2020
use databend_common_catalog::table::Table;
21+
use databend_common_catalog::table_args::string_literal;
2122
use databend_common_catalog::table_args::TableArgs;
2223
use databend_common_catalog::table_context::TableContext;
2324
use databend_common_exception::ErrorCode;
@@ -40,7 +41,6 @@ use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
4041

4142
use crate::io::SegmentsIO;
4243
use crate::table_functions::parse_db_tb_args;
43-
use crate::table_functions::string_literal;
4444
use crate::table_functions::SimpleArgFunc;
4545
use crate::table_functions::SimpleArgFuncTemplate;
4646
use crate::FuseTable;

src/query/storages/fuse/src/table_functions/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@ mod fuse_vacuum_drop_aggregating_index;
2727
mod fuse_vacuum_drop_inverted_index;
2828
mod fuse_vacuum_temporary_table;
2929
mod hilbert_clustering_information;
30-
mod table_args;
3130

3231
mod set_cache_capacity;
3332

3433
pub use clustering_information::ClusteringInformationFunc;
3534
pub use clustering_statistics::ClusteringStatisticsFunc;
36-
use databend_common_catalog::table_args::TableArgs;
35+
pub use databend_common_catalog::table_args::*;
3736
use databend_common_catalog::table_function::TableFunction;
3837
pub use function_template::SimpleTableFunc;
3938
pub use function_template::TableFunctionTemplate;
@@ -52,4 +51,3 @@ pub use fuse_vacuum_drop_inverted_index::FuseVacuumDropInvertedIndex;
5251
pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable;
5352
pub use hilbert_clustering_information::HilbertClusteringInfoFunc;
5453
pub use set_cache_capacity::SetCacheCapacity;
55-
pub use table_args::*;

src/query/storages/fuse/src/table_functions/table_args.rs

Lines changed: 0 additions & 123 deletions
This file was deleted.

src/query/storages/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ databend-common-meta-app = { workspace = true }
2020
databend-common-meta-store = { workspace = true }
2121
databend-common-meta-types = { workspace = true }
2222
databend-common-pipeline-core = { workspace = true }
23+
databend-common-pipeline-sources = { workspace = true }
2324
databend-common-storage = { workspace = true }
2425
databend-common-storages-parquet = { workspace = true }
2526
databend-storages-common-table-meta = { workspace = true }

0 commit comments

Comments
 (0)