Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDS: add OriginalSnapshotCreateTime to DB snapshots #8540

Merged
merged 6 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion moto/rds/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ def __init__(
snapshot_id: str,
snapshot_type: str,
tags: List[Dict[str, str]],
original_created_at: Optional[str] = None,
):
super().__init__(backend)
self.database = database
Expand All @@ -950,6 +951,7 @@ def __init__(
self.tags = tags
self.status = "available"
self.created_at = iso_8601_datetime_with_milliseconds()
self.original_created_at = original_created_at or self.created_at
self.attributes: List[Dict[str, Any]] = []

@property
Expand All @@ -960,6 +962,14 @@ def name(self) -> str:
def snapshot_arn(self) -> str:
return self.arn

@property
def snapshot_create_time(self) -> str:
return self.created_at

@property
def original_snapshot_create_time(self) -> str:
return self.original_created_at


class ExportTask(BaseModel):
def __init__(
Expand Down Expand Up @@ -1314,6 +1324,7 @@ def create_db_snapshot(
db_snapshot_identifier: str,
snapshot_type: str = "manual",
tags: Optional[List[Dict[str, str]]] = None,
original_created_at: Optional[str] = None,
) -> DBSnapshot:
if isinstance(db_instance, str):
database = self.databases.get(db_instance)
Expand All @@ -1333,7 +1344,12 @@ def create_db_snapshot(
if database.copy_tags_to_snapshot and not tags:
tags = database.get_tags()
snapshot = DBSnapshot(
self, database, db_snapshot_identifier, snapshot_type, tags
self,
database,
db_snapshot_identifier,
snapshot_type,
tags,
original_created_at,
)
self.database_snapshots[db_snapshot_identifier] = snapshot
return snapshot
Expand Down Expand Up @@ -1364,6 +1380,7 @@ def copy_db_snapshot(
db_instance=source_snapshot.database,
db_snapshot_identifier=target_snapshot_identifier,
tags=tags,
original_created_at=source_snapshot.original_created_at,
bpandola marked this conversation as resolved.
Show resolved Hide resolved
)

def delete_db_snapshot(self, db_snapshot_identifier: str) -> DBSnapshot:
Expand Down
38 changes: 27 additions & 11 deletions tests/test_rds/test_rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import boto3
import pytest
from botocore.exceptions import ClientError
from dateutil.tz import tzutc
from freezegun import freeze_time
bpandola marked this conversation as resolved.
Show resolved Hide resolved

from moto import mock_aws
from moto import mock_aws, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.rds.exceptions import InvalidDBInstanceIdentifier, InvalidDBSnapshotIdentifier
from moto.rds.models import RDSBackend
Expand Down Expand Up @@ -757,13 +759,19 @@ def test_create_db_snapshots(client):

create_db_instance(DBInstanceIdentifier="db-primary-1")

snapshot = client.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1"
)["DBSnapshot"]
snapshot_create_time = datetime.datetime(2025, 1, 2, tzinfo=tzutc())
with freeze_time(snapshot_create_time):
snapshot = client.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1"
)["DBSnapshot"]

assert snapshot["Engine"] == "postgres"
assert snapshot["DBInstanceIdentifier"] == "db-primary-1"
assert snapshot["DBSnapshotIdentifier"] == "g-1"
if not settings.TEST_SERVER_MODE:
assert snapshot["SnapshotCreateTime"] == snapshot_create_time
assert snapshot["SnapshotCreateTime"] == snapshot["OriginalSnapshotCreateTime"]

result = client.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"])
assert result["TagList"] == []

Expand Down Expand Up @@ -827,22 +835,30 @@ def test_copy_db_snapshots(
):
create_db_instance(DBInstanceIdentifier="db-primary-1")

client.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
)
snapshot_create_time = datetime.datetime(2025, 1, 2, tzinfo=tzutc())
with freeze_time(snapshot_create_time):
client.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
)

if delete_db_instance:
# Delete the original instance, but the copy snapshot operation should still succeed.
client.delete_db_instance(DBInstanceIdentifier="db-primary-1")

target_snapshot = client.copy_db_snapshot(
SourceDBSnapshotIdentifier=db_snapshot_identifier,
TargetDBSnapshotIdentifier="snapshot-2",
)["DBSnapshot"]
target_snapshot_create_time = snapshot_create_time + datetime.timedelta(minutes=1)
with freeze_time(target_snapshot_create_time):
target_snapshot = client.copy_db_snapshot(
SourceDBSnapshotIdentifier=db_snapshot_identifier,
TargetDBSnapshotIdentifier="snapshot-2",
)["DBSnapshot"]

assert target_snapshot["Engine"] == "postgres"
assert target_snapshot["DBInstanceIdentifier"] == "db-primary-1"
assert target_snapshot["DBSnapshotIdentifier"] == "snapshot-2"
if not settings.TEST_SERVER_MODE:
assert target_snapshot["SnapshotCreateTime"] == target_snapshot_create_time
assert target_snapshot["OriginalSnapshotCreateTime"] == snapshot_create_time

result = client.list_tags_for_resource(
ResourceName=target_snapshot["DBSnapshotArn"]
)
Expand Down
Loading