-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Add incremental export and cross account export functionality in DynamoDBToS3Operator
#41304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 31 commits
6645c6a
9d3f188
3ff5ba5
dc9b48a
9fa988a
56b5c46
35d40a9
078b7d8
b05690c
493f507
76b5522
27436c4
7dc0b6a
6b7ce52
cd8fca0
1082149
33c203a
9d74345
0cea15d
aa8326b
747be27
d04d1b8
e269e65
f7b5e0f
0a195aa
197fb43
f3a0083
44cdeac
38a3b00
c4085ba
a111edc
0ed60a5
d950db2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook | ||
| from airflow.providers.amazon.aws.hooks.s3 import S3Hook | ||
| from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator | ||
| from airflow.utils.helpers import prune_dict | ||
|
|
||
| if TYPE_CHECKING: | ||
| from airflow.utils.context import Context | ||
|
|
@@ -89,10 +90,13 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): | |
| <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> | ||
| :param s3_key_prefix: Prefix of s3 object key | ||
| :param process_func: How we transform a dynamodb item to bytes. By default, we dump the json | ||
| :param point_in_time_export: Boolean value indicating the operator to use 'scan' or 'point in time export' | ||
| :param export_time: Time in the past from which to export table data, counted in seconds from the start of | ||
| the Unix epoch. The table export will be a snapshot of the table's state at this point in time. | ||
| :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON | ||
| or ION. | ||
| :param export_table_to_point_in_time_kwargs: extra parameters for the boto3 | ||
| `export_table_to_point_in_time` function all. e.g. `ExportType`, `IncrementalExportSpecification` | ||
| :param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is | ||
| provided. | ||
| :param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided. | ||
|
|
@@ -107,25 +111,29 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): | |
| "s3_key_prefix", | ||
| "export_time", | ||
| "export_format", | ||
| "export_table_to_point_in_time_kwargs", | ||
| "check_interval", | ||
| "max_attempts", | ||
| ) | ||
|
|
||
| template_fields_renderers = { | ||
| "dynamodb_scan_kwargs": "json", | ||
| "export_table_to_point_in_time_kwargs": "json", | ||
| } | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| dynamodb_table_name: str, | ||
| s3_bucket_name: str, | ||
| file_size: int, | ||
| file_size: int = 1000, | ||
| dynamodb_scan_kwargs: dict[str, Any] | None = None, | ||
| s3_key_prefix: str = "", | ||
| process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes, | ||
| point_in_time_export: bool = False, | ||
| export_time: datetime | None = None, | ||
| export_format: str = "DYNAMODB_JSON", | ||
| export_table_to_point_in_time_kwargs: dict | None = None, | ||
| check_interval: int = 30, | ||
| max_attempts: int = 60, | ||
| **kwargs, | ||
|
|
@@ -137,8 +145,10 @@ def __init__( | |
| self.dynamodb_scan_kwargs = dynamodb_scan_kwargs | ||
| self.s3_bucket_name = s3_bucket_name | ||
| self.s3_key_prefix = s3_key_prefix | ||
| self.point_in_time_export = point_in_time_export | ||
| self.export_time = export_time | ||
| self.export_format = export_format | ||
| self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs to be otherwise if None is passed in, Line 199 fails because None can't be unpacked.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in #41517 |
||
| self.check_interval = check_interval | ||
| self.max_attempts = max_attempts | ||
|
|
||
|
|
@@ -148,29 +158,50 @@ def hook(self): | |
| return DynamoDBHook(aws_conn_id=self.source_aws_conn_id) | ||
|
|
||
| def execute(self, context: Context) -> None: | ||
| if self.export_time: | ||
| # There are 2 separate export to point in time configuration: | ||
| # 1. Full export, which takes the export_time arg. | ||
| # 2. Incremental export, which takes the incremental_export_... args | ||
| # Hence export time could not be used as the proper indicator for the `_export_table_to_point_in_time` | ||
| # function. This change introduces a new boolean, as the indicator for whether the operator scans | ||
| # and export entire data or using the point in time functionality. | ||
| if self.point_in_time_export or self.export_time: | ||
| self._export_table_to_point_in_time() | ||
| else: | ||
| self._export_entire_data() | ||
|
|
||
| def _export_table_to_point_in_time(self): | ||
| """ | ||
| Export data from start of epoc till `export_time`. | ||
| Export data to point in time. | ||
|
|
||
| Full export exports data from start of epoc till `export_time`. | ||
| Table export will be a snapshot of the table's state at this point in time. | ||
|
|
||
| Incremental export exports the data from a specific datetime to a specific datetime | ||
|
|
||
|
|
||
| Note: S3BucketOwner is a required parameter when exporting to a S3 bucket in another account. | ||
| """ | ||
| if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo): | ||
| raise ValueError("The export_time parameter cannot be a future time.") | ||
|
|
||
| client = self.hook.conn.meta.client | ||
| table_description = client.describe_table(TableName=self.dynamodb_table_name) | ||
| response = client.export_table_to_point_in_time( | ||
| TableArn=table_description.get("Table", {}).get("TableArn"), | ||
| ExportTime=self.export_time, | ||
| S3Bucket=self.s3_bucket_name, | ||
| S3Prefix=self.s3_key_prefix, | ||
| ExportFormat=self.export_format, | ||
| ) | ||
|
|
||
| export_table_to_point_in_time_base_args = { | ||
| "TableArn": table_description.get("Table", {}).get("TableArn"), | ||
| "ExportTime": self.export_time, | ||
| "S3Bucket": self.s3_bucket_name, | ||
| "S3Prefix": self.s3_key_prefix, | ||
| "ExportFormat": self.export_format, | ||
| } | ||
| export_table_to_point_in_time_args = { | ||
| **export_table_to_point_in_time_base_args, | ||
| **self.export_table_to_point_in_time_kwargs, | ||
| } | ||
|
|
||
| args_filtered = prune_dict(export_table_to_point_in_time_args) | ||
|
|
||
| response = client.export_table_to_point_in_time(**args_filtered) | ||
| waiter = self.hook.get_waiter("export_table") | ||
| export_arn = response.get("ExportDescription", {}).get("ExportArn") | ||
| waiter.wait( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,6 +92,15 @@ def get_export_time(table_name: str): | |
| return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"] | ||
|
|
||
|
|
||
| @task | ||
| def get_latest_export_time(table_name: str): | ||
| r = boto3.client("dynamodb").describe_continuous_backups( | ||
| TableName=table_name, | ||
| ) | ||
|
|
||
| return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["LatestRestorableDateTime"] | ||
|
|
||
|
|
||
| @task | ||
| def wait_for_bucket(s3_bucket_name): | ||
| waiter = boto3.client("s3").get_waiter("bucket_exists") | ||
|
|
@@ -162,18 +171,39 @@ def delete_dynamodb_table(table_name: str): | |
| # [END howto_transfer_dynamodb_to_s3_segmented] | ||
|
|
||
| export_time = get_export_time(table_name) | ||
| # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time] | ||
| backup_db_to_point_in_time = DynamoDBToS3Operator( | ||
| task_id="backup_db_to_point_in_time", | ||
| latest_export_time = get_latest_export_time(table_name) | ||
| # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export] | ||
| backup_db_to_point_in_time_full_export = DynamoDBToS3Operator( | ||
| task_id="backup_db_to_point_in_time_full_export", | ||
| dynamodb_table_name=table_name, | ||
| file_size=1000, | ||
| s3_bucket_name=bucket_name, | ||
| point_in_time_export=True, | ||
| export_time=export_time, | ||
| s3_key_prefix=f"{S3_KEY_PREFIX}-3-", | ||
| ) | ||
| # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time] | ||
| # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export] | ||
|
|
||
| # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export] | ||
| backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator( | ||
| task_id="backup_db_to_point_in_time_incremental_export", | ||
| dynamodb_table_name=table_name, | ||
| s3_bucket_name=bucket_name, | ||
| point_in_time_export=True, | ||
| s3_key_prefix=f"{S3_KEY_PREFIX}-4-", | ||
| export_table_to_point_in_time_kwargs={ | ||
| "ExportType": "INCREMENTAL_EXPORT", | ||
| "IncrementalExportSpecification": { | ||
| "ExportFromTime": export_time, | ||
| "ExportToTime": latest_export_time, | ||
|
Comment on lines
+196
to
+197
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an issue I have run out of time for today. with the other two fixes applied locally, the test keeps failing and I've tried a number of solutions. Here's what I've found so far: error: "from" time can't be equal to "to" time We will either need to figure out a way to make this happy or set that task to not run using an unreachable trigger_rule or a branch operator to skip it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "error: Difference between "from" time and "to" is less than 15 minutes" is the most scary. That means we need to wait for 15 minutes so that we can create the incremental export. In that case we dont want to run it as part of the system test, so either we delete it from the system test, or we, somehow, skip it
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. I just put up #41546 which skips that task. That way we can still have the snippet in the docs. |
||
| "ExportViewType": "NEW_AND_OLD_IMAGES", | ||
| }, | ||
| }, | ||
| ) | ||
| # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export] | ||
|
|
||
| # This operation can take a long time to complete | ||
| backup_db_to_point_in_time.max_attempts = 90 | ||
| backup_db_to_point_in_time_full_export.max_attempts = 90 | ||
| backup_db_to_point_in_time_incremental_export.max_attempts = 90 | ||
|
|
||
| delete_table = delete_dynamodb_table(table_name=table_name) | ||
|
|
||
|
|
@@ -195,7 +225,8 @@ def delete_dynamodb_table(table_name: str): | |
| backup_db_segment_1, | ||
| backup_db_segment_2, | ||
| export_time, | ||
| backup_db_to_point_in_time, | ||
| backup_db_to_point_in_time_full_export, | ||
|
Comment on lines
227
to
+228
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Insert Without it listed in the chain(), it fires off at the very beginning and causes the test to fail because env_id isn't populated yet.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in #41517 |
||
| backup_db_to_point_in_time_incremental_export, | ||
| # TEST TEARDOWN | ||
| delete_table, | ||
| delete_bucket, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.