# -*- coding: utf-8 -*-
# ************************************************************
#
# configuration.py
#
# Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************
import pprint
import struct
from enum import IntEnum
from typing_extensions import Optional, TypeAlias, Union
from ...device_interface import CubeInterface, GattReadData
from ...logger import get_toio_logger
from ...toio_uuid import ToioUuid
from ...utility import clip
from ..api.base_class import CubeCharacteristic, CubeCommand, CubeResponse
from ..notification_handler_info import NotificationReceivedDevice
logger = get_toio_logger(__name__)
[docs]class RequestProtocolVersion(CubeCommand):
"""
Protocol version request command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#requesting-the-ble-protocol-version
"""
_payload_id = 0x01
[docs] def __init__(self) -> None:
pass
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00))
[docs]class SetHorizontalDetectionThreshold(CubeCommand):
"""
Horizontal detection threshold setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#horizontal-detection-threshold-settings
"""
_payload_id = 0x05
[docs] def __init__(self, threshold: int) -> None:
self.threshold = clip(threshold, 1, 45)
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00, self.threshold))
[docs]class SetCollisionDetectionThreshold(CubeCommand):
"""
Collision detection threshold setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#collision-detection-threshold-settings
"""
_payload_id = 0x06
[docs] def __init__(self, threshold: int) -> None:
self.threshold = clip(threshold, 1, 10)
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00, self.threshold))
[docs]class SetDoubleTapDetectionTimeInterval(CubeCommand):
"""
Double-tap detection time interval setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#double-tap-detection-time-interval-settings
"""
_payload_id = 0x17
[docs] def __init__(self, threshold: int) -> None:
self.threshold = clip(threshold, 1, 7)
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00, self.threshold))
[docs]class NotificationCondition(IntEnum):
"""
Notification conditions of ID notification
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#notification-conditions
"""
Always = 0x00
ChangeDetection = 0x01
Periodic = 0xFF
[docs]class SetIdNotification(CubeCommand):
"""
ID notification setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#identification-sensor-id-notification-settings
"""
_payload_id = 0x18
[docs] def __init__(self, interval_ms: int, condition: NotificationCondition) -> None:
self.interval = clip(int(interval_ms / 10), 0, 255)
self.condition = condition
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00, self.interval, self.condition))
[docs]class SetIdMissedNotification(CubeCommand):
"""
ID missed notification setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#identification-sensor-id-missed-notification-settings
"""
_payload_id = 0x19
[docs] def __init__(self, sensitivity_ms: int) -> None:
self.sensitivity = clip(int(sensitivity_ms / 10), 0, 255)
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00, self.sensitivity))
[docs]class MagneticSensorFunction(IntEnum):
Disable = 0x00
MagnetState = 0x01
MagneticForce = 0x02
[docs]class MagneticSensorCondition(IntEnum):
Always = 0x00
ChangeDetection = 0x01
[docs]class SetMagneticSensor(CubeCommand):
"""
Magnet sensor setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#magnetic-sensor-settings-
"""
_payload_id = 0x1B
[docs] def __init__(
self,
function_type: MagneticSensorFunction,
interval_ms: int,
condition: MagneticSensorCondition,
) -> None:
self.function_type = function_type
self.interval = clip(int(interval_ms / 20), 0, 255)
self.condition = condition
[docs] def __bytes__(self) -> bytes:
return bytes(
(self._payload_id, 0x00, self.function_type, self.interval, self.condition)
)
[docs]class PostureAngleDetectionType(IntEnum):
Euler = 0x01
Quaternions = 0x02
HighPrecisionEuler = 0x03
[docs]class PostureAngleDetectionCondition(IntEnum):
"""
Notification condition of posture angle detection
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#notification-conditions-1
"""
Always = 0x00
ChangeDetection = 0x01
[docs]class SetPostureAngleDetection(CubeCommand):
"""
Posture angle detection setting command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#posture-angle-detection-settings-
"""
_payload_id = 0x1D
[docs] def __init__(
self,
detection_type: PostureAngleDetectionType,
interval_ms: int,
condition: PostureAngleDetectionCondition,
):
self.detection_type = detection_type
self.interval = clip(int(interval_ms / 10), 0, 255)
self.condition = condition
[docs] def __bytes__(self) -> bytes:
return bytes(
(self._payload_id, 0x00, self.detection_type, self.interval, self.condition)
)
[docs]class ConnectionInterval:
"""
Representation of BLE connection interval
Attributes:
value (int): connection interval value
"""
BLE_MIN_INTERVAL = 6 # 7.5[ms]
"""Minimum connection interval defined by BLE specifications"""
BLE_MAX_INTERVAL = 3200 # 4.0[s]
"""Maximum connection interval defined by BLE specifications"""
BLE_INTERVAL_UNIT = 1.25 # 1.25[ms]
"""Millisecond per unit"""
BLE_INTERVAL_NONE = 0xFFFF
"""This means 'connection interval is not specified'"""
@staticmethod
def _check_interval_value(interval_value: int) -> int:
if interval_value == ConnectionInterval.BLE_INTERVAL_NONE:
return interval_value
if (
interval_value < ConnectionInterval.BLE_MIN_INTERVAL
or ConnectionInterval.BLE_MAX_INTERVAL < interval_value
):
raise ValueError("wrong value:%d" % interval_value)
else:
return interval_value
[docs] @staticmethod
def from_ms(interval_ms: float) -> int:
"""
Convert millisecond to connection interval value.
Args:
interval_ms (float): interval (millisecond)
Returns:
int: connection interval value
"""
interval_value = ConnectionInterval._check_interval_value(
int(round(interval_ms / ConnectionInterval.BLE_INTERVAL_UNIT))
)
logger.info("interval_value:%d", interval_value)
return interval_value
[docs] @staticmethod
def to_ms(interval_value: int) -> float:
"""
Convert connection interval value to millisecond.
Args:
interval_value (int): interval_value
Returns:
float: connection interval time (millisecond)
"""
return interval_value * ConnectionInterval.BLE_INTERVAL_UNIT
[docs] def __init__(self, interval: int):
self.value = ConnectionInterval._check_interval_value(interval)
@property
def value_ms(self):
"""
Return the connection interval time of ConnectionInterval instance.
Returns:
float: connection interval time (millisecond)
"""
return ConnectionInterval.to_ms(self.value)
def __int__(self):
return self.value
[docs] def __str__(self):
return "connection interval: %d (%fms)" % (self.value, self.value_ms)
[docs]class RequestConnectionInterval(CubeCommand):
"""
Request to change bluetooth Connection interval
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#request-to-change-connection-interval-
"""
_payload_id = 0x30
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == RequestConnectionInterval._payload_id
[docs] def __init__(self, min_interval: int, max_interval: int):
if (
min_interval != 0xFFFF
and max_interval != 0xFFFF
and min_interval > max_interval
):
raise ValueError
self.min_interval = ConnectionInterval(min_interval)
self.max_interval = ConnectionInterval(max_interval)
[docs] def __bytes__(self) -> bytes:
return struct.pack(
"<BBHH",
self._payload_id,
0x01,
self.min_interval.value,
self.max_interval.value,
)
[docs]class GetRequestedConnectionIntervalValue(CubeCommand):
"""
Get requested connection interval value
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#obtaining-the-requested-connection-interval-value-
"""
_payload_id = 0x31
[docs] def __init__(self):
pass
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00))
[docs]class GetCurrentConnectionIntervalValue(CubeCommand):
"""
Get current connection interval value
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#obtaining-the-actual-connection-interval-value-
"""
_payload_id = 0x32
[docs] def __init__(self):
pass
[docs] def __bytes__(self) -> bytes:
return bytes((self._payload_id, 0x00))
[docs]class ProtocolVersion(CubeResponse):
"""
Protocol version response
Attributes:
version (str): version (UTF-8)
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#obtaining-the-ble-protocol-version
"""
_payload_id = 0x81
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ProtocolVersion._payload_id
[docs] def __init__(self, payload: GattReadData):
if ProtocolVersion.is_myself(payload):
version = payload[2:]
self._major = 0
self._minor = 0
self._patch = 0
self.version = version.decode("UTF-8")
version_numbers = self.version.split(".")
if len(version_numbers) >= 3:
self._major = int(version_numbers[0])
self._minor = int(version_numbers[1])
self._patch = int(version_numbers[2])
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseIdNotificationSettings(CubeResponse):
"""
ID notification setting response
Attributes:
result (bool): Result of the command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-identification-sensor-id-notification-settings
"""
_payload_id = 0x98
_converter = struct.Struct("<BBB")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseIdNotificationSettings._payload_id
[docs] def __init__(self, payload: GattReadData):
if ResponseIdNotificationSettings.is_myself(payload):
_, _, result = self._converter.unpack_from(payload)
self.result = result == 0x00
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseIdMissedNotificationSettings(CubeResponse):
"""
ID missed notification setting response
Attributes:
result (bool): Result of the command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-identification-sensor-id-missed-notification-settings
"""
_payload_id = 0x99
_converter = struct.Struct("<BBB")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseIdMissedNotificationSettings._payload_id
[docs] def __init__(self, payload: GattReadData):
if ResponseIdMissedNotificationSettings.is_myself(payload):
_, _, result = self._converter.unpack_from(payload)
self.result = result == 0x00
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseMagneticSensorSettings(CubeResponse):
"""
Magnetic sensor setting response
Attributes:
result (bool): Result of the command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-magnetic-sensor-settings
"""
_payload_id = 0x9B
_converter = struct.Struct("<BBB")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseMagneticSensorSettings._payload_id
[docs] def __init__(self, payload: GattReadData):
if ResponseMagneticSensorSettings.is_myself(payload):
_, _, result = self._converter.unpack_from(payload)
self.result = result == 0x00
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponsePostureAngleDetectionSettings(CubeResponse):
"""
Posture angle detection setting response
Attributes:
result (bool): Result of the command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-posture-angle-detection-settings-
"""
_payload_id = 0x9D
_converter = struct.Struct("<BBB")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponsePostureAngleDetectionSettings._payload_id
[docs] def __init__(self, payload: GattReadData):
if ResponsePostureAngleDetectionSettings.is_myself(payload):
_, _, result = self._converter.unpack_from(payload)
self.result = result == 0x00
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseConnectionIntervalRequest(CubeResponse):
"""
Protocol version response
Attributes:
result (bool): Result of the command
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-request-to-change-connection-interval-
"""
_payload_id = 0xB0
_converter = struct.Struct("<BBB")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseConnectionIntervalRequest._payload_id
[docs] def __init__(self, payload: GattReadData):
if ResponseConnectionIntervalRequest.is_myself(payload):
_, _, result = self._converter.unpack_from(payload)
self.result = result == 0x00
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseGettingRequestedConnectionInterval(CubeResponse):
"""
Response of getting requested connection interval value
Attributes:
min_interval (ConnectionInterval): minimum connection interval
max_interval (ConnectionInterval): maximum connection interval
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-obtain-the-requested-connection-interval-value-
"""
_payload_id = 0xB1
_converter = struct.Struct("<BBHH")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseGettingRequestedConnectionInterval._payload_id
[docs] def __init__(self, payload: GattReadData):
if self.is_myself(payload):
_, _, min_interval, max_interval = self._converter.unpack_from(payload)
self.min_interval = ConnectionInterval(min_interval)
self.max_interval = ConnectionInterval(max_interval)
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
[docs]class ResponseGettingCurrentConnectionInterval(CubeResponse):
"""
Response of getting current connection interval value
Attributes:
min_interval (ConnectionInterval): minimum connection interval
max_interval (ConnectionInterval): maximum connection interval
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#responses-to-obtain-the-actual-connection-interval-value-
"""
_payload_id = 0xB2
_converter = struct.Struct("<BBH")
[docs] @staticmethod
def is_myself(payload: GattReadData) -> bool:
return payload[0] == ResponseGettingCurrentConnectionInterval._payload_id
[docs] def __init__(self, payload: GattReadData):
if self.is_myself(payload):
_, _, interval = self._converter.unpack_from(payload)
self.interval = ConnectionInterval(interval)
else:
raise TypeError("wrong payload")
[docs] def __str__(self) -> str:
return pprint.pformat(vars(self))
ConfigurationResponseType: TypeAlias = Union[
ProtocolVersion,
ResponseIdNotificationSettings,
ResponseIdMissedNotificationSettings,
ResponseMagneticSensorSettings,
ResponseMotorSpeedInformationAcquisitionSettings,
ResponsePostureAngleDetectionSettings,
ResponseConnectionIntervalRequest,
ResponseGettingRequestedConnectionInterval,
ResponseGettingCurrentConnectionInterval,
]
"""
Response types of configuration characteristic
"""
[docs]class Configuration(CubeCharacteristic):
"""
Configuration characteristic
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration
"""
[docs] @staticmethod
def is_my_data(payload: GattReadData) -> Optional[ConfigurationResponseType]:
if ProtocolVersion.is_myself(payload):
return ProtocolVersion(payload)
elif ResponseIdNotificationSettings.is_myself(payload):
return ResponseIdNotificationSettings(payload)
elif ResponseIdMissedNotificationSettings.is_myself(payload):
return ResponseIdMissedNotificationSettings(payload)
elif ResponseMagneticSensorSettings.is_myself(payload):
return ResponseMagneticSensorSettings(payload)
elif ResponseMotorSpeedInformationAcquisitionSettings.is_myself(payload):
return ResponseMotorSpeedInformationAcquisitionSettings(payload)
elif ResponsePostureAngleDetectionSettings.is_myself(payload):
return ResponsePostureAngleDetectionSettings(payload)
elif ResponseConnectionIntervalRequest.is_myself(payload):
return ResponseConnectionIntervalRequest(payload)
elif ResponseGettingRequestedConnectionInterval.is_myself(payload):
return ResponseGettingRequestedConnectionInterval(payload)
elif ResponseGettingCurrentConnectionInterval.is_myself(payload):
return ResponseGettingCurrentConnectionInterval(payload)
else:
return None
[docs] def __init__(
self, interface: CubeInterface, device: NotificationReceivedDevice
) -> None:
self.interface = interface
super().__init__(interface, ToioUuid.Config.value, device)
[docs] async def request_protocol_version(self) -> None:
"""
Send protocol version request command
This function DO NOT return response payload.
Receive the result by notification.
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#requesting-the-ble-protocol-version
"""
command = RequestProtocolVersion()
await self._write(bytes(command))
[docs] async def set_horizontal_detection_threshold(self, threshold: int) -> None:
"""
Send horizontal detection threshold setting command
This function DO NOT return response payload.
Receive the result by notification.
Args:
threshold (int): Threshold
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#horizontal-detection-threshold-settings
"""
command = SetHorizontalDetectionThreshold(threshold)
await self._write(bytes(command))
[docs] async def set_collision_detection_threshold(self, threshold: int) -> None:
"""
Send collision detection threshold setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
threshold (int): Threshold
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#collision-detection-threshold-settings
"""
command = SetHorizontalDetectionThreshold(threshold)
await self._write(bytes(command))
[docs] async def set_double_tap_detection_threshold(self, threshold: int) -> None:
"""
Send double-tap detection threshold setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
threshold (int): Threshold
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#double-tap-detection-time-interval-settings
"""
command = SetDoubleTapDetectionTimeInterval(threshold)
await self._write(bytes(command))
[docs] async def set_id_notification(
self, interval_ms: int, condition: NotificationCondition
) -> None:
"""
Send id information notification setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
interval_ms (int): Notification interval [ms]
condition (NotificationCondition): Condition
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#identification-sensor-id-notification-settings
"""
command = SetIdNotification(interval_ms, condition)
await self._write(bytes(command))
[docs] async def set_id_missed_notification(self, sensitivity_ms: int) -> None:
"""
Send ID missed notification setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
sensitivity_ms (int): Sensitivity [ms]
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#identification-sensor-id-missed-notification-settings
"""
command = SetIdMissedNotification(sensitivity_ms)
await self._write(bytes(command))
[docs] async def set_magnetic_sensor(
self,
function_type: MagneticSensorFunction,
interval_ms: int,
condition: MagneticSensorCondition,
) -> None:
"""
Send magnetic sensor setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
function_type (MagneticSensorFunction): Function type
interval_ms (int): Notification interval [ms]
condition (MagneticSensorCondition): Condition
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#magnetic-sensor-settings-
"""
command = SetMagneticSensor(function_type, interval_ms, condition)
await self._write(bytes(command))
[docs] async def set_posture_angle_detection(
self,
detection_type: PostureAngleDetectionType,
interval_ms: int,
condition: PostureAngleDetectionCondition,
) -> None:
"""
Send posture angle setting request command
This function DO NOT return response payload.
Receive the result by notification.
Args:
detection_type (PostureAngleDetectionType): Detection type
interval_ms (int): Notification interval [ms]
condition (PostureAngleDetectionCondition): Condition
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#posture-angle-detection-settings-
"""
command = SetPostureAngleDetection(detection_type, interval_ms, condition)
await self._write(bytes(command))
[docs] async def request_to_change_connection_interval(
self, min_interval: int, max_interval: int
) -> None:
"""
Request the connected central device to change the connection interval.
Note:
If central device can not accept the requested connection interval value,
one of the following occurs:
- Bluetooth connection is disconnected.
- Another connection interval value is set that can be accepted by the central device.
`max_interval` must be greater than or equal to `min_interval`. (except the value is 0xFFFF)
Args:
min_interval (int): min_interval, from 6 to 3200, or 0xFFFF (0xFFFF means "to be determined by central")
max_interval (int): max_interval, from 6 to 3200, or 0xFFFF (0xFFFF means "to be determined by central")
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#request-to-change-connection-interval-
"""
command = RequestConnectionInterval(min_interval, max_interval)
await self._write(bytes(command))
[docs] async def get_requested_connection_interval(self) -> None:
"""
Get requested connection interval value.
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#obtaining-the-requested-connection-interval-value-
"""
command = GetRequestedConnectionIntervalValue()
await self._write(bytes(command))
[docs] async def get_current_connection_interval(self) -> None:
"""
Get current connection interval value.
References:
https://toio.github.io/toio-spec/en/docs/ble_configuration#obtaining-the-actual-connection-interval-value-
"""
command = GetCurrentConnectionIntervalValue()
await self._write(bytes(command))