2 Star 0 Fork 0

沃沃/机器人调度

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Dijkstra.py 6.36 KB
一键复制 编辑 原始数据 按行查看 历史
沃沃 提交于 2023-04-04 16:08 . first commit
import os
import sys
import math
import heapq
import datetime
from tools import save
import numpy as np
from map2array import map2array, map2xy, mapFileOpen
from env import Env
# import cv2
# from skimage import io
class Dijkstra:
def __init__(self, s_start, heuristic_type, envr):
self.s_start = s_start
self.heuristic_type = heuristic_type
# self.Env = envr # class Env
self.u_set = envr.motions # feasible input set
self.obs = envr.obs # position of obstacles
self.OPEN = [] # priority queue / OPEN set
self.CLOSED = [] # CLOSED set / VISITED order
self.PARENT = dict() # recorded parent
self.g = dict() # cost to come
def cost(self, s_start, s_goal):
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def get_neighbor(self, s):
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def searching(self):
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
heapq.heappush(self.OPEN,
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]:
self.g[s_n] = new_cost
self.PARENT[s_n] = s
heapq.heappush(self.OPEN, (new_cost, s_n))
return self.PARENT, self.g
# def extract_path(self, PARENT, s_goal):
# path = [s_goal]
# s = s_goal
# if s == self.s_start:
# return list(path)
# ang_x_tmp = PARENT[s][0] - s[0]
# ang_y_tmp = PARENT[s][1] - s[1]
# while True:
# ang_x = PARENT[s][0] - s[0]
# ang_y = PARENT[s][1] - s[1]
# s = PARENT[s]
# if ang_x != ang_x_tmp or ang_y != ang_y_tmp:
# path.append(s)
# ang_x_tmp = ang_x
# ang_y_tmp = ang_y
# if s == self.s_start:
# break
# if path[-1] != self.s_start:
# path.append(self.s_start)
# return list(path)
def extract_path(self, PARENT, s_goal):
path = [s_goal]
s = s_goal
if s == self.s_start:
return list(path)
ang_x_tmp = PARENT[s][0] - s[0]
ang_y_tmp = PARENT[s][1] - s[1]
while True:
ang_x = PARENT[s][0] - s[0]
ang_y = PARENT[s][1] - s[1]
if ang_x != ang_x_tmp or ang_y != ang_y_tmp:
path.insert(0,s)
# path.insert(0,PARENT[s])
s = PARENT[s]
ang_x_tmp = ang_x
ang_y_tmp = ang_y
if s == self.s_start:
break
if path[-1] != self.s_start:
path.insert(0,self.s_start)
return list(path)
def pathfinder(map_ = [],map_num = 4):
if len(map_) == 0:
map_ = mapFileOpen("maps/" + str(map_num) + ".txt")
map_, pos = map2array(map_)
end = []
end_key = {}
for index_type_end, type_end in enumerate(pos):
for index_id_end, id_end in enumerate(type_end):
end.append(id_end)
end_key[id_end] = (index_type_end, index_id_end)
envr = {}
mode = ['big', 'small']
envr[mode[0]] = Env(map_,1)
envr[mode[1]] = Env(map_,0)
# start_t = datetime.datetime.now()
road_list = {}
for m in mode:
for index_type_start, type_start in enumerate(pos):
for index_id_start, id_start in enumerate(type_start):
if (index_type_start in [0,8,9] and m == 'big') or\
(index_type_start in [1,2,3] and m == 'small'):
continue
dijkstra = Dijkstra(pos[index_type_start][index_id_start], 'None', envr[m])
parent, dist_list = dijkstra.searching()
for s_goal in end:
if s_goal in parent:
path = dijkstra.extract_path(parent, s_goal)
dist = dist_list[s_goal]
name = str(index_type_start) + '_' + str(index_id_start) + 'to' + str(end_key[s_goal][0]) + '_' + str(end_key[s_goal][1])
if name not in road_list:
road_list[name] = {}
road_list[name]['points_list_' + m] = map2xy(path)
road_list[name]['dist_'+ m] = dist/2
# end_t = datetime.datetime.now()
# elapsed_sec = (end_t - start_t).total_seconds()
# sys.stderr.write(str(elapsed_sec)+'\n')
# print("共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")
# print(len(road_list))
return road_list
# plot.animation(path, visited, "Dijkstra's") # animation generate
# def plotting(map_,path):
# map_file = "map" #地图
# informap = map_.copy()
# informap = np.expand_dims(informap, axis=2)
# informap = np.concatenate((informap, informap, informap), axis=-1)
# img_file = "maps"
# if not os.path.exists(img_file):
# os.mkdir(img_file)
# cv2.imwrite(img_file + "/" + map_file + ".png", informap)
# informap = cv2.imread(img_file + "/" + map_file + ".png")#用于画路径
# for i in path:#画出规划路径、
# cv2.circle(informap,(math.ceil(i[1]), math.ceil(i[0])),0,(0,255,0),-1)
# io.imshow(informap)
# cv2.imwrite("informap.png",informap)
if __name__ == '__main__':
map_ = mapFileOpen("maps/2.txt")
pathfinder(map_,1)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pengyi_lin07/robot-scheduling.git
git@gitee.com:pengyi_lin07/robot-scheduling.git
pengyi_lin07
robot-scheduling
机器人调度
master

搜索帮助