diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index da2b8c4..ab59e6f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,6 +16,7 @@
 # under the License.
 
 ---
+
 name: CI
 
 concurrency:
@@ -99,8 +100,8 @@ jobs:
           AWS_SECRET_ACCESS_KEY: test
           AWS_ENDPOINT: http://localhost:4566
           AWS_ALLOW_HTTP: true
-          AWS_COPY_IF_NOT_EXISTS: multipart
-          AWS_CONDITIONAL_PUT: etag
+          AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
+          AWS_CONDITIONAL_PUT: dynamo:test-table:2000
           AWS_SERVER_SIDE_ENCRYPTION: aws:kms
           HTTP_URL: "http://localhost:8080"
           GOOGLE_BUCKET: test-bucket
@@ -131,6 +132,7 @@ jobs:
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-checksum
           aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
+          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
           KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
           echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r .KeyMetadata.KeyId)" >> $GITHUB_ENV
diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 06503ca..6e6f8e2 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -1149,6 +1149,7 @@ impl AmazonS3Builder {
 
         let config = S3Config {
             region,
+            endpoint: self.endpoint,
             bucket,
             bucket_endpoint,
             credentials,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 913859d..4edb977 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -193,6 +193,7 @@ impl From<Error> for crate::Error {
 #[derive(Debug)]
 pub(crate) struct S3Config {
     pub region: String,
+    pub endpoint: Option<String>,
     pub bucket: String,
     pub bucket_endpoint: String,
     pub credentials: AwsCredentialProvider,
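The `dynamo:` configuration strings used in the CI workflow above map onto the builder API touched by this change. A minimal sketch, assuming the existing `AmazonS3Builder` options and mirroring the LocalStack values from `ci.yml` (not part of the diff itself):

```rust
use object_store::aws::{
    AmazonS3, AmazonS3Builder, DynamoCommit, S3ConditionalPut, S3CopyIfNotExists,
};

#[allow(deprecated)] // the Dynamo variants are marked deprecated in this change
fn localstack_store() -> object_store::Result<AmazonS3> {
    // Equivalent to AWS_COPY_IF_NOT_EXISTS / AWS_CONDITIONAL_PUT = "dynamo:test-table:2000"
    let commit = DynamoCommit::new("test-table".to_string()).with_timeout(2_000);
    AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_bucket_name("test-bucket")
        .with_access_key_id("test")
        .with_secret_access_key("test")
        .with_endpoint("http://localhost:4566") // also used for DynamoDB via S3Config::endpoint
        .with_allow_http(true)
        .with_copy_if_not_exists(S3CopyIfNotExists::Dynamo(commit.clone()))
        .with_conditional_put(S3ConditionalPut::Dynamo(commit))
        .build()
}
```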
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
new file mode 100644
index 0000000..a6775ef
--- /dev/null
+++ b/src/aws/dynamo.rs
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A DynamoDB based lock system
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::future::Future;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use chrono::Utc;
+use http::{Method, StatusCode};
+use serde::ser::SerializeMap;
+use serde::{Deserialize, Serialize, Serializer};
+
+use crate::aws::client::S3Client;
+use crate::aws::credential::CredentialExt;
+use crate::aws::{AwsAuthorizer, AwsCredential};
+use crate::client::get::GetClientExt;
+use crate::client::retry::RetryExt;
+use crate::client::retry::{RequestError, RetryError};
+use crate::path::Path;
+use crate::{Error, GetOptions, Result};
+
+/// The exception returned by DynamoDB on conflict
+const CONFLICT: &str = "ConditionalCheckFailedException";
+
+const STORE: &str = "DynamoDB";
+
+/// A DynamoDB-based commit protocol, used to provide conditional write support for S3
+///
+/// ## Limitations
+///
+/// Only conditional operations, e.g. `copy_if_not_exists`, will be synchronized, and they can
+/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or with
+/// conditional operations performed by writers not configured to synchronize with DynamoDB.
+///
+/// Workloads making use of this mechanism **must** ensure:
+///
+/// * Conditional and non-conditional operations are not performed on the same paths
+/// * Conditional operations are only performed via similarly configured clients
+///
+/// Additionally, as the locking mechanism relies on timeouts to detect stale locks,
+/// performance will be poor for systems that frequently delete and then re-create
+/// objects at the same path; it is instead optimised for systems that primarily create
+/// files at paths never used before, or perform conditional updates to existing files.
+///
+/// ## Commit Protocol
+///
+/// The DynamoDB schema is as follows:
+///
+/// * A string partition key named `"path"`
+/// * A string sort key named `"etag"`
+/// * A numeric [TTL] attribute named `"ttl"`
+/// * A numeric attribute named `"generation"`
+/// * A numeric attribute named `"timeout"`
+///
+/// An appropriate DynamoDB table can be created with the CLI as follows:
+///
+/// ```bash
+/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
+/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
+/// ```
+///
+/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
+/// the commit protocol is as follows (a worked illustration of the default timings follows the list):
+///
+/// 1. Perform a HEAD request on `path` and error on precondition mismatch
+/// 2. Create a record in DynamoDB with the given `path` and `etag` with the configured timeout
+///     1. On Success: Perform the operation with the configured timeout
+///     2. On Conflict:
+///         1. Periodically re-perform the HEAD request on `path` and error on precondition mismatch
+///         2. If `timeout * max_skew_rate` has passed, replace the record, incrementing `"generation"`
+///             1. On Success: GOTO 2.1
+///             2. On Conflict: GOTO 2.2
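+///
+/// As a worked illustration of the defaults: a writer that encounters an existing
+/// record re-checks the precondition every 100 milliseconds (`test_interval`), and
+/// only attempts to replace the record once `timeout * max_clock_skew_rate`,
+/// i.e. 20 seconds * 3 = 60 seconds, has elapsed since it first observed the conflict.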
+///
+/// Provided no writer modifies an object with a given `path` and `etag` without first adding a
+/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit.
+///
+/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
+/// requirements of synchronizing object storage. The major changes are:
+///
+/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
+///     * Cheaper to generate, serialize and compare
+///     * Cannot collide
+///     * More human readable / interpretable
+/// * Relies on [TTL] to eventually clean up old locks
+///
+/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but
+/// is generalised to not make assumptions about the workload and to not rely on first
+/// writing to a temporary path.
+///
+/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
+/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
+/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct DynamoCommit {
+    table_name: String,
+    /// The number of milliseconds a lease is valid for
+    timeout: u64,
+    /// The maximum clock skew rate tolerated by the system
+    max_clock_skew_rate: u32,
+    /// The length of time a record will be retained in DynamoDB before being cleaned up
+    ///
+    /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table
+    /// and does not impact how long clients may wait to acquire a lock
+    ttl: Duration,
+    /// The backoff duration before retesting a condition
+    test_interval: Duration,
+}
+
+impl DynamoCommit {
+    /// Create a new [`DynamoCommit`] with a given table name
+    pub fn new(table_name: String) -> Self {
+        Self {
+            table_name,
+            timeout: 20_000,
+            max_clock_skew_rate: 3,
+            ttl: Duration::from_secs(60 * 60),
+            test_interval: Duration::from_millis(100),
+        }
+    }
+
+    /// Overrides the lock timeout.
+    ///
+    /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer
+    /// races, but increases the time that writers must wait to reclaim a lost lock. The
+    /// default value of 20 seconds should be appropriate for most use-cases.
+    pub fn with_timeout(mut self, millis: u64) -> Self {
+        self.timeout = millis;
+        self
+    }
+
+    /// The maximum clock skew rate tolerated by the system.
+    ///
+    /// An environment in which the clock on the fastest node ticks twice as fast as the slowest
+    /// node would have a clock skew rate of 2. The default value of 3 should be appropriate
+    /// for most environments.
+    pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
+        self.max_clock_skew_rate = rate;
+        self
+    }
+
+    /// The length of time a record should be retained in DynamoDB before being cleaned up
+    ///
+    /// This should be significantly larger than the configured lock timeout, with the default
+    /// value of 1 hour appropriate for most use-cases.
+    pub fn with_ttl(mut self, ttl: Duration) -> Self {
+        self.ttl = ttl;
+        self
+    }
+
+    /// Parse a [`DynamoCommit`] from a string
+    pub(crate) fn from_str(value: &str) -> Option<Self> {
+        Some(match value.split_once(':') {
+            Some((table_name, timeout)) => {
+                Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
+            }
+            None => Self::new(value.trim().to_string()),
+        })
+    }
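+
+    // For example:
+    //   from_str("test-table")      => table "test-table" with the default 20 second timeout
+    //   from_str("test-table:2000") => table "test-table" with a 2 second timeout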
+
+    /// Returns the name of the DynamoDB table.
+    pub(crate) fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    pub(crate) async fn copy_if_not_exists(
+        &self,
+        client: &Arc<S3Client>,
+        from: &Path,
+        to: &Path,
+    ) -> Result<()> {
+        self.conditional_op(client, to, None, || async {
+            client.copy_request(from, to).send().await?;
+            Ok(())
+        })
+        .await
+    }
+
+    #[allow(clippy::future_not_send)] // Generics confound this lint
+    pub(crate) async fn conditional_op<T, F, Fut>(
+        &self,
+        client: &Arc<S3Client>,
+        to: &Path,
+        etag: Option<&str>,
+        op: F,
+    ) -> Result<T>
+    where
+        F: FnOnce() -> Fut,
+        Fut: Future<Output = Result<T>>,
+    {
+        check_precondition(client, to, etag).await?;
+
+        let mut previous_lease = None;
+
+        loop {
+            let existing = previous_lease.as_ref();
+            match self.try_lock(client, to.as_ref(), etag, existing).await? {
+                TryLockResult::Ok(lease) => {
+                    let expiry = lease.acquire + lease.timeout;
+                    return match tokio::time::timeout_at(expiry.into(), op()).await {
+                        Ok(Ok(v)) => Ok(v),
+                        Ok(Err(e)) => Err(e),
+                        Err(_) => Err(Error::Generic {
+                            store: "DynamoDB",
+                            source: format!(
+                                "Failed to perform conditional operation in {} milliseconds",
+                                self.timeout
+                            )
+                            .into(),
+                        }),
+                    };
+                }
+                TryLockResult::Conflict(conflict) => {
+                    let mut interval = tokio::time::interval(self.test_interval);
+                    let expiry = conflict.timeout * self.max_clock_skew_rate;
+                    loop {
+                        interval.tick().await;
+                        check_precondition(client, to, etag).await?;
+                        if conflict.acquire.elapsed() > expiry {
+                            previous_lease = Some(conflict);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Attempt to acquire a lock, reclaiming an existing lease if provided
+    async fn try_lock(
+        &self,
+        s3: &S3Client,
+        path: &str,
+        etag: Option<&str>,
+        existing: Option<&Lease>,
+    ) -> Result<TryLockResult> {
+        let attributes;
+        let (next_gen, condition_expression, expression_attribute_values) = match existing {
+            None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
+            Some(existing) => {
+                attributes = [(":g", AttributeValue::Number(existing.generation))];
+                (
+                    existing.generation.checked_add(1).unwrap(),
+                    "attribute_exists(#pk) AND generation = :g",
+                    Map(attributes.as_slice()),
+                )
+            }
+        };
+
+        let ttl = (Utc::now() + self.ttl).timestamp();
+        let items = [
+            ("path", AttributeValue::from(path)),
+            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
+            ("generation", AttributeValue::Number(next_gen)),
+            ("timeout", AttributeValue::Number(self.timeout)),
+            ("ttl", AttributeValue::Number(ttl as _)),
+        ];
+        let names = [("#pk", "path")];
+
+        let req = PutItem {
+            table_name: &self.table_name,
+            condition_expression,
+            expression_attribute_values,
+            expression_attribute_names: Map(&names),
+            item: Map(&items),
+            return_values: None,
+            return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
+        };
+
+        let credential = s3.config.get_credential().await?;
+
+        let acquire = Instant::now();
+        match self
+            .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req)
+            .await
+        {
+            Ok(_) => Ok(TryLockResult::Ok(Lease {
+                acquire,
+                generation: next_gen,
+                timeout: Duration::from_millis(self.timeout),
+            })),
+            Err(e) => match parse_error_response(&e) {
+                Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) {
+                    Some(lease) => Ok(TryLockResult::Conflict(lease)),
+                    None => Err(Error::Generic {
+                        store: STORE,
+                        source: "Failed to extract lease from conflict ReturnValuesOnConditionCheckFailure response".into(),
+                    }),
+                },
+                _ => Err(Error::Generic {
+                    store: STORE,
+                    source: Box::new(e),
+                }),
+            },
+        }
+    }
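+
+    // For reference, acquiring a fresh lock (no existing lease) serializes to a
+    // PutItem payload along the lines of the following, with the placeholders in
+    // angle brackets filled in from the arguments above:
+    //
+    //   {
+    //     "TableName": "<table>",
+    //     "ConditionExpression": "attribute_not_exists(#pk)",
+    //     "ExpressionAttributeNames": {"#pk": "path"},
+    //     "ExpressionAttributeValues": null,
+    //     "Item": {
+    //       "path": {"S": "<object path>"},
+    //       "etag": {"S": "*"},
+    //       "generation": {"N": "0"},
+    //       "timeout": {"N": "20000"},
+    //       "ttl": {"N": "<expiry unix timestamp>"}
+    //     },
+    //     "ReturnValuesOnConditionCheckFailure": "ALL_OLD"
+    //   }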
+
+    async fn request<R: Serialize>(
+        &self,
+        s3: &S3Client,
+        cred: Option<&AwsCredential>,
+        target: &str,
+        req: R,
+    ) -> Result<HttpResponse, RetryError> {
+        let region = &s3.config.region;
+        let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", region));
+
+        let builder = match &s3.config.endpoint {
+            Some(e) => s3.client.request(Method::POST, e),
+            None => {
+                let url = format!("https://dynamodb.{region}.amazonaws.com");
+                s3.client.request(Method::POST, url)
+            }
+        };
+
+        // TODO: Timeout
+        builder
+            .json(&req)
+            .header("X-Amz-Target", target)
+            .with_aws_sigv4(authorizer, None)
+            .send_retry(&s3.config.retry_config)
+            .await
+    }
+}
+
+#[derive(Debug)]
+enum TryLockResult {
+    /// Successfully acquired a lease
+    Ok(Lease),
+    /// An existing lease was found
+    Conflict(Lease),
+}
+
+/// Validates that `path` has the given `etag` or doesn't exist if `None`
+async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag: Option<&str>) -> Result<()> {
+    let options = GetOptions {
+        head: true,
+        ..Default::default()
+    };
+
+    match etag {
+        Some(expected) => match client.get_opts(path, options).await {
+            Ok(r) => match r.meta.e_tag {
+                Some(actual) if expected == actual => Ok(()),
+                actual => Err(Error::Precondition {
+                    path: path.to_string(),
+                    source: format!("{} does not match {expected}", actual.unwrap_or_default())
+                        .into(),
+                }),
+            },
+            Err(Error::NotFound { .. }) => Err(Error::Precondition {
+                path: path.to_string(),
+                source: format!("Object at location {path} not found").into(),
+            }),
+            Err(e) => Err(e),
+        },
+        None => match client.get_opts(path, options).await {
+            Ok(_) => Err(Error::AlreadyExists {
+                path: path.to_string(),
+                source: "Already Exists".to_string().into(),
+            }),
+            Err(Error::NotFound { .. }) => Ok(()),
+            Err(e) => Err(e),
+        },
+    }
+}
+
+/// Parses the error response if any
+fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
+    match e.inner() {
+        RequestError::Status {
+            status: StatusCode::BAD_REQUEST,
+            body: Some(b),
+        } => serde_json::from_str(b).ok(),
+        _ => None,
+    }
+}
+
+/// Extracts a lease from `item`, returning `None` on error
+fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
+    let generation = match item.get("generation") {
+        Some(AttributeValue::Number(generation)) => generation,
+        _ => return None,
+    };
+
+    let timeout = match item.get("timeout") {
+        Some(AttributeValue::Number(timeout)) => *timeout,
+        _ => return None,
+    };
+
+    Some(Lease {
+        acquire: Instant::now(),
+        generation: *generation,
+        timeout: Duration::from_millis(timeout),
+    })
+}
+
+/// A lock lease
+#[derive(Debug, Clone)]
+struct Lease {
+    acquire: Instant,
+    generation: u64,
+    timeout: Duration,
+}
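+
+// For reference, a conflicting PutItem is rejected with an HTTP 400 whose body
+// resembles the following (abbreviated); `parse_error_response` matches the
+// "__type" suffix against `CONFLICT` and `extract_lease` reads the returned
+// attributes:
+//
+//   {
+//     "__type": "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException",
+//     "Item": {
+//       "generation": {"N": "0"},
+//       "timeout": {"N": "20000"}
+//     }
+//   }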
+
+/// A DynamoDB [PutItem] payload
+///
+/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
+#[derive(Serialize)]
+#[serde(rename_all = "PascalCase")]
+struct PutItem<'a> {
+    /// The table name
+    table_name: &'a str,
+
+    /// A condition that must be satisfied in order for a conditional PutItem operation to succeed.
+    condition_expression: &'a str,
+
+    /// One or more substitution tokens for attribute names in an expression
+    expression_attribute_names: Map<'a, &'a str, &'a str>,
+
+    /// One or more values that can be substituted in an expression
+    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// A map of attribute name/value pairs, one for each attribute
+    item: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// Use ReturnValues if you want to get the item attributes as they appeared
+    /// before they were updated with the PutItem request.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values: Option<ReturnValues>,
+
+    /// An optional parameter that returns the item attributes for a PutItem operation
+    /// that failed a condition check.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values_on_condition_check_failure: Option<ReturnValues>,
+}
+
+#[derive(Deserialize)]
+struct ErrorResponse<'a> {
+    #[serde(rename = "__type")]
+    error: &'a str,
+
+    #[serde(borrow, default, rename = "Item")]
+    item: HashMap<&'a str, AttributeValue<'a>>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum ReturnValues {
+    AllOld,
+}
+
+/// A collection of key value pairs
+///
+/// This provides cheap, ordered serialization of maps
+struct Map<'a, K, V>(&'a [(K, V)]);
+
+impl<K: Serialize, V: Serialize> Serialize for Map<'_, K, V> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        if self.0.is_empty() {
+            return serializer.serialize_none();
+        }
+        let mut map = serializer.serialize_map(Some(self.0.len()))?;
+        for (k, v) in self.0 {
+            map.serialize_entry(k, v)?
+        }
+        map.end()
+    }
+}
+
+/// A DynamoDB [AttributeValue]
+///
+/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
+#[derive(Debug, Serialize, Deserialize)]
+enum AttributeValue<'a> {
+    #[serde(rename = "S")]
+    String(Cow<'a, str>),
+    #[serde(rename = "N", with = "number")]
+    Number(u64),
+}
+
+impl<'a> From<&'a str> for AttributeValue<'a> {
+    fn from(value: &'a str) -> Self {
+        Self::String(Cow::Borrowed(value))
+    }
+}
+
+/// Numbers are serialized as strings
+mod number {
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    pub(crate) fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, S::Error> {
+        s.serialize_str(&v.to_string())
+    }
+
+    pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, D::Error> {
+        let v: &str = Deserialize::deserialize(d)?;
+        v.parse().map_err(serde::de::Error::custom)
+    }
+}
+
+use crate::client::HttpResponse;
+
+/// Re-export integration_test to be called by s3_test
+#[cfg(test)]
+pub(crate) use tests::integration_test;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::aws::AmazonS3;
+    use crate::ObjectStore;
+    use rand::distr::Alphanumeric;
+    use rand::{rng, Rng};
+
+    #[test]
+    fn test_attribute_serde() {
+        let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
+        assert_eq!(serde, "{\"N\":\"23\"}");
+        let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
+        assert!(matches!(back, AttributeValue::Number(23)));
+    }
+
+    /// An integration test for DynamoDB
+    ///
+    /// This is a function called by s3_test to avoid test concurrency issues
+    pub(crate) async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) {
+        let client = &integration.client;
+
+        let src = Path::from("dynamo_path_src");
+        integration.put(&src, "asd".into()).await.unwrap();
+
+        let dst = Path::from("dynamo_path");
+        let _ = integration.delete(&dst).await; // Delete if present
+
+        // Create a lock if one does not already exist
+        let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
+            TryLockResult::Conflict(l) => l,
+            TryLockResult::Ok(l) => l,
+        };
+
+        // Should not be able to acquire the lock again
+        let r = d.try_lock(client, dst.as_ref(), None, None).await;
+        assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
+
+        // But should still be able to reclaim the lock and perform the copy
+        d.copy_if_not_exists(client, &src, &dst).await.unwrap();
+
+        match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
+            TryLockResult::Conflict(new) => {
+                // Should have incremented the generation to do so
+                assert_eq!(new.generation, existing.generation + 1);
+            }
+            _ => panic!("should conflict"),
+        }
+
+        let rng = rng();
+        let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+        let t = Some(etag.as_str());
+
+        let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Ok(l) => l,
+            _ => panic!("should not conflict"),
+        };
+
+        match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
+            _ => panic!("should conflict"),
+        }
+
+        match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
+            TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
+            _ => panic!("should not conflict"),
+        }
+    }
+}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 8dac2bd..4abf374 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -56,6 +56,7 @@ mod builder;
 mod checksum;
 mod client;
 mod credential;
+mod dynamo;
 mod precondition;
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -63,6 +64,7 @@ mod resolve;
 
 pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
 pub use checksum::Checksum;
+pub use dynamo::DynamoCommit;
 pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -195,6 +197,11 @@ impl ObjectStore for AmazonS3 {
                     r => r,
                 }
             }
+            #[allow(deprecated)]
+            (PutMode::Create, S3ConditionalPut::Dynamo(d)) => {
+                d.conditional_op(&self.client, location, None, move || request.do_put())
+                    .await
+            }
             (PutMode::Update(v), put) => {
                 let etag = v.e_tag.ok_or_else(|| Error::Generic {
                     store: STORE,
@@ -222,6 +229,13 @@
                         r => r,
                     }
                 }
+                #[allow(deprecated)]
+                S3ConditionalPut::Dynamo(d) => {
+                    d.conditional_op(&self.client, location, Some(&etag), move || {
+                        request.do_put()
+                    })
+                    .await
+                }
                 S3ConditionalPut::Disabled => Err(Error::NotImplemented),
             }
         }
@@ -355,6 +369,10 @@
                 return res;
             }
+            #[allow(deprecated)]
+            Some(S3CopyIfNotExists::Dynamo(lock)) => {
+                return lock.copy_if_not_exists(&self.client, from, to).await
+            }
             None => {
                 return Err(Error::NotSupported {
                     source: "S3 does not support copy-if-not-exists".to_string().into(),
@@ -622,6 +640,12 @@ mod tests {
         let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
         put_get_delete_list(&integration).await;
+
+        match &integration.client.config.copy_if_not_exists {
+            #[allow(deprecated)]
+            Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
+            _ => eprintln!("Skipping dynamo integration test - dynamo not configured"),
+        };
     }
 
     #[tokio::test]
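With the dispatch above in place, a store configured with `S3ConditionalPut::Dynamo` routes create-if-absent puts through the DynamoDB commit protocol rather than returning `NotImplemented`. A usage sketch; the path and payload are illustrative only:

```rust
use object_store::path::Path;
use object_store::{ObjectStore, PutMode};

async fn create_once(store: &dyn ObjectStore) -> object_store::Result<()> {
    // The first writer wins; concurrent attempts fail with Error::AlreadyExists
    let path = Path::from("example/config.json");
    store
        .put_opts(&path, "v1".into(), PutMode::Create.into())
        .await?;
    Ok(())
}
```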
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 52ecb9f..2f11e4f 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::aws::dynamo::DynamoCommit;
 use crate::config::Parse;
 
 use itertools::Itertools;
@@ -60,6 +61,16 @@ pub enum S3CopyIfNotExists {
     ///
     /// Encoded as `multipart` ignoring whitespace.
     Multipart,
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`,
+    /// ignoring whitespace. The default timeout is used if not specified.
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    #[deprecated(note = "Use S3CopyIfNotExists::Multipart")]
+    Dynamo(DynamoCommit),
 }
 
 impl std::fmt::Display for S3CopyIfNotExists {
@@ -70,6 +81,8 @@ impl std::fmt::Display for S3CopyIfNotExists {
                 write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
             }
             Self::Multipart => f.write_str("multipart"),
+            #[allow(deprecated)]
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
         }
     }
 }
@@ -97,6 +110,8 @@ impl S3CopyIfNotExists {
                     code,
                 ))
             }
+            #[allow(deprecated)]
+            "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
             _ => None,
         }
     }
@@ -127,6 +142,17 @@ pub enum S3ConditionalPut {
     #[default]
     ETagMatch,
 
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`,
+    /// ignoring whitespace. The default timeout is used if not specified.
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    #[deprecated(note = "Use S3ConditionalPut::ETagMatch")]
+    Dynamo(DynamoCommit),
+
     /// Disable `conditional put`
     Disabled,
 }
@@ -135,6 +161,8 @@ impl std::fmt::Display for S3ConditionalPut {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::ETagMatch => write!(f, "etag"),
+            #[allow(deprecated)]
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
             Self::Disabled => write!(f, "disabled"),
         }
     }
 }
@@ -145,7 +173,11 @@ impl S3ConditionalPut {
         match s.trim() {
             "etag" => Some(Self::ETagMatch),
             "disabled" => Some(Self::Disabled),
-            _ => None,
+            trimmed => match trimmed.split_once(':')? {
+                #[allow(deprecated)]
+                ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
+                _ => None,
+            },
         }
     }
 }
@@ -162,6 +194,7 @@ impl Parse for S3ConditionalPut {
 #[cfg(test)]
 mod tests {
     use super::S3CopyIfNotExists;
+    use crate::aws::{DynamoCommit, S3ConditionalPut};
 
     #[test]
     fn parse_s3_copy_if_not_exists_header() {
@@ -186,6 +219,26 @@ mod tests {
         assert_eq!(expected, S3CopyIfNotExists::from_str(input));
     }
 
+    #[test]
+    #[allow(deprecated)]
+    fn parse_s3_copy_if_not_exists_dynamo() {
+        let input = "dynamo: table:100";
+        let expected = Some(S3CopyIfNotExists::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(100),
+        ));
+        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+    }
+
+    #[test]
+    #[allow(deprecated)]
+    fn parse_s3_condition_put_dynamo() {
+        let input = "dynamo: table:1300";
+        let expected = Some(S3ConditionalPut::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(1300),
+        ));
+        assert_eq!(expected, S3ConditionalPut::from_str(input));
+    }
+
     #[test]
     fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
         let expected = Some(S3CopyIfNotExists::Header(
diff --git a/src/client/builder.rs b/src/client/builder.rs
index f74c5ec..257cb57 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -165,7 +165,7 @@ impl HttpRequestBuilder {
         self
     }
 
-    #[cfg(feature = "gcp")]
+    #[cfg(any(feature = "aws", feature = "gcp"))]
     pub(crate) fn json<S: Serialize>(mut self, s: S) -> Self {
         match (serde_json::to_vec(&s), &mut self.request) {
             (Ok(json), Ok(request)) => {
diff --git a/src/integration.rs b/src/integration.rs
index 988d8d4..99ee86d 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -34,6 +34,7 @@ use crate::{
 use bytes::Bytes;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
+use rand::distr::Alphanumeric;
 use rand::{rng, Rng};
 use std::collections::HashSet;
 use std::slice;
@@ -629,8 +630,15 @@ pub async fn get_opts(storage: &dyn ObjectStore) {
 
 /// Tests conditional writes
 pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
+    // When using DynamoCommit, repeated runs of this test would produce the same sequence of records in DynamoDB.
+    // As a result, each conditional operation would need to wait for the lease to time out before proceeding.
+    // One solution would be to clear DynamoDB before each test, but as this would require non-trivial additional
+    // code we instead just generate a random suffix for the filenames.
+    let rng = rng();
+    let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+
     delete_fixtures(storage).await;
-    let path = Path::from("put_opts");
+    let path = Path::from(format!("put_opts_{suffix}"));
     let v1 = storage
         .put_opts(&path, "a".into(), PutMode::Create.into())
         .await
@@ -688,7 +696,7 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
     const NUM_WORKERS: usize = 5;
     const NUM_INCREMENTS: usize = 10;
 
-    let path = Path::from("RACE");
+    let path = Path::from(format!("RACE-{suffix}"));
     let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
         .map(|_| async {
             for _ in 0..NUM_INCREMENTS {