Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,6 @@ pub async fn write_fragments_internal(
// Make sure the max rows per group is not larger than the max rows per file
params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file);

let allow_blob_version_change =
dataset.is_none() || matches!(params.mode, WriteMode::Overwrite);
let (schema, storage_version) = if let Some(dataset) = dataset {
match params.mode {
WriteMode::Append | WriteMode::Create => {
Expand Down Expand Up @@ -641,10 +639,10 @@ pub async fn write_fragments_internal(
let target_blob_version = blob_version_for(storage_version);
if let Some(dataset) = dataset {
let existing_version = dataset.blob_version();
if !allow_blob_version_change && existing_version != target_blob_version {
if existing_version != target_blob_version {
return Err(Error::InvalidInput {
source: format!(
"Blob column version mismatch. Dataset uses {:?} but write requires {:?}",
"Blob column version mismatch. Existing dataset uses {:?} but requested write requires {:?}. Changing blob version is not allowed",
existing_version, target_blob_version
)
.into(),
Expand Down
31 changes: 30 additions & 1 deletion rust/lance/src/dataset/write/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ struct WriteContext<'a> {

#[cfg(test)]
mod test {
use arrow_array::StructArray;
use arrow_array::{Int32Array, StructArray};
use arrow_schema::{DataType, Field, Schema};

use crate::session::Session;
Expand Down Expand Up @@ -486,4 +486,33 @@ mod test {
1
);
}

#[tokio::test]
async fn prevent_blob_version_upgrade_on_overwrite() {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
.unwrap();

let dataset = InsertBuilder::new("memory://blob-version-guard")
.execute_stream(RecordBatchIterator::new(
vec![Ok(batch.clone())],
schema.clone(),
))
.await
.unwrap();

let dataset = Arc::new(dataset);
let params = WriteParams {
mode: WriteMode::Overwrite,
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};

let result = InsertBuilder::new(dataset.clone())
.with_params(&params)
.execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone()))
.await;

assert!(matches!(result, Err(Error::InvalidInput { .. })));
}
}