1 Star 1 Fork 1

YanWey/Python-RPC

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Client.py 4.08 KB
一键复制 编辑 原始数据 按行查看 历史
YanWey 提交于 2024-07-19 18:25 . 客户端代码
import json
import ipaddress
import argparse
import time
import socket
from socket import *
class RPCClient:
def __init__(self, ip, port):
self.ip = ip
self.port = port
# 发起RPC调用,将函数名和参数发送给服务器,然后接收服务器的响应
def funcCall(self, _funcName, *args):
try:
CSoc = socket(AF_INET, SOCK_STREAM)
CSoc.connect((self.ip, self.port))
CSoc.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 端口重用
CSoc.settimeout(8) # 设置超时时间为8秒
print("成功连接到服务器 {}: {}".format(self.ip, self.port))
except ConnectionError as e:
print("无法连接到服务器 {}: {}: {}".format(self.ip, self.port, str(e))) # 连接异常
CSoc.close()
except timeout as e:
print("与服务端 {}:{} 连接超时: {}".format(self.ip, self.port, str(e))) # 超时
CSoc.close()
# 客户端查看服务端支持调用的服务
if _funcName == 'getFunc':
reqData = json.dumps({'func': _funcName}).encode()
CSoc.sendall(len(reqData).to_bytes(4, byteorder='big') + reqData)
# 接收服务器返回内容
data = CSoc.recv(1024)
# 未接收到数据
if not data:
CSoc.close()
return
# 获取数据长度
mesLen = int.from_bytes(data[:4], byteorder='big')
if len(data) < mesLen + 4:
print("消息头提示的长度大于接收到的数据长度")
CSoc.close()
return
# 通过消息头指示的长度读取消息体
mes = json.loads(data[4:4 + mesLen].decode())
print('支持的函数如下', mes)
# 客户端请求函数调用
else:
# 利用json序列化消息
reqData = json.dumps({'func': _funcName, 'args': args}).encode()
# 发送消息头(数据长度)+数据体
CSoc.sendall(len(reqData).to_bytes(4, byteorder='big') + reqData)
try:
# 接收服务端的返回内容
data = CSoc.recv(1024)
if not data: # 收到空数据或未接收到数据
CSoc.close()
return
mesLen = int.from_bytes(data[:4], byteorder='big') # 获取前4字节(消息头)
if len(data) < mesLen + 4:
print("消息头提示的长度大于接收到的数据长度")
CSoc.close()
return
# 消息反序列化
result = json.loads(data[4:4 + mesLen].decode()) # 获取第4个字节往后的消息体
print(result)
except timeout:
print("处理超时")
CSoc.close()
except OSError as e:
print("接收响应时发生错误: {}".format(str(e)))
CSoc.close()
except Exception as e:
print("其他异常: {}".format(str(e)))
CSoc.close()
# 客户端的启动参数
def clientStart():
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--ip',required=True,help='IP不得为空') # -i
parser.add_argument('-p', '--port',required=True,help='端口号不得为空') # -p
return parser.parse_args()
if __name__ == '__main__':
args = clientStart()
ip = args.ip
port = args.port
rpcClient = RPCClient(ip, int(port))
rpcClient.funcCall('getFunc')
# 支持自定义的客户端调用,通过修改以下代码控制
# 测试用例
rpcClient.funcCall('add', 66,66)
rpcClient.funcCall('min', 31, 12)
rpcClient.funcCall('isEven', 1231)
rpcClient.funcCall('add', 129,121,3124)
rpcClient.funcCall('caedaw')
rpcClient.funcCall('hhhhh', 1)
rpcClient.funcCall('6666', 1, "3")
# 测试用sleep
# sleep(15)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/yanwey/Python-RPC.git
git@gitee.com:yanwey/Python-RPC.git
yanwey
Python-RPC
Python-RPC
master

搜索帮助