Source code for toio.device_interface.ble
# -*- coding: utf-8 -*-
# ************************************************************
#
# ble.py
#
# Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""
BLE device interface
"""
import asyncio
from typing import Optional, Union
from uuid import UUID
from bleak import BleakClient, BleakScanner
from toio.device_interface import (
DEFAULT_SCAN_TIMEOUT,
AdvertisementData,
BLEDevice,
CubeDevice,
CubeInfo,
CubeInterface,
GattNotificationHandler,
GattReadData,
GattWriteData,
ScannerInterface,
SortKey,
)
from toio.logger import get_toio_logger
from toio.toio_uuid import TOIO_UUID_SERVICE
RSSI_UNKNOWN = -65535
logger = get_toio_logger(__name__)
[docs]class BleCube(CubeInterface):
"""
Cube interface for internal BLE interface.
"""
[docs] def __init__(self, device: Union[CubeDevice, str]):
self.connected: bool = False
self.device = BleakClient(device)
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.disconnect()
[docs] async def connect(self) -> bool:
if not self.connected:
self.connected = await self.device.connect()
while not self.device.is_connected:
await asyncio.sleep(0.1)
else:
logger.warning("already connected")
return self.connected
[docs] async def disconnect(self):
if self.connected:
await self.device.disconnect()
while self.device.is_connected:
await asyncio.sleep(0.1)
else:
logger.warning("already disconnected")
[docs] async def read(self, char_uuid: UUID) -> GattReadData:
return await self.device.read_gatt_char(char_uuid)
[docs] async def write(self, char_uuid: UUID, data: GattWriteData, response: bool = False):
await self.device.write_gatt_char(char_uuid, data, response)
[docs] async def register_notification_handler(
self, char_uuid: UUID, notification_handler: GattNotificationHandler
) -> bool:
await self.device.start_notify(char_uuid, notification_handler)
return True
[docs] async def unregister_notification_handler(self, char_uuid: UUID) -> bool:
await self.device.stop_notify(char_uuid)
return True
[docs]class BleScanner(ScannerInterface):
"""BleScanner
Scanner for internal BLE interface.
"""
[docs] def __init__(self):
pass
[docs] async def scan(
self,
num: Optional[int] = None,
cube_id: Optional[set[str]] = None,
address: Optional[set[str]] = None,
sort: SortKey = None,
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> list[CubeInfo]:
"""Scan toio Core Cubes.
Argument 'num', 'cube_id', and 'address' is exclusive.
Args:
num (Optional[int], optional): Number of cubes to be found. Defaults to None.
cube_id (Optional[set[str]], optional): Set of cube id to be found. Defaults to None.
address (Optional[set[str]], optional): Set of cube BLE address to be found. Defaults to None.
sort (SortKey, optional): Key to sort results. Defaults to None (no sort).
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
list[CubeInfo]: List of found cubes
Notes:
If the cube named "31j" is found in scanning, this function warns it.
If you specify the name "31j" to find the cube, this function warns it.
"31j" is wrong name.
This name appears when the toio Core Cube fall wrong state.
To recover this, turn off the cube and back again.
Ref: https://support.toio.io/s/article/15855
"""
w31j = False
condition_met = asyncio.Event()
found_cubes: dict[Union[str, int], CubeInfo] = {}
if cube_id is not None and "31j" in cube_id:
logger.warning(
"warning: scanner: Specifying cube_id '31j' is NOT recommended"
)
# detection callback
def check_condition(device: BLEDevice, advertisement: AdvertisementData):
service_uuids = map(UUID, advertisement.service_uuids)
if TOIO_UUID_SERVICE in service_uuids:
nonlocal w31j
nonlocal condition_met
nonlocal found_cubes
if not w31j and device.name is not None and device.name.endswith("31j"):
logger.warning(
"warning: scanner: cube_id '31j' is found. Why not turn all cubes off and back again?"
)
w31j = True
if address is not None:
address_list = [x.upper() for x in address]
if device.address in address_list:
found_cubes[device.address] = CubeInfo(
name=device.name,
device=device,
interface=BleCube(device),
advertisement=advertisement,
)
if len(found_cubes) >= len(address):
condition_met.set()
elif cube_id is not None and device.name is not None:
for id_str in cube_id:
if id_str in device.name:
found_cubes[device.address] = CubeInfo(
name=device.name,
device=device,
interface=BleCube(device),
advertisement=advertisement,
)
if len(found_cubes) >= len(cube_id):
condition_met.set()
else:
found_cubes[device.address] = CubeInfo(
name=device.name,
device=device,
interface=BleCube(device),
advertisement=advertisement,
)
# scan ble devices
async with BleakScanner(detection_callback=check_condition):
try:
await asyncio.wait_for(condition_met.wait(), timeout=timeout)
except TimeoutError:
logger.debug(f"scanner: timeout {timeout} sec")
except Exception:
raise
# get the list of cubes
toio_cubes = list(found_cubes.values())
# sort
if sort is not None and len(toio_cubes) >= 2:
if sort == "rssi":
def rssi(info: CubeInfo) -> int:
if info.advertisement.rssi is not None:
return info.advertisement.rssi
else:
return RSSI_UNKNOWN
toio_cubes.sort(key=rssi, reverse=True)
elif sort == "local_name":
def local_name(info: CubeInfo) -> str:
if info.name is not None:
return info.name
else:
return ""
toio_cubes.sort(key=local_name)
if num is not None and len(toio_cubes) > num:
return toio_cubes[:num]
else:
return toio_cubes