Skip to content

Commit 81d226e

Browse files
authored
DynamoDB: Export Table to S3 (#7839)
1 parent 8a40802 commit 81d226e

File tree

6 files changed

+555
-1
lines changed

6 files changed

+555
-1
lines changed

AUTHORS.md

+1
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@ Moto is written by Steve Pulec with contributions from:
6363
* [Roman Rader](https://github.com/rrader/)
6464
* [Bryan Chen](https://github.com/bchen1116)
6565
* [Jonas Bulik](https://github.com/MrGreenTea)
66+
* [Rita Cheung](https://github.com/rita-cheung)

moto/core/utils.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
import inspect
33
import re
4-
from gzip import decompress
4+
from gzip import compress, decompress
55
from typing import Any, Callable, Dict, List, Optional, Tuple
66
from urllib.parse import ParseResult, urlparse
77

@@ -403,6 +403,10 @@ def gzip_decompress(body: bytes) -> bytes:
403403
return decompress(body)
404404

405405

406+
def gzip_compress(body: bytes) -> bytes:
    """Return ``body`` gzip-compressed via :func:`gzip.compress` with its default settings."""
    compressed = compress(body)
    return compressed
408+
409+
406410
ISO_REGION_DOMAINS = {
407411
"iso": "c2s.ic.gov",
408412
"isob": "sc2s.sgov.gov",

moto/dynamodb/models/__init__.py

+38
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
RestoredTable,
3838
Table,
3939
)
40+
from moto.dynamodb.models.table_export import TableExport
4041
from moto.dynamodb.models.table_import import TableImport
4142
from moto.dynamodb.parsing import partiql
4243
from moto.dynamodb.parsing.executors import UpdateExpressionExecutor
@@ -53,6 +54,7 @@ def __init__(self, region_name: str, account_id: str):
5354
self.tables: Dict[str, Table] = OrderedDict()
5455
self.backups: Dict[str, Backup] = OrderedDict()
5556
self.table_imports: Dict[str, TableImport] = {}
57+
self.table_exports: Dict[str, TableExport] = {}
5658

5759
@staticmethod
5860
def default_vpc_endpoint_service(
@@ -999,5 +1001,41 @@ def import_table(
9991001
def describe_import(self, import_arn: str) -> TableImport:
    """Return the table import stored under ``import_arn``.

    Raises ``KeyError`` when no import is stored under that ARN.
    """
    table_import = self.table_imports[import_arn]
    return table_import
10011003

1004+
def export_table(
    self,
    s3_bucket: str,
    s3_prefix: str,
    table_arn: str,
    export_format: str,
    export_type: str,
    s3_bucket_owner: str,
) -> TableExport:
    """Only ExportFormat=DYNAMODB_JSON is supported so far.
    Only exports one file following DYNAMODB_JSON format to the s3 location. Other files aren't created.
    Incremental export is also not supported.

    The new export is stored in ``self.table_exports`` under its ARN, started,
    and then returned.
    """
    # Use the explicit bucket owner when given, otherwise this backend's account.
    # NOTE(review): TableExport.run uses this one account id to look up both the
    # DynamoDB backend and the S3 backend - confirm that is intended when the
    # bucket lives in a different account than the table.
    owner_account_id = s3_bucket_owner or self.account_id

    export = TableExport(
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        region_name=self.region_name,
        account_id=owner_account_id,
        table_arn=table_arn,
        export_format=export_format,
        export_type=export_type,
    )
    # Register before starting so the export can be described right away.
    self.table_exports[export.arn] = export
    export.start()
    return export
1029+
1030+
def describe_export(self, export_arn: str) -> TableExport:
    """Return the table export stored under ``export_arn``.

    Raises ``KeyError`` when no export is stored under that ARN.
    """
    table_export = self.table_exports[export_arn]
    return table_export
1032+
1033+
def list_exports(self, table_arn: str) -> List[TableExport]:
    """Return the stored exports whose ``table_arn`` equals ``table_arn``.

    Exports are returned in the order they appear in ``self.table_exports``;
    the list is empty when nothing matches.
    """
    return [
        table_export
        for table_export in self.table_exports.values()
        if table_export.table_arn == table_arn
    ]
1039+
10021040

10031041
# Module-level BackendDict of DynamoDBBackend objects for the "dynamodb" service;
# TableExport.run indexes it by account id and then by region name.
dynamodb_backends = BackendDict(DynamoDBBackend, "dynamodb")

moto/dynamodb/models/table_export.py

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import json
2+
from threading import Thread
3+
from typing import TYPE_CHECKING, Any, Dict, Optional
4+
from uuid import uuid4
5+
6+
from moto.core.utils import gzip_compress
7+
from moto.utilities.utils import get_partition
8+
9+
if TYPE_CHECKING:
10+
from moto.dynamodb.models import Table
11+
from moto.s3.models import S3Backend
12+
13+
14+
class TableExport(Thread):
    """Thread that writes all items of one DynamoDB table to a single S3 object.

    The export starts in status ``IN_PROGRESS``. When the thread is run it
    ends as ``COMPLETED``, or as ``FAILED`` with ``failure_code`` and
    ``failure_message`` set.
    """

    def __init__(
        self,
        s3_bucket: str,
        s3_prefix: str,
        region_name: str,
        account_id: str,
        table_arn: str,
        export_format: str,
        export_type: str,
    ):
        """Record the export request; no work is done until the thread runs.

        :param s3_bucket: name of the destination bucket
        :param s3_prefix: prefix prepended to the generated object key
        :param region_name: region used for the partition and the DynamoDB backend lookup
        :param account_id: account used for both the DynamoDB and the S3 backend lookup
        :param table_arn: ARN of the table to export
        :param export_format: reported back unchanged as ``ExportFormat``
        :param export_type: reported back unchanged as ``ExportType``
        """
        super().__init__()
        self.partition = get_partition(region_name)
        self.table_arn = table_arn
        # The resource segment must be "export"; "import" is what the table-import ARN uses.
        self.arn = f"arn:{self.partition}:dynamodb:{region_name}:{account_id}:table/{table_arn}/export/{uuid4().hex}"
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        self.status = "IN_PROGRESS"
        self.export_format = export_format
        self.export_type = export_type
        self.account_id = account_id
        self.region_name = region_name

        self.failure_code: Optional[str] = None
        self.failure_message: Optional[str] = None
        self.table_name: str = ""
        self.item_count = 0
        self.processed_bytes = 0
        self.error_count = 0

    def _mark_failed(self, code: str, message: str) -> None:
        """Set the export to FAILED with the given failure code and message."""
        self.status = "FAILED"
        self.failure_code = code
        self.failure_message = message

    def run(self) -> None:
        """Validate the bucket and table, then write the export to S3.

        Fails the export (instead of raising) when the bucket does not exist,
        no table has the requested ARN, point-in-time recovery is not enabled
        on the table, or writing the backup raises.
        """
        # Imported here rather than at module level, as in the original code.
        from moto.dynamodb.models import dynamodb_backends

        dynamodb_backend = dynamodb_backends[self.account_id][self.region_name]
        try:
            from moto.s3.models import s3_backends

            s3_backend = s3_backends[self.account_id][self.partition]
            # Subscript only to trigger KeyError for a missing bucket.
            s3_backend.buckets[self.s3_bucket]
        except KeyError:
            self._mark_failed("S3NoSuchBucket", "The specified bucket does not exist")
            return

        # Stop at the first table whose ARN matches; "" means no table matched.
        self.table_name = next(
            (
                name
                for name, candidate in dynamodb_backend.tables.items()
                if candidate.table_arn == self.table_arn
            ),
            "",
        )
        if not self.table_name:
            self._mark_failed(
                "DynamoDBTableNotFound", "The specified table does not exist"
            )
            return

        table = dynamodb_backend.tables[self.table_name]
        pitr_status = table.continuous_backups["PointInTimeRecoveryDescription"][
            "PointInTimeRecoveryStatus"
        ]
        if pitr_status != "ENABLED":
            self._mark_failed(
                "PointInTimeRecoveryUnavailable",
                "Point in time recovery not enabled for table",
            )
            return

        try:
            self._backup_to_s3_file(s3_backend, table)
        except Exception as e:
            # Boundary of the worker thread: record the error on the export
            # so it is visible through response() instead of being lost.
            self._mark_failed("UNKNOWN", str(e))

    def _backup_to_s3_file(self, s3_backend: "S3Backend", table: "Table") -> None:
        """Serialise every item of ``table`` and store it as one gzipped S3 object."""
        backup = []
        for item in table.all_items():
            json_item = item.to_json()
            backup.append(json_item)
            # NOTE(review): len() of whatever to_json() returns; if that is a
            # dict this counts keys rather than bytes - confirm.
            self.processed_bytes += len(json_item)
        self.item_count = len(backup)
        content = gzip_compress(json.dumps(backup).encode("utf-8"))
        s3_backend.put_object(
            bucket_name=self.s3_bucket,
            key_name=f"{self.s3_prefix}/AWSDynamoDB/{uuid4()}/data/{uuid4()}.gz",
            value=content,
        )

        self.status = "COMPLETED" if self.error_count == 0 else "FAILED"

    def response(self) -> Dict[str, Any]:
        """Return the export's description as a dict.

        Includes the source table ARN and the S3 destination so a caller can
        tell where the export was written.
        """
        return {
            "ExportArn": self.arn,
            "ExportStatus": self.status,
            "FailureCode": self.failure_code,
            "FailureMessage": self.failure_message,
            "ExportFormat": self.export_format,
            "ExportType": self.export_type,
            "ItemCount": self.item_count,
            "BilledSizeBytes": self.processed_bytes,
            "TableArn": self.table_arn,
            "S3Bucket": self.s3_bucket,
            "S3Prefix": self.s3_prefix,
        }

moto/dynamodb/responses.py

+38
Original file line numberDiff line numberDiff line change
@@ -1227,3 +1227,41 @@ def describe_import(self) -> str:
12271227
import_arn = self.body["ImportArn"]
12281228
import_table = self.dynamodb_backend.describe_import(import_arn)
12291229
return json.dumps({"ImportTableDescription": import_table.response()})
1230+
1231+
def export_table_to_point_in_time(self) -> str:
    """Handle an ExportTableToPointInTime request.

    Reads ``TableArn``, ``S3Bucket`` and ``S3Prefix`` from the request body
    (a missing key raises ``KeyError``), applies defaults for ``ExportType``
    and ``ExportFormat``, starts the export on the backend and returns the
    serialised ``ExportDescription``.
    """
    table_arn = self.body["TableArn"]
    s3_bucket = self.body["S3Bucket"]
    s3_prefix = self.body["S3Prefix"]
    export_type = self.body.get("ExportType", "FULL_EXPORT")
    # The format is spelled DYNAMODB_JSON (as in the backend's export_table
    # docstring); "DYNAMO_JSON" is not a valid ExportFormat value.
    export_format = self.body.get("ExportFormat", "DYNAMODB_JSON")
    s3_bucket_owner = self.body.get("S3BucketOwner")

    export_table = self.dynamodb_backend.export_table(
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        table_arn=table_arn,
        export_type=export_type,
        export_format=export_format,
        s3_bucket_owner=s3_bucket_owner,
    )

    return dynamo_json_dump({"ExportDescription": export_table.response()})
1249+
1250+
def describe_export(self) -> str:
    """Handle a DescribeExport request.

    Looks up the export named by ``ExportArn`` in the request body and returns
    its description as JSON under the ``ExportDescription`` key.
    """
    arn = self.body["ExportArn"]
    description = self.dynamodb_backend.describe_export(arn).response()
    return json.dumps({"ExportDescription": description})
1254+
1255+
def list_exports(self) -> str:
    """Handle a ListExports request.

    ``TableArn`` is optional: when present only exports of that table are
    listed, otherwise every export stored on the backend is listed. Returns
    JSON with one ``ExportArn``/``ExportStatus``/``ExportType`` summary per
    export under the ``ExportSummaries`` key.
    """
    table_arn = self.body.get("TableArn")
    if table_arn is None:
        # No filter supplied: list everything instead of failing with KeyError.
        exports = list(self.dynamodb_backend.table_exports.values())
    else:
        exports = self.dynamodb_backend.list_exports(table_arn)

    summaries = [
        {
            "ExportArn": export_table.arn,
            "ExportStatus": export_table.status,
            "ExportType": export_table.export_type,
        }
        for export_table in exports
    ]
    return json.dumps({"ExportSummaries": summaries})

0 commit comments

Comments
 (0)