Skip to content

Commit

Permalink
Lots of fixups to make events and attributes work on master
Browse files Browse the repository at this point in the history
  • Loading branch information
mrjerryjohns committed Dec 9, 2021
1 parent 1f652b7 commit 0ac13eb
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 179 deletions.
8 changes: 4 additions & 4 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,11 @@ async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEventDescriptor]],
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent]],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEventDescriptor]]
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent]]
]], reportInterval: typing.Tuple[int, int] = None):
'''
Read a list of events from a target node
Expand Down Expand Up @@ -561,7 +561,7 @@ async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
endpoint = v
elif issubclass(v, ClusterObjects.Cluster):
cluster = v
elif issubclass(v, ClusterObjects.ClusterEventDescriptor):
elif issubclass(v, ClusterObjects.ClusterEvent):
event = v
else:
raise ValueError("Unsupported Event Path")
Expand All @@ -570,7 +570,7 @@ async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
endpoint = v[0]
if issubclass(v[1], ClusterObjects.Cluster):
cluster = v[1]
elif issubclass(v[1], ClusterAttribute.ClusterEventDescriptor):
elif issubclass(v[1], ClusterAttribute.ClusterEvent):
event = v[1]
else:
raise ValueError("Unsupported Attribute Path")
Expand Down
70 changes: 44 additions & 26 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint64, c_uint32, c_uint16, c_uint8, py_object, c_uint64
from rich.pretty import pprint

from .ClusterObjects import Cluster, ClusterAttributeDescriptor, ClusterEventDescriptor
from .ClusterObjects import Cluster, ClusterAttributeDescriptor, ClusterEvent
import chip.exceptions
import chip.interaction_model
import chip.tlv
Expand Down Expand Up @@ -176,7 +176,7 @@ class AttributePathWithListIndex(AttributePath):
@dataclass
class EventHeader:
EndpointId: int = None
Event: ClusterEventDescriptor = None
Event: ClusterEvent = None
EventNumber: int = None
Priority: EventPriority = None
Timestamp: int = None
Expand Down Expand Up @@ -218,7 +218,7 @@ class AttributeDescriptorWithEndpoint:
@dataclass
class EventDescriptorWithEndpoint:
EndpointId: int
Event: ClusterEventDescriptor
Event: ClusterEvent


@dataclass
Expand Down Expand Up @@ -413,6 +413,7 @@ def UpdateCachedData(self):
class SubscriptionTransaction:
def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl):
self._onAttributeChangeCb = DefaultAttributeChangeCallback
self._onEventChangeCb = DefaultEventChangeCallback
self._readTransaction = transaction
self._subscriptionId = subscriptionId
self._devCtrl = devCtrl
Expand All @@ -433,7 +434,7 @@ def GetAttribute(self, path: TypedAttributePath) -> Any:
return data[path.Path.EndpointId][path.ClusterType][path.AttributeType]

def GetEvents(self):
return self._read_transaction.GetAllEventValues()
return self._readTransaction.GetAllEventValues()

def SetAttributeUpdateCallback(self, callback: Callable[[TypedAttributePath, SubscriptionTransaction], None]):
'''
Expand All @@ -442,10 +443,18 @@ def SetAttributeUpdateCallback(self, callback: Callable[[TypedAttributePath, Sub
if callback is not None:
self._onAttributeChangeCb = callback

def SetEventUpdateCallback(self, callback: Callable[[EventReadResult, SubscriptionTransaction], None]):
if callback is not None:
self._onEventChangeCb = callback

@property
def OnAttributeChangeCb(self) -> Callable[[TypedAttributePath, SubscriptionTransaction], None]:
return self._onAttributeChangeCb

@property
def OnEventChangeCb(self) -> Callable[[EventReadResult, SubscriptionTransaction], None]:
return self._onEventChangeCb

def Shutdown(self):
self._devCtrl.ZCLShutdownSubscription(self._subscriptionId)

Expand All @@ -465,6 +474,11 @@ def DefaultAttributeChangeCallback(path: TypedAttributePath, transaction: Subscr
pprint(value, expand_all=True)


def DefaultEventChangeCallback(data: EventReadResult, transaction: SubscriptionTransaction):
print("Received Event:")
pprint(data, expand_all=True)


def _BuildEventIndex():
''' Build internal event index for locating the corresponding cluster object by path in the future.
We do this because this operation will take a long time when there are lots of events, it takes about 300ms for a single query.
Expand All @@ -480,34 +494,30 @@ def _BuildEventIndex():

# Only match on classes that extend the ClusterEventescriptor class
matched = [
value for value in base_classes if 'ClusterEventDescriptor' in str(value)]
value for value in base_classes if 'ClusterEvent' in str(value)]
if (matched == []):
continue

_EventIndex[str(EventPath(ClusterId=event.cluster_id, EventId=event.event_id))] = eval(
'chip.clusters.Objects.' + clusterName + '.Events.' + eventName)


class TransactionType(Enum):
READ_EVENTS = 1
READ_ATTRIBUTES = 2


class AsyncReadTransaction:
def __init__(self, future: Future, eventLoop, devCtrl, returnClusterObject: bool):
def __init__(self, future: Future, eventLoop, devCtrl, transactionType: TransactionType, returnClusterObject: bool):
self._event_loop = eventLoop
self._future = future
self._subscription_handler = None
self._events = []
self._devCtrl = devCtrl
self._transactionType = transactionType
self._cache = AttributeCache(returnClusterObject=returnClusterObject)
self._changedPathSet = set()

#
# We cannot post back async work onto the asyncio's event loop for subscriptions,
# since at that point, we've returned from the await ... and got a subscription object
# back, and the REPL's event loop is not running and capable of posting background work to.
#
# So instead, we'll directly call the callback on the originating thread and use locks to protect
# relevant critical sections.
#
self._resLock = threading.Lock()

def GetAllEventValues(self):
return self._events

Expand All @@ -526,9 +536,8 @@ def _handleAttributeData(self, path: AttributePathWithListIndex, status: int, da
tlvData = chip.tlv.TLVReader(data).get().get("Any", {})
attributeValue = tlvData

with self._resLock:
self._cache.UpdateTLV(path, attributeValue)
self._changedPathSet.add(path)
self._cache.UpdateTLV(path, attributeValue)
self._changedPathSet.add(path)

except Exception as ex:
logging.exception(ex)
Expand All @@ -546,7 +555,7 @@ def _handleEventData(self, header: EventHeader, path: EventPath, data: bytes):
tlvData, LookupError("event schema not found"))
else:
try:
eventValue = eventType(eventType.FromTLV(data))
eventValue = eventType.FromTLV(data)
except Exception as ex:
logging.error(
f"Error convering TLV to Cluster Object for path: Endpoint = {path.EndpointId}/Cluster = {path.ClusterId}/Event = {path.EventId}")
Expand All @@ -560,9 +569,14 @@ def _handleEventData(self, header: EventHeader, path: EventPath, data: bytes):
if (builtins.enableDebugMode):
raise

with self._resLock:
self._events.append[EventReadResult(
Header=header, Data=eventValue)]
eventResult = EventReadResult(
Header=header, Data=eventValue, Status=chip.interaction_model.Status.Success)
self._events.append(eventResult)

if (self._subscription_handler is not None):
self._subscription_handler.OnEventChangeCb(
eventResult, self._subscription_handler)

except Exception as ex:
logging.exception(ex)

Expand Down Expand Up @@ -604,7 +618,10 @@ def _handleReportEnd(self):

def _handleDone(self):
if not self._future.done():
self._future.set_result(self._cache.attributeCache)
if (self._transactionType == TransactionType.READ_EVENTS):
self._future.set_result(self._events)
else:
self._future.set_result(self._cache.attributeCache)

def handleDone(self):
self._event_loop.call_soon_threadsafe(self._handleDone)
Expand Down Expand Up @@ -760,7 +777,7 @@ def WriteAttributes(future: Future, eventLoop, device, attributes: List[Attribut
def ReadAttributes(future: Future, eventLoop, device, devCtrl, attributes: List[AttributePath], returnClusterObject: bool = True, subscriptionParameters: SubscriptionParameters = None) -> int:
handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)
future, eventLoop, devCtrl, TransactionType.READ_ATTRIBUTES, returnClusterObject)

readargs = []
for attr in attributes:
Expand Down Expand Up @@ -793,7 +810,8 @@ def ReadAttributes(future: Future, eventLoop, device, devCtrl, attributes: List[

def ReadEvents(future: Future, eventLoop, device, devCtrl, events: List[EventPath], subscriptionParameters: SubscriptionParameters = None) -> int:
handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(future, eventLoop, devCtrl, False)
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, TransactionType.READ_EVENTS, False)

readargs = []
for attr in events:
Expand Down
48 changes: 1 addition & 47 deletions src/controller/python/chip/clusters/ClusterObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,57 +277,11 @@ def _cluster_object(cls) -> ClusterObject:
bases=(ClusterObject,))


class ClusterEventDescriptor:
'''
The ClusterEventDescriptor is used for holding an event's metadata like its cluster id, event id and its type.
Users should not initialize an object based on this class. Instead, users should pass the subclass objects to tell some methods what they want.
The implementation of this functions is quite tricky, it will create a cluster object on-the-fly, and use it for actual encode / decode routine to save lines of code.
'''
@classmethod
def ToTLV(cls, tag: Union[int, None], value):
writer = tlv.TLVWriter()
wrapped_value = cls._cluster_object(Value=value)
cls.event_type.PutFieldToTLV(tag,
asdict(wrapped_value)['Value'], writer, '')
return writer.encoding

@classmethod
def FromTLV(cls, tlvBuffer: bytes):
obj_class = cls._cluster_object
return obj_class.FromDict(obj_class.descriptor.TagDictToLabelDict('', {0: tlv.TLVReader(tlvBuffer).get().get('Any', {})})).Value

@classmethod
def FromTagDictOrRawValue(cls, val: Any):
obj_class = cls._cluster_object
return obj_class.FromDict(obj_class.descriptor.TagDictToLabelDict('', {0: val})).Value

class ClusterEvent(ClusterObject):
@ChipUtility.classproperty
def cluster_id(self) -> int:
raise NotImplementedError()

@ChipUtility.classproperty
def event_id(self) -> int:
raise NotImplementedError()

@ChipUtility.classproperty
def event_type(cls) -> ClusterObjectFieldDescriptor:
raise NotImplementedError()

@ChipUtility.classproperty
def _cluster_object(cls) -> ClusterObject:
return make_dataclass('InternalClass',
[
('Value', List[cls.event_type.Type]
if cls.event_type.IsArray else cls.event_type.Type, field(default=None)),
('descriptor', ClassVar[ClusterObjectDescriptor],
field(
default=ClusterObjectDescriptor(
Fields=[ClusterObjectFieldDescriptor(
Label='Value', Tag=0, Type=cls.event_type.Type, IsArray=cls.event_type.IsArray)]
)
)
)
],
bases=(ClusterObject,))
Loading

0 comments on commit 0ac13eb

Please sign in to comment.