Skip to content

Commit

Permalink
First implementation of type annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
sveinse committed Mar 26, 2023
1 parent 64772dc commit 4e8925e
Show file tree
Hide file tree
Showing 21 changed files with 308 additions and 147 deletions.
17 changes: 17 additions & 0 deletions canopen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,20 @@
Node = RemoteNode

__pypi_url__ = "https://pypi.org/project/canopen/"

__all__ = [
"__version__",
"__pypi_url__",
"Network",
"NodeScanner",
"RemoteNode",
"LocalNode",
"Node",
"SdoCommunicationError",
"SdoAbortedError",
"import_od",
"export_od",
"ObjectDictionary",
"ObjectDictionaryError",
"BaseNode402",
]
21 changes: 14 additions & 7 deletions canopen/emcy.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from __future__ import annotations
from typing import Callable, List, Optional, TYPE_CHECKING, Tuple
import struct
import logging
import threading
import time
from typing import Callable, List, Optional

if TYPE_CHECKING:
from .network import Network

# Error code, error register, vendor specific data
EMCY_STRUCT = struct.Struct("<HB5s")

logger = logging.getLogger(__name__)

TCallback = Callable[["EmcyError"], None]

class EmcyConsumer(object):

Expand All @@ -17,10 +22,10 @@ def __init__(self):
self.log: List["EmcyError"] = []
#: Only active EMCYs. Will be cleared on Error Reset
self.active: List["EmcyError"] = []
self.callbacks = []
self.callbacks: List[TCallback] = []
self.emcy_received = threading.Condition()

def on_emcy(self, can_id, data, timestamp):
def on_emcy(self, can_id: int, data: bytearray, timestamp: float):
code, register, data = EMCY_STRUCT.unpack(data)
entry = EmcyError(code, register, data, timestamp)

Expand All @@ -36,7 +41,7 @@ def on_emcy(self, can_id, data, timestamp):
for callback in self.callbacks:
callback(entry)

def add_callback(self, callback: Callable[["EmcyError"], None]):
def add_callback(self, callback: TCallback):
"""Get notified on EMCY messages from this node.
:param callback:
Expand All @@ -52,7 +57,7 @@ def reset(self):

def wait(
self, emcy_code: Optional[int] = None, timeout: float = 10
) -> "EmcyError":
) -> Optional["EmcyError"]:
"""Wait for a new EMCY to arrive.
:param emcy_code: EMCY code to wait for
Expand Down Expand Up @@ -82,22 +87,24 @@ def wait(
class EmcyProducer(object):

def __init__(self, cob_id: int):
self.network = None
self.network: Optional[Network] = None
self.cob_id = cob_id

def send(self, code: int, register: int = 0, data: bytes = b""):
assert self.network # For typing
payload = EMCY_STRUCT.pack(code, register, data)
self.network.send_message(self.cob_id, payload)

def reset(self, register: int = 0, data: bytes = b""):
assert self.network # For typing
payload = EMCY_STRUCT.pack(0, register, data)
self.network.send_message(self.cob_id, payload)


class EmcyError(Exception):
"""EMCY exception."""

DESCRIPTIONS = [
DESCRIPTIONS: List[Tuple[int, int, str]] = [
# Code Mask Description
(0x0000, 0xFF00, "Error Reset / No Error"),
(0x1000, 0xFF00, "Generic Error"),
Expand Down
62 changes: 38 additions & 24 deletions canopen/lss.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations
from typing import List, Optional, TYPE_CHECKING, Tuple
import logging
import time
import struct
try:
import queue
except ImportError:
import Queue as queue
import Queue as queue # type: ignore

if TYPE_CHECKING:
from .network import Network

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,12 +87,12 @@ class LssMaster(object):
RESPONSE_TIMEOUT = 0.5

def __init__(self):
self.network = None
self.network: Optional[Network] = None
self._node_id = 0
self._data = None
self.responses = queue.Queue()
self.responses: queue.Queue[bytes] = queue.Queue()

def send_switch_state_global(self, mode):
def send_switch_state_global(self, mode: int):
"""switch mode to CONFIGURATION_STATE or WAITING_STATE
in the all slaves on CAN bus.
There is no reply for this request
Expand All @@ -103,12 +108,13 @@ def send_switch_state_global(self, mode):
message[1] = mode
self.__send_command(message)

def send_switch_mode_global(self, mode):
def send_switch_mode_global(self, mode: int):
"""obsolete"""
self.send_switch_state_global(mode)

def send_switch_state_selective(self,
vendorId, productCode, revisionNumber, serialNumber):
vendorId: int, productCode: int,
revisionNumber: int, serialNumber: int):
"""switch mode from WAITING_STATE to CONFIGURATION_STATE
only if 128bits LSS address matches with the arguments.
It sends 4 messages for each argument.
Expand All @@ -134,14 +140,15 @@ def send_switch_state_selective(self,
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE, productCode)
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, revisionNumber)
response = self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, serialNumber)
assert response # For typing

cs = struct.unpack_from("<B", response)[0]
if cs == CS_SWITCH_STATE_SELECTIVE_RESPONSE:
return True

return False

def inquire_node_id(self):
def inquire_node_id(self) -> int:
"""Read the node id.
CANopen node id must be within the range from 1 to 127.
Expand All @@ -151,7 +158,7 @@ def inquire_node_id(self):
"""
return self.__send_inquire_node_id()

def inquire_lss_address(self, req_cs):
def inquire_lss_address(self, req_cs: int) -> int:
"""Read the part of LSS address.
VENDOR_ID, PRODUCT_CODE, REVISION_NUMBER, or SERIAL_NUMBER
Expand All @@ -164,15 +171,15 @@ def inquire_lss_address(self, req_cs):
"""
return self.__send_inquire_lss_address(req_cs)

def configure_node_id(self, new_node_id):
def configure_node_id(self, new_node_id: int):
"""Set the node id
:param int new_node_id:
new node id to set
"""
self.__send_configure(CS_CONFIGURE_NODE_ID, new_node_id)

def configure_bit_timing(self, new_bit_timing):
def configure_bit_timing(self, new_bit_timing: int):
"""Set the bit timing.
:param int new_bit_timing:
Expand All @@ -185,7 +192,7 @@ def configure_bit_timing(self, new_bit_timing):
"""
self.__send_configure(CS_CONFIGURE_BIT_TIMING, 0, new_bit_timing)

def activate_bit_timing(self, switch_delay_ms):
def activate_bit_timing(self, switch_delay_ms: int):
"""Activate the bit timing.
:param uint16_t switch_delay_ms:
Expand All @@ -206,9 +213,9 @@ def store_configuration(self):
self.__send_configure(CS_STORE_CONFIGURATION)

def send_identify_remote_slave(self,
vendorId, productCode,
revisionNumberLow, revisionNumberHigh,
serialNumberLow, serialNumberHigh):
vendorId: int, productCode: int,
revisionNumberLow: int, revisionNumberHigh: int,
serialNumberLow: int, serialNumberHigh: int):

"""This command sends the range of LSS address to find the slave nodes
in the specified range
Expand Down Expand Up @@ -241,13 +248,13 @@ def send_identify_non_configured_remote_slave(self):
message[0] = CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE
self.__send_command(message)

def fast_scan(self):
"""This command sends a series of fastscan message
def fast_scan(self) -> Tuple[bool, Optional[List[int]]]:
"""This command sends a series of fastscan message
to find unconfigured slave with lowest number of LSS idenities
:return:
True if a slave is found.
False if there is no candidate.
False if there is no candidate.
list is the LSS identities [vendor_id, product_code, revision_number, serial_number]
:rtype: bool, list
"""
Expand Down Expand Up @@ -282,21 +289,22 @@ def fast_scan(self):

return False, None

def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next):
def __send_fast_scan_message(self, id_number: int, bit_checker: int, lss_sub: int, lss_next: int):
message = bytearray(8)
message[0:8] = struct.pack('<BIBBB', CS_FAST_SCAN, id_number, bit_checker, lss_sub, lss_next)
try:
recv_msg = self.__send_command(message)
except LssError:
return False
assert recv_msg # For typing

cs = struct.unpack_from("<B", recv_msg)[0]
if cs == CS_IDENTIFY_SLAVE:
return True

return False

def __send_lss_address(self, req_cs, number):
def __send_lss_address(self, req_cs: int, number: int):
message = bytearray(8)

message[0] = req_cs
Expand All @@ -308,7 +316,7 @@ def __send_lss_address(self, req_cs, number):

return response

def __send_inquire_node_id(self):
def __send_inquire_node_id(self) -> int:
"""
:return:
Current node id
Expand All @@ -317,15 +325,17 @@ def __send_inquire_node_id(self):
message = bytearray(8)
message[0] = CS_INQUIRE_NODE_ID
response = self.__send_command(message)
assert response # For typing

current_node_id: int
cs, current_node_id = struct.unpack_from("<BB", response)

if cs != CS_INQUIRE_NODE_ID:
raise LssError("Response message is not for the request")

return current_node_id

def __send_inquire_lss_address(self, req_cs):
def __send_inquire_lss_address(self, req_cs: int) -> int:
"""
:return:
part of address. e.g., vendor ID or product code, ..
Expand All @@ -334,21 +344,24 @@ def __send_inquire_lss_address(self, req_cs):
message = bytearray(8)
message[0] = req_cs
response = self.__send_command(message)
assert response # For typing

part_of_address: int
res_cs, part_of_address = struct.unpack_from("<BI", response)

if res_cs != req_cs:
raise LssError("Response message is not for the request")

return part_of_address

def __send_configure(self, req_cs, value1=0, value2=0):
def __send_configure(self, req_cs: int, value1: int=0, value2: int=0):
"""Send a message to set a key with values"""
message = bytearray(8)
message[0] = req_cs
message[1] = value1
message[2] = value2
response = self.__send_command(message)
assert response # For typing

res_cs, error_code = struct.unpack_from("<BB", response)

Expand All @@ -359,7 +372,7 @@ def __send_configure(self, req_cs, value1=0, value2=0):
error_msg = "LSS Error: %d" % error_code
raise LssError(error_msg)

def __send_command(self, message):
def __send_command(self, message: bytearray) -> Optional[bytes]:
"""Send a LSS operation code to the network
:param bytearray message:
Expand All @@ -380,6 +393,7 @@ def __send_command(self, message):
logger.info("There were unexpected messages in the queue")
self.responses = queue.Queue()

assert self.network # For typing
self.network.send_message(self.LSS_TX_COBID, message)

if not bool(message[0] in ListMessageNeedResponse):
Expand All @@ -395,7 +409,7 @@ def __send_command(self, message):

return response

def on_message_received(self, can_id, data, timestamp):
def on_message_received(self, can_id: int, data: bytearray, timestamp: float):
self.responses.put(bytes(data))


Expand Down
Loading

0 comments on commit 4e8925e

Please sign in to comment.