1 Star 0 Fork 0

xiao咲/人工智能课程

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
paddleTest.py 7.20 KB
一键复制 编辑 原始数据 按行查看 历史
import tushare as ts
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import paddle
import paddle.nn as nn
from sklearn.preprocessing import MinMaxScaler
# ---- Configuration --------------------------------------------------------
df_index = ['open', 'high', 'close', 'low', 'vol']  # columns kept from the daily bars
data_range = [1, 2]    # indices into df_index used as prediction targets (high, close)
batch_size = 16
window_size = 10       # timesteps per sliding window fed to the model
fea_num = len(df_index)  # idiomatic len() instead of __len__()

# ---- One-off data download (disabled) -------------------------------------
# Flip to True to re-download through tushare; normal runs read ./df.csv.
if False:
    # NOTE(review): hard-coded API token committed to source — move it to an
    # environment variable or config file before publishing.
    pro = ts.pro_api('ec9845292a2461f794a76a2d8d33ebdf736d2f78797e949a256ff011')
    # Daily bars for 000001.SZ since 2023-09-01.
    df = pro.daily(ts_code='000001.SZ', start_date='20230901')
    df = df[df_index]
    # tushare returns rows newest-first: flip to chronological order and
    # renumber. (The original called .reindex() with no args — a no-op —
    # and converted a RangeIndex with pd.to_datetime, which is meaningless.)
    df = df[::-1].reset_index(drop=True)
    print(df)
    # Cache so later runs can skip the download (one write, no index column).
    df.to_csv('df.csv', index=False)
# 窗口划分
def split_windows(data, size, data_range=None):
    """Slice *data* into overlapping windows for supervised learning.

    A window of `size` consecutive rows becomes one sample X; its label Y is
    the row immediately after the window, restricted to the target columns.

    Args:
        data: 2-D array-like of shape (n_rows, n_features).
        size: window length (timesteps per sample).
        data_range: column indices used as the label. Defaults to [2] (the
            close price). A None sentinel replaces the original mutable
            default argument ``data_range=[2]``.

    Returns:
        (X, Y): ndarrays of shape (n_rows - size, size, n_features) and
        (n_rows - size, len(data_range)).
    """
    if data_range is None:
        data_range = [2]
    X, Y = [], []
    # Step-1 sliding window: each window predicts the target columns of the
    # row that follows it.
    for i in range(len(data) - size):
        X.append(data[i:i + size, :])
        Y.append(data[i + size, data_range])
    return np.array(X), np.array(Y)
# ---- Load cached data and build train/test splits -------------------------
df = pd.read_csv('./df.csv', usecols=df_index)
all_data = df.values
# Truncate so the usable length is a whole number of batches.
data_len = len(all_data) - len(all_data) % batch_size
# Roughly 3/4 of the batches go to training, rounded to a batch multiple.
train_len = batch_size * int(data_len // batch_size * 3 // 4)
# Each split is padded by window_size rows so windowing does not lose
# samples at the split boundary.
train_data = all_data[:train_len + window_size, :]
test_data = all_data[train_len - window_size:data_len, :]

# ---- Visualise the raw target series --------------------------------------
plt.figure(figsize=(12, 8))
for i in range(len(data_range)):
    # FIX: the original plotted column i (open, high) while titling the
    # subplot with df_index[data_range[i]] (high, close). Index with the
    # actual target column so plot, label and title agree.
    col = data_range[i]
    plt.subplot(len(data_range), 2, i + 1)
    plt.plot(np.arange(train_data.shape[0]),
             train_data[:, col], label=df_index[col] + ' train data')
    plt.plot(np.arange(train_data.shape[0], train_data.shape[0] + test_data.shape[0]),
             test_data[:, col], label=df_index[col] + ' test data')
    plt.title(df_index[col])
    plt.legend()
# plt.show()

# ---- Normalisation ---------------------------------------------------------
scaler = MinMaxScaler()
scaled_train_data = scaler.fit_transform(train_data)
# Scale the test set with the training-set statistics so both splits share
# the same distribution.
scaled_test_data = scaler.transform(test_data)

# ---- Windowed datasets -----------------------------------------------------
train_X, train_Y = split_windows(scaled_train_data, size=window_size, data_range=data_range)
test_X, test_Y = split_windows(scaled_test_data, size=window_size, data_range=data_range)
print('train shape', train_X.shape, train_Y.shape)
print('test shape', test_X.shape, test_Y.shape)
# 模型构建
class CNN_LSTM(nn.Layer):
    """CNN feature extractor followed by a stacked LSTM regressor.

    Input:  tensor of shape (batch, window_size, fea_num) — scaled feature
            windows.
    Output: tensor of shape (batch, out_dim) — one prediction per target
            column.
    """

    def __init__(self, window_size, fea_num, out_dim=2):
        """
        Args:
            window_size: timesteps per input window.
            fea_num: number of input features per timestep.
            out_dim: number of regression outputs. Defaults to 2 (the
                original hard-coded head size), kept as a parameter so the
                model generalizes to other target sets.
        """
        super().__init__()
        self.window_size = window_size
        self.fea_num = fea_num
        # 2-D conv over the (window, feature) plane; 'same' padding keeps
        # the spatial size so the LSTM still sees window_size timesteps.
        self.conv1 = nn.Conv2D(in_channels=1, out_channels=64, stride=1,
                               kernel_size=3, padding='same')
        self.relu1 = nn.ReLU()
        self.pool = nn.MaxPool2D(kernel_size=2, stride=1, padding='same')
        self.dropout = nn.Dropout2D(0.3)  # drop 30% of channels during training
        # Two stacked LSTMs; input per timestep = 64 conv channels x fea_num.
        self.lstm1 = nn.LSTM(input_size=64 * fea_num, hidden_size=128,
                             num_layers=1, time_major=False)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=64,
                             num_layers=1, time_major=False)
        self.fc = nn.Linear(in_features=64, out_features=32)
        self.relu2 = nn.ReLU()
        self.head = nn.Linear(in_features=32, out_features=out_dim)

    def forward(self, x):
        # Add a channel axis: (batch, 1, window, features) for Conv2D.
        x = x.reshape([x.shape[0], 1, self.window_size, self.fea_num])
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool(x)
        x = self.dropout(x)
        # Flatten channels x features back into a per-timestep vector.
        x = x.reshape([x.shape[0], self.window_size, -1])
        x, (h, c) = self.lstm1(x)
        x, (h, c) = self.lstm2(x)
        # Keep only the last timestep's hidden state for regression.
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.relu2(x)
        x = self.head(x)
        return x
# ---- Model and training hyper-parameters -----------------------------------
model = CNN_LSTM(window_size, fea_num)
# Dry-run summary for an example input shape (batch=64, window=10, feats=5).
paddle.summary(model, (64, 10, 5))

base_lr = 0.005
EPOCH = 200
# Cosine-annealed learning rate over the whole training run.
lr_schedual = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=EPOCH, verbose=True)
loss_fn = nn.MSELoss()  # regression objective
# NOTE(review): removed the unused `metric = paddle.metric.Accuracy()` —
# it was never referenced, and classification accuracy is meaningless for
# an MSE regression task.
opt = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=lr_schedual, beta1=0.9, beta2=0.999)
def process(data, bs):
    """Group *data* (ndarray of samples) into mini-batches of size *bs*.

    Returns an ndarray of shape (n_batches, bs, ...). Removes the original's
    unused ``ed`` variable and the redundant end-of-data branch: slicing past
    the end of an array already truncates, so ``data[i:i+bs]`` covers the
    final partial batch too.

    NOTE: if len(data) is not a multiple of bs the batch list is ragged and
    np.array() cannot stack it into a regular ndarray — callers in this
    script always pass batch-multiple lengths.
    """
    batches = []
    for start in range(0, len(data), bs):
        batches.append(data[start:start + bs].tolist())
    return np.array(batches)
# ---- Batch the training set -------------------------------------------------
train_X = process(train_X, batch_size)
train_Y = process(train_Y, batch_size)
print(train_X.shape, train_Y.shape)

# ---- Training loop ----------------------------------------------------------
# (The original file carried a large commented-out per-epoch eval loop here;
# removed as dead code — re-add evaluation against test_X/test_Y if needed.)
for epoch in range(EPOCH):
    loss_train = 0
    model.train()
    for batch_id, data in enumerate(train_X):
        label = train_Y[batch_id]
        data = paddle.to_tensor(data, dtype='float32')
        label = paddle.to_tensor(label, dtype='float32')
        # (batch, 2): one value per target column.
        label = label.reshape([label.shape[0], 2])
        y = model(data)
        loss = loss_fn(y, label)
        opt.clear_grad()
        loss.backward()
        opt.step()
        loss_train += loss.item()  # summed (not averaged) batch loss per epoch
    print("[TRAIN] ========epoch : {}, loss: {:.4f}==========".format(epoch + 1, loss_train))
    lr_schedual.step()  # advance the cosine schedule once per epoch

# ---- Persist model weights and LR-schedule state ----------------------------
paddle.save(model.state_dict(), './cnn_lstm_ep200_lr0.005.params')
paddle.save(lr_schedual.state_dict(), './cnn_lstm_ep200_lr0.005.pdopts')
# ---- Reload the trained model and predict on the test set -------------------
model = CNN_LSTM(window_size, fea_num)
model_dict = paddle.load('./cnn_lstm_ep200_lr0.005.params')
model.load_dict(model_dict)
# FIX: switch to inference mode — the original left the model in training
# mode, so Dropout2D(0.3) kept randomly zeroing activations at prediction time.
model.eval()

test_X = paddle.to_tensor(test_X, dtype='float32')
prediction = model(test_X)
prediction = prediction.cpu().numpy()
prediction = prediction.reshape(prediction.shape[0], -1)

# ---- Undo the min-max scaling, per target column ----------------------------
# FIX: the original applied column 2's (close) data_min_/data_max_ to BOTH
# output columns, mis-scaling the 'high' predictions. Broadcast each target
# column's own statistics instead (data_range = [1, 2]).
target_min = scaler.data_min_[data_range]
target_max = scaler.data_max_[data_range]
scaled_prediction = prediction * (target_max - target_min) + target_min
scaled_true = test_Y * (target_max - target_min) + target_min

# ---- Plot predictions vs. ground truth --------------------------------------
for i in range(len(data_range)):
    # Second row of the 2x2 grid started by the raw-data plots above.
    plt.subplot(len(data_range), 2, len(data_range) + i + 1)
    plt.plot(range(scaled_true.shape[0]), scaled_true[:, i], label='true')
    plt.plot(range(scaled_prediction.shape[0]), scaled_prediction[:, i], label='prediction', marker='*')
    plt.legend()

from sklearn.metrics import mean_squared_error
print('RMSE', np.sqrt(mean_squared_error(scaled_prediction, scaled_true)))
plt.show()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/gaoro-xiao/artificial-intelligence-course.git
git@gitee.com:gaoro-xiao/artificial-intelligence-course.git
gaoro-xiao
artificial-intelligence-course
人工智能课程
main

搜索帮助

0d507c66 1850385 C8b1a773 1850385