提交 0e712744 authored 作者: 张孟夏's avatar 张孟夏

修改1.1不兼容的用例,调整grpc客户端代码

上级 d48f651e
...@@ -13,7 +13,7 @@ ${path} /api/system/config/client ...@@ -13,7 +13,7 @@ ${path} /api/system/config/client
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 200 Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功 Should Be Equal As Strings ${content["message"]} 成功
Should Be Equal As Strings ${content["data"]} {\"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false} Should Contain ${content["data"]} \"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false
反例-不传入token 反例-不传入token
[Tags] E [Tags] E
...@@ -22,7 +22,7 @@ ${path} /api/system/config/client ...@@ -22,7 +22,7 @@ ${path} /api/system/config/client
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 200 Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功 Should Be Equal As Strings ${content["message"]} 成功
Should Be Equal As Strings ${content["data"]} {\"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false} Should Contain ${content["data"]} \"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false
反例-传入错误的token 反例-传入错误的token
[Tags] E [Tags] E
...@@ -31,4 +31,4 @@ ${path} /api/system/config/client ...@@ -31,4 +31,4 @@ ${path} /api/system/config/client
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 200 Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功 Should Be Equal As Strings ${content["message"]} 成功
Should Be Equal As Strings ${content["data"]} {\"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false} Should Contain ${content["data"]} \"grpc_host\":\"192.168.110.211:9090\",\"grpc_use_ssl\":false
...@@ -5,20 +5,26 @@ Library get_expId.py ...@@ -5,20 +5,26 @@ Library get_expId.py
*** Variables *** *** Variables ***
${path} /api/chat/read ${path} /api/chat/read
${clientId} a68ad587830d41aebf418a919006353e
*** Test Cases *** *** Test Cases ***
正例-正常读取消息 正例-正常读取消息
[Tags] F [Tags] F
Sleep 10
${resp} Get ExpId http://bitagent.sit.ninetechone.com/${path} ${sessionId} ${ex_streamId} ${token} ${resp} Get ExpId http://bitagent.sit.ninetechone.com/${path} ${sessionId} ${ex_streamId} ${token}
log ${resp} log ${resp}
Should Contain ${resp}[0] EXP_ Should Contain ${resp["sessionId"]} P_EXP
Set Global Variable ${expId} ${resp}[0] Set Global Variable ${expId} ${resp["sessionId"]}
${content} GET请求结果 /api/chat/retry {"sessionId":"${expId}","workerId":"${clientId}"} 200 ${token}
Set Global Variable ${streamId} ${content["data"]}
${resp1} Get ExpId http://bitagent.sit.ninetechone.com/${path} ${expId} ${streamId} ${token}
log ${resp1}
正例-正常读取对话消息 正例-正常读取对话消息
[Tags] F [Tags] F
${resp} Get Message http://bitagent.sit.ninetechone.com/${path} ${sessionId} ${text_streamId} ${token} ${resp} Get Message http://bitagent.sit.ninetechone.com/${path} ${sessionId} ${text_streamId} ${token}
log ${resp} log ${resp}
Should Contain ${resp}[0] 让我想想 Should Contain ${resp}[0] 你好
Should Contain ${resp}[-1] 关闭 Should Contain ${resp}[-1] 关闭
反例-传入错误的token 反例-传入错误的token
......
...@@ -9,7 +9,7 @@ ${path} /api/chat/interrupt ...@@ -9,7 +9,7 @@ ${path} /api/chat/interrupt
正例-正常中断消息 正例-正常中断消息
[Tags] F [Tags] F
create session URI ${URL} create session URI ${URL}
${content} GET请求结果 ${path} {"sessionId":"${sessionId}"} 200 ${token} ${content} GET请求结果 ${path} {"sessionId":"${sessionId}","streamId":"${ex_streamId}"} 200 ${token}
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 200 Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功 Should Be Equal As Strings ${content["message"]} 成功
...@@ -17,7 +17,7 @@ ${path} /api/chat/interrupt ...@@ -17,7 +17,7 @@ ${path} /api/chat/interrupt
反例-传入错误的token 反例-传入错误的token
[Tags] E [Tags] E
create session URI ${URL} create session URI ${URL}
${content} GET请求结果 ${path} {"sessionId":"${sessionId}"} 401 testtoken ${content} GET请求结果 ${path} {"sessionId":"${sessionId}","streamId":"${ex_streamId}"} 401 testtoken
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 401 Should Be Equal As Strings ${content["code"]} 401
Should Be Equal As Strings ${content["message"]} 无效的access token Should Be Equal As Strings ${content["message"]} 无效的access token
...@@ -25,7 +25,7 @@ ${path} /api/chat/interrupt ...@@ -25,7 +25,7 @@ ${path} /api/chat/interrupt
反例-传入空白token 反例-传入空白token
[Tags] E [Tags] E
create session URI ${URL} create session URI ${URL}
${content} GET请求结果 ${path} {"sessionId":"${sessionId}"} 401 ${EMPTY} ${content} GET请求结果 ${path} {"sessionId":"${sessionId}","streamId":"${ex_streamId}"} 401 ${EMPTY}
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 401 Should Be Equal As Strings ${content["code"]} 401
Should Be Equal As Strings ${content["message"]} 没有找到认证信息 Should Be Equal As Strings ${content["message"]} 没有找到认证信息
...@@ -33,7 +33,15 @@ ${path} /api/chat/interrupt ...@@ -33,7 +33,15 @@ ${path} /api/chat/interrupt
反例-不传sessionId参数 反例-不传sessionId参数
[Tags] E [Tags] E
create session URI ${URL} create session URI ${URL}
${content} GET请求结果 ${path} {} 200 ${token} ${content} GET请求结果 ${path} {"streamId":"${ex_streamId}"} 200 ${token}
log ${content}
Should Be Equal As Strings ${content["code"]} 500
Should Be Equal As Strings ${content["message"]} Server busy, please try later
反例-不传streamId参数
[Tags] E
create session URI ${URL}
${content} GET请求结果 ${path} {"sessionId":"${sessionId}"} 200 ${token}
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 500 Should Be Equal As Strings ${content["code"]} 500
Should Be Equal As Strings ${content["message"]} Server busy, please try later Should Be Equal As Strings ${content["message"]} Server busy, please try later
...@@ -40,8 +40,9 @@ ${path} /api/ability/steps ...@@ -40,8 +40,9 @@ ${path} /api/ability/steps
反例-会话过期 反例-会话过期
[Tags] F [Tags] F
create session URI ${URL} Comment 当前版本会话保持持久化,不会过期,此用例废弃
${content} GET请求结果 ${path} {"exploreSessionId":1,"chatbotSessionId":10} 200 ${token} Comment create session URI ${URL}
log ${content} Comment ${content} GET请求结果 ${path} {"exploreSessionId":1,"chatbotSessionId":10} 200 ${token}
Should Be Equal As Strings ${content["code"]} BAE0001 Comment log ${content}
Should Be Equal As Strings ${content["message"]} 会话已过期,请开启新对话 Comment Should Be Equal As Strings ${content["code"]} BAE0001
Comment Should Be Equal As Strings ${content["message"]} 会话已过期,请开启新对话
...@@ -12,7 +12,7 @@ ${path} /api/ability ...@@ -12,7 +12,7 @@ ${path} /api/ability
${random_string} Generate Random String 12 [LOWER] ${random_string} Generate Random String 12 [LOWER]
log ${random_string} log ${random_string}
create session URI ${URL} create session URI ${URL}
${content} POST请求结果 ${path} {"exploreSessionId":"${expId}","filteredStepIndices":[0],"draft":{"name":"${random_string}","description":"自动化测试能力","startPageUrl":"https://www.baidu.com","parameters":[{"name":"search_query","description":"用户希望在百度上搜索的关键词。","example": "天气预报","type": "string"}],"stepIndices": [0]}} 200 ${token} ${content} POST请求结果 ${path} {"exploreSessionId":"${expId}","filteredStepIndices":[0],"draft":{"name":"${random_string}","description":"自动化测试","parameters":[],"startPageUrl":"https://www.baidu.com","stepIndices":[0]}} 200 ${token}
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 200 Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功 Should Be Equal As Strings ${content["message"]} 成功
...@@ -45,7 +45,7 @@ ${path} /api/ability ...@@ -45,7 +45,7 @@ ${path} /api/ability
反例-能力名已存在 反例-能力名已存在
[Tags] F [Tags] F
create session URI ${URL} create session URI ${URL}
${content} POST请求结果 ${path} {"exploreSessionId":"${expId}","filteredStepIndices":[0],"draft":{"name":"outhagent","description":"该能力用于访问百度主页,以查看其内容和功能。用户可以通过此能力快速打开百度主页,进行信息搜索和浏览。","startPageUrl":"https://www.baidu.com","parameters":[{"name":"search_query","description":"用户希望在百度上搜索的关键词。","example": "天气预报","type": "string"}],"stepIndices": [0]}} 200 ${token} ${content} POST请求结果 ${path} {"exploreSessionId":"${expId}","filteredStepIndices":[0],"draft":{"name":"outhagent","description":"自动化测试","parameters":[],"startPageUrl":"https://www.baidu.com","stepIndices":[0]}} 200 ${token}
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} BAE0008 Should Be Equal As Strings ${content["code"]} BAE0008
Should Be Equal As Strings ${content["message"]} 此能力名已存在 Should Be Equal As Strings ${content["message"]} 此能力名已存在
...@@ -62,6 +62,6 @@ def get_message(url,sessionId,streamId,token,timeout=180): ...@@ -62,6 +62,6 @@ def get_message(url,sessionId,streamId,token,timeout=180):
return oplist return oplist
# print(get_expId('http://bitagent.sit.ninetechone.com/api/chat/read','CB_2a7714d1e41e4d0891fc5c13a39a4063','CB_a556efbc8502461ba473d789da439c55','Bearer c1391926-e8ba-46c0-a0d1-c5e3c38503da')) # print(get_expId('http://bitagent.sit.ninetechone.com/api/chat/read','P_CB_f21b81ef55ab48f8b87530500ced227d','P_CB_976bebae52514b39a140e38f88cd2dee','Bearer d25c4717-087b-4472-aaaa-2a1939f22680'))
# print(get_expId('http://bitagent.sit.ninetechone.com/api/chat/read','CB_e48bea09fe1e4c8388883ff11384bef8','CB_0e7c9c99d6494d9f977b0c2fbb73e2e3','Bearer c1391926-e8ba-46c0-a0d1-c5e3c38503da')) # print(get_expId('http://bitagent.sit.ninetechone.com/api/chat/read','CB_e48bea09fe1e4c8388883ff11384bef8','CB_0e7c9c99d6494d9f977b0c2fbb73e2e3','Bearer c1391926-e8ba-46c0-a0d1-c5e3c38503da'))
# print(get_message('http://bitagent.sit.ninetechone.com/api/chat/read','CB_e268fb27e7d146009d18e0912c23be58','CB_bc6ed32cc3e0476d90c7b243c75cf2a9','Bearer c1391926-e8ba-46c0-a0d1-c5e3c38503da')) # print(get_message('http://bitagent.sit.ninetechone.com/api/chat/read','P_CB_f21b81ef55ab48f8b87530500ced227d','P_CB_976bebae52514b39a140e38f88cd2dee','Bearer d25c4717-087b-4472-aaaa-2a1939f22680'))
import json
import logging
from typing import Optional
from pydantic import BaseModel
from models import ResultModel
logger = logging.getLogger(__name__)
class RequestBodyModel(BaseModel):
path: str
data: str
headers: str
def parse_request_body(body: str) -> Optional[RequestBodyModel]:
if not body:
return None
parts = body.split("\n", maxsplit=2)
if len(parts) == 3:
return RequestBodyModel(
path=parts[0].strip("/"), headers=parts[1], data=parts[2]
)
else:
logger.error(f"指令格式错误:{body}")
return None
def format_response_body(consume_result: ResultModel):
responseBody = f"{consume_result.code}\n{consume_result.message}"
if consume_result.code == 200 and consume_result.data is not None:
try:
response_headers = json.dumps(
{},
default=lambda o: o.__dict__,
ensure_ascii=False,
)
response_data = json.dumps(
consume_result.data,
default=lambda o: o.__dict__,
ensure_ascii=False,
)
responseBody = f"{consume_result.code}\n{response_headers}\n{response_data}"
except Exception as e:
logger.exception(f"json encode error:{consume_result.data} {e}")
return responseBody
...@@ -5,8 +5,6 @@ from typing import Callable, List, Optional ...@@ -5,8 +5,6 @@ from typing import Callable, List, Optional
import grpc import grpc
import grpc.aio import grpc.aio
# from native.config import settings
# from native.utils.utils import singleton
from protos.BitAgentWorker_pb2_grpc import ConnectServiceStub from protos.BitAgentWorker_pb2_grpc import ConnectServiceStub
...@@ -15,13 +13,13 @@ logger = logging.getLogger(__name__) ...@@ -15,13 +13,13 @@ logger = logging.getLogger(__name__)
class GrpcClient: class GrpcClient:
GRPC_CHANNEL_OPTIONS = [ GRPC_CHANNEL_OPTIONS = [
("grpc.max_send_message_length", 1000), ("grpc.max_send_message_length", 10*1024*1024),
("grpc.max_receive_message_length", 1000), ("grpc.max_receive_message_length", 10*1024*1024),
# ("grpc.keep_alive_time", settings.GRPC_HEART_INTERVAL_SECONDS), # ("grpc.keep_alive_time", settings.GRPC_HEART_INTERVAL_SECONDS),
# ("grpc.keep_alive_timeout", settings.GRPC_HEART_INTERVAL_SECONDS), # ("grpc.keep_alive_timeout", settings.GRPC_HEART_INTERVAL_SECONDS),
# ("grpc.enable_retries", 1), # ("grpc.enable_retries", 1),
] ]
connect_timeout_seconds: int = 30 connect_timeout_seconds: int = 60
def __init__(self): def __init__(self):
self._address: str = "" self._address: str = ""
...@@ -62,7 +60,7 @@ class GrpcClient: ...@@ -62,7 +60,7 @@ class GrpcClient:
if address is None or address == "": if address is None or address == "":
raise ValueError("grpc address must be set") raise ValueError("grpc address must be set")
logger.debug( logger.info(
f"connect grpc to {address} use_ssl:{use_ssl} header:{str(self._header)}" f"connect grpc to {address} use_ssl:{use_ssl} header:{str(self._header)}"
) )
self._is_disconnected = False self._is_disconnected = False
...@@ -131,7 +129,7 @@ class GrpcClient: ...@@ -131,7 +129,7 @@ class GrpcClient:
# 创建通道 # 创建通道
if not self._channel: if not self._channel:
logger.debug("create channel") logger.info("create channel")
if self._use_ssl: if self._use_ssl:
credits: grpc.ChannelCredentials = ( credits: grpc.ChannelCredentials = (
grpc.ssl_channel_credentials() grpc.ssl_channel_credentials()
...@@ -145,7 +143,7 @@ class GrpcClient: ...@@ -145,7 +143,7 @@ class GrpcClient:
) )
# 等待连接 # 等待连接
logger.debug(f"wait channel ready to {self._address}") logger.info(f"wait channel ready to {self._address}")
await asyncio.wait_for( await asyncio.wait_for(
self._channel.channel_ready(), timeout=self.connect_timeout_seconds self._channel.channel_ready(), timeout=self.connect_timeout_seconds
) )
......
import asyncio import asyncio
import json
import logging import logging
from collections import deque from collections import deque
from typing import Callable, Coroutine, Optional from typing import Callable, Coroutine, Optional
import grpc.aio import grpc.aio
from result import ResultModel
from models import ResultModel
from body import format_response_body, parse_request_body
from grpc_client import GrpcClient from grpc_client import GrpcClient
from protos.BitAgentWorker_pb2 import ReversalRequest, ReversalResponse from protos.BitAgentWorker_pb2 import ReversalRequest, ReversalResponse
...@@ -30,6 +30,9 @@ class GrpcListener: ...@@ -30,6 +30,9 @@ class GrpcListener:
Optional[Callable[[str, str], Coroutine[None, None, ResultModel]]] Optional[Callable[[str, str], Coroutine[None, None, ResultModel]]]
) = None ) = None
client.state_changed_handlers.append(self._on_channel_state_changed) client.state_changed_handlers.append(self._on_channel_state_changed)
# 新增:发送队列和发送任务
self._send_queue = asyncio.Queue()
self._send_task: Optional[asyncio.Task] = None
def _on_channel_state_changed(self, connect: bool) -> None: def _on_channel_state_changed(self, connect: bool) -> None:
"""处理通道状态变更""" """处理通道状态变更"""
...@@ -59,6 +62,9 @@ class GrpcListener: ...@@ -59,6 +62,9 @@ class GrpcListener:
self.is_running = True self.is_running = True
self._listen_task = asyncio.create_task(self._listen_loop()) self._listen_task = asyncio.create_task(self._listen_loop())
# 启动发送任务
if not self._send_task or self._send_task.done():
self._send_task = asyncio.create_task(self._send_loop())
async def _listen_loop(self) -> None: async def _listen_loop(self) -> None:
read_pass_count = 0 read_pass_count = 0
...@@ -109,7 +115,7 @@ class GrpcListener: ...@@ -109,7 +115,7 @@ class GrpcListener:
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
except Exception as e: except Exception as e:
logger.debug(f"ListenHandle exception:{type(e)}") logger.info(f"ListenHandle exception:{type(e)}")
if not self.is_running: if not self.is_running:
break break
if isinstance(e, grpc.aio.AioRpcError): if isinstance(e, grpc.aio.AioRpcError):
...@@ -142,50 +148,30 @@ class GrpcListener: ...@@ -142,50 +148,30 @@ class GrpcListener:
return return
self._request_id_set.add(data.id) self._request_id_set.add(data.id)
self._request_id_deque.append(data.id) self._request_id_deque.append(data.id)
if len(self._request_id_set) > 100000: if len(self._request_id_set) > 3000:
self._request_id_set.remove( self._request_id_set.remove(
self._request_id_deque.popleft() self._request_id_deque.popleft()
) # 最早的指令移除 ) # 最早的指令移除
# 3. 消费指令 # 3. 消费指令
consume_result: ResultModel = ResultModel(code=500, message="client error") consume_result: ResultModel = ResultModel.error("client error")
try: try:
if self.message_received: request_data = parse_request_body(data.body)
parts = data.body.split("\n", maxsplit=2) if request_data is None:
if len(parts) == 3: consume_result = ResultModel.error("指令格式错误")
request_path = parts[0].strip("/") elif self.message_received:
request_data = parts[2]
consume_result = await self.message_received( consume_result = await self.message_received(
request_path, request_data request_data.path, request_data.data
) )
else:
consume_result = ResultModel(code=500, message="指令格式错误")
logger.error(f"指令格式错误:{data.body}")
except Exception as e: except Exception as e:
consume_result = ResultModel(code=500, message=str(e)) consume_result = ResultModel.error(str(e))
logger.exception(f"消费指令异常:{data} {e}") logger.exception(f"消费指令异常:{data} {e}")
# 3. 回复执行结果 # 3. 回复执行结果
responseBody = f"{consume_result.code}\n{consume_result.message}" log = consume_result.log_str()
if consume_result.code == 200 and consume_result.data is not None: await self.send_to_all_stream(
try: data.id, format_response_body(consume_result), retry_count=3, log=log
response_headers = json.dumps(
{},
default=lambda o: o.__dict__,
ensure_ascii=False,
)
response_data = json.dumps(
consume_result.data,
default=lambda o: o.__dict__,
ensure_ascii=False,
) )
responseBody = (
f"{consume_result.code}\n{response_headers}\n{response_data}"
)
except Exception as e:
logger.exception(f"json encode error:{consume_result.data} {e}")
await self.send_to_all_stream(data.id, responseBody, retry_count=3)
except Exception as e: except Exception as e:
logger.exception(f"on_message_received error:{e}") logger.exception(f"on_message_received error:{e}")
...@@ -203,6 +189,11 @@ class GrpcListener: ...@@ -203,6 +189,11 @@ class GrpcListener:
self._listen_task.cancel() self._listen_task.cancel()
self._listen_task = None self._listen_task = None
# 停止发送任务
if self._send_task:
self._send_task.cancel()
self._send_task = None
await self._close_call_stream() await self._close_call_stream()
async def _close_call_stream(self) -> None: async def _close_call_stream(self) -> None:
...@@ -219,18 +210,58 @@ class GrpcListener: ...@@ -219,18 +210,58 @@ class GrpcListener:
logger.exception("Stream dispose error") logger.exception("Stream dispose error")
self._call_stream = None self._call_stream = None
async def _send_loop(self):
while self.is_running:
try:
send_args = await self._send_queue.get()
*real_args, fut = send_args
try:
result = await self._do_send_to_all_stream(*real_args)
except Exception as e:
logger.exception(f"Send loop error: {e}")
result = False
finally:
if fut:
fut.set_result(result)
except asyncio.CancelledError:
break
async def send_to_all_stream( async def send_to_all_stream(
self, requestId: str, body: str, type: int = 1, retry_count: int = 0 self,
requestId: str,
body: str,
type: int = 1,
retry_count: int = 0,
log: Optional[str] = None,
) -> bool: ) -> bool:
"""发送数据到所有流""" """入队,串行发送数据"""
if type == -1: if type == -1:
if not self._send_queue.empty():
logger.debug("ignore:ping")
return True
if self._ping_count == 6: if self._ping_count == 6:
self._ping_count = 0 self._ping_count = 0
self._ping_count += 1 self._ping_count += 1
if len(body) < 2000: log = "#ping"
log = f"{requestId} content:{body}"
else: else:
log = f"{requestId} len:{len(body)} content:{body[:2000]}..." if not log:
log = f'"{requestId}":\n{body}'
else:
log = f'"{requestId}":\n{log}'
fut = asyncio.get_event_loop().create_future()
await self._send_queue.put((requestId, body, type, retry_count, log, fut))
return await fut
async def _do_send_to_all_stream(
self,
requestId: str,
body: str,
type: int = 1,
retry_count: int = 0,
log: Optional[str] = None,
) -> bool:
try:
count = retry_count count = retry_count
while count >= 0: while count >= 0:
try: try:
...@@ -239,10 +270,10 @@ class GrpcListener: ...@@ -239,10 +270,10 @@ class GrpcListener:
if count < retry_count: if count < retry_count:
logger.info("send retry ") logger.info("send retry ")
await self._client.wait_reconnecting( await self._client.wait_reconnecting(
30 60
) )
else: else:
if not self._client.check_can_send(log): if not self._client.check_can_send(log or ""):
await self._close_call_stream() await self._close_call_stream()
continue continue
...@@ -252,17 +283,20 @@ class GrpcListener: ...@@ -252,17 +283,20 @@ class GrpcListener:
if not stream: if not stream:
logger.error(f"call_stream is null:{log}") logger.error(f"call_stream is null:{log}")
continue continue
for i in range(0, len(body), 1000): for i in range(0, len(body), 1024*1024*10):
if partitionIndex > 0: if partitionIndex > 0:
logger.info(f"partition:{partitionIndex}") logger.info(f"partition:{partitionIndex}")
data = ReversalResponse( data = ReversalResponse(
requestId=requestId, requestId=requestId,
type=type, type=type,
partitionIndex=partitionIndex, partitionIndex=partitionIndex,
partitionData=body[i : i + 1000], partitionData=body[
i : i + 1024*1024*10
],
) )
await stream.write(data) await stream.write(data)
partitionIndex += 1 partitionIndex += 1
# 发送结束标志 # 发送结束标志
if type == 1: if type == 1:
await stream.write( await stream.write(
...@@ -276,11 +310,18 @@ class GrpcListener: ...@@ -276,11 +310,18 @@ class GrpcListener:
logger.error(f"send RpcException:{log},{str(e)},{e.details()}") logger.error(f"send RpcException:{log},{str(e)},{e.details()}")
await self._client._handle_rpc_exception(e) await self._client._handle_rpc_exception(e)
elif isinstance(e, asyncio.InvalidStateError): elif isinstance(e, asyncio.InvalidStateError):
logger.error(f"send StateError:{log},{str(e)},{self._client.state}") logger.error(
f"send StateError:{log},{str(e)},{self._client.state}"
)
await self._close_call_stream() await self._close_call_stream()
await self._client.reconnect(force=True) await self._client.reconnect(force=True)
else: else:
logger.error(f"send Exception:{log},{str(e)},{self._client.state}") logger.error(
f"send Exception:{log},{str(e)},{self._client.state}"
)
finally: finally:
count -= 1 count -= 1
return False return False
except Exception as e:
logger.exception(f"_do_send_to_all_stream error: {e}")
return False
差异被折叠。
import json
from copy import deepcopy
from enum import Flag, auto
from typing import Any
from pydantic import BaseModel
class ActivityType(Flag):
BROWSER = auto() # 1
APP = auto() # 2
def truncate_base64_fields(data: Any, truncate_length: int = 10) -> Any:
if isinstance(data, dict):
new_data = {}
for key, value in data.items():
# 如果是base64图片字段,截断
if key in [
"screenshots",
"screenshotsBase64",
"screenshots_base64",
] and isinstance(value, list):
truncated_list = [
(item[:truncate_length] + "...")
if isinstance(item, str) and len(item) > truncate_length
else item
for item in value
]
new_data[key] = truncated_list
else:
new_data[key] = truncate_base64_fields(value, truncate_length)
return new_data
elif isinstance(data, list):
return [truncate_base64_fields(item, truncate_length) for item in data]
else:
return data
class ResultModel(BaseModel):
code: int
data: Any = None
message: str = ""
@staticmethod
def success(data: Any = None, message: str = "") -> "ResultModel":
return ResultModel(code=200, data=data, message=message)
@staticmethod
def error(
message: str,
code: int = 500,
data: Any = None,
) -> "ResultModel":
return ResultModel(code=code, message=message, data=data)
def log_str(self) -> str:
try:
raw_dict = self.dict()
safe_dict = deepcopy(raw_dict)
log = truncate_base64_fields(safe_dict)
return json.dumps(
log,
default=lambda o: o.__dict__,
ensure_ascii=False,
)
except Exception:
return json.dumps(
self,
default=lambda o: o.__dict__,
ensure_ascii=False,
)
...@@ -3,8 +3,7 @@ ...@@ -3,8 +3,7 @@
import grpc import grpc
import grpc_manager.protos.BitAgentWorker_pb2 as BitAgentWorker__pb2
import protos.BitAgentWorker_pb2 as BitAgentWorker__pb2
GRPC_GENERATED_VERSION = "1.69.0" GRPC_GENERATED_VERSION = "1.69.0"
GRPC_VERSION = grpc.__version__ GRPC_VERSION = grpc.__version__
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论