# NOTE(review): removed a repository web-page banner ("code pull finished, page
# will refresh") that was accidentally saved into this source file.
#coding:utf-8
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.mllib.tree import GradientBoostedTreesModel
import numpy as np
import pandas as pd
import sys
def extract_label(record):
return float(record[1])
# Feature-set variants (exactly one extract_features_dt is active at a time):
# 1) All weather columns plus building head count:
# def extract_features_dt(record):
#     return np.array(record[6:15] + record[2:4])
# 2) All weather columns, without building head count:
# def extract_features_dt(record):
#     return np.array(record[6:14] + record[2:4])
# 3) No weather columns, building head count only (active below):
def extract_features_dt(record):
return np.array(record[11:] + record[2:4])
#评估回归模型的性能
#平均绝对误差
def abs_error(actual, pred):
return np.abs(pred-actual)
#MAPE计算
def cal_error(true_vs_predicted):
mape = true_vs_predicted.map(lambda t: abs_error(t[0], t[1])/t[0]).mean()
print ('MAPE: %2.4f' % mape)
#返回真实值与预测值结合成tuple的列表list
def plot_days(dt_model_gbt, records, day1, day2):
data_test_data = records.filter(lambda point: '' not in point).filter(lambda point: pd.to_datetime(point[0]) >= pd.to_datetime(day1))
data_test_data = data_test_data.filter(lambda point: pd.to_datetime(point[0]) < pd.to_datetime(day2))
data_test = data_test_data.map(lambda point: LabeledPoint(extract_label(point), extract_features_dt(point)))
preds_gbt = dt_model_gbt.predict(data_test.map(lambda p: p.features))
actual = data_test.map(lambda p:p.label)
true_vs_predicted_gbt = actual.zip(preds_gbt)
return true_vs_predicted_gbt
#返回只包含预测值的列表list
def plot_load(dt_model_gbt, records, day1, day2):
data_test_data = records.filter(lambda point: '' not in point).filter(lambda point: pd.to_datetime(point[0]) >= pd.to_datetime(day1))
data_test_data = data_test_data.filter(lambda point: pd.to_datetime(point[0]) < pd.to_datetime(day2))
preds_time = data_test_data.map(lambda p: p[0])
preds_gbt = dt_model_gbt.predict(data_test_data.map(lambda p: extract_features_dt(p)))
return preds_gbt,preds_time
time1 = sys.argv[1]
time2 = sys.argv[2]
modelname = sys.argv[3]
file_path = 'hdfs://192.168.1.5:9000/spark/building/' + modelname + '_before13_count.csv'
model_path = 'hdfs://192.168.1.5:9000/spark/building/' + modelname + '_load_model'
sc = SparkContext("spark://192.168.1.5:7077","a predict spark app")
#file_path = 'hdfs://202.114.96.180:9000/user/xzxu/spark/building/huaning_before13_count.csv'
#sc = SparkContext("spark://202.114.96.180:7077","a predict spark app")
raw_data = sc.textFile(file_path)
records = raw_data.map(lambda x: x.split(','))
records.persist()
model = GradientBoostedTreesModel.load(sc, model_path)
#model = GradientBoostedTreesModel.load(sc, 'hdfs://202.114.96.180:9000/user/xzxu/spark/building/huaning_load_model')
#true_vs_predicted = plot_days(model, records, '2017-12-25', '2017-12-29')
true_vs_predicted, preds_time = plot_load(model, records, time1, time2)
true_vs_predicted.persist()
#print (cal_error(true_vs_predicted))
predictions = true_vs_predicted.collect()
preds_time = preds_time.collect()
for i in range(len(predictions)):
predictions[i] = round(predictions[i], 2)
dic = {"predictions":predictions,"times":preds_time}
print (dic)
# NOTE(review): removed a Gitee content-moderation notice that was
# accidentally saved into this source file; it was not part of the program.