Source code for toio.simple.async_simple

# -*- coding: utf-8 -*-
# ************************************************************
#
#     toio/simple/async_simple.py
#
#     Copyright 2024 Sony Interactive Entertainment Inc.
#
# ************************************************************

from __future__ import annotations

import asyncio
import math
import time
from enum import Enum, auto
from logging import NOTSET, Formatter, NullHandler, StreamHandler, getLogger
from typing import ClassVar, Optional, Tuple, Type

from ..coordinate_systems import (
    LocalCoordinateSystem,
    VisualProgrammingCoordinateSystem,
)
from ..cube import ToioCoreCube
from ..cube.api.base_class import CubeCharacteristic, NotificationHandlerTypes
from ..cube.api.button import Button, ButtonInformation
from ..cube.api.configuration import (
    MagneticSensorCondition,
    MagneticSensorFunction,
    PostureAngleDetectionCondition,
    PostureAngleDetectionType,
)
from ..cube.api.id_information import (
    IdInformation,
    PositionId,
    PositionIdMissed,
    StandardId,
    StandardIdMissed,
)
from ..cube.api.indicator import Color, IndicatorParam
from ..cube.api.motor import (
    Motor,
    MotorResponseCode,
    MovementType,
    ResponseMotorControlMultipleTargets,
    ResponseMotorControlTarget,
    RotationOption,
    Speed,
    TargetPosition,
)
from ..cube.api.sensor import (
    MagneticSensorData,
    MotionDetectionData,
    PostureAngleEulerData,
    PostureDataType,
    Sensor,
)
from ..cube.api.sound import MidiNote, Note
from ..position import (
    STAY_CURRENT,
    CubeLocation,
    MatRect,
    Point,
    RelativeCubeLocation,
    ToioMat,
)
from ..scanner.ble import UniversalBleScanner
from ..standard_id import StandardIdCard
from ..utility import clip

module_logger = getLogger(__name__)
module_logger.setLevel(NOTSET)
module_handler = NullHandler()
module_handler.setLevel(NOTSET)
module_logger.addHandler(module_handler)


[docs]class Direction(Enum): Forward = auto() Backward = auto() Right = auto() Left = auto()
[docs]class AsyncSimpleCube: """ Access to toio core cube by easier method Functions that like blocks in visual programming """ DEFAULT_ROTATION_OPTION: ClassVar[RotationOption] = RotationOption.AbsoluteOptimal DEFAULT_MOVEMENT_TYPE: ClassVar[MovementType] = MovementType.Curve DEFAULT_TIMEOUT: ClassVar[int] = 10 DEFAULT_ONE_STEP: ClassVar[int] = 1 CELL_SIZE: ClassVar[float] = 43.43 MONITORING_CYCLE: ClassVar[float] = 0.01 _LOCK: Optional[asyncio.Lock] = None _CUBES: int = 0
[docs] @staticmethod def ensure_event_loop() -> asyncio.AbstractEventLoop: try: return asyncio.get_running_loop() except RuntimeError: event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) return event_loop
[docs] @classmethod async def search(cls, name: Optional[str] = None, timeout: int = 5) -> ToioCoreCube: BLEScanner = UniversalBleScanner() if name is not None: module_logger.info(f"search {name} in my registered devices") devices = await BLEScanner.scan_registered_cubes_with_id(cube_id={name}) if len(devices) == 0: module_logger.info(f"search {name}") devices = await BLEScanner.scan_with_id(cube_id={name}) else: module_logger.info("scan registered devices") devices = await BLEScanner.scan_registered_cubes(1, timeout=timeout) if len(devices) == 0: module_logger.info("scan all devices") devices = await BLEScanner.scan(1, timeout=timeout) module_logger.info("scan complete") module_logger.debug(devices) if len(devices) != 1: raise ValueError return ToioCoreCube(interface=devices[0].interface, name=devices[0].name)
[docs] def __init__( self, name: Optional[str] = None, timeout: int = 5, coordinate_system_class: Type[ LocalCoordinateSystem ] = VisualProgrammingCoordinateSystem, log_level: int = NOTSET, cube: Optional[ToioCoreCube] = None, ) -> None: if AsyncSimpleCube._LOCK is None: AsyncSimpleCube._LOCK = asyncio.Lock() self.logger = getLogger(__name__ + str(AsyncSimpleCube._CUBES)) AsyncSimpleCube._CUBES += 1 if log_level is not NOTSET: self.logger.setLevel(log_level) log_handler = StreamHandler() formatter = Formatter( "simple:" + str(AsyncSimpleCube._CUBES) + ":%(asctime)s %(levelname)7s %(message)s" ) log_handler.setFormatter(formatter) log_handler.setLevel(log_level) self.logger.addHandler(log_handler) self.logger.info("initialized") self._native_location: Optional[CubeLocation] = None self._location: Optional[RelativeCubeLocation] = None self._standard_id: Optional[StandardId] = None self._on_position_id: bool = False self._on_standard_id: bool = False self._mat: Optional[MatRect] = None self._arrived: bool = False self._name: Optional[str] = name self._timeout: int = timeout self._coordinate_system_class: Type[LocalCoordinateSystem] = ( coordinate_system_class ) self._cube = cube self._motion: Optional[MotionDetectionData] = None self._cube_angle: Optional[PostureAngleEulerData] = None self._magnet: Optional[MagneticSensorData] = None
async def _scan(self) -> None: if self._cube is None: self._cube = await self.search(name=self._name, timeout=self._timeout) if self._cube is None: raise Exception("no cubes are found") async def _setup(self) -> None: assert self._cube is not None self._button: Optional[ButtonInformation] = await self._cube.api.button.read() await self._set_sensor_configurations() await self._request_initial_information() handlers: Tuple[Tuple[CubeCharacteristic, NotificationHandlerTypes], ...] = ( (self._cube.api.id_information, self._id_notification_handler), (self._cube.api.motor, self._motor_notification_handler), (self._cube.api.sensor, self._motion_sensor_notification_handler), (self._cube.api.button, self._button_notification_handler), ) for characteristic, notification_handler in handlers: await characteristic.register_notification_handler(notification_handler) await self._wait_to_obtain_initial_information() async def _set_sensor_configurations(self) -> None: assert self._cube is not None await self._cube.api.configuration.set_magnetic_sensor( function_type=MagneticSensorFunction.MagnetState, # function_type=MagneticSensorFunction.MagneticForce, interval_ms=60, condition=MagneticSensorCondition.ChangeDetection, ) await self._cube.api.configuration.set_posture_angle_detection( detection_type=PostureAngleDetectionType.Euler, interval_ms=50, condition=PostureAngleDetectionCondition.ChangeDetection, ) async def _request_initial_information(self) -> None: assert self._cube is not None await self._cube.api.sensor.request_motion_information() await self._cube.api.sensor.request_posture_angle_information( PostureDataType.Euler ) await self._cube.api.sensor.request_magnetic_sensor_information() async def _wait_to_obtain_initial_information(self): while not self._motion or not self._cube_angle or not self._magnet: await self._request_initial_information() await asyncio.sleep(0.1) def __del__(self) -> None: AsyncSimpleCube._CUBES -= 1 async def __aenter__(self) -> AsyncSimpleCube: await self.connect() return self async def __aexit__(self, _exc_type, _exc_value, _traceback) -> None: await self.disconnect()
[docs] async def connect(self) -> None: self.logger.info("start to connect") assert AsyncSimpleCube._LOCK is not None self.logger.info("try to lock") async with AsyncSimpleCube._LOCK: self.logger.info("enter critical section") self.logger.info("scanning") await self._scan() assert self._cube is not None self.logger.info("connecting") await self._cube.connect() self.logger.info("connected (%s)", self._cube.name) await self._setup() self.logger.info("setup completed") self.logger.info("exit critical section") self.logger.info("release lock") await asyncio.sleep(0)
[docs] async def disconnect(self) -> None: assert self._cube is not None self.logger.debug("disconnecting") if self._cube.is_connect(): await self._cube.disconnect() self.logger.debug("disconnected")
[docs] async def sleep(self, sleep_second: float) -> None: start_time = time.time() while True: await asyncio.sleep(0) time.sleep(0) if (time.time() - start_time) >= sleep_second: break
async def _id_notification_handler(self, payload: bytearray) -> None: id_info = IdInformation.is_my_data(payload) self.logger.debug(id_info) if isinstance(id_info, PositionId): self._native_location = id_info.center for mat in ToioMat.mats: if self._native_location.point in mat: if mat != self._mat: self.logger.debug(str(mat)) self._mat = mat self._location = RelativeCubeLocation.new() coordinate_system = self._coordinate_system_class( origin=mat.center() ) self._location.change_coordinate_system(coordinate_system) else: assert self._location is not None self._location.from_absolute_location(self._native_location) break assert self._location is not None self._on_position_id = True elif isinstance(id_info, StandardId): coordinate_system = self._coordinate_system_class() id_info.angle = coordinate_system.from_native_angle(id_info.angle) self._standard_id = id_info self._on_standard_id = True elif isinstance(id_info, PositionIdMissed): self._location = None self._mat = None self._native_location = None self._on_position_id = False elif isinstance(id_info, StandardIdMissed): self._standard_id = None self._on_standard_id = False async def _motor_notification_handler(self, payload: bytearray) -> None: motor_response = Motor.is_my_data(payload) self.logger.debug(motor_response) if isinstance( motor_response, (ResponseMotorControlTarget, ResponseMotorControlMultipleTargets), ): if ( motor_response.response_code == MotorResponseCode.SUCCESS or motor_response.response_code == MotorResponseCode.SUCCESS_WITH_OVERWRITE ): self._arrived = True async def _motion_sensor_notification_handler(self, payload: bytearray) -> None: sensor_info = Sensor.is_my_data(payload) self.logger.debug(sensor_info) if isinstance(sensor_info, MotionDetectionData): self._motion = sensor_info elif isinstance(sensor_info, PostureAngleEulerData): self._cube_angle = sensor_info elif isinstance(sensor_info, MagneticSensorData): self._magnet = sensor_info async def _button_notification_handler(self, payload: bytearray) -> None: button_info = Button.is_my_data(payload) self.logger.debug(button_info) if isinstance(button_info, ButtonInformation): self._button = button_info
[docs] async def move( self, speed: int, duration: float, wait_to_complete: bool = True ) -> None: assert self._cube is not None duration = max(duration, 0) duration_ms = int(duration * 1000) await self._cube.api.motor.motor_control(speed, speed, duration_ms) if wait_to_complete: await self.sleep(duration)
[docs] async def spin( self, speed: int, duration: float, wait_to_complete: bool = True ) -> None: """ speed: (negative value: anticlockwise) """ assert self._cube is not None duration = max(duration, 0) duration_ms = int(duration * 1000) await self._cube.api.motor.motor_control(speed, -speed, duration_ms) if wait_to_complete: await self.sleep(duration)
[docs] async def run_motor( self, left_speed: int, right_speed: int, duration: float, wait_to_complete: bool = True, ) -> None: assert self._cube is not None duration = max(duration, 0) duration_ms = int(duration * 1000) await self._cube.api.motor.motor_control(left_speed, right_speed, duration_ms) if wait_to_complete: await self.sleep(duration)
[docs] async def stop_motor(self) -> None: """stop_motor. Args: Returns: None: """ assert self._cube is not None await self._cube.api.motor.motor_control(0, 0)
[docs] async def move_steps(self, direction: Direction, speed: int, step: int) -> bool: if not self._on_position_id: return False assert self._native_location is not None if direction == Direction.Forward: distance = self._step_to_point(step) elif direction == Direction.Backward: distance = -1 * self._step_to_point(step) else: return False native_location = self._native_location target_x = native_location.point.x + round( distance * math.cos(math.radians(native_location.angle)) ) target_y = native_location.point.y + round( distance * math.sin(math.radians(native_location.angle)) ) target_location = CubeLocation( point=Point(x=target_x, y=target_y), angle=native_location.angle ) boundary_location = native_location.get_boundary_point(target_location) target_param = TargetPosition( cube_location=boundary_location, rotation_option=RotationOption.WithoutRotation, ) speed_param = Speed( max=clip(abs(speed), 0, 255), ) return await self._wait_arrival( await self._move_to_target(speed_param, target_param) )
def _step_to_point(self, step: int) -> int: return step * self.DEFAULT_ONE_STEP async def _move_to_target(self, speed: Speed, target: TargetPosition) -> float: assert self._cube is not None self._arrived = False executed_time = time.time() await self._cube.api.motor.motor_control_target( timeout=self.DEFAULT_TIMEOUT, movement_type=self.DEFAULT_MOVEMENT_TYPE, speed=speed, target=target, ) return executed_time async def _wait_arrival(self, executed_time: float): while not self._arrived: if not self._on_position_id: self.logger.debug("Position ID Missed") return False elif time.time() - executed_time < self.DEFAULT_TIMEOUT: await asyncio.sleep(self.MONITORING_CYCLE) else: break return self._arrived
[docs] async def turn(self, speed: int, degree: int) -> bool: if not self._on_position_id: return False assert self._location is not None if degree >= 0: rotation = RotationOption.RelativePositive else: rotation = RotationOption.RelativeNegative degree = -1 * degree current_location = CubeLocation( point=Point(x=STAY_CURRENT, y=STAY_CURRENT), angle=degree ) target_param = TargetPosition( cube_location=current_location, rotation_option=rotation, ) speed_param = Speed( max=clip(abs(speed), 0, 255), ) return await self._wait_arrival( await self._move_to_target(speed_param, target_param) )
[docs] async def move_to(self, speed: int, x: int, y: int) -> bool: if not self._on_position_id: return False assert self._native_location is not None assert self._location is not None relative_location = self._location relative_location.relative_location = CubeLocation( point=Point(x=x, y=y), angle=0 ) boundary_location = self._native_location.get_boundary_point( relative_location.to_absolute_location() ) target_param = TargetPosition( cube_location=boundary_location, rotation_option=RotationOption.WithoutRotation, ) speed_param = Speed( max=clip(abs(speed), 0, 255), ) return await self._wait_arrival( await self._move_to_target(speed_param, target_param) )
[docs] async def set_orientation(self, speed: int, degree: int) -> bool: if not self._on_position_id: return False assert self._native_location is not None if degree >= 0: rotation = RotationOption.AbsolutePositive else: rotation = RotationOption.AbsoluteNegative degree = -1 * degree coordinate_system = self._coordinate_system_class() degree = round(coordinate_system.to_native_angle(degree)) current_location = CubeLocation( point=Point(x=STAY_CURRENT, y=STAY_CURRENT), angle=degree ) target_param = TargetPosition( cube_location=current_location, rotation_option=rotation, ) speed_param = Speed( max=clip(abs(speed), 0, 255), ) return await self._wait_arrival( await self._move_to_target(speed_param, target_param) )
[docs] async def move_to_the_grid_cell(self, speed: int, cell_x: int, cell_y: int) -> bool: if not self._on_position_id: return False cell_point = self._cell_to_point(cell_x, cell_y) return await self.move_to(speed, cell_point.x, cell_point.y)
[docs] async def get_current_position(self) -> Optional[Tuple[int, int]]: if self._location: return ( self._location.relative_location.point.x, self._location.relative_location.point.y, ) else: return None
[docs] async def get_x(self) -> Optional[int]: if self._location: return self._location.relative_location.point.x else: return None
[docs] async def get_y(self) -> Optional[int]: if self._location: return self._location.relative_location.point.y else: return None
[docs] async def get_orientation(self) -> Optional[int]: if self._location: return self._location.relative_location.angle else: return None
[docs] async def get_grid(self) -> Optional[Tuple[int, int]]: if not self._on_position_id: return None assert self._location is not None (cell_x, cell_y) = self._point_to_cell(self._location.relative_location.point) return cell_x, cell_y
[docs] async def get_grid_x(self) -> Optional[int]: if not self._on_position_id: return None assert self._location is not None (cell_x, _) = self._point_to_cell(self._location.relative_location.point) return cell_x
[docs] async def get_grid_y(self) -> Optional[int]: if not self._on_position_id: return None assert self._location is not None (_, cell_y) = self._point_to_cell(self._location.relative_location.point) return cell_y
[docs] async def is_on_the_gird_cell(self, cell_x: int, cell_y: int) -> bool: if not self._on_position_id: return False assert self._location is not None (current_cell_x, current_cell_y) = self._point_to_cell( self._location.relative_location.point ) return (cell_x, cell_y) == (current_cell_x, current_cell_y)
def _cell_to_point(self, cell_x: int, cell_y: int) -> Point: return Point(x=round(self.CELL_SIZE * cell_x), y=round(self.CELL_SIZE * cell_y)) def _point_to_cell(self, relative_point: Point) -> Tuple[int, int]: cell = relative_point / self.CELL_SIZE return cell.x, cell.y
[docs] async def is_touched(self, item: StandardIdCard) -> bool: if not self._standard_id: return False try: current_item = StandardIdCard(self._standard_id.value) except ValueError: self.logger.debug( f"ValueError: Wrong Standard ID is detected:{self._standard_id.value}" ) return False return current_item == item
[docs] async def get_touched_card(self) -> Optional[int]: if not self._standard_id: return None try: current_item: Enum = StandardIdCard(self._standard_id.value) except ValueError: self.logger.debug( f"ValueError: Wrong Standard ID is detected:{self._standard_id.value}" ) return None self.logger.info(current_item.name) return current_item.value
[docs] async def get_cube_name(self) -> Optional[str]: assert self._cube is not None return self._cube.name
[docs] async def get_battery_level(self) -> Optional[int]: assert self._cube is not None battery_info = await self._cube.api.battery.read() if battery_info is not None: return battery_info.battery_level else: return None
[docs] async def get_3d_angle(self) -> Optional[Tuple[int, int, int]]: if self._cube_angle is None: return None return self._cube_angle.roll, self._cube_angle.pitch, self._cube_angle.yaw
[docs] async def get_posture(self) -> Optional[int]: if self._motion is None: return None else: return self._motion.posture.value
[docs] async def is_button_pressed(self) -> Optional[int]: if self._button is None: return None else: return self._button.state
[docs] async def turn_on_cube_lamp(self, r: int, g: int, b: int, duration: float) -> None: assert self._cube is not None duration = max(duration, 0) indicator_param = IndicatorParam( duration_ms=0, color=Color(r=r, g=g, b=b), ) await self._cube.api.indicator.turn_on(indicator_param) if duration > 0: await self.sleep(duration) await self._cube.api.indicator.turn_off_all()
[docs] async def turn_off_cube_lamp(self) -> None: assert self._cube is not None await self._cube.api.indicator.turn_off_all()
[docs] async def play_sound( self, note: int, duration: float, wait_to_complete: bool = True ) -> bool: assert self._cube is not None duration_ms = clip(int(duration * 1000), 1, 2550) try: note_name = Note(note) except ValueError: self.logger.debug(f"ValueError: note number {note} is unsupported") return False midi_notes = [ MidiNote( duration_ms=duration_ms, note=note_name, volume=255, ) ] await self._cube.api.sound.play_midi( repeat=1, midi_notes=midi_notes, ) if wait_to_complete: await self.sleep(duration) return True
[docs] async def stop_sound(self) -> None: assert self._cube is not None await self._cube.api.sound.stop()
[docs] async def is_magnet_in_contact(self) -> Optional[int]: if self._magnet is None: return None else: return self._magnet.state