Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/spellcheck-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
accessor
accessors
Args
Avro
Nestedfield
ASF
BD
bool
Expand All @@ -29,6 +31,7 @@ func
IcebergType
io
NativeFile
NaN
NestedField
nullability
pragma
Expand Down
15 changes: 13 additions & 2 deletions python/src/iceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class Schema:
>>> from iceberg import types
"""

def __init__(self, *columns: Iterable[NestedField], schema_id: int, identifier_field_ids: Optional[List[int]] = None):
self._struct = StructType(*columns) # type: ignore
def __init__(self, *columns: NestedField, schema_id: int, identifier_field_ids: Optional[List[int]] = None):
self._struct = StructType(*columns)
self._schema_id = schema_id
self._identifier_field_ids = identifier_field_ids or []
self._name_to_id: Dict[str, int] = index_by_name(self)
Expand All @@ -62,6 +62,17 @@ def __repr__(self):
f"Schema(fields={repr(self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"
)

def __eq__(self, other) -> bool:
    """Compare two schemas column by column

    Two schemas are considered equal when they have the same number of
    top-level columns and every pair of columns at the same position compares
    equal. The schema id and the identifier field ids are not compared.

    Args:
        other: The object to compare against

    Returns:
        True when `other` is a Schema with pairwise-equal columns, False otherwise
    """
    # Falsy values (e.g. None) are never equal to a schema
    if not other:
        return False

    # Anything that is not a Schema has no columns to compare against; answer
    # False instead of raising an AttributeError on `other.columns`
    if not isinstance(other, Schema):
        return False

    # `columns` is only annotated as an Iterable, so materialize both sides
    # before asking for their length
    own_columns = list(self.columns)
    other_columns = list(other.columns)

    if len(own_columns) != len(other_columns):
        return False

    # A generator lets all() stop at the first mismatching pair
    return all(lhs == rhs for lhs, rhs in zip(own_columns, other_columns))
Comment thread
Fokko marked this conversation as resolved.
Outdated

@property
def columns(self) -> Iterable[NestedField]:
"""A list of the top-level fields in the underlying struct"""
Expand Down
2 changes: 1 addition & 1 deletion python/src/iceberg/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class NestedField(IcebergType):
name: str = field()
field_type: IcebergType = field()
is_optional: bool = field(default=True)
doc: Optional[str] = field(default=None, repr=False)
doc: Optional[str] = field(default=None, repr=False, compare=False, hash=False)
Comment thread
Fokko marked this conversation as resolved.
Outdated

_instances: ClassVar[Dict[Tuple[bool, int, str, IcebergType, Optional[str]], "NestedField"]] = {}

Expand Down
338 changes: 338 additions & 0 deletions python/src/iceberg/utils/schema_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Utility class for converting between Avro and Iceberg schemas

"""
import logging
from typing import Any, Dict, List, Optional, Tuple, Union

from iceberg.schema import Schema
from iceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimeType,
UUIDType,
)

logger = logging.getLogger(__name__)


class AvroSchemaConversion:
# Lookup table from an Avro type name to the Iceberg primitive type instance
# that represents it. Both the string branch and the primitive branch of
# _parse_field index into this table.
# NOTE(review): "date", "time-millis", "timestamp-millis" and "uuid" look like
# Avro logicalType names rather than plain type names. _parse_field looks
# entries up by the value of "type" and only special-cases the "decimal"
# logicalType, so these entries may be unreachable for schemas that carry
# them under "logicalType" - TODO confirm.
PRIMITIVE_FIELD_TYPE_MAP: Dict[str, PrimitiveType] = {
"boolean": BooleanType(),
"bytes": BinaryType(),
"date": DateType(),
"double": DoubleType(),
"float": FloatType(),
"int": IntegerType(),
"long": LongType(),
"string": StringType(),
"time-millis": TimeType(),
"timestamp-millis": TimestampType(),
Comment thread
Fokko marked this conversation as resolved.
Outdated
"uuid": UUIDType(),
}

def avro_to_iceberg(self, avro_schema: Dict[str, Any]) -> Schema:
    """Builds the Iceberg schema that corresponds to a JSON decoded Avro record schema

    Every Avro field is expected to carry its Iceberg field id under the
    "field-id" key::

        {
            "type": "record",
            "name": "manifest_file",
            "fields": [
                {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
                {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
            ]
        }

    Example:
        This converts an Avro schema into an Iceberg schema:

        >>> avro_schema = AvroSchemaConversion().avro_to_iceberg({
        ...     "type": "record",
        ...     "name": "manifest_file",
        ...     "fields": [
        ...         {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
        ...         {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
        ...     ]
        ... })
        >>> iceberg_schema = Schema(
        ...     NestedField(
        ...         field_id=500, name="manifest_path", field_type=StringType(), is_optional=False, doc="Location URI with FS scheme"
        ...     ),
        ...     NestedField(
        ...         field_id=501, name="manifest_length", field_type=LongType(), is_optional=False, doc="Total file size in bytes"
        ...     ),
        ...     schema_id=1
        ... )
        >>> avro_schema == iceberg_schema
        True

    Args:
        avro_schema (Dict[str, Any]): The JSON decoded Avro schema

    Returns:
        Equivalent Iceberg schema
    """
    # The top-level Avro record becomes a struct; its fields are the schema columns
    record_struct = self._parse_record(avro_schema)
    # The resulting schema is always created with schema_id=1
    return Schema(*record_struct.fields, schema_id=1)

def _parse_record(self, avro_field: Dict[str, Any]) -> StructType:
    """Converts an Avro record into an Iceberg struct

    Args:
        avro_field: The Avro record; its "fields" entry lists the fields to convert

    Returns:
        A StructType holding the converted fields in their original order
    """
    converted_fields = []
    for record_field in avro_field["fields"]:
        converted_fields.append(self._parse_field(record_field))
    return StructType(*converted_fields)  # type: ignore

def _resolve_union(self, type_union: Union[Dict, List, str]) -> Tuple[Union[str, Dict[str, Any]], bool]:
Comment thread
Fokko marked this conversation as resolved.
Outdated
"""
Converts Unions into their type and resolves if the field is optional

Examples:
>>> AvroSchemaConversion()._resolve_union('str')
('str', False)
>>> AvroSchemaConversion()._resolve_union(['null', 'str'])
('str', True)
>>> AvroSchemaConversion()._resolve_union([{'type': 'str'}])
({'type': 'str'}, False)
>>> AvroSchemaConversion()._resolve_union(['null', {'type': 'str'}])
({'type': 'str'}, True)

Args:
type_union: The field, can be a string 'str', list ['null', 'str'], or dict {"type": 'str'}

Returns:
A tuple containing the type and nullability

Raises:
TypeError: In the case non-optional union types are encountered
"""
avro_types: Union[Dict, List]
if isinstance(type_union, str):
# It is a primitive and required
return type_union, False
elif isinstance(type_union, dict):
# It is a context and required
return type_union, False
else:
avro_types = type_union

is_optional = "null" in avro_types
Comment thread
Fokko marked this conversation as resolved.

# Filter the null value, so we know the actual type
avro_types = list(filter(lambda t: t != "null", avro_types))

if len(avro_types) != 1:
raise TypeError("Non-optional types aren't part of the Iceberg specification")

avro_type = avro_types[0]

return avro_type, is_optional

def _resolve_inner_type(
    self, raw_avro_type: Dict[str, Any], inner_field_name: str, id_field: str
) -> Tuple[IcebergType, bool]:
    """Resolves the type that is nested inside an Avro type definition

    Args:
        raw_avro_type: The Avro type definition that holds the nested type
        inner_field_name: The key under which the nested type is stored
        id_field: The key under which the id of the nested type is stored

    Returns:
        A tuple containing the Iceberg type of the nested type and its nullability
    """
    inner_schema, inner_is_optional = self._resolve_union(raw_avro_type[inner_field_name])
    converted = self._parse_field(inner_schema, field_id=raw_avro_type[id_field])
    # When the parser hands back a NestedField, only its type is of interest here
    inner_type = converted.field_type if isinstance(converted, NestedField) else converted
    return inner_type, inner_is_optional

def _parse_field(self, field: Union[str, Dict[str, Any]], field_id: Optional[int] = None) -> IcebergType:
"""
Recursively walks through the Schema, constructing the Iceberg schema

Examples:
>>> avro_schema = AvroSchemaConversion().avro_to_iceberg({
... "type": "record",
... "name": "manifest_file",
... "fields": [
... {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
... {
... "name": "partitions",
... "type": [
... "null",
... {
... "type": "array",
... "items": {
... "type": "record",
... "name": "r508",
... "fields": [
... {
... "name": "contains_null",
... "type": "boolean",
... "doc": "True if any file has a null partition value",
... "field-id": 509,
... },
... {
... "name": "contains_nan",
... "type": ["null", "boolean"],
... "doc": "True if any file has a NaN partition value",
... "default": None,
... "field-id": 518,
... },
... ],
... },
... "element-id": 508,
... },
... ],
... "doc": "Summary for each partition",
... "default": None,
... "field-id": 507,
... },
... ]
... })
>>> iceberg_schema = Schema(
... NestedField(
... field_id=500, name="manifest_path", field_type=StringType(), is_optional=False, doc="Location URI with FS scheme"
... ),
... NestedField(
... field_id=507,
... name="partitions",
... field_type=ListType(
... element_id=508,
... element_type=StructType(
... NestedField(
... field_id=509,
... name="contains_null",
... field_type=BooleanType(),
... is_optional=False,
... doc="True if any file has a null partition value",
... ),
... NestedField(
... field_id=518,
... name="contains_nan",
... field_type=BooleanType(),
... is_optional=True,
... doc="True if any file has a NaN partition value",
... )
... ),
... element_is_optional=False
... ),
... is_optional=True,
... doc="Summary for each partition",
... ),
... schema_id=1
... )
>>> avro_schema == iceberg_schema
True

Args:
field: The Avro field
field_id: Ability to override the field_id when it is provided from up in the tree (in the case of a list or map)

Returns:
The equivalent IcebergType
"""
# In the case of a primitive field
if isinstance(field, str):
return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[field]

@rdblue rdblue May 27, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the logic in these methods hard to follow and I think it's because the cases are not cleanly separated by method. This is a good example. This method is handling an Avro field, but this is checking if the field is not a field and is instead a primitive type. That should never happen in a schema so it raises questions about when this method is called.

This is one reason why we use the visitor pattern elsewhere. Keeping the logic to traverse a schema separate from the logic to actually do something with it is useful, but it also keeps you using a consistent and focused pattern to construct this logic: here's how to convert a record, here's how to convert a field, here's how to convert a map, etc.

Since this isn't handling an Avro Schema class, I wasn't originally going to suggest it, but I think this would be cleaner and easier to review/maintain if it were structured around Avro's schema model:

def _convert_schema(schema: str | Dict[str, Any]):
    if isinstance(schema, str):
        return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[schema]
    elif isinstance(schema, dict):
        type_name = schema["type"]
        if type_name == "record":
            return _convert_record(schema)
        elif type_name == "union":
            ...
        elif type_name == "map":
            ...
        elif type_name == "array":
            ...
        else:
            logical_type = schema.get("logicalType")
            if logicalType:
                ...
            else:
                return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[type_name]
    else:
        raise ValueError(f"Cannot convert invalid schema: {schema}")

def _convert_record(schema: Dict[str, Any]):
    ... # calls _convert_field

def _convert_field(field: Dict[str, Any]):
    ...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's much cleaner indeed. The method grew a bit over time, but I really like the suggestion of decoupling the field and type. I've updated the code.


raw_avro_type, is_optional = self._resolve_union(field["type"])
if isinstance(raw_avro_type, dict):
avro_type = raw_avro_type["type"]
else:
avro_type = raw_avro_type

if not field_id and "field-id" not in field:
raise ValueError(f"The field-id is missing from the Avro field: {field}")

field_id = field_id or field["field-id"]
field_name = field["name"]
field_doc = field.get("doc")

# Check on logical types
if isinstance(raw_avro_type, dict) and "logicalType" in raw_avro_type:
logical_type = raw_avro_type.get("logicalType")
if logical_type == "decimal":
return NestedField(
field_id=field_id,
name=field_name,
field_type=DecimalType(precision=raw_avro_type["precision"], scale=raw_avro_type["scale"]),
is_optional=is_optional,
doc=field_doc,
)
else:
raise ValueError(f"Unknown logical type: {field}")

# Check on primitive types
elif isinstance(avro_type, str) and avro_type in AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP:
return NestedField(
field_id=field_id,
name=field_name,
field_type=AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[avro_type],
is_optional=is_optional,
doc=field_doc,
)

# Check on complex types
elif avro_type == "record":
return NestedField(
field_id=field_id, name=field_name, field_type=self._parse_record(field), is_optional=is_optional, doc=field_doc
)
elif avro_type == "array":
Comment thread
Fokko marked this conversation as resolved.
Outdated
assert isinstance(raw_avro_type, dict)
element_type, element_is_optional = self._resolve_inner_type(raw_avro_type, "items", "element-id")
return NestedField(
field_id=field_id,
name=field_name,
field_type=ListType(
element_id=raw_avro_type["element-id"], element_type=element_type, element_is_optional=element_is_optional
),
is_optional=is_optional,
doc=field_doc,
)
elif avro_type == "map":
assert isinstance(raw_avro_type, dict)
value_type, value_is_optional = self._resolve_inner_type(raw_avro_type, "values", "value-id")
return NestedField(
field_id=field_id,
name=field_name,
field_type=MapType(
key_id=raw_avro_type["key-id"],
# Avro only supports string keys
key_type=StringType(),
value_id=raw_avro_type["value-id"],
value_type=value_type,
value_is_optional=value_is_optional,
),
is_optional=is_optional,
doc=field_doc,
)
elif avro_type == "fixed":
return NestedField(
field_id=field_id,
name=field_name,
field_type=FixedType(length=field["size"]),
is_optional=is_optional,
doc=field_doc,
)
else:
raise ValueError(f"Unknown type: {field}")
Loading