import tushare as ts
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import paddle
import paddle.nn as nn
from sklearn.preprocessing import MinMaxScaler
# Fetch data
df_index = ['open', 'high', 'close', 'low', 'vol']   # columns to keep
data_range = [1, 2]                                   # target feature indices: 'high' and 'close'
batch_size = 16
window_size = 10
fea_num = len(df_index)
FETCH_FROM_TUSHARE = False  # set to True to re-download the data instead of using the saved CSV
if FETCH_FROM_TUSHARE:
    # Fetch stock data through the tushare Pro API
    # Set the API token
    pro = ts.pro_api('ec9845292a2461f794a76a2d8d33ebdf736d2f78797e949a256ff011')
    # Daily bars for Ping An Bank (000001.SZ)
    df = pro.daily(ts_code='000001.SZ', start_date='20230901')
    df.index = pd.to_datetime(df['trade_date'])
    df = df[df_index]
    # The API returns rows newest-first; reverse to chronological order and reset the index
    df = df[::-1].reset_index(drop=True)
    print(df)
    # Save the data for offline reuse
    df.to_csv('df.csv', index=False)
# Sliding-window construction
def split_windows(data, size, data_range=[2]):
    """Slide a window of length `size` over `data` with stride 1.
    X holds each window's rows; Y holds the `data_range` columns of the row
    immediately after the window (by default the next close price)."""
    X = []
    Y = []
    for i in range(len(data) - size):
        X.append(data[i:i+size, :])
        Y.append(data[i+size, data_range])
    return np.array(X), np.array(Y)
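# Illustrative sanity check (not part of the pipeline): 12 rows of 5 features with
# size=10 should give 2 windows of shape (10, 5) and 2 two-column labels.
_toy_X, _toy_Y = split_windows(np.arange(12 * 5, dtype=float).reshape(12, 5), size=10, data_range=[1, 2])
assert _toy_X.shape == (2, 10, 5) and _toy_Y.shape == (2, 2)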
df = pd.read_csv('./df.csv', usecols=df_index)
all_data = df.values
# Truncate so the usable length is a multiple of batch_size
data_len = len(all_data) - len(all_data) % batch_size
# Roughly 3/4 of the batches go to training; train_len is a multiple of batch_size
train_len = batch_size * int(data_len // batch_size * 3 // 4)
# train_data keeps window_size extra rows at its end and test_data starts window_size rows
# early, so after windowing the splits yield train_len and data_len - train_len samples
train_data = all_data[:train_len + window_size, :]
test_data = all_data[train_len - window_size : data_len, :]
plt.figure(figsize=(12, 8))
for i in range(len(data_range)):
    plt.subplot(len(data_range), 2, i + 1)
    # Visualise the train/test split of each target feature
    col = data_range[i]
    plt.plot(np.arange(train_data.shape[0]), train_data[:, col], label=df_index[col] + ' train data')
    plt.plot(np.arange(train_data.shape[0], train_data.shape[0] + test_data.shape[0]), test_data[:, col], label=df_index[col] + ' test data')
    plt.title(df_index[col])
    plt.legend()
# plt.show()
# Min-max normalisation
scaler = MinMaxScaler()
scaled_train_data = scaler.fit_transform(train_data)
# Scale the test set with the training-set min/max so both splits share the same transform
scaled_test_data = scaler.transform(test_data)
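# MinMaxScaler maps each column x to (x - x_min) / (x_max - x_min), so values fall in [0, 1].
# The fitted per-column minima and maxima are kept in scaler.data_min_ / scaler.data_max_ and
# are reused near the end of the script to undo the scaling on the predictions.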
# Build windowed training and test samples
train_X, train_Y = split_windows(scaled_train_data, size=window_size, data_range=data_range)
test_X, test_Y = split_windows(scaled_test_data, size=window_size, data_range=data_range)
print('train shape', train_X.shape, train_Y.shape)
print('test shape', test_X.shape, test_Y.shape)
# Model definition
class CNN_LSTM(nn.Layer):
    def __init__(self, window_size, fea_num):
        super().__init__()
        self.window_size = window_size
        self.fea_num = fea_num
        # Convolutional feature extractor over the (window, feature) plane
        self.conv1 = nn.Conv2D(in_channels=1, out_channels=64, stride=1, kernel_size=3, padding='same')
        self.relu1 = nn.ReLU()
        self.pool = nn.MaxPool2D(kernel_size=2, stride=1, padding='same')
        self.dropout = nn.Dropout2D(0.3)  # randomly zero 30% of the channels
        # Two stacked LSTM layers over the time (window) dimension
        self.lstm1 = nn.LSTM(input_size=64 * fea_num, hidden_size=128, num_layers=1, time_major=False)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=64, num_layers=1, time_major=False)
        # Fully connected regression head, one output per target feature
        self.fc = nn.Linear(in_features=64, out_features=32)
        self.relu2 = nn.ReLU()
        self.head = nn.Linear(in_features=32, out_features=2)

    def forward(self, x):
        # Add a channel dimension so the input matches Conv2D's NCHW layout
        x = x.reshape([x.shape[0], 1, self.window_size, self.fea_num])
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool(x)
        x = self.dropout(x)
        # Flatten channels and features back into one vector per time step
        x = x.reshape([x.shape[0], self.window_size, -1])
        x, (h, c) = self.lstm1(x)
        x, (h, c) = self.lstm2(x)
        x = x[:, -1, :]  # keep only the last time step of the second LSTM
        x = self.fc(x)
        x = self.relu2(x)
        x = self.head(x)
        return x
model = CNN_LSTM(window_size, fea_num)
paddle.summary(model, (64, 10, 5))
# Hyperparameters
base_lr = 0.005
EPOCH = 200
lr_schedual = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=EPOCH, verbose=True)
loss_fn = nn.MSELoss()
opt = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=lr_schedual, beta1=0.9, beta2=0.999)
def process(data, bs):
    """Group samples into consecutive batches of size bs.
    The sample counts here are always multiples of bs, so every batch is full and the
    result is a regular array of shape [num_batches, bs, ...]."""
    l = len(data)
    tmp = []
    for i in range(0, l, bs):
        if i + bs > l:
            tmp.append(data[i:].tolist())
        else:
            tmp.append(data[i:i+bs].tolist())
    return np.array(tmp)
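# Alternative (not used above, shown for reference): Paddle's own data pipeline can do the
# same batching and also shuffle between epochs. A minimal sketch with paddle.io:
# loader = paddle.io.DataLoader(
#     paddle.io.TensorDataset([paddle.to_tensor(train_X, dtype='float32'),
#                              paddle.to_tensor(train_Y, dtype='float32')]),
#     batch_size=batch_size, shuffle=True, drop_last=True)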
# Batch the training set; the test set is kept as a single block and run through the model in one pass during prediction
train_X = process(train_X, batch_size)
train_Y = process(train_Y, batch_size)
print(train_X.shape, train_Y.shape)
# test_X = process(test_X, batch_size)
# test_Y = process(test_Y, batch_size)
# print(test_X.shape, test_Y.shape)
# Training loop
for epoch in range(EPOCH):
    loss_train = 0
    model.train()
    for batch_id, data in enumerate(train_X):
        label = train_Y[batch_id]
        data = paddle.to_tensor(data, dtype='float32')
        label = paddle.to_tensor(label, dtype='float32')
        label = label.reshape([label.shape[0], 2])
        y = model(data)
        loss = loss_fn(y, label)
        opt.clear_grad()
        loss.backward()
        opt.step()
        loss_train += loss.item()
    print("[TRAIN] ========epoch : {}, loss: {:.4f}==========".format(epoch + 1, loss_train))
    lr_schedual.step()  # advance the cosine-annealing learning-rate schedule
    ## This block was originally the per-epoch evaluation; adapting the model for it proved awkward, so it is disabled here
    # loss_eval = 0
    # model.eval()
    # for batch_id, data in enumerate(test_X):
    #     label = test_Y[batch_id]
    #     data = paddle.to_tensor(data, dtype='float32')
    #     label = paddle.to_tensor(label, dtype='float32')
    #     label = label.reshape([label.shape[0], 2])
    #     y = model(data)
    #     loss = loss_fn(y, label)
    #     loss_eval += loss.item()
    # print("[EVAL] ========epoch : {}, loss: {:.4f}==========\n".format(epoch+1, loss_eval))
# Save the trained parameters and the learning-rate scheduler state
paddle.save(model.state_dict(), './cnn_lstm_ep200_lr0.005.params')
paddle.save(lr_schedual.state_dict(), './cnn_lstm_ep200_lr0.005.pdopts')
# Prediction
# Rebuild the model and load the trained parameters
model = CNN_LSTM(window_size, fea_num)
model_dict = paddle.load('./cnn_lstm_ep200_lr0.005.params')
model.load_dict(model_dict)
# Run the whole test set through the model in one pass
model.eval()  # disable dropout for inference
test_X = paddle.to_tensor(test_X, dtype='float32')
prediction = model(test_X)
prediction = prediction.cpu().numpy()
prediction = prediction.reshape(prediction.shape[0], -1)
# Undo the min-max scaling, using each target column's own min/max from the fitted scaler
target_min = scaler.data_min_[data_range]
target_max = scaler.data_max_[data_range]
scaled_prediction = prediction * (target_max - target_min) + target_min
scaled_true = test_Y * (target_max - target_min) + target_min
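# Equivalent route (illustrative only): scatter the predicted columns back into a full-width
# array and let the fitted scaler invert all five features at once.
_full = np.zeros((prediction.shape[0], fea_num))
_full[:, data_range] = prediction
assert np.allclose(scaler.inverse_transform(_full)[:, data_range], scaled_prediction)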
# Plot predictions against the ground truth for each target feature
for i in range(len(data_range)):
    plt.subplot(len(data_range), 2, len(data_range) + i + 1)
    plt.plot(range(scaled_true.shape[0]), scaled_true[:, i], label='true')
    plt.plot(range(scaled_prediction.shape[0]), scaled_prediction[:, i], label='prediction', marker='*')
    plt.legend()
from sklearn.metrics import mean_squared_error
print('RMSE', np.sqrt(mean_squared_error(scaled_true, scaled_prediction)))
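# Per-target error (illustrative): a single RMSE mixes the 'high' and 'close' series, so
# it can also help to report each target separately.
for i, col in enumerate(data_range):
    print('RMSE (' + df_index[col] + ')', np.sqrt(mean_squared_error(scaled_true[:, i], scaled_prediction[:, i])))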
plt.show()