You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
blivechat/blcsdk/client.py

224 lines
7.4 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
import asyncio
import logging
from typing import *
import aiohttp
from . import handlers
from . import models
__all__ = (
'BlcPluginClient',
)
logger = logging.getLogger('blcsdk')
class BlcPluginClient:
"""
blivechat插件服务的客户端
:param ws_url: blivechat消息转发服务WebSocket地址
:param session: 连接池
:param heartbeat_interval: 发送心跳包的间隔时间(秒)
"""
def __init__(
self,
ws_url: str,
*,
session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval: float = 30,
):
self._ws_url = ws_url
if session is None:
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
self._own_session = True
else:
self._session = session
self._own_session = False
assert self._session.loop is asyncio.get_event_loop() # noqa
self._heartbeat_interval = heartbeat_interval
self._handler: Optional[handlers.HandlerInterface] = None
"""消息处理器"""
# 在运行时初始化的字段
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
"""WebSocket连接"""
self._network_future: Optional[asyncio.Future] = None
"""网络协程的future"""
self._heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
"""发心跳包定时器的handle"""
@property
def is_running(self) -> bool:
"""本客户端正在运行注意调用stop后还没完全停止也算正在运行"""
return self._network_future is not None
def set_handler(self, handler: Optional['handlers.HandlerInterface']):
"""
设置消息处理器
注意消息处理器和网络协程运行在同一个协程如果处理消息耗时太长会阻塞接收消息。如果是CPU密集型的任务建议将消息推到线程池处理
如果是IO密集型的任务应该使用async函数并且在handler里使用create_task创建新的协程
:param handler: 消息处理器
"""
self._handler = handler
def start(self):
"""启动本客户端"""
if self.is_running:
logger.warning('Plugin client is running, cannot start() again')
return
self._network_future = asyncio.create_task(self._network_coroutine_wrapper())
def stop(self):
"""停止本客户端"""
if not self.is_running:
logger.warning('Plugin client is stopped, cannot stop() again')
return
self._network_future.cancel()
async def stop_and_close(self):
"""便利函数,停止本客户端并释放本客户端的资源,调用后本客户端将不可用"""
if self.is_running:
self.stop()
await self.join()
await self.close()
async def join(self):
"""等待本客户端停止"""
if not self.is_running:
logger.warning('Plugin client is stopped, cannot join()')
return
await asyncio.shield(self._network_future)
async def close(self):
"""释放本客户端的资源,调用后本客户端将不可用"""
if self.is_running:
logger.warning('Plugin is calling close(), but client is running')
# 如果session是自己创建的则关闭session
if self._own_session:
await self._session.close()
async def send_cmd_data(self, cmd: models.Command, data: dict):
"""
发送消息给服务器
:param cmd: 消息类型见Command
:param data: 消息体JSON数据
"""
if self._websocket is None or self._websocket.closed:
raise ConnectionResetError('websocket is closed')
body = {'cmd': cmd, 'data': data}
await self._websocket.send_json(body)
async def _network_coroutine_wrapper(self):
"""负责处理网络协程的异常网络协程具体逻辑在_network_coroutine里"""
exc = None
try:
await self._network_coroutine()
except asyncio.CancelledError:
# 正常停止
pass
except Exception as e:
logger.exception('_network_coroutine() finished with exception:')
exc = e
finally:
logger.debug('_network_coroutine() finished')
self._network_future = None
if self._handler is not None:
self._handler.on_client_stopped(self, exc)
async def _network_coroutine(self):
"""网络协程,负责连接服务器、接收消息、解包"""
try:
# 连接
async with self._session.ws_connect(
self._ws_url,
receive_timeout=self._heartbeat_interval + 5,
) as websocket:
self._websocket = websocket
await self._on_ws_connect()
# 处理消息
message: aiohttp.WSMessage
async for message in websocket:
self._on_ws_message(message)
finally:
self._websocket = None
await self._on_ws_close()
# 插件消息都是本地通信的,这里不可能是因为网络问题而掉线,所以不尝试重连
async def _on_ws_connect(self):
"""WebSocket连接成功"""
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self._heartbeat_interval, self._on_send_heartbeat
)
async def _on_ws_close(self):
"""WebSocket连接断开"""
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
def _on_send_heartbeat(self):
"""定时发送心跳包的回调"""
if self._websocket is None or self._websocket.closed:
self._heartbeat_timer_handle = None
return
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self._heartbeat_interval, self._on_send_heartbeat
)
asyncio.create_task(self._send_heartbeat())
async def _send_heartbeat(self):
"""发送心跳包"""
try:
await self.send_cmd_data(models.Command.HEARTBEAT, {})
except (ConnectionResetError, aiohttp.ClientConnectionError) as e:
logger.warning('Plugin client _send_heartbeat() failed: %r', e)
except Exception: # noqa
logger.exception('Plugin client _send_heartbeat() failed:')
def _on_ws_message(self, message: aiohttp.WSMessage):
"""
收到WebSocket消息
:param message: WebSocket消息
"""
if message.type != aiohttp.WSMsgType.TEXT:
logger.warning('Unknown websocket message type=%s, data=%s', message.type, message.data)
return
try:
body = message.json()
self._handle_command(body)
except Exception:
logger.error('body=%s', message.data)
raise
def _handle_command(self, command: dict):
"""
处理业务消息
:param command: 业务消息
"""
if self._handler is not None:
try:
self._handler.handle(self, command)
except Exception as e:
logger.exception('Plugin client _handle_command() failed, command=%s', command, exc_info=e)