From 740a0abf6f284aaa8e45026f7c3a9f0eeeb2619a Mon Sep 17 00:00:00 2001 From: "jay.narale" Date: Thu, 13 Nov 2025 01:49:36 -0800 Subject: [PATCH 1/4] feat: add target_bases extension to python write_fragments API --- python/python/lance/fragment.py | 17 ++++ python/python/tests/test_multi_base.py | 126 +++++++++++++++++++++++++ 2 files changed, 143 insertions(+) diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index d08167fcb52..a9e9c2ba703 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -865,6 +865,7 @@ def write_fragments( use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, + target_bases: Optional[List[str]] = None, ) -> Transaction: ... @overload @@ -883,6 +884,7 @@ def write_fragments( use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, + target_bases: Optional[List[str]] = None, ) -> List[FragmentMetadata]: ... @@ -901,6 +903,7 @@ def write_fragments( use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, + target_bases: Optional[List[str]] = None, ) -> List[FragmentMetadata] | Transaction: """ Write data into one or more fragments. @@ -954,6 +957,19 @@ def write_fragments( These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids. + target_bases : list of str, optional + References to base paths where data should be written. Can be + specified in all modes. + + Each string is resolved by trying to match: + 1. Base name (e.g., "primary", "archive") from registered bases + 2. Base path URI (e.g., "s3://bucket1/data") + + **CREATE mode**: References must match bases in `initial_bases` + (from dataset creation) + **APPEND/OVERWRITE modes**: References must match bases in the + existing manifest + Returns ------- List[FragmentMetadata] | Transaction @@ -1002,6 +1018,7 @@ def write_fragments( data_storage_version=data_storage_version, storage_options=storage_options, enable_stable_row_ids=enable_stable_row_ids, + target_bases=target_bases, ) diff --git a/python/python/tests/test_multi_base.py b/python/python/tests/test_multi_base.py index d7164f639f7..bde947b84e0 100644 --- a/python/python/tests/test_multi_base.py +++ b/python/python/tests/test_multi_base.py @@ -12,8 +12,10 @@ import lance import pandas as pd +import pyarrow as pa import pytest from lance import DatasetBasePath +from lance.fragment import write_fragments class TestMultiBase: @@ -966,3 +968,127 @@ def test_add_bases_with_transaction_properties(self): result = dataset.to_table().to_pandas() assert len(result) == 30 assert set(result["id"]) == set(range(30)) + + +class TestWriteFragmentsWithTargetBases: + """Test write_fragments with target_bases parameter.""" + + def setup_method(self): + """Set up test directories for each test.""" + self.test_dir = tempfile.mkdtemp() + self.test_id = str(uuid.uuid4())[:8] + + # Create primary and additional path directories + self.primary_uri = str(Path(self.test_dir) / "primary") + self.base1_uri = str(Path(self.test_dir) / f"base1_{self.test_id}") + self.base2_uri = str(Path(self.test_dir) / f"base2_{self.test_id}") + + # Create directories + for uri in [self.primary_uri, self.base1_uri, self.base2_uri]: + Path(uri).mkdir(parents=True, exist_ok=True) + + def teardown_method(self): + """Clean up test directories after each test.""" + if hasattr(self, "test_dir"): + shutil.rmtree(self.test_dir, ignore_errors=True) + + def test_write_fragments_with_target_bases(self): + """Test write_fragments with target_bases parameter.""" + # Create initial dataset with multiple bases + initial_data = pd.DataFrame( + { + "id": range(50), + "value": [f"initial_{i}" for i in range(50)], + } + ) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.base1_uri, name="base1"), + DatasetBasePath(self.base2_uri, name="base2"), + ], + target_bases=["base1"], + max_rows_per_file=25, + ) + + # Verify initial data is written + assert len(dataset.to_table()) == 50 + + # Write fragments using write_fragments with target_bases + fragment_data = pd.DataFrame( + { + "id": range(50, 75), + "value": [f"fragment_{i}" for i in range(50, 75)], + } + ) + + # Use write_fragments with target_bases set to base2 + fragments = write_fragments( + pa.Table.from_pandas(fragment_data), + dataset, + mode="append", + target_bases=["base2"], + max_rows_per_file=25, + ) + + # Fragments should be created + assert len(fragments) > 0 + + # Commit the fragments using dataset.commit + operation = lance.LanceOperation.Append(fragments) + dataset = lance.LanceDataset.commit( + dataset.uri, operation, read_version=dataset.version + ) + + # Verify all data is present + result = dataset.to_table().to_pandas() + assert len(result) == 75 + assert set(result["id"]) == set(range(75)) + + # Verify fragments are in the correct base + # Check that some fragments exist in base2 + base2_path = Path(self.base2_uri) + data_files = list(base2_path.glob("**/*.lance")) + assert len(data_files) > 0, "Expected data files in base2" + + def test_write_fragments_transaction_with_target_bases(self): + """Test write_fragments with return_transaction and target_bases.""" + # Create initial dataset + initial_data = pd.DataFrame({"id": range(30), "value": range(30)}) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.base1_uri, name="base1"), + DatasetBasePath(self.base2_uri, name="base2"), + ], + target_bases=["base1"], + max_rows_per_file=15, + ) + + # Use write_fragments with return_transaction=True and target_bases + new_data = pd.DataFrame({"id": range(30, 50), "value": range(30, 50)}) + + transaction = write_fragments( + pa.Table.from_pandas(new_data), + dataset, + mode="append", + return_transaction=True, + target_bases=["base2"], + max_rows_per_file=10, + ) + + # Commit the transaction + dataset = lance.LanceDataset.commit( + dataset.uri, transaction, read_version=dataset.version + ) + + # Verify data + result = dataset.to_table().to_pandas() + assert len(result) == 50 + assert set(result["id"]) == set(range(50)) From 0d00dacf55cbfdc45717b528fe2d1ddda31ebece Mon Sep 17 00:00:00 2001 From: "jay.narale" Date: Fri, 14 Nov 2025 11:55:17 -0800 Subject: [PATCH 2/4] add a test for overwrite mode --- python/python/tests/test_multi_base.py | 59 ++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/python/python/tests/test_multi_base.py b/python/python/tests/test_multi_base.py index bde947b84e0..33bf0731df2 100644 --- a/python/python/tests/test_multi_base.py +++ b/python/python/tests/test_multi_base.py @@ -1092,3 +1092,62 @@ def test_write_fragments_transaction_with_target_bases(self): result = dataset.to_table().to_pandas() assert len(result) == 50 assert set(result["id"]) == set(range(50)) + + def test_write_fragments_overwrite_mode_with_target_bases(self): + """Test write_fragments in OVERWRITE mode with target_bases.""" + # Create initial dataset + initial_data = pd.DataFrame({ + "id": range(30), + "value": [f"initial_{i}" for i in range(30)], + }) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.base1_uri, name="base1"), + DatasetBasePath(self.base2_uri, name="base2"), + ], + target_bases=["base1"], + max_rows_per_file=15, + ) + + assert len(dataset.to_table()) == 30 + + # Use write_fragments with mode="overwrite" to replace all data + overwrite_data = pd.DataFrame({ + "id": range(100, 120), + "value": [f"overwrite_{i}" for i in range(100, 120)], + }) + + fragments = write_fragments( + pa.Table.from_pandas(overwrite_data), + dataset, + mode="overwrite", + target_bases=["base2"], # Write to base2 this time + max_rows_per_file=10, + ) + + assert len(fragments) > 0 + + # Commit with Overwrite operation + operation = lance.LanceOperation.Overwrite( + pa.Table.from_pandas(overwrite_data).schema, + fragments + ) + dataset = lance.LanceDataset.commit( + dataset.uri, operation, read_version=dataset.version + ) + + # Verify data was overwritten (only new data should exist) + result = dataset.to_table().to_pandas() + assert len(result) == 20 + assert set(result["id"]) == set(range(100, 120)) + # Old data (0-29) should be gone + assert not any(result["id"] < 100) + + # Verify fragments are in base2 + base2_path = Path(self.base2_uri) + data_files = list(base2_path.glob("**/*.lance")) + assert len(data_files) > 0, "Expected data files in base2" From 212739efebf020e14589ab0a31782d38a8d590a0 Mon Sep 17 00:00:00 2001 From: "jay.narale" Date: Fri, 14 Nov 2025 16:58:15 -0800 Subject: [PATCH 3/4] Add support for create flow --- python/python/lance/dataset.py | 5 ++ python/python/lance/fragment.py | 17 +++++- python/python/tests/test_multi_base.py | 80 ++++++++++++++++++++++---- python/src/transaction.rs | 22 ++++++- 4 files changed, 112 insertions(+), 12 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cb67d62d13f..3725e911f21 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3739,6 +3739,10 @@ class Overwrite(BaseOperation): The schema of the new dataset. fragments: list[FragmentMetadata] The fragments that make up the new dataset. + initial_bases: list[DatasetBasePath], optional + Base paths to register when creating a new dataset (CREATE mode only). + **Only valid in CREATE mode**. Will raise an error if used with + OVERWRITE on existing dataset. Warning ------- @@ -3773,6 +3777,7 @@ class Overwrite(BaseOperation): new_schema: LanceSchema | pa.Schema fragments: Iterable[FragmentMetadata] + initial_bases: Optional[List[DatasetBasePath]] = None def __post_init__(self): if isinstance(self.new_schema, pa.Schema): diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index a9e9c2ba703..eb323edac41 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -41,6 +41,7 @@ if TYPE_CHECKING: from .dataset import ( ColumnOrdering, + DatasetBasePath, LanceDataset, LanceScanner, ReaderLike, @@ -866,6 +867,7 @@ def write_fragments( storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, + initial_bases: Optional[List["DatasetBasePath"]] = None, ) -> Transaction: ... @overload @@ -885,6 +887,7 @@ def write_fragments( storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, + initial_bases: Optional[List["DatasetBasePath"]] = None, ) -> List[FragmentMetadata]: ... @@ -904,6 +907,7 @@ def write_fragments( storage_options: Optional[Dict[str, str]] = None, enable_stable_row_ids: bool = False, target_bases: Optional[List[str]] = None, + initial_bases: Optional[List["DatasetBasePath"]] = None, ) -> List[FragmentMetadata] | Transaction: """ Write data into one or more fragments. @@ -966,9 +970,19 @@ def write_fragments( 2. Base path URI (e.g., "s3://bucket1/data") **CREATE mode**: References must match bases in `initial_bases` - (from dataset creation) **APPEND/OVERWRITE modes**: References must match bases in the existing manifest + initial_bases : list of DatasetBasePath, optional + Base paths to register when creating a new dataset (CREATE mode only). + + This allows `target_bases` references to be resolved during fragment + writing. Example: + + >>> from lance import DatasetBasePath + >>> initial_bases = [DatasetBasePath(path="s3://bucket1/data", name="base1")] + + **Only valid in CREATE mode**. Will raise an error if used with + APPEND/OVERWRITE modes. Returns ------- @@ -1019,6 +1033,7 @@ def write_fragments( storage_options=storage_options, enable_stable_row_ids=enable_stable_row_ids, target_bases=target_bases, + initial_bases=initial_bases, ) diff --git a/python/python/tests/test_multi_base.py b/python/python/tests/test_multi_base.py index 33bf0731df2..baa437b4fc6 100644 --- a/python/python/tests/test_multi_base.py +++ b/python/python/tests/test_multi_base.py @@ -1096,10 +1096,12 @@ def test_write_fragments_transaction_with_target_bases(self): def test_write_fragments_overwrite_mode_with_target_bases(self): """Test write_fragments in OVERWRITE mode with target_bases.""" # Create initial dataset - initial_data = pd.DataFrame({ - "id": range(30), - "value": [f"initial_{i}" for i in range(30)], - }) + initial_data = pd.DataFrame( + { + "id": range(30), + "value": [f"initial_{i}" for i in range(30)], + } + ) dataset = lance.write_dataset( initial_data, @@ -1116,10 +1118,12 @@ def test_write_fragments_overwrite_mode_with_target_bases(self): assert len(dataset.to_table()) == 30 # Use write_fragments with mode="overwrite" to replace all data - overwrite_data = pd.DataFrame({ - "id": range(100, 120), - "value": [f"overwrite_{i}" for i in range(100, 120)], - }) + overwrite_data = pd.DataFrame( + { + "id": range(100, 120), + "value": [f"overwrite_{i}" for i in range(100, 120)], + } + ) fragments = write_fragments( pa.Table.from_pandas(overwrite_data), @@ -1133,8 +1137,7 @@ def test_write_fragments_overwrite_mode_with_target_bases(self): # Commit with Overwrite operation operation = lance.LanceOperation.Overwrite( - pa.Table.from_pandas(overwrite_data).schema, - fragments + pa.Table.from_pandas(overwrite_data).schema, fragments ) dataset = lance.LanceDataset.commit( dataset.uri, operation, read_version=dataset.version @@ -1151,3 +1154,60 @@ def test_write_fragments_overwrite_mode_with_target_bases(self): base2_path = Path(self.base2_uri) data_files = list(base2_path.glob("**/*.lance")) assert len(data_files) > 0, "Expected data files in base2" + + def test_write_fragments_create_mode_with_initial_bases(self): + """Test write_fragments in CREATE mode with initial_bases.""" + # Create a new dataset URI (doesn't exist yet) + dataset_uri = Path(self.test_dir) / "new_dataset_with_commit" + + # Create base paths + base1_path = Path(self.test_dir) / "base1_new" + base2_path = Path(self.test_dir) / "base2_new" + base1_path.mkdir(parents=True, exist_ok=True) + base2_path.mkdir(parents=True, exist_ok=True) + + # Define initial bases to register using DatasetBasePath objects + initial_bases = [ + lance.DatasetBasePath(path=str(base1_path), name="base1"), + lance.DatasetBasePath(path=str(base2_path), name="base2"), + ] + + # Write fragments in CREATE mode with both initial_bases and target_bases + # Use return_transaction=True so that the Rust code properly assigns + # IDs to initial_bases + data = pa.table({"id": range(20), "value": [f"val_{i}" for i in range(20)]}) + transaction = write_fragments( + data, + str(dataset_uri), + mode="create", + target_bases=["base1"], + initial_bases=initial_bases, + return_transaction=True, + ) + + # Commit the transaction (initial_bases with proper IDs are already in + # the transaction) + dataset = lance.LanceDataset.commit(str(dataset_uri), transaction) + + # Verify dataset was created + assert dataset.count_rows() == 20 + result = dataset.to_table().to_pandas() + assert len(result) == 20 + assert set(result["id"]) == set(range(20)) + + # Verify base paths are registered + base_paths = dataset._ds.base_paths() + assert len(base_paths) == 2 # 2 bases (base1, base2) + # Check that our named bases are registered + base_names = [bp.name for bp in base_paths.values() if bp.name is not None] + assert "base1" in base_names + assert "base2" in base_names + + # Verify data files are in base1 (not in dataset root) + data_files_base1 = list(base1_path.glob("**/*.lance")) + assert len(data_files_base1) > 0, "Expected data files in base1" + + # Dataset root should not have data files (only manifest) + dataset_root = Path(dataset_uri) + data_files_root = list(dataset_root.glob("*.lance")) + assert len(data_files_root) == 0, "Should not have data files in root" diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 131f7957782..c384095495a 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -351,6 +351,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { Operation::Overwrite { ref fragments, ref schema, + ref initial_bases, .. } => { let fragments_py = export_vec(py, fragments.as_slice())?; @@ -361,7 +362,26 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { .getattr("Overwrite") .expect("Failed to get Overwrite class"); - cls.call1((schema_py, fragments_py)) + // Convert initial_bases to Python if present. + let initial_bases_py = if let Some(bases) = initial_bases { + use crate::dataset::DatasetBasePath; + // Convert each Rust BasePath to a Python DatasetBasePath object + let bases_py: Vec = bases + .iter() + .map(|bp| DatasetBasePath::from(bp.clone())) + .collect(); + Some(pyo3::types::PyList::new(py, bases_py)?) + } else { + None + }; + + // Call Python Overwrite constructor with or without initial_bases + // to maintain backward compatibility with existing code + if let Some(bases_list) = initial_bases_py { + cls.call1((schema_py, fragments_py, bases_list)) + } else { + cls.call1((schema_py, fragments_py)) + } } Operation::Update { removed_fragment_ids, From ad3035e9cc5e93ffa4d8cbf61e2c9f84bdaa7c08 Mon Sep 17 00:00:00 2001 From: "jay.narale" Date: Tue, 18 Nov 2025 09:15:04 -0800 Subject: [PATCH 4/4] Explictly set to none --- python/src/transaction.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/python/src/transaction.rs b/python/src/transaction.rs index c384095495a..2fc5272bd93 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -362,7 +362,6 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { .getattr("Overwrite") .expect("Failed to get Overwrite class"); - // Convert initial_bases to Python if present. let initial_bases_py = if let Some(bases) = initial_bases { use crate::dataset::DatasetBasePath; // Convert each Rust BasePath to a Python DatasetBasePath object @@ -370,18 +369,12 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { .iter() .map(|bp| DatasetBasePath::from(bp.clone())) .collect(); - Some(pyo3::types::PyList::new(py, bases_py)?) + pyo3::types::PyList::new(py, bases_py)?.into_any() } else { - None + py.None().into_bound(py) }; - // Call Python Overwrite constructor with or without initial_bases - // to maintain backward compatibility with existing code - if let Some(bases_list) = initial_bases_py { - cls.call1((schema_py, fragments_py, bases_list)) - } else { - cls.call1((schema_py, fragments_py)) - } + cls.call1((schema_py, fragments_py, initial_bases_py)) } Operation::Update { removed_fragment_ids,