-
Notifications
You must be signed in to change notification settings - Fork 388
Robustify boto3 session handling (DynamoDB, RestCatalog) #2071
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
e81cd9c
51f1e0b
eb0e1f9
b7ac8ae
457bc4a
cc28012
9819074
26f0767
6584f25
81ed8c6
d89610a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| ) | ||
|
|
||
| import boto3 | ||
| from mypy_boto3_dynamodb.client import DynamoDBClient | ||
|
|
||
| from pyiceberg.catalog import ( | ||
| BOTOCORE_SESSION, | ||
|
|
@@ -94,20 +95,29 @@ | |
|
|
||
|
|
||
| class DynamoDbCatalog(MetastoreCatalog): | ||
| def __init__(self, name: str, **properties: str): | ||
| super().__init__(name, **properties) | ||
| def __init__(self, name: str, client: Optional[DynamoDBClient] = None, **properties: str): | ||
Fokko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Dynamodb catalog. | ||
| session = boto3.Session( | ||
| profile_name=properties.get(DYNAMODB_PROFILE_NAME), | ||
| region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION), | ||
| botocore_session=properties.get(BOTOCORE_SESSION), | ||
| aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), | ||
| aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), | ||
| aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN), | ||
| ) | ||
| self.dynamodb = session.client(DYNAMODB_CLIENT) | ||
| self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT) | ||
| self._ensure_catalog_table_exists_or_create() | ||
| Args: | ||
| name: Name to identify the catalog. | ||
| client: An optional boto3 dynamodb client. | ||
| properties: Properties for dynamodb client construction and configuration. | ||
| """ | ||
| super().__init__(name, **properties) | ||
| if client is not None: | ||
| self.dynamodb = client | ||
| else: | ||
| session = boto3.Session( | ||
| profile_name=properties.get(DYNAMODB_PROFILE_NAME), | ||
| region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION), | ||
| botocore_session=properties.get(BOTOCORE_SESSION), | ||
| aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), | ||
| aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), | ||
| aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN), | ||
| ) | ||
| self.dynamodb = session.client(DYNAMODB_CLIENT) | ||
| self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT) | ||
|
||
| self._ensure_catalog_table_exists_or_create() | ||
|
||
|
|
||
| def _ensure_catalog_table_exists_or_create(self) -> None: | ||
| if self._dynamodb_table_exists(): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
|
|
||
| from pyiceberg import __version__ | ||
| from pyiceberg.catalog import ( | ||
| BOTOCORE_SESSION, | ||
| TOKEN, | ||
| URI, | ||
| WAREHOUSE_LOCATION, | ||
|
|
@@ -53,6 +54,7 @@ | |
| TableAlreadyExistsError, | ||
| UnauthorizedError, | ||
| ) | ||
| from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids | ||
| from pyiceberg.schema import Schema, assign_fresh_schema_ids | ||
| from pyiceberg.table import ( | ||
|
|
@@ -72,7 +74,7 @@ | |
| from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties | ||
| from pyiceberg.types import transform_dict_value_to_str | ||
| from pyiceberg.utils.deprecated import deprecation_message | ||
| from pyiceberg.utils.properties import get_header_properties, property_as_bool | ||
| from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool | ||
|
|
||
| if TYPE_CHECKING: | ||
| import pyarrow as pa | ||
|
|
@@ -390,11 +392,17 @@ class SigV4Adapter(HTTPAdapter): | |
| def __init__(self, **properties: str): | ||
| super().__init__() | ||
| self._properties = properties | ||
| self._boto_session = boto3.Session( | ||
| region_name=get_first_property_value(self._properties, AWS_REGION), | ||
| botocore_session=self._properties.get(BOTOCORE_SESSION), | ||
| aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID), | ||
| aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY), | ||
| aws_session_toxken=get_first_property_value(self._properties, AWS_SESSION_TOKEN), | ||
jayceslesar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylint: disable=W0613 | ||
| boto_session = boto3.Session() | ||
| credentials = boto_session.get_credentials().get_frozen_credentials() | ||
| region = self._properties.get(SIGV4_REGION, boto_session.region_name) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Fokko @kevinjqliu probably worth refactoring this class out to get some proper test coverage?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @jayceslesar yes, I think that makes sense. Let's do that in a separate PR 👍 |
||
| credentials = self._boto_session.get_credentials().get_frozen_credentials() | ||
| region = self._properties.get(SIGV4_REGION, self._boto_session.region_name) | ||
| service = self._properties.get(SIGV4_SERVICE, "execute-api") | ||
|
|
||
| url = str(request.url).split("?")[0] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,6 +72,7 @@ ray = [ | |
| python-snappy = { version = ">=0.6.0,<1.0.0", optional = true } | ||
| thrift = { version = ">=0.13.0,<1.0.0", optional = true } | ||
| mypy-boto3-glue = { version = ">=1.28.18", optional = true } | ||
| mypy-boto3-dynamodb = { version = ">=1.28.18", optional = true } | ||
| boto3 = { version = ">=1.24.59", optional = true } | ||
| s3fs = { version = ">=2023.1.0", optional = true } | ||
| adlfs = { version = ">=2023.1.0", optional = true } | ||
|
|
@@ -217,6 +218,10 @@ ignore_missing_imports = true | |
| module = "mypy_boto3_glue.*" | ||
| ignore_missing_imports = true | ||
|
|
||
| [[tool.mypy.overrides]] | ||
| module = "mypy_boto3_dynamodb.*" | ||
| ignore_missing_imports = true | ||
|
|
||
| [[tool.mypy.overrides]] | ||
| module = "moto" | ||
| ignore_missing_imports = true | ||
|
|
@@ -301,7 +306,7 @@ hive-kerberos = ["thrift", "thrift_sasl", "kerberos"] | |
| s3fs = ["s3fs"] | ||
| glue = ["boto3", "mypy-boto3-glue"] | ||
| adlfs = ["adlfs"] | ||
| dynamodb = ["boto3"] | ||
| dynamodb = ["boto3", "mypy-boto3-dynamodb"] | ||
|
||
| zstandard = ["zstandard"] | ||
| sql-postgres = ["sqlalchemy", "psycopg2-binary"] | ||
| sql-sqlite = ["sqlalchemy"] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.