1 Star 0 Fork 0

whitebear-coder/Flow forecasting

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
LSTM-pytorch-2.1.py 6.34 KB
一键复制 编辑 原始数据 按行查看 历史
whitebear-coder 提交于 2021-10-16 16:50 . 默认更改列表
'''
author : 林泽旭
program: convlstm + lstm 预测流量
2.1 加入小波变换
'''
import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
import numpy as np
import pandas as pd
import xlrd
import math
import sklearn.metrics as metrics
from sklearn.preprocessing import MinMaxScaler
from torch.autograd import Variable
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
from convlstmpy import ConvLSTM
import pywt
# Hyper-parameters and shared configuration.
# Fraction of the series used for training (see train_test_split).
train_spt = 0.8
# Number of future steps predicted per sample (label window length).
look_head = 3
# Number of past steps fed to the model (input window length); tied to look_head.
look_back = look_head
# Learning rate handed to the Adam optimizer (name keeps its original spelling
# because other parts of the file reference it).
learing_rate = 0.001
# Number of full-batch training iterations.
epoch = 1000
# NOTE(review): batch_size is not referenced anywhere in the visible code.
batch_size = 64
# ConvLSTM settings
num_layers = 1
input_dim = look_back
# NOTE(review): the meaning of the trailing "[-4, 3]" note is not evident from this file.
hidden_dim = look_back # [-4, 3]
kernel_size = (3, 3)
# NOTE(review): MyNet stores this value but builds its ConvLSTM with bias=True.
bias = 0
# LSTM settings
input_size = look_back
hidden_size = look_back
# Output dimension of the final linear layer
output_size = look_back
# Module-level placeholders; the __main__ section rebinds both names.
preds = []
labels = []
def wavelet_denoise(input):
    '''
    Denoise a sequence by wavelet decomposition and reconstruction.

    The signal is decomposed with the db4 wavelet, the finest detail bands
    (at most two) are zeroed, and the signal is rebuilt from what remains.

    :param input: input sequence; the transform runs along its last axis
    :return: denoised sequence with the same length as the input
    '''
    # Daubechies-4 wavelet.
    db4 = pywt.Wavelet('db4')
    # Decompose: coeffs is [approximation, coarsest detail, ..., finest detail].
    coeffs = pywt.wavedec(input, db4)
    # Zero the high-frequency detail bands. Start no lower than index 1 so a
    # shallow decomposition never wipes the approximation band (which would
    # flatten the whole signal to zero).
    first_removed = max(1, len(coeffs) - 2)
    for idx in range(first_removed, len(coeffs)):
        coeffs[idx] = np.zeros_like(coeffs[idx])
    # Reconstruct.
    meta = pywt.waverec(coeffs, db4)
    # waverec yields one extra trailing sample for odd-length inputs; trim it
    # so callers can reshape the result like the original data.
    length = np.shape(input)[-1]
    return meta[..., :length]
# Load the raw data
def load_data(filename):
    """Read an Excel sheet and return its rows after a log1p transform.

    :param filename: path of the Excel file to read
    :return: list of rows, each row a list of Python floats
    """
    raw = np.array(pd.read_excel(filename))
    # Wavelet denoising (ravel -> wavelet_denoise -> reshape(-1, 1)) is
    # currently disabled; only the log1p smoothing is applied.
    smoothed = np.log1p(raw)
    # Convert every cell to a plain Python float.
    return [[float(value) for value in row] for row in smoothed]
# Split into training and test portions
def train_test_split(datasets, train_spt):
    """Cut a sequence into a leading training part and a trailing test part.

    :param datasets: sliceable sequence of samples
    :param train_spt: fraction of samples that go to the training part
    :return: tuple ``(train_part, test_part)``
    """
    boundary = int(len(datasets) * train_spt)
    head = datasets[:boundary]
    tail = datasets[boundary:]
    return head, tail
# Build (input window, label window) pairs
def create_datasets(datasets, look_back, look_head):
    """Slide over the data and pair each input window with the steps after it.

    :param datasets: sliceable sequence of samples
    :param look_back: number of consecutive samples in each input window
    :param look_head: number of following samples in each label window
    :return: tuple ``(inputs, labels)`` of numpy arrays
    """
    window_count = len(datasets) - look_back - look_head + 1
    starts = range(window_count)
    inputs = [datasets[s:s + look_back] for s in starts]
    targets = [datasets[s + look_back:s + look_back + look_head] for s in starts]
    return np.array(inputs), np.array(targets)
# Mean absolute percentage error (returned as a fraction, not multiplied by 100)
def mape(y_pred, y_true):
    """Return the mean of ``|y_pred - y_true| / |y_true|``.

    :param y_pred: predicted values
    :param y_true: reference values used as the denominator
    :return: mean absolute relative error
    """
    relative_error = (y_pred - y_true) / y_true
    return np.mean(np.abs(relative_error))
# Network definition
# # Subclasses nn.Module
class MyNet(nn.Module):
    """ConvLSTM followed by an LSTM and a linear output layer.

    :param input_size: feature size expected by the LSTM
    :param hidden_size: hidden size of the LSTM and input size of the linear layer
    :param num_layers: layer count shared by the ConvLSTM and the LSTM
    :param output_size: number of values produced per sample
    :param input_dim: input channel count of the ConvLSTM
    :param hidden_dim: hidden channel count of the ConvLSTM
    :param kernel_size: ``(height, width)`` of the ConvLSTM kernel
    :param bias: stored on the instance only (see note in ``__init__``)
    """

    def __init__(self, input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, output_size=output_size,
                 input_dim=input_dim, hidden_dim=hidden_dim, kernel_size=kernel_size, bias=bias):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.padding_size = kernel_size[0] // 2, kernel_size[1] // 2
        self.bias = bias
        # NOTE(review): ``bias`` is not forwarded; the ConvLSTM is always built
        # with bias=True. Left as-is so the default model is unchanged.
        self.convlstm = ConvLSTM(input_dim=input_dim, hidden_dim=hidden_dim, kernel_size=kernel_size,
                                 num_layers=num_layers, batch_first=True, bias=True, return_all_layers=True)
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input):
        """Run one batch through ConvLSTM -> LSTM -> linear layer.

        :param input: batch-first tensor accepted by the ConvLSTM
        :return: tensor with ``output_size`` values per row
        """
        out_conv = self.convlstm(input)
        # Last element of the last entry of the ConvLSTM's final return value.
        # Assumes it is shaped (batch, hidden_dim, H, W) -- TODO confirm
        # against convlstmpy.
        out_conv_did = out_conv[-1][-1][-1]
        # Flatten everything except the batch axis, then add a length-1
        # sequence axis. Unlike a bare squeeze(), this keeps the batch axis
        # when the batch holds a single sample.
        out_conv_did = out_conv_did.flatten(start_dim=1).unsqueeze(1)
        out_lstm, _ = self.lstm(out_conv_did)
        # Reshape by the instance's own hidden size rather than the global
        # look_back, so non-default hidden sizes work.
        out = self.fc(out_lstm.reshape(-1, self.hidden_size))
        return out
# Build the network with the module-level default hyper-parameters.
model = MyNet()
# Choose the loss function (mean squared error).
# Disabled model summary, kept for reference:
# summary(model, input_size=(1, 3, x_test.shape[1]))
criterion = nn.MSELoss()
# Adam over all model parameters, using the module-level learning rate.
optimizer = optim.Adam(model.parameters(), lr=learing_rate)
def _to_convlstm_input(windows, look_back):
    """Turn windowed samples into a float32 tensor shaped (N, 1, look_back, 1, 1)."""
    tensor = torch.from_numpy(windows).to(torch.float32)
    return tensor.reshape(windows.shape[0], 1, look_back, 1, 1)


if __name__ == "__main__":
    # # Data preprocessing
    # Raw string: the Windows path contains backslashes that are not valid
    # escape sequences; the resulting path value is unchanged.
    example_set = load_data(r'D:\流量预测-LSTM\data\BrainFlow.xls')
    train_set, test_set = train_test_split(example_set, train_spt)
    # NOTE(review): torch.random.seed() picks a non-deterministic seed; use
    # torch.manual_seed(...) instead if reproducible runs are wanted.
    torch.random.seed()
    # Normalisation
    # # Scale to the (0, 1) range; fitted on the training part only.
    sc = MinMaxScaler(feature_range=(0, 1))
    train_set_scaled = sc.fit_transform(train_set)
    test_set = sc.transform(test_set)
    # x_train -> y_train
    # x_test -> y_test
    # Windows are built from the time-ordered rows. Shuffling rows first (as
    # before) would fill each window with unrelated time steps.
    x_train, y_train = create_datasets(train_set_scaled, look_back, look_head)
    x_test, y_test = create_datasets(test_set, look_back, look_head)
    # Shuffle complete (input, label) training pairs instead of raw rows.
    np.random.seed(7)
    order = np.random.permutation(len(x_train))
    x_train, y_train = x_train[order], y_train[order]
    # # Dimension handling
    x_train_torch = _to_convlstm_input(x_train, look_back)
    # The targets do not change between epochs, so convert them once.
    y_train_torch = torch.from_numpy(y_train).squeeze().float()
    # Train the model (full batch on every iteration)
    for i in range(epoch):
        train_prediction = model(x_train_torch)
        loss = criterion(train_prediction, y_train_torch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print("train:{}, loss:{}".format(i, loss.item()))
    # Prediction: no gradients are needed here.
    model.eval()
    with torch.no_grad():
        pred = model(_to_convlstm_input(x_test, look_back)).float()
    preds = pred.numpy().reshape(-1, 1)
    labels = y_test.squeeze().reshape(-1, 1)
    # Undo the normalisation
    pred_set_end = sc.inverse_transform(preds)
    labels_set_end = sc.inverse_transform(labels)
    # Undo the log1p smoothing
    pred_set_end = np.expm1(pred_set_end)
    labels_set_end = np.expm1(labels_set_end)
    # # Three evaluation metrics (sklearn expects y_true first)
    mape1 = mape(pred_set_end, labels_set_end)
    rmse = math.sqrt(metrics.mean_squared_error(labels_set_end, pred_set_end))
    mae = metrics.mean_absolute_error(labels_set_end, pred_set_end)
    print('均方根误差: %.6f' % rmse)
    print('平均绝对误差: %.6f' % mae)
    print('平均百分比误差: %.6f' % mape1)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/white_mai/flow-forecasting.git
git@gitee.com:white_mai/flow-forecasting.git
white_mai
flow-forecasting
Flow forecasting
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385