1 Star 2 Fork 0

CPNPlatform/CN

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
a.py 6.90 KB
一键复制 编辑 原始数据 按行查看 历史
lkx2020git 提交于 2022-01-06 16:58 . 上两版本bug
"""CNA 收 client SUB"""
import logging
import argparse
import zmq
import socket
from pb import comnet_node_pb2, comnet_pb2
import time
import threading
import httplib2
import json
"""命令行参数"""
parser = argparse.ArgumentParser(description=__doc__) #
parser.add_argument("mc_addr", default="192.168.138.200:9090", type=str, help="输入masterIP:port") # TODO
parser.add_argument("ds_addr", default="10.128.210.111:5588", type=str, help="输入目的BEIP:port") # TODO
parser.add_argument("--td_port", default="5588", type=str, help="输入本地TD port,不输入默认不发")
parser.add_argument("--sleep", default=1, type=int, help="发送的延时s")
args = parser.parse_args()
logging.basicConfig(format='%(asctime)s -[line:%(lineno)d] - %(levelname)s: %(message)s',
level=logging.DEBUG)
def get_host_ip():
ip = ""
hObj = httplib2.Http()
err = True
while (True):
if err == False:
break
err = False
try:
response, content = hObj.request("http://" + args.mc_addr + "/my_ip") #
ip = str(content, 'utf-8')
except:
logging.error(" get my ip error ")
time.sleep(10)
err = True
return ip
def sendtoBE():
contextBE = zmq.Context()
contextBE.setsockopt(zmq.SNDHWM, 5)
sender = contextBE.socket(zmq.PUSH)
sender.connect("tcp://" + args.ds_addr)
global cnet # 全局变量
global isready
while True:
if isready:
cnet.id = str(int(round(time.time() * 1000)))
sender.send(cnet.SerializeToString())
time.sleep(args.sleep)
def getnodesIP():
CNEIP = ""
hObj = httplib2.Http()
err = True
while (True):
if err == False:
break
err = False
try:
response, content = hObj.request("http://" + args.mc_addr + "/ip_list") #
user_dic = json.loads(content)
for valueIP in user_dic.values():
CNEIP = CNEIP + valueIP + ':5500,'
CNEIP = CNEIP.rstrip(',')
except:
logging.error("get ip list error")
time.sleep(10)
err = True
return CNEIP
originIP = get_host_ip() # 自己的IP
flag = 1
""" 接收初始化""" #
IPlist = getnodesIP().split(',')
num = len(IPlist) # 需要接受的服务器数量
old_num = num
context = zmq.Context()
context.setsockopt(zmq.RCVHWM, 1)
socket = context.socket(zmq.SUB)
addr = [0 for n in range(num)]
"""分别绑定地址,建立连接"""
for i in range(num):
addr[i] = "tcp://" + IPlist[i]
socket.connect(addr[i]) # 一个socket连接多个
socket.setsockopt_string(zmq.SUBSCRIBE, "")
"""发送TD初始化"""
contextTD = zmq.Context()
contextTD.setsockopt(zmq.SNDHWM, 1)
socketTD = contextTD.socket(zmq.PUB)
socketTD.bind("tcp://*:" + args.td_port)
"""数据初始化"""
cn = comnet_node_pb2.ComNetNode() # 节点信息
flagBE = 0
while True:
"判断拓扑是否变化"
IPlist = getnodesIP().split(',')
num = len(IPlist) # 需要接受的服务器数量
if old_num != num: # 如果变化 重新建立连接
logging.warning("SU: number of node changed,reconnect after 5s. old:" + str(old_num) + "---> new:" + str(num))
flag_cn_change = True # 暂停发送数据
# 1.重新建立连接
while True:
try:
socket.close()
#time.sleep(5)
socket = context.socket(zmq.SUB)
addr = [0 for n in range(num)]
for i in range(num):
addr[i] = "tcp://" + IPlist[i]
socket.connect(addr[i]) # 一个socket连接多个
socket.setsockopt_string(zmq.SUBSCRIBE, "")
break
except:
time.sleep(5)
logging.error("node changged, reconnect failed,try again after 5s..")
old_num = num
"""分别接收"""
isready = False
cnet = comnet_pb2.ComNet() #
cnet.id = ""
cnet.origin = originIP
already_recIP = []
# for i in range(num): # 收一轮后才处理一次数据,生成一次新网络图
num_ct = 0
while True:
# 接收+反序列化
string = socket.recv() #
cn.ParseFromString(string) #
if len(already_recIP) == num: # 如果收满num个,发送
logging.info("------receive over-----")
break
if num_ct > num:
num_ct = 0
break
if cn.node.ip in already_recIP: # 一旦出现重复IP,结束本次赋值
# logging.warning("LI:if nodes offline actually,wait SU change ip_list;if online,run new node cne,then wait")
num_ct = num_ct+1
continue
already_recIP.append(cn.node.ip)
logging.info("【revceive from IP】: " + cn.node.ip)
# 存储该节点发来的 算力信息
node = comnet_pb2.ComNet.Node()
node.ip = cn.node.ip
node.restComputing.append(cn.node.restComputing[0])
node.restComputing.append(cn.node.restComputing[1])
node.restComputing.append(cn.node.restComputing[2])
node.restStorage.append(cn.node.restStorage[0])
node.restStorage.append(cn.node.restStorage[1])
cnet.nodes.append(node)
logging.info('算力信息:{}'.format(cn.node.restComputing))
logging.info('存储信息:{}'.format(cn.node.restStorage))
# 存储该节点发来的链路信息
for t in range(len(cn.link)): #
link = comnet_pb2.ComNet.Link() # 实例化一
link.nodeFrom = cn.link[t].nodeFrom
link.nodeTo = cn.link[t].nodeTo
link.rate = cn.link[t].rate
link.esDelay = cn.link[t].esDelay
cnet.links.append(link)
logging.info("链路信息:")
logging.info("From:{}".format(link.nodeFrom))
logging.info("TO:{}".format(link.nodeTo))
logging.info("rate:{}".format(link.rate))
logging.info("esDelay:{}".format(link.esDelay))
logging.info("----- next link-----")
isready = True
"""发送给TD(一个发就行) """
if args.td_port != "5588":
logging.info("send to TD")
cnet.id = str(int(round(time.time() * 1000)))
socketTD.send_multipart([bytes('comnet-full', 'utf-8'), cnet.SerializeToString()])
"""发送给BE(全发)只开启一次线程"""
if flag == 1:
flag = 0
if args.ds_addr != "10.128.210.111:5588":
t1 = threading.Thread(target=sendtoBE, args=()) #
t1.start() # 开启新线程
flagBE = 1
if flagBE == 1:
logging.info("send to BE")
logging.info("********************************* next round *********************************")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/cpn-platform/cn.git
git@gitee.com:cpn-platform/cn.git
cpn-platform
cn
CN
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385