Skip to content

Commit 21468bf

Browse files
authored
Merge branch 'main' into feature/implement-project-node-for-insert-into-datafusion
2 parents 37ac5d5 + 1de3315 commit 21468bf

File tree

22 files changed

+811
-409
lines changed

22 files changed

+811
-409
lines changed

.github/workflows/ci_typos.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@v5
4444
- name: Check typos
45-
uses: crate-ci/typos@v1.36.3
45+
uses: crate-ci/typos@v1.37.2

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bindings/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ include = [
4949
ignore = ["F403", "F405"]
5050

5151
[tool.hatch.envs.dev]
52-
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "datafusion==45.*", "pyiceberg[sql-sqlite,pyarrow]>=0.10.0", "fastavro>=1.11.1"]
52+
dependencies = ["maturin>=1.0,<2.0", "pytest>=8.3.2", "datafusion==45.*", "pyiceberg[sql-sqlite,pyarrow]>=0.10.0", "fastavro>=1.11.1", "pydantic<2.12.0"]
5353

5454
[tool.hatch.envs.dev.scripts]
5555
build = "maturin build --out dist --sdist"

crates/catalog/hms/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const OWNER: &str = "owner";
3737
const COMMENT: &str = "comment";
3838
/// hive metatore `location` property
3939
const LOCATION: &str = "location";
40-
/// hive metatore `metadat_location` property
40+
/// hive metatore `metadata_location` property
4141
const METADATA_LOCATION: &str = "metadata_location";
4242
/// hive metatore `external` property
4343
const EXTERNAL: &str = "EXTERNAL";

crates/catalog/sql/src/catalog.rs

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -765,13 +765,30 @@ impl Catalog for SqlCatalog {
765765

766766
async fn register_table(
767767
&self,
768-
_table_ident: &TableIdent,
769-
_metadata_location: String,
768+
table_ident: &TableIdent,
769+
metadata_location: String,
770770
) -> Result<Table> {
771-
Err(Error::new(
772-
ErrorKind::FeatureUnsupported,
773-
"Registering a table is not supported yet",
774-
))
771+
if self.table_exists(table_ident).await? {
772+
return table_already_exists_err(table_ident);
773+
}
774+
775+
let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
776+
777+
let namespace = table_ident.namespace();
778+
let tbl_name = table_ident.name().to_string();
779+
780+
self.execute(&format!(
781+
"INSERT INTO {CATALOG_TABLE_NAME}
782+
({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
783+
VALUES (?, ?, ?, ?, ?)
784+
"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
785+
786+
Ok(Table::builder()
787+
.identifier(table_ident.clone())
788+
.metadata_location(metadata_location)
789+
.metadata(metadata)
790+
.file_io(self.fileio.clone())
791+
.build()?)
775792
}
776793

777794
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
@@ -1908,4 +1925,63 @@ mod tests {
19081925
"Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
19091926
);
19101927
}
1928+
1929+
#[tokio::test]
1930+
async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
1931+
let warehouse_loc = temp_path();
1932+
let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1933+
let namespace_ident = NamespaceIdent::new("a".into());
1934+
create_namespace(&catalog, &namespace_ident).await;
1935+
let table_name = "tbl1";
1936+
let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1937+
create_table(&catalog, &table_ident).await;
1938+
1939+
assert_eq!(
1940+
catalog
1941+
.register_table(&table_ident, warehouse_loc)
1942+
.await
1943+
.unwrap_err()
1944+
.to_string(),
1945+
format!("Unexpected => Table {:?} already exists.", &table_ident)
1946+
);
1947+
}
1948+
1949+
#[tokio::test]
1950+
async fn test_register_table() {
1951+
let warehouse_loc = temp_path();
1952+
let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1953+
let namespace_ident = NamespaceIdent::new("a".into());
1954+
create_namespace(&catalog, &namespace_ident).await;
1955+
1956+
let table_name = "abc";
1957+
let location = warehouse_loc.clone();
1958+
let table_creation = TableCreation::builder()
1959+
.name(table_name.into())
1960+
.location(location.clone())
1961+
.schema(simple_table_schema())
1962+
.build();
1963+
1964+
let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1965+
let expected_table = catalog
1966+
.create_table(&namespace_ident, table_creation)
1967+
.await
1968+
.unwrap();
1969+
1970+
let metadata_location = expected_table
1971+
.metadata_location()
1972+
.expect("Expected metadata location to be set")
1973+
.to_string();
1974+
1975+
assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
1976+
1977+
let _ = catalog.drop_table(&table_ident).await;
1978+
1979+
let table = catalog
1980+
.register_table(&table_ident, metadata_location.clone())
1981+
.await
1982+
.unwrap();
1983+
1984+
assert_eq!(table.identifier(), expected_table.identifier());
1985+
assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
1986+
}
19111987
}

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay};
2626
use super::_serde::DataFileSerde;
2727
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
2828
use crate::error::Result;
29-
use crate::spec::{Struct, StructType};
29+
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
3030
use crate::{Error, ErrorKind};
3131

3232
/// Data file carries data file path, partition tuple, metrics, …
@@ -49,6 +49,7 @@ pub struct DataFile {
4949
///
5050
/// Partition data tuple, schema based on the partition spec output using
5151
/// partition field ids for the struct field ids
52+
#[builder(default = "Struct::empty()")]
5253
pub(crate) partition: Struct,
5354
/// field id: 103
5455
///
@@ -156,6 +157,7 @@ pub struct DataFile {
156157
pub(crate) first_row_id: Option<i64>,
157158
/// This field is not included in spec. It is just store in memory representation used
158159
/// in process.
160+
#[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
159161
pub(crate) partition_spec_id: i32,
160162
/// field id: 143
161163
///

crates/iceberg/src/spec/partition.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ impl PartitionSpec {
155155
true
156156
}
157157

158-
pub(crate) fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
158+
/// Returns partition path string containing partition type and partition
159+
/// value as key-value pairs.
160+
pub fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
159161
let partition_type = self.partition_type(&schema).unwrap();
160162
let field_types = partition_type.fields();
161163

@@ -194,6 +196,15 @@ impl PartitionKey {
194196
Self { spec, schema, data }
195197
}
196198

199+
/// Creates a new partition key from another partition key, with a new data field.
200+
pub fn copy_with_data(&self, data: Struct) -> Self {
201+
Self {
202+
spec: self.spec.clone(),
203+
schema: self.schema.clone(),
204+
data,
205+
}
206+
}
207+
197208
/// Generates a partition path based on the partition values.
198209
pub fn to_path(&self) -> String {
199210
self.spec.partition_to_path(&self.data, self.schema.clone())
@@ -207,6 +218,21 @@ impl PartitionKey {
207218
Some(pk) => pk.spec.is_unpartitioned(),
208219
}
209220
}
221+
222+
/// Returns the associated [`PartitionSpec`].
223+
pub fn spec(&self) -> &PartitionSpec {
224+
&self.spec
225+
}
226+
227+
/// Returns the associated [`SchemaRef`].
228+
pub fn schema(&self) -> &SchemaRef {
229+
&self.schema
230+
}
231+
232+
/// Returns the associated [`Struct`].
233+
pub fn data(&self) -> &Struct {
234+
&self.data
235+
}
210236
}
211237

212238
/// Reference to [`UnboundPartitionSpec`].
@@ -1789,6 +1815,9 @@ mod tests {
17891815
.with_fields(vec![
17901816
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
17911817
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1818+
NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
1819+
.into(),
1820+
NestedField::required(4, "empty", Type::Primitive(PrimitiveType::String)).into(),
17921821
])
17931822
.build()
17941823
.unwrap();
@@ -1798,14 +1827,23 @@ mod tests {
17981827
.unwrap()
17991828
.add_partition_field("name", "name", Transform::Identity)
18001829
.unwrap()
1830+
.add_partition_field("timestamp", "ts_hour", Transform::Hour)
1831+
.unwrap()
1832+
.add_partition_field("empty", "empty_void", Transform::Void)
1833+
.unwrap()
18011834
.build()
18021835
.unwrap();
18031836

1804-
let data = Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);
1837+
let data = Struct::from_iter([
1838+
Some(Literal::int(42)),
1839+
Some(Literal::string("alice")),
1840+
Some(Literal::int(1000)),
1841+
Some(Literal::string("empty")),
1842+
]);
18051843

18061844
assert_eq!(
18071845
spec.partition_to_path(&data, schema.into()),
1808-
"id=42/name=alice"
1846+
"id=42/name=alice/ts_hour=1000/empty_void=null"
18091847
);
18101848
}
18111849
}

crates/iceberg/src/spec/transform.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,19 +137,17 @@ pub enum Transform {
137137
impl Transform {
138138
/// Returns a human-readable String representation of a transformed value.
139139
pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
140-
if let Some(value) = value {
141-
if let Some(value) = value.as_primitive_literal() {
142-
let field_type = field_type.as_primitive_type().unwrap();
143-
let datum = Datum::new(field_type.clone(), value);
144-
match self {
145-
Self::Identity => datum.to_human_string(),
146-
Self::Void => "null".to_string(),
147-
_ => {
148-
todo!()
149-
}
150-
}
151-
} else {
152-
"null".to_string()
140+
let Some(value) = value else {
141+
return "null".to_string();
142+
};
143+
144+
if let Some(value) = value.as_primitive_literal() {
145+
let field_type = field_type.as_primitive_type().unwrap();
146+
let datum = Datum::new(field_type.clone(), value);
147+
148+
match self {
149+
Self::Void => "null".to_string(),
150+
_ => datum.to_human_string(),
153151
}
154152
} else {
155153
"null".to_string()

crates/iceberg/src/spec/values.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1271,7 +1271,7 @@ impl Datum {
12711271
///
12721272
/// For string literals, this returns the raw string value without quotes.
12731273
/// For all other literals, it falls back to [`to_string()`].
1274-
pub(crate) fn to_human_string(&self) -> String {
1274+
pub fn to_human_string(&self) -> String {
12751275
match self.literal() {
12761276
PrimitiveLiteral::String(s) => s.to_string(),
12771277
_ => self.to_string(),

0 commit comments

Comments
 (0)