diff --git a/integration/test_backup_v4.py b/integration/test_backup_v4.py index f2338b2bf..58646505f 100644 --- a/integration/test_backup_v4.py +++ b/integration/test_backup_v4.py @@ -1,10 +1,12 @@ import datetime +import pathlib import time -from typing import Generator, List, Union +from typing import Generator, List, Union, Optional import pytest import weaviate +import weaviate.classes as wvc from weaviate.backup.backup import ( BackupCompressionLevel, BackupConfigCreate, @@ -346,33 +348,59 @@ def test_fail_creating_backup_for_both_include_and_exclude_classes( ) -def test_backup_and_restore_with_collection(client: weaviate.WeaviateClient) -> None: +@pytest.mark.parametrize("dynamic_backup_location", [False, True]) +def test_backup_and_restore_with_collection( + client: weaviate.WeaviateClient, dynamic_backup_location: bool, tmp_path: pathlib.Path +) -> None: backup_id = _create_backup_id() + conf_create: Optional[wvc.backup.BackupConfigCreate] = None + conf_restore: Optional[wvc.backup.BackupConfigRestore] = None + backup_location: Optional[wvc.backup.BackupLocation] = None + if dynamic_backup_location: + if client._connection._weaviate_version.is_lower_than(1, 27, 2): + pytest.skip("Cancel backups is only supported from 1.27.2") + + backup_location = wvc.backup.BackupLocation.FileSystem(path=str(tmp_path)) + article = client.collections.get("Article") # create backup - create = article.backup.create(backup_id=backup_id, backend=BACKEND, wait_for_completion=True) + create = article.backup.create( + backup_id=backup_id, + backend=BACKEND, + wait_for_completion=True, + config=conf_create, + backup_location=backup_location, + ) assert create.status == BackupStatus.SUCCESS assert len(article) == len(ARTICLES_IDS) # check create status - create_status = article.backup.get_create_status(backup_id, BACKEND) + create_status = article.backup.get_create_status( + backup_id=backup_id, backend=BACKEND, backup_location=backup_location + ) assert create_status.status == BackupStatus.SUCCESS # remove existing class client.collections.delete("Article") # restore backup - restore = article.backup.restore(backup_id=backup_id, backend=BACKEND, wait_for_completion=True) + restore = article.backup.restore( + backup_id=backup_id, + backend=BACKEND, + wait_for_completion=True, + config=conf_restore, + backup_location=backup_location, + ) assert restore.status == BackupStatus.SUCCESS # # check data exists again assert len(article) == len(ARTICLES_IDS) # check restore status - restore_status = article.backup.get_restore_status(backup_id, BACKEND) + restore_status = article.backup.get_restore_status(backup_id, BACKEND, backup_location) assert restore_status.status == BackupStatus.SUCCESS @@ -468,24 +496,42 @@ def test_backup_and_restore_with_collection_and_config_1_23_x( # assert backup_id in [b.backup_id for b in backups] -def test_cancel_backup(client: weaviate.WeaviateClient) -> None: +@pytest.mark.parametrize("dynamic_backup_location", [False, True]) +def test_cancel_backup( + client: weaviate.WeaviateClient, dynamic_backup_location, tmp_path: pathlib.Path +) -> None: """Create and restore backup without waiting.""" backup_id = _create_backup_id() if client._connection._weaviate_version.is_lower_than(1, 24, 25): pytest.skip("Cancel backups is only supported from 1.24.25") - resp = client.backup.create(backup_id=backup_id, backend=BACKEND) + backup_location: Optional[wvc.backup.BackupLocation] = None + if dynamic_backup_location: + if client._connection._weaviate_version.is_lower_than(1, 27, 2): + pytest.skip("Cancel backups is only supported from 1.27.2") + + backup_location = wvc.backup.BackupLocation.FileSystem(path=str(tmp_path)) + + resp = client.backup.create( + backup_id=backup_id, backend=BACKEND, backup_location=backup_location + ) assert resp.status == BackupStatus.STARTED - assert client.backup.cancel(backup_id=backup_id, backend=BACKEND) + assert client.backup.cancel( + backup_id=backup_id, backend=BACKEND, backup_location=backup_location + ) # async process start = time.time() while time.time() - start < 5: - status_resp = client.backup.get_create_status(backup_id=backup_id, backend=BACKEND) + status_resp = client.backup.get_create_status( + backup_id=backup_id, backend=BACKEND, backup_location=backup_location + ) if status_resp.status == BackupStatus.CANCELED: break time.sleep(0.1) - status_resp = client.backup.get_create_status(backup_id=backup_id, backend=BACKEND) + status_resp = client.backup.get_create_status( + backup_id=backup_id, backend=BACKEND, backup_location=backup_location + ) # there can be a race between the cancel and the backup completion assert status_resp.status == BackupStatus.CANCELED or status_resp.status == BackupStatus.SUCCESS diff --git a/weaviate/backup/backup.py b/weaviate/backup/backup.py index 80a8d06aa..e7c293226 100644 --- a/weaviate/backup/backup.py +++ b/weaviate/backup/backup.py @@ -4,10 +4,11 @@ from enum import Enum from time import sleep -from typing import Optional, Union, List, Tuple +from typing import Optional, Union, List, Tuple, Dict, Any, cast from pydantic import BaseModel, Field +from weaviate.backup.backup_location import _BackupLocationConfig, BackupLocationType from weaviate.connect import ConnectionV4 from weaviate.connect.v4 import _ExpectedStatusCodes from weaviate.exceptions import ( @@ -62,6 +63,15 @@ class BackupStatus(str, Enum): class _BackupConfigBase(BaseModel): CPUPercentage: Optional[int] = Field(default=None, alias="cpu_percentage") + def _to_dict(self) -> Dict[str, Any]: + ret = cast(dict, self.model_dump(exclude_none=True)) + + for key, val in ret.items(): + if isinstance(val, _BackupLocationConfig): + ret[key] = val._to_dict() + + return ret + class BackupConfigCreate(_BackupConfigBase): """Options to configure the backup when creating a backup.""" @@ -105,6 +115,7 @@ async def create( exclude_collections: Optional[Union[List[str], str]] = None, wait_for_completion: bool = False, config: Optional[BackupConfigCreate] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupReturn: """Create a backup of all/per collection Weaviate objects. @@ -125,6 +136,8 @@ async def create( Whether to wait until the backup is done. By default False. config: BackupConfigCreate, optional The configuration of the backup creation. By default None. + backup_location: + The dynamic location of a backup. By default None. Returns ------- @@ -167,7 +180,17 @@ async def create( raise WeaviateInvalidInputError( f"Expected 'config' to be of type 'BackupConfigCreate', but got {type(config)}." ) - payload["config"] = config.model_dump() + payload["config"] = config._to_dict() + + if backup_location is not None: + if self._connection._weaviate_version.is_lower_than(1, 27, 2): + raise WeaviateUnsupportedFeatureError( + "BackupConfigCreate dynamic backup location", + str(self._connection._weaviate_version), + "1.27.2", + ) + + payload["config"].update(backup_location._to_dict()) path = f"/backups/{backend.value}" @@ -182,8 +205,7 @@ async def create( if wait_for_completion: while True: status = await self.__get_create_status( - backup_id=backup_id, - backend=backend, + backup_id=backup_id, backend=backend, backup_location=backup_location ) create_status["status"] = status.status if status.status == BackupStatus.SUCCESS: @@ -200,7 +222,10 @@ async def create( return BackupReturn(**create_status) async def __get_create_status( - self, backup_id: str, backend: BackupStorage + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: backup_id, backend = _get_and_validate_get_status( backup_id=backup_id, @@ -208,9 +233,21 @@ async def __get_create_status( ) path = f"/backups/{backend.value}/{backup_id}" + params: Dict[str, str] = {} + if backup_location is not None: + if self._connection._weaviate_version.is_lower_than(1, 27, 2): + raise WeaviateUnsupportedFeatureError( + "BackupConfigCreateStatus dynamic backup location", + str(self._connection._weaviate_version), + "1.27.2", + ) + + params.update(backup_location._to_dict()) response = await self._connection.get( - path=path, error_msg="Backup creation status failed due to connection error." + path=path, + params=params, + error_msg="Backup creation status failed due to connection error.", ) typed_response = _decode_json_response_dict(response, "Backup status check") @@ -219,7 +256,12 @@ async def __get_create_status( typed_response["id"] = backup_id return BackupStatusReturn(**typed_response) - async def get_create_status(self, backup_id: str, backend: BackupStorage) -> BackupStatusReturn: + async def get_create_status( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> BackupStatusReturn: """ Checks if a started backup job has completed. @@ -230,12 +272,14 @@ async def get_create_status(self, backup_id: str, backend: BackupStorage) -> Bac NOTE: Case insensitive. backend : BackupStorage eNUM The backend storage where the backup was created. + backup_location: BackupLocationType + The dynamic location of a backup. By default None. Returns ------- A `BackupStatusReturn` object that contains the backup creation status response. """ - return await self.__get_create_status(backup_id, backend) + return await self.__get_create_status(backup_id, backend, backup_location) async def restore( self, @@ -245,6 +289,7 @@ async def restore( exclude_collections: Union[List[str], str, None] = None, wait_for_completion: bool = False, config: Optional[BackupConfigRestore] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupReturn: """ Restore a backup of all/per collection Weaviate objects. @@ -267,6 +312,8 @@ async def restore( Whether to wait until the backup restore is done. config: BackupConfigRestore, optional The configuration of the backup restoration. By default None. + backup_location: + The dynamic location of a backup. By default None. Returns ------- @@ -306,7 +353,17 @@ async def restore( raise WeaviateInvalidInputError( f"Expected 'config' to be of type 'BackupConfigRestore', but got {type(config)}." ) - payload["config"] = config.model_dump() + payload["config"] = config._to_dict() + + if backup_location is not None: + if self._connection._weaviate_version.is_lower_than(1, 27, 2): + raise WeaviateUnsupportedFeatureError( + "BackupConfigRestore dynamic backup location", + str(self._connection._weaviate_version), + "1.27.2", + ) + + payload["config"].update(backup_location._to_dict()) path = f"/backups/{backend.value}/{backup_id}/restore" response = await self._connection.post( @@ -319,8 +376,7 @@ async def restore( if wait_for_completion: while True: status = await self.__get_restore_status( - backup_id=backup_id, - backend=backend, + backup_id=backup_id, backend=backend, backup_location=backup_location ) restore_status["status"] = status.status if status.status == BackupStatus.SUCCESS: @@ -338,7 +394,10 @@ async def restore( return BackupReturn(**restore_status) async def __get_restore_status( - self, backup_id: str, backend: BackupStorage + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: backup_id, backend = _get_and_validate_get_status( backup_id=backup_id, @@ -346,8 +405,20 @@ async def __get_restore_status( ) path = f"/backups/{backend.value}/{backup_id}/restore" + params: Dict[str, str] = {} + if backup_location is not None: + if self._connection._weaviate_version.is_lower_than(1, 27, 2): + raise WeaviateUnsupportedFeatureError( + "BackupConfigRestore status dynamic backup location", + str(self._connection._weaviate_version), + "1.27.2", + ) + params.update(backup_location._to_dict()) + response = await self._connection.get( - path=path, error_msg="Backup restore status failed due to connection error." + path=path, + params=params, + error_msg="Backup restore status failed due to connection error.", ) typed_response = _decode_json_response_dict(response, "Backup restore status check") if typed_response is None: @@ -356,34 +427,55 @@ async def __get_restore_status( return BackupStatusReturn(**typed_response) async def get_restore_status( - self, backup_id: str, backend: BackupStorage + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: """ Checks if a started restore job has completed. Parameters ---------- - backup_id : str + backup_id: The identifier name of the backup. NOTE: Case insensitive. - backend : BackupStorage + backend: The backend storage where to create the backup. + backup_location: + The dynamic location of a backup. By default None. Returns ------- A `BackupStatusReturn` object that contains the backup restore status response. """ - return await self.__get_restore_status(backup_id, backend) + return await self.__get_restore_status(backup_id, backend, backup_location) - async def __cancel_backup(self, backup_id: str, backend: BackupStorage) -> bool: + async def __cancel_backup( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType], + ) -> bool: backup_id, backend = _get_and_validate_get_status( backup_id=backup_id, backend=backend, ) path = f"/backups/{backend.value}/{backup_id}" + params: Dict[str, str] = {} + + if backup_location is not None: + if self._connection._weaviate_version.is_lower_than(1, 27, 2): + raise WeaviateUnsupportedFeatureError( + "BackupConfigCancel dynamic backup location", + str(self._connection._weaviate_version), + "1.27.2", + ) + params.update(backup_location._to_dict()) response = await self._connection.delete( path=path, + params=params, error_msg="Backup cancel failed due to connection error.", status_codes=_ExpectedStatusCodes(ok_in=[204, 404], error="delete object"), ) @@ -396,17 +488,24 @@ async def __cancel_backup(self, backup_id: str, backend: BackupStorage) -> bool: raise EmptyResponseException() return False # did not exist - async def cancel(self, backup_id: str, backend: BackupStorage) -> bool: + async def cancel( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> bool: """ Cancels a running backup. Parameters ---------- - backup_id : str + backup_id: The identifier name of the backup. NOTE: Case insensitive. - backend : BackupStorage + backend: The backend storage where to create the backup. + backup_location: + The dynamic location of a backup. By default None. Raises ------ @@ -417,7 +516,7 @@ async def cancel(self, backup_id: str, backend: BackupStorage) -> bool: ------- A bool indicating if the cancellation was successful. """ - return await self.__cancel_backup(backup_id, backend) + return await self.__cancel_backup(backup_id, backend, backup_location) async def __list_backups(self, backend: BackupStorage) -> List[BackupReturn]: _, backend = _get_and_validate_get_status(backend=backend, backup_id="dummy") diff --git a/weaviate/backup/backup_location.py b/weaviate/backup/backup_location.py new file mode 100644 index 000000000..5bf392687 --- /dev/null +++ b/weaviate/backup/backup_location.py @@ -0,0 +1,53 @@ +from typing import Dict, Any, cast, Union + +from pydantic import BaseModel + + +class _BackupLocationConfig(BaseModel): + """The dynamic location of a backup.""" + + def _to_dict(self) -> Dict[str, Any]: + ret = cast(dict, self.model_dump(exclude_none=True)) + + return ret + + +class _BackupLocationFilesystem(_BackupLocationConfig): + """The dynamic location of a backup for filesystem.""" + + path: str + + +class _BackupLocationS3(_BackupLocationConfig): + """The dynamic location of a backup for S3.""" + + path: str + bucket: str + + +class _BackupLocationGCP(_BackupLocationConfig): + """The dynamic location of a backup for GCP.""" + + path: str + bucket: str + + +class _BackupLocationAzure(_BackupLocationConfig): + """The dynamic location of a backup for Azure.""" + + path: str + bucket: str + + +BackupLocationType = Union[ + _BackupLocationFilesystem, _BackupLocationS3, _BackupLocationGCP, _BackupLocationAzure +] + + +class BackupLocation: + """The dynamic path of a backup.""" + + FileSystem = _BackupLocationFilesystem + S3 = _BackupLocationS3 + GCP = _BackupLocationGCP + Azure = _BackupLocationAzure diff --git a/weaviate/backup/sync.pyi b/weaviate/backup/sync.pyi index de01155ae..8858aaf0b 100644 --- a/weaviate/backup/sync.pyi +++ b/weaviate/backup/sync.pyi @@ -1,7 +1,14 @@ from typing import Optional, Union, List +from weaviate.backup.backup import ( + BackupStorage, + BackupReturn, + BackupStatusReturn, + BackupConfigCreate, + BackupConfigRestore, +) +from weaviate.backup.backup_location import BackupLocationType from weaviate.connect import ConnectionV4 -from weaviate.backup.backup import BackupStorage, BackupReturn, BackupStatusReturn class _Backup: """Backup class used to schedule and/or check the status of a backup process of Weaviate objects.""" @@ -9,7 +16,12 @@ class _Backup: def __init__(self, connection: ConnectionV4): self._connection = connection - def cancel(self, backup_id: str, backend: BackupStorage) -> bool: ... + def cancel( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> bool: ... def create( self, backup_id: str, @@ -17,8 +29,15 @@ class _Backup: include_collections: Optional[Union[List[str], str]] = None, exclude_collections: Optional[Union[List[str], str]] = None, wait_for_completion: bool = False, + config: Optional[BackupConfigCreate] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupReturn: ... - def get_create_status(self, backup_id: str, backend: BackupStorage) -> BackupStatusReturn: ... + def get_create_status( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> BackupStatusReturn: ... def restore( self, backup_id: str, @@ -26,5 +45,12 @@ class _Backup: include_collections: Union[List[str], str, None] = None, exclude_collections: Union[List[str], str, None] = None, wait_for_completion: bool = False, + config: Optional[BackupConfigRestore] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupReturn: ... - def get_restore_status(self, backup_id: str, backend: BackupStorage) -> BackupStatusReturn: ... + def get_restore_status( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> BackupStatusReturn: ... diff --git a/weaviate/classes/__init__.py b/weaviate/classes/__init__.py index 399cdfbb6..04c439912 100644 --- a/weaviate/classes/__init__.py +++ b/weaviate/classes/__init__.py @@ -1,9 +1,21 @@ # make sure to import all classes that should be available in the weaviate module -from . import aggregate, batch, config, data, generics, init, query, tenants, rbac # noqa: F401 +from . import ( + aggregate, + backup, + batch, + config, + data, + generics, + init, + query, + tenants, + rbac, +) # noqa: F401 from .config import ConsistencyLevel __all__ = [ "aggregate", + "backup", "batch", "config", "ConsistencyLevel", diff --git a/weaviate/classes/backup.py b/weaviate/classes/backup.py index a6a30b37b..e914c1afa 100644 --- a/weaviate/classes/backup.py +++ b/weaviate/classes/backup.py @@ -4,10 +4,13 @@ BackupConfigRestore, BackupStorage, ) +from weaviate.backup.backup_location import BackupLocation + __all__ = [ "BackupCompressionLevel", "BackupConfigCreate", "BackupConfigRestore", "BackupStorage", + "BackupLocation", ] diff --git a/weaviate/collections/backups/backups.py b/weaviate/collections/backups/backups.py index 9190f29c4..1c03b0ef4 100644 --- a/weaviate/collections/backups/backups.py +++ b/weaviate/collections/backups/backups.py @@ -9,6 +9,7 @@ from weaviate.backup.backup import ( _BackupAsync, ) +from weaviate.backup.backup_location import BackupLocationType from weaviate.connect import ConnectionV4 @@ -30,6 +31,7 @@ async def create( backend: BackupStorage, wait_for_completion: bool = False, config: Optional[BackupConfigCreate] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: """Create a backup of this collection. @@ -42,6 +44,8 @@ async def create( Whether to wait until the backup is done. By default False. `config` The configuration for the backup creation. By default None. + `backup_location`: + The dynamic location of a backup. By default None. Returns: A `BackupStatusReturn` object that contains the backup creation response. @@ -57,7 +61,7 @@ async def create( One of the arguments have a wrong type. """ create = await self._backup.create( - backup_id, backend, [self._name], None, wait_for_completion, config + backup_id, backend, [self._name], None, wait_for_completion, config, backup_location ) return BackupStatusReturn( error=create.error, status=create.status, path=create.path, id=backup_id @@ -69,6 +73,7 @@ async def restore( backend: BackupStorage, wait_for_completion: bool = False, config: Optional[BackupConfigRestore] = None, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: """ Restore a backup of all/per class Weaviate objects. @@ -83,7 +88,8 @@ async def restore( Whether to wait until the backup restore is done. By default False. `config` The configuration for the backup restoration. By default None. - + `backup_location`: + The dynamic location of a backup. By default None. Returns: A `BackupStatusReturn` object that contains the backup restore response. @@ -97,13 +103,18 @@ async def restore( If the backup failed. """ restore = await self._backup.restore( - backup_id, backend, [self._name], None, wait_for_completion, config + backup_id, backend, [self._name], None, wait_for_completion, config, backup_location ) return BackupStatusReturn( error=restore.error, status=restore.status, path=restore.path, id=backup_id ) - async def get_create_status(self, backup_id: str, backend: BackupStorage) -> BackupStatusReturn: + async def get_create_status( + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, + ) -> BackupStatusReturn: """Check if a started backup job has completed. Arguments: @@ -112,14 +123,19 @@ async def get_create_status(self, backup_id: str, backend: BackupStorage) -> Bac NOTE: Case insensitive. `backend` The backend storage where the backup was created. + `backup_location`: + The dynamic location of a backup. By default None. Returns: A `BackupStatusReturn` object that contains the backup creation status response. """ - return await self._backup.get_create_status(backup_id, backend) + return await self._backup.get_create_status(backup_id, backend, backup_location) async def get_restore_status( - self, backup_id: str, backend: BackupStorage + self, + backup_id: str, + backend: BackupStorage, + backup_location: Optional[BackupLocationType] = None, ) -> BackupStatusReturn: """Check if a started classification job has completed. @@ -129,8 +145,10 @@ async def get_restore_status( NOTE: Case insensitive. `backend` The backend storage where to create the backup. + `backup_location`: + The dynamic location of a backup. By default None. Returns: A `BackupStatusReturn` object that contains the backup restore status response. """ - return await self._backup.get_restore_status(backup_id, backend) + return await self._backup.get_restore_status(backup_id, backend, backup_location)