17
17
18
18
from dataclasses import dataclass , asdict , field , make_dataclass
19
19
from typing import ClassVar , List , Dict , Any , Mapping , Type , Union , ClassVar
20
+ import enum
21
+ import typing
20
22
from chip import tlv , ChipUtility
23
+ from chip .clusters .Types import Nullable , NullValue
21
24
from dacite import from_dict
22
25
23
26
27
+ def GetUnionUnderlyingType (typeToCheck , matchingType = None ):
28
+ ''' This retrieves the underlying types behind a unioned type by appropriately
29
+ passing in the required matching type in the matchingType input argument.
30
+
31
+ If that is 'None' (not to be confused with NoneType), then it will retrieve
32
+ the 'real' type behind the union, i.e not Nullable && not None
33
+ '''
34
+ if (not (typing .get_origin (typeToCheck ) == typing .Union )):
35
+ return None
36
+
37
+ for t in typing .get_args (typeToCheck ):
38
+ if (matchingType is None ):
39
+ if (t != type (None ) and t != Nullable ):
40
+ return t
41
+ else :
42
+ if (t == matchingType ):
43
+ return t
44
+
45
+ return None
46
+
47
+
24
48
@dataclass
25
49
class ClusterObjectFieldDescriptor :
26
50
Label : str = ''
27
51
Tag : int = None
28
52
Type : Type = None
29
- IsArray : bool = False
30
53
31
- def _PutSingleElementToTLV (self , tag , val , writer : tlv .TLVWriter , debugPath : str = '?' ):
32
- if issubclass (self . Type , ClusterObject ):
54
+ def _PutSingleElementToTLV (self , tag , val , elementType , writer : tlv .TLVWriter , debugPath : str = '?' ):
55
+ if issubclass (elementType , ClusterObject ):
33
56
if not isinstance (val , dict ):
34
57
raise ValueError (
35
58
f"Field { debugPath } .{ self .Label } expected a struct, but got { type (val )} " )
36
- self . Type .descriptor .DictToTLVWithWriter (
59
+ elementType .descriptor .DictToTLVWithWriter (
37
60
f'{ debugPath } .{ self .Label } ' , tag , val , writer )
38
61
return
62
+
39
63
try :
40
- val = self . Type (val )
64
+ val = elementType (val )
41
65
except Exception :
42
66
raise ValueError (
43
- f"Field { debugPath } .{ self .Label } expected { self . Type } , but got { type (val )} " )
67
+ f"Field { debugPath } .{ self .Label } expected { elementType } , but got { type (val )} " )
44
68
writer .put (tag , val )
45
69
46
70
def PutFieldToTLV (self , tag , val , writer : tlv .TLVWriter , debugPath : str = '?' ):
47
- if not self .IsArray :
48
- self ._PutSingleElementToTLV (tag , val , writer , debugPath )
49
- return
50
- if not isinstance (val , List ):
51
- raise ValueError (
52
- f"Field { debugPath } .{ self .Label } expected List[{ self .Type } ], but got { type (val )} " )
53
- writer .startArray (tag )
54
- for i , v in enumerate (val ):
55
- self ._PutSingleElementToTLV (
56
- None , v , writer , debugPath + f'[{ i } ]' )
57
- writer .endContainer ()
71
+ if (val == NullValue ):
72
+ if (GetUnionUnderlyingType (self .Type , Nullable ) is None ):
73
+ raise ValueError (
74
+ f"Field { debugPath } .{ self .Label } was not nullable, but got a null" )
75
+
76
+ writer .put (tag , None )
77
+ elif (val is None ):
78
+ if (GetUnionUnderlyingType (self .Type , type (None )) is None ):
79
+ raise ValueError (
80
+ f"Field { debugPath } .{ self .Label } was not optional, but encountered None" )
81
+ else :
82
+ #
83
+ # If it is an optional or nullable type, it's going to be a union.
84
+ # So, let's get at the 'real' type within that union before proceeding,
85
+ # since at this point, we're guarenteed to not get None or Null as values.
86
+ #
87
+ elementType = GetUnionUnderlyingType (self .Type )
88
+ if (elementType is None ):
89
+ elementType = self .Type
90
+
91
+ if not isinstance (val , List ):
92
+ self ._PutSingleElementToTLV (
93
+ tag , val , elementType , writer , debugPath )
94
+ return
95
+
96
+ writer .startArray (tag )
97
+
98
+ # Get the type of the list. This is a generic, which has its sub-type information of the list element
99
+ # inside its type argument.
100
+ (elementType , ) = typing .get_args (elementType )
101
+
102
+ for i , v in enumerate (val ):
103
+ self ._PutSingleElementToTLV (
104
+ None , v , elementType , writer , debugPath + f'[{ i } ]' )
105
+ writer .endContainer ()
58
106
59
107
60
108
@dataclass
@@ -73,16 +121,19 @@ def GetFieldByLabel(self, label: str) -> ClusterObjectFieldDescriptor:
73
121
return field
74
122
return None
75
123
76
- def _ConvertNonArray (self , debugPath : str , descriptor : ClusterObjectFieldDescriptor , value : Any ) -> Any :
77
- if not issubclass (descriptor .Type , ClusterObject ):
78
- if not isinstance (value , descriptor .Type ):
124
+ def _ConvertNonArray (self , debugPath : str , elementType , value : Any ) -> Any :
125
+ if not issubclass (elementType , ClusterObject ):
126
+ if (issubclass (elementType , enum .Enum )):
127
+ value = elementType (value )
128
+
129
+ if not isinstance (value , elementType ):
79
130
raise ValueError (
80
- f"Failed to decode field { debugPath } , expected type { descriptor . Type } , got { type (value )} " )
131
+ f"Failed to decode field { debugPath } , expected type { elementType } , got { type (value )} " )
81
132
return value
82
133
if not isinstance (value , Mapping ):
83
134
raise ValueError (
84
135
f"Failed to decode field { debugPath } , struct expected." )
85
- return descriptor . Type .descriptor .TagDictToLabelDict (debugPath , value )
136
+ return elementType .descriptor .TagDictToLabelDict (debugPath , value )
86
137
87
138
def TagDictToLabelDict (self , debugPath : str , tlvData : Dict [int , Any ]) -> Dict [str , Any ]:
88
139
ret = {}
@@ -92,13 +143,30 @@ def TagDictToLabelDict(self, debugPath: str, tlvData: Dict[int, Any]) -> Dict[st
92
143
# We do not have enough infomation for this field.
93
144
ret [tag ] = value
94
145
continue
95
- if descriptor .IsArray :
146
+
147
+ if (value is None ):
148
+ ret [descriptor .Label ] = NullValue
149
+ continue
150
+
151
+ if (typing .get_origin (descriptor .Type ) == typing .Union ):
152
+ realType = GetUnionUnderlyingType (descriptor .Type )
153
+ if (realType is None ):
154
+ raise ValueError (
155
+ f"Field { debugPath } .{ self .Label } has no valid underlying data model type" )
156
+
157
+ valueType = realType
158
+ else :
159
+ valueType = descriptor .Type
160
+
161
+ if (typing .get_origin (valueType ) == list ):
162
+ listElementType = typing .get_args (valueType )[0 ]
96
163
ret [descriptor .Label ] = [
97
- self ._ConvertNonArray (f'{ debugPath } [{ i } ]' , descriptor , v )
164
+ self ._ConvertNonArray (
165
+ f'{ debugPath } [{ i } ]' , listElementType , v )
98
166
for i , v in enumerate (value )]
99
167
continue
100
168
ret [descriptor .Label ] = self ._ConvertNonArray (
101
- f'{ debugPath } .{ descriptor .Label } ' , descriptor , value )
169
+ f'{ debugPath } .{ descriptor .Label } ' , valueType , value )
102
170
return ret
103
171
104
172
def TLVToDict (self , tlvBuf : bytes ) -> Dict [str , Any ]:
@@ -109,9 +177,6 @@ def DictToTLVWithWriter(self, debugPath: str, tag, data: Mapping, writer: tlv.TL
109
177
writer .startStructure (tag )
110
178
for field in self .Fields :
111
179
val = data .get (field .Label , None )
112
- if val is None :
113
- raise ValueError (
114
- f"Field { debugPath } .{ field .Label } is missing in the given dict" )
115
180
field .PutFieldToTLV (field .Tag , val , writer ,
116
181
debugPath + f'.{ field .Label } ' )
117
182
writer .endContainer ()
@@ -198,13 +263,13 @@ def attribute_type(cls) -> ClusterObjectFieldDescriptor:
198
263
def _cluster_object (cls ) -> ClusterObject :
199
264
return make_dataclass ('InternalClass' ,
200
265
[
201
- ('Value' , List [ cls .attribute_type .Type ]
202
- if cls . attribute_type . IsArray else cls . attribute_type . Type , field (default = None )),
266
+ ('Value' , cls .attribute_type .Type ,
267
+ field (default = None )),
203
268
('descriptor' , ClassVar [ClusterObjectDescriptor ],
204
269
field (
205
270
default = ClusterObjectDescriptor (
206
271
Fields = [ClusterObjectFieldDescriptor (
207
- Label = 'Value' , Tag = 0 , Type = cls .attribute_type .Type , IsArray = cls . attribute_type . IsArray )]
272
+ Label = 'Value' , Tag = 0 , Type = cls .attribute_type .Type )]
208
273
)
209
274
)
210
275
)
0 commit comments