提交 30971e8a authored 作者: 张孟夏's avatar 张孟夏

grpc客户端相关代码

上级 7ad86ba0
import asyncio
import logging
from typing import Callable, List, Optional
import grpc
import grpc.aio
# from native.config import settings
# from native.utils.utils import singleton
from protos.BitAgentWorker_pb2_grpc import ConnectServiceStub
logger = logging.getLogger(__name__)
class GrpcClient:
GRPC_CHANNEL_OPTIONS = [
("grpc.max_send_message_length", 1000),
("grpc.max_receive_message_length", 1000),
# ("grpc.keep_alive_time", settings.GRPC_HEART_INTERVAL_SECONDS),
# ("grpc.keep_alive_timeout", settings.GRPC_HEART_INTERVAL_SECONDS),
# ("grpc.enable_retries", 1),
]
connect_timeout_seconds: int = 30
def __init__(self):
self._address: str = ""
self._use_ssl: bool = False
self._header: List[tuple] = []
self._is_disconnected: bool = False
self._is_reconnecting: bool = False
self._reconnect_lock = asyncio.Lock()
self._reconnect_event = asyncio.Event()
self._channel: Optional[grpc.aio.Channel] = None
self._stub: Optional[ConnectServiceStub] = None
self.state_changed_handlers: List[Callable[[bool], None]] = []
@property
def state(self) -> grpc.ChannelConnectivity:
"""获取通道状态"""
if not self._channel:
return grpc.ChannelConnectivity.SHUTDOWN
return self._channel.get_state(try_to_connect=False)
def is_channel_healthy(self) -> bool:
"""检查通道状态是否健康"""
state = self.state
return state in (
grpc.ChannelConnectivity.CONNECTING,
grpc.ChannelConnectivity.READY,
)
def set_header(self, token: str, client_id: str) -> None:
"""刷新token"""
self._header = [("token", token), ("x-worker-id", client_id)]
async def connect(self, address: str, use_ssl: bool) -> bool:
"""连接服务器"""
if self._header.count == 0:
raise ValueError("token and client_id must be set")
if address is None or address == "":
raise ValueError("grpc address must be set")
logger.debug(
f"connect grpc to {address} use_ssl:{use_ssl} header:{str(self._header)}"
)
self._is_disconnected = False
await self.shutdown()
self._address = address
self._use_ssl = use_ssl
return await self.reconnect()
async def disconnect(self) -> None:
"""断开连接"""
if self._is_disconnected:
return
self._is_disconnected = True
await self.shutdown()
def create_all_stream(self) -> grpc.aio._call.StreamStreamCall:
"""创建双向流"""
stub = ConnectServiceStub(self._channel)
return stub.ReversalCall(metadata=self._header)
async def _handle_rpc_exception(self, ex: grpc.aio.AioRpcError) -> bool:
"""处理RPC异常"""
handled = False
if ex.code() in (
grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.UNKNOWN,
):
handled = await self.reconnect(force=True)
return handled
async def wait_reconnecting(self, timeout_seconds: int) -> bool:
"""等待重连"""
if self._is_reconnecting:
return await asyncio.wait_for(
self._reconnect_event.wait(), timeout=timeout_seconds
)
return True
def check_can_send(self, log: str) -> bool:
"""检查是否可以发送数据"""
if self._is_reconnecting:
logger.error(f"send error:{log} error,正在重连 {self.state}")
return False
if not self.is_channel_healthy():
logger.error(f"send error:{log} error,连接失败 {self.state}")
return False
return True
async def reconnect(self, force: bool = False) -> bool:
"""重新连接"""
async with self._reconnect_lock:
try:
self._is_reconnecting = True
self._reconnect_event.clear()
if self._is_disconnected:
return False
if force:
await self.shutdown()
elif self.is_channel_healthy():
return True
# 创建通道
if not self._channel:
logger.debug("create channel")
if self._use_ssl:
credits: grpc.ChannelCredentials = (
grpc.ssl_channel_credentials()
)
self._channel = grpc.aio.secure_channel(
self._address, credits, options=self.GRPC_CHANNEL_OPTIONS
)
else:
self._channel = grpc.aio.insecure_channel(
self._address, options=self.GRPC_CHANNEL_OPTIONS
)
# 等待连接
logger.debug(f"wait channel ready to {self._address}")
await asyncio.wait_for(
self._channel.channel_ready(), timeout=self.connect_timeout_seconds
)
logger.info("channel connected")
# self.create_all_stream() # 主要用于验证token
# logger.info("create stream success")
self.invoke_state_changed(True)
return True
except Exception as ex:
if isinstance(ex, grpc.aio.AioRpcError):
logger.error(f"channel Connect error:{ex.code()} {ex.details()}")
else:
logger.error(f"channel Connect error: {type(ex)}")
logger.exception(ex)
await self.shutdown()
await asyncio.sleep(3) # 避免死循环重连
finally:
self._is_reconnecting = False
self._reconnect_event.set()
logger.info("channel error")
return False
async def shutdown(self) -> None:
"""关闭连接"""
if self._channel:
try:
logger.info("channel Dispose")
await asyncio.wait_for(
self._channel.close(), timeout=self.connect_timeout_seconds
)
logger.info("channel shutdown 1")
except Exception as ex:
logger.info(f"channel Shutdown error: {ex}")
self._channel = None
self.invoke_state_changed(False)
# async def report_task_status(
# self, data: ReversalRequest, logger_msg: str = ""
# ) -> bool:
# """上报任务状态"""
# log = logger_msg if logger_msg else data.jsonData
# try:
# if self._is_reconnecting:
# logger.error(f"send error:{log}, 正在重连")
# return False
# if not self.is_channel_healthy() and not await self.reconnect():
# logger.error(f"send error:{log}, 连接失败")
# return False
# stub = ConnectServiceStub(self._channel)
# response = await stub.reportTaskStatus(data, metadata=self._header)
# logger.info(f"{response.messageType} {response.jsonData} {log}")
# return True
# except Exception as e:
# if isinstance(e, grpc.aio.AioRpcError):
# logger.error(
# f"send error RpcException:{log},{e.details()},{self.state}"
# )
# _ = self._handle_rpc_exception(e)
# else:
# logger.error(f"send error:{log},{e},{self.state}")
# return False
# async def report_BitAgentWorker_log(
# self, data: ReversalRequest, can_retry: bool = True
# ) -> bool:
# """上报机器人日志"""
# try:
# if self._is_reconnecting:
# logger.error("ReportRobotLog error,正在重连 " + str(self.state))
# return False
# if not self.is_channel_healthy() and not await self.reconnect():
# logger.error("ReportRobotLog error,连接失败 " + str(self.state))
# return False
# stub = ConnectServiceStub(self._channel)
# async with stub.reportRobotLog(metadata=self._header) as stream:
# await stream.write(data)
# await stream.done_writing()
# response = await stream.read()
# if not response.jsonData or "Fail" in response.jsonData:
# return False
# return True
# except Exception as e:
# if isinstance(e, grpc.aio.AioRpcError):
# if can_retry and e.code() == grpc.StatusCode.UNKNOWN:
# return await self.report_BitAgentWorker_log(data, False)
# logger.error(f"reportRobotLog error: {e.details()}")
# _ = self._handle_rpc_exception(e)
# else:
# logger.error(f"reportRobotLog error: {e}")
# return False
def invoke_state_changed(self, state: bool) -> None:
"""状态改变"""
for handler in self.state_changed_handlers:
handler(state)
差异被折叠。
import asyncio
import logging
import requests
import json
from grpc_client import GrpcClient
from grpc_listener import GrpcListener
from result import ResultModel
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 定义消息处理函数
async def handle_message(request_path: str, request_data: str) -> ResultModel:
print(f"Received request on path: {request_path}, data: {request_data}")
return ResultModel(code=200, data={"message": "Request processed successfully"})
async def send_ping(listener: GrpcListener):
"""定时发送 ping 消息"""
while True:
try:
await listener.send_to_all_stream("ping", "#ping", type=-1, retry_count=3)
except Exception as e:
logging.error(f"send ping error: {e}")
await asyncio.sleep(5) # 每 5 秒发送一次 ping
async def main():
headers = {
"Content-Type" : "application/x-www-form-urlencoded",
"Sm4-Key" : "dUVNkTJLlzMHPqBTZh085jT0N6S7930l6sgddJsYSC8=",
"Cookie" : "ENCRYPT_KEY=f48bf3-05fe-4978"
}
params = {"username":"api_tester",
"password":"$SM4$Nd/e40vIS6UD9VMaSOmYGlrBORfo7r5/1z9D5d4E4es=$4MS$",
"tenantCode":"API_test",
"scope":"all",
"client_id":"a68ad587830d41aebf418a919006353e",
"client_secret":"Ninetech@123",
"grant_type":"password"
}
res = requests.post(url="http://accounts-4j.sit.ninetechone.com/oauth/token",headers=headers,params=params)
res1 = requests.get("http://bitagent.sit.ninetechone.com/api/system/config/client")
grpc = json.loads(res1.json()["data"])
# gRPC 配置
config = {
"token": "Bearer " + res.json()["access_token"],
"client_id": "a68ad587830d41aebf418a919006353e",
"host": grpc["grpc_host"],
"is_ssl": grpc["grpc_use_ssl"]
}
print(config)
# 创建 GrpcClient 实例
client = GrpcClient()
# 设置头部信息
client.set_header(config["token"], config["client_id"])
# 连接到 gRPC 服务器
connected = await client.connect(config["host"], config["is_ssl"])
if not connected:
logging.error("Failed to connect to the gRPC server.")
return
# 创建 GrpcListener 实例
listener = GrpcListener(client)
# 设置消息处理函数
listener.message_received = handle_message
# 启动监听
listener.start()
# 启动发送 ping 任务
ping_task = asyncio.create_task(send_ping(listener))
try:
# 保持程序运行
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logging.info("Shutting down...")
# 停止监听
await listener.stop()
# 断开连接
await client.disconnect()
# 取消 ping 任务
ping_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.ninetech.cloud.grpc";
option java_outer_classname = "BitAgentWorker";
service ConnectService {
rpc ReversalCall(stream ReversalResponse) returns (stream ReversalRequest){}
}
message ReversalRequest {
string id = 1;
string body = 2;
}
message ReversalResponse {
string requestId = 1;
int32 type = 2; // 0-start, 1-data, 2-end ,-1-ping
int32 partitionIndex = 3;
string partitionData = 4;
}
\ No newline at end of file
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: BitAgentWorker.proto
# Protobuf Python Version: 5.29.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
29,
0,
'',
'BitAgentWorker.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x42itAgentWorker.proto\"+\n\x0fReversalRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\t\"b\n\x10ReversalResponse\x12\x11\n\trequestId\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\x05\x12\x16\n\x0epartitionIndex\x18\x03 \x01(\x05\x12\x15\n\rpartitionData\x18\x04 \x01(\t2K\n\x0e\x43onnectService\x12\x39\n\x0cReversalCall\x12\x11.ReversalResponse\x1a\x10.ReversalRequest\"\x00(\x01\x30\x01\x42+\n\x17\x63om.ninetech.cloud.grpcB\x0e\x42itAgentWorkerP\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'BitAgentWorker_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\n\027com.ninetech.cloud.grpcB\016BitAgentWorkerP\001'
_globals['_REVERSALREQUEST']._serialized_start=24
_globals['_REVERSALREQUEST']._serialized_end=67
_globals['_REVERSALRESPONSE']._serialized_start=69
_globals['_REVERSALRESPONSE']._serialized_end=167
_globals['_CONNECTSERVICE']._serialized_start=169
_globals['_CONNECTSERVICE']._serialized_end=244
# @@protoc_insertion_point(module_scope)
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional
DESCRIPTOR: _descriptor.FileDescriptor
class ReversalRequest(_message.Message):
__slots__ = ("id", "body")
ID_FIELD_NUMBER: _ClassVar[int]
BODY_FIELD_NUMBER: _ClassVar[int]
id: str
body: str
def __init__(self, id: _Optional[str] = ..., body: _Optional[str] = ...) -> None: ...
class ReversalResponse(_message.Message):
__slots__ = ("requestId", "type", "partitionIndex", "partitionData")
REQUESTID_FIELD_NUMBER: _ClassVar[int]
TYPE_FIELD_NUMBER: _ClassVar[int]
PARTITIONINDEX_FIELD_NUMBER: _ClassVar[int]
PARTITIONDATA_FIELD_NUMBER: _ClassVar[int]
requestId: str
type: int
partitionIndex: int
partitionData: str
def __init__(self, requestId: _Optional[str] = ..., type: _Optional[int] = ..., partitionIndex: _Optional[int] = ..., partitionData: _Optional[str] = ...) -> None: ...
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import protos.BitAgentWorker_pb2 as BitAgentWorker__pb2
GRPC_GENERATED_VERSION = "1.69.0"
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(
GRPC_VERSION, GRPC_GENERATED_VERSION
)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f"The grpc package installed is at version {GRPC_VERSION},"
+ " but the generated code in BitAgentWorker_pb2_grpc.py depends on"
+ f" grpcio>={GRPC_GENERATED_VERSION}."
+ f" Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}"
+ f" or downgrade your generated code using grpcio-tools<={GRPC_VERSION}."
)
class ConnectServiceStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.ReversalCall = channel.stream_stream(
"/ConnectService/ReversalCall",
request_serializer=BitAgentWorker__pb2.ReversalResponse.SerializeToString,
response_deserializer=BitAgentWorker__pb2.ReversalRequest.FromString,
_registered_method=True,
)
class ConnectServiceServicer(object):
"""Missing associated documentation comment in .proto file."""
def ReversalCall(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def add_ConnectServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
"ReversalCall": grpc.stream_stream_rpc_method_handler(
servicer.ReversalCall,
request_deserializer=BitAgentWorker__pb2.ReversalResponse.FromString,
response_serializer=BitAgentWorker__pb2.ReversalRequest.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
"ConnectService", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers("ConnectService", rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class ConnectService(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def ReversalCall(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_stream(
request_iterator,
target,
"/ConnectService/ReversalCall",
BitAgentWorker__pb2.ReversalResponse.SerializeToString,
BitAgentWorker__pb2.ReversalRequest.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True,
)
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class ResultModel:
code: int
data: Any = None
message: str = ""
class ResultEvent:
def __init__(self):
self.event = asyncio.Event()
self.result = None
async def wait(self):
await self.event.wait()
return self.result
def set(self, result):
self.result = result
self.event.set()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论