2 Star 2 Fork 1

wzw/Training-Platform

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
share_resource.py 9.57 KB
一键复制 编辑 原始数据 按行查看 历史
wzw 提交于 2023-05-08 07:27 . modify return message of 501
import json
import time
import docker
import os
import utils.tool.request as req
import asyncio
from threading import Lock
from utils.tool.message import send_message
# Serializes every read/write of MissionManager.gpu_table, which is shared
# between the request-handling code and the background watchdog thread.
mission_table_lock = Lock()
# | Mission Table:
# |--------------------------------GPU0-----------------------------------------|
# | PC(dict) gpu_id total_mem used free percentage |
# | pc_id uuid container(list) useful |
# +--------------------------------PC-------------------------------------------+
# | id ip port docker_port |
# |--------------------------------container------------------------------------|
# | Cid short_id status task_type task_id username |
# | abnormal |
# |-----------------------------------------------------------------------------|
class MissionManager:
    """Tracks every GPU known to the platform and the containers running on
    each one, and schedules new work onto the least-loaded GPU.

    gpu_table entry layout (one dict per GPU, see the diagram above):
        gpu_id, total_mem, used, free, percentage, uuid -- reported by agents
        PC        -- server-node dict from cfg.json (id / ip / port / docker_port)
        container -- list of {Cid, short_id, status, task_type, task_id,
                     username, abnormal}
        useful    -- False once a refresh cycle no longer reports the GPU

    All public methods serialize access through the module-level
    mission_table_lock; they use ``with`` so the lock is released even when
    a method raises (the previous bare acquire/release pairs deadlocked the
    whole scheduler on any exception inside a critical section).
    """

    def __init__(self):
        # cfg.json must be in the working directory; 'gpu_query' lists the
        # agent nodes to poll for their GPU inventory.
        with open("./cfg.json") as mq_cfg:
            json_data = json.load(mq_cfg)
        self.server_node = json_data['gpu_query']
        self.gpu_table = []
        with mission_table_lock:
            self.__init_mission_manager_table()

    def __init_mission_manager_table(self):
        """Populate gpu_table by querying every configured agent once.

        Caller must already hold mission_table_lock.
        """
        para = "/gpuStatus?task_id=-1&task_type=20&server_id="
        tasks = []
        for val in self.server_node:
            # NOTE: the original called ul.format(ind), but the URL contains
            # no placeholders, so that call was a no-op and is dropped.
            ul = "http://" + val["ip"] + ":" + val["port"] + para + str(val["id"])
            tasks.append(asyncio.ensure_future(req.get_gpu_request(ul)))
        result = asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks))
        for rt in result:
            try:
                rq = json.loads(rt.decode())
                pc = self.__choose_node(rq['msg']['server_id'])
                for gpu in rq['data']['gpu_list']:
                    self.__add_new_gpu(pc, gpu)
                # ack: {'msg': {'task_id': 2, 'task_type': 20, 'server_id': 1}, 'data': {'task_id': 2,
                # 'task_type': 'query_gpu', 'gpu_list': [{'gpu_id': 0, 'total_mem': 24576.0, 'used': 5027.0,
                # 'free': 19239.0, 'percentage': 0.20454915364583334, 'pc_id': 1,
                # 'uuid': 'GPU-3345b559-f032-55ac-bece-afcee85b19f3'}]}}
            except Exception as e:
                # Best effort: a node whose reply cannot be parsed is skipped.
                print("-------- load request error: ", e)
        print("-------- init query finish, total available gpu: ", len(self.gpu_table))

    def __choose_node(self, pc_id):
        """Return the server-node dict with the given id, or None if unknown."""
        for node in self.server_node:
            if node['id'] == pc_id:
                return node
        return None

    def __set_useless(self):
        """Mark every GPU stale; a refresh re-confirms the ones still alive."""
        for gpu in self.gpu_table:
            gpu['useful'] = False

    def __gpu_search(self, uuid):
        """Return the gpu_table index of the GPU with this uuid, or None."""
        for i, gpu in enumerate(self.gpu_table):
            if gpu['uuid'] == uuid:
                return i
        return None

    def __add_new_gpu(self, server_node, gpu_msg):
        """Register a freshly reported GPU (extends gpu_msg in place)."""
        gpu_msg['PC'] = server_node
        gpu_msg['container'] = []
        gpu_msg['useful'] = True
        self.gpu_table.append(gpu_msg)

    def __flush_existed_gpu(self, gpu_ind, gpu_msg):
        """Refresh the memory statistics of an already known GPU."""
        entry = self.gpu_table[gpu_ind]
        entry['used'] = gpu_msg['used']
        entry['free'] = gpu_msg['free']
        entry['percentage'] = gpu_msg['percentage']
        entry['useful'] = True

    def resource_flush(self, resource):
        """
        Refresh the status of all GPUs from a batch of agent replies.
        :param resource: iterable of decoded agent acks (same shape as the
            sample ack documented in __init_mission_manager_table)
        :return: None
        """
        with mission_table_lock:
            self.__set_useless()
            num = 0
            for node in resource:
                pc = self.__choose_node(node['msg']['server_id'])
                for gpu in node['data']['gpu_list']:
                    ind = self.__gpu_search(gpu['uuid'])
                    if ind is None:
                        self.__add_new_gpu(pc, gpu)
                    else:
                        self.__flush_existed_gpu(ind, gpu)
                    num += 1
            print("-------- resource refresh, total gpu: ", num)

    def mission_flush(self, task_type, task_id, uuid, Cid, status, short_id, username=None):
        """
        Refresh the missions: add a new container or delete a finished one.
        :param task_type:
        :param task_id:
        :param uuid: uuid of the GPU the container runs on ("" -> no-op)
        :param Cid: full container id (a trailing newline from shell output
            is stripped)
        :param status: signed delta; a container whose accumulated status
            reaches 0 is considered finished and removed
        :param short_id:
        :param username:
        :return: None
        """
        if Cid.endswith('\n'):
            Cid = Cid[:-1]
        if uuid == "":
            return
        with mission_table_lock:
            ind = self.__gpu_search(uuid)
            if ind is None:
                # BUG FIX: an unknown uuid used to crash with
                # self.gpu_table[None] while holding the lock, deadlocking
                # every other method. Log and drop the event instead.
                print("-------- mission_flush: unknown gpu uuid: ", uuid)
                return
            containers = self.gpu_table[ind]['container']
            found = None
            for i, cont in enumerate(containers):
                if cont['Cid'] == Cid:
                    found = i
                    break
            if found is None:
                containers.append({
                    'status': status,
                    'Cid': Cid,
                    'short_id': short_id,
                    'task_type': task_type,
                    'task_id': task_id,
                    'abnormal': 0,
                    'username': username,
                })
            else:
                containers[found]['status'] += status
                if containers[found]['status'] == 0:
                    del containers[found]

    def schedule_gpu(self):
        """
        Get the best GPU choice right now: highest free memory per container
        (GPUs not marked 'useful' rank last).
        :return: the gpu_table entry of the chosen GPU
        :raises IndexError: when no GPU has been discovered yet (the lock is
            still released thanks to the ``with`` block)
        """
        with mission_table_lock:
            self.gpu_table.sort(
                key=lambda x: ((x['free'] / (len(x['container']) + 1)) if 'container' in x else x['free']) if x[
                    'useful'] else 0, reverse=True)
            return self.gpu_table[0]

    def update_container_status_abnormal(self, container_dict):
        """
        Reconcile the table against the containers actually seen on the hosts.
        A container missing from container_dict twice in a row is treated as
        vanished: it is removed from the table and returned to the caller.
        :param container_dict: dict keyed by "<node-ip>_<container-id>" for
            every container currently visible via the Docker remote API
        :return: list of container dicts that disappeared abnormally
        """
        with mission_table_lock:
            abnormal_container_list = []
            number = 0
            for gpu in self.gpu_table:
                ip = gpu["PC"]["ip"]
                # Walk backwards so entries can be deleted while iterating.
                for ci in range(len(gpu["container"]) - 1, -1, -1):
                    cont = gpu["container"][ci]
                    if (ip + "_" + cont["Cid"]) not in container_dict:
                        cont["abnormal"] += 1
                        if cont["abnormal"] == 2:
                            abnormal_container_list.append(cont)
                            del gpu["container"][ci]
                    else:
                        cont["abnormal"] = 0
                    number += 1
            print("in abnormal, number of total container: ", number - len(abnormal_container_list))
            return abnormal_container_list
# Per-run log file names, timestamped (local time) at import time.
# strftime() without an explicit tuple formats time.localtime(), which is
# exactly what the expanded call did.
file_name = time.strftime('%Y-%m-%d-%H-%M-%S')
platform_log_dir = "/home/training_center/platform_log/schedule"
logger_file = os.path.join(platform_log_dir, file_name + ".log")
logger_err = os.path.join(platform_log_dir, file_name + "_error.log")
# Single module-level manager instance shared by the whole scheduler.
missionManager = MissionManager()
def regularly_update_container_status(MQ_send, MQ_connection, sleep_time=180):
    """
    Background watchdog loop: every sleep_time seconds, list all containers
    on every configured node via the Docker remote API, reconcile them with
    missionManager's table, and send a 501 message for each container that
    disappeared abnormally. Runs forever; intended to live in its own thread.
    :param MQ_send: message-queue channel handle passed through to send_message
    :param MQ_connection: message-queue connection passed through to send_message
    :param sleep_time: polling interval in seconds
    :return: never returns
    """
    with open("./cfg.json") as mq_cfg:
        json_data = json.load(mq_cfg)
    server_node = json_data['gpu_query']
    while True:
        time.sleep(sleep_time)
        container_map = {}
        for node in server_node:
            docker_client = None
            try:
                docker_client = docker.DockerClient(base_url=("tcp://" + node["ip"] + ":" + node["docker_port"]))
                container_list = docker_client.containers.list(all=True)
                for container in container_list:
                    # Keys must match update_container_status_abnormal: "<ip>_<Cid>".
                    container_map[(node["ip"] + "_" + container.id)] = 1
            except Exception as e:
                # Best effort: an unreachable node simply contributes no keys,
                # which counts one "abnormal" strike against its containers.
                print("DOCKER_ERROR: cannot access docker remote, url: tcp://", node["ip"], " ***", e)
            finally:
                # BUG FIX: the client was never closed, leaking one connection
                # pool per node on every polling cycle.
                if docker_client is not None:
                    docker_client.close()
        abnormal_container_list = missionManager.update_container_status_abnormal(container_map)
        print("in abnormal, number of abnormal_container: ", len(abnormal_container_list))
        for abnormal_container in abnormal_container_list:
            print(abnormal_container)
            msg = {
                "task_id": abnormal_container["task_id"],
                "task_type": 501,
                "msg": "unknown error, container disappeared",
                "username": abnormal_container["username"],
                # "abnormal_container": abnormal_container
            }
            send_message(MQ_send, MQ_connection, json.dumps(msg))
        # print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())), "test_heart")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/WZhewei/training-platform-scheduling.git
git@gitee.com:WZhewei/training-platform-scheduling.git
WZhewei
training-platform-scheduling
Training-Platform
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385