# -*- coding: utf-8 -*-
# ************************************************************
#
# toio/simple/__init__.py
#
# Copyright 2023 Sony Interactive Entertainment Inc.
#
# ************************************************************
from __future__ import annotations
import asyncio
import math
import time
from enum import Enum, auto
from logging import NOTSET, NullHandler, StreamHandler, getLogger
from typing import ClassVar, Optional, Type
from toio.coordinate_systems import (
LocalCoordinateSystem,
VisualProgrammingCoordinateSystem,
)
from toio.cube import ToioCoreCube
from toio.cube.api.base_class import CubeCharacteristic, CubeNotificationHandler
from toio.cube.api.button import Button, ButtonInformation
from toio.cube.api.configuration import (
MagneticSensorCondition,
MagneticSensorFunction,
PostureAngleDetectionCondition,
PostureAngleDetectionType,
)
from toio.cube.api.id_information import (
IdInformation,
PositionId,
PositionIdMissed,
StandardId,
StandardIdMissed,
)
from toio.cube.api.indicator import Color, IndicatorParam
from toio.cube.api.motor import (
Motor,
MotorResponseCode,
MovementType,
ResponseMotorControlMultipleTargets,
ResponseMotorControlTarget,
RotationOption,
Speed,
TargetPosition,
)
from toio.cube.api.sensor import (
MagneticSensorData,
MotionDetectionData,
PostureAngleEulerData,
PostureDataType,
Sensor,
)
from toio.cube.api.sound import MidiNote, Note
from toio.position import (
STAY_CURRENT,
CubeLocation,
MatRect,
Point,
RelativeCubeLocation,
ToioMat,
)
from toio.scanner import BLEScanner
from toio.standard_id import StandardIdCard
from toio.utility import clip
# Module-level logger shared by everything in this file.
logger = getLogger(__name__)
logger.setLevel(NOTSET)
# A NullHandler is attached by default; SimpleCube.__init__ adds a
# StreamHandler only when the caller passes a log level other than NOTSET.
handler = NullHandler()
handler.setLevel(NOTSET)
logger.addHandler(handler)
class Direction(Enum):
    """Movement directions accepted by ``SimpleCube.move_steps``.

    ``move_steps`` acts on ``Forward`` and ``Backward`` only; for any other
    member it returns ``False``.
    """

    # Explicit values, identical to what ``auto()`` assigns in this order.
    Forward = 1
    Backward = 2
    Right = 3
    Left = 4
class SimpleCube:
    """
    Simplified, blocking access to a toio Core Cube.

    The methods are modelled on the blocks of visual programming.
    """

    # Not referenced by the code visible in this section of the file.
    DEFAULT_ROTATION_OPTION: ClassVar[RotationOption] = RotationOption.AbsoluteOptimal
    # Movement type sent with every target-position command.
    DEFAULT_MOVEMENT_TYPE: ClassVar[MovementType] = MovementType.Curve
    # Timeout sent with target-position commands; also the local limit
    # (in seconds) when polling for arrival.
    DEFAULT_TIMEOUT: ClassVar[int] = 10
    # Multiplier that converts a "step" count into a travel distance.
    DEFAULT_ONE_STEP: ClassVar[int] = 1
    # Scale factor between grid-cell indices and relative coordinates.
    CELL_SIZE: ClassVar[float] = 43.43
    # Polling interval (seconds) while waiting for a move to finish.
    MONITORING_CYCLE: ClassVar[float] = 0.01
@classmethod
async def search(cls, name: Optional[str] = None, timeout: int = 5) -> ToioCoreCube:
    """Scan for one cube and return a ``ToioCoreCube`` built from it.

    Registered cubes are scanned first; a general scan is used only when
    that finds nothing.

    Args:
        name: Cube id to look for. ``None`` accepts whichever single cube
            the scan returns.
        timeout: Passed to the scanner when ``name`` is ``None``.

    Returns:
        A ``ToioCoreCube`` created from the found device's interface and
        name.

    Raises:
        ValueError: The scan did not end with exactly one device.
    """
    if name is not None:
        # NOTE(review): ``timeout`` is not forwarded in this branch; confirm
        # whether the id-based scanner calls accept one.
        logger.info(f"search {name} in my registered devices")
        devices = await BLEScanner.scan_registered_cubes_with_id(cube_id={name})
        if len(devices) == 0:
            logger.info(f"search {name}")
            devices = await BLEScanner.scan_with_id(cube_id={name})
    else:
        logger.info("scan registered devices")
        devices = await BLEScanner.scan_registered_cubes(1, timeout=timeout)
        if len(devices) == 0:
            logger.info("scan all devices")
            devices = await BLEScanner.scan(1, timeout=timeout)
    logger.info("scan complete")
    logger.debug(devices)
    if len(devices) != 1:
        # Same exception type as before, now with a message explaining why.
        raise ValueError(
            f"expected exactly one toio Core Cube (name={name!r}), "
            f"found {len(devices)}"
        )
    device = devices[0]
    return ToioCoreCube(interface=device.interface, name=device.name)
def __init__(
    self,
    name: Optional[str] = None,
    timeout: int = 5,
    coordinate_system_class: Type[
        LocalCoordinateSystem
    ] = VisualProgrammingCoordinateSystem,
    log_level: int = NOTSET,
) -> None:
    """Find a cube, connect to it and start tracking its state.

    This call blocks: it drives the asyncio event loop until the cube is
    connected, the sensors are configured, notification handlers are
    registered and the first motion, posture-angle and magnetic-sensor
    readings have arrived.

    Args:
        name: Cube id forwarded to :meth:`search`; ``None`` accepts any
            single cube.
        timeout: Scan timeout forwarded to :meth:`search`.
        coordinate_system_class: Coordinate system class used to express
            positions and angles returned by this object.
        log_level: When not ``NOTSET``, the module logger is set to this
            level and a ``StreamHandler`` at the same level is added.

    Raises:
        ValueError: Propagated from :meth:`search` when it does not find
            exactly one cube.
    """
    # State maintained by the ID-information notification handler.
    self._native_location: Optional[CubeLocation] = None
    self._location: Optional[RelativeCubeLocation] = None
    self._standard_id: Optional[StandardId] = None
    self._on_position_id: bool = False
    self._on_standard_id: bool = False
    self._mat: Optional[MatRect] = None
    # Set by the motor notification handler when a target move succeeds.
    self._arrived: bool = False
    self._coordinate_system_class: Type[
        LocalCoordinateSystem
    ] = coordinate_system_class

    # NOTSET is an int: compare by value rather than by object identity.
    if log_level != NOTSET:
        logger.setLevel(log_level)
        log_handler = StreamHandler()
        log_handler.setLevel(log_level)
        logger.addHandler(log_handler)

    try:
        self._event_loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop
        # when none is current; create one and make it current so later
        # instances share it.
        self._event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._event_loop)

    self._cube: ToioCoreCube = self._event_loop.run_until_complete(
        self.search(name=name, timeout=timeout)
    )
    # Latest sensor readings, filled in by the sensor notification handler.
    self._motion: Optional[MotionDetectionData] = None
    self._cube_angle: Optional[PostureAngleEulerData] = None
    self._magnet: Optional[MagneticSensorData] = None

    logger.debug("connecting")
    self._event_loop.run_until_complete(self._cube.connect())
    logger.debug(f"connected ({self._cube.name})")

    self._button: Optional[ButtonInformation] = self._event_loop.run_until_complete(
        self._cube.api.button.read()
    )
    self._set_sensor_configurations()
    self._request_initial_information()

    handlers: tuple[tuple[CubeCharacteristic, CubeNotificationHandler], ...] = (
        (self._cube.api.id_information, self._id_notification_handler),
        (self._cube.api.motor, self._motor_notification_handler),
        (self._cube.api.sensor, self._motion_sensor_notification_handler),
        (self._cube.api.button, self._button_notification_handler),
    )
    for characteristic, notification_handler in handlers:
        self._event_loop.run_until_complete(
            characteristic.register_notification_handler(notification_handler)
        )
    self._wait_to_obtain_initial_information()
def _set_sensor_configurations(self):
    """Configure magnetic-sensor and posture-angle notifications on the cube.

    Both are set to notify on change detection; the magnetic sensor reports
    magnet state and the posture angle is reported as Euler angles.
    """
    run = self._event_loop.run_until_complete
    configuration = self._cube.api.configuration

    # MagneticSensorFunction.MagneticForce is the alternative function type
    # (it was left commented out here).
    run(
        configuration.set_magnetic_sensor(
            function_type=MagneticSensorFunction.MagnetState,
            interval_ms=60,
            condition=MagneticSensorCondition.ChangeDetection,
        )
    )
    run(
        configuration.set_posture_angle_detection(
            detection_type=PostureAngleDetectionType.Euler,
            interval_ms=50,
            condition=PostureAngleDetectionCondition.ChangeDetection,
        )
    )
def _request_initial_information(self):
    """Ask the cube for motion, Euler posture-angle and magnetic-sensor data.

    The answers arrive through the sensor notification handler; this
    method only sends the three requests, one after another.
    """
    run = self._event_loop.run_until_complete
    sensor = self._cube.api.sensor

    run(sensor.request_motion_information())
    run(sensor.request_posture_angle_information(PostureDataType.Euler))
    run(sensor.request_magnetic_sensor_information())
def _wait_to_obtain_initial_information(self):
    """Block until motion, posture-angle and magnetic readings are all cached.

    The requests are re-sent and the event loop is run for 0.1 s on every
    pass. There is no upper bound: the loop continues until all three
    values have been received.
    """
    while not (self._motion and self._cube_angle and self._magnet):
        self._request_initial_information()
        pause = asyncio.sleep(0.1)
        self._event_loop.run_until_complete(pause)
def __del__(self):
    """Best-effort disconnect when the object is being destroyed.

    Nothing is done when construction failed before the cube or the event
    loop was stored (for example because the search raised), or when the
    event loop has already been closed; in those cases a disconnect
    attempt could only raise inside the finalizer.
    """
    cube = getattr(self, "_cube", None)
    loop = getattr(self, "_event_loop", None)
    if cube is None or loop is None or loop.is_closed():
        return
    self.disconnect()
def __enter__(self):
    """Return this object as the ``with`` target.

    No work happens here; the connection was already made in ``__init__``.
    """
    return self
def __exit__(self, _exc_type, _exc_value, _traceback):
    """Disconnect when the ``with`` block ends.

    The exception arguments are ignored and ``None`` is returned, so an
    exception raised inside the block is not suppressed.
    """
    self.disconnect()
    return None
def disconnect(self):
    """Disconnect from the cube, blocking until the request completes."""
    logger.debug("disconnecting")
    loop = self._event_loop
    loop.run_until_complete(self._cube.disconnect())
    logger.debug("disconnected")
def sleep(self, sleep_second: float):
    """Wait for ``sleep_second`` seconds by running the event loop.

    The wait is an ``asyncio.sleep`` driven on this object's event loop
    rather than a plain ``time.sleep``.
    """
    loop = self._event_loop
    pause = asyncio.sleep(sleep_second)
    loop.run_until_complete(pause)
# Notification handler for the ID-information characteristic: keeps the cached
# position state and Standard ID state in sync with what the cube reports.
# NOTE(review): indentation was lost in this view of the file, so the nesting
# of the ``else`` / ``from_absolute_location`` / ``break`` lines below cannot
# be read off the text. Confirm against the packaged source before editing.
def _id_notification_handler(self, payload: bytearray) -> None:
id_info = IdInformation.is_my_data(payload)
logger.debug(id_info)
# Position ID read: remember the native location, then look for the known mat
# that contains that point.
if isinstance(id_info, PositionId):
self._native_location = id_info.center
for mat in ToioMat.mats:
if self._native_location.point in mat:
# Different mat than the cached one: start a fresh relative location whose
# coordinate system has its origin at that mat's center.
if mat != self._mat:
logger.debug(str(mat))
self._mat = mat
self._location = RelativeCubeLocation.new()
coordinate_system = self._coordinate_system_class(
origin=mat.center()
)
self._location.change_coordinate_system(coordinate_system)
else:
assert self._location is not None
self._location.from_absolute_location(self._native_location)
break
assert self._location is not None
self._on_position_id = True
# Standard ID read: convert its angle from the native angle using the
# configured coordinate system before caching it.
elif isinstance(id_info, StandardId):
coordinate_system = self._coordinate_system_class()
id_info.angle = coordinate_system.from_native_angle(id_info.angle)
self._standard_id = id_info
self._on_standard_id = True
# Position ID no longer readable: drop all cached position state.
elif isinstance(id_info, PositionIdMissed):
self._location = None
self._mat = None
self._native_location = None
self._on_position_id = False
# Standard ID no longer readable: drop the cached Standard ID.
elif isinstance(id_info, StandardIdMissed):
self._standard_id = None
self._on_standard_id = False
def _motor_notification_handler(self, payload: bytearray) -> None:
    """Mark the pending target move as arrived when the cube reports success.

    Only responses to target / multiple-target motor control are
    considered; ``SUCCESS`` and ``SUCCESS_WITH_OVERWRITE`` both count as
    arrival. Other payloads leave the state untouched.
    """
    response = Motor.is_my_data(payload)
    logger.debug(response)

    target_responses = (
        ResponseMotorControlTarget,
        ResponseMotorControlMultipleTargets,
    )
    if not isinstance(response, target_responses):
        return

    success_codes = (
        MotorResponseCode.SUCCESS,
        MotorResponseCode.SUCCESS_WITH_OVERWRITE,
    )
    if response.response_code in success_codes:
        self._arrived = True
def _motion_sensor_notification_handler(self, payload: bytearray) -> None:
    """Cache the newest sensor reading carried by ``payload``.

    Motion detection, Euler posture angle and magnetic sensor data each go
    to their own attribute; any other payload is only logged.
    """
    reading = Sensor.is_my_data(payload)
    logger.debug(reading)

    if isinstance(reading, MotionDetectionData):
        self._motion = reading
        return
    if isinstance(reading, PostureAngleEulerData):
        self._cube_angle = reading
        return
    if isinstance(reading, MagneticSensorData):
        self._magnet = reading
def _button_notification_handler(self, payload: bytearray) -> None:
    """Cache the newest button state carried by ``payload``.

    Payloads that do not decode to ``ButtonInformation`` are only logged.
    """
    decoded = Button.is_my_data(payload)
    logger.debug(decoded)
    if not isinstance(decoded, ButtonInformation):
        return
    self._button = decoded
def move(self, speed: int, duration: float, wait_to_complete: bool = True) -> None:
    """Drive both motors at the same speed.

    Args:
        speed: Speed given to the left and the right motor.
        duration: Running time in seconds; negative values are treated as 0.
        wait_to_complete: When true, block for ``duration`` seconds after
            sending the command.
    """
    # Identical to run_motor with equal wheel speeds; delegating keeps the
    # duration handling in one place.
    self.run_motor(speed, speed, duration, wait_to_complete)
def spin(self, speed: int, duration: float, wait_to_complete: bool = True) -> None:
    """Spin in place by driving the motors in opposite directions.

    Args:
        speed: Speed of the left motor; the right motor gets ``-speed``
            (negative value: anticlockwise).
        duration: Running time in seconds; negative values are treated as 0.
        wait_to_complete: When true, block for ``duration`` seconds after
            sending the command.
    """
    # Identical to run_motor with opposite wheel speeds; delegating keeps
    # the duration handling in one place.
    self.run_motor(speed, -speed, duration, wait_to_complete)
def run_motor(
    self,
    left_speed: int,
    right_speed: int,
    duration: float,
    wait_to_complete: bool = True,
) -> None:
    """Drive the left and right motors at independent speeds.

    Args:
        left_speed: Speed for the left motor.
        right_speed: Speed for the right motor.
        duration: Running time in seconds; negative values are treated as
            0. It is sent to the cube as whole milliseconds.
        wait_to_complete: When true, block for ``duration`` seconds after
            sending the command.
    """
    seconds = max(duration, 0)
    milliseconds = int(seconds * 1000)
    loop = self._event_loop
    loop.run_until_complete(
        self._cube.api.motor.motor_control(left_speed, right_speed, milliseconds)
    )
    if not wait_to_complete:
        return
    self.sleep(seconds)
def stop_motor(self) -> None:
    """Stop both motors by sending a motor command with speed 0 on each side."""
    loop = self._event_loop
    halt = self._cube.api.motor.motor_control(0, 0)
    loop.run_until_complete(halt)
def move_steps(self, direction: Direction, speed: int, step: int) -> bool:
    """Move forward or backward along the current heading by ``step`` steps.

    Args:
        direction: ``Direction.Forward`` or ``Direction.Backward``; any
            other value makes the call return ``False`` without moving.
        speed: Only the magnitude is used, passed through
            ``clip(..., 0, 255)``.
        step: Number of steps; converted to a distance by
            ``_step_to_point``.

    Returns:
        ``False`` when no Position ID is being read or the direction is
        unsupported; otherwise the result of waiting for arrival.
    """
    if not self._on_position_id:
        return False
    assert self._native_location is not None

    if direction == Direction.Forward:
        sign = 1
    elif direction == Direction.Backward:
        sign = -1
    else:
        return False
    distance = sign * self._step_to_point(step)

    start = self._native_location
    heading = math.radians(start.angle)
    # Project the travel distance onto the axes of the native coordinates.
    destination = CubeLocation(
        point=Point(
            x=start.point.x + round(distance * math.cos(heading)),
            y=start.point.y + round(distance * math.sin(heading)),
        ),
        angle=start.angle,
    )
    boundary_location = start.get_boundary_point(destination)
    target_param = TargetPosition(
        cube_location=boundary_location,
        rotation_option=RotationOption.WithoutRotation,
    )
    speed_param = Speed(max=clip(abs(speed), 0, 255))
    started_at = self._move_to_target(speed_param, target_param)
    return self._wait_arrival(started_at)
def _step_to_point(self, step: int) -> int:
    """Convert a step count to a distance using ``DEFAULT_ONE_STEP``."""
    distance_per_step = self.DEFAULT_ONE_STEP
    return step * distance_per_step
def _move_to_target(self, speed: Speed, target: TargetPosition) -> float:
    """Send a target-position motor command and return the time it was issued.

    The arrival flag is cleared first so that ``_wait_arrival`` only sees a
    success reported after this command.

    Returns:
        The ``time.time()`` value taken just before the command was sent.
    """
    self._arrived = False
    started_at = time.time()
    loop = self._event_loop
    loop.run_until_complete(
        self._cube.api.motor.motor_control_target(
            timeout=self.DEFAULT_TIMEOUT,
            movement_type=self.DEFAULT_MOVEMENT_TYPE,
            speed=speed,
            target=target,
        )
    )
    return started_at
def _wait_arrival(self, executed_time: float):
    """Poll until the pending move arrives, the Position ID is lost or time is up.

    Args:
        executed_time: ``time.time()`` value taken when the move was sent.

    Returns:
        ``False`` as soon as the Position ID is missed; otherwise the
        arrival flag, which is still false when ``DEFAULT_TIMEOUT`` seconds
        elapsed without a success response.
    """
    while not self._arrived:
        if not self._on_position_id:
            logger.debug("Position ID Missed")
            return False
        within_timeout = time.time() - executed_time < self.DEFAULT_TIMEOUT
        if not within_timeout:
            break
        # Let the event loop run so notification handlers can fire.
        self._event_loop.run_until_complete(asyncio.sleep(self.MONITORING_CYCLE))
    return self._arrived
def turn(self, speed: int, degree: int) -> bool:
    """Rotate in place by ``degree`` relative to the current orientation.

    Args:
        speed: Only the magnitude is used, passed through
            ``clip(..., 0, 255)``.
        degree: Non-negative values use ``RelativePositive`` rotation;
            negative values use ``RelativeNegative`` with the sign removed.

    Returns:
        ``False`` when no Position ID is being read; otherwise the result
        of waiting for arrival.
    """
    if not self._on_position_id:
        return False
    assert self._location is not None

    clockwise = degree >= 0
    rotation = (
        RotationOption.RelativePositive
        if clockwise
        else RotationOption.RelativeNegative
    )
    amount = degree if clockwise else -1 * degree

    # STAY_CURRENT for both coordinates: only the angle is a target here.
    stay_here = CubeLocation(
        point=Point(x=STAY_CURRENT, y=STAY_CURRENT), angle=amount
    )
    target_param = TargetPosition(cube_location=stay_here, rotation_option=rotation)
    speed_param = Speed(max=clip(abs(speed), 0, 255))
    started_at = self._move_to_target(speed_param, target_param)
    return self._wait_arrival(started_at)
def move_to(self, speed: int, x: int, y: int) -> bool:
    """Move to the point ``(x, y)`` of the current relative coordinate system.

    Args:
        speed: Only the magnitude is used, passed through
            ``clip(..., 0, 255)``.
        x: Target x in relative coordinates.
        y: Target y in relative coordinates.

    Returns:
        ``False`` when no Position ID is being read; otherwise the result
        of waiting for arrival.
    """
    if not self._on_position_id:
        return False
    assert self._native_location is not None
    assert self._location is not None

    # NOTE(review): this is the live ``self._location`` object, not a copy, so
    # the assignment below overwrites the cached relative location.
    destination = self._location
    destination.relative_location = CubeLocation(point=Point(x=x, y=y), angle=0)

    absolute_target = destination.to_absolute_location()
    boundary_location = self._native_location.get_boundary_point(absolute_target)
    target_param = TargetPosition(
        cube_location=boundary_location,
        rotation_option=RotationOption.WithoutRotation,
    )
    speed_param = Speed(max=clip(abs(speed), 0, 255))
    started_at = self._move_to_target(speed_param, target_param)
    return self._wait_arrival(started_at)
def set_orientation(self, speed: int, degree: int) -> bool:
    """Rotate in place to an absolute orientation.

    Args:
        speed: Only the magnitude is used, passed through
            ``clip(..., 0, 255)``.
        degree: Non-negative values use ``AbsolutePositive`` rotation;
            negative values use ``AbsoluteNegative`` with the sign removed.
            The angle is then converted to the native angle by the
            configured coordinate system and rounded.

    Returns:
        ``False`` when no Position ID is being read; otherwise the result
        of waiting for arrival.
    """
    if not self._on_position_id:
        return False
    assert self._native_location is not None

    positive = degree >= 0
    rotation = (
        RotationOption.AbsolutePositive
        if positive
        else RotationOption.AbsoluteNegative
    )
    magnitude = degree if positive else -1 * degree

    converter = self._coordinate_system_class()
    native_degree = round(converter.to_native_angle(magnitude))

    # STAY_CURRENT for both coordinates: only the angle is a target here.
    stay_here = CubeLocation(
        point=Point(x=STAY_CURRENT, y=STAY_CURRENT), angle=native_degree
    )
    target_param = TargetPosition(cube_location=stay_here, rotation_option=rotation)
    speed_param = Speed(max=clip(abs(speed), 0, 255))
    started_at = self._move_to_target(speed_param, target_param)
    return self._wait_arrival(started_at)
def move_to_the_grid_cell(self, speed: int, cell_x: int, cell_y: int) -> bool:
    """Move to the grid cell ``(cell_x, cell_y)``.

    The cell indices are converted to a point with ``_cell_to_point`` and
    the move itself is done by ``move_to``.

    Returns:
        ``False`` when no Position ID is being read; otherwise whatever
        ``move_to`` returns.
    """
    if not self._on_position_id:
        return False
    destination = self._cell_to_point(cell_x, cell_y)
    return self.move_to(speed, destination.x, destination.y)
def get_current_position(self) -> Optional[tuple[int, int]]:
    """Return the cached relative ``(x, y)`` position, or ``None`` if there is none."""
    location = self._location
    if not location:
        return None
    point = location.relative_location.point
    return (point.x, point.y)
def get_x(self) -> Optional[int]:
    """Return the cached relative x coordinate, or ``None`` if there is none."""
    location = self._location
    if not location:
        return None
    return location.relative_location.point.x
def get_y(self) -> Optional[int]:
    """Return the cached relative y coordinate, or ``None`` if there is none."""
    location = self._location
    if not location:
        return None
    return location.relative_location.point.y
def get_orientation(self) -> Optional[int]:
    """Return the cached relative angle, or ``None`` if there is no location."""
    location = self._location
    if not location:
        return None
    return location.relative_location.angle
def get_grid(self) -> Optional[tuple[int, int]]:
    """Return the grid cell of the current position as ``(cell_x, cell_y)``.

    Returns ``None`` when no Position ID is being read.
    """
    if not self._on_position_id:
        return None
    assert self._location is not None
    here = self._location.relative_location.point
    column, row = self._point_to_cell(here)
    return column, row
def get_grid_x(self) -> Optional[int]:
    """Return the x index of the current grid cell.

    Returns ``None`` when no Position ID is being read.
    """
    if not self._on_position_id:
        return None
    assert self._location is not None
    here = self._location.relative_location.point
    column, _row = self._point_to_cell(here)
    return column
def get_grid_y(self) -> Optional[int]:
    """Return the y index of the current grid cell.

    Returns ``None`` when no Position ID is being read.
    """
    if not self._on_position_id:
        return None
    assert self._location is not None
    here = self._location.relative_location.point
    _column, row = self._point_to_cell(here)
    return row
def is_on_the_gird_cell(self, cell_x: int, cell_y: int) -> bool:
    """Tell whether the cube is on grid cell ``(cell_x, cell_y)``.

    The method name keeps its original spelling ("gird") because it is
    part of the public interface.

    Returns:
        ``False`` when no Position ID is being read; otherwise whether the
        current cell equals the given one.
    """
    if not self._on_position_id:
        return False
    assert self._location is not None
    here = self._location.relative_location.point
    here_x, here_y = self._point_to_cell(here)
    return (cell_x, cell_y) == (here_x, here_y)
def _cell_to_point(self, cell_x: int, cell_y: int) -> Point:
    """Convert grid-cell indices to a point by scaling with ``CELL_SIZE``.

    Each coordinate is rounded to the nearest integer.
    """
    x = round(self.CELL_SIZE * cell_x)
    y = round(self.CELL_SIZE * cell_y)
    return Point(x=x, y=y)
def _point_to_cell(self, relative_point: Point) -> tuple[int, int]:
    """Convert a relative point to grid-cell indices.

    The point is divided by ``CELL_SIZE`` using the point's own division
    operator; the x and y of the result are returned as a tuple.
    """
    scaled = relative_point / self.CELL_SIZE
    cell_x, cell_y = scaled.x, scaled.y
    return cell_x, cell_y
def is_touched(self, item: StandardIdCard) -> bool:
    """Tell whether the cube is currently reading the Standard ID ``item``.

    Returns:
        ``False`` when no Standard ID is cached or its value is not a known
        ``StandardIdCard``; otherwise whether it equals ``item``.
    """
    if not self._standard_id:
        return False
    try:
        current_item = StandardIdCard(self._standard_id.value)
    except ValueError:
        logger.debug(
            f"ValueError: Wrong Standard ID is detected:{self._standard_id.value}"
        )
        matched = False
    else:
        matched = current_item == item
    return matched
def get_touched_card(self) -> Optional[int]:
    """Return the value of the Standard ID card currently being read.

    Returns:
        ``None`` when no Standard ID is cached or its value is not a known
        ``StandardIdCard``; otherwise the card's enum value. The card name
        is logged at info level.
    """
    if not self._standard_id:
        return None
    try:
        current_item: Enum = StandardIdCard(self._standard_id.value)
    except ValueError:
        logger.debug(
            f"ValueError: Wrong Standard ID is detected:{self._standard_id.value}"
        )
        card_value = None
    else:
        logger.info(current_item.name)
        card_value = current_item.value
    return card_value
def get_cube_name(self) -> Optional[str]:
    """Return the name stored on the underlying cube object."""
    cube = self._cube
    return cube.name
def get_battery_level(self) -> Optional[int]:
    """Read the battery characteristic and return its battery level.

    Returns ``None`` when the read yields no information.
    """
    loop = self._event_loop
    reading = loop.run_until_complete(self._cube.api.battery.read())
    return None if reading is None else reading.battery_level
def get_3d_angle(self) -> Optional[tuple[int, int, int]]:
    """Return the cached posture angle as ``(roll, pitch, yaw)``.

    Returns ``None`` when no posture-angle reading has been cached.
    """
    angle = self._cube_angle
    if angle is None:
        return None
    return angle.roll, angle.pitch, angle.yaw
def get_posture(self) -> Optional[int]:
    """Return the value of the posture from the cached motion reading.

    Returns ``None`` when no motion reading has been cached.
    """
    motion = self._motion
    return None if motion is None else motion.posture.value
def turn_on_cube_lamp(self, r: int, g: int, b: int, duration: float) -> None:
    """Light the cube's lamp in the given colour.

    Args:
        r: Red component.
        g: Green component.
        b: Blue component.
        duration: Seconds to keep the lamp on; negative values are treated
            as 0. With a positive value the call blocks for that long and
            then turns the lamp off; with 0 the lamp is left on.
    """
    duration = max(duration, 0)
    # duration_ms=0 is always sent; timing is handled on this side.
    lamp = IndicatorParam(duration_ms=0, color=Color(r=r, g=g, b=b))
    loop = self._event_loop
    loop.run_until_complete(self._cube.api.indicator.turn_on(lamp))
    if duration > 0:
        self.sleep(duration)
        self._event_loop.run_until_complete(self._cube.api.indicator.turn_off_all())
def turn_off_cube_lamp(self) -> None:
    """Turn off all of the cube's indicators."""
    loop = self._event_loop
    lights_out = self._cube.api.indicator.turn_off_all()
    loop.run_until_complete(lights_out)
def play_sound(
    self, note: int, duration: float, wait_to_complete: bool = True
) -> bool:
    """Play a single MIDI note once at volume 255.

    Args:
        note: Note number; must be a valid ``Note`` value.
        duration: Length in seconds. The value sent to the cube is
            ``int(duration * 100)`` passed through ``clip(..., 1, 255)``.
        wait_to_complete: When true, block for ``duration`` seconds (the
            unclipped value) after sending the command.

    Returns:
        ``False`` when ``note`` is not a supported note number, otherwise
        ``True``.
    """
    duration_ms = clip(int(duration * 100), 1, 255)
    try:
        note_name = Note(note)
    except ValueError:
        logger.debug(f"ValueError: note number {note} is unsupported")
        return False

    single_note = MidiNote(duration_ms=duration_ms, note=note_name, volume=255)
    loop = self._event_loop
    loop.run_until_complete(
        self._cube.api.sound.play_midi(repeat=1, midi_notes=[single_note])
    )
    if wait_to_complete:
        self.sleep(duration)
    return True
def stop_sound(self) -> None:
    """Stop whatever sound the cube is currently playing."""
    loop = self._event_loop
    silence = self._cube.api.sound.stop()
    loop.run_until_complete(silence)