# [Web-page residue, not part of the program] Code fetch complete; the page will refresh automatically.
# -*- coding: utf-8 -*-
"""
@Time : 2023/6/9 14:25
@Auth : daiminggao
@File :ADPC_KNN.py
@IDE :PyCharm
@Motto:咕咕咕
"""
import math
from copy import copy
import numpy as np
class MicroCluster:
    """One micro-cluster: a center point plus the bookkeeping used to merge clusters."""

    def __init__(self, center):
        """Create an empty micro-cluster around ``center``.

        :param center: the value stored as this cluster's center
                       (get_micro_clusters passes the center's data row)
        """
        # Member points and their row indices; filled by get_micro_clusters.
        self.data = []
        self.data_index = []
        self.center = center
        # Running sum of member-to-center distances (accumulated by get_micro_clusters).
        self.center_distance = 0
        # Maps "<other center>-<this center>" to a list of
        # [point in other cluster, point in this cluster] index pairs.
        self.border_dict = {}
        # Largest mean density over this cluster's border pairs (0 when there are none).
        self.border_density = 0
        # Traversal flags: `visited` is set by pick_unvisited_center,
        # `is_connect` by connect.
        self.visited = self.is_connect = False
        # Final cluster label; -1 until connect assigns one.
        self.label = -1
def get_cluster_ADPC_KNN(datas, k):
    """Run the whole clustering pipeline on ``datas``.

    Steps: distance matrix -> k-nearest-neighbour distances -> cutoff dc ->
    local density -> relative distances -> micro-clusters -> merged labels.

    :param datas: the points to cluster, one row per point
    :param k: number of nearest neighbours used for dc and the density
    :return: the pair returned by connect_micro_clusters
             (per-point label list, largest label)
    """
    distance_matrix = get_distance_matrix(datas)
    kth_deltas, knn_distances = get_Kth_nearest_neighbor_distance_u(distance_matrix, k)
    cutoff = get_dc(kth_deltas, k)
    density = get_density(knn_distances, cutoff)
    # The nearest-denser-neighbour array is not needed by the later steps.
    relative, _ = get_relative_distances(distance_matrix, density)
    clusters = get_micro_clusters(relative, cutoff, distance_matrix, datas, density)
    labels, cluster_count = connect_micro_clusters(clusters, density, len(datas))
    return labels, cluster_count
def get_distance_matrix(datas):
    """Compute the pairwise Euclidean distance matrix of a set of points.

    :param datas: array-like with two dimensions (N points, D coordinates);
                  lists of lists and integer arrays are accepted
    :return: (N, N) float ndarray, entry [i, j] is the distance between rows i and j
    :raises ValueError: if ``datas`` does not have exactly two dimensions
    """
    # Work in floating point: unsigned / narrow integer dtypes would wrap
    # around on subtraction and squaring and give wrong distances.
    points = np.asarray(datas, dtype=float)
    N, _ = points.shape
    dists = np.zeros([N, N])
    for i in range(N):
        # One vectorised row at a time: O(N) Python iterations, O(N*D) scratch memory.
        diff = points - points[i]
        dists[i] = np.sqrt(np.sum(diff * diff, axis=1))
    return dists
def get_Kth_nearest_neighbor_distance_u(dists, k):
    """Collect, for every row of ``dists``, its k smallest non-first sorted entries.

    Each row is sorted ascending; position 0 of the sorted row is skipped
    (for a distance matrix that is the zero distance of a point to itself).

    :param dists: distance matrix, one row per point
    :param k: number of nearest neighbours
    :return: (deltas, Kth_nb_distance) where deltas[i] is the k-th sorted
             distance of row i (the farthest of its k neighbours) and
             Kth_nb_distance[i] holds sorted positions 1..k of row i
    """
    sorted_rows = [np.sort(row) for row in dists]
    Kth_nb_distance = [ordered[1:k + 1] for ordered in sorted_rows]
    deltas = [ordered[k] for ordered in sorted_rows]
    return deltas, Kth_nb_distance
def get_dc(deltas, k):
    """Derive the cutoff distance dc from the k-th neighbour distances.

    dc = mean(delta ** k) + sqrt(sum((delta ** k - mean) ** 2) / (len(deltas) - 1))

    NOTE(review): every delta is raised to the power ``k``, yet the callers in
    this file compare dc with plain distances -- confirm the exponent is
    intended and is not a superscript index taken as a power.

    :param deltas: per-point distance to the k-th nearest neighbour
    :param k: number of nearest neighbours (used here as an exponent)
    :return: the cutoff distance dc
    """
    powered_mean = np.mean(np.array(deltas) ** k)
    # Scalar math.pow / math.sqrt are kept for the accumulation
    # (original author's note: they are faster here).
    squared_deviation_total = 0
    for delta in deltas:
        deviation = math.pow(delta, k) - powered_mean
        squared_deviation_total += math.pow(deviation, 2)
    sample_variance = 1 / (len(deltas) - 1) * squared_deviation_total
    return powered_mean + math.sqrt(sample_variance)
def get_density(Kth_nb_distance, dc):
    """Compute the local density of every point with a Gaussian kernel.

    rho[i] = sum over the neighbour distances d of point i of exp(-(d / dc) ** 2)

    :param Kth_nb_distance: per-point sequences of neighbour distances
                            (must form a two-dimensional array)
    :param dc: cutoff distance used to scale the distances
    :return: one-dimensional ndarray of densities, one entry per point
    """
    point_count = np.shape(Kth_nb_distance)[0]
    neighbor_dists = np.array(Kth_nb_distance)
    rho = np.zeros(point_count)
    for i in range(point_count):
        scaled = neighbor_dists[i, :] / dc
        rho[i] = np.exp(-1 * scaled ** 2).sum()
    return rho
def get_relative_distances(dists, rho):
    """Compute each point's distance to its nearest point of higher density.

    Points are ranked by decreasing density. For every point except the
    densest one, the closest among the points ranked before it is found.
    The densest point receives the maximum of all the other relative
    distances, and its nearest-neighbour entry stays 0.

    :param dists: distance matrix (ndarray)
    :param rho: density array (ndarray)
    :return: (relative_distances, nearest_neiber), both float arrays of
             length len(dists); nearest_neiber holds the index of the nearest
             denser point
    """
    N = len(dists)
    relative_distances = np.zeros(N)
    nearest_neiber = np.zeros(N)
    # Indices ordered from the highest density to the lowest.
    by_density = np.argsort(-rho)
    for rank in range(1, len(by_density)):
        point = by_density[rank]
        denser_points = by_density[:rank]
        to_denser = dists[point, denser_points]
        closest = np.argmin(to_denser)
        relative_distances[point] = to_denser[closest]
        nearest_neiber[point] = denser_points[closest]
    relative_distances[by_density[0]] = np.max(relative_distances)
    return relative_distances, nearest_neiber
def get_micro_clusters(relative_distances, dc, dists, datas, rho):
    """Build micro-clusters around the density peaks and record their border points.

    1. Every point whose relative distance exceeds ``dc`` becomes a center.
    2. Every other point joins its nearest center.
    3. A center that attracted no member is dissolved and joins the nearest
       remaining center (a sole center is kept as it is).
    4. For each pair of clusters, member pairs closer than the smaller of the
       two mean member-to-center distances are stored as border pairs.
    5. Each cluster's border density is the largest mean density over its
       border pairs (0 when it has none).

    :param relative_distances: per-point distance to the nearest denser point
    :param dc: cutoff distance; centers have a relative distance above it
    :param dists: pairwise distance matrix
    :param datas: the points, indexable by point index
    :param rho: per-point density
    :return: dict mapping center index -> MicroCluster
    :raises ValueError: if there are points but no relative distance exceeds dc
    """
    dists = np.asarray(dists)
    centers = [index for index, distance in enumerate(relative_distances)
               if distance > dc]
    centers_dict = {index: MicroCluster(datas[index]) for index in centers}
    if not centers:
        if len(dists) == 0:
            return centers_dict
        raise ValueError("no micro-cluster center found: "
                         "no relative distance exceeds dc")

    def _attach_to_nearest(point, candidates):
        """Add ``point`` to the cluster of its nearest center among ``candidates``."""
        # Search among the candidate centers only. Looking the minimum up in
        # the whole row could hit a non-center point at the same distance
        # (ties, duplicated points) and fail with a KeyError.
        nearest = candidates[int(np.argmin(dists[point][candidates]))]
        cluster = centers_dict[nearest]
        cluster.data.append(datas[point])
        cluster.data_index.append(point)
        cluster.center_distance += dists[point][nearest]

    center_set = set(centers)
    for index in range(len(dists)):
        if index not in center_set:
            _attach_to_nearest(index, centers)

    # Dissolve centers without members and merge them into a remaining center.
    remaining = list(centers)
    for index in centers:
        if centers_dict[index].data:
            continue
        if len(remaining) == 1:
            # Nothing left to merge into: keep the sole center.
            continue
        remaining.remove(index)
        centers_dict.pop(index)
        _attach_to_nearest(index, remaining)

    # Visit every unordered pair of clusters exactly once. Positions are used
    # instead of concatenated string keys, which are ambiguous
    # (1 and 23 versus 12 and 3 both give "123").
    keys = list(centers_dict)
    for position, v in enumerate(keys):
        cluster_v = centers_dict[v]
        for u in keys[position + 1:]:
            cluster_u = centers_dict[u]
            dis = min(cluster_v.center_distance / len(cluster_v.data),
                      cluster_u.center_distance / len(cluster_u.data))
            # Key layout "<other>-<own>" is parsed by connect_micro_clusters.
            key_in_v = str(u) + "-" + str(v)
            key_in_u = str(v) + "-" + str(u)
            for i in cluster_v.data_index:
                for j in cluster_u.data_index:
                    if np.sqrt(np.sum((datas[i] - datas[j]) ** 2)) < dis:
                        cluster_v.border_dict.setdefault(key_in_v, []).append([j, i])
                        cluster_u.border_dict.setdefault(key_in_u, []).append([i, j])

    for cluster in centers_dict.values():
        max_p = 0
        for border_pairs in cluster.border_dict.values():
            for first, second in border_pairs:
                mean_rho = (rho[first] + rho[second]) / 2
                if mean_rho > max_p:
                    max_p = mean_rho
        cluster.border_density = max_p
    return centers_dict
def connect_micro_clusters(micro_clusters_dict, rho, n):
    """Merge micro-clusters that share qualifying border pairs and label all points.

    Two clusters are linked when one of their border pairs has both point
    densities below the border density of the respective cluster. Linked
    clusters receive the same label through ``connect``; labels start at 1.

    :param micro_clusters_dict: dict mapping center index -> MicroCluster
    :param rho: per-point density
    :param n: total number of points
    :return: (labels, largest_label): a list of ``n`` float labels, where
             points belonging to no micro-cluster keep 0, and the maximum label
    """
    connect_dict = {key: set() for key in micro_clusters_dict}
    # Keep asking for the next unvisited center until the helper reports -1.
    for index in iter(lambda: pick_unvisited_center(micro_clusters_dict), -1):
        current = micro_clusters_dict[index]
        current.visited = True
        for pair_key, border_pairs in current.border_dict.items():
            parts = pair_key.split("-")
            u, v = int(parts[0]), int(parts[1])
            for pair in border_pairs:
                j, i = pair[0], pair[1]
                below_u = rho[j] < micro_clusters_dict[u].border_density
                if below_u and rho[i] < micro_clusters_dict[v].border_density:
                    connect_dict[v].add(u)
                    connect_dict[u].add(v)

    label = 1
    for key, cluster in micro_clusters_dict.items():
        if cluster.is_connect:
            continue
        connect(micro_clusters_dict, connect_dict, key, label)
        label += 1

    labs = np.zeros(n)
    for key, cluster in micro_clusters_dict.items():
        labs[key] = cluster.label
        labs[cluster.data_index] = cluster.label
    return labs.tolist(), np.max(labs)
def connect(micro_clusters_dict, connect_dict, key, label):
    """Give ``label`` to cluster ``key`` and to every cluster reachable from it.

    The traversal is iterative, so long chains of linked micro-clusters cannot
    exceed the interpreter's recursion limit.

    :param micro_clusters_dict: dict mapping center index -> cluster object
                                with ``is_connect`` and ``label`` attributes
    :param connect_dict: dict mapping center index -> set of linked center indices
    :param key: center index to start from (always relabelled)
    :param label: label to assign
    :return: None; clusters are updated in place
    """
    micro_clusters_dict[key].is_connect = True
    pending = [key]
    while pending:
        current = pending.pop()
        micro_clusters_dict[current].label = label
        for next_center in connect_dict[current]:
            neighbour = micro_clusters_dict[next_center]
            if not neighbour.is_connect:
                # Mark on discovery so that a cluster is queued only once.
                neighbour.is_connect = True
                pending.append(next_center)
def pick_unvisited_center(micro_clusters_dict):
    """Return the first center not yet visited and mark it as visited.

    :param micro_clusters_dict: dict mapping center index -> cluster object
                                with a ``visited`` attribute
    :return: the key of the first unvisited cluster in dict order,
             or -1 when every cluster has been visited
    """
    for index, cluster in micro_clusters_dict.items():
        if cluster.visited:
            continue
        cluster.visited = True
        return index
    return -1
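# [Web-page residue, not part of the program] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing functions.
# If you confirm that the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, and nothing that violates national laws and regulations, you may click submit to appeal, and it will be handled as soon as possible.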