Merge branch 'batch_heartbeat' into dev

# Conflicts:
#	main.py
#	services/avatar.py
#	services/chat.py
#	services/translate.py
#	update.py
pull/157/head
John Smith 8 months ago
commit c657a368ae

@ -17,6 +17,7 @@ import config
import services.avatar
import services.chat
import services.translate
import utils.async_io
import utils.request
logger = logging.getLogger(__name__)
@ -41,6 +42,7 @@ class ContentType(enum.IntEnum):
class FatalErrorType(enum.IntEnum):
AUTH_CODE_ERROR = 1
TOO_MANY_RETRIES = 2
def make_message_body(cmd, data):
@ -215,7 +217,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler):
pass
services.chat.client_room_manager.add_client(self.room_key, self)
asyncio.create_task(self._on_joined_room())
utils.async_io.create_task_with_ref(self._on_joined_room())
self._refresh_receive_timeout_timer()

@ -5,8 +5,8 @@ import hashlib
import hmac
import json
import logging
import random
import re
import uuid
from typing import *
import aiohttp
@ -15,13 +15,17 @@ import tornado.web
import api.base
import config
import services.open_live
import utils.rate_limit
import utils.request
logger = logging.getLogger(__name__)
START_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/start'
END_GAME_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
OPEN_LIVE_BASE_URL = 'https://live-open.biliapi.com'
START_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/start'
END_GAME_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/end'
GAME_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/heartbeat'
GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL = OPEN_LIVE_BASE_URL + '/v2/app/batchHeartbeat'
COMMON_SERVER_BASE_URL = 'https://chat.bilisc.com'
START_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/start_game'
@ -29,6 +33,8 @@ END_GAME_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/e
GAME_HEARTBEAT_COMMON_SERVER_URL = COMMON_SERVER_BASE_URL + '/api/internal/open_live/game_heartbeat'
_error_auth_code_cache = cachetools.LRUCache(256)
# 用于限制请求开放平台的频率
_open_live_rate_limiter = utils.rate_limit.TokenBucket(8, 8)
class TransportError(Exception):
@ -50,7 +56,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b
"""如果配置了开放平台,则直接请求,否则转发请求到公共服务器的内部接口"""
cfg = config.get_config()
if cfg.is_open_live_configured:
return await _request_open_live(open_live_url, body)
return await request_open_live(open_live_url, body)
try:
req_ctx_mgr = utils.request.http_session.post(common_server_url, json=body)
@ -63,7 +69,7 @@ async def request_open_live_or_common_server(open_live_url, common_server_url, b
raise
async def _request_open_live(url, body: dict) -> dict:
async def request_open_live(url, body: dict, *, ignore_rate_limit=False) -> dict:
cfg = config.get_config()
assert cfg.is_open_live_configured
@ -74,12 +80,16 @@ async def _request_open_live(url, body: dict) -> dict:
else:
auth_code = ''
# 频率限制防止触发B站风控被下架
if not _open_live_rate_limiter.try_decrease_token() and not ignore_rate_limit:
raise BusinessError({'code': 4009, 'message': '接口访问限制', 'request_id': '0', 'data': None})
body_bytes = json.dumps(body).encode('utf-8')
headers = {
'x-bili-accesskeyid': cfg.open_live_access_key_id,
'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
'x-bili-signature-method': 'HMAC-SHA256',
'x-bili-signature-nonce': str(random.randint(0, 999999999)),
'x-bili-signature-nonce': uuid.uuid4().hex,
'x-bili-signature-version': '1.0',
'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
}
@ -137,6 +147,8 @@ def _validate_auth_code(auth_code):
class _OpenLiveHandlerBase(api.base.ApiHandler):
_LOG_REQUEST = True
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.res: Optional[dict] = None
@ -150,7 +162,8 @@ class _OpenLiveHandlerBase(api.base.ApiHandler):
cfg = config.get_config()
self.json_args['app_id'] = cfg.open_live_app_id
logger.info('client=%s requesting open live, cls=%s', self.request.remote_ip, type(self).__name__)
if self._LOG_REQUEST:
logger.info('client=%s requesting open live, cls=%s', self.request.remote_ip, type(self).__name__)
class _PublicHandlerBase(_OpenLiveHandlerBase):
@ -180,7 +193,7 @@ class _PrivateHandlerBase(_OpenLiveHandlerBase):
raise tornado.web.HTTPError(501)
try:
self.res = await _request_open_live(self._OPEN_LIVE_URL, self.json_args)
self.res = await request_open_live(self._OPEN_LIVE_URL, self.json_args)
except TransportError:
raise tornado.web.HTTPError(500)
except BusinessError as e:
@ -201,12 +214,22 @@ class _StartGameMixin(_OpenLiveHandlerBase):
room_id = self.res['data']['anchor_info']['room_id']
except (TypeError, KeyError):
room_id = None
try:
game_id = self.res['data']['game_info']['game_id']
except (TypeError, KeyError):
game_id = None
code = self.res['code']
logger.info('room_id=%s start game res: %s %s', room_id, code, self.res['message'])
logger.info(
'client=%s room_id=%s start game res: %s %s, game_id=%s', self.request.remote_ip, room_id,
code, self.res['message'], game_id
)
if code == 7007:
# 身份码错误
# 让我看看是哪个混蛋把房间ID、UID当做身份码
logger.info('Auth code error! auth_code=%s', self.json_args.get('code', None))
logger.info(
'client=%s auth code error! auth_code=%s', self.request.remote_ip,
self.json_args.get('code', None)
)
class StartGamePublicHandler(_StartGameMixin, _PublicHandlerBase):
@ -226,13 +249,68 @@ class EndGamePrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = END_GAME_OPEN_LIVE_URL
class GameHeartbeatPublicHandler(_PublicHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
_COMMON_SERVER_URL = GAME_HEARTBEAT_COMMON_SERVER_URL
class GameHeartbeatPublicHandler(_OpenLiveHandlerBase):
_LOG_REQUEST = False
async def post(self):
game_id = self.json_args.get('game_id', None)
if not isinstance(game_id, str) or game_id == '':
raise tornado.web.MissingArgumentError('game_id')
try:
self.res = await send_game_heartbeat_by_service_or_common_server(game_id)
except TransportError as e:
logger.error(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
raise tornado.web.HTTPError(500)
except BusinessError as e:
# 因为B站的BUG这里在9点和10点的高峰期会经常报重复请求的错误但是不影响功能先屏蔽掉
if e.code != 4004:
logger.info(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
self.res = e.data
self.write(self.res)
async def send_game_heartbeat_by_service_or_common_server(game_id):
cfg = config.get_config()
if cfg.is_open_live_configured:
return await services.open_live.send_game_heartbeat(game_id)
# 这里GAME_HEARTBEAT_OPEN_LIVE_URL没用因为一定是请求公共服务器
return await request_open_live_or_common_server(
GAME_HEARTBEAT_OPEN_LIVE_URL, GAME_HEARTBEAT_COMMON_SERVER_URL, {'game_id': game_id}
)
class GameHeartbeatPrivateHandler(_OpenLiveHandlerBase):
_LOG_REQUEST = False
async def post(self):
cfg = config.get_config()
if not cfg.is_open_live_configured:
raise tornado.web.HTTPError(501)
game_id = self.json_args.get('game_id', None)
if not isinstance(game_id, str) or game_id == '':
raise tornado.web.MissingArgumentError('game_id')
class GameHeartbeatPrivateHandler(_PrivateHandlerBase):
_OPEN_LIVE_URL = GAME_HEARTBEAT_OPEN_LIVE_URL
try:
self.res = await services.open_live.send_game_heartbeat(game_id)
except TransportError as e:
logger.error(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
raise tornado.web.HTTPError(500)
except BusinessError as e:
# 因为B站的BUG这里在9点和10点的高峰期会经常报重复请求的错误但是不影响功能先屏蔽掉
if e.code != 4004:
logger.info(
'client=%s game heartbeat failed, game_id=%s, error: %s', self.request.remote_ip, game_id, e
)
self.res = e.data
self.write(self.res)
ROUTES = [

@ -1 +1 @@
Subproject commit 4da27950c89d4c0bab70ba983ba25bae9f91c19c
Subproject commit fc55b75dab95ca65700f26a145fc76d7ef05eef1

@ -1,6 +1,6 @@
{
"name": "blivechat",
"version": "1.8.2",
"version": "1.9.0-dev",
"private": true,
"scripts": {
"serve": "vue-cli-service serve",

@ -24,24 +24,32 @@ export default class ChatClientDirectOpenLive extends ChatClientOfficialBase {
}
stop() {
super.stop()
if (this.gameHeartbeatTimerId) {
window.clearInterval(this.gameHeartbeatTimerId)
this.gameHeartbeatTimerId = null
}
this.endGame()
super.stop()
}
async initRoom() {
if (!await this.startGame()) {
return false
return this.startGame()
}
async wsConnect() {
await super.wsConnect()
if (this.isDestroying) {
return
}
if (this.gameId && this.gameHeartbeatTimerId === null) {
this.gameHeartbeatTimerId = window.setInterval(this.sendGameHeartbeat.bind(this), GAME_HEARTBEAT_INTERVAL)
this.gameHeartbeatTimerId = window.setTimeout(this.onSendGameHeartbeat.bind(this), GAME_HEARTBEAT_INTERVAL)
}
return true
}
onWsClose() {
if (this.gameHeartbeatTimerId) {
window.clearTimeout(this.gameHeartbeatTimerId)
this.gameHeartbeatTimerId = null
}
super.onWsClose()
}
async startGame() {
@ -102,6 +110,13 @@ export default class ChatClientDirectOpenLive extends ChatClientOfficialBase {
return true
}
onSendGameHeartbeat() {
// 加上随机延迟,减少同时请求的概率
let sleepTime = GAME_HEARTBEAT_INTERVAL - (2 * 1000) + (Math.random() * 3 * 1000)
this.gameHeartbeatTimerId = window.setTimeout(this.onSendGameHeartbeat.bind(this), sleepTime)
this.sendGameHeartbeat()
}
async sendGameHeartbeat() {
if (!this.gameId) {
return false
@ -132,10 +147,11 @@ export default class ChatClientDirectOpenLive extends ChatClientOfficialBase {
}
async onBeforeWsConnect() {
// 重连次数太多则重新init_room保险
// 重连次数太多则重新initRoom保险
let reinitPeriod = Math.max(3, (this.hostServerUrlList || []).length)
if (this.retryCount > 0 && this.retryCount % reinitPeriod === 0) {
this.needInitRoom = true
await this.endGame()
}
return super.onBeforeWsConnect()
}
@ -148,6 +164,16 @@ export default class ChatClientDirectOpenLive extends ChatClientOfficialBase {
this.websocket.send(this.makePacket(this.authBody, base.OP_AUTH))
}
delayReconnect() {
if (document.visibilityState !== 'visible') {
// 不知道什么时候才能重连先endGame吧
this.needInitRoom = true
this.endGame()
}
super.delayReconnect()
}
async dmCallback(command) {
let data = command.data

@ -51,6 +51,7 @@ export default class ChatClientOfficialBase {
this.needInitRoom = true
this.websocket = null
this.retryCount = 0
this.totalRetryCount = 0
this.isDestroying = false
this.heartbeatTimerId = null
this.receiveTimeoutTimerId = null
@ -121,6 +122,7 @@ export default class ChatClientOfficialBase {
res = false
console.error('initRoom exception:', e)
if (e instanceof chatModels.ChatClientFatalError) {
this.stop()
this.msgHandler.onFatalError(e)
}
}
@ -185,15 +187,45 @@ export default class ChatClientOfficialBase {
return
}
this.retryCount++
console.warn('掉线重连中', this.retryCount)
window.setTimeout(this.wsConnect.bind(this), this.getReconnectInterval())
this.totalRetryCount++
console.warn(`掉线重连中 retryCount=${this.retryCount}, totalRetryCount=${this.totalRetryCount}`)
// 防止无限重连的保险措施。30次重连大概会断线500秒应该够了
if (this.totalRetryCount > 30) {
this.stop()
let error = new chatModels.ChatClientFatalError(
chatModels.FATAL_ERROR_TYPE_TOO_MANY_RETRIES, 'The connection has lost too many times'
)
this.msgHandler.onFatalError(error)
return
}
this.delayReconnect()
}
delayReconnect() {
if (document.visibilityState === 'visible') {
window.setTimeout(this.wsConnect.bind(this), this.getReconnectInterval())
return
}
// 页面不可见就先不重连了,即使重连也会心跳超时
let listener = () => {
if (document.visibilityState !== 'visible') {
return
}
document.removeEventListener('visibilitychange', listener)
this.wsConnect()
}
document.addEventListener('visibilitychange', listener)
}
getReconnectInterval() {
return Math.min(
1000 + ((this.retryCount - 1) * 2000),
10 * 1000
)
// 不用retryCount了防止意外的连接成功导致retryCount重置
let interval = Math.min(1000 + ((this.totalRetryCount - 1) * 2000), 20 * 1000)
// 加上随机延迟,防止同时请求导致雪崩
interval += Math.random() * 3000
return interval
}
onWsMessage(event) {

@ -25,6 +25,7 @@ export default class ChatClientRelay {
this.websocket = null
this.retryCount = 0
this.totalRetryCount = 0
this.isDestroying = false
this.receiveTimeoutTimerId = null
}
@ -92,15 +93,30 @@ export default class ChatClientRelay {
if (this.isDestroying) {
return
}
console.warn(`掉线重连中${++this.retryCount}`)
this.retryCount++
this.totalRetryCount++
console.warn(`掉线重连中 retryCount=${this.retryCount}, totalRetryCount=${this.totalRetryCount}`)
// 防止无限重连的保险措施。30次重连大概会断线500秒应该够了
if (this.totalRetryCount > 30) {
this.stop()
let error = new chatModels.ChatClientFatalError(
chatModels.FATAL_ERROR_TYPE_TOO_MANY_RETRIES, 'The connection has lost too many times'
)
this.msgHandler.onFatalError(error)
return
}
// 这边不用判断页面是否可见,因为发心跳包不是由定时器触发的,即使是不活动页面也不会心跳超时
window.setTimeout(this.wsConnect.bind(this), this.getReconnectInterval())
}
getReconnectInterval() {
return Math.min(
1000 + ((this.retryCount - 1) * 2000),
10 * 1000
)
// 不用retryCount了防止意外的连接成功导致retryCount重置
let interval = Math.min(1000 + ((this.totalRetryCount - 1) * 2000), 20 * 1000)
// 加上随机延迟,防止同时请求导致雪崩
interval += Math.random() * 3000
return interval
}
onWsMessage(event) {
@ -172,6 +188,7 @@ export default class ChatClientRelay {
break
}
case COMMAND_FATAL_ERROR: {
this.stop()
let error = new chatModels.ChatClientFatalError(data.type, data.msg)
this.msgHandler.onFatalError(error)
break

@ -113,6 +113,7 @@ export class UpdateTranslationMsg {
}
export const FATAL_ERROR_TYPE_AUTH_CODE_ERROR = 1
export const FATAL_ERROR_TYPE_TOO_MANY_RETRIES = 2
export class ChatClientFatalError extends Error {
constructor(type, message) {

@ -154,6 +154,9 @@ export default {
p4: '4. Add browser source in OBS',
p5: '5. Enter the previously copied room URL at URL, and enter the previously copied CSS at custom CSS'
},
room: {
fatalErrorOccurred: 'A fatal error has occurred. Please manually refresh the page to reconnect'
},
chat: {
moderator: 'moderator',
guardLevel1: 'governor',

@ -154,6 +154,9 @@ export default {
p4: '4. OBSでブラウザを新規作成する',
p5: '5. プロパティでこぴーしたURLを入力し、カスタムCSSでスタイルジェネレータのCSSを入力する'
},
room: {
fatalErrorOccurred: '致命的なエラーが発生しました。ページを手動で更新して再接続してください'
},
chat: {
moderator: 'モデレーター',
guardLevel1: '総督',

@ -154,6 +154,9 @@ export default {
p4: '4. 在OBS中添加浏览器源',
p5: '5. URL处输入之前复制的房间URL自定义CSS处输入之前复制的CSS'
},
room: {
fatalErrorOccurred: '发生了一个致命错误,请手动刷新页面以重新连接'
},
chat: {
moderator: '管理员',
guardLevel1: '总督',

@ -76,7 +76,12 @@ export default {
},
mounted() {
if (document.visibilityState === 'visible') {
this.init()
if (this.roomKeyValue === null) {
this.init()
} else {
//
window.setTimeout(this.init, Math.random() * 3000)
}
} else {
// OBSOBS
document.addEventListener('visibilitychange', this.onVisibilityChange)
@ -345,7 +350,12 @@ export default {
message: error.toString(),
duration: 10 * 1000
})
this.chatClient.stop()
this.onAddText(new chatModels.AddTextMsg({
authorName: 'blivechat',
authorType: constants.AUTHOR_TYPE_ADMIN,
content: this.$t('room.fatalErrorOccurred'),
authorLevel: 60,
}))
if (error.type === chatModels.FATAL_ERROR_TYPE_AUTH_CODE_ERROR) {
// Read The Fucking Manual
@ -497,7 +507,7 @@ export default {
// 0
img.onerror = resolve
//
setTimeout(resolve, 5000)
window.setTimeout(resolve, 5000)
img.src = urlInClosure
}
))

@ -21,6 +21,7 @@ import config
import models.database
import services.avatar
import services.chat
import services.open_live
import services.plugin
import services.translate
import update
@ -65,6 +66,7 @@ def init():
services.avatar.init()
services.translate.init()
services.open_live.init()
services.chat.init()
init_server(args.host, args.port, args.debug)

@ -14,6 +14,7 @@ import sqlalchemy.exc
import config
import models.bilibili as bl_models
import models.database
import utils.async_io
import utils.request
logger = logging.getLogger(__name__)
@ -41,7 +42,7 @@ def init():
global _avatar_url_cache, _task_queue
_avatar_url_cache = cachetools.TTLCache(cfg.avatar_cache_size, 10 * 60)
_task_queue = asyncio.Queue(cfg.fetch_avatar_max_queue_size)
asyncio.create_task(_do_init())
utils.async_io.create_task_with_ref(_do_init())
async def _do_init():
@ -89,7 +90,7 @@ async def get_avatar_url_or_none(user_id) -> Optional[str]:
_update_avatar_cache_in_memory(user_id, avatar_url)
# 如果距离数据库上次更新太久,则在后台从接口获取,并更新所有缓存
if (datetime.datetime.now() - user.update_time).days >= 1:
asyncio.create_task(_refresh_avatar_cache_from_web(user_id))
utils.async_io.create_task_with_ref(_refresh_avatar_cache_from_web(user_id))
return avatar_url
# 从接口获取
@ -249,7 +250,7 @@ class AvatarFetcher:
self._cool_down_timer_handle = None
async def init(self):
asyncio.create_task(self._fetch_consumer())
utils.async_io.create_task_with_ref(self._fetch_consumer())
return True
@property

@ -2,20 +2,21 @@
import asyncio
import enum
import logging
import random
import uuid
from typing import *
import api.chat
import api.open_live as api_open_live
import blcsdk.models as sdk_models
import blivedm.blivedm as blivedm
import blivedm.blivedm.models.open_live as dm_open_models
import blivedm.blivedm.models.web as dm_web_models
import blivedm.blivedm.utils as dm_utils
import config
import services.avatar
import services.translate
import services.plugin
import blcsdk.models as sdk_models
import services.translate
import utils.async_io
import utils.request
logger = logging.getLogger(__name__)
@ -169,7 +170,20 @@ class LiveClientManager:
)
RECONNECT_POLICY = dm_utils.make_linear_retry_policy(1, 2, 10)
class TooManyRetries(Exception):
"""重试次数太多"""
def _get_reconnect_interval(_retry_count: int, total_retry_count: int):
# 防止无限重连的保险措施。30次重连大概会断线500秒应该够了
if total_retry_count > 30:
raise TooManyRetries(f'total_retry_count={total_retry_count}')
# 不用retry_count了防止意外的连接成功导致retry_count重置
interval = min(1 + (total_retry_count - 1) * 2, 20)
# 加上随机延迟,防止同时请求导致雪崩
interval += random.uniform(0, 3)
return interval
class WebLiveClient(blivedm.BLiveClient):
@ -183,7 +197,7 @@ class WebLiveClient(blivedm.BLiveClient):
session=utils.request.http_session,
heartbeat_interval=self.HEARTBEAT_INTERVAL,
)
self.set_reconnect_policy(RECONNECT_POLICY)
self.set_reconnect_policy(_get_reconnect_interval)
@property
def room_key(self):
@ -220,7 +234,7 @@ class OpenLiveClient(blivedm.OpenLiveClient):
session=utils.request.http_session,
heartbeat_interval=self.HEARTBEAT_INTERVAL,
)
self.set_reconnect_policy(RECONNECT_POLICY)
self.set_reconnect_policy(_get_reconnect_interval)
@property
def room_key(self):
@ -289,6 +303,14 @@ class OpenLiveClient(blivedm.OpenLiveClient):
return False
return True
def _on_send_game_heartbeat(self):
# 加上随机延迟,减少同时请求的概率
sleep_time = self._game_heartbeat_interval + random.uniform(-2, 1)
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
sleep_time, self._on_send_game_heartbeat
)
utils.async_io.create_task_with_ref(self._send_game_heartbeat())
async def _send_game_heartbeat(self):
if self._game_id in (None, ''):
logger.warning('game=%d _send_game_heartbeat() failed, game_id not found', self._game_id)
@ -297,11 +319,7 @@ class OpenLiveClient(blivedm.OpenLiveClient):
# 保存一下防止await之后game_id改变
game_id = self._game_id
try:
await api_open_live.request_open_live_or_common_server(
api_open_live.GAME_HEARTBEAT_OPEN_LIVE_URL,
api_open_live.GAME_HEARTBEAT_COMMON_SERVER_URL,
{'game_id': game_id}
)
await api_open_live.send_game_heartbeat_by_service_or_common_server(game_id)
except api_open_live.TransportError:
logger.error('room=%d _send_game_heartbeat() failed', self.room_id)
return False
@ -460,18 +478,22 @@ class ClientRoom:
class LiveMsgHandler(blivedm.BaseHandler):
def on_client_stopped(self, client: LiveClientType, exception: Optional[Exception]):
if isinstance(exception, TooManyRetries):
room = client_room_manager.get_room(client.room_key)
if room is not None:
room.send_cmd_data(api.chat.Command.FATAL_ERROR, {
'type': api.chat.FatalErrorType.TOO_MANY_RETRIES,
'msg': 'The connection has lost too many times'
})
_live_client_manager.del_live_client(client.room_key)
def _on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
asyncio.create_task(self.__on_danmaku(client, message))
utils.async_io.create_task_with_ref(self.__on_danmaku(client, message))
async def __on_danmaku(self, client: WebLiveClient, message: dm_web_models.DanmakuMessage):
avatar_url = message.face
if avatar_url != '':
services.avatar.update_avatar_cache_if_expired(message.uid, avatar_url)
else:
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
avatar_url = await services.avatar.get_avatar_url(message.uid, message.uname)
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
avatar_url = await services.avatar.get_avatar_url(message.uid, message.uname)
room = client_room_manager.get_room(client.room_key)
if room is None:
@ -569,7 +591,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
)
def _on_buy_guard(self, client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
asyncio.create_task(self.__on_buy_guard(client, message))
utils.async_io.create_task_with_ref(self.__on_buy_guard(client, message))
@staticmethod
async def __on_buy_guard(client: WebLiveClient, message: dm_web_models.GuardBuyMessage):
@ -638,7 +660,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
)
if need_translate:
asyncio.create_task(self._translate_and_response(
utils.async_io.create_task_with_ref(self._translate_and_response(
message.message, room.room_key, msg_id, services.translate.Priority.HIGH
))
@ -747,7 +769,9 @@ class LiveMsgHandler(blivedm.BaseHandler):
)
if need_translate:
asyncio.create_task(self._translate_and_response(message.msg, room.room_key, message.msg_id))
utils.async_io.create_task_with_ref(self._translate_and_response(
message.msg, room.room_key, message.msg_id
))
def _on_open_live_gift(self, client: OpenLiveClient, message: dm_open_models.GiftMessage):
avatar_url = services.avatar.process_avatar_url(message.uface)
@ -846,7 +870,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
)
if need_translate:
asyncio.create_task(self._translate_and_response(
utils.async_io.create_task_with_ref(self._translate_and_response(
message.message, room.room_key, msg_id, services.translate.Priority.HIGH
))

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
import asyncio
import dataclasses
import datetime
import logging
from typing import *
import api.open_live
import config
import utils.async_io
logger = logging.getLogger(__name__)
# 正在等待发送的心跳任务game_id -> HeartbeatTask
_game_id_heart_task_map: Dict[str, 'HeartbeatTask'] = {}
@dataclasses.dataclass
class HeartbeatTask:
game_id: str
future: 'asyncio.Future[dict]'
def init():
cfg = config.get_config()
# 批量心跳只支持配置了开放平台的公共服务器,私有服务器用的人少,意义不大
if cfg.is_open_live_configured:
utils.async_io.create_task_with_ref(_game_heartbeat_consumer())
async def send_game_heartbeat(game_id) -> dict:
"""发送项目心跳。成功则返回符合开放平台格式的结果,失败则抛出异常"""
assert config.get_config().is_open_live_configured
if game_id in (None, ''):
raise api.open_live.BusinessError({'code': 4000, 'message': '参数错误', 'request_id': '0', 'data': None})
task = _game_id_heart_task_map.get(game_id, None)
if task is None:
task = HeartbeatTask(
game_id=game_id,
future=asyncio.get_running_loop().create_future(),
)
_game_id_heart_task_map[game_id] = task
# 限制一次发送的数量,数量太多了就立即发送
if len(_game_id_heart_task_map) >= 200:
await _flush_game_heartbeat_tasks()
return await task.future
async def _game_heartbeat_consumer():
while True:
try:
start_time = datetime.datetime.now()
await _flush_game_heartbeat_tasks()
cost_time = (datetime.datetime.now() - start_time).total_seconds()
# 如果等待时间太短,请求频率会太高;如果等待时间太长,前端请求、项目心跳会超时
await asyncio.sleep(4 - cost_time)
except Exception: # noqa
logger.exception('_heartbeat_consumer error:')
async def _flush_game_heartbeat_tasks():
global _game_id_heart_task_map
if not _game_id_heart_task_map:
return
game_id_task_map = _game_id_heart_task_map
_game_id_heart_task_map = {}
game_ids = list(game_id_task_map.keys())
logger.info('Sending game batch heartbeat for %d games', len(game_ids))
try:
res = await api.open_live.request_open_live(
api.open_live.GAME_BATCH_HEARTBEAT_OPEN_LIVE_URL,
{'game_ids': game_ids},
ignore_rate_limit=True
)
failed_game_ids = res['data']['failed_game_ids']
if failed_game_ids is None: # 哪个SB后端给数组传null的
failed_game_ids = set()
else:
failed_game_ids = set(failed_game_ids)
request_id = res['request_id']
except Exception as e:
for task in game_id_task_map.values():
task.future.set_exception(e)
return
if failed_game_ids:
logger.info(
'Game batch heartbeat res: %d succeeded, %d failed, request_id=%s',
len(game_ids) - len(failed_game_ids), len(failed_game_ids), request_id
)
for task in game_id_task_map.values():
if task.game_id in failed_game_ids:
task.future.set_exception(api.open_live.BusinessError(
{'code': 7003, 'message': '心跳过期或GameId错误', 'request_id': request_id, 'data': None}
))
else:
task.future.set_result({'code': 0, 'message': '0', 'request_id': request_id, 'data': None})

@ -19,6 +19,7 @@ import aiohttp
import cachetools
import config
import utils.async_io
import utils.request
logger = logging.getLogger(__name__)
@ -56,7 +57,7 @@ def init():
_translate_cache = cachetools.LRUCache(cfg.translation_cache_size)
# 总队列长度会超过translate_max_queue_size不用这么严格
_task_queues = [asyncio.Queue(cfg.translate_max_queue_size) for _ in range(len(Priority))]
asyncio.create_task(_do_init())
utils.async_io.create_task_with_ref(_do_init())
async def _do_init():
@ -229,7 +230,7 @@ class TranslateProvider:
self._be_available_event.set()
async def init(self):
asyncio.create_task(self._translate_consumer())
utils.async_io.create_task_with_ref(self._translate_consumer())
return True
@property

@ -3,13 +3,14 @@ import asyncio
import aiohttp
import utils.async_io
import utils.request
VERSION = 'v1.8.2'
def check_update():
asyncio.create_task(_do_check_update())
utils.async_io.create_task_with_ref(_do_check_update())
async def _do_check_update():

@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
import asyncio
# 只用于持有Task的引用
_task_refs = set()
def create_task_with_ref(*args, **kwargs):
"""创建Task并保持引用防止协程执行完之前就被GC"""
task = asyncio.create_task(*args, **kwargs)
_task_refs.add(task)
task.add_done_callback(_task_refs.discard)
return task

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
import datetime
import logging
logger = logging.getLogger(__name__)
class TokenBucket:
def __init__(self, tokens_per_sec, max_token_num):
self._tokens_per_sec = float(tokens_per_sec)
self._max_token_num = float(max_token_num)
self._stored_token_num = self._max_token_num
self._last_update_time = datetime.datetime.now()
if self._tokens_per_sec <= 0.0 and self._max_token_num >= 1.0:
logger.warning('TokenBucket token_per_sec=%f <= 0, rate has no limit', tokens_per_sec)
def try_decrease_token(self):
if self._tokens_per_sec <= 0.0:
# self._max_token_num < 1.0 时完全禁止
return self._max_token_num >= 1.0
cur_time = datetime.datetime.now()
last_update_time = min(self._last_update_time, cur_time) # 防止时钟回拨
add_token_num = (cur_time - last_update_time).total_seconds() * self._tokens_per_sec
self._stored_token_num = min(self._stored_token_num + add_token_num, self._max_token_num)
self._last_update_time = cur_time
if self._stored_token_num < 1.0:
return False
self._stored_token_num -= 1.0
return True

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import asyncio
from typing import *
import aiohttp
@ -16,9 +17,21 @@ http_session: Optional[aiohttp.ClientSession] = None
def init():
global http_session
http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
http_session = aiohttp.ClientSession(
response_class=CustomClientResponse,
timeout=aiohttp.ClientTimeout(total=10),
)
async def shut_down():
if http_session is not None:
await http_session.close()
class CustomClientResponse(aiohttp.ClientResponse):
# 因为aiohttp的BUG当底层连接断开时_wait_released可能会抛出CancelledError导致上层协程结束。这里改个错误类型
async def _wait_released(self):
try:
return await super()._wait_released()
except asyncio.CancelledError as e:
raise aiohttp.ClientConnectionError('Connection released') from e

Loading…
Cancel
Save