Skip to content

Commit

Permalink
Merge pull request #3593 from jdisanti/fix-dynamo-example
Browse files Browse the repository at this point in the history
Simplify waiting logic in Rust DynamoDB examples
  • Loading branch information
scmacdon authored Sep 14, 2022
2 parents e91e7d9 + 8dc7f71 commit 248b6ff
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 148 deletions.
97 changes: 21 additions & 76 deletions rust_dev_preview/dynamodb/src/bin/crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,13 @@
*/

use aws_config::meta::region::RegionProviderChain;
use aws_http::retry::AwsErrorRetryPolicy;
use aws_sdk_dynamodb::error::DescribeTableError;
use aws_sdk_dynamodb::input::DescribeTableInput;
use aws_sdk_dynamodb::middleware::DefaultMiddleware;
use aws_sdk_dynamodb::model::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ProvisionedThroughput,
ScalarAttributeType, Select, TableStatus,
};
use aws_sdk_dynamodb::operation::DescribeTable;
use aws_sdk_dynamodb::output::DescribeTableOutput;
use aws_sdk_dynamodb::{Client, Config, Error, Region, PKG_VERSION};
use aws_smithy_client::erase::DynConnector;
use aws_smithy_http::result::{SdkError, SdkSuccess};

use aws_smithy_http::operation::Operation;
use aws_smithy_http::retry::ClassifyResponse;
use aws_smithy_types::retry::RetryKind;
use aws_sdk_dynamodb::{Client, Error, Region, PKG_VERSION};
use aws_smithy_http::result::SdkError;

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::io::{stdin, Read};
Expand Down Expand Up @@ -195,46 +185,6 @@ async fn remove_table(client: &Client, table: &str) -> Result<(), Error> {
}
// snippet-end:[dynamodb.rust.crud-remove_table]

/// Hand-written waiter to retry every second until the table is out of `Creating` state
#[derive(Clone)]
struct WaitForReadyTable<R> {
inner: R,
}

impl<R> ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>
for WaitForReadyTable<R>
where
R: ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>,
{
fn classify(
&self,
response: Result<&SdkSuccess<DescribeTableOutput>, &SdkError<DescribeTableError>>,
) -> RetryKind {
match self.inner.classify(response) {
RetryKind::UnretryableFailure | RetryKind::Unnecessary => (),
other => return other,
};
match response {
Ok(SdkSuccess { parsed, .. }) => {
if parsed
.table
.as_ref()
.unwrap()
.table_status
.as_ref()
.unwrap()
== &TableStatus::Creating
{
RetryKind::Explicit(Duration::from_secs(1))
} else {
RetryKind::Unnecessary
}
}
_ => RetryKind::UnretryableFailure,
}
}
}

/// Wait for the user to press Enter.
fn pause() {
println!("Press Enter to continue.");
Expand Down Expand Up @@ -317,12 +267,7 @@ async fn main() -> Result<(), Error> {

println!("Waiting for table to be ready.");

let raw_client = aws_smithy_client::Client::<DynConnector, DefaultMiddleware>::dyn_https();

raw_client
.call(wait_for_ready_table(&table, client.conf()).await)
.await
.expect("table should become ready.");
wait_for_ready_table(&client, &table).await?;

println!("Table is now ready to use.");

Expand Down Expand Up @@ -387,21 +332,21 @@ async fn main() -> Result<(), Error> {
Ok(())
}

/// Construct a `DescribeTable` request with a policy to retry every second until the table
/// is ready
async fn wait_for_ready_table(
table_name: &str,
conf: &Config,
) -> Operation<DescribeTable, WaitForReadyTable<AwsErrorRetryPolicy>> {
let operation = DescribeTableInput::builder()
.table_name(table_name)
.build()
.expect("valid input")
.make_operation(conf)
.await
.expect("valid operation");
let waiting_policy = WaitForReadyTable {
inner: operation.retry_policy().clone(),
};
operation.with_retry_policy(waiting_policy)
/// Poll the DescribeTable operation once per second until the table exists.
async fn wait_for_ready_table(client: &Client, table_name: &str) -> Result<(), Error> {
loop {
if let Some(table) = client
.describe_table()
.table_name(table_name)
.send()
.await?
.table()
{
if !matches!(table.table_status, Some(TableStatus::Creating)) {
break;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(())
}
88 changes: 16 additions & 72 deletions rust_dev_preview/dynamodb/src/bin/movies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,12 @@ use std::collections::HashMap;
use std::time::Duration;

use aws_config::meta::region::RegionProviderChain;
use aws_http::retry::AwsErrorRetryPolicy;
use aws_sdk_dynamodb::client::fluent_builders::Query;
use aws_sdk_dynamodb::error::DescribeTableError;
use aws_sdk_dynamodb::input::DescribeTableInput;
use aws_sdk_dynamodb::middleware::DefaultMiddleware;
use aws_sdk_dynamodb::model::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ProvisionedThroughput,
ScalarAttributeType, TableStatus,
};
use aws_sdk_dynamodb::operation::DescribeTable;
use aws_sdk_dynamodb::output::DescribeTableOutput;
use aws_sdk_dynamodb::{Client, Config, Error, Region, PKG_VERSION};
use aws_smithy_client::erase::DynConnector;
use aws_smithy_http::operation::Operation;
use aws_smithy_http::result::{SdkError, SdkSuccess};
use aws_smithy_http::retry::ClassifyResponse;
use aws_smithy_types::retry::RetryKind;
use aws_sdk_dynamodb::{Client, Error, Region, PKG_VERSION};
use serde_json::Value;
use structopt::StructOpt;

Expand Down Expand Up @@ -79,8 +68,6 @@ async fn main() -> Result<(), Error> {

let client = Client::new(&shared_config);

let raw_client = aws_smithy_client::Client::<DynConnector, DefaultMiddleware>::dyn_https();

let table_exists = does_table_exist(&client, &table).await?;

if !table_exists {
Expand All @@ -92,10 +79,7 @@ async fn main() -> Result<(), Error> {
.expect("failed to create table");
}

raw_client
.call(wait_for_ready_table(&table.to_string(), client.conf()).await)
.await
.expect("table should become ready");
wait_for_ready_table(&client, &table.to_string()).await?;

// data.json contains 2 movies from 2013
let data = match serde_json::from_str(include_str!("data.json")).expect("should be valid JSON")
Expand Down Expand Up @@ -259,61 +243,21 @@ async fn delete_table(client: &Client, table: &str) -> Result<(), Error> {
}
// snippet-end:[dynamodb.rust.movies-delete_table]

/// Hand-written waiter to retry every second until the table is out of `Creating` state
#[derive(Clone)]
struct WaitForReadyTable<R> {
inner: R,
}

impl<R> ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>
for WaitForReadyTable<R>
where
R: ClassifyResponse<SdkSuccess<DescribeTableOutput>, SdkError<DescribeTableError>>,
{
fn classify(
&self,
response: Result<&SdkSuccess<DescribeTableOutput>, &SdkError<DescribeTableError>>,
) -> RetryKind {
match self.inner.classify(response) {
RetryKind::UnretryableFailure | RetryKind::Unnecessary => (),
other => return other,
};
match response {
Ok(SdkSuccess { parsed, .. }) => {
if parsed
.table
.as_ref()
.unwrap()
.table_status
.as_ref()
.unwrap()
== &TableStatus::Creating
{
RetryKind::Explicit(Duration::from_secs(1))
} else {
RetryKind::Unnecessary
}
/// Poll the DescribeTable operation once per second until the table exists.
async fn wait_for_ready_table(client: &Client, table_name: &str) -> Result<(), Error> {
loop {
if let Some(table) = client
.describe_table()
.table_name(table_name)
.send()
.await?
.table()
{
if !matches!(table.table_status, Some(TableStatus::Creating)) {
break;
}
_ => RetryKind::UnretryableFailure,
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

/// Construct a `DescribeTable` request with a policy to retry every second until the table
/// is ready
async fn wait_for_ready_table(
table_name: &str,
conf: &Config,
) -> Operation<DescribeTable, WaitForReadyTable<AwsErrorRetryPolicy>> {
let operation = DescribeTableInput::builder()
.table_name(table_name)
.build()
.expect("valid input")
.make_operation(conf)
.await
.expect("valid operation");
let waiting_policy = WaitForReadyTable {
inner: operation.retry_policy().clone(),
};
operation.with_retry_policy(waiting_policy)
Ok(())
}

0 comments on commit 248b6ff

Please sign in to comment.