2 Star 1 Fork 0

熊崽子./smo-svm

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
SVM_numpy_template.py 16.18 KB
一键复制 编辑 原始数据 按行查看 历史
熊崽子. 提交于 2023-03-14 03:22 . numpy实现 smo算法
import argparse
import random
import warnings
import timeit
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
def get_arguments(argv=None):
    """Parse command-line options for the SMO-based SVM experiment.

    Args:
        argv: optional list of argument strings; when None (the default),
            argparse reads sys.argv, preserving the original behavior.

    Returns:
        argparse.Namespace with all experiment settings.
    """
    parser = argparse.ArgumentParser(description='SMO')
    # BUG FIX: the original used type=list, which splits a command-line string
    # into single characters (e.g. "f1" -> ['f', '1']); nargs='+' parses a
    # space-separated list correctly. Defaults are unchanged.
    parser.add_argument('--features', type=str, nargs='+', default=['f1', 'f2'],
                        help="the features of iris datasets for regression, "
                             "element of parameter should be 'f0', 'f1', 'f2' or 'f3'")
    parser.add_argument('--classes', type=int, nargs='+', default=[1, 2],
                        help='the classes of iris datasets for classify, element of parameter should be 0, 1, 2')
    parser.add_argument('--test_size', type=float, default=0.33, help='the proportion of test data')
    parser.add_argument('--normalization', type=int, default=3, choices=(0, 1, 2, 3),
                        help='select the type of data normalization,'
                             '0: no normalization,'
                             '1: rescale the data to [0, 1],'
                             '2: rescale the data to [-1, 1],'
                             '3: z-score normalization')
    parser.add_argument('--random_seed', type=int, default=42, help='the random seed of dataset split')
    parser.add_argument('--C', type=float, default=1.0, help='penalty coefficient, should be positive float')
    # BUG FIX: toler is a small float threshold (default 1e-5); the original
    # type=int would reject any value passed on the command line
    # (int('1e-5') raises ValueError).
    parser.add_argument('--toler', type=float, default=1e-5, help='termination condition (iteration threshold)')
    parser.add_argument('--iter_max', type=int, default=1, help='no update maximum number of iterations')
    parser.add_argument('--iteration_max', type=int, default=1000,
                        help='termination condition (maximum number of iterations)')
    parser.add_argument('--kernel', type=int, default=1,
                        choices=(1, 2, 3, 4),
                        help='the type of kernel function,'
                             '1: linear kernel,'
                             '2: polynomial kernel,'
                             '3: rbf kernel,'
                             '4: sigmoid kernel')
    parser.add_argument('--gamma', type=float, default=1.0, help='the parameter of polynomial, sigmoid and rbf kernel')
    parser.add_argument('--degree', type=int, default=3, help='the parameter of polynomial kernel')
    parser.add_argument('--coef0', type=float, default=0.0, help='the parameter of polynomial and sigmoid kernel')
    args = parser.parse_args(argv)
    return args
def load_dataset():
    """Load the iris dataset into a pandas DataFrame.

    Columns 'f0'..'f3' hold the four flower measurements as floats and
    'label' holds the integer class id (0, 1 or 2).
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        dataset = load_iris()
        print("The iris datasets is loaded successfully!")
    features = dataset.data.astype(float)
    labels = dataset.target.astype(int)
    # Build the column mapping f0..f3 then attach the label column last,
    # matching the original column order.
    columns = {'f{}'.format(idx): features[:, idx] for idx in range(4)}
    columns['label'] = labels
    return pd.DataFrame(columns)
class MyPreprocessing:
    """Dataset selection, visualization, normalization and splitting utilities.

    Configuration is read from the parsed command-line arguments:
    which classes/features to keep, the normalization mode, the test
    fraction and the shuffle seed.
    """

    def __init__(self, parser):
        # Copy only the settings this pipeline needs.
        self.test_size = parser.test_size
        self.random_seed = parser.random_seed
        self.normalization = parser.normalization
        self.classes = parser.classes
        self.features = parser.features

    def choose_dataset(self, df):
        """Keep only the configured classes and features.

        Returns (datas, targets): feature matrix and label column vector.
        """
        subset = df[df.label.isin(self.classes)]
        return subset[self.features].values, subset[['label']].values

    def draw_dataset(self, datas, targets):
        """Scatter-plot the two selected features, colored by label."""
        plt.scatter(datas[:, 0], datas[:, 1], c=targets)
        plt.xlabel(self.features[0])
        plt.ylabel(self.features[1])
        plt.title("Dataset scatter plot")
        plt.show()

    def normalize_dataset(self, X):
        """Apply the configured normalization.

        0 = none, 1 = min-max to [0, 1], 2 = max-abs to [-1, 1],
        3 = z-score. Raises ValueError for any other mode.
        """
        scalers = {
            0: lambda data: data,
            1: self.min_max_scaler,
            2: self.max_abs_scaler,
            3: self.standard_scaler,
        }
        if self.normalization not in scalers:
            raise ValueError('Please choose right normalization type', self.normalization)
        return scalers[self.normalization](X)

    def target_convert(self, targets):
        """Map the two class ids to {+1, -1}: the larger id becomes +1."""
        classes = np.unique(targets)
        positive = np.max(classes)
        labels = np.array([1 if t == positive else -1 for t in targets])
        return classes, labels

    def split_dataset(self, datas, labels):
        """Seeded shuffle, then split into train/test by self.test_size."""
        assert 0 < self.test_size < 1, "Please choose right test size between 0 and 1"
        n_test = int(self.test_size * len(labels))
        labels.resize((len(labels), 1))  # promote labels to a column, in place
        combined = np.concatenate([datas, labels], axis=1)  # glue features + labels
        np.random.seed(self.random_seed)
        np.random.shuffle(combined)
        test_part, train_part = combined[:n_test], combined[n_test:]
        return train_part[:, :-1], train_part[:, -1], test_part[:, :-1], test_part[:, -1]

    def min_max_scaler(self, X):
        """Rescale every column to [0, 1] (modifies X in place)."""
        col_min = np.min(X, axis=0)
        col_max = np.max(X, axis=0)
        X[:] = (X - col_min) / (col_max - col_min)
        return X

    def max_abs_scaler(self, X):
        """Divide every column by its maximum absolute value (in place)."""
        X[:] = X / np.max(np.abs(X), axis=0)
        return X

    def standard_scaler(self, X):
        """Z-score every column: zero mean, unit variance (in place)."""
        X[:] = (X - np.mean(X, axis=0)) / np.std(X, axis=0)
        return X

    def main(self, df):
        """Full pipeline: select, plot, normalize, relabel and split."""
        datas, targets = self.choose_dataset(df)
        self.draw_dataset(datas, targets)
        datas = self.normalize_dataset(datas)
        classes, labels = self.target_convert(targets)
        X_train, y_train, X_test, y_test = self.split_dataset(datas, labels)
        return X_train, y_train, X_test, y_test, classes
class Supper_Vector_Machine:
    """Binary SVM trained with a simplified SMO algorithm.

    Args:
        dataset: training samples, shape (n_samples, n_features)
        target: training labels in {+1, -1}, shape (n_samples,)
        parser: parsed arguments carrying C, toler, iter_max,
            iteration_max, kernel type and kernel parameters
            (gamma, degree, coef0)

    SMO() returns:
        a: Lagrange multipliers of the dual problem (one per sample)
        w: coefficients of the linear decision surface (linear kernel
           only; None otherwise)
        b: intercept of the decision surface (None for non-linear kernels)
    """

    def __init__(self, dataset, target, parser):
        self.dataset = dataset
        self.target = target
        self.C = parser.C                          # penalty coefficient
        self.toler = parser.toler                  # minimum useful alpha step
        self.iter_max = parser.iter_max            # no-update passes before stopping
        self.iteration_max = parser.iteration_max  # hard cap on total passes
        self.kernel = parser.kernel                # 1 linear, 2 poly, 3 rbf, 4 sigmoid
        self.gamma = parser.gamma
        self.degree = parser.degree
        self.r = parser.coef0

    def Fx(self, i, a, b):
        """Decision function f(x_i) = sum_k a[k] * y[k] * K(x_i, x_k) + b.

        Args:
            i: index of the sample to evaluate
            a: current Lagrange multipliers
            b: current intercept
        """
        Fx = 0
        for k in range(np.shape(self.dataset)[0]):
            Fx += a[k] * self.target[k] * self.Kernel(i, k)
        return Fx + b

    def Kernel(self, i, j):
        """Kernel value K(x_i, x_j) for the configured kernel type.

        Raises ValueError when self.kernel is not in {1, 2, 3, 4}.
        """
        if self.kernel == 1:  # linear: x_i . x_j
            Kij = np.dot(self.dataset[i, :], self.dataset[j, :])
        elif self.kernel == 2:  # polynomial: (gamma * x_i . x_j + coef0) ** degree
            Kij = np.power(self.gamma * np.dot(self.dataset[i, :], self.dataset[j, :]) + self.r, self.degree)
        elif self.kernel == 3:  # rbf: exp(-gamma * ||x_i - x_j||^2)
            Kij = np.exp(-self.gamma * np.power(np.linalg.norm(self.dataset[i, :] - self.dataset[j, :]), 2))
        elif self.kernel == 4:  # sigmoid: tanh(gamma * x_i . x_j + coef0)
            Kij = np.tanh(self.gamma * np.dot(self.dataset[i, :], self.dataset[j, :]) + self.r)
        else:
            raise ValueError("Please choose right kernel function~", self.kernel)
        return Kij

    def random_j(self, i):
        """Pick a random multiplier index j != i."""
        js = np.arange(0, np.shape(self.dataset)[0]).tolist()
        js.remove(i)
        j = random.choice(js)
        return j

    def get_L_H(self, i, j, a):
        """Box-constraint bounds (L, H) for a[j] given the working pair (i, j)."""
        if self.target[i] == self.target[j]:
            L = np.max([0, a[j] + a[i] - self.C])
            H = np.min([self.C, a[j] + a[i]])
        else:
            L = np.max([0, a[j] - a[i]])
            H = np.min([self.C, self.C + a[j] - a[i]])
        return L, H

    def filter(self, L, H, alpha_j):
        """Clip alpha_j into the feasible interval [L, H]."""
        if alpha_j > H:
            alpha_j_clipped = H
        elif alpha_j < L:
            alpha_j_clipped = L
        else:
            alpha_j_clipped = alpha_j
        return alpha_j_clipped

    def SMO(self):
        """Train the SVM: optimize pairs of multipliers until convergence.

        Stops after iter_max consecutive full passes with no update, or
        after iteration_max passes in total.
        """
        iter = 0       # consecutive passes with no multiplier update
        iteration = 0  # total passes over the training set
        a = np.zeros(np.shape(self.dataset)[0])
        b = 0
        while iter < self.iter_max and iteration < self.iteration_max:
            change_num = 0  # multiplier updates performed in this pass
            for i in range(np.shape(self.dataset)[0]):
                # 1. prediction error on sample i
                Ei = self.Fx(i, a, b) - self.target[i]
                # 2. randomly choose the second multiplier of the pair
                j = self.random_j(i)
                # 3. prediction error on sample j
                Ej = self.Fx(j, a, b) - self.target[j]
                # 4. feasible bounds for a[j]
                L, H = self.get_L_H(i, j, a)
                if L == H:
                    # both multipliers pinned on the box boundary: skip pair
                    continue
                # 5. second derivative of the objective along the constraint line
                eta = self.Kernel(i, i) + self.Kernel(j, j) - 2 * self.Kernel(i, j)
                if eta <= 0:
                    # non-positive curvature: analytic update invalid, skip pair
                    continue
                # 6-7. compute and clip the new a[j]
                a_j_old = a[j]
                a_j_new = a[j] + (self.target[j] * (Ei - Ej)) / eta
                a_j_clipped = self.filter(L, H, a_j_new)
                # BUG FIX: the progress test must compare the *clipped* value
                # with the old one and use the absolute difference. The original
                # `a_j_new - a_j_old < self.toler` skipped every decreasing
                # update (negative difference), however large the step.
                if abs(a_j_clipped - a_j_old) < self.toler:
                    continue
                a[j] = a_j_clipped
                # 8. update a[i] so that sum(a * y) stays constant
                a_i_old = a[i]
                a[i] = a[i] + self.target[i] * self.target[j] * (a_j_old - a_j_clipped)
                # 9. candidate intercepts from the KKT conditions of i and j
                bi = b - Ei - self.target[i] * (a[i] - a_i_old) * self.Kernel(i, i) - self.target[j] * (a[j] - a_j_old) * self.Kernel(j, i)
                bj = b - Ej - self.target[i] * (a[i] - a_i_old) * self.Kernel(i, j) - self.target[j] * (a[j] - a_j_old) * self.Kernel(j, j)
                # 10. prefer an intercept derived from a non-bound multiplier
                if 0 < a[i] < self.C:
                    b = bi
                elif 0 < a[j] < self.C:
                    b = bj
                else:
                    b = (bi + bj) / 2
                change_num += 1
            if change_num == 0:
                # a whole pass without any update
                iter += 1
            else:
                # any update resets the no-change counter; we only stop after
                # iter_max clean passes in a row
                iter = 0
            iteration += 1
        # 11. the explicit weight vector exists only for the linear kernel
        if self.kernel == 1:
            w = np.zeros(np.shape(self.dataset)[1])
            for k in range(len(self.target)):
                w = w + a[k] * self.target[k] * self.dataset[k, :]
            return a, w, b
        else:
            return a, None, None

    def predict(self, w, b):
        """Predict labels for self.dataset with a linear decision surface.

        Args:
            w: weight vector (linear kernel only)
            b: intercept

        Returns a list of +1/-1 predictions, one per sample.
        """
        dist = np.dot(w, self.dataset.T) + b
        pre = [1 if dist[i] > 0 else -1 for i in range(len(self.target))]
        return pre

    def score(self, w, b):
        """Accuracy of predict(w, b) against self.target."""
        y_true = self.target
        y_pred = self.predict(w, b)
        score = np.sum(y_true == y_pred) / len(y_true)
        return score

    def draw_result(self, W, b):
        """Plot decision boundary, margin boundaries and support vectors (2D, linear kernel)."""
        # Convert w . x + b = 0 / +-1 into slope-intercept form for plotting.
        w = -1 * W[0] / W[1]
        b0 = -1 * b / W[1]
        b1 = -1 * (b + 1) / W[1]
        b2 = -1 * (b - 1) / W[1]
        x = np.arange(np.min(self.dataset[:, 0]), np.max(self.dataset[:, 0]), 0.1)
        y0 = w * x + b0  # decision boundary
        y1 = w * x + b1  # margin boundary
        y2 = w * x + b2  # margin boundary
        plt.plot(x, y0, label='decision boundary')
        plt.plot(x, y1, '--', c='g', label='margin boundary')
        plt.plot(x, y2, '--', c='g')
        # raw data points
        plt.scatter(self.dataset[:, 0], self.dataset[:, 1], c=self.target)
        # samples with |y * (w . x + b)| <= 1 lie on or inside the margin
        sv = []
        dist = np.multiply(self.target, (np.dot(W, self.dataset.T) + b))
        for i in range(len(dist)):
            if np.abs(np.min(dist[i])) <= 1.0:
                sv.append(self.dataset[i])
        sv = np.array(sv)
        if np.shape(sv)[0] != 0:
            plt.scatter(sv[:, 0], sv[:, 1], c="r", s=300, alpha=0.5, label='support vector')
        plt.title("The decision boundary, margin boundary and support vector of SVM")
        plt.legend()
        plt.show()
if __name__ == '__main__':
    args = get_arguments()
    # 1. Load the data and build train/test splits.
    #    Two features are selected for binary classification with labels +1/-1.
    df = load_dataset()
    # NOTE: the original rebound the class name (MyPreprocessing = MyPreprocessing(...)),
    # shadowing it; use a distinct instance name instead.
    preprocessing = MyPreprocessing(args)
    X_train, y_train, X_test, y_test, classes = preprocessing.main(df)
    # 2. Build the SVM and train it with SMO, timing the run.
    svm = Supper_Vector_Machine(X_train, y_train, args)
    start_time = timeit.default_timer()
    a, w, b = svm.SMO()
    end_time = timeit.default_timer()
    # report coefficients w and intercept b of the decision surface
    print("Running time of SVM: {:.6f} seconds".format(end_time - start_time))
    print("The coefficient of SVM is {}, the intercept of SVM is {}.".format(w, b))
    # BUG FIX: for non-linear kernels SMO() returns w = b = None, so
    # draw_result/score/predict (which all need w and b) would crash with
    # np.dot(None, ...). Visualization and accuracy evaluation are therefore
    # restricted to the linear kernel, as before for the plots.
    if args.kernel == 1:
        # 3. Draw the decision boundary and support vectors on the training set
        #    (a visual sanity check of the implementation), then score it.
        svm.draw_result(w, b)
        train_score = svm.score(w, b)
        print("The precision of train dataset is {}".format(train_score))
        # 4. Evaluate the learned (w, b) on the held-out test split.
        svm = Supper_Vector_Machine(X_test, y_test, args)
        svm.draw_result(w, b)
        # 5. Model evaluation on the test data.
        test_score = svm.score(w, b)
        print("The precision of test dataset is {}".format(test_score))
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/ABigWildCat/smo-svm.git
git@gitee.com:ABigWildCat/smo-svm.git
ABigWildCat
smo-svm
smo-svm
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385