翻译区分优先级,高优先级的可重试

pull/123/head
John Smith 1 year ago
parent 8fa63f1d75
commit bea8711353

@ -55,12 +55,13 @@ class AppConfig:
self.open_browser_at_startup = True
self.enable_upload_file = True
self.fetch_avatar_interval = 3.5
self.fetch_avatar_max_queue_size = 2
self.avatar_cache_size = 50000
self.fetch_avatar_interval = 5.5
self.fetch_avatar_max_queue_size = 1
self.avatar_cache_size = 10000
self.enable_translate = True
self.allow_translate_rooms = set()
self.translate_max_queue_size = 10
self.translation_cache_size = 50000
self.translator_configs = []
@ -92,6 +93,7 @@ class AppConfig:
self.enable_translate = app_section.getboolean('enable_translate', fallback=self.enable_translate)
self.allow_translate_rooms = _str_to_list(app_section.get('allow_translate_rooms', ''), int, set)
self.translate_max_queue_size = app_section.getint('translate_max_queue_size', self.translate_max_queue_size)
self.translation_cache_size = app_section.getint('translation_cache_size', self.translation_cache_size)
def _load_translator_configs(self, config: configparser.ConfigParser):
@ -106,7 +108,6 @@ class AppConfig:
translator_config = {
'type': type_,
'query_interval': section.getfloat('query_interval'),
'max_queue_size': section.getint('max_queue_size')
}
if type_ == 'TencentTranslateFree':
translator_config['source_language'] = section['source_language']

@ -46,6 +46,10 @@ enable_translate = true
# Example: allow_translate_rooms = 4895312,22347054,21693691
allow_translate_rooms =
# 翻译最大队列长度
# Maximum queue length for translating
translate_max_queue_size = 10
# 翻译缓存数量
# Number of translation caches
translation_cache_size = 50000
@ -66,10 +70,8 @@ translator_configs = tencent_translate_free
# 类型:腾讯翻译白嫖版。使用了网页版的接口,**将来可能失效**
type = TencentTranslateFree
# 请求间隔时间(秒),等于 1 / QPS。目前没有遇到此接口有调用频率限制10QPS应该够用了
query_interval = 0.1
# 最大队列长度,注意最长等待时间等于 最大队列长度 * 请求间隔时间
max_queue_size = 100
# 请求间隔时间(秒),等于 1 / QPS
query_interval = 1
# 自动auto中文zh日语jp英语en韩语kr
# 完整语言列表见文档https://cloud.tencent.com/document/product/551/15619
@ -93,8 +95,6 @@ type = TencentTranslate
# 请求间隔时间(秒),等于 1 / QPS。理论上最高QPS为5实际测试是3
query_interval = 0.333
# 最大队列长度,注意最长等待时间等于 最大队列长度 * 请求间隔时间
max_queue_size = 30
# 自动auto中文zh日语ja英语en韩语ko
# 完整语言列表见文档https://cloud.tencent.com/document/product/551/15619
@ -125,8 +125,6 @@ type = BaiduTranslate
# 请求间隔时间(秒),等于 1 / QPS
query_interval = 1.5
# 最大队列长度,注意最长等待时间等于 最大队列长度 * 请求间隔时间
max_queue_size = 9
# 自动auto中文zh日语jp英语en韩语kor
# 完整语言列表见文档https://fanyi-api.baidu.com/doc/21

@ -434,7 +434,9 @@ class LiveMsgHandler(blivedm.BaseHandler):
})
if need_translate:
asyncio.create_task(self._translate_and_response(message.message, room.room_id, msg_id))
asyncio.create_task(self._translate_and_response(
message.message, room.room_id, msg_id, services.translate.Priority.HIGH
))
async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage):
room = client_room_manager.get_room(client.tmp_room_id)
@ -456,8 +458,8 @@ class LiveMsgHandler(blivedm.BaseHandler):
)
@staticmethod
async def _translate_and_response(text, room_id, msg_id):
translation = await services.translate.translate(text)
async def _translate_and_response(text, room_id, msg_id, priority=services.translate.Priority.NORMAL):
translation = await services.translate.translate(text, priority)
if translation is None:
return

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
import asyncio
import base64
import dataclasses
import datetime
import enum
import functools
import hashlib
import hmac
@ -31,12 +33,29 @@ _translate_providers: List['TranslateProvider'] = []
_translate_cache: Optional[cachetools.LRUCache] = None
# 正在翻译的Futuretext -> Future
_text_future_map: Dict[str, asyncio.Future] = {}
# 正在翻译的任务队列,索引是优先级
_task_queues: List['asyncio.Queue[TranslateTask]'] = []
class Priority(enum.IntEnum):
HIGH = 0
NORMAL = 1
@dataclasses.dataclass
class TranslateTask:
priority: Priority
text: str
future: 'asyncio.Future[Optional[str]]'
remain_retry_count: int
def init():
cfg = config.get_config()
global _translate_cache
global _translate_cache, _task_queues
_translate_cache = cachetools.LRUCache(cfg.translation_cache_size)
# 总队列长度会超过translate_max_queue_size不用这么严格
_task_queues = [asyncio.Queue(cfg.translate_max_queue_size) for _ in range(len(Priority))]
asyncio.get_event_loop().create_task(_do_init())
@ -58,19 +77,17 @@ def create_translate_provider(cfg):
type_ = cfg['type']
if type_ == 'TencentTranslateFree':
return TencentTranslateFree(
cfg['query_interval'], cfg['max_queue_size'], cfg['source_language'],
cfg['target_language']
cfg['query_interval'], cfg['source_language'], cfg['target_language']
)
elif type_ == 'TencentTranslate':
return TencentTranslate(
cfg['query_interval'], cfg['max_queue_size'], cfg['source_language'],
cfg['target_language'], cfg['secret_id'], cfg['secret_key'],
cfg['region']
cfg['query_interval'], cfg['source_language'], cfg['target_language'],
cfg['secret_id'], cfg['secret_key'], cfg['region']
)
elif type_ == 'BaiduTranslate':
return BaiduTranslate(
cfg['query_interval'], cfg['max_queue_size'], cfg['source_language'],
cfg['target_language'], cfg['app_id'], cfg['secret']
cfg['query_interval'], cfg['source_language'], cfg['target_language'],
cfg['app_id'], cfg['secret']
)
return None
@ -97,7 +114,7 @@ def get_translation_from_cache(text):
return _translate_cache.get(key, None)
def translate(text) -> Awaitable[Optional[str]]:
def translate(text, priority=Priority.NORMAL) -> Awaitable[Optional[str]]:
key = text.strip().lower()
# 如果已有正在翻译的future则返回防止重复翻译
future = _text_future_map.get(key, None)
@ -112,25 +129,18 @@ def translate(text) -> Awaitable[Optional[str]]:
future.set_result(res)
return future
# 负载均衡找等待时间最少的provider
min_wait_time = None
min_wait_time_provider = None
for provider in _translate_providers:
if not provider.is_available:
continue
wait_time = provider.wait_time
if min_wait_time is None or wait_time < min_wait_time:
min_wait_time = wait_time
min_wait_time_provider = provider
# 没有可用的
if min_wait_time_provider is None:
task = TranslateTask(
priority=priority,
text=text,
future=future,
remain_retry_count=3 if priority == Priority.HIGH else 1
)
if not _push_task(task):
future.set_result(None)
return future
_text_future_map[key] = future
future.add_done_callback(functools.partial(_on_translate_done, key))
min_wait_time_provider.translate(text, future)
return future
@ -146,27 +156,77 @@ def _on_translate_done(key, future):
_translate_cache[key] = res
class TranslateProvider:
async def init(self):
return True
def _push_task(task: TranslateTask):
if not _has_available_translate_provider():
return False
@property
def is_available(self):
queue = _task_queues[task.priority]
if not queue.full():
queue.put_nowait(task)
return True
@property
def wait_time(self):
return 0
if task.priority != Priority.HIGH:
return False
def translate(self, text, future):
raise NotImplementedError
# 高优先级的尝试降级,挤掉低优先级的任务
queue = _task_queues[Priority.NORMAL]
if queue.full():
lower_task = queue.get_nowait()
lower_task.future.set_result(None)
queue.put_nowait(task)
return True
async def _pop_task() -> TranslateTask:
# 按优先级遍历,看是否已经有任务
for queue in _task_queues:
if not queue.empty():
return queue.get_nowait()
done_future_set, pending_future_set = await asyncio.wait(
[queue.get() for queue in _task_queues],
return_when=asyncio.FIRST_COMPLETED
)
for future in pending_future_set:
future.cancel()
# 如果有多个队列都取到任务了,只返回优先级最高的那一个,剩下的放回队列
assert len(done_future_set) != 0
tasks = [await future for future in done_future_set]
if len(tasks) > 1:
tasks.sort(key=lambda task_: task_.priority)
res = None
for task in tasks:
if res is None:
res = task
continue
if not _push_task(task):
task.future.set_result(None)
return res
def _cancel_all_tasks_if_no_available_translate_provider():
if _has_available_translate_provider():
return
logger.warning('No available translate provider')
for queue in _task_queues:
while not queue.empty():
task = queue.get_nowait()
task.future.set_result(None)
class FlowControlTranslateProvider(TranslateProvider):
def __init__(self, query_interval, max_queue_size):
def _has_available_translate_provider():
return any(provider.is_available for provider in _translate_providers)
class TranslateProvider:
def __init__(self, query_interval):
self._query_interval = query_interval
# (text, future)
self._text_queue = asyncio.Queue(max_queue_size)
self._be_available_event = asyncio.Event()
self._be_available_event.set()
async def init(self):
asyncio.create_task(self._translate_consumer())
@ -174,43 +234,69 @@ class FlowControlTranslateProvider(TranslateProvider):
@property
def is_available(self):
return not self._text_queue.full()
@property
def wait_time(self):
return self._text_queue.qsize() * self._query_interval
return True
def translate(self, text, future):
try:
self._text_queue.put_nowait((text, future))
except asyncio.QueueFull:
future.set_result(None)
def _on_availability_change(self):
if self.is_available:
self._be_available_event.set()
else:
self._be_available_event.clear()
_cancel_all_tasks_if_no_available_translate_provider()
async def _translate_consumer(self):
cls_name = type(self).__name__
while True:
try:
text, future = await self._text_queue.get()
asyncio.create_task(self._translate_coroutine(text, future))
if not self.is_available:
logger.info('%s waiting to become available', cls_name)
await self._be_available_event.wait()
logger.info('%s became available', cls_name)
task = await _pop_task()
# 为了简化代码约定只会在_translate_wrapper里变成不可用所以获取task之后这里还是可用的
assert self.is_available
start_time = datetime.datetime.now()
await self._translate_wrapper(task)
cost_time = (datetime.datetime.now() - start_time).total_seconds()
# 频率限制
await asyncio.sleep(self._query_interval)
await asyncio.sleep(self._query_interval - cost_time)
except Exception: # noqa
logger.exception('FlowControlTranslateProvider error:')
logger.exception('%s error:', cls_name)
async def _translate_coroutine(self, text, future):
async def _translate_wrapper(self, task: TranslateTask) -> Optional[str]:
try:
res = await self._do_translate(text)
exc = None
task.remain_retry_count -= 1
res = await self._do_translate(task.text)
except BaseException as e:
future.set_exception(e)
exc = e
res = None
if res is not None:
task.future.set_result(res)
return res
if task.remain_retry_count > 0:
# 还可以重试则放回队列
if not _push_task(task):
task.future.set_result(None)
else:
future.set_result(res)
async def _do_translate(self, text):
# 否则设置异常或None结果
if exc is not None:
task.future.set_exception(exc)
else:
task.future.set_result(None)
return None
async def _do_translate(self, text) -> Optional[str]:
raise NotImplementedError
class TencentTranslateFree(FlowControlTranslateProvider):
def __init__(self, query_interval, max_queue_size, source_language, target_language):
super().__init__(query_interval, max_queue_size)
class TencentTranslateFree(TranslateProvider):
def __init__(self, query_interval, source_language, target_language):
super().__init__(query_interval)
self._be_available_event.clear() # _do_init之后才可用
self._source_language = source_language
self._target_language = target_language
@ -225,8 +311,6 @@ class TencentTranslateFree(FlowControlTranslateProvider):
async def init(self):
if not await super().init():
return False
if not await self._do_init():
return False
self._reinit_future = asyncio.create_task(self._reinit_coroutine())
return True
@ -297,14 +381,24 @@ class TencentTranslateFree(FlowControlTranslateProvider):
self._uc_iv = uc_iv
self._qtv = qtv
self._qtk = qtk
self._on_availability_change()
return True
async def _reinit_coroutine(self):
try:
while True:
await asyncio.sleep(30)
logger.debug('TencentTranslateFree reinit')
asyncio.create_task(self._do_init())
start_time = datetime.datetime.now()
try:
await self._do_init()
except asyncio.CancelledError:
raise
except BaseException: # noqa
pass
cost_time = (datetime.datetime.now() - start_time).total_seconds()
await asyncio.sleep(30 - cost_time)
except asyncio.CancelledError:
pass
@ -312,20 +406,15 @@ class TencentTranslateFree(FlowControlTranslateProvider):
def is_available(self):
return '' not in (self._uc_key, self._uc_iv, self._qtv, self._qtk) and super().is_available
async def _translate_coroutine(self, text, future):
try:
res = await self._do_translate(text)
except BaseException as e:
future.set_exception(e)
self._on_fail()
return
future.set_result(res)
if res is None:
self._on_fail()
else:
async def _translate_wrapper(self, task: TranslateTask) -> Optional[str]:
res = await super()._translate_wrapper(task)
if res is not None:
self._fail_count = 0
else:
self._on_fail()
return res
async def _do_translate(self, text):
async def _do_translate(self, text) -> Optional[str]:
try:
async with utils.request.http_session.post(
'https://fanyi.qq.com/api/translate',
@ -354,7 +443,7 @@ class TencentTranslateFree(FlowControlTranslateProvider):
res = ''.join(record['targetText'] for record in data['translate']['records'])
if res == '' and text.strip() != '':
# qtv、qtk过期
logger.warning('TencentTranslateFree result is empty %s', data)
logger.info('TencentTranslateFree result is empty %s', data)
return None
return res
@ -414,11 +503,13 @@ class TencentTranslateFree(FlowControlTranslateProvider):
self._qtv = self._qtk = ''
self._fail_count = 0
self._on_availability_change()
class TencentTranslate(FlowControlTranslateProvider):
def __init__(self, query_interval, max_queue_size, source_language, target_language,
class TencentTranslate(TranslateProvider):
def __init__(self, query_interval, source_language, target_language,
secret_id, secret_key, region):
super().__init__(query_interval, max_queue_size)
super().__init__(query_interval)
self._source_language = source_language
self._target_language = target_language
self._secret_id = secret_id
@ -431,7 +522,7 @@ class TencentTranslate(FlowControlTranslateProvider):
def is_available(self):
return self._cool_down_timer_handle is None and super().is_available
async def _do_translate(self, text):
async def _do_translate(self, text) -> Optional[str]:
try:
async with self._request_tencent_cloud(
'TextTranslate',
@ -519,15 +610,17 @@ class TencentTranslate(FlowControlTranslateProvider):
self._cool_down_timer_handle = asyncio.get_running_loop().call_later(
sleep_time, self._on_cool_down_timeout
)
self._on_availability_change()
def _on_cool_down_timeout(self):
self._cool_down_timer_handle = None
self._on_availability_change()
class BaiduTranslate(FlowControlTranslateProvider):
def __init__(self, query_interval, max_queue_size, source_language, target_language,
class BaiduTranslate(TranslateProvider):
def __init__(self, query_interval, source_language, target_language,
app_id, secret):
super().__init__(query_interval, max_queue_size)
super().__init__(query_interval)
self._source_language = source_language
self._target_language = target_language
self._app_id = app_id
@ -539,7 +632,7 @@ class BaiduTranslate(FlowControlTranslateProvider):
def is_available(self):
return self._cool_down_timer_handle is None and super().is_available
async def _do_translate(self, text):
async def _do_translate(self, text) -> Optional[str]:
try:
async with utils.request.http_session.post(
'https://fanyi-api.baidu.com/api/trans/vip/translate',
@ -581,6 +674,8 @@ class BaiduTranslate(FlowControlTranslateProvider):
self._cool_down_timer_handle = asyncio.get_running_loop().call_later(
sleep_time, self._on_cool_down_timeout
)
self._on_availability_change()
def _on_cool_down_timeout(self):
self._cool_down_timer_handle = None
self._on_availability_change()

Loading…
Cancel
Save