-
Notifications
You must be signed in to change notification settings - Fork 736
feat: dataset supports deep_clone #5250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
c0bc1c5
38db690
1a9d3b5
c84719c
bad5e7b
21e5071
93b5c41
91c2fc2
d8a73c4
3ce420b
ad4a6f8
5216f58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,13 +40,13 @@ use lance_io::object_store::{ | |
| use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct}; | ||
| use lance_namespace::LanceNamespace; | ||
| use lance_table::format::{ | ||
| pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, | ||
| pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, | ||
| }; | ||
| use lance_table::io::commit::{ | ||
| migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler, | ||
| CommitLock, ManifestLocation, ManifestNamingScheme, | ||
| }; | ||
| use lance_table::io::manifest::read_manifest; | ||
| use lance_table::io::manifest::{read_manifest, read_manifest_indexes}; | ||
| use object_store::path::Path; | ||
| use prost::Message; | ||
| use roaring::RoaringBitmap; | ||
|
|
@@ -112,6 +112,7 @@ use lance_core::box_error; | |
| pub use lance_core::ROW_ID; | ||
| use lance_namespace::models::{CreateEmptyTableRequest, DescribeTableRequest}; | ||
| use lance_table::feature_flags::{apply_feature_flags, can_read_dataset}; | ||
| use lance_table::io::deletion::relative_deletion_file_path; | ||
| pub use schema_evolution::{ | ||
| BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore, | ||
| }; | ||
|
|
@@ -207,6 +208,30 @@ impl From<&Manifest> for Version { | |
| } | ||
| } | ||
|
|
||
| /// A file path wrapper that can be used to represent a file in a dataset. | ||
| /// This wrapper is used for changing the base_path like deep_clone | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct FilePath { | ||
| pub relative_path: String, | ||
| pub base_path: Path, | ||
| } | ||
|
|
||
| impl FilePath { | ||
| fn absolute_path_with_base(&self, base: &Path) -> Path { | ||
| let mut path = base.clone(); | ||
| for seg in self.relative_path.split('/') { | ||
| if !seg.is_empty() { | ||
| path = path.child(seg); | ||
| } | ||
| } | ||
| path | ||
| } | ||
|
|
||
| fn absolute_path(&self) -> Path { | ||
| self.absolute_path_with_base(&self.base_path) | ||
| } | ||
| } | ||
|
|
||
| /// Customize read behavior of a dataset. | ||
| #[derive(Clone, Debug)] | ||
| pub struct ReadParams { | ||
|
|
@@ -2141,6 +2166,86 @@ impl Dataset { | |
| builder.execute(transaction).await | ||
| } | ||
|
|
||
| /// Deep clone the target version into a new dataset at target_path. | ||
| /// This performs a server-side copy of all relevant dataset files (data files, | ||
| /// deletion files, and any external row-id files) into the target dataset | ||
| /// without loading data into memory. | ||
| /// | ||
| /// Parameters: | ||
| /// - `target_path`: the URI string to clone the dataset into. | ||
| /// - `version`: the version cloned from, could be a version number, branch head, or tag. | ||
| /// - `store_params`: the object store params to use for the new dataset. | ||
| pub async fn deep_clone( | ||
| &mut self, | ||
| target_path: &str, | ||
| version: impl Into<refs::Ref>, | ||
| store_params: Option<ObjectStoreParams>, | ||
| ) -> Result<Self> { | ||
| use futures::StreamExt; | ||
|
|
||
| // Resolve source dataset and its manifest using checkout_version | ||
| let src_ds = self.checkout_version(version).await?; | ||
| let path_specs = self.collect_paths().await?; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should use src_ds? |
||
|
|
||
| // Prepare target object store and base path | ||
| let (target_store, target_base) = ObjectStore::from_uri_and_params( | ||
| self.session.store_registry(), | ||
| target_path, | ||
| &store_params.clone().unwrap_or_default(), | ||
| ) | ||
| .await?; | ||
|
|
||
| // Prevent cloning into an existing target dataset | ||
| if self | ||
| .commit_handler | ||
| .resolve_latest_location(&target_base, &target_store) | ||
| .await | ||
| .is_ok() | ||
| { | ||
| return Err(Error::DatasetAlreadyExists { | ||
| uri: target_path.to_string(), | ||
| location: location!(), | ||
| }); | ||
| } | ||
|
|
||
| let io_parallelism = self.object_store.io_parallelism(); | ||
| let copy_futures = path_specs | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this is the best we can do now, but all cloud storage have batch copy, we should ideally leverage that, but that requires upstream support. Maybe create a github issue to track that and add a TODO here. |
||
| .iter() | ||
| .map(|file| { | ||
| let store = Arc::clone(&target_store); | ||
| let src_path = file.absolute_path(); | ||
| let target_path = file.absolute_path_with_base(&target_base); | ||
| async move { store.copy(&src_path, &target_path).await.map(|_| ()) } | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| futures::stream::iter(copy_futures) | ||
| .buffer_unordered(io_parallelism) | ||
| .collect::<Vec<_>>() | ||
| .await | ||
| .into_iter() | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| // Record a Clone operation and commit via CommitBuilder | ||
| let ref_name = src_ds.manifest.branch.clone(); | ||
| let ref_version = src_ds.manifest_location.version; | ||
| let clone_op = Operation::Clone { | ||
| is_shallow: false, | ||
| ref_name, | ||
| ref_version, | ||
| ref_path: src_ds.uri().to_string(), | ||
| branch_name: None, | ||
| }; | ||
| let txn = Transaction::new(ref_version, clone_op, None); | ||
| let builder = CommitBuilder::new(WriteDestination::Uri(target_path)) | ||
| .with_store_params(store_params.clone().unwrap_or_default()) | ||
| .with_object_store(target_store.clone()) | ||
| .with_commit_handler(self.commit_handler.clone()) | ||
| .with_storage_format(self.manifest.data_storage_format.lance_file_version()?); | ||
| let new_ds = builder.execute(txn).await?; | ||
| Ok(new_ds) | ||
| } | ||
|
|
||
| async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> { | ||
| match reference { | ||
| refs::Ref::Version(branch, version_number) => { | ||
|
|
@@ -2162,6 +2267,93 @@ impl Dataset { | |
| } | ||
| } | ||
|
|
||
| async fn collect_paths(&self) -> Result<Vec<FilePath>> { | ||
| let mut file_paths = Vec::new(); | ||
| for fragment in self.manifest.fragments.iter() { | ||
| if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta { | ||
| return Err(Error::Internal { | ||
| message: format!( | ||
| "External row_id_meta is not supported yet. external file path: {}", | ||
| external_file.path | ||
| ), | ||
| location: location!(), | ||
| }); | ||
| } | ||
| for data_file in fragment.files.iter() { | ||
| let base_root = if let Some(base_id) = data_file.base_id { | ||
| let base_path = | ||
| self.manifest | ||
| .base_paths | ||
| .get(&base_id) | ||
| .ok_or_else(|| Error::Internal { | ||
| message: format!("base_id {} not found", base_id), | ||
| location: location!(), | ||
| })?; | ||
| Path::parse(base_path.path.as_str())? | ||
| } else { | ||
| self.base.clone() | ||
| }; | ||
| file_paths.push(FilePath { | ||
| relative_path: format!("{}/{}", DATA_DIR, data_file.path.clone()), | ||
| base_path: base_root, | ||
| }); | ||
| } | ||
| if let Some(deletion_file) = &fragment.deletion_file { | ||
| let base_root = if let Some(base_id) = deletion_file.base_id { | ||
| let base_path = | ||
| self.manifest | ||
| .base_paths | ||
| .get(&base_id) | ||
| .ok_or_else(|| Error::Internal { | ||
| message: format!("base_id {} not found", base_id), | ||
| location: location!(), | ||
| })?; | ||
| Path::parse(base_path.path.as_str())? | ||
| } else { | ||
| self.base.clone() | ||
| }; | ||
| file_paths.push(FilePath { | ||
| relative_path: relative_deletion_file_path(fragment.id, deletion_file), | ||
| base_path: base_root, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| let indices = read_manifest_indexes( | ||
| self.object_store.as_ref(), | ||
| &self.manifest_location, | ||
| &self.manifest, | ||
| ) | ||
| .await?; | ||
|
|
||
| for index in &indices { | ||
| let base_root = if let Some(base_id) = index.base_id { | ||
| let base_path = | ||
| self.manifest | ||
| .base_paths | ||
| .get(&base_id) | ||
| .ok_or_else(|| Error::Internal { | ||
| message: format!("base_id {} not found", base_id), | ||
| location: location!(), | ||
| })?; | ||
| Path::parse(base_path.path.as_str())? | ||
| } else { | ||
| self.base.clone() | ||
| }; | ||
| let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string()); | ||
| let mut stream = self.object_store.read_dir_all(&index_root, None); | ||
| while let Some(meta) = stream.next().await.transpose()? { | ||
| if let Some(filename) = meta.location.filename() { | ||
| file_paths.push(FilePath { | ||
| relative_path: format!("{}/{}/{}", INDICES_DIR, index.uuid, filename), | ||
| base_path: base_root.clone(), | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| Ok(file_paths) | ||
| } | ||
|
|
||
| /// Run a SQL query against the dataset. | ||
| /// The underlying SQL engine is DataFusion. | ||
| /// Please refer to the DataFusion documentation for supported SQL syntax. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,6 +118,7 @@ async fn do_commit_new_dataset( | |
| }; | ||
|
|
||
| let (mut manifest, indices) = if let Operation::Clone { | ||
| is_shallow, | ||
| ref_name, | ||
| ref_version, | ||
| ref_path, | ||
|
|
@@ -138,37 +139,74 @@ async fn do_commit_new_dataset( | |
| ) | ||
| .await?; | ||
|
|
||
| let new_base_id = source_manifest | ||
| .base_paths | ||
| .keys() | ||
| .max() | ||
| .map(|id| *id + 1) | ||
| .unwrap_or(0); | ||
| let new_manifest = source_manifest.shallow_clone( | ||
| ref_name.clone(), | ||
| ref_path.clone(), | ||
| new_base_id, | ||
| branch_name.clone(), | ||
| transaction_file, | ||
| ); | ||
| if *is_shallow { | ||
| let new_base_id = source_manifest | ||
| .base_paths | ||
| .keys() | ||
| .max() | ||
| .map(|id| *id + 1) | ||
| .unwrap_or(0); | ||
| let new_manifest = source_manifest.shallow_clone( | ||
| ref_name.clone(), | ||
| ref_path.clone(), | ||
| new_base_id, | ||
| branch_name.clone(), | ||
| transaction_file, | ||
| ); | ||
|
|
||
| let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { | ||
| let reader = object_store.open(&source_manifest_location.path).await?; | ||
| let section: pb::IndexSection = | ||
| lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; | ||
| section | ||
| .indices | ||
| .into_iter() | ||
| .map(|index_pb| { | ||
| let mut index = IndexMetadata::try_from(index_pb)?; | ||
| index.base_id = Some(new_base_id); | ||
| Ok(index) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()? | ||
| let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { | ||
| let reader = object_store.open(&source_manifest_location.path).await?; | ||
| let section: pb::IndexSection = | ||
| lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; | ||
| section | ||
| .indices | ||
| .into_iter() | ||
| .map(|index_pb| { | ||
| let mut index = IndexMetadata::try_from(index_pb)?; | ||
| index.base_id = Some(new_base_id); | ||
| Ok(index) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()? | ||
| } else { | ||
| vec![] | ||
| }; | ||
| (new_manifest, updated_indices) | ||
| } else { | ||
| vec![] | ||
| }; | ||
| (new_manifest, updated_indices) | ||
| // Deep clone: build a manifest that references local files (no external bases) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just realized one thing, we need to also set base_id for ExternalFile, so those files can be handled for shallow clone case, and also not inherit base_id for deep clone case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did realize that. But I searched the code and asked @yanghua he told me this has not been used in any write paths yet. And I don't know what the path looks like and how it shoud be copied. So I wrote a check: Maybe you have more context to give me?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another place I can think of is https://github.com/lance-format/lance/blob/main/rust/lance/src/index/frag_reuse.rs#L162 For stable tow id, we are not doing it today but from spec correctness perspective I think we should probably handle it here. Can be a separated PR though I'm fine with that
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I think we can handle |
||
| let mut new_manifest = source_manifest.clone(); | ||
| new_manifest.base_paths.clear(); | ||
| new_manifest.branch = None; | ||
| new_manifest.tag = None; | ||
| new_manifest.index_section = None; // will be rewritten below | ||
| let mut new_frags = new_manifest.fragments.as_ref().clone(); | ||
| for f in &mut new_frags { | ||
| for df in &mut f.files { | ||
| df.base_id = None; | ||
| } | ||
| if let Some(d) = f.deletion_file.as_mut() { | ||
| d.base_id = None; | ||
| } | ||
| } | ||
| new_manifest.fragments = Arc::new(new_frags); | ||
|
|
||
| // Indices: keep metadata but normalize base to local | ||
| let mut updated_indices = Vec::new(); | ||
| if let Some(index_section_pos) = source_manifest.index_section { | ||
| let reader = object_store.open(&source_manifest_location.path).await?; | ||
| let section: pb::IndexSection = | ||
| lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?; | ||
| updated_indices = section | ||
| .indices | ||
| .into_iter() | ||
| .map(|index_pb| { | ||
| let mut index = IndexMetadata::try_from(index_pb)?; | ||
| index.base_id = None; | ||
| Ok(index) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| } | ||
| (new_manifest, updated_indices) | ||
| } | ||
| } else { | ||
| let (manifest, indices) = | ||
| transaction.build_manifest(None, vec![], &transaction_file, write_config)?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just not have this for now? I agree we probably need some refactoring to have this, but it should be in an independent PR that covers all cases.