Skip to content

Commit

Permalink
DynamoDB - add waiter for secondary indexes (#1866)
Browse files Browse the repository at this point in the history
DynamoDB - add waiter for secondary indexes

SUMMARY
Integration tests keep failing due to concurrency issues (the service doesn't like making updates while index updates are in progress)
Adds a waiter for the indexes
ISSUE TYPE

Feature Pull Request

COMPONENT NAME
dynamodb_table
ADDITIONAL INFORMATION

Reviewed-by: Mark Chappell
Reviewed-by: Alina Buzachis
  • Loading branch information
tremble authored Jul 4, 2023
1 parent 5f96421 commit 6fdbd75
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 44 deletions.
3 changes: 3 additions & 0 deletions changelogs/fragments/20230702-dynamodb_waiter.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
minor_changes:
- dynamodb_table - added waiter when updating indexes to avoid concurrency issues (https://github.com/ansible-collections/community.aws/pull/1866).
- dynamodb_table - increased default timeout based on time to update indexes in CI (https://github.com/ansible-collections/community.aws/pull/1866).
120 changes: 120 additions & 0 deletions plugins/module_utils/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-

# Copyright: Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

try:
import botocore
except ImportError:
pass # Handled by AnsibleAWSModule

from ansible_collections.community.aws.plugins.module_utils.base import BaseWaiterFactory


class DynamodbWaiterFactory(BaseWaiterFactory):
def __init__(self, module):
# the AWSRetry wrapper doesn't support the wait functions (there's no
# public call we can cleanly wrap)
client = module.client("dynamodb")
super().__init__(module, client)

@property
def _waiter_model_data(self):
data = super()._waiter_model_data
ddb_data = dict(
table_exists=dict(
operation="DescribeTable",
delay=20,
maxAttempts=25,
acceptors=[
dict(expected="ACTIVE", matcher="path", state="success", argument="Table.TableStatus"),
dict(expected="ResourceNotFoundException", matcher="error", state="retry"),
],
),
table_not_exists=dict(
operation="DescribeTable",
delay=20,
maxAttempts=25,
acceptors=[
dict(expected="ResourceNotFoundException", matcher="error", state="success"),
],
),
global_indexes_active=dict(
operation="DescribeTable",
delay=20,
maxAttempts=25,
acceptors=[
dict(expected="ResourceNotFoundException", matcher="error", state="failure"),
# If there are no secondary indexes, simply return
dict(
expected=False,
matcher="path",
state="success",
argument="contains(keys(Table), `GlobalSecondaryIndexes`)",
),
dict(
expected="ACTIVE",
matcher="pathAll",
state="success",
argument="Table.GlobalSecondaryIndexes[].IndexStatus",
),
dict(
expected="CREATING",
matcher="pathAny",
state="retry",
argument="Table.GlobalSecondaryIndexes[].IndexStatus",
),
dict(
expected="UPDATING",
matcher="pathAny",
state="retry",
argument="Table.GlobalSecondaryIndexes[].IndexStatus",
),
dict(
expected="DELETING",
matcher="pathAny",
state="retry",
argument="Table.GlobalSecondaryIndexes[].IndexStatus",
),
dict(
expected=True,
matcher="path",
state="success",
argument="length(Table.GlobalSecondaryIndexes) == `0`",
),
],
),
)
data.update(ddb_data)
return data


def _do_wait(module, waiter_name, action_description, wait_timeout, table_name):
delay = min(wait_timeout, 5)
max_attempts = wait_timeout // delay

try:
waiter = DynamodbWaiterFactory(module).get_waiter(waiter_name)
waiter.wait(
WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
TableName=table_name,
)
except botocore.exceptions.WaiterError as e:
module.fail_json_aws(e, msg=f"Timeout while waiting for {action_description}")
except (
botocore.exceptions.ClientError,
botocore.exceptions.BotoCoreError,
) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg=f"Failed while waiting for {action_description}")


def wait_table_exists(module, wait_timeout, table_name):
_do_wait(module, "table_exists", "table creation", wait_timeout, table_name)


def wait_table_not_exists(module, wait_timeout, table_name):
_do_wait(module, "table_not_exists", "table deletion", wait_timeout, table_name)


def wait_indexes_active(module, wait_timeout, table_name):
_do_wait(module, "global_indexes_active", "secondary index updates", wait_timeout, table_name)
72 changes: 28 additions & 44 deletions plugins/modules/dynamodb_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,15 @@
wait_timeout:
description:
- How long (in seconds) to wait for creation / update / deletion to complete.
- AWS only allows secondary indexies to be updated one at a time, this module will automatically update them
in serial, and the timeout will be separately applied for each index.
aliases: ['wait_for_active_timeout']
default: 300
default: 900
type: int
wait:
description:
- When I(wait=True) the module will wait for up to I(wait_timeout) seconds
for table creation or deletion to complete before returning.
for index updates, table creation or deletion to complete before returning.
default: True
type: bool
extends_documentation_fragment:
Expand Down Expand Up @@ -256,9 +258,11 @@
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict
from ansible_collections.amazon.aws.plugins.module_utils.tagging import compare_aws_tags

from ansible_collections.community.aws.plugins.module_utils.dynamodb import wait_indexes_active
from ansible_collections.community.aws.plugins.module_utils.dynamodb import wait_table_exists
from ansible_collections.community.aws.plugins.module_utils.dynamodb import wait_table_not_exists
from ansible_collections.community.aws.plugins.module_utils.modules import AnsibleCommunityAWSModule as AnsibleAWSModule


DYNAMO_TYPE_DEFAULT = "STRING"
INDEX_REQUIRED_OPTIONS = ["name", "type", "hash_key_name"]
INDEX_OPTIONS = INDEX_REQUIRED_OPTIONS + [
Expand All @@ -283,7 +287,7 @@
retries=45,
delay=5,
max_delay=30,
catch_extra_error_codes=["LimitExceededException", "ResourceInUseException", "ResourceNotFoundException"],
catch_extra_error_codes=["ResourceInUseException", "ResourceNotFoundException"],
)
def _update_table_with_long_retry(**changes):
return client.update_table(TableName=module.params.get("name"), **changes)
Expand All @@ -296,47 +300,27 @@ def _describe_table(**params):


def wait_exists():
table_name = module.params.get("name")
wait_timeout = module.params.get("wait_timeout")

delay = min(wait_timeout, 5)
max_attempts = wait_timeout // delay

try:
waiter = client.get_waiter("table_exists")
waiter.wait(
WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
TableName=table_name,
)
except botocore.exceptions.WaiterError as e:
module.fail_json_aws(e, msg="Timeout while waiting on table creation")
except (
botocore.exceptions.ClientError,
botocore.exceptions.BotoCoreError,
) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Failed while waiting on table creation")
wait_table_exists(
module,
module.params.get("wait_timeout"),
module.params.get("name"),
)


def wait_not_exists():
table_name = module.params.get("name")
wait_timeout = module.params.get("wait_timeout")
wait_table_not_exists(
module,
module.params.get("wait_timeout"),
module.params.get("name"),
)

delay = min(wait_timeout, 5)
max_attempts = wait_timeout // delay

try:
waiter = client.get_waiter("table_not_exists")
waiter.wait(
WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
TableName=table_name,
)
except botocore.exceptions.WaiterError as e:
module.fail_json_aws(e, msg="Timeout while waiting on table deletion")
except (
botocore.exceptions.ClientError,
botocore.exceptions.BotoCoreError,
) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Failed while waiting on table deletion")
def wait_indexes():
wait_indexes_active(
module,
module.params.get("wait_timeout"),
module.params.get("name"),
)


def _short_type_to_long(short_key):
Expand Down Expand Up @@ -858,6 +842,7 @@ def _update_table(current_table):

if additional_global_index_changes:
for index in additional_global_index_changes:
wait_indexes()
try:
_update_table_with_long_retry(
GlobalSecondaryIndexUpdates=[index], AttributeDefinitions=changes["AttributeDefinitions"]
Expand All @@ -870,9 +855,6 @@ def _update_table(current_table):
additional_global_index_changes=additional_global_index_changes,
)

if module.params.get("wait"):
wait_exists()

return True


Expand Down Expand Up @@ -927,6 +909,7 @@ def update_table(current_table):

if module.params.get("wait"):
wait_exists()
wait_indexes()

return changed

Expand Down Expand Up @@ -983,6 +966,7 @@ def create_table():

if module.params.get("wait"):
wait_exists()
wait_indexes()

return True

Expand Down Expand Up @@ -1058,7 +1042,7 @@ def main():
tags=dict(type="dict", aliases=["resource_tags"]),
purge_tags=dict(type="bool", default=True),
wait=dict(type="bool", default=True),
wait_timeout=dict(default=300, type="int", aliases=["wait_for_active_timeout"]),
wait_timeout=dict(default=900, type="int", aliases=["wait_for_active_timeout"]),
)

module = AnsibleAWSModule(
Expand Down

0 comments on commit 6fdbd75

Please sign in to comment.