Merged
Changes from 8 commits
Commits
33 commits
6645c6a
Add incremental export and cross account export functionality
Aug 6, 2024
9d3f188
replacements of right single quotation with straight one
Aug 6, 2024
3ff5ba5
convert str.format into f-strings
Aug 7, 2024
dc9b48a
Add default value to s3_bucket_owner and update comments
Aug 7, 2024
9fa988a
update tests
Aug 7, 2024
56b5c46
update readme and documentation
Aug 7, 2024
35d40a9
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 7, 2024
078b7d8
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 9, 2024
b05690c
revert f-string changes to the original state
Aug 11, 2024
493f507
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 11, 2024
76b5522
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 11, 2024
27436c4
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 11, 2024
7dc0b6a
fix doc build error for DynamoDBToS3 Operator point-in-time export
Aug 11, 2024
6b7ce52
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 12, 2024
cd8fca0
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 12, 2024
1082149
fix task id to match the operator logic
Aug 12, 2024
33c203a
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 12, 2024
9d74345
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 12, 2024
0cea15d
Add backwards compatbility to old DynamoDBToS3 Operator using only ex…
Aug 12, 2024
aa8326b
add export_table_to_point_in_time_kwargs to DynamoDBToS3 Operator
Aug 12, 2024
747be27
update ruff format changes
Aug 13, 2024
d04d1b8
remove `s3_bucket_owner` as it can be provided as part of `export_tab…
Aug 13, 2024
e269e65
using airflow helper function to clean up None values from `export_ta…
Aug 13, 2024
f7b5e0f
update `file_size` with a default value of 1000
Aug 13, 2024
0a195aa
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 13, 2024
197fb43
ruff linting: move import to appropriate posistion
Aug 13, 2024
f3a0083
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 13, 2024
44cdeac
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 13, 2024
38a3b00
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 14, 2024
c4085ba
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 14, 2024
a111edc
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 14, 2024
0ed60a5
Update airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
vincbeck Aug 14, 2024
d950db2
Merge branch 'main' into feature/add-point-in-time-export-functionali…
Aug 15, 2024
73 changes: 64 additions & 9 deletions airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -84,15 +84,29 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):

:param dynamodb_table_name: Dynamodb table to replicate data from
:param s3_bucket_name: S3 bucket to replicate data to
:param s3_bucket_owner: The ID of the Amazon Web Services account that owns the bucket the export will be stored in.
(NOTE: S3BucketOwner is a required parameter when exporting to an S3 bucket in another account.)
:param file_size: Flush file to s3 if file size >= file_size
:param dynamodb_scan_kwargs: kwargs pass to
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transform a dynamodb item to bytes. By default, we dump the json
:param point_in_time_export: Boolean value indicating whether the operator uses 'scan' (False, the default) or 'point in time export' (True)
:param export_time: Time in the past from which to export table data, counted in seconds from the start of
the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
:param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
or ION.
:param export_type: Choice of whether to execute as a full export or incremental export. Valid values are FULL_EXPORT
or INCREMENTAL_EXPORT. The default value is FULL_EXPORT. If INCREMENTAL_EXPORT is provided, the IncrementalExportSpecification
must also be used (incremental_export_from_time, incremental_export_to_time, incremental_export_view_type).
:param incremental_export_from_time: Time in the past which provides the inclusive start range for the export table's data,
counted in seconds from the start of the Unix epoch. The incremental export will reflect the table's state including and after
this point in time.
:param incremental_export_to_time: Time in the past which provides the exclusive end range for the export table's data,
counted in seconds from the start of the Unix epoch. The incremental export will reflect the table's state just prior to
this point in time. If this is not provided, the latest time with data available will be used.
:param incremental_export_view_type: The view type that was chosen for the export. Valid values are NEW_AND_OLD_IMAGES
and NEW_IMAGES. The default value is NEW_AND_OLD_IMAGES.
:param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is
provided.
:param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided.
@@ -120,12 +134,18 @@ def __init__(
*,
dynamodb_table_name: str,
s3_bucket_name: str,
s3_bucket_owner: str | None = None,
file_size: int,
dynamodb_scan_kwargs: dict[str, Any] | None = None,
s3_key_prefix: str = "",
process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
point_in_time_export: bool = False,
export_time: datetime | None = None,
export_format: str = "DYNAMODB_JSON",
export_type: str = "FULL_EXPORT",
incremental_export_from_time: datetime | None = None,
incremental_export_to_time: datetime | None = None,
incremental_export_view_type: str = "NEW_AND_OLD_IMAGES",
check_interval: int = 30,
max_attempts: int = 60,
**kwargs,
@@ -137,8 +157,14 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
self.s3_bucket_owner = s3_bucket_owner
self.point_in_time_export = point_in_time_export
self.export_time = export_time
self.export_format = export_format
self.export_type = export_type
self.incremental_export_from_time = incremental_export_from_time
self.incremental_export_to_time = incremental_export_to_time
self.incremental_export_view_type = incremental_export_view_type
self.check_interval = check_interval
self.max_attempts = max_attempts

@@ -148,29 +174,58 @@ def hook(self):
return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)

def execute(self, context: Context) -> None:
if self.export_time:
# There are 2 separate export-to-point-in-time configurations:
# 1. Full export, which takes the export_time arg.
# 2. Incremental export, which takes the incremental_export_... args.
# Hence export_time cannot be used as the indicator for calling the `_export_table_to_point_in_time`
# function. This change introduces a new boolean as the indicator for whether the operator scans
# and exports the entire data or uses the point in time functionality.
if self.point_in_time_export:
self._export_table_to_point_in_time()
else:
self._export_entire_data()

def _export_table_to_point_in_time(self):
"""
Export data from start of epoc till `export_time`.
Export data to point in time.

Full export exports data from the start of the epoch until `export_time`.
Table export will be a snapshot of the table's state at this point in time.

Incremental export exports the data from a specific datetime to a specific datetime.

Note: S3BucketOwner is a required parameter when exporting to an S3 bucket in another account.
"""
if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
raise ValueError("The export_time parameter cannot be a future time.")

client = self.hook.conn.meta.client
table_description = client.describe_table(TableName=self.dynamodb_table_name)
response = client.export_table_to_point_in_time(
TableArn=table_description.get("Table", {}).get("TableArn"),
ExportTime=self.export_time,
S3Bucket=self.s3_bucket_name,
S3Prefix=self.s3_key_prefix,
ExportFormat=self.export_format,
)

export_table_to_point_in_time_args = {
"TableArn": table_description.get("Table", {}).get("TableArn"),
"ExportTime": self.export_time,
"S3Bucket": self.s3_bucket_name,
"S3Prefix": self.s3_key_prefix,
"S3BucketOwner": self.s3_bucket_owner,
"ExportFormat": self.export_format,
"ExportType": self.export_type,
"IncrementalExportSpecification": {
"ExportFromTime": self.incremental_export_from_time,
"ExportToTime": self.incremental_export_to_time,
"ExportViewType": self.incremental_export_view_type,
},
}
args_filtered = {
key: value for key, value in export_table_to_point_in_time_args.items() if value is not None
}

if self.export_type == "FULL_EXPORT":
# If it is a full export, no need to pass any incremental export specification.
del args_filtered["IncrementalExportSpecification"]

response = client.export_table_to_point_in_time(**args_filtered)
waiter = self.hook.get_waiter("export_table")
export_arn = response.get("ExportDescription", {}).get("ExportArn")
waiter.wait(
Expand Down
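The argument handling in `_export_table_to_point_in_time` can be sketched as a standalone function. This is a minimal illustration, not the provider's implementation: `build_export_args` and `prune_none` are hypothetical names, the ARN and bucket values are placeholders, and the recursive pruning (which also drops `None` values nested inside `IncrementalExportSpecification`) is an assumption about the desired behavior rather than what the diff does.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def prune_none(value: Any) -> Any:
    """Recursively drop dict keys whose value is None (hypothetical helper)."""
    if isinstance(value, dict):
        return {k: prune_none(v) for k, v in value.items() if v is not None}
    return value


def build_export_args(
    *,
    table_arn: str,
    s3_bucket: str,
    s3_prefix: str = "",
    s3_bucket_owner: Optional[str] = None,
    export_time: Optional[datetime] = None,
    export_format: str = "DYNAMODB_JSON",
    export_type: str = "FULL_EXPORT",
    incremental_from: Optional[datetime] = None,
    incremental_to: Optional[datetime] = None,
    incremental_view_type: str = "NEW_AND_OLD_IMAGES",
) -> Dict[str, Any]:
    """Assemble keyword arguments for an export request (illustrative only)."""
    args: Dict[str, Any] = {
        "TableArn": table_arn,
        "ExportTime": export_time,
        "S3Bucket": s3_bucket,
        "S3Prefix": s3_prefix,
        "S3BucketOwner": s3_bucket_owner,
        "ExportFormat": export_format,
        "ExportType": export_type,
    }
    # Only incremental exports carry an incremental export specification.
    if export_type == "INCREMENTAL_EXPORT":
        args["IncrementalExportSpecification"] = {
            "ExportFromTime": incremental_from,
            "ExportToTime": incremental_to,
            "ExportViewType": incremental_view_type,
        }
    return prune_none(args)


# Placeholder ARN and bucket name, for illustration only.
ARN = "arn:aws:dynamodb:region:account:table/example"
start = datetime(2024, 8, 1, tzinfo=timezone.utc)

full = build_export_args(table_arn=ARN, s3_bucket="example-bucket", export_time=start)
incremental = build_export_args(
    table_arn=ARN,
    s3_bucket="example-bucket",
    export_type="INCREMENTAL_EXPORT",
    incremental_from=start,
)
```

Adding the specification only for incremental exports avoids the build-then-delete step, and pruning nested values keeps an unset `ExportToTime` out of the request.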
@@ -212,10 +212,10 @@ def users_import(args):

users_created, users_updated = _import_users(users_list)
if users_created:
print("Created the following users:\n\t{}".format("\n\t".join(users_created)))
users_created_str = "\n\t".join(users_created)
print(f"Created the following users:\n\t{users_created_str}")

if users_updated:
print("Updated the following users:\n\t{}".format("\n\t".join(users_updated)))
users_updated_str = "\n\t".join(users_updated)
print(f"Updated the following users:\n\t{users_updated_str}")


def _import_users(users_list: list[dict[str, Any]]):
@@ -231,9 +231,7 @@ def _import_users(users_list: list[dict[str, Any]]):
msg.append(f"[Item {row_num}]")
for key, value in failure.items():
msg.append(f"\t{key}: {value}")
raise SystemExit(
"Error: Input file didn't pass validation. See below:\n{}".format("\n".join(msg))
)
msg_str = "\n".join(msg)
raise SystemExit(f"Error: Input file didn't pass validation. See below:\n{msg_str}")

for user in users_list:
roles = []
Expand Down
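The hunks above touch messages that were originally built with `str.format` and `"\n\t".join(...)`. When such a call is converted to an f-string, the join has to be kept, because interpolating the list itself renders its repr. A minimal sketch with hypothetical user names:

```python
users_created = ["alice", "bob"]  # hypothetical user names

# Original behaviour: one user per line, each indented with a tab.
expected = "Created the following users:\n\t{}".format("\n\t".join(users_created))

# Interpolating the list directly embeds its repr instead of joining it.
naive = f"Created the following users:\n\t\n\t{users_created}"

# Joining first keeps the original output. The join is done outside the
# f-string because Python versions before 3.12 reject backslashes inside
# a replacement field.
joined = "\n\t".join(users_created)
equivalent = f"Created the following users:\n\t{joined}"
```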
16 changes: 13 additions & 3 deletions docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,13 +63,23 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
:start-after: [START howto_transfer_dynamodb_to_s3_segmented]
:end-before: [END howto_transfer_dynamodb_to_s3_segmented]

Users can also pass in ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
Users can also pass the boolean ``point_in_time_export`` param to ``DynamoDBToS3Operator`` to export data from a point in time.

Full export example usage:

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
:end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]

Incremental export example usage:

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
:end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
:start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
:end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]

Reference
---------
Expand Down
2 changes: 2 additions & 0 deletions tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -348,6 +348,7 @@ def test_dynamodb_execute_calling_export_table_to_point_in_time(self, _export_ta
dynamodb_table_name="airflow_rocks",
s3_bucket_name="airflow-bucket",
file_size=4000,
point_in_time_export=True,
export_time=datetime(year=1983, month=1, day=1),
)
dynamodb_to_s3_operator.execute(context={})
@@ -362,5 +363,6 @@ def test_dynamodb_with_future_date(self):
dynamodb_table_name="airflow_rocks",
s3_bucket_name="airflow-bucket",
file_size=4000,
point_in_time_export=True,
export_time=datetime(year=3000, month=1, day=1),
).execute(context={})
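The future-date test above exercises the guard in `_export_table_to_point_in_time`. A minimal standalone sketch of that check follows; `validate_export_time` is a hypothetical name used only for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def validate_export_time(export_time: Optional[datetime]) -> None:
    """Raise ValueError when export_time lies in the future."""
    # Passing the value's own tzinfo to datetime.now() keeps the comparison
    # valid for both naive and timezone-aware datetimes; comparing a naive
    # value with an aware one would raise TypeError.
    if export_time and export_time > datetime.now(export_time.tzinfo):
        raise ValueError("The export_time parameter cannot be a future time.")


# Past values (naive or aware) and None pass silently.
validate_export_time(None)
validate_export_time(datetime(1983, 1, 1))
validate_export_time(datetime(1983, 1, 1, tzinfo=timezone.utc))
```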
40 changes: 34 additions & 6 deletions tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -92,6 +92,15 @@ def get_export_time(table_name: str):
return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]


@task
def get_latest_export_time(table_name: str):
r = boto3.client("dynamodb").describe_continuous_backups(
TableName=table_name,
)

return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"]


@task
def wait_for_bucket(s3_bucket_name):
waiter = boto3.client("s3").get_waiter("bucket_exists")
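The `get_export_time` and `get_latest_export_time` tasks read two fields from the same `describe_continuous_backups` response. The sketch below shows that lookup against a hand-written response dict, so it runs without AWS access; the timestamps are made up, and `restorable_window` is a hypothetical helper, not part of the system test.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

# Hypothetical response, trimmed to the fields the example tasks read.
response: Dict[str, Any] = {
    "ContinuousBackupsDescription": {
        "PointInTimeRecoveryDescription": {
            "PointInTimeRecoveryStatus": "ENABLED",
            "EarliestRestorableDateTime": datetime(2024, 8, 1, 12, 0, tzinfo=timezone.utc),
            "LatestRestorableDateTime": datetime(2024, 8, 1, 12, 30, tzinfo=timezone.utc),
        }
    }
}


def restorable_window(r: Dict[str, Any]) -> Tuple[datetime, datetime]:
    """Return (earliest, latest) restorable times from the response."""
    pitr = r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
    earliest = pitr["EarliestRestorableDateTime"]
    latest = pitr["LatestRestorableDateTime"]
    if earliest >= latest:
        raise ValueError("Restorable window is empty.")
    return earliest, latest


# These two values play the roles of incremental_export_from_time and
# incremental_export_to_time in the incremental export example.
export_from, export_to = restorable_window(response)
```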
@@ -162,18 +171,36 @@ def delete_dynamodb_table(table_name: str):
# [END howto_transfer_dynamodb_to_s3_segmented]

export_time = get_export_time(table_name)
# [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
backup_db_to_point_in_time = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time",
latest_export_time = get_latest_export_time(table_name)
# [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_full_export",
dynamodb_table_name=table_name,
file_size=1000,
s3_bucket_name=bucket_name,
point_in_time_export=True,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
# [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
# [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]

# [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_incremental_export",
dynamodb_table_name=table_name,
file_size=1000,
s3_bucket_name=bucket_name,
point_in_time_export=True,
s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
export_type="INCREMENTAL_EXPORT",
incremental_export_from_time=export_time,
incremental_export_to_time=latest_export_time,
incremental_export_view_type="NEW_AND_OLD_IMAGES",
)
# [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
# This operation can take a long time to complete
backup_db_to_point_in_time.max_attempts = 90
backup_db_to_point_in_time_full_export.max_attempts = 90
backup_db_to_point_in_time_incremental_export.max_attempts = 90

delete_table = delete_dynamodb_table(table_name=table_name)

@@ -195,7 +222,8 @@ def delete_dynamodb_table(table_name: str):
backup_db_segment_1,
backup_db_segment_2,
export_time,
backup_db_to_point_in_time,
backup_db_to_point_in_time_full_export,
Comment on lines 227 to +228

Contributor


Insert latest_export_time between these two lines

       export_time,
       latest_export_time,
       backup_db_to_point_in_time_full_export,

Without it listed in the chain(), it fires off at the very beginning and causes the test to fail because env_id isn't populated yet.

Contributor


fixed in #41517

backup_db_to_point_in_time_incremental_export,
# TEST TEARDOWN
delete_table,
delete_bucket,
Expand Down