Source code for toio.scanner.ble
# -*- coding: utf-8 -*-
# ************************************************************
#
# ble.py
#
# Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""
BLE scanner (including specific functions for each supported OS)
Scan toio Core Cubes with internal BLE interface.
"""
import functools
import platform
from typing_extensions import Any, List, NamedTuple, Optional, Set
from ..device_interface import DEFAULT_SCAN_TIMEOUT, CubeInfo, ScannerInterface, SortKey
from ..device_interface.ble import BaseBleScanner
from ..logger import get_toio_logger
logger = get_toio_logger(__name__)
[docs]class UniversalBleScanner(ScannerInterface):
async def _scan(
self,
num: Optional[int] = None,
cube_id: Optional[Set[str]] = None,
address: Optional[Set[str]] = None,
sort: SortKey = None,
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> List[CubeInfo]:
scanner = BaseBleScanner()
return await scanner._scan(
num=num, cube_id=cube_id, address=address, sort=sort, timeout=timeout
)
[docs] async def scan( # type: ignore
self, num: int, sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT
) -> List[CubeInfo]:
"""Scan the specified number of toio Core Cubes.
The scan is terminated by a timeout.
In the case of a timeout, the number of elements in the returned list
is the number of cubes found at the time of the timeout.
Args:
num (int): Number of cubes to be found.
sort (SortKey, optional): Key to sort results. Defaults to "rssi".
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
List[Tuple[BLEDevice, AdvertisementData]]: List of found cubes.
"""
return await self._scan(num=num, sort=sort, timeout=timeout)
[docs] async def scan_with_id(
self,
cube_id: Set[str],
sort: SortKey = "rssi",
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> List[CubeInfo]:
"""Scan toio Core Cubes with specified id.
The scan is terminated by a timeout.
In the case of a timeout, the number of elements in the returned list
is the number of cubes found at the time of the timeout.
Args:
cube_id (set[str]): Set of cube id to be found.
sort (SortKey, optional): Key to sort results. Defaults to "rssi".
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
List[Tuple[BLEDevice, AdvertisementData]]: List of found cubes.
"""
return await self._scan(cube_id=cube_id, sort=sort, timeout=timeout)
[docs] async def scan_with_address(
self,
address: Set[str],
sort: SortKey = "rssi",
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> List[CubeInfo]:
"""Scan toio Core Cubes with specified BLE address.
The scan is terminated by a timeout.
In the case of a timeout, the number of elements in the returned list
is the number of cubes found at the time of the timeout.
Args:
address (set[str]): Set of BLE address to be found.
sort (SortKey, optional): Key to sort results. Defaults to "rssi".
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
List[Tuple[BLEDevice, AdvertisementData]]: List of found cubes.
"""
return await self._scan(address=address, sort=sort, timeout=timeout)
[docs] @async_platform_specified(PlatformParam("Windows", []))
async def scan_registered_cubes(
self, num: int, sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT
) -> List[CubeInfo]:
"""Scan toio Core Cubes registered with Windows
This function only works on Windows platform.
On the other platform, this function always returns empty list.
Even if `num` is greater than the number of registered cubes,
the maximum size of the list returned by this function is the number of registered cubes.
Args:
num (int): Number of cubes to be found.
sort (SortKey, optional): Key to sort results. Defaults to "rssi".
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
List[Tuple[BLEDevice, AdvertisementData]]: List of found cubes.
"""
from ..scanner.platform.windows_ble import get_registered_cubes
registered_cubes = get_registered_cubes()
addresses = {x.address.upper() for x in registered_cubes}
found = await self.scan_with_address(addresses, sort, timeout)
if len(found) > num:
return found[:num]
else:
return found
[docs] @async_platform_specified(PlatformParam("Windows", []))
async def scan_registered_cubes_with_id(
self,
cube_id: Set[str],
sort: SortKey = "rssi",
timeout: float = DEFAULT_SCAN_TIMEOUT,
) -> List[CubeInfo]:
"""Scan toio Core Cube specified by the cube_id registered with Windows
This function only works on Windows platform.
On the other platform, this function always returns empty list.
Args:
cube_id (set[str]): Set of cube id to be found.
sort (SortKey, optional): Key to sort results. Defaults to "rssi".
timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT.
Returns:
List[Tuple[BLEDevice, AdvertisementData]]: List of found cubes.
"""
from ..scanner.platform.windows_ble import get_registered_cubes
registered_cubes = get_registered_cubes()
address_list = set([])
for cube in registered_cubes:
for id_str in cube_id:
if id_str in cube.name:
address_list.add(cube.address.upper())
if len(address_list) == 0:
return []
else:
found = await self.scan_with_address(address_list, sort, timeout)
return found