代码拉取完成,页面将自动刷新
from copy import deepcopy
from QKDNetwork import QKDNetwork
from lp import LP
from dual_ps_lp import getDualPSTimeSlot
from load_balance import getLoadBalanceRouteList
import random
# Relay | Provision
# SPS(1) | Greedy(3)
# Optimal(2) | Optimal(4)
# 2 + 4 = RPSP
# 2 + 3 = RPD + GD
# 1 + 4 = SP + PSP
# QKD请求的路由方案数据结构 [{"sd": [1, 2], "path": [1, 3, 2]}...]
class Compare:
def __init__(self, net: QKDNetwork):
# 对比算法
self.net = net
def getData(self):
return [self.sp_gd(), self.sp_psp()] + self.rpd_gd() + self.rpsp()
# return [self.sp_gd(), self.sp_psp()]
def get_sp(self) -> list: # ✅
# 获取最短路方案
# return [{"sd": [1, 2, 4], "path": [1, 3, 2]}...]
candidate_paths = []
for sd in self.net.sd_list:
# 计算候选路径
candidate_paths.append({
"sd": sd,
"path": self.net.getShortestPathWithTransmitter(sd[0], sd[1])
})
self.candidate_paths = candidate_paths
return candidate_paths
def get_cplex_p(self) -> list: # ✅
# 获取 Cplex 求出来的路径
# return [{"sd": [1, 2, 4], "path": [1, 3, 2]}...]
cplex_lp = LP(self.net)
ret = []
for index in range(len(self.net.sd_list)):
ret.append({
"sd": self.net.sd_list[index],
"path": self.net.candidate_paths[index]["paths"][cplex_lp.selected_sole_paths[index][0]]
})
return ret
def get_load_balance_p(self): # ✅
return getLoadBalanceRouteList(self.net)
def greedy_ps_pro(self, routeList) -> int: # ✅
# 贪心供应
# 输入:QKDNetwork、路由方案
# 输出:Time Slot
# 估计最小的完成时间隙数量,每个请求都按照最短路
self.net.clearLoad()
def addFlowOnPath(route): # 施加流
path = route["path"]
for i in range(len(path)-1):
link_start = path[i]
link_end = path[i + 1]
self.net.G[link_start][link_end]["load"] += route["sd"][2]
for tmpRoute in routeList:
addFlowOnPath(tmpRoute)
max_time_slot = 0
while(True):
flag = True
for i in self.net.G.nodes: #初始化
node = self.net.G.nodes[i]
if(node["transmitter"] == 1):
# 选择一个需要密钥最多的链路进行服务
not_zero_link_node_ids = []
for neighbor in self.net.G.neighbors(i):
if(self.net.G[i][neighbor]["load"] > 0):
not_zero_link_node_ids.append(neighbor)
flag = False
if(len(not_zero_link_node_ids) != 0):
random_neighbor = random.choice(not_zero_link_node_ids)
self.net.G[random_neighbor][i]["load"] -= node["transmitter_rate"]
if(flag):
break
else:
max_time_slot += 1
return max_time_slot
def cplex_ps_pro(self, routeList) -> int: # ✅
# 使用 Cplex 对密钥供给方案进行求解、传入路由方案,传出密钥供给的时隙数量
t = getDualPSTimeSlot(routeList, self.net)
return t
def sp_gd(self): # ✨
# 最短路 + 贪心供应
ret = self.greedy_ps_pro(self.get_sp())
return ret
def sp_psp(self): # ✨
# 最短路中继链路 + 优化供应
ret = self.cplex_ps_pro(self.get_sp())
return ret
def rpd_gd(self): # ✨
# 优化中继链路 + 贪心供应
ret_load_balance = self.greedy_ps_pro(self.get_load_balance_p())
return [ret_load_balance]
def rpsp(self): # ✨
# 优化中继链路 + 优化供应
ret_load_balance = self.cplex_ps_pro(self.get_load_balance_p())
return [ret_load_balance]
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。