Skip to content
This repository was archived by the owner on May 22, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ def _serialize_body_parameter(self, builder: OperationType) -> List[str]:
elif self.code_model.options["models_mode"] == "dpg":
create_body_call = (
f"_{body_kwarg_name} = json.dumps({body_param.client_name}, "
"cls=AzureJSONEncoder) # type: ignore"
"cls=AzureJSONEncoder, exclude_readonly=True, exclude_none=False) # type: ignore"
)
else:
create_body_call = f"_{body_kwarg_name} = {body_param.client_name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import re
import copy
import typing
import email
import json
from datetime import datetime, date, time, timedelta, timezone
from json import JSONEncoder
import isodate
Expand Down Expand Up @@ -128,10 +129,24 @@ def _is_readonly(p):
class AzureJSONEncoder(JSONEncoder):
"""A JSON encoder that's capable of serializing datetime objects and bytes."""

def __init__(self, *args, exclude_readonly: bool = False, exclude_none: bool = False, **kwargs):
super().__init__(*args, **kwargs)
self.exclude_readonly = exclude_readonly
self.exclude_none = exclude_none

def default(self, o): # pylint: disable=too-many-return-statements
if _is_model(o):
readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)]
return {k: v for k, v in o.items() if k not in readonly_props}
result = {k: v for k, v in o.items()}
if self.exclude_readonly:
readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)]
for k in readonly_props:
if k in result:
result.pop(k)
if self.exclude_none:
for k in list(result.keys()):
if result[k] is None or isinstance(result[k], _Null):
Comment thread
tadelesh marked this conversation as resolved.
Outdated
result.pop(k)
return result
if isinstance(o, (bytes, bytearray)):
return base64.b64encode(o).decode()
if isinstance(o, _Null):
Expand Down Expand Up @@ -296,10 +311,17 @@ def get_deserializer(annotation: typing.Any, rf: typing.Optional["_RestField"] =


def _get_model(module_name: str, model_name: str):
models = {k: v for k, v in sys.modules[module_name].__dict__.items() if isinstance(v, type)}
models = {
k: v
for k, v in sys.modules[module_name].__dict__.items()
if isinstance(v, type) or isinstance(v, typing._GenericAlias)
}
module_end = module_name.rsplit(".", 1)[0]
module = sys.modules[module_end]
models.update({k: v for k, v in module.__dict__.items() if isinstance(v, type)})
models.update({
k: v
for k, v in sys.modules[module_end].__dict__.items()
if isinstance(v, type) or isinstance(v, typing._GenericAlias)
})
if isinstance(model_name, str):
model_name = model_name.split(".")[-1]
if model_name not in models:
Expand Down Expand Up @@ -515,6 +537,36 @@ class Model(_MyMutableMapping):
return cls(data)
return mapped_cls._deserialize(data) # pylint: disable=protected-access

def as_dict(self, *, exclude_readonly: bool = False, exclude_none: bool = False) -> typing.Dict[str, typing.Any]:
Comment thread
tadelesh marked this conversation as resolved.
Outdated
result = {}
if exclude_readonly:
readonly_props = [p._rest_name for p in self._attr_to_rest_field.values() if _is_readonly(p)]
for k, v in self.items():
if exclude_readonly and k in readonly_props:
continue
if exclude_none and (v is None or isinstance(v, _Null)):
Comment thread
tadelesh marked this conversation as resolved.
Outdated
continue
result[k] = Model._as_dict_value(v, exclude_readonly=exclude_readonly, exclude_none=exclude_none)
return result

@staticmethod
def _as_dict_value(v: typing.Any, exclude_readonly: bool = False, exclude_none: bool = False) -> typing.Any:
Comment thread
tadelesh marked this conversation as resolved.
Outdated
if v is None or isinstance(v, _Null):
return None
elif isinstance(v, (list, tuple, set)):
return [
Model._as_dict_value(x, exclude_readonly=exclude_readonly, exclude_none=exclude_none)
for x in v
]
elif isinstance(v, dict):
return {
dk: Model._as_dict_value(dv, exclude_readonly=exclude_readonly, exclude_none=exclude_none)
for dk, dv in v.items()
}
else:
return v.as_dict(exclude_readonly=exclude_readonly, exclude_none=exclude_none) \
if hasattr(v, "as_dict") else v


def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-return-statements, too-many-statements
annotation: typing.Any,
Expand All @@ -524,8 +576,17 @@ def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-retur
if not annotation or annotation in [int, float]:
return None

# is it a forward ref / in quotes?
if isinstance(annotation, (str, typing.ForwardRef)):
try:
model_name = annotation.__forward_arg__ # type: ignore
except AttributeError:
model_name = annotation
if module is not None:
annotation = _get_model(module, model_name)

try:
if module and _is_model(_get_model(module, annotation)):
if module and _is_model(annotation):
if rf:
rf._is_model = True

Expand All @@ -552,22 +613,8 @@ def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-retur
except AttributeError:
pass

if getattr(annotation, "__origin__", None) is typing.Union:

def _deserialize_with_union(union_annotation, obj):
for t in union_annotation.__args__:
try:
return _deserialize(t, obj, module, rf)
except DeserializationError:
pass
raise DeserializationError()

return functools.partial(_deserialize_with_union, annotation)

# is it optional?
try:
# right now, assuming we don't have unions, since we're getting rid of the only
# union we used to have in msrest models, which was union of str and enum
if any(a for a in annotation.__args__ if a == type(None)):
if_obj_deserializer = _get_deserialize_callable_from_annotation(
next(a for a in annotation.__args__ if a != type(None)), module, rf
Expand All @@ -582,14 +629,17 @@ def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-retur
except AttributeError:
pass

# is it a forward ref / in quotes?
if isinstance(annotation, (str, typing.ForwardRef)):
try:
model_name = annotation.__forward_arg__ # type: ignore
except AttributeError:
model_name = annotation
if module is not None:
annotation = _get_model(module, model_name)
if getattr(annotation, "__origin__", None) is typing.Union:

def _deserialize_with_union(union_annotation, obj):
for t in union_annotation.__args__:
try:
return _deserialize(t, obj, module, rf)
except DeserializationError:
pass
raise DeserializationError()

return functools.partial(_deserialize_with_union, annotation)

try:
if annotation._name == "Dict":
Expand Down Expand Up @@ -742,6 +792,7 @@ class _RestField:
return
if self._is_model and not _is_model(value):
obj.__setitem__(self._rest_name, _deserialize(self._type, value))
return
obj.__setitem__(self._rest_name, _serialize(value, self._format))

def _get_deserialize_callable_from_annotation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,24 @@ def _is_readonly(p):
class AzureJSONEncoder(JSONEncoder):
"""A JSON encoder that's capable of serializing datetime objects and bytes."""

def __init__(self, *args, exclude_readonly: bool = False, exclude_none: bool = False, **kwargs):
super().__init__(*args, **kwargs)
self.exclude_readonly = exclude_readonly
self.exclude_none = exclude_none

def default(self, o): # pylint: disable=too-many-return-statements
if _is_model(o):
readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)]
return {k: v for k, v in o.items() if k not in readonly_props}
result = {k: v for k, v in o.items()}
if self.exclude_readonly:
readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)]
for k in readonly_props:
if k in result:
result.pop(k)
if self.exclude_none:
for k in list(result.keys()):
if result[k] is None or isinstance(result[k], _Null):
result.pop(k)
return result
if isinstance(o, (bytes, bytearray)):
return base64.b64encode(o).decode()
if isinstance(o, _Null):
Expand Down Expand Up @@ -296,10 +310,19 @@ def get_deserializer(annotation: typing.Any, rf: typing.Optional["_RestField"] =


def _get_model(module_name: str, model_name: str):
models = {k: v for k, v in sys.modules[module_name].__dict__.items() if isinstance(v, type)}
models = {
k: v
for k, v in sys.modules[module_name].__dict__.items()
if isinstance(v, type) or isinstance(v, typing._GenericAlias)
}
module_end = module_name.rsplit(".", 1)[0]
module = sys.modules[module_end]
models.update({k: v for k, v in module.__dict__.items() if isinstance(v, type)})
models.update(
{
k: v
for k, v in sys.modules[module_end].__dict__.items()
if isinstance(v, type) or isinstance(v, typing._GenericAlias)
}
)
if isinstance(model_name, str):
model_name = model_name.split(".")[-1]
if model_name not in models:
Expand Down Expand Up @@ -515,6 +538,34 @@ def _deserialize(cls, data):
return cls(data)
return mapped_cls._deserialize(data) # pylint: disable=protected-access

def as_dict(self, *, exclude_readonly: bool = False, exclude_none: bool = False) -> typing.Dict[str, typing.Any]:
result = {}
if exclude_readonly:
readonly_props = [p._rest_name for p in self._attr_to_rest_field.values() if _is_readonly(p)]
for k, v in self.items():
if exclude_readonly and k in readonly_props:
continue
if exclude_none and (v is None or isinstance(v, _Null)):
continue
result[k] = Model._as_dict_value(v, exclude_readonly=exclude_readonly, exclude_none=exclude_none)
return result

@staticmethod
def _as_dict_value(v: typing.Any, exclude_readonly: bool = False, exclude_none: bool = False) -> typing.Any:
if v is None or isinstance(v, _Null):
return None
elif isinstance(v, (list, tuple, set)):
return [Model._as_dict_value(x, exclude_readonly=exclude_readonly, exclude_none=exclude_none) for x in v]
elif isinstance(v, dict):
return {
dk: Model._as_dict_value(dv, exclude_readonly=exclude_readonly, exclude_none=exclude_none)
for dk, dv in v.items()
}
else:
return (
v.as_dict(exclude_readonly=exclude_readonly, exclude_none=exclude_none) if hasattr(v, "as_dict") else v
)


def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-return-statements, too-many-statements
annotation: typing.Any,
Expand All @@ -524,8 +575,17 @@ def _get_deserialize_callable_from_annotation( # pylint: disable=too-many-retur
if not annotation or annotation in [int, float]:
return None

# is it a forward ref / in quotes?
if isinstance(annotation, (str, typing.ForwardRef)):
try:
model_name = annotation.__forward_arg__ # type: ignore
except AttributeError:
model_name = annotation
if module is not None:
annotation = _get_model(module, model_name)

try:
if module and _is_model(_get_model(module, annotation)):
if module and _is_model(annotation):
if rf:
rf._is_model = True

Expand All @@ -552,22 +612,8 @@ def _deserialize_model(model_deserializer: typing.Optional[typing.Callable], obj
except AttributeError:
pass

if getattr(annotation, "__origin__", None) is typing.Union:

def _deserialize_with_union(union_annotation, obj):
for t in union_annotation.__args__:
try:
return _deserialize(t, obj, module, rf)
except DeserializationError:
pass
raise DeserializationError()

return functools.partial(_deserialize_with_union, annotation)

# is it optional?
try:
# right now, assuming we don't have unions, since we're getting rid of the only
# union we used to have in msrest models, which was union of str and enum
if any(a for a in annotation.__args__ if a == type(None)):
if_obj_deserializer = _get_deserialize_callable_from_annotation(
next(a for a in annotation.__args__ if a != type(None)), module, rf
Expand All @@ -582,14 +628,17 @@ def _deserialize_with_optional(if_obj_deserializer: typing.Optional[typing.Calla
except AttributeError:
pass

# is it a forward ref / in quotes?
if isinstance(annotation, (str, typing.ForwardRef)):
try:
model_name = annotation.__forward_arg__ # type: ignore
except AttributeError:
model_name = annotation
if module is not None:
annotation = _get_model(module, model_name)
if getattr(annotation, "__origin__", None) is typing.Union:

def _deserialize_with_union(union_annotation, obj):
for t in union_annotation.__args__:
try:
return _deserialize(t, obj, module, rf)
except DeserializationError:
pass
raise DeserializationError()

return functools.partial(_deserialize_with_union, annotation)

try:
if annotation._name == "Dict":
Expand Down Expand Up @@ -742,6 +791,7 @@ def __set__(self, obj: Model, value) -> None:
return
if self._is_model and not _is_model(value):
obj.__setitem__(self._rest_name, _deserialize(self._type, value))
return
obj.__setitem__(self._rest_name, _serialize(value, self._format))

def _get_deserialize_callable_from_annotation(
Expand Down
Loading