diff --git a/service_client/assisted_service_api.py b/service_client/assisted_service_api.py index 5373337..3757930 100644 --- a/service_client/assisted_service_api.py +++ b/service_client/assisted_service_api.py @@ -14,9 +14,9 @@ import requests from requests.exceptions import RequestException from assisted_service_client import ApiClient, Configuration, api, models -from assisted_service_client.rest import ApiException from service_client.logger import log +from service_client.exceptions import sanitize_exceptions class InventoryClient: @@ -95,6 +95,7 @@ def _get_host(self, configs: Configuration) -> str: netloc=parsed_inventory_url.netloc, scheme=parsed_inventory_url.scheme ).geturl() + @sanitize_exceptions async def get_cluster( self, cluster_id: str, get_unregistered_clusters: bool = False ) -> models.Cluster: @@ -108,34 +109,20 @@ async def get_cluster( Returns: models.Cluster: The cluster object containing cluster information. """ - try: - log.info( - "Getting cluster %s (unregistered: %s)", - cluster_id, - get_unregistered_clusters, - ) - result = await asyncio.to_thread( - self._installer_api().v2_get_cluster, - cluster_id=cluster_id, - get_unregistered_clusters=get_unregistered_clusters, - ) - log.info("Successfully retrieved cluster %s", cluster_id) - return cast(models.Cluster, result) - except ApiException as e: - log.error( - "API error while getting cluster %s: Status: %s, Reason: %s, Body: %s", - cluster_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while getting cluster %s: %s", cluster_id, str(e) - ) - raise + log.info( + "Getting cluster %s (unregistered: %s)", + cluster_id, + get_unregistered_clusters, + ) + result = await asyncio.to_thread( + self._installer_api().v2_get_cluster, + cluster_id=cluster_id, + get_unregistered_clusters=get_unregistered_clusters, + ) + log.info("Successfully retrieved cluster %s", cluster_id) + return cast(models.Cluster, result) + @sanitize_exceptions async def list_clusters(self) -> list: """ List all clusters accessible to the authenticated user. @@ -143,23 +130,12 @@ async def list_clusters(self) -> list: Returns: list: A list of cluster objects. """ - try: - log.info("Listing all clusters") - result = await asyncio.to_thread(self._installer_api().v2_list_clusters) - log.info("Successfully listed clusters") - return cast(list, result) - except ApiException as e: - log.error( - "API error while listing clusters: Status: %s, Reason: %s, Body: %s", - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error("Unexpected error while listing clusters: %s", str(e)) - raise + log.info("Listing all clusters") + result = await asyncio.to_thread(self._installer_api().v2_list_clusters) + log.info("Successfully listed clusters") + return cast(list, result) + @sanitize_exceptions async def get_events( self, cluster_id: Optional[str] = "", @@ -184,46 +160,26 @@ async def get_events( if categories is None: categories = ["user"] - try: - log.info( - "Getting events for cluster %s, host %s, infra_env %s, categories %s", - cluster_id, - host_id, - infra_env_id, - categories, - ) - response = await asyncio.to_thread( - self._events_api().v2_list_events, - cluster_id=cluster_id, - host_id=host_id, - infra_env_id=infra_env_id, - categories=categories, - _preload_content=False, - **kwargs, - ) - log.info("Successfully retrieved events") - return cast(Any, response).data - except ApiException as e: - log.error( - "API error while getting events (cluster: %s, host: %s, infra_env: %s): Status: %s, Reason: %s, Body: %s", - cluster_id, - host_id, - infra_env_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while getting events (cluster: %s, host: %s, infra_env: %s): %s", - cluster_id, - host_id, - infra_env_id, - str(e), - ) - raise + log.info( + "Getting events for cluster %s, host %s, infra_env %s, categories %s", + cluster_id, + host_id, + infra_env_id, + categories, + ) + response = await asyncio.to_thread( + self._events_api().v2_list_events, + cluster_id=cluster_id, + host_id=host_id, + infra_env_id=infra_env_id, + categories=categories, + _preload_content=False, + **kwargs, + ) + log.info("Successfully retrieved events") + return cast(Any, response).data + @sanitize_exceptions async def get_infra_env(self, infra_env_id: str) -> models.InfraEnv: """ Get infrastructure environment information by ID. @@ -234,32 +190,14 @@ async def get_infra_env(self, infra_env_id: str) -> models.InfraEnv: Returns: models.InfraEnv: The infrastructure environment object. """ - try: - log.info("Getting infrastructure environment %s", infra_env_id) - result = await asyncio.to_thread( - self._installer_api().get_infra_env, infra_env_id=infra_env_id - ) - log.info( - "Successfully retrieved infrastructure environment %s", infra_env_id - ) - return cast(models.InfraEnv, result) - except ApiException as e: - log.error( - "API error while getting infrastructure environment %s: Status: %s, Reason: %s, Body: %s", - infra_env_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while getting infrastructure environment %s: %s", - infra_env_id, - str(e), - ) - raise + log.info("Getting infrastructure environment %s", infra_env_id) + result = await asyncio.to_thread( + self._installer_api().get_infra_env, infra_env_id=infra_env_id + ) + log.info("Successfully retrieved infrastructure environment %s", infra_env_id) + return cast(models.InfraEnv, result) + @sanitize_exceptions async def list_infra_envs(self, cluster_id: str) -> list[dict[str, Any]]: """ List infrastructure environments for a specific cluster. @@ -270,33 +208,17 @@ async def list_infra_envs(self, cluster_id: str) -> list[dict[str, Any]]: Returns: list[dict[str, Any]]: A list of infrastructure environment dictionaries for the cluster. """ - try: - log.info("Listing infrastructure environments for cluster %s", cluster_id) - result = await asyncio.to_thread( - self._installer_api().list_infra_envs, cluster_id=cluster_id - ) - log.info( - "Successfully listed infrastructure environments for cluster %s", - cluster_id, - ) - return cast(list[dict[str, Any]], result) - except ApiException as e: - log.error( - "API error while listing infrastructure environments for cluster %s: Status: %s, Reason: %s, Body: %s", - cluster_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while listing infrastructure environments for cluster %s: %s", - cluster_id, - str(e), - ) - raise + log.info("Listing infrastructure environments for cluster %s", cluster_id) + result = await asyncio.to_thread( + self._installer_api().list_infra_envs, cluster_id=cluster_id + ) + log.info( + "Successfully listed infrastructure environments for cluster %s", + cluster_id, + ) + return cast(list[dict[str, Any]], result) + @sanitize_exceptions async def create_cluster( self, name: str, version: str, single_node: bool, **cluster_params: Any ) -> models.Cluster: @@ -312,42 +234,30 @@ async def create_cluster( Returns: models.Cluster: The created cluster object. """ - try: - if single_node: - cluster_params["control_plane_count"] = 1 - cluster_params["high_availability_mode"] = "None" - cluster_params["user_managed_networking"] = True - - params = models.ClusterCreateParams( - name=name, - openshift_version=version, - pull_secret=self.pull_secret, - **cluster_params, - ) - log.info( - "Creating cluster '%s' with version %s (single_node: %s)", - name, - version, - single_node, - ) - result = await asyncio.to_thread( - self._installer_api().v2_register_cluster, new_cluster_params=params - ) - log.info("Successfully created cluster '%s'", name) - return cast(models.Cluster, result) - except ApiException as e: - log.error( - "API error while creating cluster '%s': Status: %s, Reason: %s, Body: %s", - name, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error("Unexpected error while creating cluster '%s': %s", name, str(e)) - raise + if single_node: + cluster_params["control_plane_count"] = 1 + cluster_params["high_availability_mode"] = "None" + cluster_params["user_managed_networking"] = True + + params = models.ClusterCreateParams( + name=name, + openshift_version=version, + pull_secret=self.pull_secret, + **cluster_params, + ) + log.info( + "Creating cluster '%s' with version %s (single_node: %s)", + name, + version, + single_node, + ) + result = await asyncio.to_thread( + self._installer_api().v2_register_cluster, new_cluster_params=params + ) + log.info("Successfully created cluster '%s'", name) + return cast(models.Cluster, result) + @sanitize_exceptions async def create_infra_env( self, name: str, **infra_env_params: Any ) -> models.InfraEnv: @@ -361,34 +271,18 @@ async def create_infra_env( Returns: models.InfraEnv: The created infrastructure environment object. """ - try: - infra_env = models.InfraEnvCreateParams( - name=name, pull_secret=self.pull_secret, **infra_env_params - ) - log.info("Creating infrastructure environment '%s'", name) - result = await asyncio.to_thread( - self._installer_api().register_infra_env, - infraenv_create_params=infra_env, - ) - log.info("Successfully created infrastructure environment '%s'", name) - return cast(models.InfraEnv, result) - except ApiException as e: - log.error( - "API error while creating infrastructure environment '%s': Status: %s, Reason: %s, Body: %s", - name, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while creating infrastructure environment '%s': %s", - name, - str(e), - ) - raise + infra_env = models.InfraEnvCreateParams( + name=name, pull_secret=self.pull_secret, **infra_env_params + ) + log.info("Creating infrastructure environment '%s'", name) + result = await asyncio.to_thread( + self._installer_api().register_infra_env, + infraenv_create_params=infra_env, + ) + log.info("Successfully created infrastructure environment '%s'", name) + return cast(models.InfraEnv, result) + @sanitize_exceptions async def update_cluster( self, cluster_id: str, @@ -408,38 +302,24 @@ async def update_cluster( Returns: models.Cluster: The updated cluster object. """ - try: - params = models.V2ClusterUpdateParams(**update_params) - if api_vip != "": - params.api_vips = [models.ApiVip(cluster_id=cluster_id, ip=api_vip)] - if ingress_vip != "": - params.ingress_vips = [ - models.IngressVip(cluster_id=cluster_id, ip=ingress_vip) - ] - - log.info("Updating cluster %s", cluster_id) - result = await asyncio.to_thread( - self._installer_api().v2_update_cluster, - cluster_id=cluster_id, - cluster_update_params=params, - ) - log.info("Successfully updated cluster %s", cluster_id) - return cast(models.Cluster, result) - except ApiException as e: - log.error( - "API error while updating cluster %s: Status: %s, Reason: %s, Body: %s", - cluster_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while updating cluster %s: %s", cluster_id, str(e) - ) - raise + params = models.V2ClusterUpdateParams(**update_params) + if api_vip != "": + params.api_vips = [models.ApiVip(cluster_id=cluster_id, ip=api_vip)] + if ingress_vip != "": + params.ingress_vips = [ + models.IngressVip(cluster_id=cluster_id, ip=ingress_vip) + ] + log.info("Updating cluster %s", cluster_id) + result = await asyncio.to_thread( + self._installer_api().v2_update_cluster, + cluster_id=cluster_id, + cluster_update_params=params, + ) + log.info("Successfully updated cluster %s", cluster_id) + return cast(models.Cluster, result) + + @sanitize_exceptions async def install_cluster(self, cluster_id: str) -> models.Cluster: """ Start the installation process for a cluster. @@ -450,28 +330,14 @@ async def install_cluster(self, cluster_id: str) -> models.Cluster: Returns: models.Cluster: The cluster object with updated installation status. """ - try: - log.info("Starting installation for cluster %s", cluster_id) - result = await asyncio.to_thread( - self._installer_api().v2_install_cluster, cluster_id=cluster_id - ) - log.info("Successfully started installation for cluster %s", cluster_id) - return cast(models.Cluster, result) - except ApiException as e: - log.error( - "API error while installing cluster %s: Status: %s, Reason: %s, Body: %s", - cluster_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while installing cluster %s: %s", cluster_id, str(e) - ) - raise + log.info("Starting installation for cluster %s", cluster_id) + result = await asyncio.to_thread( + self._installer_api().v2_install_cluster, cluster_id=cluster_id + ) + log.info("Successfully started installation for cluster %s", cluster_id) + return cast(models.Cluster, result) + @sanitize_exceptions async def get_openshift_versions( self, only_latest: bool ) -> models.OpenshiftVersions: @@ -484,26 +350,15 @@ async def get_openshift_versions( Returns: models.OpenshiftVersions: Object containing available OpenShift versions. """ - try: - log.info("Getting OpenShift versions (only_latest: %s)", only_latest) - result = await asyncio.to_thread( - self._versions_api().v2_list_supported_openshift_versions, - only_latest=only_latest, - ) - log.info("Successfully retrieved OpenShift versions") - return cast(models.OpenshiftVersions, result) - except ApiException as e: - log.error( - "API error while getting OpenShift versions: Status: %s, Reason: %s, Body: %s", - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error("Unexpected error while getting OpenShift versions: %s", str(e)) - raise + log.info("Getting OpenShift versions (only_latest: %s)", only_latest) + result = await asyncio.to_thread( + self._versions_api().v2_list_supported_openshift_versions, + only_latest=only_latest, + ) + log.info("Successfully retrieved OpenShift versions") + return cast(models.OpenshiftVersions, result) + @sanitize_exceptions async def get_operator_bundles(self) -> list[dict[str, Any]]: """ Get available operator bundles. @@ -511,23 +366,12 @@ async def get_operator_bundles(self) -> list[dict[str, Any]]: Returns: list: A list of operator bundle dictionaries. """ - try: - log.info("Getting operator bundles") - bundles = await asyncio.to_thread(self._operators_api().v2_list_bundles) - log.info("Successfully retrieved operator bundles") - return [bundle.to_dict() for bundle in cast(list, bundles)] - except ApiException as e: - log.error( - "API error while getting operator bundles: Status: %s, Reason: %s, Body: %s", - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error("Unexpected error while getting operator bundles: %s", str(e)) - raise + log.info("Getting operator bundles") + bundles = await asyncio.to_thread(self._operators_api().v2_list_bundles) + log.info("Successfully retrieved operator bundles") + return [bundle.to_dict() for bundle in cast(list, bundles)] + @sanitize_exceptions async def add_operator_bundle_to_cluster( self, cluster_id: str, bundle_name: str ) -> models.Cluster: @@ -541,45 +385,25 @@ async def add_operator_bundle_to_cluster( Returns: models.Cluster: The updated cluster object with the new operator. """ - try: - log.info( - "Adding operator bundle '%s' to cluster %s", bundle_name, cluster_id - ) - bundle = await asyncio.to_thread( - self._operators_api().v2_get_bundle, bundle_name - ) - olm_operators = [ - models.OperatorCreateParams(name=op_name) - for op_name in getattr(bundle, "operators", []) - ] - result = await self.update_cluster( - cluster_id=cluster_id, olm_operators=olm_operators - ) - log.info( - "Successfully added operator bundle '%s' to cluster %s", - bundle_name, - cluster_id, - ) - return result - except ApiException as e: - log.error( - "API error while adding operator bundle '%s' to cluster %s: Status: %s, Reason: %s, Body: %s", - bundle_name, - cluster_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while adding operator bundle '%s' to cluster %s: %s", - bundle_name, - cluster_id, - str(e), - ) - raise + log.info("Adding operator bundle '%s' to cluster %s", bundle_name, cluster_id) + bundle = await asyncio.to_thread( + self._operators_api().v2_get_bundle, bundle_name + ) + olm_operators = [ + models.OperatorCreateParams(name=op_name) + for op_name in getattr(bundle, "operators", []) + ] + result = await self.update_cluster( + cluster_id=cluster_id, olm_operators=olm_operators + ) + log.info( + "Successfully added operator bundle '%s' to cluster %s", + bundle_name, + cluster_id, + ) + return result + @sanitize_exceptions async def update_host( self, host_id: str, infra_env_id: str, **update_params: Any ) -> models.Host: @@ -594,41 +418,23 @@ async def update_host( Returns: models.Host: The updated host object. """ - try: - params = models.HostUpdateParams(**update_params) - log.info( - "Updating host %s in infrastructure environment %s", - host_id, - infra_env_id, - ) - result = await asyncio.to_thread( - self._installer_api().v2_update_host, infra_env_id, host_id, params - ) - log.info( - "Successfully updated host %s in infrastructure environment %s", - host_id, - infra_env_id, - ) - return cast(models.Host, result) - except ApiException as e: - log.error( - "API error while updating host %s in infrastructure environment %s: Status: %s, Reason: %s, Body: %s", - host_id, - infra_env_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while updating host %s in infrastructure environment %s: %s", - host_id, - infra_env_id, - str(e), - ) - raise + params = models.HostUpdateParams(**update_params) + log.info( + "Updating host %s in infrastructure environment %s", + host_id, + infra_env_id, + ) + result = await asyncio.to_thread( + self._installer_api().v2_update_host, infra_env_id, host_id, params + ) + log.info( + "Successfully updated host %s in infrastructure environment %s", + host_id, + infra_env_id, + ) + return cast(models.Host, result) + @sanitize_exceptions async def get_presigned_for_cluster_credentials( self, cluster_id: str, file_name: str ) -> models.PresignedUrl: @@ -643,42 +449,24 @@ async def get_presigned_for_cluster_credentials( Returns: models.PresignedUrl: The presigned URL model containing URL and optional expiration time. """ - try: - log.info( - "Getting presigned URL for cluster %s credentials file %s", - cluster_id, - file_name, - ) - result = await asyncio.to_thread( - self._installer_api().v2_get_presigned_for_cluster_credentials, - cluster_id=cluster_id, - file_name=file_name, - ) - log.info( - "Successfully retrieved presigned URL for cluster %s credentials file %s", - cluster_id, - file_name, - ) - return cast(models.PresignedUrl, result) - except ApiException as e: - log.error( - "API error while getting presigned URL for cluster %s credentials file %s: Status: %s, Reason: %s, Body: %s", - cluster_id, - file_name, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while getting presigned URL for cluster %s credentials file %s: %s", - cluster_id, - file_name, - str(e), - ) - raise + log.info( + "Getting presigned URL for cluster %s credentials file %s", + cluster_id, + file_name, + ) + result = await asyncio.to_thread( + self._installer_api().v2_get_presigned_for_cluster_credentials, + cluster_id=cluster_id, + file_name=file_name, + ) + log.info( + "Successfully retrieved presigned URL for cluster %s credentials file %s", + cluster_id, + file_name, + ) + return cast(models.PresignedUrl, result) + @sanitize_exceptions async def get_infra_env_download_url( self, infra_env_id: str ) -> models.PresignedUrl: @@ -691,33 +479,16 @@ async def get_infra_env_download_url( Returns: models.PresignedUrl: The presigned URL model containing URL and optional expiration time. """ - try: - log.info( - "Getting presigned download URL for infrastructure environment %s", - infra_env_id, - ) - result = await asyncio.to_thread( - self._installer_api().get_infra_env_download_url, - infra_env_id=infra_env_id, - ) - log.info( - "Successfully retrieved presigned download URL for infrastructure environment %s", - infra_env_id, - ) - return cast(models.PresignedUrl, result) - except ApiException as e: - log.error( - "API error while getting presigned download URL for infrastructure environment %s: Status: %s, Reason: %s, Body: %s", - infra_env_id, - e.status, - e.reason, - e.body, - ) - raise - except Exception as e: - log.error( - "Unexpected error while getting presigned download URL for infrastructure environment %s: %s", - infra_env_id, - str(e), - ) - raise + log.info( + "Getting presigned download URL for infrastructure environment %s", + infra_env_id, + ) + result = await asyncio.to_thread( + self._installer_api().get_infra_env_download_url, + infra_env_id=infra_env_id, + ) + log.info( + "Successfully retrieved presigned download URL for infrastructure environment %s", + infra_env_id, + ) + return cast(models.PresignedUrl, result) diff --git a/service_client/exceptions.py b/service_client/exceptions.py new file mode 100644 index 0000000..1875d91 --- /dev/null +++ b/service_client/exceptions.py @@ -0,0 +1,52 @@ +""" +Exception handling utilities for the Assisted Service client. + +This module provides custom exception classes and decorators for sanitizing +exceptions from the Assisted Service API. +""" + +from typing import Callable, TypeVar, ParamSpec, Awaitable +from functools import wraps + +from assisted_service_client.rest import ApiException +from service_client.logger import log + + +class AssistedServiceAPIError(Exception): + """Exception for Assisted Service API errors.""" + + +# Type variables for the decorator +P = ParamSpec("P") +T = TypeVar("T") + + +def sanitize_exceptions(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: + """ + Decorate a function to sanitize exceptions from API calls. + + The operation name for logging is automatically derived from the function name. + + Returns: + Decorated function that catches and sanitizes exceptions + """ + + @wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + operation_name = func.__name__ + try: + return await func(*args, **kwargs) + except ApiException as e: + log.error( + "API error during %s: Status: %s, Reason: %s, Body: %s", + operation_name, + e.status, + e.reason, + e.body, + ) + raise AssistedServiceAPIError(f"API error: Status {e.status}") from e + except Exception as e: + log.error("Unexpected error during %s: %s", operation_name, str(e)) + raise AssistedServiceAPIError("An internal error occurred") from e + + return wrapper diff --git a/tests/test_assisted_service_api.py b/tests/test_assisted_service_api.py index d4053e0..8b4a79c 100644 --- a/tests/test_assisted_service_api.py +++ b/tests/test_assisted_service_api.py @@ -11,6 +11,7 @@ from assisted_service_client import Configuration, models from service_client.assisted_service_api import InventoryClient +from service_client.exceptions import AssistedServiceAPIError from tests.test_utils import ( create_test_cluster, create_test_installing_cluster, @@ -202,11 +203,10 @@ async def test_get_cluster_api_exception(self, client: InventoryClient) -> None: ) mock_installer_api.return_value = mock_api - with pytest.raises(ApiException) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_cluster(cluster_id) - assert exc_info.value.status == 404 - assert exc_info.value.reason == "Not Found" + assert "API error: Status 404" in str(exc_info.value) @pytest.mark.asyncio async def test_get_cluster_unexpected_exception( @@ -220,9 +220,11 @@ async def test_get_cluster_unexpected_exception( mock_api.v2_get_cluster.side_effect = ValueError("Unexpected error") mock_installer_api.return_value = mock_api - with pytest.raises(ValueError): + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_cluster(cluster_id) + assert "An internal error occurred" in str(exc_info.value) + @pytest.mark.asyncio async def test_list_clusters_success(self, client: InventoryClient) -> None: """Test successful cluster listing.""" @@ -347,11 +349,10 @@ async def test_list_infra_envs_api_exception(self, client: InventoryClient) -> N ) mock_installer_api.return_value = mock_api - with pytest.raises(ApiException) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.list_infra_envs(cluster_id) - assert exc_info.value.status == 404 - assert exc_info.value.reason == "Not Found" + assert "API error: Status 404" in str(exc_info.value) @pytest.mark.asyncio async def test_create_cluster_success(self, client: InventoryClient) -> None: @@ -620,13 +621,12 @@ async def test_get_presigned_for_cluster_credentials_api_exception( ) mock_installer_api.return_value = mock_api - with pytest.raises(ApiException) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_presigned_for_cluster_credentials( cluster_id, file_name ) - assert exc_info.value.status == 404 - assert exc_info.value.reason == "Not Found" + assert "API error: Status 404" in str(exc_info.value) @pytest.mark.asyncio async def test_get_presigned_for_cluster_credentials_unexpected_exception( @@ -643,12 +643,12 @@ async def test_get_presigned_for_cluster_credentials_unexpected_exception( ) mock_installer_api.return_value = mock_api - with pytest.raises(ValueError) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_presigned_for_cluster_credentials( cluster_id, file_name ) - assert str(exc_info.value) == "Unexpected error" + assert "An internal error occurred" in str(exc_info.value) @pytest.mark.asyncio async def test_get_presigned_for_cluster_credentials_different_file_types( @@ -741,11 +741,10 @@ async def test_get_infra_env_download_url_api_exception( ) mock_installer_api.return_value = mock_api - with pytest.raises(ApiException) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_infra_env_download_url(infra_env_id) - assert exc_info.value.status == 404 - assert exc_info.value.reason == "Not Found" + assert "API error: Status 404" in str(exc_info.value) @pytest.mark.asyncio async def test_get_infra_env_download_url_unexpected_exception( @@ -761,10 +760,10 @@ async def test_get_infra_env_download_url_unexpected_exception( ) mock_installer_api.return_value = mock_api - with pytest.raises(ValueError) as exc_info: + with pytest.raises(AssistedServiceAPIError) as exc_info: await client.get_infra_env_download_url(infra_env_id) - assert str(exc_info.value) == "Unexpected error" + assert "An internal error occurred" in str(exc_info.value) @pytest.mark.asyncio async def test_get_infra_env_download_url_no_expiration(