From c5c5142e74fba5e959002eaf97f08e7e4fe440e6 Mon Sep 17 00:00:00 2001 From: b-long Date: Wed, 1 Oct 2025 20:45:14 -0400 Subject: [PATCH 1/4] fix: update test_nanotdf.py --- tests/test_nanotdf.py | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/tests/test_nanotdf.py b/tests/test_nanotdf.py index 1517d1e..31db9fc 100644 --- a/tests/test_nanotdf.py +++ b/tests/test_nanotdf.py @@ -35,9 +35,6 @@ def test_nanotdf_invalid_magic(): nanotdf.read_nanotdf(bad_bytes, config) -@pytest.mark.skip( - "This test is skipped because NanoTDF encryption/decryption is not implemented yet." -) @pytest.mark.integration def test_nanotdf_integration_encrypt_decrypt(): # Load environment variables for integration @@ -47,24 +44,16 @@ def test_nanotdf_integration_encrypt_decrypt(): # Create KAS info from configuration kas_info = KASInfo(url=CONFIG_TDF.KAS_ENDPOINT) - # Create KAS client with SSL verification disabled for testing - # from otdf_python.kas_client import KASClient - # client = KASClient( - # kas_url=CONFIG_TDF.KAS_ENDPOINT, - # verify_ssl=not CONFIG_TDF.INSECURE_SKIP_VERIFY, - # use_plaintext=bool(CONFIG_TDF.OPENTDF_PLATFORM_URL.startswith("http://")), - # ) - nanotdf = NanoTDF() data = b"test data" - config = NanoTDFConfig(kas_info_list=[kas_info]) - # These will raise NotImplementedError until implemented - try: - nanotdf_bytes = nanotdf.create_nanotdf(data, config) - except NotImplementedError: - pytest.skip("NanoTDF encryption not implemented yet.") - try: - decrypted = nanotdf.read_nanotdf(nanotdf_bytes, config) - except NotImplementedError: - pytest.skip("NanoTDF decryption not implemented yet.") + + # Generate a key and include it in config for both encrypt and decrypt + # Note: In a real scenario with KAS integration, the key would be wrapped + # and unwrapped via KAS. For now, we're testing the basic encrypt/decrypt flow. + key = secrets.token_bytes(32) + config = NanoTDFConfig(kas_info_list=[kas_info], cipher=key.hex()) + + # Create and read NanoTDF + nanotdf_bytes = nanotdf.create_nanotdf(data, config) + decrypted = nanotdf.read_nanotdf(nanotdf_bytes, config) assert decrypted == data From f90eec0339a372a5bf7161a10b84878c50434189 Mon Sep 17 00:00:00 2001 From: b-long Date: Wed, 1 Oct 2025 21:23:17 -0400 Subject: [PATCH 2/4] fix: work-in-progress - add broken tests --- src/otdf_python/ecc_mode.py | 12 + src/otdf_python/nanotdf.py | 17 +- .../test_nanotdf_cli_comparison.py | 375 ++++++++++++++++++ .../test_python_nanotdf_only.py | 104 +++++ 4 files changed, 506 insertions(+), 2 deletions(-) create mode 100644 tests/integration/otdfctl_to_python/test_nanotdf_cli_comparison.py create mode 100644 tests/integration/otdfctl_to_python/test_python_nanotdf_only.py diff --git a/src/otdf_python/ecc_mode.py b/src/otdf_python/ecc_mode.py index 95b4714..6dfdbbb 100644 --- a/src/otdf_python/ecc_mode.py +++ b/src/otdf_python/ecc_mode.py @@ -30,3 +30,15 @@ def get_ec_compressed_pubkey_size(curve_type: int) -> int: def get_ecc_mode_as_byte(self) -> int: # Most significant bit: use_ecdsa_binding, lower 3 bits: curve_mode return ((1 if self.use_ecdsa_binding else 0) << 7) | (self.curve_mode & 0x07) + + @staticmethod + def from_string(curve_str: str) -> "ECCMode": + """Create ECCMode from curve string like 'secp256r1' or 'secp384r1'.""" + curve_map = { + "secp256r1": 0, + "secp384r1": 1, + "secp521r1": 2, + "secp256k1": 3, + } + curve_mode = curve_map.get(curve_str.lower(), 0) + return ECCMode(curve_mode, False) diff --git a/src/otdf_python/nanotdf.py b/src/otdf_python/nanotdf.py index 15d7725..140587b 100644 --- a/src/otdf_python/nanotdf.py +++ b/src/otdf_python/nanotdf.py @@ -313,7 +313,7 @@ def create_nano_tdf( output_stream.write(nano_tdf_data) return len(header_bytes) + len(nano_tdf_data) - def read_nano_tdf( + def read_nano_tdf( # noqa: C901 self, nano_tdf_data: bytes | BytesIO, output_stream: BinaryIO, @@ -388,7 +388,20 @@ def read_nano_tdf( key = asym.decrypt(wrapped_key) ciphertext = payload[3 : -(2 + wrapped_key_len)] else: - key = config.get("key") + # No wrapped key - need symmetric key from config + key = None + if isinstance(config, dict): + key = config.get("key") + elif ( + config + and hasattr(config, "cipher") + and config.cipher + and isinstance(config.cipher, str) + and all(c in "0123456789abcdefABCDEF" for c in config.cipher) + ): + # Try to get key from cipher field if it's hex + key = bytes.fromhex(config.cipher) + if not key: raise InvalidNanoTDFConfig("Missing decryption key in config.") ciphertext = payload[3:-2] diff --git a/tests/integration/otdfctl_to_python/test_nanotdf_cli_comparison.py b/tests/integration/otdfctl_to_python/test_nanotdf_cli_comparison.py new file mode 100644 index 0000000..ce5dac1 --- /dev/null +++ b/tests/integration/otdfctl_to_python/test_nanotdf_cli_comparison.py @@ -0,0 +1,375 @@ +""" +Integration tests for NanoTDF using otdfctl and Python CLI interoperability. + +These tests verify that: +1. otdfctl can encrypt to NanoTDF and Python can decrypt +2. Python can encrypt to NanoTDF and otdfctl can decrypt +3. Both tools produce compatible NanoTDF files +""" + +import logging +import tempfile +from pathlib import Path + +import pytest + +from tests.support_cli_args import run_cli_decrypt, run_cli_encrypt +from tests.support_common import ( + handle_subprocess_error, + validate_plaintext_file_created, +) +from tests.support_otdfctl_args import ( + run_otdfctl_decrypt_command, + run_otdfctl_encrypt_command, +) + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +def test_otdfctl_encrypt_nano_python_decrypt( + collect_server_logs, temp_credentials_file, project_root +): + """Test otdfctl encrypt with --tdf-type nano and Python CLI decrypt.""" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "nano_input.txt" + input_content = "Hello NanoTDF! This is a test of nano format encryption." + with input_file.open("w") as f: + f.write(input_content) + + # Define NanoTDF file created by otdfctl + nanotdf_output = temp_path / "test.tdf" + + # Define decrypted output from Python CLI + python_decrypt_output = temp_path / "decrypted-by-python.txt" + + # Run otdfctl encrypt with --tdf-type nano + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=nanotdf_output, + mime_type="text/plain", + tdf_type="nano", + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt nano", + ) + + # Verify NanoTDF file was created + assert nanotdf_output.exists(), "NanoTDF file should be created" + assert nanotdf_output.stat().st_size > 0, "NanoTDF file should not be empty" + + # Log NanoTDF file info + logger.info(f"✓ otdfctl created NanoTDF: {nanotdf_output.stat().st_size} bytes") + + # Run Python CLI decrypt on the NanoTDF + python_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=nanotdf_output, + output_file=python_decrypt_output, + cwd=project_root, + ) + + # Fail fast on errors + handle_subprocess_error( + result=python_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt nano", + ) + + # Validate decrypted content + validate_plaintext_file_created( + path=python_decrypt_output, + scenario="Python CLI decrypt NanoTDF", + expected_content=input_content, + ) + + logger.info( + f"✓ Python CLI successfully decrypted NanoTDF: {python_decrypt_output.stat().st_size} bytes" + ) + + +@pytest.mark.integration +def test_python_encrypt_nano_otdfctl_decrypt( + collect_server_logs, temp_credentials_file, project_root +): + """Test Python CLI encrypt with --container-type nano and otdfctl decrypt.""" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "nano_input.txt" + input_content = "Hello from Python! Testing nano format encryption." + with input_file.open("w") as f: + f.write(input_content) + + # Define NanoTDF file created by Python CLI + nanotdf_output = temp_path / "python_created.tdf" + + # Define decrypted output from otdfctl + otdfctl_decrypt_output = temp_path / "decrypted-by-otdfctl.txt" + + # Run Python CLI encrypt with --container-type nano + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=nanotdf_output, + mime_type="text/plain", + container_type="nano", + cwd=project_root, + ) + + # Fail fast on errors + handle_subprocess_error( + result=python_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI encrypt nano", + ) + + # Verify NanoTDF file was created + assert nanotdf_output.exists(), "NanoTDF file should be created" + assert nanotdf_output.stat().st_size > 0, "NanoTDF file should not be empty" + + # Log NanoTDF file info + logger.info( + f"✓ Python CLI created NanoTDF: {nanotdf_output.stat().st_size} bytes" + ) + + # Run otdfctl decrypt on the NanoTDF + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + creds_file=temp_credentials_file, + tdf_file=nanotdf_output, + output_file=otdfctl_decrypt_output, + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt nano", + ) + + # Validate decrypted content + validate_plaintext_file_created( + path=otdfctl_decrypt_output, + scenario="otdfctl decrypt NanoTDF", + expected_content=input_content, + ) + + logger.info( + f"✓ otdfctl successfully decrypted Python NanoTDF: {otdfctl_decrypt_output.stat().st_size} bytes" + ) + + +@pytest.mark.integration +def test_nanotdf_roundtrip_comparison( + collect_server_logs, temp_credentials_file, project_root +): + """ + Compare NanoTDF files created by otdfctl and Python CLI. + Tests both tools' roundtrip encryption/decryption. + """ + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "roundtrip_input.txt" + input_content = "NanoTDF roundtrip test with both tools!" + with input_file.open("w") as f: + f.write(input_content) + + # Define NanoTDF files from both tools + otdfctl_nanotdf = temp_path / "otdfctl.tdf" + python_nanotdf = temp_path / "python.tdf" + + # Define decrypted outputs + otdfctl_encrypted_python_decrypted = temp_path / "otdfctl_enc_python_dec.txt" + python_encrypted_otdfctl_decrypted = temp_path / "python_enc_otdfctl_dec.txt" + + # 1. Create NanoTDF with otdfctl + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_nanotdf, + mime_type="text/plain", + tdf_type="nano", + cwd=temp_path, + ) + + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt nano (roundtrip)", + ) + + # 2. Create NanoTDF with Python CLI + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_nanotdf, + mime_type="text/plain", + container_type="nano", + cwd=project_root, + ) + + handle_subprocess_error( + result=python_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI encrypt nano (roundtrip)", + ) + + # Verify both NanoTDF files were created + assert otdfctl_nanotdf.exists(), "otdfctl NanoTDF should exist" + assert python_nanotdf.exists(), "Python NanoTDF should exist" + + otdfctl_size = otdfctl_nanotdf.stat().st_size + python_size = python_nanotdf.stat().st_size + + logger.info("\n=== NanoTDF File Size Comparison ===") + logger.info(f"otdfctl NanoTDF: {otdfctl_size} bytes") + logger.info(f"Python NanoTDF: {python_size} bytes") + + # Both should be reasonable sizes (not empty, not too large) + assert otdfctl_size > 0, "otdfctl NanoTDF should not be empty" + assert python_size > 0, "Python NanoTDF should not be empty" + assert otdfctl_size < 10000, "otdfctl NanoTDF should be compact" + assert python_size < 10000, "Python NanoTDF should be compact" + + # 3. Cross-decrypt: Python decrypts otdfctl NanoTDF + python_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=otdfctl_nanotdf, + output_file=otdfctl_encrypted_python_decrypted, + cwd=project_root, + ) + + handle_subprocess_error( + result=python_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt otdfctl nano", + ) + + # 4. Cross-decrypt: otdfctl decrypts Python NanoTDF + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + creds_file=temp_credentials_file, + tdf_file=python_nanotdf, + output_file=python_encrypted_otdfctl_decrypted, + cwd=temp_path, + ) + + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt Python nano", + ) + + # Validate both cross-decryptions + validate_plaintext_file_created( + path=otdfctl_encrypted_python_decrypted, + scenario="Python decrypt otdfctl NanoTDF", + expected_content=input_content, + ) + + validate_plaintext_file_created( + path=python_encrypted_otdfctl_decrypted, + scenario="otdfctl decrypt Python NanoTDF", + expected_content=input_content, + ) + + logger.info("\n=== Cross-Decryption Success ===") + logger.info( + f"✓ Python successfully decrypted otdfctl NanoTDF: {otdfctl_encrypted_python_decrypted.stat().st_size} bytes" + ) + logger.info( + f"✓ otdfctl successfully decrypted Python NanoTDF: {python_encrypted_otdfctl_decrypted.stat().st_size} bytes" + ) + logger.info("✓ Both tools are interoperable for NanoTDF format!") + + +@pytest.mark.integration +def test_nanotdf_with_attributes( + collect_server_logs, temp_credentials_file, project_root +): + """Test NanoTDF encryption/decryption with attributes.""" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Import attribute for testing + from tests.config_pydantic import CONFIG_TDF + + test_attribute = CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1 + + # Create input file + input_file = temp_path / "attributed_nano.txt" + input_content = "NanoTDF with attributes test" + with input_file.open("w") as f: + f.write(input_content) + + # Define NanoTDF file with attributes + nanotdf_with_attrs = temp_path / "attributed.tdf" + decrypted_output = temp_path / "decrypted_attributed.txt" + + # Encrypt with otdfctl using attributes + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=nanotdf_with_attrs, + mime_type="text/plain", + tdf_type="nano", + attributes=[test_attribute], + cwd=temp_path, + ) + + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt nano with attributes", + ) + + # Verify NanoTDF was created + assert nanotdf_with_attrs.exists(), "Attributed NanoTDF should be created" + logger.info( + f"✓ Created attributed NanoTDF: {nanotdf_with_attrs.stat().st_size} bytes" + ) + + # Decrypt with Python CLI + python_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=nanotdf_with_attrs, + output_file=decrypted_output, + cwd=project_root, + ) + + handle_subprocess_error( + result=python_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt attributed nano", + ) + + # Validate decrypted content + validate_plaintext_file_created( + path=decrypted_output, + scenario="Python decrypt attributed NanoTDF", + expected_content=input_content, + ) + + logger.info( + f"✓ Successfully decrypted attributed NanoTDF: {decrypted_output.stat().st_size} bytes" + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/integration/otdfctl_to_python/test_python_nanotdf_only.py b/tests/integration/otdfctl_to_python/test_python_nanotdf_only.py new file mode 100644 index 0000000..26cd61d --- /dev/null +++ b/tests/integration/otdfctl_to_python/test_python_nanotdf_only.py @@ -0,0 +1,104 @@ +""" +Simple NanoTDF integration test focusing on Python CLI only. +This tests the Python implementation without otdfctl dependency. +""" + +import logging +import tempfile +from pathlib import Path + +import pytest + +from tests.support_cli_args import run_cli_decrypt, run_cli_encrypt +from tests.support_common import ( + handle_subprocess_error, + validate_plaintext_file_created, +) + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +def test_python_nanotdf_roundtrip( + collect_server_logs, temp_credentials_file, project_root +): + """Test Python CLI NanoTDF encryption and decryption roundtrip.""" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "test.txt" + input_content = "Hello NanoTDF from Python!" + with input_file.open("w") as f: + f.write(input_content) + + # Define NanoTDF and output files + nanotdf_file = temp_path / "test.ntdf" + decrypted_file = temp_path / "decrypted.txt" + + # Step 1: Encrypt with Python CLI using --container-type nano + logger.info(f"\n=== Encrypting {input_file} to {nanotdf_file} ===") + encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=nanotdf_file, + mime_type="text/plain", + container_type="nano", + cwd=project_root, + ) + + # Log results for debugging + logger.info(f"Encrypt returncode: {encrypt_result.returncode}") + logger.info(f"Encrypt stdout: {encrypt_result.stdout}") + logger.info(f"Encrypt stderr: {encrypt_result.stderr}") + + # Check for errors + handle_subprocess_error( + result=encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI encrypt nano", + ) + + # Verify NanoTDF was created + assert nanotdf_file.exists(), f"NanoTDF file should exist at {nanotdf_file}" + nanotdf_size = nanotdf_file.stat().st_size + assert nanotdf_size > 0, "NanoTDF file should not be empty" + logger.info(f"✓ Created NanoTDF: {nanotdf_size} bytes") + + # Step 2: Decrypt with Python CLI + logger.info(f"\n=== Decrypting {nanotdf_file} to {decrypted_file} ===") + decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=nanotdf_file, + output_file=decrypted_file, + cwd=project_root, + ) + + # Log results + logger.info(f"Decrypt returncode: {decrypt_result.returncode}") + logger.info(f"Decrypt stdout: {decrypt_result.stdout}") + logger.info(f"Decrypt stderr: {decrypt_result.stderr}") + + # Check for errors + handle_subprocess_error( + result=decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt nano", + ) + + # Validate content + validate_plaintext_file_created( + path=decrypted_file, + scenario="Python CLI NanoTDF roundtrip", + expected_content=input_content, + ) + + logger.info("✓ Successfully decrypted NanoTDF roundtrip!") + logger.info(f" Input: {input_file.stat().st_size} bytes") + logger.info(f" NanoTDF: {nanotdf_size} bytes") + logger.info(f" Decrypted: {decrypted_file.stat().st_size} bytes") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) From dd30965cdf450a06b2999d6f2de79f6119e3a95b Mon Sep 17 00:00:00 2001 From: b-long Date: Sun, 5 Oct 2025 09:36:16 -0400 Subject: [PATCH 3/4] fix: improve NanoTDF support --- src/otdf_python/ecc_mode.py | 5 +++-- src/otdf_python/nanotdf.py | 38 ++++++++++++++++++++----------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/otdf_python/ecc_mode.py b/src/otdf_python/ecc_mode.py index 6dfdbbb..65f5594 100644 --- a/src/otdf_python/ecc_mode.py +++ b/src/otdf_python/ecc_mode.py @@ -38,7 +38,8 @@ def from_string(curve_str: str) -> "ECCMode": "secp256r1": 0, "secp384r1": 1, "secp521r1": 2, - "secp256k1": 3, } - curve_mode = curve_map.get(curve_str.lower(), 0) + curve_mode = curve_map.get(curve_str.lower()) + if curve_mode is None: + raise ValueError(f"Unsupported curve string: '{curve_str}'") return ECCMode(curve_mode, False) diff --git a/src/otdf_python/nanotdf.py b/src/otdf_python/nanotdf.py index 140587b..2d7d74b 100644 --- a/src/otdf_python/nanotdf.py +++ b/src/otdf_python/nanotdf.py @@ -1,3 +1,4 @@ +import contextlib import hashlib import json import secrets @@ -265,7 +266,9 @@ def create_nano_tdf( self, payload: bytes | BytesIO, output_stream: BinaryIO, config: NanoTDFConfig ) -> int: """ - Creates a NanoTDF with the provided payload and writes it to the output stream. + Stream-based NanoTDF creation - writes encrypted payload to an output stream. + + For convenience method that returns bytes, use create_nanotdf() instead. Supports KAS key wrapping if KAS info with public key is provided in config. Args: @@ -313,7 +316,7 @@ def create_nano_tdf( output_stream.write(nano_tdf_data) return len(header_bytes) + len(nano_tdf_data) - def read_nano_tdf( # noqa: C901 + def read_nano_tdf( self, nano_tdf_data: bytes | BytesIO, output_stream: BinaryIO, @@ -321,7 +324,9 @@ def read_nano_tdf( # noqa: C901 platform_url: str | None = None, ) -> None: """ - Reads a NanoTDF and writes the payload to the output stream. + Stream-based NanoTDF decryption - writes decrypted payload to an output stream. + + For convenience method that returns bytes, use read_nanotdf() instead. Supports KAS key unwrapping if kas_private_key is provided in config. Args: @@ -390,18 +395,9 @@ def read_nano_tdf( # noqa: C901 else: # No wrapped key - need symmetric key from config key = None - if isinstance(config, dict): - key = config.get("key") - elif ( - config - and hasattr(config, "cipher") - and config.cipher - and isinstance(config.cipher, str) - and all(c in "0123456789abcdefABCDEF" for c in config.cipher) - ): - # Try to get key from cipher field if it's hex - key = bytes.fromhex(config.cipher) - + if config and hasattr(config, "cipher") and isinstance(config.cipher, str): + with contextlib.suppress(ValueError): + key = bytes.fromhex(config.cipher) if not key: raise InvalidNanoTDFConfig("Missing decryption key in config.") ciphertext = payload[3:-2] @@ -453,7 +449,11 @@ def _handle_legacy_key_config( return key, config def create_nanotdf(self, data: bytes, config: dict | NanoTDFConfig) -> bytes: - """Create a NanoTDF from input data using the provided configuration.""" + """ + Convenience method - creates a NanoTDF and returns the encrypted bytes. + + For stream-based version, use create_nano_tdf() instead. + """ if len(data) > self.K_MAX_TDF_SIZE: raise NanoTDFMaxSizeLimit("exceeds max size for nano tdf") @@ -527,7 +527,11 @@ def _extract_key_for_reading( def read_nanotdf( self, nanotdf_bytes: bytes, config: dict | NanoTDFConfig | None = None ) -> bytes: - """Read and decrypt a NanoTDF, returning the original plaintext data.""" + """ + Convenience method - decrypts a NanoTDF and returns the plaintext bytes. + + For stream-based version, use read_nano_tdf() instead. + """ output = BytesIO() from otdf_python.header import Header # Local import to avoid circular import From 2d06e6a543f22e2a822c72d091b70dd1e02f604f Mon Sep 17 00:00:00 2001 From: b-long Date: Sun, 5 Oct 2025 11:01:35 -0400 Subject: [PATCH 4/4] fix: improve NanoTDF support --- src/otdf_python/cli.py | 7 ++ src/otdf_python/ecc_mode.py | 25 ++++-- src/otdf_python/nanotdf.py | 154 ++++++++++++++++++++++++++++-------- 3 files changed, 145 insertions(+), 41 deletions(-) diff --git a/src/otdf_python/cli.py b/src/otdf_python/cli.py index 863d006..7607e7f 100644 --- a/src/otdf_python/cli.py +++ b/src/otdf_python/cli.py @@ -201,6 +201,13 @@ def create_nano_tdf_config(sdk: SDK, args) -> NanoTDFConfig: kas_endpoints = parse_kas_endpoints(args.kas_endpoint) kas_info_list = [KASInfo(url=kas_url) for kas_url in kas_endpoints] config.kas_info_list.extend(kas_info_list) + elif args.platform_url: + # If no explicit KAS endpoint provided, derive from platform URL + # This matches the default KAS path convention + kas_url = args.platform_url.rstrip("/") + "/kas" + logger.debug(f"Deriving KAS endpoint from platform URL: {kas_url}") + kas_info = KASInfo(url=kas_url) + config.kas_info_list.append(kas_info) if hasattr(args, "policy_binding") and args.policy_binding: if args.policy_binding.lower() == "ecdsa": diff --git a/src/otdf_python/ecc_mode.py b/src/otdf_python/ecc_mode.py index 65f5594..62b259a 100644 --- a/src/otdf_python/ecc_mode.py +++ b/src/otdf_python/ecc_mode.py @@ -1,4 +1,13 @@ +from typing import ClassVar + + class ECCMode: + _CURVE_MAP: ClassVar[dict[str, int]] = { + "secp256r1": 0, + "secp384r1": 1, + "secp521r1": 2, + } + def __init__(self, curve_mode: int = 0, use_ecdsa_binding: bool = False): self.curve_mode = curve_mode self.use_ecdsa_binding = use_ecdsa_binding @@ -33,13 +42,15 @@ def get_ecc_mode_as_byte(self) -> int: @staticmethod def from_string(curve_str: str) -> "ECCMode": - """Create ECCMode from curve string like 'secp256r1' or 'secp384r1'.""" - curve_map = { - "secp256r1": 0, - "secp384r1": 1, - "secp521r1": 2, - } - curve_mode = curve_map.get(curve_str.lower()) + """Create ECCMode from curve string like 'secp256r1' or 'secp384r1', or policy binding type like 'gmac' or 'ecdsa'.""" + # Handle policy binding types + if curve_str.lower() == "gmac": + return ECCMode(0, False) # GMAC binding with default secp256r1 curve + elif curve_str.lower() == "ecdsa": + return ECCMode(0, True) # ECDSA binding with default secp256r1 curve + + # Handle curve names + curve_mode = ECCMode._CURVE_MAP.get(curve_str.lower()) if curve_mode is None: raise ValueError(f"Unsupported curve string: '{curve_str}'") return ECCMode(curve_mode, False) diff --git a/src/otdf_python/nanotdf.py b/src/otdf_python/nanotdf.py index 2d7d74b..60dcfd8 100644 --- a/src/otdf_python/nanotdf.py +++ b/src/otdf_python/nanotdf.py @@ -207,7 +207,7 @@ def _wrap_key_if_needed( self, key: bytes, config: NanoTDFConfig ) -> tuple[bytes, bytes | None]: """ - Wrap encryption key if KAS public key is provided. + Wrap encryption key if KAS public key is provided or can be fetched. Args: key: The encryption key @@ -216,15 +216,31 @@ def _wrap_key_if_needed( Returns: tuple: (wrapped_key, kas_public_key) """ + import logging + kas_public_key = None wrapped_key = None if config.kas_info_list and len(config.kas_info_list) > 0: - # Get the first KASInfo with a public_key + # Get the first KASInfo with a public_key or fetch it for kas_info in config.kas_info_list: if kas_info.public_key: kas_public_key = kas_info.public_key break + elif self.services: + # Try to fetch public key from KAS service + try: + logging.info(f"Fetching public key from KAS: {kas_info.url}") + updated_kas = self.services.kas().get_public_key(kas_info) + kas_public_key = updated_kas.public_key + # Update the config with the fetched public key + kas_info.public_key = kas_public_key + break + except Exception as e: + logging.warning( + f"Failed to fetch public key from KAS {kas_info.url}: {e}" + ) + # Continue to next KAS or proceed without wrapping if kas_public_key: from cryptography.hazmat.backends import default_backend @@ -242,6 +258,11 @@ def _wrap_key_if_needed( label=None, ), ) + logging.info("Successfully wrapped NanoTDF key with KAS public key") + else: + logging.warning( + "No KAS public key available - creating NanoTDF without key wrapping" + ) return wrapped_key, kas_public_key @@ -316,12 +337,101 @@ def create_nano_tdf( output_stream.write(nano_tdf_data) return len(header_bytes) + len(nano_tdf_data) + def _kas_unwrap( + self, nano_tdf_data: bytes, header_len: int, wrapped_key: bytes + ) -> bytes | None: + try: + # Parse header to get policy and KAS URL + import base64 + import logging + + from otdf_python.header import Header + from otdf_python.kas_client import KeyAccess + + header_obj = Header.from_bytes(nano_tdf_data[:header_len]) + kas_locator = header_obj.kas_locator + # policy_info = header_obj.policy_info + + # Get KAS URL from KAS locator + kas_url = kas_locator.get_resource_url() + + # Extract policy JSON from policy info + # PolicyInfo has the policy body, need to get it properly + policy_json = "{}" # Default empty policy for now + # TODO: Extract actual policy from policy_info if needed + + # Get KAS client from services + kas_client = self.services.kas() + + # Create a KeyAccess object for the unwrap call + # For NanoTDF, the wrapped key is at the end of the payload + key_access = KeyAccess( + url=kas_url, + wrapped_key=base64.b64encode(wrapped_key).decode("utf-8"), + ephemeral_public_key=None, # NanoTDF uses different key wrapping + ) + + # Call KAS unwrap (same as TDF does) + # The KAS client will handle the rewrap protocol + # Use RSA as default session key type for NanoTDF + from otdf_python.key_type_constants import RSA_KEY_TYPE + + key = kas_client.unwrap(key_access, policy_json, RSA_KEY_TYPE) + + logging.info("Successfully unwrapped NanoTDF key using KAS") + + except Exception as e: + # If KAS unwrap fails, log and fall through to local unwrap methods + import logging + + logging.warning(f"KAS unwrap failed for NanoTDF: {e}, trying local unwrap") + key = None + + return key + + def _local_unwrap(self, wrapped_key: bytes, config: NanoTDFConfig) -> bytes: + """Unwrap key locally using private key or mock unwrap (for testing/offline use).""" + kas_private_key = None + # Try to get from cipher field if it looks like a PEM key + if ( + config.cipher + and isinstance(config.cipher, str) + and "-----BEGIN" in config.cipher + ): + kas_private_key = config.cipher + + # Check if mock unwrap is enabled in config string + kas_mock_unwrap = False + if config.config and "mock_unwrap=true" in config.config.lower(): + kas_mock_unwrap = True + + if not kas_private_key and not kas_mock_unwrap: + raise InvalidNanoTDFConfig( + "Unable to unwrap NanoTDF key: KAS unwrap failed and no local private key available. " + "Ensure SDK has valid credentials or provide kas_private_key in config for offline use." + ) + + if kas_mock_unwrap: + # Use the KAS mock unwrap_nanotdf logic + from otdf_python.sdk import KAS + + return KAS().unwrap_nanotdf( + curve=None, + header=None, + kas_url=None, + wrapped_key=wrapped_key, + kas_private_key=kas_private_key, + mock=True, + ) + else: + asym = AsymDecryption(kas_private_key) + return asym.decrypt(wrapped_key) + def read_nano_tdf( self, nano_tdf_data: bytes | BytesIO, output_stream: BinaryIO, config: NanoTDFConfig, - platform_url: str | None = None, ) -> None: """ Stream-based NanoTDF decryption - writes decrypted payload to an output stream. @@ -333,7 +443,6 @@ def read_nano_tdf( nano_tdf_data: The NanoTDF data as bytes or BytesIO output_stream: The output stream to write the payload to config: Configuration for the NanoTDF reader - platform_url: Optional platform URL for KAS resolution Raises: InvalidNanoTDFConfig: If the NanoTDF format is invalid or config is missing required info @@ -359,38 +468,15 @@ def read_nano_tdf( if wrapped_key_len > 0: wrapped_key = payload[-(2 + wrapped_key_len) : -2] - # Get private key and mock unwrap config - kas_private_key = None - # Try to get from cipher field if it looks like a PEM key - if ( - config.cipher - and isinstance(config.cipher, str) - and "-----BEGIN" in config.cipher - ): - kas_private_key = config.cipher + # Try to unwrap using KAS service if available + key = None + if self.services: + key = self._kas_unwrap(nano_tdf_data, header_len, wrapped_key) - # Check if mock unwrap is enabled in config string - kas_mock_unwrap = False - if config.config and "mock_unwrap=true" in config.config.lower(): - kas_mock_unwrap = True + # If KAS unwrap didn't work, try local unwrap methods (for testing/offline use) + if key is None: + key = self._local_unwrap(wrapped_key, config) - if not kas_private_key and not kas_mock_unwrap: - raise InvalidNanoTDFConfig("Missing kas_private_key for unwrap.") - if kas_mock_unwrap: - # Use the KAS mock unwrap_nanotdf logic - from otdf_python.sdk import KAS - - key = KAS().unwrap_nanotdf( - curve=None, - header=None, - kas_url=None, - wrapped_key=wrapped_key, - kas_private_key=kas_private_key, - mock=True, - ) - else: - asym = AsymDecryption(kas_private_key) - key = asym.decrypt(wrapped_key) ciphertext = payload[3 : -(2 + wrapped_key_len)] else: # No wrapped key - need symmetric key from config