@@ -5,6 +5,9 @@
### Features Added

- Async version of `AvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`.
- `SchemaParseError`, `SchemaSerializationError`, and `SchemaDeserializationError` have been introduced under `azure.schemaregistry.serializer.avroserializer.exceptions` and will be raised for corresponding operations.
- `SchemaParseError` and `SchemaSerializationError` may be raised for errors when calling `serialize` on `AvroSerializer`.
- `SchemaParseError` and `SchemaDeserializationError` may be raised for errors when calling `deserialize` on `AvroSerializer`.

### Breaking Changes

@@ -0,0 +1,65 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import BinaryIO, TypeVar, Union
from abc import abstractmethod

ObjectType = TypeVar("ObjectType")

class AbstractAvroObjectSerializer(object):
Member Author

I don't know if this comment will matter much, since we're changing the design anyway. But, just an option:

  • If, in the future, a user wants to use some other avro library implementation, can we make this a public class (like CheckpointStore) and have the user pass in their implementation? Then, if we were to do the exception catching in the object serializer, we wouldn't need our own exception types. We can just raise the regular exceptions from the underlying object serializer implementation and the customer would know what to expect.
    A few caveats:
  1. I don't know if this made sense. Please let me know if it didn't :)
  2. I don't know if this would move us backwards, since we've already arrived at a solution.
  3. This might expose too much detail that users don't care about, so we might not want to do this.

Contributor @yunhaoling, Oct 25, 2021

As discussed out of band:

I think letting users opt in to their own Avro implementation is a great idea.

I personally prefer keeping the try/except wrapping in the schema registry Avro serializer. My 2 cents: the exception experience will be consistent across different Avro implementations if the wrapping is done in the schema registry Avro serializer. If we instead define error raising as part of the protocol (abstraction), then users need to write the try/except wrapping themselves, and they tend to make mistakes, e.g. forgetting to handle an error.

I don't think this would impact our current design. We could revisit it if users request this in the future.
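
To make the trade-off concrete, here is a minimal sketch of the "wrap in the outer serializer" option. Every name in it is a hypothetical stand-in defined locally for illustration: `AbstractObjectSerializer` mimics the abstract class in this diff, `JsonObjectSerializer` plays the role of a user-supplied implementation, and `SchemaSerializationError` is a plain local exception rather than the one added in this PR. It also uses Python 3 `raise ... from`, whereas the PR code relies on `raise_with_traceback()`.

```python
import json
from abc import ABC, abstractmethod


class SchemaSerializationError(Exception):
    """Local stand-in for the PR's exception type (hypothetical)."""


class AbstractObjectSerializer(ABC):
    """Minimal stand-in for the abstract object serializer in this diff."""

    @abstractmethod
    def serialize(self, data, schema):
        """Return the encoded bytes for `data`."""


class JsonObjectSerializer(AbstractObjectSerializer):
    """Toy user-supplied implementation that raises its own library's errors."""

    def serialize(self, data, schema):
        # json.dumps raises TypeError for values it cannot encode.
        return json.dumps(data).encode("utf-8")


class WrappingSerializer:
    """Outer serializer: callers see one exception type for any implementation."""

    def __init__(self, object_serializer):
        self._object_serializer = object_serializer

    def serialize(self, value, schema):
        try:
            return self._object_serializer.serialize(value, schema)
        except Exception as exc:  # broad on purpose, mirroring the PR
            raise SchemaSerializationError(
                "Cannot serialize value '{}' for schema: {}".format(value, schema)
            ) from exc


serializer = WrappingSerializer(JsonObjectSerializer())
encoded = serializer.serialize({"name": "Ben"}, "{}")

try:
    serializer.serialize({"bad": object()}, "{}")
    wrapped = None
except SchemaSerializationError as err:
    # The implementation-specific error stays reachable through __cause__.
    wrapped = err
```

With this shape, swapping in a different object serializer changes only the `__cause__` that callers can inspect, not the exception type they have to catch.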

"""
An Avro serializer used for serializing/deserializing an Avro RecordSchema.
"""

@abstractmethod
def get_schema_fullname(
self,
schema, # type: str
):
# type: (str) -> str
"""
Returns the namespace-qualified name of the provided schema.
Schema must be an Avro RecordSchema:
https://avro.apache.org/docs/1.10.0/gettingstartedpython.html#Defining+a+schema
:param schema: An Avro RecordSchema
:type schema: str
:rtype: str
"""


@abstractmethod
def serialize(
self,
data, # type: ObjectType
schema, # type: str
):
# type: (ObjectType, str) -> bytes
"""Convert the provided value to its binary representation and write it to the stream.
Schema must be an Avro RecordSchema:
https://avro.apache.org/docs/1.10.0/gettingstartedpython.html#Defining+a+schema
:param data: An object to serialize
:type data: ObjectType
:param schema: An Avro RecordSchema
:type schema: str
:returns: Encoded bytes
:rtype: bytes
"""

@abstractmethod
def deserialize(
self,
data, # type: Union[bytes, BinaryIO]
schema, # type: str
):
# type: (Union[bytes, BinaryIO], str) -> ObjectType
"""Read the binary representation into a specific type.
Return type will be ignored, since the schema is deduced from the provided bytes.
:param data: A stream of bytes or bytes directly
:type data: BinaryIO or bytes
:param schema: An Avro RecordSchema
:type schema: str
:returns: An instantiated object
:rtype: ObjectType
"""
@@ -1,28 +1,8 @@
# --------------------------------------------------------------------------
#
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the ""Software""), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

try:
from functools import lru_cache
except ImportError:
@@ -32,10 +12,12 @@
import avro
from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder

from ._abstract_avro_serializer import AbstractAvroObjectSerializer

ObjectType = TypeVar("ObjectType")


class AvroObjectSerializer(object):
class ApacheAvroObjectSerializer(AbstractAvroObjectSerializer):

def __init__(self, codec=None):
"""An Avro serializer using the avro lib from Apache.
@@ -44,13 +26,21 @@ def __init__(self, codec=None):
self._writer_codec = codec

@lru_cache(maxsize=128)
def _get_schema_writer(self, schema): # pylint: disable=no-self-use
schema = avro.schema.parse(schema)
def parse_schema(self, schema): # pylint: disable=no-self-use
return avro.schema.parse(schema)

def get_schema_fullname(self, schema):
parsed_schema = self.parse_schema(schema)
return parsed_schema.fullname

@lru_cache(maxsize=128)
def get_schema_writer(self, schema): # pylint: disable=no-self-use
schema = self.parse_schema(schema)
return DatumWriter(schema)

@lru_cache(maxsize=128)
def _get_schema_reader(self, schema): # pylint: disable=no-self-use
schema = avro.schema.parse(schema)
def get_schema_reader(self, schema): # pylint: disable=no-self-use
schema = self.parse_schema(schema)
return DatumReader(writers_schema=schema)

# pylint: disable=no-self-use
@@ -66,14 +56,14 @@ def serialize(
:param data: An object to serialize
:type data: ObjectType
:param schema: An Avro RecordSchema
:type schema: Union[str, bytes, avro.schema.Schema]
:type schema: str
:returns: Encoded bytes
:rtype: bytes
"""
if not schema:
raise ValueError("Schema is required in Avro serializer.")

writer = self._get_schema_writer(str(schema))
writer = self.get_schema_writer(schema)

stream = BytesIO()
with stream:
@@ -93,14 +83,14 @@ def deserialize(
:param data: A stream of bytes or bytes directly
:type data: BinaryIO or bytes
:param schema: An Avro RecordSchema
:type schema: Union[str, bytes, avro.schema.Schema]
:type schema: str
:returns: An instantiated object
:rtype: ObjectType
"""
if not hasattr(data, 'read'):
data = BytesIO(data)

reader = self._get_schema_reader(str(schema))
reader = self.get_schema_reader(schema)

with data:
bin_decoder = BinaryDecoder(data)
@@ -30,9 +30,13 @@
from io import BytesIO
from typing import Any, Dict, Mapping

from .exceptions import (
SchemaParseError,
SchemaSerializationError,
SchemaDeserializationError,
)
from ._apache_avro_serializer import ApacheAvroObjectSerializer as AvroObjectSerializer
from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
from ._avro_serializer import AvroObjectSerializer
from ._utils import parse_schema


class AvroSerializer(object):
@@ -53,19 +57,21 @@ def __init__(self, **kwargs):
# type: (Any) -> None
try:
self._schema_group = kwargs.pop("group_name")
self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient"
self._schema_registry_client = kwargs.pop(
"client"
) # type: "SchemaRegistryClient"
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
self._auto_register_schema_func = (
self._schema_registry_client.register_schema
if self._auto_register_schemas
else self._schema_registry_client.get_schema_properties
)
self._schema_registry_client.register_schema
if self._auto_register_schemas
else self._schema_registry_client.get_schema_properties
)

def __enter__(self):
# type: () -> SchemaRegistryAvroSerializer
# type: () -> AvroSerializer
self._schema_registry_client.__enter__()
return self

@@ -107,6 +113,7 @@ def _get_schema(self, schema_id, **kwargs):

:param str schema_id: Schema id
:return: Schema content
:rtype: str
"""
schema_str = self._schema_registry_client.get_schema(
schema_id, **kwargs
@@ -120,21 +127,45 @@ def serialize(self, value, **kwargs):
denoting record format identifier. The following 32 bytes denote the schema id returned by the schema registry
service. The remaining bytes are the actual data payload.

Schema must be an Avro RecordSchema:
https://avro.apache.org/docs/1.10.0/gettingstartedpython.html#Defining+a+schema

:param value: The data to be encoded.
:type value: Mapping[str, Any]
:keyword schema: Required. The schema used to encode the data.
:paramtype schema: str
:rtype: bytes
:raises ~azure.schemaregistry.serializer.avroserializer.exceptions.SchemaParseError:
Indicates an issue with parsing schema.
:raises ~azure.schemaregistry.serializer.avroserializer.exceptions.SchemaSerializationError:
Indicates an issue with serializing data for provided schema.
"""
try:
raw_input_schema = kwargs.pop("schema")
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))

cached_schema = parse_schema(raw_input_schema)
record_format_identifier = b"\0\0\0\0"
schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
data_bytes = self._avro_serializer.serialize(value, cached_schema)

try:
schema_fullname = self._avro_serializer.get_schema_fullname(
raw_input_schema
)
except Exception as e: # pylint:disable=broad-except
SchemaParseError(
"Cannot parse schema: {}".format(raw_input_schema), error=e
).raise_with_traceback()

schema_id = self._get_schema_id(schema_fullname, raw_input_schema, **kwargs)
try:
data_bytes = self._avro_serializer.serialize(value, raw_input_schema)
except Exception as e: # pylint:disable=broad-except
SchemaSerializationError(
"Cannot serialize value '{}' for schema: {}".format(
value, raw_input_schema
),
error=e,
).raise_with_traceback()

stream = BytesIO()

@@ -152,16 +183,31 @@ def deserialize(self, value, **kwargs):
"""
Decode bytes data.

Data must follow format of associated Avro RecordSchema:
https://avro.apache.org/docs/1.10.0/gettingstartedpython.html#Defining+a+schema

:param bytes value: The bytes data to be decoded.
:rtype: Dict[str, Any]
:raises ~azure.schemaregistry.serializer.avroserializer.exceptions.SchemaParseError:
Indicates an issue with parsing schema.
:raises ~azure.schemaregistry.serializer.avroserializer.exceptions.SchemaDeserializationError:
Indicates an issue with deserializing value.
"""
# record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier.
schema_id = value[
SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)
].decode("utf-8")
schema_definition = self._get_schema(schema_id, **kwargs)

dict_value = self._avro_serializer.deserialize(
value[DATA_START_INDEX:], schema_definition
)
try:
dict_value = self._avro_serializer.deserialize(
value[DATA_START_INDEX:], schema_definition
)
except Exception as e: # pylint:disable=broad-except
SchemaDeserializationError(
"Cannot deserialize value '{}' for schema: {}".format(
value[DATA_START_INDEX:], schema_definition
),
error=e,
).raise_with_traceback()
return dict_value
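
For readers following the byte layout described in the `serialize` docstring (4 bytes of record format identifier, 32 bytes of schema id, then the data payload), here is a rough, self-contained sketch of how a message is framed and split. The constant values are assumptions inferred from that docstring, not copied from `_constants`, and `frame_payload` / `split_payload` are hypothetical helper names that do not exist in the package.

```python
from io import BytesIO

# Assumed values, inferred from the docstring (4-byte prefix, 32-byte schema id).
RECORD_FORMAT_IDENTIFIER = b"\0\0\0\0"
SCHEMA_ID_START_INDEX = 4
SCHEMA_ID_LENGTH = 32
DATA_START_INDEX = SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH


def frame_payload(schema_id, data_bytes):
    """Prefix the encoded data with the format identifier and the schema id."""
    stream = BytesIO()
    stream.write(RECORD_FORMAT_IDENTIFIER)
    stream.write(schema_id.encode("utf-8"))
    stream.write(data_bytes)
    return stream.getvalue()


def split_payload(payload):
    """Recover the schema id and the raw data bytes from a framed payload."""
    schema_id = payload[
        SCHEMA_ID_START_INDEX : SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH
    ].decode("utf-8")
    return schema_id, payload[DATA_START_INDEX:]


schema_id = "0123456789abcdef0123456789abcdef"  # made-up 32-character id
payload = frame_payload(schema_id, b"\x06Ben")
recovered_id, recovered_data = split_payload(payload)
```

The sketch only covers framing. In the PR, the recovered schema id is then used to fetch the schema definition from the registry before the remaining bytes are handed to the object serializer.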

This file was deleted.
