Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import io
import logging
import uuid
from typing import Union, Any, Dict
from azure.core.credentials import AzureKeyCredential, TokenCredential
from azure.ai.evaluation._common.onedp import ProjectsClient as RestEvaluationServiceClient
Expand All @@ -17,6 +19,7 @@
RedTeamUpload,
)
from azure.storage.blob import ContainerClient
from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory, ErrorTarget
from .utils import upload

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -167,3 +170,63 @@ def update_red_team_run(self, *, name: str, red_team: RedTeamUpload, **kwargs):
update_run_response = self.rest_client.red_teams.upload_update_run(name=name, redteam=red_team, **kwargs)

return update_run_response

def test_storage_upload(self, **kwargs) -> bool:
"""Test storage account connectivity by performing a minimal upload operation.

This method validates that the storage account is accessible and the credentials
have the necessary permissions before proceeding with the full evaluation workflow.

:param kwargs: Additional keyword arguments to pass to the underlying API calls
:return: True if the test upload succeeds
:rtype: bool
:raises EvaluationException: If the storage account is inaccessible or lacks permissions
"""
LOGGER.debug("Testing storage account connectivity...")

try:
# Create a unique test identifier to avoid conflicts
test_id = str(uuid.uuid4())[:8]
test_name = f"connectivity-test-{test_id}"
test_version = "1"

# Start a pending upload to get storage credentials
start_pending_upload_response = self.rest_client.evaluation_results.start_pending_upload(
name=test_name,
version=test_version,
body=PendingUploadRequest(pending_upload_type=PendingUploadType.TEMPORARY_BLOB_REFERENCE),
**kwargs,
)

# Attempt to upload a minimal test blob directly (no temp files needed)
with ContainerClient.from_container_url(
start_pending_upload_response.blob_reference_for_consumption.credential.sas_uri
) as container_client:
# Use BytesIO for efficient in-memory test data
test_data = io.BytesIO(b"connectivity test")
container_client.upload_blob(data=test_data, name="test.txt", overwrite=True)
container_client.delete_blob(name="test.txt")

LOGGER.debug("Storage account connectivity test successful")
return True

except Exception as e:
LOGGER.error(f"Storage account connectivity test failed: {str(e)}")

# Re-raise with helpful context
error_msg = (
f"Failed to connect to Azure Blob Storage. Error: {str(e)}. "
"Please verify that:\n"
" 1. The storage account exists and is accessible\n"
" 2. Your credentials have the necessary permissions (Storage Blob Data Contributor role)\n"
" 3. Network access to the storage account is not blocked by firewall rules\n"
" 4. The Azure AI project is properly configured"
)

raise EvaluationException(
message=error_msg,
internal_message=f"Storage connectivity test failed: {e}",
target=ErrorTarget.RAI_CLIENT,
category=ErrorCategory.UPLOAD_ERROR,
blame=ErrorBlame.SYSTEM_ERROR,
)
Original file line number Diff line number Diff line change
Expand Up @@ -914,9 +914,26 @@ def upload(path: str, container_client: ContainerClient, logger=None):
logger.debug(f"File '{local}' uploaded successfully")

except Exception as e:
# Extract storage account information if available
storage_info = ""
try:
account_name = container_client.account_name if hasattr(container_client, "account_name") else "unknown"
storage_info = f" Storage account: {account_name}."
except Exception:
pass

error_msg = (
f"Failed to upload evaluation results to Azure Blob Storage.{storage_info} "
f"Error: {str(e)}. "
"Please verify that:\n"
" 1. The storage account exists and is accessible\n"
" 2. Your credentials have the necessary permissions (Storage Blob Data Contributor role)\n"
" 3. Network access to the storage account is not blocked by firewall rules"
)

raise EvaluationException(
message=f"Error uploading file: {e}",
internal_message=f"Error uploading file: {e}",
message=error_msg,
internal_message=f"Error uploading file to blob storage: {e}",
target=ErrorTarget.RAI_CLIENT,
category=ErrorCategory.UPLOAD_ERROR,
blame=ErrorBlame.SYSTEM_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,26 @@ def start_redteam_mlflow_run(

return eval_run

def test_storage_upload(self) -> bool:
"""Test storage account connectivity before starting the scan.

This method validates that storage upload will work by testing with the
appropriate client (OneDP or MLFlow) depending on project configuration.

:return: True if the test upload succeeds
:rtype: bool
:raises EvaluationException: If the storage account is inaccessible or lacks permissions
"""
if self._one_dp_project:
# For OneDP projects, test using the evaluation client
return self.generated_rai_client._evaluation_onedp_client.test_storage_upload()
else:
# For non-OneDP projects (MLFlow), we don't have a direct upload test
# Storage is tested when we create artifacts during log_artifact
# So we just return True here and let the actual upload fail if there are issues
self.logger.debug("Storage upload test skipped for non-OneDP project (will be tested during actual upload)")
return True

async def log_redteam_results_to_mlflow(
self,
redteam_result: RedTeamResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,17 @@ async def scan(
# Update result processor with the AI studio URL now that it's available
self.result_processor.ai_studio_url = self.mlflow_integration.ai_studio_url

# Test storage connectivity early to catch issues before running attacks
try:
tqdm.write("🔍 Validating storage account connectivity...")
self.mlflow_integration.test_storage_upload()
tqdm.write("✅ Storage account validation successful")
except Exception as e:
# Log the error and re-raise to stop the scan early
self.logger.error(f"Storage account validation failed: {str(e)}")
tqdm.write(f"❌ Storage account validation failed: {str(e)}")
raise

# Process strategies and execute scan
flattened_attack_strategies = get_flattened_attack_strategies(attack_strategies)
self._validate_strategies(flattened_attack_strategies)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""Unit tests for storage account validation functionality."""

import os
import tempfile
import unittest
from unittest.mock import Mock, patch
from azure.ai.evaluation._common.evaluation_onedp_client import EvaluationServiceOneDPClient
from azure.ai.evaluation._common.utils import upload
from azure.ai.evaluation._exceptions import EvaluationException
from azure.ai.evaluation.red_team._mlflow_integration import MLflowIntegration


class TestStorageValidation(unittest.TestCase):
"""Test cases for storage account validation."""

def setUp(self):
"""Set up test fixtures."""
self.mock_credential = Mock()
self.test_endpoint = "https://test-endpoint.azure.com"

@patch("azure.ai.evaluation._common.evaluation_onedp_client.RestEvaluationServiceClient")
def test_test_storage_upload_success(self, mock_rest_client_class):
"""Test successful storage upload validation."""
# Arrange
mock_rest_client = Mock()
mock_rest_client_class.return_value = mock_rest_client

mock_pending_upload_response = Mock()
mock_pending_upload_response.blob_reference_for_consumption = Mock()
mock_pending_upload_response.blob_reference_for_consumption.credential = Mock()
mock_pending_upload_response.blob_reference_for_consumption.credential.sas_uri = (
"https://test.blob.core.windows.net/container?sas_token"
)

mock_rest_client.evaluation_results.start_pending_upload.return_value = mock_pending_upload_response

# Mock ContainerClient
with patch(
"azure.ai.evaluation._common.evaluation_onedp_client.ContainerClient"
) as mock_container_client_class:
mock_container_client = Mock()
mock_container_client.__enter__ = Mock(return_value=mock_container_client)
mock_container_client.__exit__ = Mock(return_value=None)
mock_container_client_class.from_container_url.return_value = mock_container_client

# Mock the upload_blob method to succeed
mock_container_client.upload_blob = Mock()
# Act
client = EvaluationServiceOneDPClient(self.test_endpoint, self.mock_credential)
result = client.test_storage_upload()

# Assert
self.assertTrue(result)
mock_container_client.upload_blob.assert_called_once()

@patch("azure.ai.evaluation._common.evaluation_onedp_client.RestEvaluationServiceClient")
def test_test_storage_upload_failure(self, mock_rest_client_class):
"""Test storage upload validation failure."""
# Arrange
mock_rest_client = Mock()
mock_rest_client_class.return_value = mock_rest_client

# Simulate upload failure
mock_rest_client.evaluation_results.start_pending_upload.side_effect = Exception(
"Storage account not accessible"
)

# Act & Assert
client = EvaluationServiceOneDPClient(self.test_endpoint, self.mock_credential)
with self.assertRaises(EvaluationException) as context:
client.test_storage_upload()

# Verify error message contains helpful guidance
self.assertIn("storage", str(context.exception).lower())
self.assertIn("permissions", str(context.exception).lower())

def test_mlflow_integration_test_storage_upload_onedp(self):
"""Test MLflowIntegration storage validation for OneDP projects."""
# Arrange
mock_logger = Mock()
mock_generated_rai_client = Mock()
mock_evaluation_client = Mock()
mock_evaluation_client.test_storage_upload.return_value = True
mock_generated_rai_client._evaluation_onedp_client = mock_evaluation_client

mlflow_integration = MLflowIntegration(
logger=mock_logger, generated_rai_client=mock_generated_rai_client, one_dp_project=True
)

# Act
result = mlflow_integration.test_storage_upload()

# Assert
self.assertTrue(result)
mock_evaluation_client.test_storage_upload.assert_called_once()

def test_mlflow_integration_test_storage_upload_non_onedp(self):
"""Test MLflowIntegration storage validation for non-OneDP projects."""
# Arrange
mock_logger = Mock()
mock_generated_rai_client = Mock()

mlflow_integration = MLflowIntegration(
logger=mock_logger, generated_rai_client=mock_generated_rai_client, one_dp_project=False
)

# Act
result = mlflow_integration.test_storage_upload()

# Assert
# For non-OneDP projects, we skip the test and return True
self.assertTrue(result)


class TestUploadErrorMessages(unittest.TestCase):
"""Test cases for improved upload error messages."""

@patch("azure.ai.evaluation._common.utils.ContainerClient")
def test_upload_error_message_includes_guidance(self, mock_container_client_class):
"""Test that upload errors include helpful troubleshooting guidance."""
# Arrange
mock_container_client = Mock()
mock_container_client.account_name = "testaccount"
mock_container_client.upload_blob.side_effect = Exception("Connection refused")

# Create a temporary test file
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as tmp_file:
tmp_file.write("test content")
test_file_path = tmp_file.name

# Act & Assert
try:
with self.assertRaises(EvaluationException) as context:
upload(path=test_file_path, container_client=mock_container_client)

# Verify error message contains helpful guidance
error_message = str(context.exception)
self.assertIn("storage account", error_message.lower())
self.assertIn("permissions", error_message.lower())
self.assertIn("verify", error_message.lower())
finally:
# Clean up the temporary file
try:
os.unlink(test_file_path)
except Exception:
pass


if __name__ == "__main__":
unittest.main()
Loading