Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/otdf_python/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ def create_nano_tdf_config(sdk: SDK, args) -> NanoTDFConfig:
kas_endpoints = parse_kas_endpoints(args.kas_endpoint)
kas_info_list = [KASInfo(url=kas_url) for kas_url in kas_endpoints]
config.kas_info_list.extend(kas_info_list)
elif args.platform_url:
# If no explicit KAS endpoint provided, derive from platform URL
# This matches the default KAS path convention
kas_url = args.platform_url.rstrip("/") + "/kas"
logger.debug(f"Deriving KAS endpoint from platform URL: {kas_url}")
kas_info = KASInfo(url=kas_url)
config.kas_info_list.append(kas_info)

if hasattr(args, "policy_binding") and args.policy_binding:
if args.policy_binding.lower() == "ecdsa":
Expand Down
24 changes: 24 additions & 0 deletions src/otdf_python/ecc_mode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
from typing import ClassVar


class ECCMode:
_CURVE_MAP: ClassVar[dict[str, int]] = {
"secp256r1": 0,
"secp384r1": 1,
"secp521r1": 2,
}

def __init__(self, curve_mode: int = 0, use_ecdsa_binding: bool = False):
self.curve_mode = curve_mode
self.use_ecdsa_binding = use_ecdsa_binding
Expand Down Expand Up @@ -30,3 +39,18 @@ def get_ec_compressed_pubkey_size(curve_type: int) -> int:
def get_ecc_mode_as_byte(self) -> int:
# Most significant bit: use_ecdsa_binding, lower 3 bits: curve_mode
return ((1 if self.use_ecdsa_binding else 0) << 7) | (self.curve_mode & 0x07)

@staticmethod
def from_string(curve_str: str) -> "ECCMode":
"""Create ECCMode from curve string like 'secp256r1' or 'secp384r1', or policy binding type like 'gmac' or 'ecdsa'."""
# Handle policy binding types
if curve_str.lower() == "gmac":
return ECCMode(0, False) # GMAC binding with default secp256r1 curve
elif curve_str.lower() == "ecdsa":
return ECCMode(0, True) # ECDSA binding with default secp256r1 curve

# Handle curve names
curve_mode = ECCMode._CURVE_MAP.get(curve_str.lower())
if curve_mode is None:
raise ValueError(f"Unsupported curve string: '{curve_str}'")
return ECCMode(curve_mode, False)
181 changes: 142 additions & 39 deletions src/otdf_python/nanotdf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import hashlib
import json
import secrets
Expand Down Expand Up @@ -206,7 +207,7 @@ def _wrap_key_if_needed(
self, key: bytes, config: NanoTDFConfig
) -> tuple[bytes, bytes | None]:
"""
Wrap encryption key if KAS public key is provided.
Wrap encryption key if KAS public key is provided or can be fetched.

Args:
key: The encryption key
Expand All @@ -215,15 +216,31 @@ def _wrap_key_if_needed(
Returns:
tuple: (wrapped_key, kas_public_key)
"""
import logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Imports should be at the top of the file, not inside functions. This file has several local imports (e.g., logging here, base64, Header, KeyAccess in _kas_unwrap).

Per PEP 8, imports should be at the top of the module. This practice improves readability, avoids circular import issues, and prevents the performance overhead of re-importing modules if a function is called multiple times in a loop. Please move all local imports to the top of the file.


kas_public_key = None
wrapped_key = None

if config.kas_info_list and len(config.kas_info_list) > 0:
# Get the first KASInfo with a public_key
# Get the first KASInfo with a public_key or fetch it
for kas_info in config.kas_info_list:
if kas_info.public_key:
kas_public_key = kas_info.public_key
break
elif self.services:
# Try to fetch public key from KAS service
try:
logging.info(f"Fetching public key from KAS: {kas_info.url}")
updated_kas = self.services.kas().get_public_key(kas_info)
kas_public_key = updated_kas.public_key
# Update the config with the fetched public key
kas_info.public_key = kas_public_key
break
except Exception as e:
logging.warning(
f"Failed to fetch public key from KAS {kas_info.url}: {e}"
)
# Continue to next KAS or proceed without wrapping

if kas_public_key:
from cryptography.hazmat.backends import default_backend
Expand All @@ -241,6 +258,11 @@ def _wrap_key_if_needed(
label=None,
),
)
logging.info("Successfully wrapped NanoTDF key with KAS public key")
else:
logging.warning(
"No KAS public key available - creating NanoTDF without key wrapping"
)

return wrapped_key, kas_public_key

Expand All @@ -265,7 +287,9 @@ def create_nano_tdf(
self, payload: bytes | BytesIO, output_stream: BinaryIO, config: NanoTDFConfig
) -> int:
"""
Creates a NanoTDF with the provided payload and writes it to the output stream.
Stream-based NanoTDF creation - writes encrypted payload to an output stream.

For convenience method that returns bytes, use create_nanotdf() instead.
Supports KAS key wrapping if KAS info with public key is provided in config.

Args:
Expand Down Expand Up @@ -313,22 +337,112 @@ def create_nano_tdf(
output_stream.write(nano_tdf_data)
return len(header_bytes) + len(nano_tdf_data)

def _kas_unwrap(
self, nano_tdf_data: bytes, header_len: int, wrapped_key: bytes
) -> bytes | None:
try:
# Parse header to get policy and KAS URL
import base64
import logging

from otdf_python.header import Header
from otdf_python.kas_client import KeyAccess

header_obj = Header.from_bytes(nano_tdf_data[:header_len])
kas_locator = header_obj.kas_locator
# policy_info = header_obj.policy_info

# Get KAS URL from KAS locator
kas_url = kas_locator.get_resource_url()

# Extract policy JSON from policy info
# PolicyInfo has the policy body, need to get it properly
policy_json = "{}" # Default empty policy for now
# TODO: Extract actual policy from policy_info if needed
Comment on lines +360 to +361
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The policy is currently hardcoded to an empty JSON object ({}), and a TODO comment indicates that the actual policy extraction is not implemented. This is a significant gap, as it means the unwrap request sent to KAS will not contain the correct policy, potentially leading to incorrect access decisions or failures. The policy information should be extracted from the header_obj and included in the KAS request.


# Get KAS client from services
kas_client = self.services.kas()

# Create a KeyAccess object for the unwrap call
# For NanoTDF, the wrapped key is at the end of the payload
key_access = KeyAccess(
url=kas_url,
wrapped_key=base64.b64encode(wrapped_key).decode("utf-8"),
ephemeral_public_key=None, # NanoTDF uses different key wrapping
)

# Call KAS unwrap (same as TDF does)
# The KAS client will handle the rewrap protocol
# Use RSA as default session key type for NanoTDF
from otdf_python.key_type_constants import RSA_KEY_TYPE

key = kas_client.unwrap(key_access, policy_json, RSA_KEY_TYPE)

logging.info("Successfully unwrapped NanoTDF key using KAS")

except Exception as e:
# If KAS unwrap fails, log and fall through to local unwrap methods
import logging

logging.warning(f"KAS unwrap failed for NanoTDF: {e}, trying local unwrap")
key = None
Comment on lines +383 to +388
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a broad Exception is generally discouraged as it can mask unexpected programming errors (e.g., AttributeError, TypeError) by treating them as a KAS communication failure. This makes debugging more difficult. It would be more robust to catch more specific exceptions related to network I/O or specific SDKException subclasses that the KAS client might raise.


return key

def _local_unwrap(self, wrapped_key: bytes, config: NanoTDFConfig) -> bytes:
"""Unwrap key locally using private key or mock unwrap (for testing/offline use)."""
kas_private_key = None
# Try to get from cipher field if it looks like a PEM key
if (
config.cipher
and isinstance(config.cipher, str)
and "-----BEGIN" in config.cipher
):
kas_private_key = config.cipher

# Check if mock unwrap is enabled in config string
kas_mock_unwrap = False
if config.config and "mock_unwrap=true" in config.config.lower():
kas_mock_unwrap = True

if not kas_private_key and not kas_mock_unwrap:
raise InvalidNanoTDFConfig(
"Unable to unwrap NanoTDF key: KAS unwrap failed and no local private key available. "
"Ensure SDK has valid credentials or provide kas_private_key in config for offline use."
)

if kas_mock_unwrap:
# Use the KAS mock unwrap_nanotdf logic
from otdf_python.sdk import KAS

return KAS().unwrap_nanotdf(
curve=None,
header=None,
kas_url=None,
wrapped_key=wrapped_key,
kas_private_key=kas_private_key,
mock=True,
)
else:
asym = AsymDecryption(kas_private_key)
return asym.decrypt(wrapped_key)

Comment on lines +392 to +429
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic in _local_unwrap is overly complex and contains redundant paths. The if kas_mock_unwrap: branch is convoluted, inefficient due to KAS() instantiation, and ultimately performs the same action as the else branch: decrypting the key with AsymDecryption. This complexity increases the maintenance burden and can hide bugs.

The function can be greatly simplified by removing the kas_mock_unwrap logic and relying on a single, clear path for local private key decryption.

def _local_unwrap(self, wrapped_key: bytes, config: NanoTDFConfig) -> bytes:
    """Unwrap key locally using private key from config."""
    kas_private_key = None
    # Try to get from cipher field if it looks like a PEM key
    if (
        config.cipher
        and isinstance(config.cipher, str)
        and "-----BEGIN" in config.cipher
    ):
        kas_private_key = config.cipher

    if not kas_private_key:
        raise InvalidNanoTDFConfig(
            "Unable to unwrap NanoTDF key: KAS unwrap failed and no local private key available. "
            "Ensure SDK has valid credentials or provide a PEM private key in config.cipher for offline use."
        )

    asym = AsymDecryption(kas_private_key)
    return asym.decrypt(wrapped_key)

def read_nano_tdf(
self,
nano_tdf_data: bytes | BytesIO,
output_stream: BinaryIO,
config: NanoTDFConfig,
platform_url: str | None = None,
) -> None:
"""
Reads a NanoTDF and writes the payload to the output stream.
Stream-based NanoTDF decryption - writes decrypted payload to an output stream.

For convenience method that returns bytes, use read_nanotdf() instead.
Supports KAS key unwrapping if kas_private_key is provided in config.

Args:
nano_tdf_data: The NanoTDF data as bytes or BytesIO
output_stream: The output stream to write the payload to
config: Configuration for the NanoTDF reader
platform_url: Optional platform URL for KAS resolution

Raises:
InvalidNanoTDFConfig: If the NanoTDF format is invalid or config is missing required info
Expand All @@ -354,41 +468,22 @@ def read_nano_tdf(
if wrapped_key_len > 0:
wrapped_key = payload[-(2 + wrapped_key_len) : -2]

# Get private key and mock unwrap config
kas_private_key = None
# Try to get from cipher field if it looks like a PEM key
if (
config.cipher
and isinstance(config.cipher, str)
and "-----BEGIN" in config.cipher
):
kas_private_key = config.cipher
# Try to unwrap using KAS service if available
key = None
if self.services:
key = self._kas_unwrap(nano_tdf_data, header_len, wrapped_key)

# Check if mock unwrap is enabled in config string
kas_mock_unwrap = False
if config.config and "mock_unwrap=true" in config.config.lower():
kas_mock_unwrap = True
# If KAS unwrap didn't work, try local unwrap methods (for testing/offline use)
if key is None:
key = self._local_unwrap(wrapped_key, config)

if not kas_private_key and not kas_mock_unwrap:
raise InvalidNanoTDFConfig("Missing kas_private_key for unwrap.")
if kas_mock_unwrap:
# Use the KAS mock unwrap_nanotdf logic
from otdf_python.sdk import KAS

key = KAS().unwrap_nanotdf(
curve=None,
header=None,
kas_url=None,
wrapped_key=wrapped_key,
kas_private_key=kas_private_key,
mock=True,
)
else:
asym = AsymDecryption(kas_private_key)
key = asym.decrypt(wrapped_key)
ciphertext = payload[3 : -(2 + wrapped_key_len)]
else:
key = config.get("key")
# No wrapped key - need symmetric key from config
key = None
if config and hasattr(config, "cipher") and isinstance(config.cipher, str):
with contextlib.suppress(ValueError):
key = bytes.fromhex(config.cipher)
if not key:
raise InvalidNanoTDFConfig("Missing decryption key in config.")
ciphertext = payload[3:-2]
Expand Down Expand Up @@ -440,7 +535,11 @@ def _handle_legacy_key_config(
return key, config

def create_nanotdf(self, data: bytes, config: dict | NanoTDFConfig) -> bytes:
"""Create a NanoTDF from input data using the provided configuration."""
"""
Convenience method - creates a NanoTDF and returns the encrypted bytes.

For stream-based version, use create_nano_tdf() instead.
"""
if len(data) > self.K_MAX_TDF_SIZE:
raise NanoTDFMaxSizeLimit("exceeds max size for nano tdf")

Expand Down Expand Up @@ -514,7 +613,11 @@ def _extract_key_for_reading(
def read_nanotdf(
self, nanotdf_bytes: bytes, config: dict | NanoTDFConfig | None = None
) -> bytes:
"""Read and decrypt a NanoTDF, returning the original plaintext data."""
"""
Convenience method - decrypts a NanoTDF and returns the plaintext bytes.

For stream-based version, use read_nano_tdf() instead.
"""
output = BytesIO()
from otdf_python.header import Header # Local import to avoid circular import

Expand Down
Loading
Loading