Source code for toio.cube.api.base_class

# -*- coding: utf-8 -*-
# ************************************************************
#
#     base_class.py
#
#     Copyright 2022 Sony Interactive Entertainment Inc.
#
# ************************************************************

import binascii
import inspect
from abc import ABCMeta, abstractmethod
from typing import Awaitable, Callable, Optional, Union
from uuid import UUID

from toio.device_interface import (
    CubeInterface,
    GattCharacteristic,
    GattNotificationHandler,
    GattReadData,
    GattWriteData,
)
from toio.logger import get_toio_logger

# Module-level logger, obtained from the toio logging helper under this
# module's name. Used below for the optional raw read/write dumps.
logger = get_toio_logger(__name__)

# Debug switches. When set to True, CubeCharacteristic's raw read / write
# helpers hex-dump every payload through the module logger at debug level.
DUMP_RAW_READ_DATA = False
DUMP_RAW_WRITE_DATA = False

# Signature of a user-supplied notification callback: it receives the raw
# notification payload and is either a plain function (returns None) or
# returns an awaitable that resolves to None.
CubeNotificationHandler = Callable[[bytearray], Optional[Awaitable[None]]]


class CubeCommand(metaclass=ABCMeta):
    """Abstract interface for a command that can be sent to a cube.

    Concrete commands implement ``__bytes__`` so that ``bytes(command)``
    yields the payload to transmit.
    """

    @abstractmethod
    def __bytes__(self) -> bytes:
        """Serialize this command into the bytes to be sent to the cube.

        Returns:
            bytes: byte representation of this command.

        Raises:
            NotImplementedError: always, in this abstract base; subclasses
                must override this method.
        """
        raise NotImplementedError
class CubeResponse(metaclass=ABCMeta):
    """Abstract interface for a response received from a cube."""

    @staticmethod
    @abstractmethod
    def is_myself(data: GattReadData) -> bool:
        """Tell whether ``data`` is a byte representation of this class.

        NOTE(review): earlier documentation described this function as
        converting the data into an object and returning it, but the
        signature declares a ``bool`` result -- confirm the intended
        contract against the concrete subclasses.

        Args:
            data (GattReadData): received data from the cube.

        Returns:
            bool: per the signature, whether ``data`` belongs to this class.

        Raises:
            NotImplementedError: always, in this abstract base; subclasses
                must override this method.
        """
        raise NotImplementedError
class CubeCharacteristic(metaclass=ABCMeta):
    """Abstract base for one GATT characteristic of a cube.

    An instance is bound to a ``CubeInterface`` and a characteristic UUID.
    It offers raw read / write helpers and multiplexes notifications: a
    single root handler is registered with the interface and forwards every
    payload to each user handler in ``notification_handler_list``.
    """

    @staticmethod
    @abstractmethod
    def is_my_data(payload: GattReadData) -> Optional[CubeResponse]:
        """Interpret ``payload`` as a response of this characteristic.

        If payload is a response of this characteristic, this function
        returns a CubeResponse object. Otherwise, it returns None.

        Args:
            payload (GattReadData): received data from the cube.

        Raises:
            NotImplementedError: always, in this abstract base; subclasses
                must override this method.
        """
        raise NotImplementedError()

    def __init__(self, interface: CubeInterface, uuid: UUID):
        """Bind this characteristic to an interface and a UUID.

        Args:
            interface (CubeInterface): object used for all GATT operations.
            uuid (UUID): UUID passed to every interface call.
        """
        self.interface = interface
        self.uuid = uuid
        # User handlers, called in registration order for each notification.
        self.notification_handler_list: list = []
        # True while the root handler is registered with the interface.
        self.notification_handler_is_registered = False

    async def _read(self) -> GattReadData:
        """Raw interface to GATT for reading."""
        read_data = await self.interface.read(self.uuid)
        if DUMP_RAW_READ_DATA:
            logger.debug("READ: %s", binascii.hexlify(bytes(read_data), " "))
        return read_data

    async def _write(self, data: GattWriteData) -> None:
        """Raw interface to GATT for writing."""
        if DUMP_RAW_WRITE_DATA:
            logger.debug("WRITE: %s", binascii.hexlify(bytes(data), " "))
        return await self.interface.write(self.uuid, data, response=True)

    async def _write_without_response(self, data: GattWriteData) -> None:
        """Raw interface to GATT for writing. (without response)"""
        if DUMP_RAW_WRITE_DATA:
            logger.debug(
                "WRITE WITHOUT RESPONSE: %s", binascii.hexlify(bytes(data), " ")
            )
        return await self.interface.write(self.uuid, data, response=False)

    async def _register_notification_handler(
        self, handler: GattNotificationHandler
    ) -> bool:
        """Raw interface to GATT for registering handler function."""
        return await self.interface.register_notification_handler(self.uuid, handler)

    async def _unregister_notification_handler(self) -> bool:
        """Raw interface to GATT for unregistering handler function."""
        return await self.interface.unregister_notification_handler(self.uuid)

    async def _root_notification_handler(
        self, _: GattCharacteristic, payload: bytearray
    ) -> None:
        """Forward one notification payload to every registered user handler.

        Args:
            _ (GattCharacteristic): notifying characteristic (unused).
            payload (bytearray): raw notification data passed to handlers.
        """
        # Iterate over a snapshot: a handler may register or unregister
        # handlers (including itself) while being dispatched, and mutating
        # the live list during iteration would silently skip handlers.
        for handler in tuple(self.notification_handler_list):
            result = handler(payload)
            # Await whatever awaitable the handler returned. This covers
            # ``async def`` handlers as well as plain callables that return
            # a coroutine or other awaitable.
            if inspect.isawaitable(result):
                await result

    async def register_notification_handler(
        self, handler: CubeNotificationHandler
    ) -> bool:
        """User interface to GATT for registering handler function.

        Args:
            handler (CubeNotificationHandler): callback to add.

        Returns:
            bool: False if ``handler`` is already registered, True otherwise.
        """
        if handler in self.notification_handler_list:
            return False
        self.notification_handler_list.append(handler)
        # The root handler is registered with the interface only once, when
        # the first user handler arrives.
        if not self.notification_handler_is_registered:
            await self._register_notification_handler(self._root_notification_handler)
            self.notification_handler_is_registered = True
        return True

    async def unregister_notification_handler(
        self, handler: Optional[CubeNotificationHandler]
    ) -> bool:
        """User interface to GATT for unregistering handler function.

        Args:
            handler (Optional[CubeNotificationHandler]): callback to remove,
                or None to remove every registered callback.

        Returns:
            bool: always True.
        """
        if handler is None:
            self.notification_handler_list = []
        elif handler in self.notification_handler_list:
            self.notification_handler_list.remove(handler)
        # Once no user handler is left (single removal or clear-all), also
        # drop the root handler from the interface so the registration flag
        # matches reality.
        if (
            not self.notification_handler_list
            and self.notification_handler_is_registered
        ):
            await self._unregister_notification_handler()
            self.notification_handler_is_registered = False
        return True