Skip to content
2,768 changes: 1,556 additions & 1,212 deletions poetry.lock

Large diffs are not rendered by default.

32 changes: 22 additions & 10 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@

if TYPE_CHECKING:
import pyarrow as pa
from mypy_boto3_dynamodb.client import DynamoDBClient


DYNAMODB_CLIENT = "dynamodb"

Expand Down Expand Up @@ -94,18 +96,28 @@


class DynamoDbCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: str):
def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str):
"""Dynamodb catalog.

Args:
name: Name to identify the catalog.
client: An optional boto3 dynamodb client.
properties: Properties for dynamodb client construction and configuration.
"""
super().__init__(name, **properties)
if client is not None:
self.dynamodb = client
else:
session = boto3.Session(
profile_name=properties.get(DYNAMODB_PROFILE_NAME),
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
botocore_session=properties.get(BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
)
self.dynamodb = session.client(DYNAMODB_CLIENT)

session = boto3.Session(
profile_name=properties.get(DYNAMODB_PROFILE_NAME),
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
botocore_session=properties.get(BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
)
self.dynamodb = session.client(DYNAMODB_CLIENT)
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
self._ensure_catalog_table_exists_or_create()

Expand Down
66 changes: 33 additions & 33 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@

import boto3
from botocore.config import Config
from mypy_boto3_glue.client import GlueClient
from mypy_boto3_glue.type_defs import (
ColumnTypeDef,
DatabaseInputTypeDef,
DatabaseTypeDef,
StorageDescriptorTypeDef,
TableInputTypeDef,
TableTypeDef,
)

from pyiceberg.catalog import (
BOTOCORE_SESSION,
Expand Down Expand Up @@ -101,6 +92,15 @@

if TYPE_CHECKING:
import pyarrow as pa
from mypy_boto3_glue.client import GlueClient
from mypy_boto3_glue.type_defs import (
ColumnTypeDef,
DatabaseInputTypeDef,
DatabaseTypeDef,
StorageDescriptorTypeDef,
TableInputTypeDef,
TableTypeDef,
)


# There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue
Expand Down Expand Up @@ -140,7 +140,7 @@


def _construct_parameters(
metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
metadata_location: str, glue_table: Optional["TableTypeDef"] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
Expand Down Expand Up @@ -190,15 +190,15 @@ def primitive(self, primitive: PrimitiveType) -> str:
return GLUE_PRIMITIVE_TYPES[primitive_type]


def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
results: Dict[str, ColumnTypeDef] = {}
def _to_columns(metadata: TableMetadata) -> List["ColumnTypeDef"]:
results: Dict[str, "ColumnTypeDef"] = {}

def _append_to_results(field: NestedField, is_current: bool) -> None:
if field.name in results:
return

results[field.name] = cast(
ColumnTypeDef,
"ColumnTypeDef",
{
"Name": field.name,
"Type": visit(field.field_type, _IcebergSchemaToGlueType()),
Expand Down Expand Up @@ -230,10 +230,10 @@ def _construct_table_input(
metadata_location: str,
properties: Properties,
metadata: TableMetadata,
glue_table: Optional[TableTypeDef] = None,
glue_table: Optional["TableTypeDef"] = None,
prev_metadata_location: Optional[str] = None,
) -> TableInputTypeDef:
table_input: TableInputTypeDef = {
) -> "TableInputTypeDef":
table_input: "TableInputTypeDef" = {
"Name": table_name,
"TableType": EXTERNAL_TABLE,
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
Expand All @@ -249,8 +249,8 @@ def _construct_table_input(
return table_input


def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
rename_table_input: TableInputTypeDef = {"Name": to_table_name}
def _construct_rename_table_input(to_table_name: str, glue_table: "TableTypeDef") -> "TableInputTypeDef":
rename_table_input: "TableInputTypeDef" = {"Name": to_table_name}
# use the same Glue info to create the new table, pointing to the old metadata
assert glue_table["TableType"]
rename_table_input["TableType"] = glue_table["TableType"]
Expand All @@ -264,16 +264,16 @@ def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef)
# It turns out the output of StorageDescriptor is not the same as the input type
# because the Column can have a different type, but for now it seems to work, so
# silence the type error.
rename_table_input["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"])
rename_table_input["StorageDescriptor"] = cast("StorageDescriptorTypeDef", glue_table["StorageDescriptor"])

if "Description" in glue_table:
rename_table_input["Description"] = glue_table["Description"]

return rename_table_input


def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
database_input: DatabaseInputTypeDef = {"Name": database_name}
def _construct_database_input(database_name: str, properties: Properties) -> "DatabaseInputTypeDef":
database_input: "DatabaseInputTypeDef" = {"Name": database_name}
parameters = {}
for k, v in properties.items():
if k == "Description":
Expand All @@ -286,7 +286,7 @@ def _construct_database_input(database_name: str, properties: Properties) -> Dat
return database_input


def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
def _register_glue_catalog_id_with_glue_client(glue: "GlueClient", glue_catalog_id: str) -> None:
"""
Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.

Expand All @@ -303,9 +303,9 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:


class GlueCatalog(MetastoreCatalog):
glue: GlueClient
glue: "GlueClient"

def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
def __init__(self, name: str, client: Optional["GlueClient"] = None, **properties: Any):
"""Glue Catalog.

You either need to provide a boto3 glue client, or one will be constructed from the properties.
Expand All @@ -317,7 +317,7 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties:
"""
super().__init__(name, **properties)

if client:
if client is not None:
self.glue = client
else:
retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)
Expand All @@ -344,7 +344,7 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties:
if glue_catalog_id := properties.get(GLUE_ID):
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)

def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table:
properties: Properties = glue_table["Parameters"]

assert glue_table["DatabaseName"]
Expand Down Expand Up @@ -380,15 +380,15 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
catalog=self,
)

def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
def _create_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef") -> None:
try:
self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
except self.glue.exceptions.AlreadyExistsException as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
def _update_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef", version_id: str) -> None:
try:
self.glue.update_table(
DatabaseName=database_name,
Expand All @@ -403,7 +403,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
) from e

def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef":
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
return load_table_response["Table"]
Expand Down Expand Up @@ -496,7 +496,7 @@ def commit_table(
table_identifier = table.name()
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

current_glue_table: Optional[TableTypeDef]
current_glue_table: Optional["TableTypeDef"]
glue_table_version_id: Optional[str]
current_table: Optional[Table]
try:
Expand Down Expand Up @@ -702,7 +702,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
table_list: List[TableTypeDef] = []
table_list: List["TableTypeDef"] = []
next_token: Optional[str] = None
try:
while True:
Expand Down Expand Up @@ -730,7 +730,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
if namespace:
return []

database_list: List[DatabaseTypeDef] = []
database_list: List["DatabaseTypeDef"] = []
next_token: Optional[str] = None

while True:
Expand Down Expand Up @@ -806,5 +806,5 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

@staticmethod
def __is_iceberg_table(table: TableTypeDef) -> bool:
def __is_iceberg_table(table: "TableTypeDef") -> bool:
return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG
16 changes: 12 additions & 4 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from pyiceberg import __version__
from pyiceberg.catalog import (
BOTOCORE_SESSION,
TOKEN,
URI,
WAREHOUSE_LOCATION,
Expand All @@ -53,6 +54,7 @@
TableAlreadyExistsError,
UnauthorizedError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
Expand All @@ -72,7 +74,7 @@
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_header_properties, property_as_bool
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -390,11 +392,17 @@ class SigV4Adapter(HTTPAdapter):
def __init__(self, **properties: str):
super().__init__()
self._properties = properties
self._boto_session = boto3.Session(
region_name=get_first_property_value(self._properties, AWS_REGION),
botocore_session=self._properties.get(BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
)

def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylint: disable=W0613
boto_session = boto3.Session()
credentials = boto_session.get_credentials().get_frozen_credentials()
region = self._properties.get(SIGV4_REGION, boto_session.region_name)
credentials = self._boto_session.get_credentials().get_frozen_credentials()
region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
service = self._properties.get(SIGV4_SERVICE, "execute-api")

url = str(request.url).split("?")[0]
Expand Down
11 changes: 8 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ pandas = { version = ">=1.0.0,<3.0.0", optional = true }
duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
ray = [
{ version = "==2.10.0", python = "<3.9", optional = true },
{ version = ">=2.10.0,<3.0.0", python = ">=3.9", optional = true },
{ version = ">=2.10.0,<=2.44.0", python = ">=3.9", optional = true },
]
python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
thrift = { version = ">=0.13.0,<1.0.0", optional = true }
mypy-boto3-glue = { version = ">=1.28.18", optional = true }
boto3 = { version = ">=1.24.59", optional = true }
s3fs = { version = ">=2023.1.0", optional = true }
adlfs = { version = ">=2023.1.0", optional = true }
Expand Down Expand Up @@ -102,6 +101,8 @@ cython = "3.1.2"
deptry = ">=0.14,<0.24"
datafusion = ">=44,<48"
docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
mypy-boto3-glue = ">=1.28.18"
mypy-boto3-dynamodb = ">=1.28.18"

[tool.poetry.group.docs.dependencies]
# for mkdocs
Expand Down Expand Up @@ -217,6 +218,10 @@ ignore_missing_imports = true
module = "mypy_boto3_glue.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "mypy_boto3_dynamodb.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "moto"
ignore_missing_imports = true
Expand Down Expand Up @@ -299,7 +304,7 @@ snappy = ["python-snappy"]
hive = ["thrift"]
hive-kerberos = ["thrift", "thrift_sasl", "kerberos"]
s3fs = ["s3fs"]
glue = ["boto3", "mypy-boto3-glue"]
glue = ["boto3"]
adlfs = ["adlfs"]
dynamodb = ["boto3"]
zstandard = ["zstandard"]
Expand Down
8 changes: 8 additions & 0 deletions tests/catalog/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,11 @@ def test_table_exists(
assert test_catalog.table_exists(identifier) is True
# Act and Assert for an non-existing table
assert test_catalog.table_exists(("non", "exist")) is False


@mock_aws
def test_dynamodb_client_override() -> None:
    # A client passed explicitly to the catalog must be used verbatim.
    injected_client = boto3.client("dynamodb", region_name="us-west-2")
    catalog = DynamoDbCatalog("glue", injected_client)
    assert catalog.dynamodb is injected_client