代码拉取完成,页面将自动刷新
"""使用方法
1、环境配置
python3.9.13 64-bit
安装python和pip
命令行进入当前文件夹
执行pip install -r requirements.txt
2、使用步骤
一、初始化类
hc = hc_forecast.HcForecast()
二、缓存数据到本地, 只执行一次
hc.init_local_data()
三、初始化模型, 只执行一次
hc.init_local_model()
四、预测
# 对给定时间日前预测
# 例如预测2022年9月29日16:35时刻后一天的功率
# given_time = datetime.datetime(2022, 9, 29, 16, 35)
# hc.forecast_day_ahead_sometime(2, given_time)
# 对给定时间日内预测
# 例如预测2022年9月29日10:35时刻后4个小时的功率
# given_time = datetime.datetime(2022, 9, 29, 10, 35)
hc.forecast_intraday_given_time(3, given_time)
"""
import os
import numpy as np
from numpy import NaN
import datetime
import itertools
import pandas as pd
import numpy.random as rd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.models import load_model
from keras.layers import LSTM, Dense
from sqlalchemy import create_engine
from sqlalchemy import text
from urllib.parse import quote_plus as urlquote
from functions.lssvm import LSSVMRegression
from functions.data_format import WIND_DIRECTION, WEATHER, WIND_SPEED
from functions.similar_day import get_df_similar_day
# 用户可修改常量
# 最长缓存数据时间, 单位天
MAX_CACHED_DAYS = 30
# 初始模型训练时长, 单位天
INIT_TRAIN_DAYS = 14
# 数据库中天气的有效列(天气影响因素)
WEA_COLUMNS = ["wea", "tem", "win", "win_speed"]
# 数据库中天气的无效列
WEA_DELETE_COLUMNS = ["id", "city_id", "city", "update_time", "week", "hours", "remark"]
# 数据库中负荷的有效列
LOAD_COLUMNS = ["predict_type", "item_value", "date"]
# 数据库中负荷的无效列
LOAD_DELETE_COLUMNS = ["id", "equipment_id", "remarks"]
# 负荷影响因素
LOAD_FACTORS = ["wea", "tem"]
# 光伏影响因素
PV_FACTORS = ["wea"]
# 风电影响因素
WIND_FACTORS = ["win_speed"]
# 不可修改常量
LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2, INDEX_288 = 1, 2, 3, 4, 5, 6, 7, 288
# 数据库信息
MYSQL_HOST = "127.0.0.1"
MYSQL_PORT = "3306"
MYSQL_USER = "user"
MYSQL_PASSWORD = urlquote("123456")
MYSQL_DB = "jxhc"
ENGINE = create_engine(
f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}?charset=utf8",
pool_pre_ping=True,
)
class DataError(Exception):
def __init__(self, msg):
self.message = msg
def __str__(self):
return self.message
class ProgramError(Exception):
def __init__(self, msg):
self.message = msg
def __str__(self):
return self.message
class HcForecast:
def __init__(self):
# 缓存数据目录
self.local_data_dir = f"{os.getcwd()}/data_dir/"
# 模型文件目录
self.local_model_dir = f"{os.getcwd()}/model_dir/"
def init_local_data(self):
"""初始化本地数据
- 主要是将程序所需的大量数据从服务器中缓存至本地, 后期动态更新本地数据
- 只需执行一次
- 缓存的一年数据, 包含最后一天的空数据(日前预测待预测日), 倒数第二天的部分空数据(日内预测)
"""
# 清空已有数据
files = os.listdir(self.local_data_dir)
for f in files:
os.remove(self.local_data_dir + f)
time_end = datetime.datetime.now()
# 缓存一年的数据
time_start = time_end - datetime.timedelta(days=MAX_CACHED_DAYS)
time_start = datetime.datetime(
time_start.year, time_start.month, time_start.day
)
df_data, real_time_end = self.get_df_data_from_db(time_start, time_end)
df_wea = self.get_df_wea_from_db(time_start, time_end)
df_data = self.add_df_wea(df_data, df_wea)
df_data = self.data_clean(df_data, real_time_end)
for equip_type in df_data:
filename = (
self.local_data_dir
+ str(equip_type)
+ "@"
+ real_time_end.strftime("%Y-%m-%d-%H-%M-%S")
+ "@.csv"
)
df_data[equip_type].reset_index(drop=True).to_csv(filename, index=False)
return
def init_local_model(self):
"""初始化本地模型
- 用于节约预测时间, 提高效率, 主要针对日内预测的LSTM模型
并将模型保存至local_model_dir目录中
"""
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
# 缓存到本地的最新时间, 即能获取到最新数据的时间
saved_data_time = datetime.datetime.now()
strs = None
files = os.listdir(self.local_data_dir)
for f in files:
temp = f.split("@")
if temp[0] == str(equip_type):
strs = temp
if strs is None:
print(f"设备{equip_type}未找到缓存历史数据, 无法初始化模型!")
continue
else:
saved_data_time = datetime.datetime.strptime(strs[1], "%Y-%m-%d-%H-%M-%S")
df = self.get_data_intraday_latest_days(equip_type, saved_data_time, INIT_TRAIN_DAYS)
if df is None:
print(f"设备{equip_type}无法获取最新的数据!")
continue
else:
x_train, y_train, _, _, _ = self.get_train_data_intraday(df, equip_type)
print(f"开始设备{equip_type}的模型初始化...")
lstm = self.__lstm_fit(x_train, y_train)
lstm.save(self.local_model_dir + str(equip_type) + "@" + "lstm.h5")
return
def check_local_data(self):
"""本地数据检查、纠错
若程序运行错误, 可能是本地缓存数据出错, 可尝试对本地数据进行检查、纠错
程序部署后经过长时间运行, 缓存在本地的数据可能存在错误, 运行该方法进行纠错
- 1、检查本地数据是否存在重复日期的数据
- 2、检查本地数据除待预测日空数据、最后一天的空数据外, 是否还存在空数据
"""
files = os.listdir(self.local_data_dir)
for f in files:
strs = f.split("@")
end_time = datetime.datetime.strptime(strs[1], "%Y-%m-%d-%H-%M-%S")
df = pd.read_csv(self.local_data_dir + f)
year = int(df.loc[0, "year"])
month = int(df.loc[0, "month"])
day = int(df.loc[0, "day"])
hour = int(df.loc[0, "hour"])
minutes = int(df.loc[0, "minutes"])
start_time = datetime.datetime(year, month, day, hour, minutes)
df_new = pd.DataFrame(columns=df.columns)
ni = 0
delta_time = datetime.timedelta(minutes=ni)
for i in range(df.shape[0]):
year = int(df.loc[i, "year"])
month = int(df.loc[i, "month"])
day = int(df.loc[i, "day"])
hour = int(df.loc[i, "hour"])
minutes = int(df.loc[i, "minutes"])
time = datetime.datetime(year, month, day, hour, minutes)
if time == start_time + delta_time:
df_new = df_new.append(df.loc[i, :])
ni += 1
delta_time = datetime.timedelta(minutes=5) * ni
df_new.reset_index(drop=True).to_csv(self.local_data_dir + f, index=False)
print("数据检查完成!")
return
def forecast_day_ahead_sometime(self, equip_type, given_time):
"""
基于给定时间进行日前预测
- equip_type: 预测类型
- sometime: 给定的预测时间, 精度15分钟, 例如2022-01-01 16:00:00
"""
given_time = datetime.datetime(given_time.year, given_time.month, given_time.day,
given_time.hour, given_time.minute // 15 * 15)
day_ahead = datetime.datetime(
given_time.year, given_time.month, given_time.day
) + datetime.timedelta(days=1)
# 获取日前预测给定时间的数据表
df = self.get_given_time_data(equip_type, given_time)
if df is None: return
# 最后时间
minutes = given_time.hour * 60 + given_time.minute
last_time_index = minutes // 5
df = self.check_df(df, last_time_index)
if df is None:
return
elif df.shape[0] < 30 * INDEX_288:
print(f"设备{equip_type}历史数据太少, 无法进行日内预测!")
return
if equip_type in [PV1, PV2, WIND1, WIND2]:
y_pre_inverse = self.get_similar_day(df, equip_type).reshape(-1, 1)
else:
x_train, y_train, x_valid, _, mms = self.get_train_data_day_ahead(
df, equip_type
)
y_pre = self.lssvm_model_fit_prediction(x_train, y_train, x_valid)
y_pre_inverse = mms.inverse_transform(np.array(y_pre).reshape(-1, 1))
y_pre_inverse = self.check_pre_data(y_pre_inverse, equip_type)
self.forecast_result_write2db(day_ahead, equip_type, 1, y_pre_inverse)
return y_pre_inverse
@staticmethod
def lssvm_model_fit_prediction(x_train, y_train, x_valid):
"""LSSVM模型训练预测并返回预测值"""
# 最优参数使用__gwo方法优化
lssvm = LSSVMRegression(sigma=2.8)
lssvm.fit(x_train, y_train)
return lssvm.predict(x_valid)
def forecast_intraday_given_time(self, equip_type, given_time: datetime.datetime):
"""
基于给定时间进行日内预测
- equip_type: 预测类型
- given_time: 给定的预测时间, 精度5分钟, 例如2022-01-01 16:05:00
"""
# 获取日内预测给定时间的数据表
# 仅返回给定时间和本地缓存时间之间的数据表, 作为新训练数据
given_time = datetime.datetime(given_time.year, given_time.month, given_time.day,
given_time.hour, given_time.minute // 5 * 5)
df = self.get_given_time_data(equip_type, given_time)
# 日内预测最后时间
minutes = given_time.hour * 60 + given_time.minute
last_time_index = minutes // 5
# 若给定的时间大于最新数据的时间, 则表明缺少当天数据, 无法进行预测
if df is None:
return
else:
# 新数据用于训练模型
df = df[-INDEX_288 * 8: -INDEX_288 * 2 + last_time_index]
df = self.check_df(df, last_time_index)
if df is None:
return
elif df.shape[0] < 6 * INDEX_288:
print(f"设备{equip_type}缺少历史数据, 无法进行日内预测!")
return
x_train, y_train, x_valid, _, mms = self.get_train_data_intraday(
df, equip_type
)
y_pre = None
model_file = None
files = os.listdir(self.local_model_dir)
for f in files:
temp = f.split("@")
if temp[0] == str(equip_type):
model_file = f
if model_file is not None:
lstm = load_model(self.local_model_dir + model_file)
lstm = self.__lstm_fit(x_train, y_train)
lstm.save(self.local_model_dir + str(equip_type) + "@" + "lstm.h5")
y_pre = lstm.predict(x_valid)
else:
print("不存在模型文件!")
return
# mms = MinMaxScaler(feature_range=(0, 1))
# _ = mms.fit_transform(df[['p']])
y_pre_inverse = mms.inverse_transform(y_pre)
y_pre_inverse = self.check_pre_data(y_pre_inverse, equip_type)
self.forecast_result_write2db(given_time, equip_type, 2, y_pre_inverse)
return y_pre_inverse
@staticmethod
def check_pre_data(y_pre: list, equip_type: int) -> list:
if equip_type == LOAD:
for i in range(len(y_pre)):
y_pre[i] = max(y_pre[i], 0)
return y_pre
@staticmethod
def forecast_result_write2db(time, equip_type, intraday_or_dayahead, pre_result):
"""
预测结果写入数据库中
- time为预测时间
- equip_type为小类
- intraday_or_day_ahead其中1表示日前, 2表示日内
- pre_result为预测结果
"""
with ENGINE.connect() as conn:
for i, p in enumerate(pre_result):
predict_type = 1
if equip_type in [1, 6, 7]:
predict_type = 1
elif equip_type in [2, 3]:
predict_type = 2
else:
predict_type = 3
time_delta = 5
# 日前
if intraday_or_dayahead == 1:
time = datetime.datetime(time.year, time.month, time.day)
time_delta = 15
time_in = time + datetime.timedelta(minutes=i * time_delta)
time_now = datetime.datetime.now()
sql = text(
f"INSERT INTO x_prediction_curve (predict_type, photovoltaic_id, equipment_p, date, predict_curve_type, update_time) VALUES ({predict_type}, {equip_type}, {round(p[0], 2)}, date_format('{time_in}', '%Y-%m-%d %H:%i:%s'), {intraday_or_dayahead}, date_format('{time_now}', '%Y-%m-%d %H:%i:%s'));"
)
conn.execute(sql)
conn.commit()
conn.close()
return
def get_data_intraday_latest_days(self, equip_type, given_time, n):
"""
获取日内预测给定时间的数据表
返回最新的N天数据作为新训练数据
该方法为备用方法
"""
# 获取给定时间的数据表
df = self.get_given_time_data(equip_type, given_time)
if df is None:
# print("缺失历史数据, 无法进行预测!")
return
df = df.reset_index(drop=True)
# 日内预测最后时间
minutes = given_time.hour * 60 + given_time.minute
last_time_index = minutes // 5
if df.shape[0] < INDEX_288 * 14:
# print("缺失历史数据, 无法进行预测!")
return
else:
# 注意最后一天为合并气象表时加入的空数据
df = df[-INDEX_288 * n: -INDEX_288 * 2 + last_time_index]
return self.check_df(df, last_time_index)
@staticmethod
def check_df(df, last_time_index):
"""数据表检查, 若存在空值会导致无法进行训练预测"""
if not df.empty:
df = df.dropna(axis=1, how="all")
if df[: last_time_index - 288 * 2].isnull().sum().sum() > 0:
print("返回的数据表存在空值, 请检查后再执行!")
return
return df
def get_given_time_data(self, equip_type, given_time):
"""从本地缓存及数据库中获取给定时间段的数据"""
local_file = None
strs = None
files = os.listdir(self.local_data_dir)
for f in files:
temp = f.split("@")
if temp[0] == str(equip_type):
local_file = f
strs = temp
if strs is None:
print(f"设备{equip_type}未找到缓存历史数据, 无法进行预测!")
return
df = pd.read_csv(self.local_data_dir + local_file)
saved_data_time = datetime.datetime.strptime(strs[1], "%Y-%m-%d-%H-%M-%S")
year = saved_data_time.year
month = saved_data_time.month
day = saved_data_time.day
saved_data_day = datetime.datetime(year, month, day)
# 数据获取
# 若读取数据的时间早于缓存历史数据
if given_time <= saved_data_time:
year = given_time.year
month = given_time.month
day = given_time.day
given_time_day = datetime.datetime(year, month, day)
delta_days = (saved_data_day - given_time_day).days
if delta_days > 0:
df = df[: -INDEX_288 * delta_days]
else:
# 去除待预测日后一天及当天的数据
df = df[: -INDEX_288 * 2]
df_data, real_time_end = self.get_df_data_from_db(
saved_data_day, given_time
)
# 数据库中未找到对应的设备数据, 则返回空
if df_data.get(equip_type) is None or real_time_end < given_time:
print(f"无法从数据库中更新设备{equip_type}的数据!")
return
# 天气数据需获取预测时间的后一天数据
year = given_time.year
month = given_time.month
day = given_time.day
given_time_day = datetime.datetime(year, month, day)
given_time_ahead = given_time_day + datetime.timedelta(days=1)
# 合并气象数据
df_wea = self.get_df_wea_from_db(saved_data_time, given_time_ahead)
df_data = self.add_df_wea(df_data, df_wea)
# 数据清洗
df_data = self.data_clean(df_data, real_time_end)
df_add = df_data[equip_type]
# 多个dataframe合并, 且重置索引
df = pd.concat([df, df_add], ignore_index=True)
# 获取最后非空数据日期
df_temp = df[~df["p"].isin([None, NaN, ""])]
year = int(df_temp.iloc[-1]["year"])
month = int(df_temp.iloc[-1]["month"])
day = int(df_temp.iloc[-1]["day"])
hour = int(df_temp.iloc[-1]["hour"])
minutes = int(df_temp.iloc[-1]["minutes"])
# 更新本地缓存文件
time_file = datetime.datetime(year, month, day, hour, minutes)
filename = (
self.local_data_dir
+ str(equip_type)
+ "@"
+ time_file.strftime("%Y-%m-%d-%H-%M-%S")
+ "@.csv"
)
os.remove(self.local_data_dir + local_file)
df.reset_index(drop=True).to_csv(filename, index=False)
if df is None: return
df = df.reset_index(drop=True)
strs = None
files = os.listdir(self.local_data_dir)
for f in files:
temp = f.split("@")
if temp[0] == str(equip_type):
strs = temp
if strs is None:
print(f"设备{equip_type}未找到缓存历史数据!")
return
else:
# 缓存到本地的最新时间, 即能获取到最新数据的时间
saved_data_time = datetime.datetime.strptime(strs[1], "%Y-%m-%d-%H-%M-%S")
# 若给定的时间大于最新数据的时间, 则表明缺少当天数据, 无法进行预测
if saved_data_time < given_time:
print("缺少历史数据, 无法进行预测, 请修改预测时间!")
return
return df
def get_df_data_from_db(self, time_start, time_end):
"""从数据库中读取某一时段数据"""
df = None
with ENGINE.connect() as conn:
sql = text(
f"select * from x_power_history where date > str_to_date('{time_start}', '%Y-%m-%d %H:%i:%s') and date < str_to_date('{time_end}', '%Y-%m-%d %H:%i:%s')"
)
df = pd.read_sql(sql, conn)
conn.close()
# 删除列load_columns, 中含有缺失值的行
df = df.dropna(subset=LOAD_COLUMNS)
# 删除不需要的列
df = df.drop(columns=LOAD_DELETE_COLUMNS)
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values(by="date", ascending=True)
df = df.reset_index(drop=True)
# 设备数据
equip_datas = {equip_type: [] for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]}
day_data = {
equip_type: [
{
"year": 0,
"month": 0,
"day": 0,
"hour": 0,
"minutes": 0,
"p": NaN,
}
for _ in range(INDEX_288)
]
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]
}
# 实际获取到数据的最后时刻
real_time_end = None
for i, row in df.iterrows():
p = float(row["item_value"])
time = pd.to_datetime(row["date"])
year = time.year
month = time.month
day = time.day
minutes = time.hour * 60 + time.minute
i_288 = minutes // 5
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
for n in range(INDEX_288):
day_data[equip_type][n]["year"] = year
day_data[equip_type][n]["month"] = month
day_data[equip_type][n]["day"] = day
day_data[equip_type][n]["hour"] = n // 12
day_data[equip_type][n]["minutes"] = (n % 12) * 5
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
if row["predict_type"] == str(equip_type) and np.isnan(
day_data[equip_type][i_288]["p"]
):
day_data[equip_type][i_288]["p"] = p
# 判断下一行是否为下一天
if i < df.shape[0] - 1:
time_next_row = pd.to_datetime(df["date"][i + 1])
time_next_day = datetime.datetime(
year, month, day
) + datetime.timedelta(days=1)
if time_next_row > time_next_day:
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
equip_datas[equip_type].append(day_data[equip_type])
day_data[equip_type] = [
{
"year": 0,
"month": 0,
"day": 0,
"hour": 0,
"minutes": 0,
"p": NaN,
}
for _ in range(INDEX_288)
]
else:
real_time_end = datetime.datetime(
year, month, day, i_288 // 12, (i_288 % 12) * 5
)
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
equip_datas[equip_type].append(day_data[equip_type])
# 删除equip_datas中无效的equip_type
# 如何判断list所有元素是否为空
for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]:
if equip_datas[equip_type] == [] or len(equip_datas[equip_type]) == 0:
equip_datas.pop(equip_type)
# 统计数据缺失情况
last_i288 = (real_time_end.hour * 60 + real_time_end.minute) // 5
for equip_type, value in equip_datas.items():
nan_num = 0
for i in range(len(value)):
if i < len(equip_datas[equip_type]) - 1:
for k in range(INDEX_288):
if np.isnan(equip_datas[equip_type][i][k]["p"]):
nan_num += 1
else:
for k in range(last_i288):
if np.isnan(equip_datas[equip_type][i][k]["p"]):
nan_num += 1
# print(f"设备{equip_type}数据缺失数目: {nan_num}!")
# print(f"设备{equip_type}最后一天i_288的时刻: {last_i288}!")
args_num = len(["year", "month", "day", "hour", "minutes", "p"])
equip_data = {equip_type: [[] for _ in range(args_num)] for equip_type in equip_datas}
for equip_type, value__ in equip_datas.items():
# 天数
for day_data in value__:
# i_288
for value in day_data:
equip_data[equip_type][0].append(value["year"])
equip_data[equip_type][1].append(value["month"])
equip_data[equip_type][2].append(value["day"])
equip_data[equip_type][3].append(value["hour"])
equip_data[equip_type][4].append(value["minutes"])
equip_data[equip_type][5].append(value["p"])
df_data = {equip_type: None for equip_type in [LOAD, PV1, PV2, WIND1, WIND2, LOAD1, LOAD2]}
for equip_type in equip_data:
df_data[equip_type] = pd.DataFrame(
{
"year": equip_data[equip_type][0],
"month": equip_data[equip_type][1],
"day": equip_data[equip_type][2],
"hour": equip_data[equip_type][3],
"minutes": equip_data[equip_type][4],
"p": equip_data[equip_type][5],
}
)
df_data = self.del_df_data_empty_day_and_df(df_data)
return df_data, real_time_end
@staticmethod
def del_df_data_empty_day_and_df(df_data):
"""
- 删除空数据日
- 删除df_data中的空表
"""
for equip_type in list(df_data.keys()):
# 生成日期列
df_data[equip_type]["time"] = (
df_data[equip_type]["year"].astype(str)
+ df_data[equip_type]["month"].astype(str)
+ df_data[equip_type]["day"].astype(str)
)
times = df_data[equip_type]["time"].unique()
for t in times:
# 当某日功率之和为0则删除该日数据
if df_data[equip_type][df_data[equip_type]["time"] == t]["p"].sum() == 0:
df_data[equip_type] = df_data[equip_type][df_data[equip_type]["time"] != t]
# 删除日期列
df_data[equip_type] = df_data[equip_type].drop("time", axis=1)
# 删除空表
if df_data[equip_type].empty:
del df_data[equip_type]
return df_data
@staticmethod
def add_df_wea(df_data, df_wea):
"""将待预测日(预测时间后一天)空数据添加至表中"""
year = df_data[1]["year"].iloc[-1]
month = df_data[1]["month"].iloc[-1]
day = df_data[1]["day"].iloc[-1]
last_day = datetime.datetime(year, month, day)
next_day = last_day + datetime.timedelta(days=1)
for i in range(INDEX_288):
for equip_type in df_data:
df_data[equip_type].loc[len(df_data[equip_type].index)] = [
next_day.year,
next_day.month,
next_day.day,
i // 12,
(i % 12) * 5,
NaN,
]
# 添加气象列
for equip_type in df_data:
for col in df_wea.columns:
if col != "date":
df_data[equip_type][col] = None
for i, row in df_data[equip_type].iterrows():
year = int(row["year"])
month = int(row["month"])
day = int(row["day"])
hour = int(row["hour"])
time = datetime.datetime(year, month, day, hour)
# 在气象表中找到某列的时间值与之最近的行
time_col = df_wea["date"]
closest_row = df_wea.iloc[(time_col - time).abs().argsort()[:1]]
for col in df_wea.columns:
if col != "date":
df_data[equip_type].loc[i, col] = float(closest_row[col])
return df_data
@staticmethod
def data_clean(df_data, real_time_end):
"""数据清洗
删除数据空列, 清洗最后一行非空数据之前的数据
"""
last_i288 = (real_time_end.hour * 60 + real_time_end.minute) // 5
# 删除数据空列, 例如缺失气象数据
for equip_type in df_data:
# df_data[equip_type] = df_clean(df_data[equip_type])
s1 = set(df_data[equip_type].columns)
df_data[equip_type] = df_data[equip_type].dropna(axis=1, how="all")
s2 = set(df_data[equip_type].columns)
s = s1 - (s1 & s2)
if len(s) > 0:
print(f"设备{equip_type}缺失数据列{s}")
# 数据清洗
for equip_type in df_data:
# 重置index
df_data[equip_type] = df_data[equip_type].reset_index()
nrows = len(df_data[equip_type])
# 忽略最后一天存在的空数据及插入的待预测日空数据
# 向后填充空值, 最大数量为8, 若仍存在空值则置为0
rows = nrows - INDEX_288 * 2 + last_i288
df_data[equip_type].loc[:rows, "p"] = df_data[equip_type].loc[:rows, "p"].fillna(method="ffill", limit=8)
df_data[equip_type].loc[:rows, "p"] = df_data[equip_type].loc[:rows, "p"].fillna(value=0)
print("数据清洗已完成, 缺失数据已补齐!")
# df中所有为空数据用0代替, 并输出值, 作为数据清洗算法调整依据
for equip_type in df_data:
error_num = 0
for i, row in df_data[equip_type][: -INDEX_288 * 2 + last_i288].iterrows():
for col in df_data[equip_type].columns:
if np.isnan(row[col]):
df_data[equip_type].loc[i, col] = 0
error_num += 1
if error_num > 0:
print(f"设备{equip_type}返回数据表存在空数据{error_num}!")
return df_data
@staticmethod
def get_df_wea_from_db(time_start, time_end):
"""从数据库中读取某一段时间的气象数据"""
# 小时天气数据
# dataframe初始化多个空列
df_wea = pd.DataFrame()
with ENGINE.connect() as conn:
sql = text(
f"select * from zchc_weather_forecast_hours where date > str_to_date('{time_start}', '%Y-%m-%d %H:%i:%s') and date < str_to_date('{time_end}', '%Y-%m-%d %H:%i:%s')"
)
df_wea = pd.read_sql(sql, conn)
conn.close()
# 删除列load_columns, 中含有缺失值的行
# df_wea = df_wea.dropna(subset=WEA_COLUMNS)
times, weas, wins, win_speeds, tems = [], [], [], [], []
for i, row in df_wea.iterrows():
wins.append(NaN)
weas.append(NaN)
win_speeds.append(NaN)
tems.append(float(row["tem"]))
time = str(row["date"]) + " " + str(row["hours"]).strip("时") + ":00:00"
times.append(datetime.datetime.strptime(time, "%Y-%m-%d %H:00:00"))
for key, value in WIND_DIRECTION.items():
if row["win"] == key:
wins[i] = value
for key, value in WEATHER.items():
if row["wea"] == key:
weas[i] = value
if row["win_speed"].isdigit():
win_speeds[i] = float(row["win_speed"])
for key, value in WIND_SPEED.items():
if row["win_speed"] == key:
win_speeds[i] = value
df_wea["date"] = times
df_wea["wea"] = weas
df_wea["win"] = wins
df_wea["win_speed"] = win_speeds
df_wea["tem"] = tems
# 删除不需要的列
df_wea = df_wea.drop(columns=WEA_DELETE_COLUMNS)
return df_wea
@staticmethod
def get_similar_day(df, equip_type):
"""获取相似日数据"""
df = df[df["minutes"].isin([0, 15, 30, 45])].copy()
# 特征因素选择
x_columns = []
if equip_type in [LOAD, LOAD1, LOAD2]:
# 数据表中是否存在该因素
x_columns.extend(lf for lf in LOAD_FACTORS if lf in df.columns)
# 负荷特征因素主要考虑时序相关性, 1个小时以内的负荷作为特征因素
for i, j in itertools.product(range(2, 5), range(4)):
x_columns.append(f"D-{i}T-{j}")
df.insert(0, f"D-{i}T-{j}", df["p"].shift(96 * i - j))
df = df[96 * 5:]
elif equip_type in [PV1, PV2]:
x_columns = ["hour", "minutes"]
# 数据表中是否存在该因素
x_columns.extend(lf for lf in PV_FACTORS if lf in df.columns)
else:
x_columns = ["hour", "minutes"]
# 数据表中是否存在该因素
x_columns.extend(lf for lf in WIND_FACTORS if lf in df.columns)
if set(x_columns) >= set(df.columns):
print("缺少气象数据!")
# 当前日不完整需删除
columns = x_columns[:]
columns.append("p")
df_history = df[columns][: -2 * 96]
# 待预测日即为data中最后一天
df_predict = df[columns][-96:]
df_similar_day = get_df_similar_day(df_history, df_predict, 96, 1)
return df_similar_day[-1]["p"].to_numpy()
@staticmethod
def get_train_data_day_ahead(df, equip_type):
"""
df: 输入Dataframe
"""
mms = MinMaxScaler(feature_range=(0, 1))
# if equip_type != LOAD:
# mms = MinMaxScaler(feature_range=(-1, 0))
# 选取15分钟一个点
df = df[df["minutes"].isin([0, 15, 30, 45])].copy()
# 特征因素选择
x_columns = []
if equip_type in [LOAD, LOAD1, LOAD2]:
# 数据表中是否存在该因素
x_columns.extend(lf for lf in LOAD_FACTORS if lf in df.columns)
# 负荷特征因素主要考虑时序相关性, 1个小时以内的负荷作为特征因素
for i, j in itertools.product(range(2, 5), range(4)):
x_columns.append(f"D-{i}T-{j}")
df.insert(0, f"D-{i}T-{j}", df["p"].shift(96 * i - j))
df = df[96 * 5:]
df[x_columns] = mms.fit_transform(df[x_columns])
elif equip_type in [PV1, PV2]:
x_columns = ["hour", "minutes"]
# 数据表中是否存在该因素
x_columns.extend(lf for lf in PV_FACTORS if lf in df.columns)
else:
x_columns = ["hour", "minutes"]
# 数据表中是否存在该因素
x_columns.extend(lf for lf in WIND_FACTORS if lf in df.columns)
if set(x_columns) >= set(df.columns):
print("缺少气象数据!")
df.loc[:, "p"] = mms.fit_transform(df[["p"]])
# 有效的数据列
# 赋值原数组, 防止对原数组的修改
columns = x_columns[:]
columns.append("p")
df = df.loc[:, columns]
# 待预测日即为data中最后一天
x_valid = np.array(df[x_columns][-96:]).flatten()
y_valid = np.array(df[["p"]][-96:]).flatten()
day_size = 96
similar_num = 5
df_similar_day = get_df_similar_day(
df[: -2 * day_size], df[-day_size:], day_size, similar_num
)
df_train = pd.DataFrame()
for i in range(similar_num):
df_train = pd.concat([df_train, df_similar_day[i]], ignore_index=True)
x_train = np.array(df_train.loc[:, df_train.columns[:-1]]).flatten()
y_train = np.array(df_train.loc[:, df_train.columns[-1]]).flatten()
args_num = len(x_columns)
x_train = x_train.reshape(len(x_train) // args_num, args_num)
x_valid = x_valid.reshape(len(x_valid) // args_num, args_num)
# print(f'x_train shape{np.shape(x_train)}')
# print(f'x_valid shape{np.shape(x_valid)}')
return x_train, y_train, x_valid, y_valid, mms
@staticmethod
def get_train_data_intraday(df, equip_type):
"""
df: 输入Dataframe, 数据不少于7*24*12
"""
mms = MinMaxScaler(feature_range=(0, 1))
if equip_type in [PV1, PV2, WIND1, WIND2]:
mms = MinMaxScaler(feature_range=(-1, 0))
x_columns = ["hour", "minutes"]
if equip_type in [LOAD, LOAD1, LOAD2]:
# 数据表中是否存在该因素
x_columns.extend(lf for lf in LOAD_FACTORS if lf in df.columns)
elif equip_type in [PV1, PV2]:
# 数据表中是否存在该因素
x_columns.extend(lf for lf in PV_FACTORS if lf in df.columns)
else:
# 数据表中是否存在该因素
x_columns.extend(lf for lf in WIND_FACTORS if lf in df.columns)
df.loc[:, "p"] = mms.fit_transform(df[["p"]])
# mms = MinMaxScaler(feature_range=(0, 1))
# columns = df.columns
# df = mms.fit_transform(df)
# df = pd.DataFrame(df, columns=columns)
# 当前时刻未来4小时48点为待预测时刻
# 时间点长度
time_size = 5 * INDEX_288
x_train, y_train, x_valid, y_valid = [], [], [], []
# 训练集
# 7*24*12 = 2016
train_data = df
scaled_train_data_x = np.array(train_data[x_columns])
scaled_train_data_y = np.array(train_data["p"])
for i in range(time_size, len(train_data)):
x_train.append(scaled_train_data_x[(i - time_size): i])
y_train.append(scaled_train_data_y[i])
x_train, y_train = np.array(x_train[:-48]), np.array(y_train[:-48])
x_valid, y_valid = np.array(x_train[-48:]), np.array(y_train[-48:])
# print('x_train shape' + str(np.shape(x_train)))
# print('x_valid shape' + str(np.shape(x_valid)))
return x_train, y_train, x_valid, y_valid, mms
@staticmethod
def gra_one(df, m=0):
# 读取为df格式
df = (df - df.min()) / (df.max() - df.min())
# 标准化
std = df.iloc[:, m] # 为标准要素
ce = df.iloc[:, 0:] # 为比较要素
n = ce.shape[0]
m = ce.shape[1] # 计算行列
# print(std,ce,n,m)
# 与标准要素比较, 相减
a = np.zeros([m, n])
for i in range(m):
for j in range(n):
a[i, j] = abs(ce.iloc[j, i] - std[j])
# 取出矩阵中最大值与最小值
c = np.amax(a)
d = np.amin(a)
# 计算值
result = np.zeros([m, n])
for i in range(m):
for j in range(n):
if a[i, j] + 0.5 * c == 0:
result[i, j] = NaN
else:
result[i, j] = (d + 0.5 * c) / (a[i, j] + 0.5 * c)
# 求均值, 得到灰色关联值
result2 = np.zeros(m)
for i in range(m):
result2[i] = np.mean(result[i, :])
return pd.DataFrame(result2)
def gra(self, df):
list_columns = [df.columns[i] for i in range(len(df.columns))]
df_local = pd.DataFrame(columns=list_columns)
for i in range(len(df.columns)):
df_local[df_local.columns[i]] = self.gra_one(df, m=i)[0]
return df_local
@staticmethod
def gwo(x_train, y_train, agents_num, max_iter, dim, lb, ub):
"""
agents_num: 狼群数量
max_iter: 最大迭代次数
dim: 需要优化参数个数
lb: 参数取值下界
ub: 参数取值上界
"""
# 初始化Alpha狼的位置
alpha_pos = [0, 0]
beta_pos = [0, 0]
delta_pos = [0, 0]
# 初始化Alpha狼的目标函数值
alpha_score = float("inf")
beta_score = float("inf")
delta_score = float("inf")
# 初始化首次搜索位置
pos = np.dot(rd.rand(agents_num, dim), (ub - lb)) + lb
# 迭代次数
iterations = []
# 精度
accuracy = []
# 循环迭代优化狼群位置, iter循环次数
iter = 0
while iter < max_iter:
# t0 = time()
# 遍历每个狼
for i in range(pos.shape[0]):
# 若搜索位置超过了搜索空间, 需要重新回到搜索空间
for j in range(pos.shape[1]):
Flag4ub = pos[i, j] > ub
Flag4lb = pos[i, j] < lb
# 若狼的位置在最大值和最小值之间, 则位置不需要调整, 若超出最大值, 最回到最大值边界
if Flag4ub:
pos[i, j] = ub
if Flag4lb:
pos[i, j] = lb
# LSSVM
lssvm = LSSVMRegression(c=pos[i][0], sigma=pos[i][1])
lssvm.fit(x_train, y_train)
fitness = lssvm.score(x_train, y_train) * 100
# fitness = (1 - r2_score(y_train, y_pre)) * 100
# 如果目标函数值小于Alpha狼的目标函数值
if fitness < alpha_score:
alpha_score = fitness # 则将Alpha狼的目标函数值更新为最优目标函数值
alpha_pos = pos[i] # 同时将Alpha狼的位置更新为最优位置
# 如果目标函数值介于于Alpha狼和Beta狼的目标函数值之间
if alpha_score < fitness < beta_score:
beta_score = fitness # 则将Beta狼的目标函数值更新为最优目标函数值
beta_pos = pos[i]
# 如果目标函数值介于于Beta狼和Delta狼的目标函数值之间
if (
alpha_score < fitness < delta_score
and fitness > beta_score
):
delta_score = fitness # 则将Delta狼的目标函数值更新为最优目标函数值
delta_pos = pos[i]
a = 2 - iter * (2 / max_iter)
# 遍历每个狼
for i in range(pos.shape[0]):
# 遍历每个维度
for j in range(pos.shape[1]):
# 包围猎物, 位置更新
r1 = rd.random(1) # 生成0~1之间的随机数
r2 = rd.random(1)
A1 = 2 * a * r1 - a # 计算系数A
C1 = 2 * r2 # 计算系数C
# Alpha狼位置更新
d_alpha = abs(C1 * alpha_pos[j] - pos[i, j])
X1 = alpha_pos[j] - A1 * d_alpha
r1 = rd.random(1)
r2 = rd.random(1)
A2 = 2 * a * r1 - a
C2 = 2 * r2
# Beta狼位置更新
D_beta = abs(C2 * beta_pos[j] - pos[i, j])
X2 = beta_pos[j] - A2 * D_beta
r1 = rd.random(1)
r2 = rd.random(1)
A3 = 2 * a * r1 - a
C3 = 2 * r2
# Delta狼位置更新
D_delta = abs(C3 * delta_pos[j] - pos[i, j])
X3 = delta_pos[j] - A3 * D_delta
# 位置更新
pos[i, j] = (X1 + X2 + X3) / 3
iter += 1
iterations.append(iter)
accuracy.append((100 - alpha_score) / 100)
# print(f'迭代次数: {iter}, {(time() - t0):.0f}s')
best_c = alpha_pos[0]
best_sigma = alpha_pos[1]
return best_c, best_sigma
@staticmethod
def __lstm_fit(x_train, y_train):
# return_sequences, True: 输出为一个序列, 默认为False, 输出一个值。
# 输入单个样本特征值的维度
dim = x_train.shape[-1]
# 输入的时间点长度
length = x_train.shape[1]
model = Sequential()
model.add(
LSTM(units=384, return_sequences=True, input_dim=dim, input_length=length)
)
model.add(LSTM(units=31))
model.add(Dense(1))
model.compile(loss="mean_squared_error", optimizer="adam")
model.fit(x_train, y_train, epochs=2, batch_size=96, verbose=1)
return model
if __name__ == "__main__":
hc = HcForecast()
re = hc.forecast_day_ahead(LOAD)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。