
熊崽子. / Machine Learning - Predicting Boston Housing Prices with a Linear Regression Model - numpy Implementation

BGD_4.py 8.88 KB
import argparse
import warnings
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
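# NOTE: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2,
# so this script requires scikit-learn < 1.2 to run as written.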


def get_arguments():
    parser = argparse.ArgumentParser(description='LinearRegression')
    parser.add_argument('--test_size', type=float, default=0.5, help='the proportion of data held out for testing')
    parser.add_argument('--random_seed', type=int, default=100, help='the random seed')
    parser.add_argument('--normalization', type=int, default=3, choices=(0, 1, 2, 3),
                        help='the type of data normalization: '
                             '0: no normalization, '
                             '1: rescale the data to [0, 1], '
                             '2: rescale the data to [-1, 1], '
                             '3: z-score normalization')
    parser.add_argument('--iteration', type=int, default=1000, help='the number of gradient descent iterations')
    parser.add_argument('--initialization', type=int, default=1, choices=(0, 1),
                        help='the type of parameter initialization: '
                             '0: initialize all parameters to 0, '
                             '1: draw each parameter from the uniform distribution on [0, 1]')
    parser.add_argument('--learning_rate', type=float, default=1e-3,
                        help='the learning rate of gradient descent; '
                             'with normalization == 0 a rate around 1e-5 keeps the loss from blowing up, '
                             'otherwise a rate around 1e-2 avoids slow convergence')
    parser.add_argument('--mini_batch_size', type=int, default=32, help='the number of samples per update')
    parser.add_argument('--gradient_descent_method', type=int, default=1, choices=(1, 2),
                        help='the type of gradient descent: '
                             '1: batch gradient descent (BGD), '
                             '2: mini-batch gradient descent (MBGD)')
    args = parser.parse_args()
    return args


def load_dataset():
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        dataset = load_boston()
    print("The Boston dataset was loaded successfully!")
    datas = dataset.data.astype(float)
    target = dataset.target.astype(float)
    return datas, target


class MyPreprocessing:
    def __init__(self, parser):
        self.test_size = parser.test_size
        self.random_seed = parser.random_seed
        self.normalization = parser.normalization

    def split_dataset(self, datas, target):
        assert 0 < self.test_size < 1, "Please choose a test size strictly between 0 and 1"
        test_num = int(self.test_size * len(target))
        target = target.reshape((len(target), 1))  # promote the target to a column vector
        data_target = np.concatenate([datas, target], axis=1)  # append the target as the last column
        np.random.seed(self.random_seed)
        np.random.shuffle(data_target)
        X_test = data_target[:test_num, :-1]
        y_test = data_target[:test_num, -1]
        X_train = data_target[test_num:, :-1]
        y_train = data_target[test_num:, -1]
        return X_train, X_test, y_train, y_test

    def normalize_dataset(self, X_train, X_test):
        if self.normalization == 0:
            # leave the data unchanged
            X_train_normalization = X_train
            X_test_normalization = X_test
        elif self.normalization == 1:
            # rescale values to [0, 1]
            X_train_normalization, X_test_normalization = self.min_max_scaler(X_train, X_test)
        elif self.normalization == 2:
            # rescale values to [-1, 1]
            X_train_normalization, X_test_normalization = self.max_abs_scaler(X_train, X_test)
        elif self.normalization == 3:
            # z-score normalization
            X_train_normalization, X_test_normalization = self.standard_scaler(X_train, X_test)
        else:
            raise ValueError('Please choose a valid normalization type', self.normalization)
        return X_train_normalization, X_test_normalization
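
    # The three scalers below operate per feature column:
    #   min-max:  x' = (x - min) / (max - min)   -> values in [0, 1]
    #   max-abs:  x' = x / max(|x|)              -> values in [-1, 1]
    #   z-score:  x' = (x - mean) / std          -> zero mean, unit variance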
    def min_max_scaler(self, X_train, X_test):
        for i in range(X_train.shape[1]):
            # statistics come from the training set and are reused for the
            # test split, so no test information leaks into the scaling
            col_min = np.min(X_train[:, i])
            col_max = np.max(X_train[:, i])
            X_train[:, i] = (X_train[:, i] - col_min) / (col_max - col_min)
            X_test[:, i] = (X_test[:, i] - col_min) / (col_max - col_min)
        return X_train, X_test

    def max_abs_scaler(self, X_train, X_test):
        for i in range(X_train.shape[1]):
            col_max_abs = np.max(np.abs(X_train[:, i]))
            X_train[:, i] = X_train[:, i] / col_max_abs
            X_test[:, i] = X_test[:, i] / col_max_abs
        return X_train, X_test

    def standard_scaler(self, X_train, X_test):
        for i in range(X_train.shape[1]):
            col_mean = np.mean(X_train[:, i])
            col_std = np.std(X_train[:, i])
            X_train[:, i] = (X_train[:, i] - col_mean) / col_std
            X_test[:, i] = (X_test[:, i] - col_mean) / col_std
        return X_train, X_test


class MyLinearRegression:
    def __init__(self, parser):
        self.initialization = parser.initialization
        self.learning_rate = parser.learning_rate
        self.random_seed = parser.random_seed
        self.mini_batch_size = parser.mini_batch_size
        self.gradient_descent_method = parser.gradient_descent_method

    def initialize(self, X_train):
        if self.initialization == 0:
            w = np.zeros(X_train.shape[1] + 1)
        elif self.initialization == 1:
            w = np.random.rand(X_train.shape[1] + 1)
        else:
            raise ValueError('Please choose a valid initialization type', self.initialization)
        return w
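
    # The loss below is the halved mean squared error
    #   J(w) = (1 / (2m)) * sum_i (x_i . w - y_i)^2,
    # where a column of ones is appended to X so that the last entry of w
    # acts as the bias term.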
    def loss(self, X, y, w):
        m = X.shape[0]
        ones = np.ones((X.shape[0], 1), dtype=float)
        X = np.concatenate([X, ones], axis=1)
        loss = np.sum((np.dot(X, w) - y) ** 2) / (2 * m)
        return loss

    def show(self, X, y, w, name):
        ones = np.ones((X.shape[0], 1), dtype=float)
        X = np.concatenate([X, ones], axis=1)
        x = np.arange(np.shape(X)[0])
        plt.plot(x, np.dot(X, w), label='predict')
        plt.plot(x, y, label='label')
        plt.xlabel('samples')
        plt.ylabel('price')
        plt.title("{} data fitting".format(name))
        plt.legend()
        plt.show()

    def parameter_update(self, X_train, y_train, w):
        if self.gradient_descent_method == 1:
            w = self.BGD_update(X_train, y_train, w)
        elif self.gradient_descent_method == 2:
            w = self.MBGD_update(X_train, y_train, w)
        else:
            raise ValueError('Please choose a valid gradient descent method', self.gradient_descent_method)
        return w
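
    # Both update rules take the gradient step
    #   w := w - (alpha / m) * X^T (X w - y),
    # where m is the number of samples used in the step. BGD uses the whole
    # training set per step; MBGD applies the same rule batch by batch.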
    def BGD_update(self, X_train, y_train, w):
        ones = np.ones((X_train.shape[0], 1), dtype=float)
        X_train = np.concatenate([X_train, ones], axis=1)
        diff = np.dot(X_train, w) - y_train
        # vectorized form of the per-coordinate update w[j] -= lr/m * sum(diff * X[:, j])
        w = w - self.learning_rate / X_train.shape[0] * np.dot(X_train.T, diff)
        return w

    def MBGD_update(self, X_train, y_train, w):
        assert 0 < self.mini_batch_size < X_train.shape[0], "Please input a suitable mini batch size"
        ones = np.ones((X_train.shape[0], 1), dtype=float)
        X_train = np.concatenate([X_train, ones], axis=1)
        # mini-batch updates; iterating up to X_train.shape[0] (not
        # X_train.shape[0] - mini_batch_size) keeps the trailing samples
        # from being silently dropped
        for i in range(0, X_train.shape[0], self.mini_batch_size):
            X = X_train[i: i + self.mini_batch_size]
            y = y_train[i: i + self.mini_batch_size]
            diff = np.dot(X, w) - y
            w = w - self.learning_rate / X.shape[0] * np.dot(X.T, diff)
        return w


def draw_loss(train_loss, test_loss, name):
    x = np.arange(len(train_loss))
    plt.plot(x, train_loss, label='train loss')
    plt.plot(x, test_loss, label='test loss')
    plt.legend(loc=0)
    plt.ylabel('{} loss'.format(name))
    plt.xlabel('iteration')
    plt.title('The {} loss over iterations'.format(name))
    plt.show()


if __name__ == '__main__':
    datas, target = load_dataset()
    args = get_arguments()
    preprocessing = MyPreprocessing(args)
    # split the data into training and test sets, then normalize the features
    X_train, X_test, y_train, y_test = preprocessing.split_dataset(datas, target)
    X_train, X_test = preprocessing.normalize_dataset(X_train, X_test)
    model = MyLinearRegression(args)
    w = model.initialize(X_train)
    train_loss = []
    test_loss = []
    for epoch in range(args.iteration):
        loss1 = model.loss(X_train, y_train, w)
        loss2 = model.loss(X_test, y_test, w)
        train_loss.append(loss1)
        test_loss.append(loss2)
        w = model.parameter_update(X_train, y_train, w)
    print(w)  # the learned weights; the last entry is the bias
    # show how well the fitted model tracks the labels
    model.show(X_train, y_train, w, "Train")
    model.show(X_test, y_test, w, "Test")
    draw_loss(train_loss, test_loss, "MSE")
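
A minimal sketch of how to run the script, using only the flags defined in get_arguments (this assumes scikit-learn < 1.2, where load_boston is still importable):

    python BGD_4.py
    python BGD_4.py --gradient_descent_method 2 --mini_batch_size 32 --learning_rate 1e-2 --iteration 1000

With no arguments the script uses z-score normalization, random initialization, and batch gradient descent for 1000 iterations; the second invocation switches to mini-batch gradient descent with the larger learning rate that the help text suggests for normalized data.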