# -*- coding: utf-8 -*-
# ************************************************************
#
# toio/cube/__init__.py
#
# Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
from __future__ import annotations
import asyncio
from uuid import UUID
from typing_extensions import (
Any,
Iterable,
List,
Optional,
Sequence,
Tuple,
Type,
TypeAlias,
Union,
)
from ..device_interface import (
CubeInfo,
CubeInterface,
GattNotificationHandler,
GattReadData,
GattWriteData,
ScannerInterface,
)
from ..scanner import UniversalBleScanner
from .api import ToioCoreCubeLowLevelAPI
from .api.battery import Battery, BatteryInformation, BatteryResponseType
from .api.button import Button, ButtonInformation, ButtonResponseType, ButtonState
from .api.configuration import (
Configuration,
ConfigurationResponseType,
MagneticSensorCondition,
MagneticSensorFunction,
MotorSpeedInformationAcquisitionState,
NotificationCondition,
PostureAngleDetectionCondition,
PostureAngleDetectionType,
ProtocolVersion,
ResponseIdMissedNotificationSettings,
ResponseIdNotificationSettings,
ResponseMagneticSensorSettings,
ResponseMotorSpeedInformationAcquisitionSettings,
ResponsePostureAngleDetectionSettings,
)
from .api.id_information import (
IdInformation,
IdInformationResponseType,
PositionId,
PositionIdMissed,
StandardId,
StandardIdMissed,
)
from .api.indicator import Color, Indicator, IndicatorParam
from .api.motor import (
AccelerationDirection,
AccelerationPriority,
AccelerationRotation,
Motor,
MotorResponseCode,
MotorResponseType,
MovementType,
ResponseMotorControlMultipleTargets,
ResponseMotorControlTarget,
ResponseMotorSpeed,
RotationOption,
Speed,
SpeedChangeType,
TargetPosition,
WriteMode,
)
from .api.sensor import (
MagneticSensorData,
MotionDetectionData,
Posture,
PostureAngleEulerData,
PostureAngleQuaternionsData,
PostureDataType,
Sensor,
SensorResponseType,
)
from .api.sound import MidiNote, Note, Sound, SoundId
from .multi_cubes import MultipleToioCoreCubes
from .notification_handler_info import NotificationHandlerInfo, NotificationHandlerTypes
CubeInitializer: TypeAlias = Union[CubeInterface, CubeInfo]
[docs]class ToioCoreCube(CubeInterface):
"""
Access to toio Core Cube
Note:
- self.protocol_version is set after connecting to the cube.
- self.protocol_version and self.max_retry_to_get_protocol_version is supported since v1.1.0.
- basic scan function is supported since v1.1.0.
When Toio is initialized with no arguments, the scan() function can search for a cubes.
scan() can be followed by a call to the connect() function to connect
to multiple cubes.
If you initialize ToioCoreCube with a CubeInfo or a CubeInterface,
you can connect to specified cube by calling the connect() function.
In this case, the scan() function does not work.
ToioCoreCube is an asynchronous context manager.
When 'async with' is used, '__aenter__' handles the process up to connection,
and '__aexit__' handles the disconnection.
Attributes:
interface (CubeInterface): control interface (e.g. BleCube)
name (str): cube name (optional)
api (ToioCoreCubeLowLevelAPI): API class
protocol_version (Optional[ProtocolVersion]): protocol version of the cube
max_retry_to_get_protocol_version (int): number of retries to get protocol version
"""
SUPPORTED_MAJOR_VERSION: int = 2
SUPPORTED_MINOR_VERSION: int = 4
_LOCK: Optional[asyncio.Lock] = None
[docs] @staticmethod
def create(initializer: Union[CubeInitializer, Sequence]) -> ToioCoreCube:
"""
Supported toio.py versions: v1.1.0 or later
Create a ToioCoreCube instance from a CubeInterface or CubeInfo
Args:
initializer (CubeInitializer): initializer
Returns:
Optional[ToioCoreCube]:
"""
if (
isinstance(initializer, Sequence)
and not isinstance(initializer, CubeInterface)
and not isinstance(initializer, CubeInfo)
):
if len(initializer) < 1:
raise ValueError("no initializer")
first_initializer = initializer[0]
else:
first_initializer = initializer
if isinstance(first_initializer, CubeInterface):
return ToioCoreCube(interface=first_initializer)
elif isinstance(first_initializer, CubeInfo):
return ToioCoreCube(
interface=first_initializer.interface, name=first_initializer.name
)
else:
raise ValueError("wrong initializer: " + str(type(first_initializer)))
[docs] @staticmethod
def create_cubes(initializers: Iterable[CubeInitializer]) -> List[ToioCoreCube]:
"""
Supported toio.py versions: v1.1.0 or later
Create a ToioCoreCube instance list from a CubeInterface or CubeInfo list
Args:
initializers (Iterable[CubeInitializer]): initializers
Returns:
List[ToioCoreCube]:
"""
cubes = []
for initializer in initializers:
cube = ToioCoreCube.create(initializer)
cubes.append(cube)
return cubes
[docs] def __init__(
self,
interface: Optional[CubeInterface] = None,
name: Optional[str] = None,
scanner: Type[ScannerInterface] = UniversalBleScanner,
scanner_args: Sequence[Any] = (),
):
if ToioCoreCube._LOCK is None:
ToioCoreCube._LOCK = asyncio.Lock()
if interface is None:
self._scanning_required = True
self.interface = None
else:
self._scanning_required = False
self.interface = interface
self.name = name
self._scanner = scanner
self._scanner_args = scanner_args
self.protocol_version: Optional[ProtocolVersion] = None
self.max_retry_to_get_protocol_version: int = 10
async def __aenter__(self):
assert ToioCoreCube._LOCK is not None
async with ToioCoreCube._LOCK:
await self.scan()
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.disconnect()
[docs] async def scan(self):
if self._scanning_required and self.interface is None:
device_list = await self._scanner().scan(1, *self._scanner_args)
if len(device_list):
self.interface = device_list[0].interface
if self.name is None:
self.name = device_list[0].name
[docs] async def connect(self) -> bool:
assert self.interface is not None
self.api = ToioCoreCubeLowLevelAPI(interface=self.interface, root_device=self)
connect_result = await self.interface.connect()
while not self.interface.is_connect():
await asyncio.sleep(0.1)
if connect_result is True:
self.protocol_version = None
await self.api.configuration.request_protocol_version()
retry_count = 0
while (
self.protocol_version is None
and retry_count < self.max_retry_to_get_protocol_version
):
retry_count += 1
received_data = await self.api.configuration._read()
if ProtocolVersion.is_myself(received_data):
self.protocol_version = ProtocolVersion(received_data)
await asyncio.sleep(0.1)
if self.protocol_version is not None:
if (
self.protocol_version._major != self.SUPPORTED_MAJOR_VERSION
or self.protocol_version._minor < self.SUPPORTED_MINOR_VERSION
):
import warnings
warnings.warn(
"protocol version %s is not supported by toio.py\n"
% self.protocol_version.version
+ "update cube firmware to latest version",
UserWarning,
)
return connect_result
[docs] async def disconnect(self) -> bool:
assert self.interface is not None
return await self.interface.disconnect()
[docs] async def read(self, char_uuid: UUID) -> GattReadData:
assert self.interface is not None
return await self.interface.read(char_uuid)
[docs] async def write(self, char_uuid: UUID, data: GattWriteData, response: bool = False):
assert self.interface is not None
return await self.interface.write(char_uuid, data, response)
[docs] async def register_notification_handler(
self, char_uuid: UUID, notification_handler: GattNotificationHandler
) -> bool:
assert self.interface is not None
return await self.interface.register_notification_handler(
char_uuid, notification_handler
)
[docs] async def unregister_notification_handler(self, char_uuid: UUID) -> bool:
assert self.interface is not None
return await self.interface.unregister_notification_handler(char_uuid)
[docs] def is_connect(self) -> bool:
assert self.interface is not None
return self.interface.is_connect()
__all__: Tuple[str, ...] = (
"CubeInitializer",
"ToioCoreCube",
"NotificationHandlerInfo",
"NotificationHandlerTypes",
"MultipleToioCoreCubes",
# .api
"ToioCoreCubeLowLevelAPI",
# .api.battery
"BatteryResponseType",
"BatteryInformation",
"Battery",
# .api.button
"ButtonResponseType",
"ButtonState",
"ButtonInformation",
"Button",
# .api.configuration
"ConfigurationResponseType",
"NotificationCondition",
"MagneticSensorFunction",
"MagneticSensorCondition",
"MotorSpeedInformationAcquisitionState",
"PostureAngleDetectionType",
"PostureAngleDetectionCondition",
"ProtocolVersion",
"ResponseIdNotificationSettings",
"ResponseIdMissedNotificationSettings",
"ResponseMagneticSensorSettings",
"ResponseMotorSpeedInformationAcquisitionSettings",
"ResponsePostureAngleDetectionSettings",
"Configuration",
# .api.id_information
"IdInformationResponseType",
"PositionId",
"StandardId",
"PositionIdMissed",
"StandardIdMissed",
"IdInformation",
# .api.indicator
"Color",
"IndicatorParam",
"Indicator",
# .api.motor
"MotorResponseType",
"MovementType",
"RotationOption",
"TargetPosition",
"SpeedChangeType",
"Speed",
"WriteMode",
"AccelerationRotation",
"AccelerationDirection",
"AccelerationPriority",
"MotorResponseCode",
"ResponseMotorControlTarget",
"ResponseMotorControlMultipleTargets",
"ResponseMotorSpeed",
"Motor",
# .api.sensor
"SensorResponseType",
"PostureDataType",
"Posture",
"MotionDetectionData",
"PostureAngleEulerData",
"PostureAngleQuaternionsData",
"MagneticSensorData",
"Sensor",
# .api.sound
"SoundId",
"Note",
"MidiNote",
"Sound",
)