代码拉取完成,页面将自动刷新
from flask import Flask, render_template, request, redirect, session, flash, jsonify
from flask import send_file, send_from_directory
from functools import wraps
from cachelib import SimpleCache
import service.download_fault_info
from dao.fault_dao import *
from dao.user_dao import *
from entity.user import User
from flask_paginate import Pagination
from utils.send_mail import send_mail
from utils.dbpools_utils import db_pool
from utils.import_utils import *
from utils.common_utils import *
from service.import_users import *
# Standard-library modules stay after the wildcard imports above so that a
# name re-exported by one of those modules cannot shadow them.
import ast
import os
import subprocess
import threading
import time
# Locks held while the external calc / detect programs run, so that only one
# run of each kind is in flight at a time.
calc_mutex = threading.Lock()
detect_mutex = threading.Lock()

app = Flask(__name__)
# The session-signing key can be supplied through the SECRET_KEY environment
# variable. The literal is kept only as a backward-compatible fallback; a key
# committed to source control should not be relied on in a real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'jk12la1s2df21h'

# In-process cache shared by the request handlers.
cache = SimpleCache()
cache.set("detected", None)   # output lines of the last detect run
cache.set("rates", None)      # first output line of the last calc run
cache.set("lastFault", -1)    # `kinds` value of the last calc run (-1: none yet)
# Check whether the user is logged in
def is_login(func):
    """Decorate a view so that it only runs for a logged-in session.

    When the session holds no ``username`` a message is flashed and the
    request is redirected to ``/``; otherwise ``func`` is called with the
    original arguments and its result is returned.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if session.get('username') is not None:
            return func(*args, **kwargs)
        flash("请先登录!")
        return redirect("/")

    return guarded
# Check whether the logged-in user is the administrator
def is_admin_login(func):
    """Decorate a view so that it only runs for the ``admin`` session.

    A request without a ``username`` in the session, or with a username
    other than ``'admin'``, gets the matching flash message and is
    redirected to ``/``. Otherwise ``func`` is called unchanged.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        current_user = session.get('username')
        if current_user is None:
            message = "请先登录!"
        elif current_user != 'admin':
            message = "请登录管理员账号访问!"
        else:
            return func(*args, **kwargs)
        flash(message)
        return redirect("/")

    return guarded
@app.route('/')
def index():
    """Landing page.

    A logged-in session is redirected (``admin`` to ``/control``, anyone
    else to ``/context``); without a session ``index.html`` is rendered.
    """
    current_user = session.get('username')
    if current_user is None:
        return render_template("index.html")
    target = "/control" if current_user == 'admin' else "/context"
    return redirect(target)
# Log in
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Check the submitted credentials and start a session.

    Form fields: ``username``, ``password`` and the optional ``remember``.
    Any existing session is cleared first. On success the username is stored
    in the session and the user is redirected to ``/control`` (``admin``) or
    ``/context``; on failure a message is flashed and the user is sent back
    to ``/``.
    """
    session.clear()
    username = request.form.get('username')
    password = request.form.get('password')
    # A field that is present but blank is as incomplete as a missing one;
    # reject both before querying the user table.
    if not username or not password:
        flash("请输入完整的登录信息!")
        return redirect("/")

    matches = search_user_info(User(username, password))
    if len(matches) == 0:
        flash("用户 " + username + " 不存在,或密码错误!")
        return redirect("/")

    session['username'] = username
    # "Remember me" was ticked: make the session permanent
    if 'remember' in request.form:
        session.permanent = True
    return redirect("/control" if username == 'admin' else "/context")
# Register
@app.route('/logon', methods=['GET', 'POST'])
def logon():
    """Create a new user from the ``username`` / ``password`` form fields.

    Every outcome (incomplete input, name already taken, success) flashes a
    message and redirects to ``/``.
    """
    # .get() instead of indexing: the route also accepts GET, where the form
    # is empty, and indexing would abort the request with a 400 error rather
    # than show the "incomplete input" message.
    username = request.form.get('username', '')
    password = request.form.get('password', '')
    if username == "" or password == "":
        flash("请输入完整的登录信息!")
        return redirect("/")

    if len(search_user_info_by_name(username)) != 0:
        flash("用户已经注册!请勿重复注册")
        return redirect("/")

    db_pool.insert_user_info(User(username, password))
    flash("注册成功!")
    return redirect("/")
# Admin page
@app.route('/control', methods=["GET", "POST"])
@is_admin_login
def control():
    """Render ``control.html`` with one page of ``select_all_common()`` rows.

    The ``page`` query argument selects the page (8 rows per page).
    """
    per_page = 8
    # Clamp to the first page: page <= 0 would give a negative slice start and
    # silently show rows counted from the end of the list.
    page = max(request.args.get('page', 1, type=int), 1)
    results = select_all_common()
    total_size = len(results)
    left = (page - 1) * per_page
    right = left + per_page
    # page: current page, total: total number of records, per_page: rows per page
    paginate = Pagination(page=page, total=total_size, per_page=per_page, bs_version=3, prev_label='上一页',
                          next_label='下一页')
    return render_template("control.html", length=total_size, login_user=session.get('username'),
                           paginate=paginate, results=results[left:right])
# Fault record page
@app.route('/faultInfo')
@is_admin_login
def fault_info():
    """Render ``faultInfo.html`` with one page of ``show_fault_info()`` rows.

    The ``page`` query argument selects the page (8 rows per page).
    """
    per_page = 8
    # Clamp to the first page: page <= 0 would give a negative slice start and
    # silently show rows counted from the end of the list.
    page = max(request.args.get('page', 1, type=int), 1)
    results = show_fault_info()
    left = (page - 1) * per_page
    right = left + per_page
    # page: current page, total: total number of records
    paginate = Pagination(page=page, total=len(results), per_page=per_page, bs_version=3, prev_label='上一页',
                          next_label='下一页')
    return render_template("faultInfo.html", login_user=session.get('username'), paginate=paginate,
                           results=results[left:right])
# Content page
@app.route('/context', methods=['GET', 'POST'])
@is_login
def context(param=None):
    """Render ``context.html`` for the logged-in user.

    ``param`` is an optional dict of extra template variables. The session's
    username is stored in it under ``login_user`` before rendering.
    """
    template_args = param if param is not None else {}
    template_args['login_user'] = session.get('username')
    return render_template("context.html", **template_args)
# Log out
@app.route('/logout')
@is_login
def logout():
    """Drop everything stored in the session and go back to ``/``."""
    session.clear()
    home = "/"
    return redirect(home)
# Delete a user
@app.route('/deleteUser', methods=['GET', 'POST'])
@is_admin_login
def delete_user():
    """Pass the ``username`` form field to ``delete_user_info``.

    Always answers ``{'result': 'success'}``.
    """
    target_name = request.form.get('username')
    delete_user_info(target_name)
    response = {'result': 'success'}
    return response
# Reset a user's password to 000 (the new value is chosen inside
# update_user_info, which is not shown here)
@app.route('/updateUser', methods=['GET', 'POST'])
@is_admin_login
def reset_password():
    """Pass the ``username`` form field to ``update_user_info``.

    Always answers ``{'result': 'success'}``.
    """
    target_name = request.form.get('username')
    update_user_info(target_name)
    response = {'result': 'success'}
    return response
# Compute the classification rate
@app.route('/calcRate', methods=['GET', 'POST'])
@is_login
def calc_rate():
    """Run the external calc program, or reuse its cached result.

    Form fields:
        kinds: value passed to the program as its own argument.
        cache: Python literal. ``False`` forces a fresh run; any other value
            reuses the cached rate when one exists.

    Returns:
        ``{'classification_rate': <first output line of the program>}``.
    """
    param = request.form.get('kinds')

    # SECURITY: this field was previously passed to eval(), which let any
    # logged-in client execute arbitrary code. Only literals are accepted
    # now; a missing or malformed value means "do not use the cache".
    try:
        use_cache = ast.literal_eval(request.form.get('cache') or '')
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        use_cache = False

    # Build an argument list, not a shell string, so the client-supplied
    # `kinds` value cannot inject shell syntax.
    if os.name == 'nt':
        command = ["calc_rates.exe", str(param), "200"]
    else:
        command = ["./run_calc_rates.sh", "/usr/local/MATLAB/MATLAB_Runtime/v95", str(param), "200"]

    with calc_mutex:
        if use_cache is False or cache.get('rates') is None:
            try:
                completed = subprocess.run(command, stdout=subprocess.PIPE, universal_newlines=True)
                output = completed.stdout
            except OSError:
                # Program missing or not executable: behave as if it printed
                # nothing, as the shell-based call did.
                output = ''
            # Keep only the first line, including its newline (readline semantics).
            first_line, newline, _ = output.partition('\n')
            classification_rate = first_line + newline
            cache.set('rates', classification_rate)
            cache.set('lastFault', param)
        else:
            # NOTE(review): fixed delay on the cached path; presumably there so
            # the client sees a computation-like pause. Confirm before removing.
            time.sleep(2)
            classification_rate = cache.get('rates')
    return {'classification_rate': classification_rate}
@app.route('/detect', methods=['GET', 'POST'])
@is_login
def detect():
    """Run the external detect program, or reuse its cached output.

    Form fields:
        kinds: value passed to the program as its own argument.
        cache: Python literal. ``False`` forces a fresh run; any other value
            reuses the cached output when it is non-empty.

    Returns:
        ``{'detect_rate': <first output line>, 'detect_data': <remaining lines>}``.
        When the program printed nothing the rate is ``''`` and the data is
        an empty list.
    """
    param = request.form.get('kinds')

    # SECURITY: this field was previously passed to eval(), which let any
    # logged-in client execute arbitrary code. Only literals are accepted
    # now; a missing or malformed value means "do not use the cache".
    try:
        use_cache = ast.literal_eval(request.form.get('cache') or '')
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        use_cache = False

    # Build an argument list, not a shell string, so the client-supplied
    # `kinds` value cannot inject shell syntax.
    if os.name == 'nt':
        command = ["detect_fault.exe", str(param), "260"]
    else:
        command = ["./run_detect_fault.sh", "/usr/local/MATLAB/MATLAB_Runtime/v95", str(param), "260"]

    with detect_mutex:
        cached = cache.get('detected')
        if use_cache is False or not cached:
            try:
                # The context manager closes the pipe and reaps the child.
                with subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) as proc:
                    results = proc.stdout.readlines()
            except OSError:
                # Program missing or not executable: behave as if it printed
                # nothing, as the shell-based call did.
                results = []
            cache.set('detected', list(results))
        else:
            results = cached

    # Indexing results[0] on empty output used to raise IndexError.
    if not results:
        return {'detect_rate': '', 'detect_data': []}
    return {'detect_rate': results[0], 'detect_data': results[1:]}
# Send an e-mail
@app.route('/sendEmail', methods=['GET', 'POST'])
@is_login
def send_email():
    """Call ``send_mail`` with the ``kind`` form field.

    Answers ``{'res': 'failed'}`` when ``send_mail`` returns ``-1`` and
    ``{'res': 'success'}`` otherwise.
    """
    outcome = send_mail(request.form.get('kind'))
    status = 'failed' if outcome == -1 else 'success'
    return {'res': status}
# Read the fault conditions from the sensors when a fault occurs
# To be extended
@app.route('/shot', methods=['GET', 'POST'])
@is_login
def shot():
    """Return the sensor snapshot; currently two fixed readings."""
    readings = {}
    readings['温度'] = 40
    readings['压强'] = 770
    return readings
# Handle the fault information form
@app.route('/formProcess', methods=['GET', 'POST'])
@is_login
def process_form():
    """Store a fault record built from the submitted form.

    ``fixer``, ``dutyer`` and ``broken`` must be present; if any is missing
    the answer is ``{'result': 'nothing'}`` and nothing is stored. ``lost``
    is passed through as-is. The record is stamped with the current local
    time.
    """
    form = request.form
    fixer, dutyer, broken = form.get('fixer'), form.get('dutyer'), form.get('broken')
    lost = form.get('lost')
    if any(field is None for field in (fixer, dutyer, broken)):
        return {'result': 'nothing'}
    recorded_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    insert_fault_info(Fault(fixer, dutyer, broken, lost, recorded_at))
    return {'result': 'success'}
@app.route('/lastFault', methods=['GET', 'POST'])
@is_login
def last_fault():
    """Return the cached ``lastFault`` value, converted to a string."""
    cached_value = cache.get('lastFault')
    return {'lastFault': str(cached_value)}
@app.route('/downloadFaultInfos', methods=['GET', 'POST'])
@is_login
def download_fault_infos():
    """Send the fault record spreadsheet as an attachment."""
    # Download the fault record file (presumably this call writes the
    # spreadsheet served below; verify in service.download_fault_info)
    service.download_fault_info.download_fault_info()
    # send_from_directory needs two things: the local base directory and the
    # file name (with extension). The base is the current working directory.
    base_dir = os.getcwd()
    relative_name = 'files/outs/故障记录表.xlsx'
    return send_from_directory(base_dir, relative_name, as_attachment=True)
@app.route('/searchUser', methods=['GET', 'POST'])
# This view renders the admin page with user records, so it requires the
# admin session like the other admin views. It was previously guarded only by
# is_login, which let any logged-in user query user records.
@is_admin_login
def search_user():
    """Render ``control.html`` with the users matching the ``username`` field.

    A missing or empty ``username`` yields an empty result list. Pagination
    is disabled (``paginate=None``) and ``show_btn`` is set for the template.
    """
    name = request.form.get('username')
    results = search_user_info_by_name(name) if name else []
    return render_template("control.html", login_user=session.get('username'), results=results,
                           length=len(results), paginate=None, show_btn=True)
@app.route('/upload', methods=['GET', 'POST'])
@is_admin_login
def upload():
    """Save an uploaded spreadsheet and import the users it contains.

    The file arrives in the ``file`` upload field, is stored under
    ``files/inputs`` next to this module, and its name is handed to
    ``import_users_from_excels``.

    Returns:
        ``{"info": <summary>, "code": 0}`` when every user was created,
        ``{"info": <summary>, "code": 1}`` when some already existed, and a
        ``code`` 1 answer with HTTP status 405 / 400 for a non-POST request
        or a missing file.
    """
    # The route also accepts GET; falling off the end used to return None,
    # which Flask rejects as an invalid response.
    if request.method != 'POST':
        return {"info": "请使用 POST 方式上传文件", "code": 1}, 405

    # Get the file from the "file" field
    f = request.files.get('file')
    raw_name = f.filename if f is not None and f.filename else ''
    # Keep only the last path component so a crafted name such as
    # "../../app.py" cannot escape the upload directory.
    filename = os.path.basename(raw_name.replace('\\', '/'))
    if filename in ('', '.', '..'):
        return {"info": "未选择要上传的文件", "code": 1}, 400

    # Directory containing this file
    base_path = os.path.dirname(__file__)
    upload_path = os.path.join(base_path, 'files/inputs', filename)
    # Save the file
    f.save(upload_path)

    result = import_users_from_excels(filename)
    if len(result["failed"]) == 0:
        return {"info": "读取到{0}组有效信息,成功创建{1}名用户".format(result["total"], result["success"]), "code": 0}
    return {
        "info": "读取到{0}组有效信息,成功创建{1}名用户。{2}用户已存在".format(result["total"], result["success"], str(result["failed"])),
        "code": 1}
if __name__ == '__main__':
    # Debug mode stays on by default, as before, for local development. Set
    # FLASK_DEBUG to 0 / false / no to switch it off: the debugger lets a
    # client run code on the server, so it must not be exposed on a reachable
    # host.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no')
    app.run(debug=debug_enabled)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。