Source code for toio.scanner.ble

# -*- coding: utf-8 -*-
# ************************************************************
#
#     ble.py
#
#     Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
"""
BLE scanner

Scan toio Core Cubes with internal BLE interface.
"""

import platform

from toio.device_interface import DEFAULT_SCAN_TIMEOUT, CubeInfo, SortKey
from toio.device_interface.ble import BleScanner
from toio.logger import get_toio_logger

if platform.system() == "Windows":
    from toio.scanner.platform.windows_ble import get_registered_cubes

logger = get_toio_logger(__name__)


[docs]async def scan( num: int, sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT ) -> list[CubeInfo]: """Scan the specified number of toio Core Cubes. The scan is terminated by a timeout. In the case of a timeout, the number of elements in the returned list is the number of cubes found at the time of the timeout. Args: num (int): Number of cubes to be found. sort (SortKey, optional): Key to sort results. Defaults to "rssi". timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT. Returns: list[tuple[BLEDevice, AdvertisementData]]: List of found cubes. """ scanner = BleScanner() return await scanner.scan(num=num, sort=sort, timeout=timeout)
[docs]async def scan_with_id( cube_id: set[str], sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT ) -> list[CubeInfo]: """Scan toio Core Cubes with specified id. The scan is terminated by a timeout. In the case of a timeout, the number of elements in the returned list is the number of cubes found at the time of the timeout. Args: cube_id (set[str]): Set of cube id to be found. sort (SortKey, optional): Key to sort results. Defaults to "rssi". timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT. Returns: list[tuple[BLEDevice, AdvertisementData]]: List of found cubes. """ scanner = BleScanner() return await scanner.scan(cube_id=cube_id, sort=sort, timeout=timeout)
[docs]async def scan_with_address( address: set[str], sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT ) -> list[CubeInfo]: """Scan toio Core Cubes with specified BLE address. The scan is terminated by a timeout. In the case of a timeout, the number of elements in the returned list is the number of cubes found at the time of the timeout. Args: address (set[str]): Set of BLE address to be found. sort (SortKey, optional): Key to sort results. Defaults to "rssi". timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT. Returns: list[tuple[BLEDevice, AdvertisementData]]: List of found cubes. """ scanner = BleScanner() return await scanner.scan(address=address, sort=sort, timeout=timeout)
[docs]async def scan_registered_cubes( num: int, sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT ) -> list[CubeInfo]: """Scan toio Core Cubes registered with Windows This function only works on Windows platform. On the other platform, this function always returns empty list. Even if `num` is greater than the number of registered cubes, the maximum size of the list returned by this function is the number of registered cubes. Args: num (int): Number of cubes to be found. sort (SortKey, optional): Key to sort results. Defaults to "rssi". timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT. Returns: list[tuple[BLEDevice, AdvertisementData]]: List of found cubes. """ if platform.system() == "Windows": registered_cubes = get_registered_cubes() addresses = {x.address.upper() for x in registered_cubes} found = await scan_with_address(addresses, sort, timeout) if len(found) > num: return found[:num] else: return found else: logger.warning("platform '%s' is not supported", platform.system()) return []
[docs]async def scan_registered_cubes_with_id( cube_id: set[str], sort: SortKey = "rssi", timeout: float = DEFAULT_SCAN_TIMEOUT ) -> list[CubeInfo]: """Scan toio Core Cube specified by the cube_id registered with Windows This function only works on Windows platform. On the other platform, this function always returns empty list. Args: cube_id (set[str]): Set of cube id to be found. sort (SortKey, optional): Key to sort results. Defaults to "rssi". timeout (float, optional): Scan timeout. Defaults to DEFAULT_SCAN_TIMEOUT. Returns: list[tuple[BLEDevice, AdvertisementData]]: List of found cubes. """ if platform.system() == "Windows": registered_cubes = get_registered_cubes() address_list = set([]) for cube in registered_cubes: for id_str in cube_id: if id_str in cube.name: address_list.add(cube.address.upper()) if len(address_list) == 0: return [] else: found = await scan_with_address(address_list, sort, timeout) return found else: logger.warning("platform '%s' is not supported", platform.system()) return []