Skip to content

Commit

Permalink
Merge pull request #390 from zxzxwu/csip
Browse files Browse the repository at this point in the history
CSIP: Encrypted SIRK implementation
  • Loading branch information
zxzxwu authored Jan 4, 2024
2 parents 9ebb03b + 88cb3b2 commit e2fec67
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 14 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"cmac",
"CONNECTIONLESS",
"csip",
"csis",
"csrcs",
"CVSD",
"datagram",
Expand Down
68 changes: 60 additions & 8 deletions bumble/profiles/csip.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from __future__ import annotations
import enum
import struct
from typing import Optional
from typing import Optional, Tuple

from bumble import core
from bumble import crypto
Expand All @@ -31,6 +31,9 @@
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
SET_IDENTITY_RESOLVING_KEY_LENGTH = 16


class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''

Expand Down Expand Up @@ -66,6 +69,10 @@ def k1(n: bytes, salt: bytes, p: bytes) -> bytes:
def sef(k: bytes, r: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.5 SIRK encryption function sef.
SIRK decryption function sdf shares the same algorithm. The only difference is that argument r is:
* Plaintext in encryption
* Cipher in decryption
'''
return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r)

Expand Down Expand Up @@ -105,6 +112,11 @@ def __init__(
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
if len(set_identity_resolving_key) != SET_IDENTITY_RESOLVING_KEY_LENGTH:
raise ValueError(
f'Invalid SIRK length {len(set_identity_resolving_key)}, expected {SET_IDENTITY_RESOLVING_KEY_LENGTH}'
)

characteristics = []

self.set_identity_resolving_key = set_identity_resolving_key
Expand All @@ -113,7 +125,7 @@ def __init__(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(read=self.on_sirk_read),
)
characteristics.append(self.set_identity_resolving_key_characteristic)
Expand All @@ -123,7 +135,7 @@ def __init__(
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
Expand All @@ -134,7 +146,7 @@ def __init__(
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
Expand All @@ -145,18 +157,32 @@ def __init__(
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)

super().__init__(characteristics)

def on_sirk_read(self, _connection: Optional[device.Connection]) -> bytes:
async def on_sirk_read(self, connection: Optional[device.Connection]) -> bytes:
if self.set_identity_resolving_key_type == SirkType.PLAINTEXT:
return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key
sirk_bytes = self.set_identity_resolving_key
else:
raise NotImplementedError('TODO: Pending async Characteristic read.')
assert connection

if connection.transport == core.BT_LE_TRANSPORT:
key = await connection.device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await connection.device.get_link_key(connection.peer_address)

if not key:
raise RuntimeError('LTK or LinkKey is not present')

sirk_bytes = sef(key, self.set_identity_resolving_key)

return bytes([self.set_identity_resolving_key_type]) + sirk_bytes

def get_advertising_data(self) -> bytes:
return bytes(
Expand Down Expand Up @@ -203,3 +229,29 @@ def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]

async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]:
'''Reads SIRK and decrypts if encrypted.'''
response = await self.set_identity_resolving_key.read_value()
if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1:
raise RuntimeError('Invalid SIRK value')

sirk_type = SirkType(response[0])
if sirk_type == SirkType.PLAINTEXT:
sirk = response[1:]
else:
connection = self.service_proxy.client.connection
device = connection.device
if connection.transport == core.BT_LE_TRANSPORT:
key = await device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await device.get_link_key(connection.peer_address)

if not key:
raise RuntimeError('LTK or LinkKey is not present')

sirk = sef(key, response[1:])

return (sirk_type, sirk)
21 changes: 15 additions & 6 deletions tests/csip_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pytest
import struct
import logging
from unittest import mock

from bumble import device
from bumble.profiles import csip
Expand Down Expand Up @@ -68,30 +69,38 @@ def test_sef():

# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
@pytest.mark.parametrize(
'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
)
async def test_csis(sirk_type):
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')

devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT,
set_identity_resolving_key_type=sirk_type,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
)
)

await devices.setup_connection()

# Mock encryption.
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK)
devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK)

peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)

assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED
Expand Down

0 comments on commit e2fec67

Please sign in to comment.