Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ pub struct ObjectStore {
download_retry_count: usize,
/// IO tracker for monitoring read/write operations
io_tracker: IOTracker,
/// The datastore prefix that uniquely identifies this object store. It encodes information
/// which usually cannot be found in the URL such as Azure account name. The prefix plus the
/// path uniquely identifies any object inside the store.
pub store_prefix: String,
}

impl DeepSizeOf for ObjectStore {
Expand Down Expand Up @@ -429,6 +433,7 @@ impl ObjectStore {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
io_tracker,
store_prefix,
};
let path = Path::parse(path.path())?;
return Ok((Arc::new(store), path));
Expand Down Expand Up @@ -858,14 +863,18 @@ impl ObjectStore {
) -> Self {
let scheme = location.scheme();
let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));

let store = match wrapper {
Some(wrapper) => {
let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY
.calculate_object_store_prefix(location.as_ref(), storage_options)
.unwrap();
wrapper.wrap(&store_prefix, store)
let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
Some(provider) => provider
.calculate_object_store_prefix(&location, storage_options)
.unwrap(),
None => {
let store_prefix = format!("{}${}", location.scheme(), location.authority());
log::warn!("Guessing that object store prefix is {}, since object store scheme is not found in registry.", store_prefix);
store_prefix
}
};
let store = match wrapper {
Some(wrapper) => wrapper.wrap(&store_prefix, store),
None => store,
};

Expand All @@ -883,6 +892,7 @@ impl ObjectStore {
io_parallelism,
download_retry_count,
io_tracker,
store_prefix,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ impl ObjectStoreProvider for AwsStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ impl ObjectStoreProvider for AzureBlobStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}

Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ impl ObjectStoreProvider for GcsStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/huggingface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ impl ObjectStoreProvider for HuggingfaceStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}

Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ impl ObjectStoreProvider for FileStoreProvider {
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}

Expand Down
4 changes: 3 additions & 1 deletion rust/lance-io/src/object_store/providers/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct MemoryStoreProvider;

#[async_trait::async_trait]
impl ObjectStoreProvider for MemoryStoreProvider {
async fn new_store(&self, _base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result<ObjectStore> {
let block_size = params.block_size.unwrap_or(DEFAULT_LOCAL_BLOCK_SIZE);
let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default());
let download_retry_count = storage_options.download_retry_count();
Expand All @@ -31,6 +31,8 @@ impl ObjectStoreProvider for MemoryStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options.as_ref())?,
})
}

Expand Down
2 changes: 2 additions & 0 deletions rust/lance-io/src/object_store/providers/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ impl ObjectStoreProvider for OssStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: storage_options.download_retry_count(),
io_tracker: Default::default(),
store_prefix: self
.calculate_object_store_prefix(&url, params.storage_options.as_ref())?,
})
}
}
Expand Down
Loading