Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protos/table_identifier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ message TableIdentifier {
uint64 version = 2;
optional string manifest_etag = 3;
optional bytes serialized_manifest = 4;
map<string, string> storage_options = 5;
}
43 changes: 37 additions & 6 deletions rust/lance/src/io/exec/filtered_read_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use lance_table::format::Fragment;
use prost::Message;
use snafu::location;

use crate::dataset::builder::DatasetBuilder;
use crate::Dataset;

use super::filtered_read::{
Expand All @@ -47,6 +48,10 @@ pub fn table_identifier_from_dataset(dataset: &Dataset) -> pb::TableIdentifier {
version: dataset.manifest.version,
manifest_etag: dataset.manifest_location.e_tag.clone(),
serialized_manifest: None,
storage_options: dataset
.initial_storage_options()
.cloned()
.unwrap_or_default(),
}
}

Expand All @@ -60,7 +65,25 @@ pub fn table_identifier_from_dataset_with_manifest(dataset: &Dataset) -> pb::Tab
version: dataset.manifest.version,
manifest_etag: dataset.manifest_location.e_tag.clone(),
serialized_manifest: Some(manifest_proto.encode_to_vec()),
storage_options: dataset
.initial_storage_options()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should call latest_storage_options()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that's good!

.cloned()
.unwrap_or_default(),
}
}

/// Load the [`Dataset`] described by a `TableIdentifier` proto.
///
/// A [`DatasetBuilder`] is created from `table_id.uri` and pinned to
/// `table_id.version`. When the identifier carries a serialized manifest it
/// is handed to the builder, and any non-empty `storage_options` map is
/// forwarded as well, before the dataset is loaded.
///
/// # Errors
///
/// Propagates any error returned while applying the serialized manifest to
/// the builder or while loading the dataset.
pub async fn open_dataset_from_table_identifier(
    table_id: &pb::TableIdentifier,
) -> Result<Arc<Dataset>> {
    let versioned = DatasetBuilder::from_uri(&table_id.uri).with_version(table_id.version);

    // Attach the embedded manifest only when the identifier provides one.
    let with_manifest = match &table_id.serialized_manifest {
        Some(manifest_bytes) => versioned.with_serialized_manifest(manifest_bytes)?,
        None => versioned,
    };

    // An empty map means no options were sent; leave the builder untouched.
    let configured = if table_id.storage_options.is_empty() {
        with_manifest
    } else {
        with_manifest.with_storage_options(table_id.storage_options.clone())
    };

    let dataset = configured.load().await?;
    Ok(Arc::new(dataset))
}

// =============================================================================
Expand Down Expand Up @@ -96,16 +119,23 @@ pub fn filtered_read_exec_to_proto(
}

/// Reconstruct a [`FilteredReadExec`] from proto.
///
/// The `dataset` must already be opened on the remote side (using
/// the `TableIdentifier` from the proto). The optional `index_input`
/// child plan is provided by DataFusion's codec via its `inputs` parameter.
pub async fn filtered_read_exec_from_proto(
proto: pb::FilteredReadExecProto,
dataset: Arc<Dataset>,
dataset: Option<Arc<Dataset>>,
index_input: Option<Arc<dyn ExecutionPlan>>,
state: &SessionState,
) -> Result<FilteredReadExec> {
let dataset = match dataset {
Some(ds) => ds, // dataset could be opened or cached by the caller
None => {
let table_id = proto.table.as_ref().ok_or_else(|| Error::InvalidInput {
source: "Missing table identifier in FilteredReadExecProto".into(),
location: location!(),
})?;
open_dataset_from_table_identifier(table_id).await?
}
};

let options_proto = proto.options.ok_or_else(|| Error::InvalidInput {
source: "Missing options in FilteredReadExecProto".into(),
location: location!(),
Expand Down Expand Up @@ -623,6 +653,7 @@ mod tests {
version: 42,
manifest_etag: Some("etag123".to_string()),
serialized_manifest: None,
storage_options: HashMap::new(),
};
let bytes = id.encode_to_vec();
let back = pb::TableIdentifier::decode(bytes.as_slice()).unwrap();
Expand Down Expand Up @@ -780,7 +811,7 @@ mod tests {
assert!(table.serialized_manifest.is_none());

// Roundtrip back
let back = filtered_read_exec_from_proto(proto, dataset.clone(), None, &state)
let back = filtered_read_exec_from_proto(proto, Some(dataset.clone()), None, &state)
.await
.unwrap();

Expand Down
Loading