Skip to content

Commit 9b2a640

Browse files
authored
[SchemaRegistry] remove all serializer caches (Azure#21020)
1 parent 3e72868 commit 9b2a640

File tree

5 files changed

+33
-38
lines changed

5 files changed

+33
-38
lines changed

sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
# IN THE SOFTWARE.
2424
#
2525
# --------------------------------------------------------------------------
26-
from typing import BinaryIO, Union, TypeVar, Dict
26+
try:
27+
from functools import lru_cache
28+
except ImportError:
29+
from backports.functools_lru_cache import lru_cache
30+
from typing import BinaryIO, Union, TypeVar
2731
from io import BytesIO
2832
import avro
2933
from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder
@@ -38,9 +42,18 @@ def __init__(self, codec=None):
3842
:param str codec: The writer codec. If None, let the avro library decides.
3943
"""
4044
self._writer_codec = codec
41-
self._schema_writer_cache = {} # type: Dict[str, DatumWriter]
42-
self._schema_reader_cache = {} # type: Dict[str, DatumReader]
4345

46+
@lru_cache(maxsize=128)
47+
def _get_schema_writer(self, schema): # pylint: disable=no-self-use
48+
schema = avro.schema.parse(schema)
49+
return DatumWriter(schema)
50+
51+
@lru_cache(maxsize=128)
52+
def _get_schema_reader(self, schema): # pylint: disable=no-self-use
53+
schema = avro.schema.parse(schema)
54+
return DatumReader(writers_schema=schema)
55+
56+
# pylint: disable=no-self-use
4457
def serialize(
4558
self,
4659
data, # type: ObjectType
@@ -60,21 +73,15 @@ def serialize(
6073
if not schema:
6174
raise ValueError("Schema is required in Avro serializer.")
6275

63-
if not isinstance(schema, avro.schema.Schema):
64-
schema = avro.schema.parse(schema)
65-
66-
try:
67-
writer = self._schema_writer_cache[str(schema)]
68-
except KeyError:
69-
writer = DatumWriter(schema)
70-
self._schema_writer_cache[str(schema)] = writer
76+
writer = self._get_schema_writer(str(schema))
7177

7278
stream = BytesIO()
7379
with stream:
7480
writer.write(data, BinaryEncoder(stream))
7581
encoded_data = stream.getvalue()
7682
return encoded_data
7783

84+
# pylint: disable=no-self-use
7885
def deserialize(
7986
self,
8087
data, # type: Union[bytes, BinaryIO]
@@ -93,14 +100,7 @@ def deserialize(
93100
if not hasattr(data, 'read'):
94101
data = BytesIO(data)
95102

96-
if not isinstance(schema, avro.schema.Schema):
97-
schema = avro.schema.parse(schema)
98-
99-
try:
100-
reader = self._schema_reader_cache[str(schema)]
101-
except KeyError:
102-
reader = DatumReader(writers_schema=schema)
103-
self._schema_reader_cache[str(schema)] = reader
103+
reader = self._get_schema_reader(str(schema))
104104

105105
with data:
106106
bin_decoder = BinaryDecoder(data)

sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def __init__(self, **kwargs):
6363
if self._auto_register_schemas
6464
else self._schema_registry_client.get_schema_id
6565
)
66-
self._user_input_schema_cache = {}
6766

6867
def __enter__(self):
6968
# type: () -> SchemaRegistryAvroSerializer
@@ -115,6 +114,11 @@ def _get_schema(self, schema_id, **kwargs):
115114
).schema_content
116115
return schema_str
117116

117+
@classmethod
118+
@lru_cache(maxsize=128)
119+
def _parse_schema(cls, schema):
120+
return avro.schema.parse(schema)
121+
118122
def serialize(self, value, **kwargs):
119123
# type: (Mapping[str, Any], Any) -> bytes
120124
"""
@@ -132,13 +136,8 @@ def serialize(self, value, **kwargs):
132136
raw_input_schema = kwargs.pop("schema")
133137
except KeyError as e:
134138
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
135-
try:
136-
cached_schema = self._user_input_schema_cache[raw_input_schema]
137-
except KeyError:
138-
parsed_schema = avro.schema.parse(raw_input_schema)
139-
self._user_input_schema_cache[raw_input_schema] = parsed_schema
140-
cached_schema = parsed_schema
141139

140+
cached_schema = AvroSerializer._parse_schema(raw_input_schema)
142141
record_format_identifier = b"\0\0\0\0"
143142
schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
144143
data_bytes = self._avro_serializer.serialize(value, cached_schema)

sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ interactions:
2323
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
2424
response:
2525
body:
26-
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
26+
string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}'
2727
headers:
2828
content-type:
2929
- application/json
3030
date:
31-
- Thu, 30 Sep 2021 02:05:53 GMT
31+
- Fri, 01 Oct 2021 22:19:06 GMT
3232
location:
3333
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
3434
server:
@@ -38,9 +38,9 @@ interactions:
3838
transfer-encoding:
3939
- chunked
4040
x-schema-id:
41-
- f666e373299048fabaa4296f5dbfed46
41+
- 7b4eff1c25d9438a975ff7a3d985a5c6
4242
x-schema-id-location:
43-
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
43+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04
4444
x-schema-type:
4545
- Avro
4646
x-schema-version:

sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ interactions:
2323
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
2424
response:
2525
body:
26-
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
26+
string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}'
2727
headers:
2828
content-type:
2929
- application/json
3030
date:
31-
- Thu, 30 Sep 2021 02:05:54 GMT
31+
- Fri, 01 Oct 2021 22:19:07 GMT
3232
location:
3333
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
3434
server:
@@ -38,9 +38,9 @@ interactions:
3838
transfer-encoding:
3939
- chunked
4040
x-schema-id:
41-
- f666e373299048fabaa4296f5dbfed46
41+
- 7b4eff1c25d9438a975ff7a3d985a5c6
4242
x-schema-id-location:
43-
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
43+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04
4444
x-schema-type:
4545
- Avro
4646
x-schema-version:

sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,6 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
8686
dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
8787
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)
8888

89-
assert schema_str in sr_avro_serializer._user_input_schema_cache
90-
9189
assert encoded_data[0:4] == b'\0\0\0\0'
9290
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
9391
assert encoded_data[4:36] == schema_id.encode("utf-8")
@@ -111,8 +109,6 @@ def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregi
111109
dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
112110
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)
113111

114-
assert schema_str in sr_avro_serializer._user_input_schema_cache
115-
116112
assert encoded_data[0:4] == b'\0\0\0\0'
117113
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
118114
assert encoded_data[4:36] == schema_id.encode("utf-8")

0 commit comments

Comments
 (0)