'''
author : 林泽旭
program: ConvLSTM + LSTM traffic prediction
2.1 adds wavelet-transform denoising
'''
import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
import numpy as np
import pandas as pd
import xlrd
import math
import sklearn.metrics as metrics
from sklearn.preprocessing import MinMaxScaler
from torch.autograd import Variable
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
from convlstmpy import ConvLSTM
import pywt
# # Hyper-parameters and initial settings
train_spt = 0.8
look_head = 3
look_back = look_head
learning_rate = 0.001
epoch = 1000
batch_size = 64
# ConvLSTM
num_layers = 1
input_dim = look_back
hidden_dim = look_back # [-4, 3]
kernel_size = (3, 3)
bias = 0
# LSTM
input_size = look_back
hidden_size = look_back
# output dimension
output_size = look_back
preds = []
labels = []
def wavelet_denoise(input):
    '''
    Denoise a 1-D series via wavelet decomposition and reconstruction.
    :param input: input series
    :return: denoised series
    '''
    # use the db4 wavelet
    db4 = pywt.Wavelet('db4')
    # decompose
    coeffs = pywt.wavedec(input, db4)
    # zero out the two highest-frequency coefficient bands
    coeffs[len(coeffs) - 1] *= 0
    coeffs[len(coeffs) - 2] *= 0
    # reconstruct
    meta = pywt.waverec(coeffs, db4)
    return meta
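# A minimal usage sketch for wavelet_denoise (assumption: the synthetic sine-plus-noise
# signal and the 0.1 noise level below are illustrative only, not part of the original
# data pipeline). It shows that zeroing the two finest db4 detail bands smooths the
# series while preserving its overall shape and length.
def _demo_wavelet_denoise():
    t = np.linspace(0, 1, 256)
    noisy = np.sin(2 * np.pi * 4 * t) + 0.1 * np.random.randn(256)
    smooth = wavelet_denoise(noisy)
    # waverec can pad by one sample for odd-length inputs; trim before comparing
    smooth = smooth[:len(noisy)]
    print('noisy std: %.4f, residual std: %.4f' % (noisy.std(), (noisy - smooth).std()))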
# load the data
def load_data(filename):
    data = pd.read_excel(filename)
    data = np.array(data)
    '''
    data = data.ravel()
    data = wavelet_denoise(data)
    data = data.reshape(-1, 1)
    '''
    # log1p smoothing; inverted later with np.expm1
    data = np.log1p(data)
    series = []
    for i in range(len(data)):
        series.append(list(map(float, data[i])))  # convert each row to floats
    return series
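# Round-trip check for the log1p smoothing used in load_data (illustrative values only):
# np.expm1 is its exact inverse for the non-negative traffic counts assumed here.
def _demo_log1p_roundtrip():
    raw = np.array([0.0, 10.0, 250.0, 10000.0])
    restored = np.expm1(np.log1p(raw))
    print(np.allclose(raw, restored))  # True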
# split into training and test sets
def train_test_split(datasets, train_spt):
    train_size = int(len(datasets) * train_spt)
    return datasets[:train_size], datasets[train_size:]
# build (data, label) pairs with a sliding window
def create_datasets(datasets, look_back, look_head):
    data_x = []
    data_y = []
    for i in range(len(datasets) - look_back - look_head + 1):
        window = datasets[i:(i + look_back)]
        data_x.append(window)
        data_y.append(datasets[i + look_back:i + look_back + look_head])
    return np.array(data_x), np.array(data_y)
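# A small shape check for create_datasets (assumption: the toy series below is
# illustrative only). With a series of length 10, look_back = 3 and look_head = 3,
# it yields 10 - 3 - 3 + 1 = 5 windows: data_x has shape (5, 3, 1) and data_y (5, 3, 1).
def _demo_create_datasets():
    toy = np.arange(10, dtype=float).reshape(-1, 1)
    toy_x, toy_y = create_datasets(toy, look_back, look_head)
    print('x shape:', toy_x.shape, 'y shape:', toy_y.shape)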
# mean absolute percentage error (MAPE)
def mape(y_pred, y_true):
    return np.mean(np.abs((y_pred - y_true) / y_true))
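# Worked example (illustrative values): for y_pred = [110, 95] and y_true = [100, 100],
# MAPE = mean(|10|/100, |5|/100) = (0.10 + 0.05) / 2 = 0.075, i.e. 7.5%.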
# define the torch network
# # subclass nn.Module
class MyNet(nn.Module):
    def __init__(self, input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, output_size=output_size,
                 input_dim=input_dim, hidden_dim=hidden_dim, kernel_size=kernel_size, bias=bias):
        super(MyNet, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.padding_size = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias
        self.convlstm = ConvLSTM(input_dim=input_dim, hidden_dim=hidden_dim, kernel_size=kernel_size, num_layers=num_layers,
                                 batch_first=True, bias=True, return_all_layers=True)
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, self.output_size)
    def forward(self, input):
        out_conv = self.convlstm(input)
        # out_1 = np.array(out_1, dtype=float)
        # out_1 = out_1.detach().numpy()
        # take the last layer's final state from the ConvLSTM outputs
        out_conv_did = out_conv[-1][-1][-1]
        # (batch, hidden_dim, 1, 1) -> (batch, 1, hidden_dim): one time step for the LSTM
        out_conv_did = out_conv_did.squeeze().unsqueeze(1)
        out_lstm, out_4 = self.lstm(out_conv_did)
        out = self.fc(out_lstm.view(-1, look_back))
        return out
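# A quick shape sanity check (assumptions: the ConvLSTM in convlstmpy returns
# (layer_outputs, last_states) as forward() above expects, and the dummy batch size
# of 4 is arbitrary). The model maps (batch, 1, look_back, 1, 1) inputs to
# (batch, look_back) predictions.
def _demo_forward_shapes():
    dummy = torch.randn(4, 1, look_back, 1, 1)
    with torch.no_grad():
        out = MyNet()(dummy)
    print('output shape:', tuple(out.shape))  # expected: (4, look_back)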
model = MyNet()
# choose the loss function and optimiser
# summary(model, input_size=(1, 3, x_test.shape[1]))
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
if __name__ == "__main__":
    # # Data preprocessing
    example_set = load_data(r'D:\流量预测-LSTM\data\BrainFlow.xls')
    train_set, test_set = train_test_split(example_set, train_spt)
    # shuffle the training and test sets with a fixed seed
    np.random.seed(7)
    np.random.shuffle(train_set)
    np.random.seed(7)
    np.random.shuffle(test_set)
    torch.manual_seed(7)
    # normalisation
    # # scale every feature into the (0, 1) range
    sc = MinMaxScaler(feature_range=(0, 1))
    train_set_scaled = sc.fit_transform(train_set)
    test_set = sc.transform(test_set)
    # x_train -> y_train
    # x_test -> y_test
    x_train, y_train = create_datasets(train_set_scaled, look_back, look_head)
    x_test, y_test = create_datasets(test_set, look_back, look_head)
    # # reshape to the 5-D ConvLSTM input: (batch, time, channels, height, width)
    shape_0 = x_train.shape[0]
    x_train_torch = torch.from_numpy(x_train)
    x_train_torch = x_train_torch.unsqueeze(1).unsqueeze(1).reshape(shape_0, 1, look_back, 1, 1)
    # cast to float32 to match the model parameters
    x_train_torch = x_train_torch.to(torch.float32)
    total_loss = 0
    # train the model
    for i in range(epoch):
        # training step
        # print(x_train_torch[0][0][0][0][0])
        train_prediction = model(x_train_torch)
        # train_prediction = model(x_train_torch).float()
        y_train_1 = torch.from_numpy(y_train).squeeze().float()
        # print(y_train_1, train_prediction)
        loss = criterion(train_prediction, y_train_1)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # total_loss += loss.item()
        print("train:{}, loss:{}".format(i, loss.item()))
    # model prediction
    shape_0 = x_test.shape[0]
    x_test_torch = torch.from_numpy(x_test)
    x_test_torch = x_test_torch.unsqueeze(1).unsqueeze(1).reshape(shape_0, 1, look_back, 1, 1)
    # cast to float32 to match the model parameters
    x_test_torch = x_test_torch.to(torch.float32)
    pred = model(x_test_torch).float()
    preds = pred.detach().numpy().reshape(-1, 1)
    labels = y_test.squeeze().reshape(-1, 1)
    # inverse normalisation
    pred_set_end = sc.inverse_transform(preds)
    labels_set_end = sc.inverse_transform(labels)
    # invert the log1p smoothing
    pred_set_end = np.expm1(pred_set_end)
    labels_set_end = np.expm1(labels_set_end)
    # # three evaluation metrics
    mape1 = mape(pred_set_end, labels_set_end)
    rmse = math.sqrt(metrics.mean_squared_error(pred_set_end, labels_set_end))
    mae = metrics.mean_absolute_error(pred_set_end, labels_set_end)
    print('RMSE: %.6f' % rmse)
    print('MAE: %.6f' % mae)
    print('MAPE: %.6f' % mape1)