提交 70b15acf authored 作者: 张孟夏's avatar 张孟夏

修改grpc

上级 d7b33a57
...@@ -386,3 +386,12 @@ ${xlsx_file} testfile.xlsx ...@@ -386,3 +386,12 @@ ${xlsx_file} testfile.xlsx
log ${content} log ${content}
Should Be Equal As Strings ${content["code"]} 500 Should Be Equal As Strings ${content["code"]} 500
Should Be Equal As Strings ${content["message"]} Server busy, please try later Should Be Equal As Strings ${content["message"]} Server busy, please try later
正例-正常发送文字消息,进入探索
[Tags] F
create session URI ${URL}
${content} POST请求结果_PARAMS ${path} {"sessionId":"${sessionId}","text":"使用探索工具,打开百度"} 200 ${token}
log ${content}
Should Be Equal As Strings ${content["code"]} 200
Should Be Equal As Strings ${content["message"]} 成功
Set Global Variable ${ex_streamId} ${content["data"]}
import asyncio import asyncio
import logging import logging
import requests import requests
import json import json
from grpc_client import GrpcClient from grpc_client import GrpcClient
...@@ -10,79 +9,122 @@ from result import ResultModel ...@@ -10,79 +9,122 @@ from result import ResultModel
# 配置日志 # 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 定义消息处理函数
async def handle_message(request_path: str, request_data: str) -> ResultModel: async def handle_message(request_path: str, request_data: str,request_id:str) -> ResultModel:
print(f"Received request on path: {request_path}, data: {request_data}") """增强版消息处理函数"""
return ResultModel(code=200, data={"message": "Request processed successfully"}) logging.info(f"收到请求路径: {request_path}, 数据: {request_data}")
# 特定消息处理逻辑
if request_path == "list_mcp_servers": # 特定指令1
return ResultModel(code=200, data=[])
elif request_path == "adaptable_activities": # 特定指令2
return ResultModel(code=200, data=[ {"id": "sleep_seconds", "name": "sleep_seconds", "displayName": "等待", "description": "此工具的用途是在目标无法达成时进行等待,最多等待2次", "parametersSchema": {"description": "等待", "properties": {"wait_seconds": {"default": 5, "description": "默认5秒,可修改为1-5秒时间", "title": "Wait Seconds", "type": "integer"}}, "title": "SleepSeconds", "type": "object", "required": []}, "elementIdJsonPaths": [], "icon": "sleep_seconds", "timeout": 180}])
# 默认响应
return ResultModel(code=200, data={"message": "请求处理成功"})
async def send_ping(listener: GrpcListener): async def send_ping(listener: GrpcListener):
"""定时发送 ping 消息""" """心跳维护任务"""
while True: while True:
try: try:
await listener.send_to_all_stream("ping", "#ping", type=-1, retry_count=3) await listener.send_to_all_stream("ping", "#ping", type=-1, retry_count=3)
logging.info("心跳包已发送")
except Exception as e:
logging.error(f"发送心跳异常: {e}")
await asyncio.sleep(5) # 每5秒发送一次
async def respond_to_server(listener: GrpcListener):
"""服务端消息响应器"""
while True:
try:
# 实际业务中可添加消息检测逻辑
# 示例:当收到特定通知时发送响应
# if 收到特殊通知:
# response = {"status": "OK", "timestamp": int(time.time())}
# await listener.send_to_all_stream("response", json.dumps(response))
await asyncio.sleep(1) # 防止CPU过载
except Exception as e: except Exception as e:
logging.error(f"send ping error: {e}") logging.error(f"响应器异常: {e}")
await asyncio.sleep(5) # 每 5 秒发送一次 ping await asyncio.sleep(5) # 异常时延长等待
async def main(): async def main():
headers = { # 认证配置
"Content-Type" : "application/x-www-form-urlencoded", auth_config = {
"Sm4-Key" : "dUVNkTJLlzMHPqBTZh085jT0N6S7930l6sgddJsYSC8=", "headers": {
"Cookie" : "ENCRYPT_KEY=f48bf3-05fe-4978" "Content-Type": "application/x-www-form-urlencoded",
} "Sm4-Key": "dUVNkTJLlzMHPqBTZh085jT0N6S7930l6sgddJsYSC8=",
params = {"username":"api_tester", "Cookie": "ENCRYPT_KEY=f48bf3-05fe-4978"
"password":"$SM4$Nd/e40vIS6UD9VMaSOmYGlrBORfo7r5/1z9D5d4E4es=$4MS$", },
"tenantCode":"API_test", "params": {
"scope":"all", "username": "api_tester",
"client_id":"a68ad587830d41aebf418a919006353e", "password": "$SM4$Nd/e40vIS6UD9VMaSOmYGlrBORfo7r5/1z9D5d4E4es=$4MS$",
"client_secret":"Ninetech@123", "tenantCode": "API_test",
"grant_type":"password" "scope": "all",
} "client_id": "a68ad587830d41aebf418a919006353e",
res = requests.post(url="http://accounts-4j.sit.ninetechone.com/oauth/token",headers=headers,params=params) "client_secret": "Ninetech@123",
res1 = requests.get("http://bitagent.sit.ninetechone.com/api/system/config/client") "grant_type": "password"
grpc = json.loads(res1.json()["data"]) }
# gRPC 配置
config = {
"token": "Bearer " + res.json()["access_token"],
"client_id": "a68ad587830d41aebf418a919006353e",
"host": grpc["grpc_host"],
"is_ssl": grpc["grpc_use_ssl"]
} }
print(config)
# 创建 GrpcClient 实例 try:
client = GrpcClient() # 获取访问令牌
# 设置头部信息 token_res = requests.post(
client.set_header(config["token"], config["client_id"]) url="http://accounts-4j.sit.ninetechone.com/oauth/token",
headers=auth_config["headers"],
params=auth_config["params"]
)
token_res.raise_for_status()
# 连接到 gRPC 服务器 # 获取gRPC配置
connected = await client.connect(config["host"], config["is_ssl"]) config_res = requests.get("http://bitagent.sit.ninetechone.com/api/system/config/client")
if not connected: config_res.raise_for_status()
logging.error("Failed to connect to the gRPC server.")
return
# 创建 GrpcListener 实例 grpc_config = {
listener = GrpcListener(client) "token": "Bearer " + token_res.json()["access_token"],
# 设置消息处理函数 "client_id": auth_config["params"]["client_id"],
listener.message_received = handle_message "host": json.loads(config_res.json()["data"])["grpc_host"],
# 启动监听 "is_ssl": json.loads(config_res.json()["data"])["grpc_use_ssl"]
listener.start() }
logging.info(f"gRPC配置加载完成: {grpc_config}")
# 启动发送 ping 任务 # 初始化gRPC客户端
ping_task = asyncio.create_task(send_ping(listener)) client = GrpcClient()
client.set_header(grpc_config["token"], grpc_config["client_id"])
try: # 建立连接
# 保持程序运行 if not await client.connect(grpc_config["host"], grpc_config["is_ssl"]):
while True: logging.error("gRPC服务器连接失败")
await asyncio.sleep(1) return
except KeyboardInterrupt:
logging.info("Shutting down...") # 启动消息监听器
# 停止监听 listener = GrpcListener(client)
await listener.stop() listener.message_received = handle_message
# 断开连接 listener.start()
await client.disconnect()
# 取消 ping 任务 # 启动后台任务
ping_task.cancel() tasks = [
asyncio.create_task(send_ping(listener)),
asyncio.create_task(respond_to_server(listener))
]
# 主循环
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logging.info("正在关闭服务...")
await listener.stop()
await client.disconnect()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logging.error(f"服务启动失败: {e}")
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论