Source code for toio.cube.multi_cubes

# -*- coding: utf-8 -*-
# ************************************************************
#
#     multi_cubes.py
#
#     Copyright 2024 Sony Interactive Entertainment Inc.
#
# ************************************************************

from __future__ import annotations

import asyncio

from typing_extensions import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    Sequence,
    Type,
    Union,
)

from ..device_interface import CubeInfo, ScannerInterface
from ..logger import get_toio_logger
from ..scanner.ble import UniversalBleScanner

if TYPE_CHECKING:
    from ..cube import ToioCoreCube

logger = get_toio_logger(__name__)


[docs]class MultipleToioCoreCubes: """ Multiple cube control class This class is a wrapper to control multiple cubes easier. When MultipleToioCoreCubes is initialized with an integer, the scan() function can search for a specified number of cubes. scan() can be followed by a call to the connect() function to connect to multiple cubes. If you initialize MultipleToioCoreCubes with a list of CubeInfo, you can connect to multiple cubes by calling the connect() function. In this case, the scan() function does not work. MultipleToioCoreCubes is an asynchronous context manager. When 'async with' is used, '__aenter__' handles the process up to connection, and '__aexit__' handles the disconnection. Access to each cubes Each cube can be accessed by the number. >>> async with MultipleToioCoreCubes(2) as cubes: >>> cubes[0].api....() >>> cubes[1].api....() Each cube can be accessd by 'for' also. >>> async with MultipleToioCoreCubes(2) as cubes: >>> for c in cubes: >>> c.api....() When 'names' is specified, cubes has each name and can be accessed with specified name. >>> # accessing by cube name property >>> async with MultipleToioCoreCubes(2, "alpha", "beta") as cubes: >>> cubes.alpha.api....() >>> cubes.beta.api....() >>> # accessing cubes using the cube interface obtained by name >>> async with MultipleToioCoreCubes(2, "alpha", "beta") as cubes: >>> alpha = cubes.named("alpha") >>> beta = cubes.named("beta") >>> alpha.api....() >>> beta.api....() """ OPERATION_INTERVAL: float = 0.5 _LOCK: Optional[asyncio.Lock] = None
[docs] def __init__( self, cubes: Union[int, List[CubeInfo]], names: Optional[Sequence[str]] = None, scanner: Type[ScannerInterface] = UniversalBleScanner, scanner_args: Sequence[Any] = (), ): """ Initialize MultipleCubes Args: cubes (Union[int, List[CubeInfo]]): number of cubes to be scanned, or list of cubes to be handled names (Optional[Sequence[str]]): sequence of names of cubes scanner (Type[ScannerInterface]): scanner interface (default is UniversalBleScanner) scanner_args (Sequence[Any]): arguments given to the scanner.scan() function """ if MultipleToioCoreCubes._LOCK is None: MultipleToioCoreCubes._LOCK = asyncio.Lock() from ..cube import ToioCoreCube self._cube_num: Optional[int] = None self._cubes: List[ToioCoreCube] = [] if isinstance(cubes, int): self._cube_num = cubes self._scanning_required = True else: self._scanning_required = False self._cubes = ToioCoreCube.create_cubes(cubes) self._names = names self._scanner = scanner self._scanner_args = scanner_args self._cube_dict: Dict[str, ToioCoreCube] = {}
def __getattr__(self, name: str) -> ToioCoreCube: cube = self._cube_dict.get(name) if cube: return cube else: raise AttributeError("'%s' is not found" % name) async def __aenter__(self): assert MultipleToioCoreCubes._LOCK is not None async with MultipleToioCoreCubes._LOCK: await self.scan() await self.connect() return self async def __aexit__(self, exc_type, exc, tb): await self.disconnect() def __len__(self) -> int: return len(self._cubes) def __getitem__(self, n) -> ToioCoreCube: return self._cubes[n] def _assign_cubes(self): assert self._cubes is not None assert not isinstance(self._cubes, int) if self._names is not None: for name, cube in zip(self._names, self._cubes): self._cube_dict[name] = cube
[docs] async def scan(self): """ scan cubes If MultipleCubes is initialized with integer number, this function performs to scan the number of cubes. """ from ..cube import ToioCoreCube if self._scanning_required and isinstance(self._cube_num, int): device_list = await self._scanner().scan( self._cube_num, *self._scanner_args ) self._cubes = ToioCoreCube.create_cubes(device_list) self._scanning_required = False
async def _wait_and_exec(self, wait: float, func: Awaitable): await asyncio.sleep(wait) return await func
[docs] async def connect(self): """ connect to multiple cubes cubes are connected at MultipleToioCoreCubes.OPERATION_INTERVAL second interval. """ self._assign_cubes() connect_list = [] wait = 0 for cube in self._cubes: connect_list.append(self._wait_and_exec(wait, cube.connect())) wait += self.OPERATION_INTERVAL result_list = await asyncio.gather(*connect_list) while False in result_list: logger.info("try to connect again") for i, result in enumerate(result_list): if result: del connect_list[i] result_list = await asyncio.gather(*connect_list)
[docs] async def disconnect(self): """ disconnect to multiple cubes cubes are disconnected at MultipleToioCoreCubes.OPERATION_INTERVAL second interval. """ disconnect_list = [] wait = 0.0 for cube in self._cubes: disconnect_list.append(self._wait_and_exec(wait, cube.disconnect())) wait += self.OPERATION_INTERVAL result_list = await asyncio.gather(*disconnect_list) while False in result_list: logger.info("try to disconnect again") for i, result in enumerate(result_list): if result: del disconnect_list[i] result_list = await asyncio.gather(*disconnect_list)
[docs] def named(self, name: str) -> ToioCoreCube: """ get the cube specified by the name Args: name (str): name Returns: ToioCoreCube: Exceptions: ValueError: the cube specified is not found """ cube = self._cube_dict.get(name) if cube is not None: return cube else: raise ValueError("'%s' is not found" % name)