Source code for toio.device_interface
# -*- coding: utf-8 -*-
# ************************************************************
#
# toio/device_interface/__init__.py
#
# Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""device_interface
Abstraction layer to communicate toio Core Cubes.
The abstract base classes defined are as follows:
* ScannerInterface
* CubeInterface
"""
from abc import ABCMeta, abstractmethod
from typing import Awaitable, Callable, Literal, NamedTuple, Optional, TypeAlias, Union
from uuid import UUID
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
DEFAULT_SCAN_TIMEOUT = 5.0
GattReadData: TypeAlias = bytearray
GattWriteData: TypeAlias = Union[bytes, bytearray, memoryview]
GattCharacteristic: TypeAlias = BleakGATTCharacteristic
GattNotificationHandler: TypeAlias = Callable[
[GattCharacteristic, bytearray], Union[None, Awaitable[None]]
]
[docs]class CubeInterface(metaclass=ABCMeta):
"""CubeInterface
Interface to communicate to a toio Core Cube.
"""
@abstractmethod
async def __aenter__(self):
raise NotImplementedError()
@abstractmethod
async def __aexit__(self, exc_type, exc, tb):
raise NotImplementedError()
[docs] @abstractmethod
async def connect(self) -> bool:
raise NotImplementedError()
[docs] @abstractmethod
async def disconnect(self) -> bool:
raise NotImplementedError()
[docs] @abstractmethod
async def read(self, uuid: UUID) -> GattReadData:
raise NotImplementedError()
[docs] @abstractmethod
async def write(
self, uuid: UUID, data: GattWriteData, response: bool = False
) -> None:
raise NotImplementedError()
[docs] @abstractmethod
async def register_notification_handler(
self, uuid: UUID, handler: GattNotificationHandler
) -> bool:
raise NotImplementedError()
[docs] @abstractmethod
async def unregister_notification_handler(self, uuid: UUID) -> bool:
raise NotImplementedError()
CubeDevice = BLEDevice
CubeAdvertisement = AdvertisementData
[docs]class CubeInfo(NamedTuple):
name: Optional[str]
device: CubeDevice
interface: CubeInterface
advertisement: CubeAdvertisement
SortKey = Optional[Literal["rssi", "local_name"]]
[docs]class ScannerInterface(metaclass=ABCMeta):
"""ScannerInterface
Interface to scan toio Core Cubes.
This interface is used from toio.scanner.* module.
"""
[docs] @abstractmethod
async def scan(
self,
num: Optional[int] = None,
cube_id: Optional[set[str]] = None,
address: Optional[set[str]] = None,
sort: SortKey = None,
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> list[CubeInfo]:
raise NotImplementedError()