Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion derive-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn derive_to_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStre
use delta_kernel::schema::derive_macro_utils::{
ToDataType as _, GetStructField as _, GetNullableContainerStructField as _,
};
delta_kernel::schema::StructType::new([
delta_kernel::schema::StructType::new_unchecked([
#schema_fields
])
}
Expand Down
8 changes: 3 additions & 5 deletions ffi/src/engine_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,9 @@ mod tests {
#[test]
fn test_new_expression_evaluator() {
let engine = get_default_engine("memory:///doesntmatter/foo");
let in_schema = Arc::new(StructType::new(vec![StructField::new(
"a",
DataType::LONG,
true,
)]));
let in_schema = Arc::new(
StructType::try_new(vec![StructField::new("a", DataType::LONG, true)]).unwrap(),
);
let expr = Expression::literal(1);
let output_type: Handle<SharedSchema> = in_schema.clone().into();
let in_schema_handle: Handle<SharedSchema> = in_schema.into();
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub unsafe extern "C" fn visit_schema(
fn visit_schema_impl(schema: &StructType, visitor: &mut EngineSchemaVisitor) -> usize {
// Visit all the fields of a struct and return the list of children
fn visit_struct_fields(visitor: &EngineSchemaVisitor, s: &StructType) -> usize {
let child_list_id = (visitor.make_field_list)(visitor.data, s.fields.len());
let child_list_id = (visitor.make_field_list)(visitor.data, s.num_fields());
for field in s.fields() {
visit_schema_item(
field.name(),
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/test_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub unsafe extern "C" fn get_testing_kernel_expression() -> Handle<SharedExpress
];
let nested_values = vec![Scalar::Integer(500), Scalar::Array(array_data.clone())];
let nested_struct = StructData::try_new(nested_fields.clone(), nested_values).unwrap();
let nested_struct_type = StructType::new(nested_fields);
let nested_struct_type = StructType::try_new(nested_fields).unwrap();

let top_level_struct = StructData::try_new(
vec![StructField::nullable(
Expand Down
20 changes: 13 additions & 7 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ mod tests {
#[tokio::test]
#[cfg_attr(miri, ignore)] // FIXME: re-enable miri (can't call foreign function `linkat` on OS `linux`)
async fn test_basic_append() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(StructType::new(vec![
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("number", DataType::INTEGER),
StructField::nullable("string", DataType::STRING),
]));
])?);

// Create a temporary local directory for use during this test
let tmp_test_dir = tempdir()?;
Expand Down Expand Up @@ -275,11 +275,17 @@ mod tests {
// Ensure we get the correct schema
let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };
let write_schema_ref = unsafe { write_schema.as_ref() };
assert_eq!(write_schema_ref.fields.len(), 2);
assert_eq!(write_schema_ref.fields[0].name, "number");
assert_eq!(write_schema_ref.fields[0].data_type, DataType::INTEGER);
assert_eq!(write_schema_ref.fields[1].name, "string");
assert_eq!(write_schema_ref.fields[1].data_type, DataType::STRING);
assert_eq!(write_schema_ref.num_fields(), 2);
assert_eq!(write_schema_ref.field_at_index(0).unwrap().name, "number");
assert_eq!(
write_schema_ref.field_at_index(0).unwrap().data_type,
DataType::INTEGER
);
assert_eq!(write_schema_ref.field_at_index(1).unwrap().name, "string");
assert_eq!(
write_schema_ref.field_at_index(1).unwrap().data_type,
DataType::STRING
);

// Ensure the ffi returns the correct table path
let write_path = unsafe { get_write_path(write_context.shallow_copy(), allocate_str) };
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn create_kernel_schema() -> delta_kernel::schema::Schema {
use delta_kernel::schema::{DataType, StructField};
let field_a = StructField::not_null("a", DataType::LONG);
let field_b = StructField::not_null("b", DataType::BOOLEAN);
delta_kernel::schema::Schema::new(vec![field_a, field_b])
delta_kernel::schema::Schema::try_new(vec![field_a, field_b]).unwrap()
}

fn main() {
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>
"Table has no such column: {col}"
)))
});
Schema::try_new(selected_fields).map(Arc::new)
Schema::try_from_iter(selected_fields).map(Arc::new)
})
.transpose()?;
Ok(Some(
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
})
.collect::<DeltaResult<Vec<_>>>()?;

Ok(Arc::new(StructType::new(fields)))
Ok(Arc::new(StructType::try_new(fields)?))
}

/// Create a new Delta table with the given schema.
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/actions/crc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
#[test]
fn test_file_size_histogram_schema() {
let schema = FileSizeHistogram::to_schema();
let expected = StructType::new([
let expected = StructType::new_unchecked([
StructField::not_null("sortedBinBoundaries", ArrayType::new(DataType::LONG, false)),
StructField::not_null("fileCounts", ArrayType::new(DataType::LONG, false)),
StructField::not_null("totalBytes", ArrayType::new(DataType::LONG, false)),
Expand All @@ -168,7 +168,7 @@ mod tests {
#[test]
fn test_deleted_record_counts_histogram_schema() {
let schema = DeletedRecordCountsHistogram::to_schema();
let expected = StructType::new([StructField::not_null(
let expected = StructType::new_unchecked([StructField::not_null(
"deletedRecordCounts",
ArrayType::new(DataType::LONG, false),
)]);
Expand All @@ -178,7 +178,7 @@ mod tests {
#[test]
fn test_crc_schema() {
let schema = Crc::to_schema();
let expected = StructType::new([
let expected = StructType::new_unchecked([
StructField::nullable("txnId", DataType::STRING),
StructField::not_null("tableSizeBytes", DataType::LONG),
StructField::not_null("numFiles", DataType::LONG),
Expand Down
77 changes: 45 additions & 32 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";
pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";

static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new([
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
Expand All @@ -83,28 +83,28 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
});

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new([StructField::nullable(
Arc::new(StructType::new_unchecked([StructField::nullable(
ADD_NAME,
Add::to_schema(),
)]))
});

static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new([StructField::nullable(
Arc::new(StructType::new_unchecked([StructField::nullable(
COMMIT_INFO_NAME,
CommitInfo::to_schema(),
)]))
});

static LOG_TXN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new([StructField::nullable(
Arc::new(StructType::new_unchecked([StructField::nullable(
SET_TRANSACTION_NAME,
SetTransaction::to_schema(),
)]))
});

static LOG_DOMAIN_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new([StructField::nullable(
Arc::new(StructType::new_unchecked([StructField::nullable(
DOMAIN_METADATA_NAME,
DomainMetadata::to_schema(),
)]))
Expand Down Expand Up @@ -137,7 +137,9 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
/// This is useful for JSON conversion, as it allows us to wrap a dynamically maintained add action
/// schema in a top-level "add" struct.
pub(crate) fn as_log_add_schema(schema: SchemaRef) -> SchemaRef {
Arc::new(StructType::new([StructField::nullable(ADD_NAME, schema)]))
Arc::new(StructType::new_unchecked([StructField::nullable(
ADD_NAME, schema,
)]))
}

#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
Expand Down Expand Up @@ -174,7 +176,7 @@ impl TryFrom<Format> for Scalar {
)
.map(Scalar::Map)?;
Ok(Scalar::Struct(StructData::try_new(
Format::to_schema().fields().cloned().collect(),
Format::to_schema().into_fields().collect(),
vec![provider, options],
)?))
}
Expand All @@ -188,6 +190,7 @@ impl TryFrom<Format> for Scalar {
)]
#[internal_api]
pub(crate) struct Metadata {
// TODO: Make the struct fields private to force using the try_new function.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`Metadata` is constructed directly in too many places across the codebase, so I would rather make its fields private in a follow-up PR than make this PR any bigger.

/// Unique identifier for this table
pub(crate) id: String,
/// User-provided identifier for this table
Expand Down Expand Up @@ -217,6 +220,16 @@ impl Metadata {
created_time: i64,
configuration: HashMap<String, String>,
) -> DeltaResult<Self> {
// Validate that the schema does not contain metadata columns
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to enforce that we never leak metadata columns into the Delta log.

// Note: We don't have to look for nested metadata columns because that is already validated
// when creating a StructType.
if let Some(metadata_field) = schema.fields().find(|field| field.is_metadata_column()) {
return Err(Error::Schema(format!(
"Table schema must not contain metadata columns. Found metadata column: '{}'",
metadata_field.name
)));
}

Ok(Self {
id: uuid::Uuid::new_v4().to_string(),
name,
Expand Down Expand Up @@ -991,15 +1004,15 @@ mod tests {
.project(&[METADATA_NAME])
.expect("Couldn't get metaData field");

let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"metaData",
StructType::new([
StructType::new_unchecked([
StructField::not_null("id", DataType::STRING),
StructField::nullable("name", DataType::STRING),
StructField::nullable("description", DataType::STRING),
StructField::not_null(
"format",
StructType::new([
StructType::new_unchecked([
StructField::not_null("provider", DataType::STRING),
StructField::not_null(
"options",
Expand All @@ -1025,9 +1038,9 @@ mod tests {
.project(&[ADD_NAME])
.expect("Couldn't get add field");

let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"add",
StructType::new([
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
Expand Down Expand Up @@ -1067,7 +1080,7 @@ mod tests {
fn deletion_vector_field() -> StructField {
StructField::nullable(
"deletionVector",
DataType::struct_type([
DataType::struct_type_unchecked([
StructField::not_null("storageType", DataType::STRING),
StructField::not_null("pathOrInlineDv", DataType::STRING),
StructField::nullable("offset", DataType::INTEGER),
Expand All @@ -1082,9 +1095,9 @@ mod tests {
let schema = get_log_schema()
.project(&[REMOVE_NAME])
.expect("Couldn't get remove field");
let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"remove",
StructType::new([
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::nullable("deletionTimestamp", DataType::LONG),
StructField::not_null("dataChange", DataType::BOOLEAN),
Expand All @@ -1105,9 +1118,9 @@ mod tests {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"cdc",
StructType::new([
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
Expand All @@ -1126,9 +1139,9 @@ mod tests {
let schema = get_log_schema()
.project(&[SIDECAR_NAME])
.expect("Couldn't get sidecar field");
let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"sidecar",
StructType::new([
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null("sizeInBytes", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
Expand All @@ -1143,9 +1156,9 @@ mod tests {
let schema = get_log_schema()
.project(&[CHECKPOINT_METADATA_NAME])
.expect("Couldn't get checkpointMetadata field");
let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"checkpointMetadata",
StructType::new([
StructType::new_unchecked([
StructField::not_null("version", DataType::LONG),
tags_field(),
]),
Expand All @@ -1159,9 +1172,9 @@ mod tests {
.project(&["txn"])
.expect("Couldn't get transaction field");

let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"txn",
StructType::new([
StructType::new_unchecked([
StructField::not_null("appId", DataType::STRING),
StructField::not_null("version", DataType::LONG),
StructField::nullable("lastUpdated", DataType::LONG),
Expand All @@ -1176,9 +1189,9 @@ mod tests {
.project(&["commitInfo"])
.expect("Couldn't get commitInfo field");

let expected = Arc::new(StructType::new(vec![StructField::nullable(
let expected = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
"commitInfo",
StructType::new(vec![
StructType::new_unchecked(vec![
StructField::nullable("timestamp", DataType::LONG),
StructField::nullable("inCommitTimestamp", DataType::LONG),
StructField::nullable("operation", DataType::STRING),
Expand All @@ -1199,9 +1212,9 @@ mod tests {
let schema = get_log_schema()
.project(&[DOMAIN_METADATA_NAME])
.expect("Couldn't get domainMetadata field");
let expected = Arc::new(StructType::new([StructField::nullable(
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"domainMetadata",
StructType::new([
StructType::new_unchecked([
StructField::not_null("domain", DataType::STRING),
StructField::not_null("configuration", DataType::STRING),
StructField::not_null("removed", DataType::BOOLEAN),
Expand Down Expand Up @@ -1526,7 +1539,7 @@ mod tests {

#[test]
fn test_metadata_try_new() {
let schema = StructType::new([StructField::not_null("id", DataType::INTEGER)]);
let schema = StructType::new_unchecked([StructField::not_null("id", DataType::INTEGER)]);
let config = HashMap::from([("key1".to_string(), "value1".to_string())]);

let metadata = Metadata::try_new(
Expand All @@ -1551,7 +1564,7 @@ mod tests {

#[test]
fn test_metadata_try_new_default() {
let schema = StructType::new([StructField::not_null("id", DataType::INTEGER)]);
let schema = StructType::new_unchecked([StructField::not_null("id", DataType::INTEGER)]);
let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();

assert!(!metadata.id.is_empty());
Expand All @@ -1561,7 +1574,7 @@ mod tests {

#[test]
fn test_metadata_unique_ids() {
let schema = StructType::new([StructField::not_null("id", DataType::INTEGER)]);
let schema = StructType::new_unchecked([StructField::not_null("id", DataType::INTEGER)]);
let m1 = Metadata::try_new(None, None, schema.clone(), vec![], 0, HashMap::new()).unwrap();
let m2 = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
assert_ne!(m1.id, m2.id);
Expand Down Expand Up @@ -1648,7 +1661,7 @@ mod tests {
#[test]
fn test_metadata_into_engine_data() {
let engine = ExprEngine::new();
let schema = StructType::new([StructField::not_null("id", DataType::INTEGER)]);
let schema = StructType::new_unchecked([StructField::not_null("id", DataType::INTEGER)]);

let test_metadata = Metadata::try_new(
Some("test".to_string()),
Expand Down Expand Up @@ -1699,7 +1712,7 @@ mod tests {
#[test]
fn test_metadata_with_log_schema() {
let engine = ExprEngine::new();
let schema = StructType::new([StructField::not_null("id", DataType::INTEGER)]);
let schema = StructType::new_unchecked([StructField::not_null("id", DataType::INTEGER)]);

let metadata = Metadata::try_new(
Some("table".to_string()),
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,9 @@ impl InCommitTimestampVisitor {
pub(crate) fn schema() -> Arc<Schema> {
static SCHEMA: LazyLock<Arc<Schema>> = LazyLock::new(|| {
let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true);
Arc::new(StructType::new(vec![StructField::new(
Arc::new(StructType::new_unchecked(vec![StructField::new(
COMMIT_INFO_NAME,
StructType::new([ict_type]),
StructType::new_unchecked([ict_type]),
true,
)]))
});
Expand Down
Loading
Loading