Skip to content

Commit

Permalink
add maxRowIndex to dv descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Jun 21, 2024
1 parent 6f95fd3 commit bd22f42
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 8 deletions.
7 changes: 7 additions & 0 deletions kernel/src/actions/deletion_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct DeletionVectorDescriptor {

/// Number of rows the given DV logically removes from the file.
pub cardinality: i64,

/// Something to do with row tracking
pub max_row_index: Option<i64>,
}

impl DeletionVectorDescriptor {
Expand Down Expand Up @@ -240,6 +243,7 @@ mod tests {
offset: Some(4),
size_in_bytes: 40,
cardinality: 6,
max_row_index: None,
}
}

Expand All @@ -251,6 +255,7 @@ mod tests {
offset: Some(4),
size_in_bytes: 40,
cardinality: 6,
max_row_index: None,
}
}

Expand All @@ -262,6 +267,7 @@ mod tests {
offset: None,
size_in_bytes: 44,
cardinality: 6,
max_row_index: None,
}
}

Expand All @@ -272,6 +278,7 @@ mod tests {
offset: Some(1),
size_in_bytes: 36,
cardinality: 2,
max_row_index: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ mod tests {
StructField::new("offset", DataType::INTEGER, true),
StructField::new("sizeInBytes", DataType::INTEGER, false),
StructField::new("cardinality", DataType::LONG, false),
StructField::new("maxRowIndex", DataType::LONG, true),
]))),
true,
)
Expand Down
13 changes: 8 additions & 5 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ impl AddVisitor {

let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;

let base_row_id: Option<i64> = getters[12].get_opt(row_index, "add.base_row_id")?;
let base_row_id: Option<i64> = getters[13].get_opt(row_index, "add.base_row_id")?;
let default_row_commit_version: Option<i64> =
getters[13].get_opt(row_index, "add.default_row_commit")?;
getters[14].get_opt(row_index, "add.default_row_commit")?;
let clustering_provider: Option<String> =
getters[14].get_opt(row_index, "add.clustering_provider")?;
getters[15].get_opt(row_index, "add.clustering_provider")?;

Ok(Add {
path,
Expand Down Expand Up @@ -198,9 +198,9 @@ impl RemoveVisitor {

let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;

let base_row_id: Option<i64> = getters[12].get_opt(row_index, "remove.baseRowId")?;
let base_row_id: Option<i64> = getters[13].get_opt(row_index, "remove.baseRowId")?;
let default_row_commit_version: Option<i64> =
getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?;
getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?;

Ok(Remove {
path,
Expand Down Expand Up @@ -306,12 +306,15 @@ pub(crate) fn visit_deletion_vector_at<'a>(
let offset: Option<i32> = getters[2].get_opt(row_index, "deletionVector.offset")?;
let size_in_bytes: i32 = getters[3].get(row_index, "deletionVector.sizeInBytes")?;
let cardinality: i64 = getters[4].get(row_index, "deletionVector.cardinality")?;
let max_row_index: Option<i64> =
getters[5].get_opt(row_index, "deletionVector.maxRowIndex")?;
Ok(Some(DeletionVectorDescriptor {
storage_type,
path_or_inline_dv,
offset,
size_in_bytes,
cardinality,
max_row_index,
}))
} else {
Ok(None)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ fn column_as_struct<'a>(
}

fn make_arrow_error(s: String) -> Error {
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s)).with_backtrace()
}

/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
Expand Down
3 changes: 2 additions & 1 deletion kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct AddRemoveVisitor {
is_log_batch: bool,
}

const ADD_FIELD_COUNT: usize = 15;
const ADD_FIELD_COUNT: usize = 16;

impl AddRemoveVisitor {
fn new(selection_vector: Option<Vec<bool>>, is_log_batch: bool) -> Self {
Expand Down Expand Up @@ -92,6 +92,7 @@ lazy_static! {
StructField::new("offset", DataType::INTEGER, true),
StructField::new("sizeInBytes", DataType::INTEGER, false),
StructField::new("cardinality", DataType::LONG, false),
StructField::new("maxRowIndex", DataType::LONG, true),
]),
true
),
Expand Down
1 change: 1 addition & 0 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ impl Scan {
/// offset: int,
/// sizeInBytes: int,
/// cardinality: long,
/// maxRowIndex: long
/// },
/// fileConstantValues: {
/// partitionValues: map<string, string>
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<T> DataVisitor for ScanFileVisitor<'_, T> {
let deletion_vector = visit_deletion_vector_at(row_index, &getters[dv_index..])?;
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[8].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
(self.callback)(&mut self.context, path, size, dv_info, partition_values)
}
}
Expand Down

0 comments on commit bd22f42

Please sign in to comment.