32 changes: 5 additions & 27 deletions ffi/src/transaction/mod.rs
@@ -130,13 +130,15 @@ mod tests {
use delta_kernel::schema::{DataType, StructField, StructType};

use delta_kernel::arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
use delta_kernel::arrow::ffi::to_ffi;
use delta_kernel::arrow::json::reader::ReaderBuilder;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter;
use delta_kernel::parquet::file::properties::WriterProperties;
use delta_kernel::transaction::add_files_schema;

use delta_kernel_ffi::engine_data::get_engine_data;
use delta_kernel_ffi::engine_data::ArrowFFIData;
@@ -191,39 +193,15 @@ mod tests {
path: &str,
num_rows: i64,
) -> Result<ArrowFFIData, Box<dyn std::error::Error>> {
let schema = ArrowSchema::new(vec![
Field::new("path", ArrowDataType::Utf8, false),
Field::new(
"partitionValues",
ArrowDataType::Map(
Arc::new(Field::new(
"entries",
ArrowDataType::Struct(
vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
),
Field::new("size", ArrowDataType::Int64, false),
Field::new("modificationTime", ArrowDataType::Int64, false),
Field::new("dataChange", ArrowDataType::Boolean, false),
Field::new("numRecords", ArrowDataType::Int64, true),
]);
let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?;

let current_time: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;

let file_metadata = format!(
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "numRecords": {num_rows}}}"#,
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#,
);

create_arrow_ffi_from_json(schema, file_metadata.as_str())
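The switch from a hand-built Arrow schema to `add_files_schema()` also changes the JSON shape: `numRecords` now lives under a nested `stats` object instead of being a top-level column. Below is a minimal sketch of decoding such a row against the kernel-derived schema, assuming the arrow JSON decoder re-exports already imported in this test file; `build_json_batch` is a hypothetical helper, not part of this PR.

```rust
use std::sync::Arc;

use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
use delta_kernel::arrow::json::reader::ReaderBuilder;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::transaction::add_files_schema;

// Hypothetical helper: build a one-row add-files metadata batch from JSON.
fn build_json_batch(path: &str, num_rows: i64) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    // Derive the Arrow schema from the kernel-maintained add-files schema
    // instead of hand-rolling the field list.
    let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?;

    // Note the nested `stats` object; `numRecords` is no longer a flat column.
    let row = format!(
        r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": 0, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#
    );

    let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder()?;
    decoder.decode(row.as_bytes())?;
    Ok(decoder.flush()?.expect("decoded one batch"))
}
```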
91 changes: 79 additions & 12 deletions kernel/src/actions/mod.rs
@@ -132,6 +132,14 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
&LOG_DOMAIN_METADATA_SCHEMA
}

/// Nest an existing add action schema in an additional [`ADD_NAME`] struct.
///
/// This is useful for JSON conversion, as it allows us to wrap a dynamically maintained add action
/// schema in a top-level "add" struct.
pub(crate) fn as_log_add_schema(schema: SchemaRef) -> SchemaRef {
Arc::new(StructType::new([StructField::nullable(ADD_NAME, schema)]))
}

#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[cfg_attr(
any(test, feature = "internal-api"),
@@ -461,7 +469,18 @@ impl Protocol {
match &self.writer_features {
Some(writer_features) if self.min_writer_version == 7 => {
// if we're on version 7, make sure we support all the specified features
ensure_supported_features(writer_features, &SUPPORTED_WRITER_FEATURES)
ensure_supported_features(writer_features, &SUPPORTED_WRITER_FEATURES)?;

// ensure that there is no illegal combination of features
if writer_features.contains(&WriterFeature::RowTracking)
&& !writer_features.contains(&WriterFeature::DomainMetadata)
{
Err(Error::invalid_protocol(
"rowTracking feature requires domainMetadata to also be enabled",
))
} else {
Ok(())
}
}
Some(_) => {
// there are features, but we're not on 7, so the protocol is actually broken
@@ -674,7 +693,7 @@ pub(crate) struct Add {

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
/// row within the file to the base Row ID.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub base_row_id: Option<i64>,

@@ -882,6 +901,17 @@ pub(crate) struct DomainMetadata {
}

impl DomainMetadata {
/// Create a new DomainMetadata action.
// TODO: Discuss if we should remove `removed` from this method and introduce a dedicated
// method for removed domain metadata.
pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self {
DomainMetadata {
domain,
configuration,
removed,
}
}

// returns true if the domain metadata is a system-controlled domain (all domains that start
// with "delta.")
#[allow(unused)]
@@ -894,14 +924,15 @@ impl DomainMetadata {
mod tests {
use super::*;
use crate::{
arrow::array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
arrow::{
array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
},
datatypes::{DataType as ArrowDataType, Field, Schema},
json::ReaderBuilder,
},
arrow::datatypes::{DataType as ArrowDataType, Field, Schema},
arrow::json::ReaderBuilder,
engine::arrow_data::ArrowEngineData,
engine::arrow_expression::ArrowEvaluationHandler,
engine::{arrow_data::ArrowEngineData, arrow_expression::ArrowEvaluationHandler},
schema::{ArrayType, DataType, MapType, StructField},
utils::test_utils::assert_result_error_with_message,
Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler,
@@ -1296,22 +1327,58 @@ mod tests {
Some(vec![
WriterFeature::AppendOnly,
WriterFeature::DeletionVectors,
WriterFeature::DomainMetadata,
WriterFeature::Invariants,
WriterFeature::RowTracking,
]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_ok());

// Verify that unsupported writer features are rejected
// NOTE: Unsupported reader features should not cause an error here
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::Unknown("unsupported reader".to_string())]),
Some([WriterFeature::IdentityColumns]),
)
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown WriterFeatures: "identityColumns". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
);

// Unknown writer features should cause an error
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::DeletionVectors]),
Some([WriterFeature::RowTracking]),
Some([ReaderFeature::Unknown("unsupported reader".to_string())]),
Some([WriterFeature::Unknown("unsupported writer".to_string())]),
)
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown WriterFeatures: "rowTracking". Supported WriterFeatures: "appendOnly", "deletionVectors", "invariants", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
r#"Unsupported: Unknown WriterFeatures: "unsupported writer". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
);
}

#[test]
fn test_illegal_writer_feature_combination() {
let protocol = Protocol::try_new(
3,
7,
Some::<Vec<String>>(vec![]),
Some(vec![
// No domain metadata even though that is required
WriterFeature::RowTracking,
]),
)
.unwrap();

assert_result_error_with_message(
protocol.ensure_write_supported(),
"rowTracking feature requires domainMetadata to also be enabled",
);
}

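For reference, the dependency rule added to `ensure_write_supported` can be read in isolation as a check over the writer feature list. The sketch below is illustrative only, written as it might appear inside the kernel crate; the helper name and the `crate::table_features` path are assumptions, while `WriterFeature` variants and `Error::invalid_protocol` are used exactly as in the hunk above.

```rust
use crate::table_features::WriterFeature;
use crate::Error;

// Hypothetical helper mirroring the rule enforced above: row tracking depends on
// domain metadata, so listing rowTracking without domainMetadata is an invalid protocol.
fn check_writer_feature_dependencies(writer_features: &[WriterFeature]) -> Result<(), Error> {
    if writer_features.contains(&WriterFeature::RowTracking)
        && !writer_features.contains(&WriterFeature::DomainMetadata)
    {
        return Err(Error::invalid_protocol(
            "rowTracking feature requires domainMetadata to also be enabled",
        ));
    }
    Ok(())
}
```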
32 changes: 24 additions & 8 deletions kernel/src/engine/default/parquet.rs
@@ -5,7 +5,8 @@ use std::ops::Range;
use std::sync::Arc;

use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray};
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
@@ -24,6 +25,7 @@ use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requeste
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::transaction::add_files_schema;
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
PredicateRef,
@@ -55,7 +57,10 @@ impl DataFileMetadata {
}
}

// convert DataFileMetadata into a record batch which matches the 'add_files_schema' schema
/// Convert DataFileMetadata into a record batch which matches the schema returned by
/// [`add_files_schema`].
///
/// [`add_files_schema`]: crate::transaction::add_files_schema
fn as_record_batch(
&self,
partition_values: &HashMap<String, String>,
@@ -70,8 +75,6 @@ impl DataFileMetadata {
},
num_records,
} = self;
let add_files_schema = crate::transaction::add_files_schema();

// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
let key_builder = StringBuilder::new();
@@ -95,17 +98,22 @@
let size = Arc::new(Int64Array::from(vec![size]));
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
let num_records = Arc::new(Int64Array::from(vec![*num_records as i64]));
let stats = Arc::new(StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
None,
1,
)?);

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(add_files_schema.as_ref().try_into_arrow()?),
Arc::new(add_files_schema().as_ref().try_into_arrow()?),
vec![
path,
partitions,
size,
modification_time,
data_change,
num_records,
stats,
],
)?)))
}
@@ -502,6 +510,14 @@ mod tests {
partition_values_builder.values().append_value("a");
partition_values_builder.append(true).unwrap();
let partition_values = partition_values_builder.finish();
let stats_struct = StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![num_records as i64]))],
None,
1,
)
.unwrap();

let expected = RecordBatch::try_new(
schema,
vec![
@@ -510,7 +526,7 @@
Arc::new(Int64Array::from(vec![size as i64])),
Arc::new(Int64Array::from(vec![last_modified])),
Arc::new(BooleanArray::from(vec![data_change])),
Arc::new(Int64Array::from(vec![num_records as i64])),
Arc::new(stats_struct),
],
)
.unwrap();
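With `numRecords` now nested under a `stats` struct column rather than stored as a flat `Int64Array`, reading the value back takes one extra downcast. A rough sketch, using the same `crate::arrow` re-exports as this file; `num_records_of_row` is a hypothetical helper, not part of this PR.

```rust
use crate::arrow::array::{Array, Int64Array, RecordBatch, StructArray};

// Hypothetical helper: read `stats.numRecords` for one row of an add-files batch.
fn num_records_of_row(batch: &RecordBatch, row: usize) -> Option<i64> {
    // `stats` is a struct column; downcast it first...
    let stats = batch
        .column_by_name("stats")?
        .as_any()
        .downcast_ref::<StructArray>()?;
    // ...then pull the nested `numRecords` field out of the struct.
    let num_records = stats
        .column_by_name("numRecords")?
        .as_any()
        .downcast_ref::<Int64Array>()?;
    num_records.is_valid(row).then(|| num_records.value(row))
}
```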
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
@@ -99,6 +99,8 @@ pub mod table_features;
pub mod table_properties;
pub mod transaction;

mod row_tracking;

mod arrow_compat;
#[cfg(any(feature = "arrow-55", feature = "arrow-56"))]
pub use arrow_compat::*;