1 Star 2 Fork 0

CPNPlatform/CN

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
e.py 8.54 KB
一键复制 编辑 原始数据 按行查看 历史
"""CNE模块 pub端 server"""
import logging
import zmq
import argparse
import socket
import httplib2
import subprocess
from pb import comnet_node_pb2
import json
import time
import threading
# Command-line options:
#   mc_addr -- "host:port" of the master node (used as http://<mc_addr>/...)
#   --port  -- local TCP port the PUB socket binds to
#   --sleep -- seconds to wait between two publishes
parser = argparse.ArgumentParser(description=__doc__)
# nargs="?" makes the positional optional. Without it argparse ignores
# `default` for a positional argument and always demands a value.
# TODO: confirm the default master address for deployment.
parser.add_argument("mc_addr", nargs="?", default="192.168.1.110:9090", type=str, help="主节点的IP")
parser.add_argument("--port", default="5500", type=str, help="输入本地端口")
parser.add_argument("--sleep", default=1, type=int, help="发送的延时s")
args = parser.parse_args()
logging.basicConfig(format='%(asctime)s -[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.DEBUG)
def getIPlist():
    """Return the node IPs registered with the master node.

    Requests ``http://<args.mc_addr>/ip_list``, decodes the JSON object it
    returns and yields the object's values. Any failure is logged and the
    request is retried every 5 seconds, indefinitely.

    The values are comma-joined, trailing commas stripped, then split on
    commas. An empty object therefore yields ``['']`` (one element).

    Returns:
        list[str]: the IP strings, in the order of the JSON object's values.
    """
    hObj = httplib2.Http()
    while True:
        try:
            _response, content = hObj.request("http://" + args.mc_addr + "/ip_list")
            user_dic = json.loads(content)
            # Built fresh on every attempt. The previous accumulator lived
            # outside the retry loop, so a failure part-way through the
            # concatenation leaked partial entries into the next attempt.
            ip_csv = ",".join(user_dic.values()).rstrip(",")
        except Exception:  # not bare: let KeyboardInterrupt/SystemExit through
            logging.error("get ip list error,try again")
            time.sleep(5)
            continue
        return ip_csv.split(",")
# Get this machine's IP
def get_host_ip():
    """Ask the master node which IP this machine has.

    Requests ``http://<args.mc_addr>/my_ip`` and returns the response body
    decoded as UTF-8, with surrounding whitespace removed. On any failure it
    logs an error and retries every 10 seconds, indefinitely.

    Returns:
        str: this node's IP as reported by the master.
    """
    hObj = httplib2.Http()
    while True:
        try:
            _response, content = hObj.request("http://" + args.mc_addr + "/my_ip")
            # Strip so a trailing newline cannot leak into the URLs and link
            # endpoints that are later built from this value.
            ip = str(content, 'utf-8').strip()
        except Exception:  # not bare: let KeyboardInterrupt/SystemExit through
            # Log first, so the error is reported before the back-off sleep.
            logging.error("get ip error")
            time.sleep(10)
            continue
        logging.info("get ip ok")
        return ip
def _run_shell(command):
    """Run *command* through the shell; return stdout decoded as text (stderr is discarded)."""
    stdout, _stderr = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE).communicate()
    return stdout.decode()


def _cpu_core_count():
    """Return the number of lines containing "processor" in /proc/cpuinfo, as a float."""
    text = _run_shell("cat /proc/cpuinfo| grep \"processor\"| wc -l")
    # Parse the whole number. Filtering single digit characters and keeping
    # the first one turned e.g. 16 cores into 1.
    return float(text.split()[0])


def _cpu_mhz():
    """Return the first MHz value found in /proc/cpuinfo."""
    text = _run_shell("cat /proc/cpuinfo |grep MHz|uniq")
    # Assumes lines of the form "cpu MHz<padding>: 2400.000". Take the number
    # after the colon rather than a fixed space-split index, which breaks when
    # the padding uses spaces.
    return float(text.splitlines()[0].split(":", 1)[1])


def _cpu_idle_ratio():
    """Return the remaining (idle) CPU share as a fraction, from ``sar -u 1 2``.

    Uses the last whitespace-separated field of the output. This is assumed
    to be the %idle column of the final summary line; TODO confirm for the
    sysstat version in use.
    """
    text = _run_shell("sar -u 1 2")
    return float(text.split()[-1]) * 0.01


def _disk_capacity():
    """Return (total size in GB, free fraction) summed over all ``df`` filesystems.

    Assumes ``df`` reports sizes in 1K blocks (its default block size), hence
    the division by 1024**2 to get GB. Rows are parsed one at a time. Rows
    whose size or percentage is not a plain number (e.g. "-") are skipped
    instead of shifting every later size/percent pair.
    Raises ZeroDivisionError if no usable row is found.
    """
    text = _run_shell("df --output=size,pcent")
    total = 0.0
    used = 0.0
    for line in text.splitlines()[1:]:  # first line is the column header
        fields = line.split()
        if len(fields) != 2:
            continue
        size, pcent = fields[0], fields[1].rstrip("%")
        if not (size.isdigit() and pcent.isdigit()):
            continue
        total += float(size)
        used += float(size) * float(pcent) * 0.01
    return total / 1048576, 1 - (used / total)


def get_node_inf():
    """Thread body: keep ``cn.node`` filled with this host's compute/storage figures.

    Each pass writes:
      * ``cn.node.restComputing[0]``: CPU core count
      * ``cn.node.restComputing[1]``: CPU MHz
      * ``cn.node.restComputing[2]``: remaining (idle) CPU fraction
      * ``cn.node.restStorage[0]``: total disk size in GB
      * ``cn.node.restStorage[1]``: free disk fraction

    If any step fails, the traceback is logged, the module-level
    ``get_nodeiserror`` flag is set (the sender loop stops when it sees it),
    and the thread returns. Looping on would only re-run the failing
    commands, and would spin when a command such as ``sar`` is missing.
    """
    global get_nodeiserror
    logging.info("开启线程2 node")
    while True:
        try:
            cpu_cores = _cpu_core_count()
            cpu_mhz = _cpu_mhz()
            cpu_idle = _cpu_idle_ratio()
            storage_gb, storage_free = _disk_capacity()
            cn.node.restComputing[0] = cpu_cores
            cn.node.restComputing[1] = cpu_mhz
            cn.node.restComputing[2] = cpu_idle
            cn.node.restStorage[0] = storage_gb
            cn.node.restStorage[1] = storage_free
        except Exception:
            logging.exception("node info collection failed")
            get_nodeiserror = True
            return
def _new_link():
    """Return a placeholder link: empty endpoints, rate and esDelay set to 1."""
    link = comnet_node_pb2.ComNetNode.Link()
    link.nodeFrom = ""
    link.nodeTo = ""
    link.rate = 1
    link.esDelay = 1
    return link


def _resize_links(count):
    """Append placeholder links or drop trailing ones until ``cn.link`` holds *count* entries."""
    count = max(count, 0)
    while len(cn.link) < count:
        cn.link.append(_new_link())
    while len(cn.link) > count:
        del cn.link[-1]


def _fetch_net_metrics(hObj):
    """Poll ``http://<serverIP>:5000/metrics`` until it returns usable link info.

    Sleeps 10 s before every attempt. An attempt is retried when the request
    or JSON decoding fails, or when any entry has an empty ``nodeTo``.

    Returns:
        list[dict]: the response's ``info`` list. Entries are read for the
        keys ``nodeTo``, ``rate`` and ``esDelay``.
    """
    while True:
        time.sleep(10)
        try:
            _response, content = hObj.request("http://" + serverIP + ":5000/metrics")
            info_list = json.loads(content)["info"]
            has_empty = False
            for info in info_list:
                if info["nodeTo"] == '':
                    logging.error("empty nodeTO,try again after 10s...")
                    has_empty = True
            if not has_empty:
                return info_list
        except Exception:
            logging.error("http 5000 network error,check network exporter,try again after 10s... ")


def get_net_inf():
    """Thread body: keep ``cn.link`` in sync with the local network exporter.

    With N nodes in the master's IP list, each node keeps N - 1 links.
    Every round (at least 10 s apart), this function:
      1. fetches link metrics from this node's exporter on port 5000;
      2. re-reads the IP list and, if the node count changed, sets
         ``flag_cn_change`` (publishing pauses) and resizes ``cn.link``;
      3. copies nodeTo/rate/esDelay of every entry whose ``nodeTo`` is in the
         IP list into consecutive link slots, with ``nodeFrom`` = this node;
      4. clears ``flag_cn_change`` so publishing resumes.
    Matching entries beyond the number of link slots are ignored with a
    warning instead of crashing the thread with an IndexError.
    """
    global flag_cn_change
    logging.info("开启线程1 net")
    hObj = httplib2.Http()
    iplsit = getIPlist()
    old_num = len(iplsit)
    _resize_links(old_num - 1)
    while True:
        info_list = _fetch_net_metrics(hObj)
        # Detect nodes that joined or left since the last round.
        iplsit = getIPlist()
        if len(iplsit) != old_num:
            logging.warning("SU:number of node changed old:" + str(old_num) + "--->new:" + str(len(iplsit)))
            flag_cn_change = True  # pause publishing while links are reshaped
            _resize_links(len(iplsit) - 1)
            old_num = len(iplsit)
        # Copy metrics of known peers into consecutive link slots.
        j = 0
        for info in info_list:
            if info["nodeTo"] not in iplsit:
                continue
            if j >= len(cn.link):
                logging.warning("more matching links than link slots, ignoring the rest")
                break
            link = cn.link[j]
            link.nodeFrom = cn.node.ip
            link.nodeTo = info["nodeTo"]
            link.rate = info["rate"]
            link.esDelay = info["esDelay"]
            logging.info('nodeTo:{}'.format(link.nodeTo))
            logging.info('rate:{}'.format(link.rate))
            logging.info('esDelay:{}'.format(link.esDelay))
            j += 1
            logging.info("-----------------------")
        logging.info("-----------NEXT ROUND-----------")
        flag_cn_change = False
# Flags shared between the sender loop below and the collector threads.
get_nodeiserror = False  # set by get_node_inf when collecting node metrics fails
# Publishing stays paused until get_net_inf finishes its first round. Set this
# before the threads start, so a False written by get_net_inf cannot be
# overwritten afterwards.
flag_cn_change = True

# Initialise the message with this node's IP and placeholder metrics.
cn = comnet_node_pb2.ComNetNode()
cn.node.ip = get_host_ip()
serverIP = cn.node.ip  # this node's IP; its port-5000 exporter is polled
cn.node.restComputing.append(0.1)  # CPU core count
cn.node.restComputing.append(0.1)  # CPU MHz
cn.node.restComputing.append(0.1)  # remaining CPU fraction
cn.node.restStorage.append(0.1)  # disk size (GB)
cn.node.restStorage.append(0.1)  # free disk fraction

# Bind the PUB socket. Named pub_socket so it does not shadow the `socket` module.
context = zmq.Context()
context.setsockopt(zmq.SNDHWM, 1)  # send high-water mark of 1 for sockets from this context
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://*:" + args.port)

# Thread 1 pulls this node's network info over HTTP (port 5000); thread 2
# collects compute/storage info. Both loop forever, so they are daemons:
# otherwise the process could never exit after the sender loop below stops.
t1 = threading.Thread(target=get_net_inf, daemon=True)
t1.start()
t2 = threading.Thread(target=get_node_inf, daemon=True)
t2.start()

# Send loop: publish the serialized message every args.sleep seconds,
# stamped with a millisecond timestamp id.
flag = 1
try:
    while True:
        time.sleep(args.sleep)
        if get_nodeiserror:
            logging.error("node error,check node exporter ")
            break
        if not flag_cn_change:
            cn.id = str(int(round(time.time() * 1000)))
            pub_socket.send(cn.SerializeToString())
        else:
            logging.warning("node changged stop send,wait 5s ")
            time.sleep(5)
        if flag == 1:
            logging.info("CNE server running... ")
            flag = 0
finally:
    # Release the socket without waiting on unsent messages, then the context.
    pub_socket.close(linger=0)
    context.term()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/cpn-platform/cn.git
git@gitee.com:cpn-platform/cn.git
cpn-platform
cn
CN
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385