Replace AsyncWrite with Upload trait (apache#5458)
tustvold committed Mar 13, 2024
1 parent 51bcadb commit 7a0d010
Showing 13 changed files with 509 additions and 745 deletions.
10 changes: 3 additions & 7 deletions object_store/src/azure/mod.rs
@@ -19,15 +19,11 @@
//!
//! ## Streaming uploads
//!
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
//! are uploaded concurrently.
//! [ObjectStore::upload] will upload data in blocks and write a blob from those blocks.
//!
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
//! after 7 days.
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
multipart::{MultiPartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
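For orientation, here is a minimal sketch of how the streaming-upload API described in the doc comment above might be driven. It assumes the `upload`/`Upload` surface exactly as introduced in this commit; the `InMemory` store and the part sizes are illustrative stand-ins, not part of the change.

```rust
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // Any ObjectStore is assumed to work the same way; InMemory keeps the
    // example self-contained.
    let store = InMemory::new();
    let path = Path::from("data/blob.bin");

    // Start a streaming upload; each `put_part` call returns a future that
    // performs the part upload when awaited.
    let mut upload = store.upload(&path).await?;
    upload.put_part(Bytes::from(vec![0u8; 5 * 1024 * 1024])).await?;
    upload.put_part(Bytes::from(vec![1u8; 1024])).await?;

    // `complete` stitches the uploaded parts into the final object;
    // `abort` would instead discard them.
    upload.complete().await?;
    Ok(())
}
```

On Azure, blocks left behind by an upload that is never completed are cleaned up automatically after 7 days, as the doc comment above notes.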
70 changes: 32 additions & 38 deletions object_store/src/buffered.rs
@@ -18,7 +18,7 @@
//! Utilities for performing tokio-style buffered IO

use crate::path::Path;
use crate::{MultipartId, ObjectMeta, ObjectStore};
use crate::{ChunkedUpload, ObjectMeta, ObjectStore};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
@@ -213,30 +213,26 @@ impl AsyncBufRead for BufReader {
pub struct BufWriter {
capacity: usize,
state: BufWriterState,
multipart_id: Option<MultipartId>,
store: Arc<dyn ObjectStore>,
}

impl std::fmt::Debug for BufWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufWriter")
.field("capacity", &self.capacity)
.field("multipart_id", &self.multipart_id)
.finish()
}
}

type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);

enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, Vec<u8>),
/// [`ObjectStore::put_multipart`]
Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
Prepare(BoxFuture<'static, std::io::Result<ChunkedUpload>>),
/// Write to a multipart upload
Write(Box<dyn AsyncWrite + Send + Unpin>),
Write(Option<ChunkedUpload>),
/// [`ObjectStore::put`]
Put(BoxFuture<'static, std::io::Result<()>>),
Flush(BoxFuture<'static, std::io::Result<()>>),
}

impl BufWriter {
@@ -251,14 +247,8 @@ impl BufWriter {
capacity,
store,
state: BufWriterState::Buffer(path, Vec::new()),
multipart_id: None,
}
}

/// Returns the [`MultipartId`] if multipart upload
pub fn multipart_id(&self) -> Option<&MultipartId> {
self.multipart_id.as_ref()
}
}

impl AsyncWrite for BufWriter {
@@ -270,12 +260,15 @@
let cap = self.capacity;
loop {
return match &mut self.state {
BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Write(Some(write)) => {
write.write(buf);
Poll::Ready(Ok(buf.len()))
}
BufWriterState::Write(None) | BufWriterState::Flush(_) => {
panic!("Already shut down")
}
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
BufWriterState::Buffer(path, b) => {
@@ -284,9 +277,10 @@
let path = std::mem::take(path);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
let (id, mut writer) = store.put_multipart(&path).await?;
writer.write_all(&buffer).await?;
Ok((id, writer))
let upload = store.upload(&path).await?;
let mut chunked = ChunkedUpload::new(upload);
chunked.write(&buffer);
Ok(chunked)
}));
continue;
}
@@ -300,13 +294,10 @@
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
return match &mut self.state {
BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
BufWriterState::Flush(_) => panic!("Already shut down"),
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
};
@@ -317,21 +308,28 @@
loop {
match &mut self.state {
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
}
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
let path = std::mem::take(p);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Put(Box::pin(async move {
self.state = BufWriterState::Flush(Box::pin(async move {
store.put(&path, buf.into()).await?;
Ok(())
}));
}
BufWriterState::Put(f) => return f.poll_unpin(cx),
BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx),
BufWriterState::Flush(f) => return f.poll_unpin(cx),
BufWriterState::Write(x) => {
let upload = x.take().unwrap();
self.state = BufWriterState::Flush(
async move {
upload.finish().await?;
Ok(())
}
.boxed(),
)
}
}
}
}
@@ -443,19 +441,15 @@ mod tests {
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 5]).await.unwrap();
assert!(writer.multipart_id().is_none());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_none());
assert_eq!(store.head(&path).await.unwrap().size, 25);

// Test multipart
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 20]).await.unwrap();
assert!(writer.multipart_id().is_some());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_some());

assert_eq!(store.head(&path).await.unwrap().size, 40);
}
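The tests above exercise `BufWriter` through the tokio `AsyncWrite` interface, which is unchanged by this commit; only the internals move from `put_multipart` to `upload`/`ChunkedUpload`. A hedged usage sketch follows (the `InMemory` store and the capacity value are illustrative assumptions):

```rust
use std::sync::Arc;

use object_store::{buffered::BufWriter, memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("data/large.bin");

    // Writes below the capacity threshold are buffered and flushed with a
    // single `put` on shutdown; exceeding it switches to a streaming upload.
    let mut writer = BufWriter::with_capacity(Arc::clone(&store), path, 10 * 1024 * 1024);
    writer.write_all(&[0u8; 1024]).await?;
    writer.shutdown().await?;
    Ok(())
}
```

Note that the `multipart_id` accessor is gone: with the `Upload` trait there is no longer a separate identifier to expose, which is why the corresponding assertions are removed from the test above.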
16 changes: 4 additions & 12 deletions object_store/src/chunked.rs
@@ -25,14 +25,13 @@ use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::io::AsyncWrite;

use crate::path::Path;
use crate::Result;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions,
PutResult,
PutResult, Upload,
};
use crate::{MultipartId, Result};

/// Wraps a [`ObjectStore`] and makes its get response return chunks
/// in a controllable manner.
@@ -67,15 +66,8 @@ impl ObjectStore for ChunkedStore {
self.inner.put_opts(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.inner.put_multipart(location).await
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.inner.abort_multipart(location, multipart_id).await
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
self.inner.upload(location).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
99 changes: 54 additions & 45 deletions object_store/src/gcp/mod.rs
@@ -20,40 +20,35 @@
//! ## Multi-part uploads
//!
//! [Multi-part uploads](https://cloud.google.com/storage/docs/multipart-uploads)
//! can be initiated with the [ObjectStore::put_multipart] method.
//! Data passed to the writer is automatically buffered to meet the minimum size
//! requirements for a part. Multiple parts are uploaded concurrently.
//!
//! If the writer fails for any reason, you may have parts uploaded to GCS but not
//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
//! can be initiated with the [ObjectStore::upload] method. If neither [`Upload::complete`]
//! nor [`Upload::abort`] is invoked, you may be left with parts uploaded to GCS but never
//! used, which you will be charged for. It is recommended that you configure a
//! [lifecycle rule] to abort incomplete multipart uploads after a certain period of time,
//! to avoid being charged for storing partial uploads.
//!
//! ## Using HTTP/2
//!
//! Google Cloud Storage supports both HTTP/2 and HTTP/1. HTTP/1 is used by default
//! because it allows much higher throughput in our benchmarks (see
//! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be
//! enabled by setting [crate::ClientConfigKey::Http1Only] to false.
//!
//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
use std::sync::Arc;

use crate::client::CredentialProvider;
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, PutOptions, PutResult, Result, Upload, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
use client::GoogleCloudStorageClient;
use futures::stream::BoxStream;
use tokio::io::AsyncWrite;

use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::multipart::MultiPartStore;
use crate::multipart::{MultiPartStore, Parts};
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
pub use credential::GcpCredential;

@@ -89,27 +84,50 @@ impl GoogleCloudStorage {
}
}

#[derive(Debug)]
struct GCSMultipartUpload {
state: Arc<UploadState>,
part_idx: usize,
}

#[derive(Debug)]
struct UploadState {
client: Arc<GoogleCloudStorageClient>,
path: Path,
multipart_id: MultipartId,
parts: Parts,
}

#[async_trait]
impl PutPart for GCSMultipartUpload {
/// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
self.client
.put_part(&self.path, &self.multipart_id, part_idx, buf.into())
impl Upload for GCSMultipartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state
.client
.put_part(&state.path, &state.multipart_id, idx, data)
.await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.multipart_complete(&self.state.path, &self.state.multipart_id, parts)
.await
}

/// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.multipart_complete(&self.path, &self.multipart_id, completed_parts)
.await?;
Ok(())
async fn abort(&mut self) -> Result<()> {
self.state
.client
.multipart_cleanup(&self.state.path, &self.state.multipart_id)
.await
}
}

@@ -119,27 +137,18 @@ impl ObjectStore for GoogleCloudStorage {
self.client.put(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
let upload_id = self.client.multipart_initiate(location).await?;

let inner = GCSMultipartUpload {
client: Arc::clone(&self.client),
path: location.clone(),
multipart_id: upload_id.clone(),
};

Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.multipart_cleanup(location, multipart_id)
.await?;

Ok(())
Ok(Box::new(GCSMultipartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
path: location.clone(),
multipart_id: upload_id.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
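A design point worth calling out: `put_part` now hands back a future instead of awaiting internally, and the GCS implementation records each part in `Parts` keyed by its index, so callers can keep several part uploads in flight at once. A hedged sketch of that pattern, assuming `UploadPart` is a `'static` boxed future as the implementation above suggests:

```rust
use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};
use object_store::{PutResult, Result, Upload};

// Drive several part uploads concurrently, then finalise the object.
async fn upload_parts_concurrently(
    mut upload: Box<dyn Upload>,
    parts: Vec<Bytes>,
) -> Result<PutResult> {
    // Hand out all part futures first; each captures its own part index.
    let mut in_flight: FuturesUnordered<_> =
        parts.into_iter().map(|data| upload.put_part(data)).collect();

    // Await them in whatever order they finish.
    while let Some(result) = in_flight.next().await {
        result?;
    }

    upload.complete().await
}
```

If `complete` is never reached, the doc comment above applies: configure a GCS lifecycle rule so that orphaned parts are eventually aborted rather than billed indefinitely.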
13 changes: 3 additions & 10 deletions object_store/src/http/mod.rs
@@ -46,7 +46,7 @@ use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig,
ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig, Upload,
};

mod client;
@@ -115,15 +115,8 @@ impl ObjectStore for HttpStore {
})
}

async fn put_multipart(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
Err(super::Error::NotImplemented)
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
Err(super::Error::NotImplemented)
async fn upload(&self, _location: &Path) -> Result<Box<dyn Upload>> {
Err(crate::Error::NotImplemented)
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
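Since `HttpStore` now reports `Error::NotImplemented` from `upload`, callers that must work across backends may want to fall back to a plain `put`. A hedged sketch of that fallback (the helper name and the choice to send everything as a single part are illustrative):

```rust
use bytes::Bytes;
use object_store::{path::Path, Error, ObjectStore, Result};

// Try a streaming upload; fall back to a single `put` when the backend does
// not implement `upload` (as HttpStore above does not).
async fn put_streaming_or_whole(store: &dyn ObjectStore, path: &Path, data: Bytes) -> Result<()> {
    match store.upload(path).await {
        Ok(mut upload) => {
            upload.put_part(data).await?;
            upload.complete().await?;
            Ok(())
        }
        Err(Error::NotImplemented) => {
            store.put(path, data).await?;
            Ok(())
        }
        Err(e) => Err(e),
    }
}
```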
