Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/contentsafety/azure-ai-contentsafety/_meta.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"commit": "ef76b5cbcee099bb7855792bd73173454ac51081",
"repository_url": "https://github.com/Azure/azure-rest-api-specs",
"typespec_src": "specification/cognitiveservices/ContentSafety",
"@azure-tools/typespec-python": "0.15.14"
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# --------------------------------------------------------------------------

from ._client import ContentSafetyClient
from ._client import BlocklistClient
from ._version import VERSION

__version__ = VERSION
Expand All @@ -20,6 +21,7 @@

__all__ = [
"ContentSafetyClient",
"BlocklistClient",
]
__all__.extend([p for p in _patch_all if p not in __all__])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,38 @@
# --------------------------------------------------------------------------

from copy import deepcopy
from typing import Any
from typing import Any, TYPE_CHECKING, Union

from azure.core import PipelineClient
from azure.core.credentials import AzureKeyCredential
from azure.core.pipeline import policies
from azure.core.rest import HttpRequest, HttpResponse

from ._configuration import ContentSafetyClientConfiguration
from ._operations import ContentSafetyClientOperationsMixin
from ._configuration import BlocklistClientConfiguration, ContentSafetyClientConfiguration
from ._operations import BlocklistClientOperationsMixin, ContentSafetyClientOperationsMixin
from ._serialization import Deserializer, Serializer

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from azure.core.credentials import TokenCredential


class ContentSafetyClient(ContentSafetyClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""Analyze harmful content.
"""ContentSafetyClient.

:param endpoint: Supported Cognitive Services endpoints (protocol and hostname, for example:
https://:code:`<resource-name>`.cognitiveservices.azure.com). Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword api_version: The API version to use for this operation. Default value is
"2023-04-30-preview". Note that overriding this default value may result in unsupported
behavior.
:param credential: Credential needed for the client to connect to Azure. Is either a
AzureKeyCredential type or a TokenCredential type. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential or
~azure.core.credentials.TokenCredential
:keyword api_version: The API version to use for this operation. Default value is "2023-10-01".
Note that overriding this default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(self, endpoint: str, credential: AzureKeyCredential, **kwargs: Any) -> None:
def __init__(self, endpoint: str, credential: Union[AzureKeyCredential, "TokenCredential"], **kwargs: Any) -> None:
_endpoint = "{endpoint}/contentsafety"
self._config = ContentSafetyClientConfiguration(endpoint=endpoint, credential=credential, **kwargs)
_policies = kwargs.pop("policies", None)
Expand All @@ -59,7 +64,7 @@ def __init__(self, endpoint: str, credential: AzureKeyCredential, **kwargs: Any)
self._deserialize = Deserializer()
self._serialize.client_side_validation = False

def send_request(self, request: HttpRequest, **kwargs: Any) -> HttpResponse:
def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse:
"""Runs the network request through the client's chained policies.

>>> from azure.core.rest import HttpRequest
Expand All @@ -83,7 +88,7 @@ def send_request(self, request: HttpRequest, **kwargs: Any) -> HttpResponse:
}

request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, **kwargs) # type: ignore
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore

def close(self) -> None:
self._client.close()
Expand All @@ -94,3 +99,81 @@ def __enter__(self) -> "ContentSafetyClient":

def __exit__(self, *exc_details: Any) -> None:
self._client.__exit__(*exc_details)


class BlocklistClient(BlocklistClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""BlocklistClient.

:param endpoint: Supported Cognitive Services endpoints (protocol and hostname, for example:
https://:code:`<resource-name>`.cognitiveservices.azure.com). Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure. Is either a
AzureKeyCredential type or a TokenCredential type. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential or
~azure.core.credentials.TokenCredential
:keyword api_version: The API version to use for this operation. Default value is "2023-10-01".
Note that overriding this default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(self, endpoint: str, credential: Union[AzureKeyCredential, "TokenCredential"], **kwargs: Any) -> None:
_endpoint = "{endpoint}/contentsafety"
self._config = BlocklistClientConfiguration(endpoint=endpoint, credential=credential, **kwargs)
_policies = kwargs.pop("policies", None)
if _policies is None:
_policies = [
policies.RequestIdPolicy(**kwargs),
self._config.headers_policy,
self._config.user_agent_policy,
self._config.proxy_policy,
policies.ContentDecodePolicy(**kwargs),
self._config.redirect_policy,
self._config.retry_policy,
self._config.authentication_policy,
self._config.custom_hook_policy,
self._config.logging_policy,
policies.DistributedTracingPolicy(**kwargs),
policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,
self._config.http_logging_policy,
]
self._client: PipelineClient = PipelineClient(base_url=_endpoint, policies=_policies, **kwargs)

self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False

def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse:
"""Runs the network request through the client's chained policies.

>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>

For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request

:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.HttpResponse
"""

request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
}

request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore

def close(self) -> None:
self._client.close()

def __enter__(self) -> "BlocklistClient":
self._client.__enter__()
return self

def __exit__(self, *exc_details: Any) -> None:
self._client.__exit__(*exc_details)
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from typing import Any
from typing import Any, TYPE_CHECKING, Union

from azure.core.credentials import AzureKeyCredential
from azure.core.pipeline import policies

from ._version import VERSION

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from azure.core.credentials import TokenCredential


class ContentSafetyClientConfiguration: # pylint: disable=too-many-instance-attributes,name-too-long
"""Configuration for ContentSafetyClient.
Expand All @@ -23,16 +27,17 @@ class ContentSafetyClientConfiguration: # pylint: disable=too-many-instance-att
:param endpoint: Supported Cognitive Services endpoints (protocol and hostname, for example:
https://:code:`<resource-name>`.cognitiveservices.azure.com). Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword api_version: The API version to use for this operation. Default value is
"2023-04-30-preview". Note that overriding this default value may result in unsupported
behavior.
:param credential: Credential needed for the client to connect to Azure. Is either a
AzureKeyCredential type or a TokenCredential type. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential or
~azure.core.credentials.TokenCredential
:keyword api_version: The API version to use for this operation. Default value is "2023-10-01".
Note that overriding this default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(self, endpoint: str, credential: AzureKeyCredential, **kwargs: Any) -> None:
api_version: str = kwargs.pop("api_version", "2023-04-30-preview")
def __init__(self, endpoint: str, credential: Union[AzureKeyCredential, "TokenCredential"], **kwargs: Any) -> None:
api_version: str = kwargs.pop("api_version", "2023-10-01")

if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
Expand All @@ -42,21 +47,82 @@ def __init__(self, endpoint: str, credential: AzureKeyCredential, **kwargs: Any)
self.endpoint = endpoint
self.credential = credential
self.api_version = api_version
self.credential_scopes = kwargs.pop("credential_scopes", ["https://cognitiveservices.azure.com/.default"])
kwargs.setdefault("sdk_moniker", "ai-contentsafety/{}".format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)

def _infer_policy(self, **kwargs):
if isinstance(self.credential, AzureKeyCredential):
return policies.AzureKeyCredentialPolicy(self.credential, "Ocp-Apim-Subscription-Key", **kwargs)
if hasattr(self.credential, "get_token"):
return policies.BearerTokenCredentialPolicy(self.credential, *self.credential_scopes, **kwargs)
raise TypeError(f"Unsupported credential: {self.credential}")

def _configure(self, **kwargs: Any) -> None:
self.user_agent_policy = kwargs.get("user_agent_policy") or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get("headers_policy") or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get("proxy_policy") or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get("logging_policy") or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get("http_logging_policy") or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get("custom_hook_policy") or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get("redirect_policy") or policies.RedirectPolicy(**kwargs)
self.retry_policy = kwargs.get("retry_policy") or policies.RetryPolicy(**kwargs)
self.authentication_policy = kwargs.get("authentication_policy")
if self.credential and not self.authentication_policy:
self.authentication_policy = self._infer_policy(**kwargs)


class BlocklistClientConfiguration: # pylint: disable=too-many-instance-attributes,name-too-long
"""Configuration for BlocklistClient.

Note that all parameters used to create this instance are saved as instance
attributes.

:param endpoint: Supported Cognitive Services endpoints (protocol and hostname, for example:
https://:code:`<resource-name>`.cognitiveservices.azure.com). Required.
:type endpoint: str
:param credential: Credential needed for the client to connect to Azure. Is either a
AzureKeyCredential type or a TokenCredential type. Required.
:type credential: ~azure.core.credentials.AzureKeyCredential or
~azure.core.credentials.TokenCredential
:keyword api_version: The API version to use for this operation. Default value is "2023-10-01".
Note that overriding this default value may result in unsupported behavior.
:paramtype api_version: str
"""

def __init__(self, endpoint: str, credential: Union[AzureKeyCredential, "TokenCredential"], **kwargs: Any) -> None:
api_version: str = kwargs.pop("api_version", "2023-10-01")

if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
if credential is None:
raise ValueError("Parameter 'credential' must not be None.")

self.endpoint = endpoint
self.credential = credential
self.api_version = api_version
self.credential_scopes = kwargs.pop("credential_scopes", ["https://cognitiveservices.azure.com/.default"])
kwargs.setdefault("sdk_moniker", "ai-contentsafety/{}".format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)

def _infer_policy(self, **kwargs):
if isinstance(self.credential, AzureKeyCredential):
return policies.AzureKeyCredentialPolicy(self.credential, "Ocp-Apim-Subscription-Key", **kwargs)
if hasattr(self.credential, "get_token"):
return policies.BearerTokenCredentialPolicy(self.credential, *self.credential_scopes, **kwargs)
raise TypeError(f"Unsupported credential: {self.credential}")

def _configure(self, **kwargs: Any) -> None:
self.user_agent_policy = kwargs.get("user_agent_policy") or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get("headers_policy") or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get("proxy_policy") or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get("logging_policy") or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get("http_logging_policy") or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get("custom_hook_policy") or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get("redirect_policy") or policies.RedirectPolicy(**kwargs)
self.retry_policy = kwargs.get("retry_policy") or policies.RetryPolicy(**kwargs)
self.authentication_policy = kwargs.get("authentication_policy")
if self.credential and not self.authentication_policy:
self.authentication_policy = policies.AzureKeyCredentialPolicy(
self.credential, "Ocp-Apim-Subscription-Key", **kwargs
)
self.authentication_policy = self._infer_policy(**kwargs)
Loading