Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement 'admin update table' command to call UpdateTable API for existing tables. Only mode and CU for now. #2

Merged
merged 1 commit into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ FLAGS: ...
OPTIONS: ...

SUBCOMMANDS:
create Create new DynamoDB table or GSI
delete Delete a DynamoDB table or GSI
create Create new DynamoDB table or GSI. [API: CreateTable, UpdateTable]
delete Delete a DynamoDB table or GSI. [API: DeleteTable]
desc Show detailed information of a table. [API: DescribeTable]
help Prints this message or the help of the given subcommand(s)
list List tables in the region. [API: ListTables]
update Update a DynamoDB table. [API: UpdateTable etc]
```

By executing following command, you can create a DynamoDB table.
Expand Down Expand Up @@ -300,7 +301,7 @@ size_bytes: 0
created_at: "2020-03-03T13:34:43+00:00"
```

After the table get ready (= `ACTIVE` status), you can write-to and read-from the table.
After the table get ready (i.e. `status: CREATING` changed to `ACTIVE`), you can write-to and read-from the table.

```
$ dy use app_users
Expand Down Expand Up @@ -332,7 +333,7 @@ myapp 1234 {"rank":99}
Similarly you can update tables with dynein.

```
(currently not available) $ dy admin update table --mode provisioned --wcu 10 --rcu 25
$ dy admin update table app_users --mode provisioned --wcu 10 --rcu 25
```


Expand Down Expand Up @@ -814,7 +815,6 @@ $ RUST_LOG=debug RUST_BACKTRACE=1 dy scan --table your_table

## Ideas for future works

- `dy admin update table` command
- `dy admin plan` & `dy admin apply` commands to manage tables through CloudFormation.
- These subcommand names are inspired by [HashiCorp's Terraform](https://www.terraform.io/).
- Linux's `top` -like experience to monitor table status. e.g. `dy top tables`
Expand All @@ -828,3 +828,4 @@ $ RUST_LOG=debug RUST_BACKTRACE=1 dy scan --table your_table
- Support Transaction APIs (TransactGetItems, TransactWriteItems)
- simple load testing. e.g. `dy load --tps 100`
- import/export tool supports LTSV, TSV
- PITR configuration enable/disable (UpdateContinuousBackups) and exporting/restoring tables ([ExportTableToPointInTime](https://aws.amazon.com/blogs/aws/new-export-amazon-dynamodb-table-data-to-data-lake-amazon-s3/), RestoreTableToPointInTime)
46 changes: 42 additions & 4 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,21 @@ pub enum AdminSub {
output: Option<String>,
},

/// Create new DynamoDB table or GSI.
/// Create new DynamoDB table or GSI. [API: CreateTable, UpdateTable]
#[structopt()]
Create {
#[structopt(subcommand)]
target_type: CreateSub,
},

/// Delete a DynamoDB table or GSI.
/// Update a DynamoDB table. [API: UpdateTable etc]
#[structopt()]
Update {
#[structopt(subcommand)]
target_type: UpdateSub,
},

/// Delete a DynamoDB table or GSI. [API: DeleteTable]
#[structopt()]
Delete {
#[structopt(subcommand)]
Expand Down Expand Up @@ -415,7 +422,7 @@ pub enum AdminSub {
#[derive(StructOpt, Debug, Serialize, Deserialize)]
pub enum CreateSub {

/// Create new DynamoDB table with given primary key(s).
/// Create new DynamoDB table with given primary key(s). [API: CreateTable]
#[structopt()]
Table {
/// table name to create
Expand All @@ -427,7 +434,7 @@ pub enum CreateSub {
keys: Vec<String>,
},

/// Create new GSI (global secondary index) for a table with given primary key(s).
/// Create new GSI (global secondary index) for a table with given primary key(s). [API: UpdateTable]
#[structopt()]
Index {
/// index name to create
Expand All @@ -441,6 +448,37 @@ pub enum CreateSub {
}


#[derive(StructOpt, Debug, Serialize, Deserialize)]
pub enum UpdateSub {

/// Update a DynamoDB table.
#[structopt()]
Table {
/// table name to update
table_name_to_update: String,

/// DynamoDB capacity mode. Availablle values: [provisioned, ondemand].
/// When you switch from OnDemand to Provisioned mode, you can pass WCU and RCU as well (NOTE: default capacity unit for Provisioned mode is 5).
#[structopt(short, long, possible_values = &["provisioned", "ondemand"])]
mode: Option<String>,

/// WCU (write capacity units) for the table. Acceptable only on Provisioned mode.
#[structopt(long)]
wcu: Option<i64>,

/// RCU (read capacity units) for the table. Acceptable only on Provisioned mode.
#[structopt(long)]
rcu: Option<i64>,

// TODO: support following parameters
// - sse_enabled: bool, (default false) ... UpdateTable API
// - stream_enabled: bool, (default false) ... UpdateTable API
// - ttl_enabled: bool, UpdateTimeToLive API
// - pitr_enabled: bool, UpdateContinuousBackups API (PITR)
},
}


#[derive(StructOpt, Debug, Serialize, Deserialize)]
pub enum DeleteSub {

Expand Down
153 changes: 131 additions & 22 deletions src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct PrintDescribeTable {
created_at: String,
}

const PROVISIONED_API_SPEC: &'static str = "PROVISIONED";
const ONDEMAND_API_SPEC: &'static str = "PAY_PER_REQUEST";

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
Expand Down Expand Up @@ -229,7 +230,7 @@ pub async fn create_table_api(cx: app::Context, name: String, given_keys: Vec<St
..Default::default()
};

return ddb.create_table(req).await.map(|res| res.table_description.unwrap());
return ddb.create_table(req).await.map(|res| res.table_description.expect("Table Description returned from API should be valid."));
}


Expand Down Expand Up @@ -275,6 +276,100 @@ pub async fn create_index(cx: app::Context, index_name: String, given_keys: Vec<
}


pub async fn update_table(cx: app::Context, table_name_to_update: String,
mode_string: Option<String>, wcu: Option<i64>, rcu: Option<i64>) {
// Retrieve TableDescription of the table to update, current (before update) status.
let desc: TableDescription = app::describe_table_api(&cx.effective_region(), table_name_to_update.clone()).await;

// Map given string into "Mode" enum. Note that in cmd.rs structopt already limits acceptable values.
let switching_to_mode: Option<Mode> = match mode_string {
None => None,
Some(ms) => match ms.as_str() {
"provisioned" => Some(Mode::Provisioned),
"ondemand" => Some(Mode::OnDemand),
_ => panic!("You shouldn't see this message as --mode can takes only 'provisioned' or 'ondemand'."),
},
};

// Configure ProvisionedThroughput struct based on argumsnts (mode/wcu/rcu).
let provisioned_throughput: Option<ProvisionedThroughput> = match &switching_to_mode {
// when --mode is not given, no mode switch happens. Check the table's current mode.
None => {
match extract_mode(&desc.clone().billing_mode_summary) {
// When currently OnDemand mode and you're not going to change the it, set None for CU.
Mode::OnDemand => {
if wcu.is_some() || rcu.is_some() { println!("Ignoring --rcu/--wcu options as the table mode is OnDemand."); };
None
},
// When currently Provisioned mode and you're not going to change the it,
// pass given rcu/wcu, and use current values if missing. Provisioned table should have valid capacity units so unwrap() here.
Mode::Provisioned => Some(ProvisionedThroughput {
read_capacity_units: rcu.unwrap_or(desc.clone().provisioned_throughput.unwrap().read_capacity_units.unwrap()),
write_capacity_units: wcu.unwrap_or(desc.clone().provisioned_throughput.unwrap().write_capacity_units.unwrap()),
}),
}
},
// When the user trying to switch mode.
Some(target_mode) => match target_mode {
// when switching Provisioned->OnDemand mode, ProvisionedThroughput can be None.
Mode::OnDemand => {
if wcu.is_some() || rcu.is_some() { println!("Ignoring --rcu/--wcu options as --mode ondemand."); };
None
},
// when switching OnDemand->Provisioned mode, set given wcu/rcu, fill with "5" as a default if not given.
Mode::Provisioned => Some(ProvisionedThroughput {
read_capacity_units: rcu.unwrap_or(5),
write_capacity_units: wcu.unwrap_or(5),
}),
},
};

// TODO: support updating CU of the table with GSI. If the table has GSIs, you must specify CU for them at the same time.
// error message: One or more parameter values were invalid: ProvisionedThroughput must be specified for index: xyz_index,abc_index2
// if table has gsi
// build GlobalSecondaryIndexUpdates { [... current values ...] }

match update_table_api(cx.clone(), table_name_to_update, switching_to_mode, provisioned_throughput).await {
Ok(desc) => print_table_description(cx.effective_region(), desc),
Err(e) => {
debug!("UpdateTable API call got an error -- {:#?}", e);
error!("{}", e.to_string());
std::process::exit(1);
},
}
}


/// UpdateTable API accepts following parameters (ref: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateTable.html):
/// * [x] TableName (required)
/// * [x] BillingMode
/// * [x] ProvisionedThroughput > obj
/// * [-] AttributeDefinitions > array of AttributeDefinition obj
/// * [-] GlobalSecondaryIndexUpdates > Create/Update/Delete and details of the update on GSIs
/// * [-] ReplicaUpdates > Create/Update/Delete and details of the update on Global Tbles replicas
/// * [] SSESpecification > obj
/// * [] StreamSpecification > obj
/// [+] = supported, [-] = implemented (or plan to so) in another location, [] = not yet supported
/// Especially note that you should explicitly pass GSI update parameter to make any change on GSI.
async fn update_table_api(cx: app::Context, table_name_to_update: String, switching_to_mode: Option<Mode>, provisioned_throughput: Option<ProvisionedThroughput>)
-> Result<TableDescription, rusoto_core::RusotoError<rusoto_dynamodb::UpdateTableError>> {
debug!("Trying to update the table '{}'.", &table_name_to_update);

let ddb = DynamoDbClient::new(cx.effective_region());

let req: UpdateTableInput = UpdateTableInput {
table_name: table_name_to_update,
billing_mode: switching_to_mode.map(|m| mode_to_billing_mode_api_spec(m)),
provisioned_throughput: provisioned_throughput,
// NOTE: In this function we set `global_secondary_index_updates` to None. GSI update is handled in different commands (e.g. dy admin create index xxx --keys)
global_secondary_index_updates: None /* intentional */,
..Default::default()
};

return ddb.update_table(req).await.map(|res| res.table_description.expect("Table Description returned from API should be valid."))
}


pub async fn delete_table(cx: app::Context, name: String, skip_confirmation: bool) {
debug!("Trying to delete a table '{}'", &name);

Expand All @@ -295,7 +390,7 @@ pub async fn delete_table(cx: app::Context, name: String, skip_confirmation: boo
},
Ok(res) => {
debug!("Returned result: {:#?}", res);
println!("DynamoDB table '{}' has been deleted successfully.", res.table_description.unwrap().table_name.unwrap());
println!("Delete operation for the table '{}' has been started.", res.table_description.unwrap().table_name.unwrap());
}
}
}
Expand Down Expand Up @@ -359,16 +454,6 @@ pub async fn list_backups(cx: app::Context, all_tables: bool) -> Result<(), IOEr
}


fn fetch_arn_from_backup_name(backup_name: String, available_backups: Vec<BackupSummary>) -> String {
available_backups.into_iter().find(|b|
b.to_owned().backup_name.unwrap() == backup_name
) /* Option<BackupSummary */
.unwrap() /* BackupSummary */
.backup_arn /* Option<String> */
.unwrap()
}


/// This function restores DynamoDB table from specified backup data.
/// If you don't specify backup data (name) explicitly, dynein will list backups and you can select out of them.
/// Currently overwriting properties during rstore is not supported.
Expand Down Expand Up @@ -445,6 +530,22 @@ pub async fn restore(cx: app::Context, backup_name: Option<String>, restore_name
}


/// Map "BilingModeSummary" field in table description returned from DynamoDB API,
/// into convenient mode name ("Provisioned" or "OnDemand")
pub fn extract_mode(bs: &Option<BillingModeSummary>) -> Mode {
let provisioned_mode = Mode::Provisioned;
let ondemand_mode = Mode::OnDemand;
match bs {
// if BillingModeSummary field doesn't exist, the table is Provisioned Mode.
None => provisioned_mode,
Some(x) => {
if x.clone().billing_mode.unwrap() == ONDEMAND_API_SPEC { ondemand_mode }
else { provisioned_mode }
},
}
}


/* =================================================
Private functions
================================================= */
Expand Down Expand Up @@ -517,24 +618,32 @@ async fn list_backups_api(cx: &app::Context, all_tables: bool) -> Vec<BackupSumm
}


fn fetch_arn_from_backup_name(backup_name: String, available_backups: Vec<BackupSummary>) -> String {
available_backups.into_iter().find(|b|
b.to_owned().backup_name.unwrap() == backup_name
) /* Option<BackupSummary */
.unwrap() /* BackupSummary */
.backup_arn /* Option<String> */
.unwrap()
}


fn epoch_to_rfc3339(epoch: f64) -> String {
let utc_datetime = NaiveDateTime::from_timestamp(epoch as i64, 0);
return DateTime::<Utc>::from_utc(utc_datetime, Utc).to_rfc3339();
}

pub fn extract_mode(bs: &Option<BillingModeSummary>) -> Mode {
let provisioned_mode = Mode::Provisioned;
let ondemand_mode = Mode::OnDemand;
match bs {
// if BillingModeSummary field doesn't exist, the table is Provisioned Mode.
None => provisioned_mode,
Some(x) => {
if x.clone().billing_mode.unwrap() == ONDEMAND_API_SPEC { ondemand_mode }
else { provisioned_mode }
},

/// Takes "Mode" enum and return exact string value required by DynamoDB API.
/// i.e. this function returns "PROVISIONED" or "PAY_PER_REQUEST".
fn mode_to_billing_mode_api_spec(mode: Mode) -> String {
match mode {
Mode::OnDemand => String::from(ONDEMAND_API_SPEC),
Mode::Provisioned => String::from(PROVISIONED_API_SPEC),
}
}


fn extract_capacity(mode: &Mode, cap_desc: &Option<ProvisionedThroughputDescription>)
-> Option<PrintCapacityUnits> {
if mode == &Mode::OnDemand { return None }
Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
if all_tables { control::describe_all_tables(context).await }
else { control::describe_table(context).await }
},
// cmd::AdminSub::Apply { } => cfn::apply(context).await?,
cmd::AdminSub::Create { target_type } => match target_type {
cmd::CreateSub::Table { new_table_name, keys } => control::create_table(context, new_table_name, keys).await,
cmd::CreateSub::Index { index_name, keys } => control::create_index(context, index_name, keys).await,
},
cmd::AdminSub::Update { target_type } => match target_type {
cmd::UpdateSub::Table { table_name_to_update, mode, wcu, rcu } => control::update_table(context, table_name_to_update, mode, wcu, rcu).await,
},
cmd::AdminSub::Delete { target_type } => match target_type {
cmd::DeleteSub::Table { table_name_to_delete, yes } => control::delete_table(context, table_name_to_delete, yes).await,
},
Expand Down