diff --git a/.github/check_entitlements.sh b/.github/check_entitlements.sh index 89bddaa..ea98f1a 100755 --- a/.github/check_entitlements.sh +++ b/.github/check_entitlements.sh @@ -1,15 +1,11 @@ #!/bin/bash - # Derive additional environment variables TOKEN_URL="${OIDC_OP_TOKEN_ENDPOINT}" OTDF_HOST_AND_PORT="${OPENTDF_PLATFORM_HOST}" OTDF_CLIENT="${OPENTDF_CLIENT_ID}" OTDF_CLIENT_SECRET="${OPENTDF_CLIENT_SECRET}" -# Enable debug mode -DEBUG=1 - echo "🔧 Environment Configuration:" echo " TOKEN_URL: ${TOKEN_URL}" echo " OTDF_HOST_AND_PORT: ${OTDF_HOST_AND_PORT}" @@ -28,6 +24,8 @@ get_token() { echo "🔐 Getting access token..." BEARER=$( get_token | jq -r '.access_token' ) +# NOTE: It's always okay to print this token, because it will +# only be valid / available in dummy / dev scenarios [[ "${DEBUG:-}" == "1" ]] && echo "Got Access Token: ${BEARER}" echo "" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 997a3f0..4f2fa77 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,90 +4,14 @@ import json import logging -import os -import subprocess import tempfile from pathlib import Path import pytest -from tests.support_cli_args import get_otdfctl_flags, get_platform_url +from tests.support_otdfctl_args import generate_tdf_files_for_target_mode logger = logging.getLogger(__name__) -# from tests.config_pydantic import CONFIG_TDF - -# Set up environment and configuration -original_env = os.environ.copy() -original_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" - -platform_url = get_platform_url() -otdfctl_flags = get_otdfctl_flags() - - -def _generate_target_mode_tdf( - input_file: Path, - output_file: Path, - target_mode: str, - creds_file: Path, - attributes: list[str] | None = None, - mime_type: str | None = None, -) -> None: - """ - Generate a TDF file using otdfctl with a specific target mode. - - Args: - input_file: Path to the input file to encrypt - output_file: Path where the TDF file should be created - target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") - creds_file: Path to credentials file - attributes: Optional list of attributes to apply - mime_type: Optional MIME type for the input file - """ - # Ensure output directory exists - output_file.parent.mkdir(parents=True, exist_ok=True) - - # Build otdfctl command - cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(creds_file), - *otdfctl_flags, - "--tdf-type", - "tdf3", - "--target-mode", - target_mode, - "-o", - str(output_file), - ] - - # Add optional parameters - if attributes: - for attr in attributes: - cmd.extend(["--attr", attr]) - - if mime_type: - cmd.extend(["--mime-type", mime_type]) - - # Add input file - cmd.append(str(input_file)) - - # Run otdfctl command - result = subprocess.run( - cmd, - capture_output=True, - text=True, - env=original_env, - ) - - if result.returncode != 0: - logger.error(f"otdfctl command failed: {result.stderr}") - raise Exception( - f"Failed to generate TDF with target mode {target_mode}: " - f"stdout={result.stdout}, stderr={result.stderr}" - ) @pytest.fixture(scope="session") @@ -118,79 +42,10 @@ def sample_input_files(test_data_dir): } -def _generate_tdf_files_for_target_mode( - target_mode: str, - temp_credentials_file: Path, - test_data_dir: Path, - sample_input_files: dict[str, Path], -) -> dict[str, Path]: - """ - Factory function to generate TDF files for a specific target mode. - - Args: - target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") - temp_credentials_file: Path to credentials file - test_data_dir: Base test data directory - sample_input_files: Dictionary of sample input files - - Returns: - Dictionary mapping file types to their TDF file paths - """ - output_dir = test_data_dir / target_mode - tdf_files = {} - - # Define the file generation configurations - file_configs = [ - { - "key": "text", - "input_key": "text", - "output_name": "sample_text.txt.tdf", - "mime_type": "text/plain", - }, - # { - # "key": "empty", - # "input_key": "empty", - # "output_name": "empty_file.txt.tdf", - # "mime_type": "text/plain", - # }, - { - "key": "binary", - "input_key": "binary", - "output_name": "sample_binary.png.tdf", - "mime_type": "image/png", - }, - { - "key": "with_attributes", - "input_key": "with_attributes", - "output_name": "sample_with_attributes.txt.tdf", - "mime_type": "text/plain", - }, - ] - - try: - for config in file_configs: - tdf_path = output_dir / config["output_name"] - _generate_target_mode_tdf( - sample_input_files[config["input_key"]], - tdf_path, - target_mode, - temp_credentials_file, - # attributes=[CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1] if config["key"] == "with_attributes" else None, # Temporarily disabled due to external KAS dependency - mime_type=config["mime_type"], - ) - tdf_files[config["key"]] = tdf_path - - return tdf_files - - except Exception as e: - logger.error(f"Error generating {target_mode} TDF files: {e}") - raise Exception(f"Failed to generate {target_mode} TDF files: {e}") from e - - @pytest.fixture(scope="session") def tdf_v4_2_2_files(temp_credentials_file, test_data_dir, sample_input_files): """Generate TDF files with target mode v4.2.2.""" - tdf_files = _generate_tdf_files_for_target_mode( + tdf_files = generate_tdf_files_for_target_mode( "v4.2.2", temp_credentials_file, test_data_dir, sample_input_files ) yield tdf_files @@ -199,7 +54,7 @@ def tdf_v4_2_2_files(temp_credentials_file, test_data_dir, sample_input_files): @pytest.fixture(scope="session") def tdf_v4_3_1_files(temp_credentials_file, test_data_dir, sample_input_files): """Generate TDF files with target mode v4.3.1.""" - tdf_files = _generate_tdf_files_for_target_mode( + tdf_files = generate_tdf_files_for_target_mode( "v4.3.1", temp_credentials_file, test_data_dir, sample_input_files ) yield tdf_files diff --git a/tests/integration/otdfctl_only/test_fixture_structure.py b/tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py similarity index 63% rename from tests/integration/otdfctl_only/test_fixture_structure.py rename to tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py index 3cd393c..ff341d0 100644 --- a/tests/integration/otdfctl_only/test_fixture_structure.py +++ b/tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py @@ -1,3 +1,5 @@ +from pathlib import Path + import pytest @@ -75,3 +77,49 @@ def test_sample_file_contents(sample_input_files): with open(attr_file) as f: content = f.read() assert "Classification: SECRET" in content + + +@pytest.mark.integration +def test_target_mode_fixtures_exist(all_target_mode_tdf_files): + """Test that target mode fixtures generate TDF files correctly.""" + # Check that we have both versions + assert "v4.2.2" in all_target_mode_tdf_files + assert "v4.3.1" in all_target_mode_tdf_files + + # Check each version has the expected file types + for version in ["v4.2.2", "v4.3.1"]: + tdf_files = all_target_mode_tdf_files[version] + + # Check all expected file types exist + expected_types = [ + "text", + "binary", + "with_attributes", + ] # Consider 'empty' as well + for file_type in expected_types: + assert file_type in tdf_files, f"Missing {file_type} TDF for {version}" + + # Check the TDF file exists and is not empty + tdf_path = tdf_files[file_type] + assert isinstance(tdf_path, Path) + assert tdf_path.exists(), f"TDF file does not exist: {tdf_path}" + assert tdf_path.stat().st_size > 0, f"TDF file is empty: {tdf_path}" + + # Check it's a valid ZIP file (TDF format) + with open(tdf_path, "rb") as f: + header = f.read(4) + assert header == b"PK\x03\x04", f"TDF file is not a valid ZIP: {tdf_path}" + + +@pytest.mark.integration +def test_v4_2_2_tdf_files(tdf_v4_2_2_files): + """Test that v4.2.2 TDF fixtures work independently.""" + assert "text" in tdf_v4_2_2_files + assert tdf_v4_2_2_files["text"].exists() + + +@pytest.mark.integration +def test_v4_3_1_tdf_files(tdf_v4_3_1_files): + """Test that v4.3.1 TDF fixtures work independently.""" + assert "text" in tdf_v4_3_1_files + assert tdf_v4_3_1_files["text"].exists() diff --git a/tests/integration/otdfctl_to_python/test_cli_comparison.py b/tests/integration/otdfctl_to_python/test_cli_comparison.py index 62ade58..03f56ed 100644 --- a/tests/integration/otdfctl_to_python/test_cli_comparison.py +++ b/tests/integration/otdfctl_to_python/test_cli_comparison.py @@ -8,9 +8,11 @@ import pytest -from tests.support_cli_args import get_platform_url - -platform_url = get_platform_url() +from tests.support_cli_args import build_cli_decrypt_command +from tests.support_otdfctl_args import ( + build_otdfctl_decrypt_command, + build_otdfctl_encrypt_command, +) @pytest.mark.integration @@ -35,20 +37,12 @@ def test_otdfctl_encrypt_python_decrypt(collect_server_logs, temp_credentials_fi cli_decrypt_output = temp_path / "decrypted-by-cli.txt" # Run otdfctl encrypt first to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path @@ -63,18 +57,11 @@ def test_otdfctl_encrypt_python_decrypt(collect_server_logs, temp_credentials_fi assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" # Now run otdfctl decrypt (this is the reference implementation) - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] + otdfctl_decrypt_cmd = build_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + ) otdfctl_decrypt_result = subprocess.run( otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path @@ -101,22 +88,11 @@ def test_otdfctl_encrypt_python_decrypt(collect_server_logs, temp_credentials_fi ) # Run our Python CLI decrypt on the same TDF - cli_decrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] + cli_decrypt_cmd = build_cli_decrypt_command( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + ) cli_decrypt_result = subprocess.run( cli_decrypt_cmd, @@ -203,20 +179,12 @@ def test_otdfctl_encrypt_otdfctl_decrypt(collect_server_logs, temp_credentials_f otdfctl_decrypt_output = temp_path / "otdfctl-roundtrip-decrypted.txt" # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path @@ -252,18 +220,11 @@ def test_otdfctl_encrypt_otdfctl_decrypt(collect_server_logs, temp_credentials_f assert tdf_header == b"PK\x03\x04", "otdfctl output is not a valid ZIP file" # Run otdfctl decrypt - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] + otdfctl_decrypt_cmd = build_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + ) otdfctl_decrypt_result = subprocess.run( otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path diff --git a/tests/integration/otdfctl_to_python/test_cli_decrypt.py b/tests/integration/otdfctl_to_python/test_cli_decrypt.py index b1eedcc..b6cb8e0 100644 --- a/tests/integration/otdfctl_to_python/test_cli_decrypt.py +++ b/tests/integration/otdfctl_to_python/test_cli_decrypt.py @@ -4,15 +4,13 @@ import logging import subprocess -import sys import tempfile from pathlib import Path import pytest -from tests.config_pydantic import CONFIG_TDF from tests.support_cli_args import ( - get_cli_flags, + build_cli_decrypt_command, ) logger = logging.getLogger(__name__) @@ -144,30 +142,17 @@ def _run_cli_decrypt(tdf_path: Path, creds_file: Path) -> Path | None: Returns the Path to the decrypted output file if successful, None if failed. """ - # Determine platform flags - platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL - cli_flags = get_cli_flags() - # Create a temporary output file with tempfile.NamedTemporaryFile(delete=False, suffix=".decrypted") as temp_file: output_path = Path(temp_file.name) try: # Build CLI command - cmd = [ - sys.executable, - "-m", - "otdf_python.cli", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(creds_file), - *cli_flags, - "decrypt", - str(tdf_path), - "-o", - str(output_path), - ] + cmd = build_cli_decrypt_command( + creds_file=creds_file, + input_file=tdf_path, + output_file=output_path, + ) # Run the CLI command result = subprocess.run( diff --git a/tests/integration/otdfctl_to_python/test_cli_inspect.py b/tests/integration/otdfctl_to_python/test_cli_inspect.py index 3acd0fb..b40dd66 100644 --- a/tests/integration/otdfctl_to_python/test_cli_inspect.py +++ b/tests/integration/otdfctl_to_python/test_cli_inspect.py @@ -2,18 +2,11 @@ Tests using target mode fixtures, for CLI integration testing. """ -import json import logging -import subprocess -import sys -from pathlib import Path import pytest -from tests.config_pydantic import CONFIG_TDF -from tests.support_cli_args import ( - get_cli_flags, -) +from tests.support_cli_args import run_cli_inspect logger = logging.getLogger(__name__) @@ -33,10 +26,10 @@ def test_cli_inspect_v4_2_2_vs_v4_3_1(all_target_mode_tdf_files, temp_credential v4_3_1_tdf = v4_3_1_files[file_type] # Inspect v4.2.2 TDF - v4_2_2_result = _run_cli_inspect(v4_2_2_tdf, temp_credentials_file) + v4_2_2_result = run_cli_inspect(v4_2_2_tdf, temp_credentials_file) # Inspect v4.3.1 TDF - v4_3_1_result = _run_cli_inspect(v4_3_1_tdf, temp_credentials_file) + v4_3_1_result = run_cli_inspect(v4_3_1_tdf, temp_credentials_file) # Both should succeed assert v4_2_2_result is not None, f"Failed to inspect v4.2.2 {file_type} TDF" @@ -109,7 +102,7 @@ def test_cli_inspect_different_file_types( tdf_path = tdf_files[file_type] # Inspect the TDF - result = _run_cli_inspect(tdf_path, temp_credentials_file) + result = run_cli_inspect(tdf_path, temp_credentials_file) assert result is not None, ( f"Failed to inspect {file_type} TDF, TDF version {version}" @@ -126,46 +119,3 @@ def test_cli_inspect_different_file_types( "keyAccess" in result["manifest"] or "encryptionInformation" in result["manifest"] ) - - -def _run_cli_inspect(tdf_path: Path, creds_file: Path) -> dict | None: - """ - Helper function to run Python CLI inspect command and return parsed JSON result. - - This demonstrates how the CLI inspect functionality could be tested - with the new fixtures. - """ - # Determine platform flags - platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL - cli_flags = get_cli_flags() - - # Build CLI command - cmd = [ - sys.executable, - "-m", - "otdf_python.cli", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(creds_file), - *cli_flags, - "inspect", - str(tdf_path), - ] - - try: - # Run the CLI command - result = subprocess.run( - cmd, - capture_output=True, - text=True, - check=True, - cwd=Path(__file__).parent.parent.parent, # Project root - ) - - # Parse JSON output - return json.loads(result.stdout) - - except (subprocess.CalledProcessError, json.JSONDecodeError) as e: - logger.error(f"CLI inspect failed for {tdf_path}: {e}") - raise Exception(f"Failed to inspect TDF {tdf_path}: {e}") from e diff --git a/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py b/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py index 6c58c32..c25c1e9 100644 --- a/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py +++ b/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py @@ -14,9 +14,7 @@ TDFReader, ) from tests.config_pydantic import CONFIG_TDF -from tests.support_cli_args import get_platform_url - -platform_url = get_platform_url() +from tests.support_otdfctl_args import build_otdfctl_encrypt_command class TestTDFReaderIntegration: @@ -40,26 +38,18 @@ def test_read_otdfctl_created_tdf_structure(self, temp_credentials_file): otdfctl_output = temp_path / "test-reader.txt.tdf" # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_output), - ] + otdfctl_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_cmd, capture_output=True, text=True, cwd=temp_path ) - # If otdfctl fails, skip the test (might be server issues) + # If otdfctl fails, fail fast if otdfctl_encrypt_result.returncode != 0: raise Exception( f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}" @@ -131,29 +121,19 @@ def test_read_otdfctl_tdf_with_attributes(self, temp_credentials_file): otdfctl_output = temp_path / "input.txt.tdf" # Run otdfctl encrypt with attributes - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - "--attr", - CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1, - str(input_file), - "-o", - str(otdfctl_output), - ] + otdfctl_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + attributes=[CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1], + ) otdfctl_result = subprocess.run( otdfctl_cmd, capture_output=True, text=True, cwd=temp_path ) - # If otdfctl fails, skip the test - # assert otdfctl_result.returncode == 0, "otdfctl encrypt failed" + # If otdfctl fails, fail fast if otdfctl_result.returncode != 0: raise Exception( f"otdfctl encrypt with attributes failed: {otdfctl_result.stderr}" @@ -240,20 +220,12 @@ def test_read_multiple_otdfctl_files(self, temp_credentials_file): output_file = temp_path / f"{test_case['name']}.tdf" # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - test_case["mime_type"], - str(input_file), - "-o", - str(output_file), - ] + otdfctl_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=output_file, + mime_type=test_case["mime_type"], + ) otdfctl_result = subprocess.run( otdfctl_cmd, capture_output=True, text=True, cwd=temp_path diff --git a/tests/integration/test_cli_integration.py b/tests/integration/test_cli_integration.py index 3a3765e..62c0f8c 100644 --- a/tests/integration/test_cli_integration.py +++ b/tests/integration/test_cli_integration.py @@ -2,24 +2,25 @@ Integration Test CLI functionality """ -import os import subprocess -import sys import tempfile from pathlib import Path import pytest -from tests.support_cli_args import get_platform_url - -original_env = os.environ.copy() -original_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" - -platform_url = get_platform_url() +from tests.support_cli_args import build_cli_decrypt_command, build_cli_encrypt_command +from tests.support_common import ( + get_testing_environ, + handle_subprocess_error, +) +from tests.support_otdfctl_args import ( + build_otdfctl_decrypt_command, + build_otdfctl_encrypt_command, +) @pytest.mark.integration -def test_cli_decrypt_otdfctl_tdf(temp_credentials_file): +def test_cli_decrypt_otdfctl_tdf(collect_server_logs, temp_credentials_file): """ Test that the Python CLI can successfully decrypt TDF files created by otdfctl. """ @@ -41,64 +42,52 @@ def test_cli_decrypt_otdfctl_tdf(temp_credentials_file): cli_decrypt_output = temp_path / "decrypted-by-cli.txt" # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - # If otdfctl fails to encrypt, fail fast - if otdfctl_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Verify the TDF file was created assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" - # Run our Python CLI decrypt on the otdfctl-created TDF - cli_decrypt_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] + cli_decrypt_cmd = build_cli_decrypt_command( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + ) + # Run our Python CLI decrypt on the otdfctl-created TDF cli_decrypt_result = subprocess.run( cli_decrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - # Check that our CLI succeeded - assert cli_decrypt_result.returncode == 0, ( - f"Python CLI decrypt failed: {cli_decrypt_result.stderr}" + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", ) # Verify the decrypted file was created @@ -141,93 +130,75 @@ def test_otdfctl_decrypt_comparison(collect_server_logs, temp_credentials_file): cli_decrypt_output = temp_path / "decrypted-by-cli.txt" # Run otdfctl encrypt first to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - # If otdfctl fails to encrypt, fail fast - if otdfctl_encrypt_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Verify the TDF file was created assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" # Now run otdfctl decrypt (this is the reference implementation) - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] + otdfctl_decrypt_cmd = build_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + ) otdfctl_decrypt_result = subprocess.run( otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - # Check that otdfctl decrypt succeeded - assert otdfctl_decrypt_result.returncode == 0, ( - f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}" + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", ) - # Run our Python CLI decrypt on the same TDF - cli_decrypt_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] + cli_decrypt_cmd = build_cli_decrypt_command( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + ) + # Run our Python CLI decrypt on the same TDF cli_decrypt_result = subprocess.run( cli_decrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - # Check that our CLI succeeded - if cli_decrypt_result.returncode != 0: - logs = collect_server_logs() - print(f"Server logs when Python CLI decrypt failed:\n{logs}") - raise Exception(f"Python CLI decrypt failed: {cli_decrypt_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", + ) # Verify both decrypted files were created assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" @@ -289,32 +260,27 @@ def test_otdfctl_encrypt_decrypt_roundtrip(collect_server_logs, temp_credentials otdfctl_decrypt_output = temp_path / "otdfctl-roundtrip-decrypted.txt" # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - # If otdfctl fails to encrypt, fail fast - if otdfctl_encrypt_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Verify the TDF file was created assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" @@ -326,32 +292,26 @@ def test_otdfctl_encrypt_decrypt_roundtrip(collect_server_logs, temp_credentials assert tdf_header == b"PK\x03\x04", "otdfctl output is not a valid ZIP file" # Run otdfctl decrypt - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] + otdfctl_decrypt_cmd = build_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + ) otdfctl_decrypt_result = subprocess.run( otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - # If otdfctl fails to decrypt, fail fast - if otdfctl_decrypt_result.returncode != 0: - logs = collect_server_logs() - print(f"Server logs when otdfctl decrypt failed:\n{logs}") - raise Exception(f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", + ) # Verify the decrypted file was created assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" @@ -402,58 +362,43 @@ def test_cli_encrypt_integration(collect_server_logs, temp_credentials_file): cli_output = temp_path / "hello-world-cli.txt.tdf" # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_output), - ] + otdfctl_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + ) otdfctl_result = subprocess.run( otdfctl_cmd, capture_output=True, text=True, cwd=temp_path ) - # If otdfctl fails, skip the test (might be server issues) - if otdfctl_result.returncode != 0: - raise Exception(f"otdfctl failed: {otdfctl_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) - # Run our Python CLI encrypt - cli_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "encrypt", - "--mime-type", - "text/plain", - "--container-type", - "tdf", # to match otdfctl behavior - str(input_file), - "-o", - str(cli_output), - ] + cli_cmd = build_cli_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=cli_output, + mime_type="text/plain", + attributes=None, + ) + # Run our Python CLI encrypt cli_result = subprocess.run( cli_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent ) - # Check that our CLI succeeded - if cli_result.returncode != 0: - logs = collect_server_logs() - print(f"Server logs when Python CLI encrypt failed:\n{logs}") - raise Exception(f"Python CLI failed: {cli_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=cli_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI encrypt", + ) # Both output files should exist assert otdfctl_output.exists(), "otdfctl output file does not exist" diff --git a/tests/integration/test_cli_tdf_validation.py b/tests/integration/test_cli_tdf_validation.py index 9e9f971..9d6861f 100644 --- a/tests/integration/test_cli_tdf_validation.py +++ b/tests/integration/test_cli_tdf_validation.py @@ -3,7 +3,6 @@ """ import json -import os import subprocess import tempfile import zipfile @@ -13,18 +12,21 @@ from otdf_python.tdf_reader import TDF_MANIFEST_FILE_NAME, TDF_PAYLOAD_FILE_NAME from tests.support_cli_args import ( + build_cli_decrypt_command, + build_cli_encrypt_command, get_cli_flags, - get_otdfctl_flags, - get_platform_url, ) - -original_env = os.environ.copy() -original_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" +from tests.support_common import ( + get_testing_environ, + handle_subprocess_error, +) +from tests.support_otdfctl_args import ( + build_otdfctl_decrypt_command, + build_otdfctl_encrypt_command, +) # Determine CLI flags based on platform URL cli_flags = get_cli_flags() -platform_url = get_platform_url() -otdfctl_flags = get_otdfctl_flags() def _create_test_input_file(temp_path: Path, content: str) -> Path: @@ -243,21 +245,8 @@ def _validate_tdf_zip_structure(tdf_path: Path) -> None: print("=" * 50) -def _handle_subprocess_error( - result: subprocess.CompletedProcess, collect_server_logs, tool_name: str -) -> None: - """Handle subprocess errors with proper server log collection and error reporting.""" - if result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when {tool_name} failed:\n{logs}") - - assert result.returncode == 0, f"{tool_name} failed: {result.stderr}" - - def _run_otdfctl_decrypt( tdf_path: Path, - platform_url: str, creds_file: Path, temp_path: Path, collect_server_logs, @@ -266,28 +255,19 @@ def _run_otdfctl_decrypt( """Run otdfctl decrypt on a TDF file and verify the decrypted content matches expected.""" decrypt_output = temp_path / f"{tdf_path.stem}_decrypted.txt" - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - str(tdf_path), - "--host", - platform_url, - "--with-client-creds-file", - str(creds_file), - *otdfctl_flags, - "-o", - str(decrypt_output), - ] + otdfctl_decrypt_cmd = build_otdfctl_decrypt_command( + creds_file=creds_file, tdf_file=tdf_path, output_file=decrypt_output + ) otdfctl_decrypt_result = subprocess.run( otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( otdfctl_decrypt_result, collect_server_logs, "otdfctl decrypt" ) @@ -310,7 +290,6 @@ def _run_otdfctl_decrypt( def _run_python_cli_decrypt( tdf_path: Path, - platform_url: str, creds_file: Path, temp_path: Path, collect_server_logs, @@ -319,32 +298,21 @@ def _run_python_cli_decrypt( """Run Python CLI decrypt on a TDF file and verify the decrypted content matches expected.""" decrypt_output = temp_path / f"{tdf_path.stem}_python_decrypted.txt" - python_decrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(creds_file), - *cli_flags, - "decrypt", - str(tdf_path), - "-o", - str(decrypt_output), - ] + python_decrypt_cmd = build_cli_decrypt_command( + creds_file=creds_file, + input_file=tdf_path, + output_file=decrypt_output, + ) python_decrypt_result = subprocess.run( python_decrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( python_decrypt_result, collect_server_logs, "Python CLI decrypt" ) @@ -381,31 +349,23 @@ def test_otdfctl_encrypt_with_validation(collect_server_logs, temp_credentials_f otdfctl_tdf_output = temp_path / "otdfctl_test.txt.tdf" # Run otdfctl encrypt to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *otdfctl_flags, - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) # Handle any encryption errors - _handle_subprocess_error( + handle_subprocess_error( otdfctl_encrypt_result, collect_server_logs, "otdfctl encrypt" ) @@ -416,7 +376,6 @@ def test_otdfctl_encrypt_with_validation(collect_server_logs, temp_credentials_f # Test that the TDF can be decrypted successfully _run_otdfctl_decrypt( otdfctl_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -442,36 +401,23 @@ def test_python_encrypt(collect_server_logs, temp_credentials_file): # Define TDF file created by Python CLI python_tdf_output = temp_path / "python_cli_test.txt.tdf" - # Run Python CLI encrypt to create a TDF file - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] + python_encrypt_cmd = build_cli_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + ) + # Run Python CLI encrypt to create a TDF file python_encrypt_result = subprocess.run( python_encrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) # Handle any encryption errors - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, "Python CLI encrypt" ) @@ -482,7 +428,6 @@ def test_python_encrypt(collect_server_logs, temp_credentials_file): # Test that the TDF can be decrypted by otdfctl _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -511,30 +456,22 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): otdfctl_tdf_output = temp_path / "otdfctl_for_python_decrypt.txt.tdf" # Encrypt with otdfctl - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *otdfctl_flags, - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] + otdfctl_encrypt_cmd = build_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + ) otdfctl_encrypt_result = subprocess.run( otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( otdfctl_encrypt_result, collect_server_logs, "otdfctl encrypt (cross-tool test)", @@ -543,7 +480,6 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): # Decrypt with Python CLI _run_python_cli_decrypt( otdfctl_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -554,34 +490,21 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): python_tdf_output = temp_path / "python_for_otdfctl_decrypt.txt.tdf" # Encrypt with Python CLI - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] + python_encrypt_cmd = build_cli_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + ) python_encrypt_result = subprocess.run( python_encrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, "Python CLI encrypt (cross-tool test)", @@ -590,7 +513,6 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): # Decrypt with otdfctl _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -629,34 +551,21 @@ def test_different_content_types(collect_server_logs, temp_credentials_file): # Test with Python CLI python_tdf_output = temp_path / f"python_{filename}.tdf" - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] + python_encrypt_cmd = build_cli_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + ) python_encrypt_result = subprocess.run( python_encrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, f"Python CLI encrypt ({filename})", @@ -668,7 +577,6 @@ def test_different_content_types(collect_server_logs, temp_credentials_file): # Decrypt and validate content _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -705,34 +613,21 @@ def test_different_content_types_empty(collect_server_logs, temp_credentials_fil # Test with Python CLI python_tdf_output = temp_path / f"python_{filename}.tdf" - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] + python_encrypt_cmd = build_cli_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + ) python_encrypt_result = subprocess.run( python_encrypt_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent, - env=original_env, + env=get_testing_environ(), ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, f"Python CLI encrypt ({filename})", @@ -744,7 +639,6 @@ def test_different_content_types_empty(collect_server_logs, temp_credentials_fil # Decrypt and validate content _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, diff --git a/tests/integration/test_target_mode_fixtures.py b/tests/integration/test_target_mode_fixtures.py deleted file mode 100644 index 8772e7f..0000000 --- a/tests/integration/test_target_mode_fixtures.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -Test target mode TDF fixtures. -""" - -from pathlib import Path - -import pytest - - -@pytest.mark.integration -def test_target_mode_fixtures_exist(all_target_mode_tdf_files): - """Test that target mode fixtures generate TDF files correctly.""" - # Check that we have both versions - assert "v4.2.2" in all_target_mode_tdf_files - assert "v4.3.1" in all_target_mode_tdf_files - - # Check each version has the expected file types - for version in ["v4.2.2", "v4.3.1"]: - tdf_files = all_target_mode_tdf_files[version] - - # Check all expected file types exist - expected_types = [ - "text", - "binary", - "with_attributes", - ] # Consider 'empty' as well - for file_type in expected_types: - assert file_type in tdf_files, f"Missing {file_type} TDF for {version}" - - # Check the TDF file exists and is not empty - tdf_path = tdf_files[file_type] - assert isinstance(tdf_path, Path) - assert tdf_path.exists(), f"TDF file does not exist: {tdf_path}" - assert tdf_path.stat().st_size > 0, f"TDF file is empty: {tdf_path}" - - # Check it's a valid ZIP file (TDF format) - with open(tdf_path, "rb") as f: - header = f.read(4) - assert header == b"PK\x03\x04", f"TDF file is not a valid ZIP: {tdf_path}" - - -@pytest.mark.integration -def test_v4_2_2_tdf_files(tdf_v4_2_2_files): - """Test that v4.2.2 TDF fixtures work independently.""" - assert "text" in tdf_v4_2_2_files - assert tdf_v4_2_2_files["text"].exists() - - -@pytest.mark.integration -def test_v4_3_1_tdf_files(tdf_v4_3_1_files): - """Test that v4.3.1 TDF fixtures work independently.""" - assert "text" in tdf_v4_3_1_files - assert tdf_v4_3_1_files["text"].exists() diff --git a/tests/support_cli_args.py b/tests/support_cli_args.py index da92a0f..4eca25f 100644 --- a/tests/support_cli_args.py +++ b/tests/support_cli_args.py @@ -1,47 +1,142 @@ -from tests.config_pydantic import CONFIG_TDF +""" +Support functions for constructing CLI arguments for this project's (Python) CLI. +""" +import json +import logging +import subprocess +import sys +from pathlib import Path -def get_platform_url() -> str: - # Get platform configuration - platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL - if not platform_url: - # Fail fast if OPENTDF_PLATFORM_URL is not set - raise Exception( - "OPENTDF_PLATFORM_URL must be set in config for integration tests" - ) - return platform_url +from tests.config_pydantic import CONFIG_TDF +from tests.support_common import get_platform_url + +logger = logging.getLogger(__name__) -def get_otdfctl_flags() -> list: +def get_cli_flags() -> list[str]: """ - Determine otdfctl flags based on platform URL + Determine (Python) CLI flags based on platform URL """ platform_url = get_platform_url() - otdfctl_flags = [] + cli_flags = [] + if platform_url.startswith("http://"): - # otdfctl doesn't have a --plaintext flag, just omit --tls-no-verify for HTTP - pass + cli_flags = ["--plaintext"] else: # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True if CONFIG_TDF.INSECURE_SKIP_VERIFY: - otdfctl_flags = ["--tls-no-verify"] + cli_flags = ["--insecure"] - return otdfctl_flags + return cli_flags -def get_cli_flags() -> list: +def run_cli_inspect(tdf_path: Path, creds_file: Path) -> dict: """ - Determine Python (cli) flags based on platform URL + Helper function to run Python CLI inspect command and return parsed JSON result. + + This demonstrates how the CLI inspect functionality could be tested + with the new fixtures. """ - platform_url = get_platform_url() - cli_flags = [] - if platform_url.startswith("http://"): - cli_flags = ["--plaintext"] - # otdfctl doesn't have a --plaintext flag, just omit --tls-no-verify for HTTP - else: - # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True - if CONFIG_TDF.INSECURE_SKIP_VERIFY: - cli_flags = ["--insecure"] # equivalent to --tls-no-verify + # Build CLI command + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *get_cli_flags(), + "inspect", + str(tdf_path), + ] - return cli_flags + try: + # Run the CLI command + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + cwd=Path(__file__).parent.parent, # Project root + ) + + # Parse JSON output + return json.loads(result.stdout) + + except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + logger.error(f"CLI inspect failed for {tdf_path}: {e}") + raise Exception(f"Failed to inspect TDF {tdf_path}: {e}") from e + + +def build_cli_decrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, +) -> list[str]: + """Build CLI decrypt command.""" + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *get_cli_flags(), + "decrypt", + str(input_file), + "-o", + str(output_file), + ] + return cmd + + +# def run_cli_decrypt() -> subprocess.CompletedProcess + + +def build_cli_encrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + container_type: str = "tdf", +) -> list[str]: + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *get_cli_flags(), + "encrypt", + "--mime-type", + mime_type, + "--container-type", + container_type, + ] + + # Add attributes if provided + if attributes: + for attr in attributes: + cmd.extend(["--attr", attr]) + + cmd.extend( + [ + str(input_file), + "-o", + str(output_file), + ] + ) + + return cmd + + +# def run_cli_encrypt() -> subprocess.CompletedProcess diff --git a/tests/support_common.py b/tests/support_common.py new file mode 100644 index 0000000..362bc18 --- /dev/null +++ b/tests/support_common.py @@ -0,0 +1,46 @@ +import logging +import subprocess + +import pytest + +from tests.config_pydantic import CONFIG_TDF + +logger = logging.getLogger(__name__) + + +def get_platform_url() -> str: + # Get platform configuration + platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL + if not platform_url: + # Fail fast if OPENTDF_PLATFORM_URL is not set + raise Exception( + "OPENTDF_PLATFORM_URL must be set in config for integration tests" + ) + return platform_url + + +def handle_subprocess_error( + result: subprocess.CompletedProcess, collect_server_logs, scenario_name: str +) -> None: + """Handle subprocess errors with proper server log collection and error reporting.""" + if result.returncode != 0: + # Collect server logs for debugging + logs = collect_server_logs() + print(f"Server logs when '{scenario_name}' failed:\n{logs}") + + pytest.fail( + f"Scenario failed: '{scenario_name}': " + f"stdout={result.stdout}, stderr={result.stderr}" + ) + + +def get_testing_environ() -> dict | None: + """ + Set up environment and configuration + + TODO: YAGNI: this is a hook we could use to modify all testing environments, e.g. + env = os.environ.copy() + env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" + return env + """ + return None diff --git a/tests/support_otdfctl.py b/tests/support_otdfctl.py index c872165..1ec0a2e 100644 --- a/tests/support_otdfctl.py +++ b/tests/support_otdfctl.py @@ -13,17 +13,12 @@ def check_for_otdfctl(): an exception if the otdfctl command is not found. """ - # TODO Consider setting GRPC_ENFORCE_ALPN_ENABLED to false as needed - # test_env = os.environ.copy() - # test_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" - # Check if otdfctl is available try: subprocess.run( ["otdfctl", "--version"], capture_output=True, check=True, - # env=test_env ) except (subprocess.CalledProcessError, FileNotFoundError): raise Exception( diff --git a/tests/support_otdfctl_args.py b/tests/support_otdfctl_args.py new file mode 100644 index 0000000..75982df --- /dev/null +++ b/tests/support_otdfctl_args.py @@ -0,0 +1,227 @@ +""" +Support functions for constructing CLI arguments for otdfctl CLI. +""" + +import logging +import subprocess +from pathlib import Path + +from tests.config_pydantic import CONFIG_TDF +from tests.support_common import get_platform_url, get_testing_environ + +logger = logging.getLogger(__name__) + + +def get_otdfctl_flags() -> list[str]: + """ + Determine otdfctl flags based on platform URL + """ + platform_url = get_platform_url() + otdfctl_flags = [] + if platform_url.startswith("http://"): + # otdfctl doesn't have a --plaintext flag, just omit --tls-no-verify for HTTP + pass + else: + # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True + if CONFIG_TDF.INSECURE_SKIP_VERIFY: + otdfctl_flags = ["--tls-no-verify"] + + return otdfctl_flags + + +def get_otdfctl_base_command( + creds_file: Path, platform_url: str | None = None +) -> list[str]: + """Get base otdfctl command with common flags.""" + base_cmd = [ + "otdfctl", + "--host", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + ] + + # Add platform-specific flags + base_cmd.extend(get_otdfctl_flags()) + + return base_cmd + + +def build_otdfctl_encrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + tdf_type: str | None = None, + target_mode: str | None = None, +) -> list[str]: + """Build otdfctl encrypt command. + + Args: + platform_url: Platform URL like "http://localhost:8080" + creds_file: Path to credentials file + input_file: Path to the input file to encrypt + output_file: Path where the TDF file should be created + mime_type: Optional MIME type for the input file + attributes: Optional list of attributes to apply + tdf_type: TDF type (e.g., "tdf3", "nano") + target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") + """ + + cmd = get_otdfctl_base_command(creds_file, platform_url) + cmd.append("encrypt") + cmd.extend(["--mime-type", mime_type]) + + # Add attributes if provided + if attributes: + for attr in attributes: + cmd.extend(["--attr", attr]) + + if tdf_type: + cmd.extend( + [ + "--tdf-type", + tdf_type, + ] + ) + + if target_mode: + cmd.extend(["--target-mode", target_mode]) + + cmd.extend( + [ + str(input_file), + "-o", + str(output_file), + ] + ) + return cmd + + +def build_otdfctl_decrypt_command( + creds_file: Path, tdf_file: Path, output_file: Path, platform_url: str | None = None +) -> list[str]: + """Build otdfctl decrypt command.""" + cmd = get_otdfctl_base_command(creds_file, platform_url) + cmd.extend( + [ + "decrypt", + str(tdf_file), + "-o", + str(output_file), + ] + ) + + return cmd + + +def _generate_target_mode_tdf( + input_file: Path, + output_file: Path, + target_mode: str, + creds_file: Path, + attributes: list[str] | None = None, + mime_type: str | None = None, +) -> None: + # Ensure output directory exists + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Build otdfctl command + cmd = build_otdfctl_encrypt_command( + platform_url=get_platform_url(), + creds_file=creds_file, + input_file=input_file, + output_file=output_file, + mime_type=mime_type if mime_type else "text/plain", + attributes=attributes if attributes else None, + tdf_type="tdf3", + target_mode=target_mode, + ) + + # Run otdfctl command + result = subprocess.run( + cmd, + capture_output=True, + text=True, + env=get_testing_environ(), + ) + + if result.returncode != 0: + logger.error(f"otdfctl command failed: {result.stderr}") + raise Exception( + f"Failed to generate TDF with target mode {target_mode}: " + f"stdout={result.stdout}, stderr={result.stderr}" + ) + + +def generate_tdf_files_for_target_mode( + target_mode: str, + temp_credentials_file: Path, + test_data_dir: Path, + sample_input_files: dict[str, Path], +) -> dict[str, Path]: + """ + Factory function to generate TDF files for a specific target mode. + + Args: + target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") + temp_credentials_file: Path to credentials file + test_data_dir: Base test data directory + sample_input_files: Dictionary of sample input files + + Returns: + Dictionary mapping file types to their TDF file paths + """ + output_dir = test_data_dir / target_mode + tdf_files = {} + + # Define the file generation configurations + file_configs = [ + { + "key": "text", + "input_key": "text", + "output_name": "sample_text.txt.tdf", + "mime_type": "text/plain", + }, + # { + # "key": "empty", + # "input_key": "empty", + # "output_name": "empty_file.txt.tdf", + # "mime_type": "text/plain", + # }, + { + "key": "binary", + "input_key": "binary", + "output_name": "sample_binary.png.tdf", + "mime_type": "image/png", + }, + { + "key": "with_attributes", + "input_key": "with_attributes", + "output_name": "sample_with_attributes.txt.tdf", + "mime_type": "text/plain", + }, + ] + + try: + for config in file_configs: + tdf_path = output_dir / config["output_name"] + _generate_target_mode_tdf( + sample_input_files[config["input_key"]], + tdf_path, + target_mode, + temp_credentials_file, + attributes=[CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1] + if config["key"] == "with_attributes" + else None, + mime_type=config["mime_type"], + ) + tdf_files[config["key"]] = tdf_path + + return tdf_files + + except Exception as e: + logger.error(f"Error generating {target_mode} TDF files: {e}") + raise Exception(f"Failed to generate {target_mode} TDF files: {e}") from e