Source code for toio.device_interface

# -*- coding: utf-8 -*-
# ************************************************************
#
#     toio/device_interface/__init__.py
#
#     Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""device_interface

Abstraction layer to communicate toio Core Cubes.

The abstract base classes defined are as follows:

* ScannerInterface
* CubeInterface

"""

from abc import ABCMeta, abstractmethod
from typing import List, Set
from uuid import UUID

from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from typing_extensions import (
    Awaitable,
    Callable,
    Literal,
    NamedTuple,
    Optional,
    TypeAlias,
    Union,
)

DEFAULT_SCAN_TIMEOUT = 5.0

GattReadData: TypeAlias = bytearray
GattWriteData: TypeAlias = Union[bytes, bytearray, memoryview]
GattCharacteristic: TypeAlias = BleakGATTCharacteristic
GattNotificationHandler: TypeAlias = Callable[
    [GattCharacteristic, bytearray], Union[None, Awaitable[None]]
]


[docs]class CubeInterface(metaclass=ABCMeta): """CubeInterface Interface to communicate to a toio Core Cube. """ @abstractmethod async def __aenter__(self): raise NotImplementedError() @abstractmethod async def __aexit__(self, exc_type, exc, tb): raise NotImplementedError()
[docs] @abstractmethod async def connect(self) -> bool: raise NotImplementedError()
[docs] @abstractmethod async def disconnect(self) -> bool: raise NotImplementedError()
[docs] @abstractmethod async def read(self, char_uuid: UUID) -> GattReadData: raise NotImplementedError()
[docs] @abstractmethod async def write( self, char_uuid: UUID, data: GattWriteData, response: bool = False ) -> None: raise NotImplementedError()
[docs] @abstractmethod async def register_notification_handler( self, char_uuid: UUID, notification_handler: GattNotificationHandler ) -> bool: raise NotImplementedError()
[docs] @abstractmethod async def unregister_notification_handler(self, char_uuid: UUID) -> bool: raise NotImplementedError()
[docs] @abstractmethod def is_connect(self) -> bool: """ Implemented in v1.1.0 or later get current connection state """ raise NotImplementedError()
CubeDevice = BLEDevice CubeAdvertisement = AdvertisementData
[docs]class CubeInfo(NamedTuple): name: Optional[str] device: CubeDevice interface: CubeInterface advertisement: CubeAdvertisement
SortKey = Optional[Literal["rssi", "local_name"]]
[docs]class ScannerInterface(metaclass=ABCMeta): """ScannerInterface Interface to scan toio Core Cubes. This interface is used from toio.scanner.* module. """ @abstractmethod async def _scan( self, num: Optional[int] = None, cube_id: Optional[Set[str]] = None, address: Optional[Set[str]] = None, sort: SortKey = None, timeout: float = DEFAULT_SCAN_TIMEOUT, ) -> List[CubeInfo]: raise NotImplementedError()
[docs] @abstractmethod async def scan(self, *args, **kwargs) -> List[CubeInfo]: raise NotImplementedError()