Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integrations/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ homepage = "https://opendal.apache.org/"
license = "Apache-2.0"
repository = "https://github.com/apache/opendal"
rust-version = "1.85"
version = "0.55.0"
version = "0.56.0"

[features]
send_wrapper = ["dep:send_wrapper"]
Expand All @@ -42,7 +42,7 @@ bytes = "1"
chrono = { version = "0.4.42", features = ["std", "clock"] }
futures = "0.3"
mea = "0.6"
object_store = "0.12.3"
object_store = "0.13.0"
opendal = { version = "0.55.0", path = "../../core", default-features = false }
pin-project = "1.1"
send_wrapper = { version = "0.6", features = ["futures"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use bytes::Bytes;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
#[cfg(feature = "services-s3")]
use object_store::aws::AmazonS3Builder;
use object_store::path::Path as ObjectStorePath;
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use service::{ObjectStoreBuilder, ObjectStoreService};
// Make sure `send_wrapper` works as expected
#[cfg(all(feature = "send_wrapper", test))]
mod assert_send {
use object_store::{ObjectStore, PutPayload};
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
use opendal::Operator;

#[allow(dead_code)]
Expand Down
4 changes: 2 additions & 2 deletions integrations/object_store/src/service/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::sync::Arc;

use futures::stream::{self, StreamExt};
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use object_store::{ObjectStore, ObjectStoreExt};
use opendal::raw::oio::BatchDeleteResult;
use opendal::raw::*;
use opendal::*;
Expand All @@ -44,7 +44,7 @@ impl oio::BatchDelete for ObjectStoreDeleter {

async fn delete_batch(&self, paths: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
// convert paths to stream, then use [`ObjectStore::delete_stream`] to delete them in batch
let stream = stream::iter(paths.iter())
let stream = stream::iter(paths.clone())
.map(|(path, _)| Ok::<_, object_store::Error>(ObjectStorePath::from(path.as_str())))
.boxed();
let results = self.store.delete_stream(stream).collect::<Vec<_>>().await;
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod tests {
let builder = ObjectStoreBuilder::new(store);

let backend = builder.build().expect("build should succeed");
assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
assert_eq!(backend.info().scheme(), OBJECT_STORE_SCHEME);
}

#[tokio::test]
Expand Down
73 changes: 38 additions & 35 deletions integrations/object_store/src/store.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @alamb @tustvold in case you'd like to see how other lib is integrated with object_store 0.13.

And perhaps there is more best pratice to do what gets done here.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use futures::TryStreamExt;
use futures::stream::BoxStream;
use mea::mutex::Mutex;
use mea::oneshot;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
Expand All @@ -41,6 +40,7 @@ use object_store::path::Path;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use object_store::{ListResult, RenameOptions};
use opendal::Buffer;
use opendal::Writer;
use opendal::options::CopyOptions;
Expand Down Expand Up @@ -227,23 +227,6 @@ impl ObjectStore for OpendalStore {
Ok(PutResult { e_tag, version })
}

async fn put_multipart(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now internally implemented by ObjectStoreExt.

&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
let decoded_location = percent_decode_path(location.as_ref());
let writer = self
.inner
.writer_with(&decoded_location)
.concurrent(8)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());

Ok(Box::new(upload))
}

async fn put_multipart_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -430,15 +413,27 @@ impl ObjectStore for OpendalStore {
})
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with delete_stream.

let decoded_location = percent_decode_path(location.as_ref());
self.inner
.delete(&decoded_location)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Ok(())
fn delete_stream(
&self,
locations: BoxStream<'static, object_store::Result<Path>>,
) -> BoxStream<'static, object_store::Result<Path>> {
// TODO: use batch delete to optimize performance
let client = self.inner.clone();
locations
.then(move |location| {
let client = client.clone();
async move {
let location = location?;
let decoded_location = percent_decode_path(location.as_ref());
client
.delete(&decoded_location)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(location)
}
})
.boxed()
Comment on lines +422 to +436
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object_store uses multiple batching mechanisms:

    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>> {
        let client = Arc::clone(&self.client);
        locations
            .try_chunks(1_000)
            .map(move |locations| {
                let client = Arc::clone(&client);
                async move {
                    // Early return the error. We ignore the paths that have already been
                    // collected into the chunk.
                    let locations = locations.map_err(|e| e.1)?;
                    client
                        .bulk_delete_request(locations)
                        .await
                        .map(futures::stream::iter)
                }
            })
            .buffered(20)
            .try_flatten()
            .boxed()
    }

}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
Expand Down Expand Up @@ -576,11 +571,23 @@ impl ObjectStore for OpendalStore {
})
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.copy_request(from, to, false).await
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: object_store::CopyOptions,
) -> object_store::Result<()> {
let if_not_exists = matches!(options.mode, object_store::CopyMode::Create);
self.copy_request(from, to, if_not_exists).await
}

async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
async fn rename_opts(
&self,
from: &Path,
to: &Path,
// TODO: if we need to support rename options in the future
_options: RenameOptions,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OpenDAL doesn't support different rename mode now.

) -> object_store::Result<()> {
self.inner
.rename(
&percent_decode_path(from.as_ref()),
Expand All @@ -592,10 +599,6 @@ impl ObjectStore for OpendalStore {

Ok(())
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.copy_request(from, to, true).await
}
Comment on lines -596 to -598
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to ObjectStoreExt.

}

/// `MultipartUpload`'s impl based on `Writer` in opendal
Expand Down Expand Up @@ -691,7 +694,7 @@ impl Debug for OpendalMultipartUpload {
mod tests {
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};
use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
use opendal::services;
use rand::prelude::*;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/tests/behavior/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::utils::build_trail;
use bytes::Bytes;
use libtest_mimic::Trial;
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path;
use object_store_opendal::OpendalStore;

Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/tests/behavior/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::utils::{build_trail, new_file_path};
use anyhow::Result;
use bytes::Bytes;
use libtest_mimic::Trial;
use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{GetOptions, GetRange, ObjectStore, ObjectStoreExt};
use object_store_opendal::OpendalStore;

pub fn tests(store: &OpendalStore, tests: &mut Vec<Trial>) {
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/tests/behavior/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::utils::{build_trail, new_file_path};
use anyhow::Result;
use bytes::Bytes;
use libtest_mimic::Trial;
use object_store::{ObjectStore, PutMode, UpdateVersion};
use object_store::{ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
use object_store_opendal::OpendalStore;

pub fn tests(store: &OpendalStore, tests: &mut Vec<Trial>) {
Expand Down
Loading