Source code for toio.device_interface.ble

# -*- coding: utf-8 -*-
# ************************************************************
#
#     ble.py
#
#     Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""
BLE device interface
"""

import asyncio
import platform
import sys
from typing import Dict, List, Optional, Set, Type, Union
from uuid import UUID

from bleak import BleakClient, BleakScanner
from bleak.backends.client import BaseBleakClient
from bleak.backends.scanner import BaseBleakScanner

from ..device_interface import (
    DEFAULT_SCAN_TIMEOUT,
    AdvertisementData,
    BLEDevice,
    CubeDevice,
    CubeInfo,
    CubeInterface,
    GattNotificationHandler,
    GattReadData,
    GattWriteData,
    ScannerInterface,
    SortKey,
)
from ..logger import get_toio_logger
from ..toio_uuid import TOIO_UUID_SERVICE

RSSI_UNKNOWN = -65535

logger = get_toio_logger(__name__)


def _get_platform_client_backend_type() -> Optional[Type[BaseBleakClient]]:
    """Select the bleak client backend class for the running platform.

    Returns:
        Optional[Type[BaseBleakClient]]: ``BleakClientPythonista3`` when
        ``sys.platform`` is ``"ios"``, otherwise ``None``.
    """
    if sys.platform != "ios":
        return None

    # Imported lazily so the Pythonista3 backend module is only loaded on iOS.
    from .pythonista3corebluetooth.client import BleakClientPythonista3

    return BleakClientPythonista3


def _get_platform_scanner_backend() -> Optional[Type[BaseBleakScanner]]:
    """Select the bleak scanner backend class for the running platform.

    Returns:
        Optional[Type[BaseBleakScanner]]: ``BleakScannerPythonista3`` when
        ``sys.platform`` is ``"ios"``, otherwise ``None``.
    """
    if sys.platform != "ios":
        return None

    # Imported lazily so the Pythonista3 backend module is only loaded on iOS.
    from .pythonista3corebluetooth.scanner import BleakScannerPythonista3

    return BleakScannerPythonista3


class BleCube(CubeInterface):
    """
    Cube interface for internal BLE interface.

    Wraps a bleak ``BleakClient`` (stored in ``self.device``) and mirrors its
    connection state in ``self.connected``.  The object can be used as an
    async context manager: entering connects, leaving disconnects.
    """

    def __init__(self, device: Union[CubeDevice, str]):
        """Create the underlying ``BleakClient`` for ``device``.

        Args:
            device (Union[CubeDevice, str]): Device object obtained from a
                scan, or a string; either is handed to ``BleakClient`` as is.
        """
        # Set by connect(), cleared by disconnect().
        self.connected: bool = False
        if platform.system() == "Windows":
            from bleak.backends.winrt.scanner import _RawAdvData

            if isinstance(device, CubeDevice) and device.details.adv is None:
                # Only a scan response was recorded for this device, so the
                # scan data is stored in both fields of the raw data.
                # NOTE(review): depends on bleak's private `_RawAdvData`;
                # confirm it still exists in the bleak version in use.
                device.details = _RawAdvData(device.details.scan, device.details.scan)
                logger.info("copy scan to adv")
        self.device = BleakClient(device, backend=_get_platform_client_backend_type())

    async def __aenter__(self):
        """Connect and return this object."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Disconnect when the ``async with`` body ends."""
        await self.disconnect()

    async def connect(self) -> bool:
        """Connect to the cube and wait until the client reports connected.

        If this object is already marked connected, only a warning is logged.

        Returns:
            bool: Value of ``self.connected`` after the call.
        """
        if not self.connected:
            await self.device.connect()
            while not self.device.is_connected:
                await asyncio.sleep(0.1)
            # Take the state from the client itself instead of the return
            # value of BleakClient.connect(), so the flag is a real bool even
            # if that call returns nothing.
            self.connected = self.device.is_connected
        else:
            logger.warning("already connected")
        return self.connected

    async def disconnect(self) -> bool:
        """Disconnect from the cube and wait until the client reports it.

        If this object is not marked connected, only a warning is logged.

        Returns:
            bool: Always ``True``.
        """
        if self.connected:
            await self.device.disconnect()
            while self.device.is_connected:
                await asyncio.sleep(0.1)
            # Clear the flag; otherwise a later connect() would only log
            # "already connected" and never reconnect.
            self.connected = False
        else:
            logger.warning("already disconnected")
        return True

    async def read(self, char_uuid: UUID) -> GattReadData:
        """Read the GATT characteristic ``char_uuid`` and return its data."""
        return await self.device.read_gatt_char(char_uuid)

    async def write(self, char_uuid: UUID, data: GattWriteData, response: bool = False):
        """Write ``data`` to the GATT characteristic ``char_uuid``.

        Args:
            char_uuid (UUID): Characteristic to write.
            data (GattWriteData): Payload to write.
            response (bool, optional): Forwarded to
                ``BleakClient.write_gatt_char``. Defaults to False.
        """
        await self.device.write_gatt_char(char_uuid, data, response)

    async def register_notification_handler(
        self, char_uuid: UUID, notification_handler: GattNotificationHandler
    ) -> bool:
        """Start notifications on ``char_uuid`` with ``notification_handler``.

        Returns:
            bool: Always ``True``.
        """
        await self.device.start_notify(char_uuid, notification_handler)
        return True

    async def unregister_notification_handler(self, char_uuid: UUID) -> bool:
        """Stop notifications on ``char_uuid``.

        Returns:
            bool: Always ``True``.
        """
        await self.device.stop_notify(char_uuid)
        return True

    def is_connect(self) -> bool:
        """Return the connection state reported by the underlying client."""
        return self.device.is_connected
class BaseBleScanner(ScannerInterface):
    """BleScanner

    Scanner for internal BLE interface.
    """

    def __init__(self):
        pass

    async def _scan(
        self,
        num: Optional[int] = None,
        cube_id: Optional[Set[str]] = None,
        address: Optional[Set[str]] = None,
        sort: SortKey = None,
        timeout: float = DEFAULT_SCAN_TIMEOUT,
    ) -> List[CubeInfo]:
        """Scan toio Core Cubes.

        Arguments 'num', 'cube_id', and 'address' are exclusive.
        If both 'address' and 'cube_id' are given, only 'address' is used.
        'num' only limits the length of the returned list.

        The scan stops early once as many cubes as requested by 'address' or
        'cube_id' have been found; otherwise it runs until 'timeout'.

        Args:
            num (Optional[int], optional): Number of cubes to be found. Defaults to None.
            cube_id (Optional[set[str]], optional): Set of cube id to be found.
                A cube matches when one of the ids is contained in its name.
                Defaults to None.
            address (Optional[set[str]], optional): Set of cube BLE address to be found.
                Compared case-insensitively. Defaults to None.
            sort (SortKey, optional): Key to sort results ("rssi" or "local_name").
                Defaults to None (no sort).
            timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.

        Returns:
            list[CubeInfo]: List of found cubes

        Notes:
            If a cube named "31j" is found while scanning, this function logs a warning.
            If you specify the name "31j" to find the cube, this function logs a warning.
            "31j" is a wrong name. It appears when the toio Core Cube falls
            into a wrong state. To recover, turn the cube off and on again.

            Ref: https://support.toio.io/s/article/15855
        """
        w31j = False
        condition_met = asyncio.Event()
        found_cubes: Dict[Union[str, int], CubeInfo] = {}

        if cube_id is not None and "31j" in cube_id:
            logger.warning(
                "warning: scanner: Specifying cube_id '31j' is NOT recommended"
            )

        # Normalise the requested addresses once instead of on every callback.
        wanted_addresses: Optional[Set[str]] = (
            {x.upper() for x in address} if address is not None else None
        )

        def register(device: BLEDevice, advertisement: AdvertisementData) -> None:
            # Keyed by address, so a repeated advertisement from the same
            # device replaces the earlier entry.
            found_cubes[device.address] = CubeInfo(
                name=device.name,
                device=device,
                interface=BleCube(device),
                advertisement=advertisement,
            )

        # detection callback
        def check_condition(device: BLEDevice, advertisement: AdvertisementData):
            nonlocal w31j

            # Ignore devices that do not advertise the toio service.
            if TOIO_UUID_SERVICE not in map(UUID, advertisement.service_uuids):
                return

            if not w31j and device.name is not None and "31j" in device.name:
                logger.warning(
                    "warning: scanner: cube_id '31j' is found. Why not turn all cubes off and back again?"
                )
                w31j = True  # warn only once per scan

            if wanted_addresses is not None:
                if device.address.upper() in wanted_addresses:
                    register(device, advertisement)
                if len(found_cubes) >= len(wanted_addresses):
                    condition_met.set()
            elif cube_id is not None:
                # A device whose name is not known yet cannot match an id
                # filter; it must not fall through to the unfiltered branch.
                if device.name is not None and any(
                    id_str in device.name for id_str in cube_id
                ):
                    register(device, advertisement)
                if len(found_cubes) >= len(cube_id):
                    condition_met.set()
            else:
                # No filter: collect every cube until the timeout expires.
                register(device, advertisement)

        # scan ble devices
        async with BleakScanner(
            detection_callback=check_condition,
            backend=_get_platform_scanner_backend(),
        ):
            try:
                await asyncio.wait_for(condition_met.wait(), timeout=timeout)
            except TimeoutError:
                logger.debug(f"scanner: timeout {timeout} sec")
            except asyncio.TimeoutError:
                logger.debug(f"scanner: timeout {timeout} sec (asyncio)")

        # get the list of cubes
        toio_cubes = list(found_cubes.values())

        # sort
        if sort is not None and len(toio_cubes) >= 2:
            if sort == "rssi":

                def rssi(info: CubeInfo) -> int:
                    # Cubes without an RSSI value sort after all others.
                    if info.advertisement.rssi is not None:
                        return info.advertisement.rssi
                    return RSSI_UNKNOWN

                toio_cubes.sort(key=rssi, reverse=True)
            elif sort == "local_name":

                def local_name(info: CubeInfo) -> str:
                    return info.name if info.name is not None else ""

                toio_cubes.sort(key=local_name)

        if num is not None and len(toio_cubes) > num:
            return toio_cubes[:num]
        return toio_cubes

    async def scan(self, *args, **kwargs) -> List[CubeInfo]:
        """Scan toio Core Cubes; all arguments are forwarded to ``_scan``."""
        return await self._scan(*args, **kwargs)