@ -92,8 +92,8 @@ def create_translate_provider(cfg):
)
elif type_ == ' GeminiTranslate ' :
return GeminiTranslate (
cfg [ ' query_interval ' ] , cfg [ ' app_id' ] , cfg [ ' prompt ' ] , cfg [ ' temperature ' ] ,
cfg [ ' model_name' ] , cfg [ ' is_use_proxy ' ] , cfg [ ' proxy ' ]
cfg [ ' query_interval ' ] , cfg [ ' proxy' ] , cfg [ ' api_key ' ] , cfg [ ' model_code ' ] ,
cfg [ ' prompt' ] , cfg [ ' temperature ' ]
)
return None
@ -681,27 +681,20 @@ class BaiduTranslate(TranslateProvider):
self . _cool_down_timer_handle = None
self . _on_availability_change ( )
class GeminiTranslate ( TranslateProvider ) :
def __init__ ( self , query_interval , app_id, prompt , temperature = 0.9 , model_name = " gemini-1.0-pro " , is_use_proxy = False , proxy = Non e) :
def __init__ ( self , query_interval , proxy, api_key , model_code , prompt , temperatur e) :
super ( ) . __init__ ( query_interval )
self . _app_id = app_id
self . _prompt = prompt
self . _temperature = temperature
self . _model_name = model_name
self . _is_use_proxy = is_use_proxy
self . _proxy = proxy
async def _do_translate ( self , text ) - > Optional [ str ] :
api_endpoint = f ' https://generativelanguage.googleapis.com/v1beta/models/ { self . _model_name } :generateContent '
api_key = self . _app_id
final_prompt = self . _prompt . replace ( ' {{ original_text}} ' , text )
payload = {
self . _proxy = proxy or None
self . _api_key = api_key
self . _url = f ' https://generativelanguage.googleapis.com/v1beta/ { model_code } :generateContent '
self . _prompt = prompt . replace ( ' \n ' , ' ' ) . replace ( ' \\ n ' , ' \n ' )
self . _body = {
' contents ' : [
{
' role ' : ' user ' ,
' parts ' : [ { ' text ' : final_prompt } ]
} ,
# 'role': 'user',
' parts ' : [ { ' text ' : ' ' } ]
}
] ,
' safetySettings ' : [
{
@ -722,64 +715,63 @@ class GeminiTranslate(TranslateProvider):
}
] ,
' generationConfig ' : {
' temperature ' : str ( self . _ temperature) ,
' topP ' : ' 1 ' ,
' topK ' : ' 32 ' ,
' candidateCount ' : ' 1 ' ,
' maxOutputTokens ' : ' 8192 '
' temperature ' : temperature,
' topP ' : 1 ,
' topK ' : 32 ,
' candidateCount ' : 1 ,
' maxOutputTokens ' : 8192
}
}
headers = {
' Content-Type ' : ' application/json ' ,
}
if self . _is_use_proxy :
proxy = self . _proxy
else :
proxy = None
max_retries = 3
retry_delay = 1
attempts = 0 # 初始化尝试计数
while attempts < max_retries :
try :
# 设置超时时间为10秒
timeout = aiohttp . ClientTimeout ( total = 10 )
async with utils . request . http_session . post (
f ' { api_endpoint } ?key= { api_key } ' ,
headers = headers ,
json = payload ,
proxy = proxy ,
timeout = timeout
) as r :
if r . status == 200 :
data = await r . json ( )
try :
translated_text = data [ ' candidates ' ] [ 0 ] [ ' content ' ] [ ' parts ' ] [ 0 ] [ ' text ' ]
return translated_text # 成功获取翻译文本,返回结果
except ( KeyError , IndexError ) :
logger . warning ( ' Failed to process the response correctly: %s ' , data )
# 如果响应结构有误,则退出循环
return None
else :
logger . warning ( ' Request failed with status %d : %s ' , r . status , r . reason )
except asyncio . TimeoutError as toe :
logger . warning ( ' Request timeout occurred: %s ' , str ( toe ) )
except Exception as e :
log_str = f ' { str ( e ) } \n data: \n { data } '
logger . error ( ' An error occurred during the request: %s ' , log_str )
try :
logger . error ( ' FinishReason: ' + { data [ ' candidates ' ] [ 0 ] [ ' finishReason ' ] } )
except :
pass
# 未成功获取翻译文本,增加尝试次数并等待重新尝试
attempts + = 1
if attempts < max_retries :
logger . info ( ' Retrying request ( %d / %d )... ' , attempts , max_retries )
await asyncio . sleep ( retry_delay )
# 如果所有重试尝试后还未成功,则记录日志并返回 None
logger . error ( ' All retry attempts failed. ' )
return None
self . _cool_down_timer_handle = None
@property
def is_available ( self ) :
return self . _cool_down_timer_handle is None and super ( ) . is_available
async def _do_translate ( self , text ) - > Optional [ str ] :
input_text = self . _prompt . replace ( ' {{ original_text}} ' , text )
self . _body [ ' contents ' ] [ 0 ] [ ' parts ' ] [ 0 ] [ ' text ' ] = input_text
try :
async with utils . request . http_session . post (
self . _url ,
params = { ' key ' : self . _api_key } ,
json = self . _body ,
proxy = self . _proxy ,
) as r :
if r . status != 200 :
logger . warning ( ' GeminiTranslate request failed: status= %d %s ' , r . status , r . reason )
self . _on_fail ( r . status )
return None
data = await r . json ( )
except ( aiohttp . ClientConnectionError , asyncio . TimeoutError ) as e :
logger . warning ( ' GeminiTranslate request failed, %s : %s ' , type ( e ) . __name__ , e )
return None
try :
candidates = data [ ' candidates ' ]
if not candidates :
block_reason = data [ ' promptFeedback ' ] . get ( ' blockReason ' , ' ' )
logger . warning ( ' GeminiTranslate no candidate, block_reason= %s , text= %s ' , block_reason , text )
return None
first_content_parts = candidates [ 0 ] [ ' content ' ] [ ' parts ' ]
return ' ' . join ( part . get ( ' text ' , ' ' ) for part in first_content_parts )
except ( KeyError , IndexError ) :
logger . warning ( ' GeminiTranslate failed to parse response: %s ' , data )
return None
def _on_fail ( self , code ) :
if self . _cool_down_timer_handle is not None :
return
if code in ( 401 , 403 ) :
# API密钥无效, 没有权限, 或者不在有效地区。需要手动处理, 等5分钟
self . _cool_down_timer_handle = asyncio . get_running_loop ( ) . call_later (
5 * 60 , self . _on_cool_down_timeout
)
self . _on_availability_change ( )
def _on_cool_down_timeout ( self ) :
self . _cool_down_timer_handle = None
self . _on_availability_change ( )