1 Star 0 Fork 1

Giftlee/bees

forked from Spook/bees 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bees.py 10.27 KB
一键复制 编辑 原始数据 按行查看 历史
zhangming 提交于 2021-08-13 17:39 . 更新配置文件对云原生的支持
#!/usr/bin/env python3
# coding: utf-8
import os
import subprocess
import threading
import time
import argparse
import sys
import signal
import psutil
from apps.config import ConfigManager
# User configuration loaded once at import time; every constant below is
# derived from it.
CONFIG = ConfigManager.load_user_config()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)

# This environment variable must be set, otherwise no ansible result is returned.
os.environ.setdefault('PYTHONOPTIMIZE', '1')
# Force color mode even when running without a TTY or when the "nocolor"
# setting is True.
os.environ.setdefault('ANSIBLE_FORCE_COLOR', '1')
# Only set when the script runs as root (uid 0).
if os.getuid() == 0:
    os.environ.setdefault('C_FORCE_ROOT', '1')
os.environ.setdefault('PYTHONUNBUFFERED', '1')
os.environ.setdefault('PYTHONIOENCODING', "UTF-8")

APPS_DIR = os.path.join(BASE_DIR, 'apps')
LOG_DIR = os.path.join(BASE_DIR, 'logs')   # one "<service>.log" per service
TMP_DIR = os.path.join(BASE_DIR, 'temp')   # one "<service>.pid" per service

# exist_ok=True avoids the check-then-create race when several invocations of
# this script start at the same time.
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(TMP_DIR, exist_ok=True)

HTTP_HOST = CONFIG.RUN_HOST or '127.0.0.1'   # gunicorn bind address
HTTP_PORT = CONFIG.RUN_PORT or 8080          # gunicorn bind port
START_TIMEOUT = 40   # seconds start_service waits for the pid files to appear
WORKERS = 1          # gunicorn worker count; overridden by -w/--worker
DAEMON = False       # overridden by -d/--daemon and by the "restart" action
LOG_LEVEL = CONFIG.LOG_LEVEL or 'INFO'
DEBUG = CONFIG.DEBUG or False

# export LC_ALL="en_US.UTF-8"
os.environ["LC_ALL"] = "en_US.UTF-8"
# export FLASK_APP=apps/manage.py
os.environ["FLASK_APP"] = "apps/manage.py"
# export FLASK_ENV=development
os.environ["FLASK_ENV"] = CONFIG.ENV or 'development'
# export FLASK_DEBUG=0
os.environ["FLASK_DEBUG"] = '1' if CONFIG.DEBUG else '0'

EXIT_EVENT = threading.Event()
# Services expanded from the "all" service name.
all_services = ['gunicorn', 'celery', 'beat']
def check_pid(pid):
    """Return True if a process with the given unix pid currently exists.

    The check sends the null signal (0), which performs existence and
    permission checking without delivering anything to the target.

    :param pid: process id to probe; values <= 0 are never a valid single
        process and always yield False.
    :return: True when the process exists (even if it belongs to another
        user), False otherwise.
    """
    # kill() treats pid 0 and negative pids as process-group / broadcast
    # targets, so the call would succeed and falsely report "running".
    # get_pid() returns 0 for a missing or corrupt pid file.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except PermissionError:
        # EPERM: the process exists but we are not allowed to signal it.
        return True
    except (OSError, OverflowError):
        # ESRCH (no such process), or a pid too large to be a real process.
        return False
    return True
def get_pid_file_path(service):
    """Return the path of the pid file for *service* inside TMP_DIR."""
    file_name = '{}.pid'.format(service)
    return os.path.join(TMP_DIR, file_name)
def get_log_file_path(service):
    """Return the path of the log file for *service* inside LOG_DIR."""
    file_name = '{}.log'.format(service)
    return os.path.join(LOG_DIR, file_name)
def get_pid(service):
    """Read the pid recorded in the pid file of *service*.

    :param service: service name used to locate the pid file.
    :return: the pid as an int, or 0 when the pid file is missing, unreadable
        or does not contain an integer.
    """
    pid_file = get_pid_file_path(service)
    # Open directly instead of checking for existence first: the service may
    # remove its pid file between a check and the open.
    try:
        with open(pid_file) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return 0
def is_running(s, unlink=True):
    """Tell whether service *s* is running, based on its pid file.

    :param s: service name.
    :param unlink: when True, a pid file whose process no longer exists is
        removed.
    :return: True if the pid file exists and its pid passes check_pid(),
        False otherwise.
    """
    pid_file = get_pid_file_path(s)
    if not os.path.isfile(pid_file):
        return False
    if check_pid(get_pid(s)):
        return True
    if unlink:
        try:
            os.unlink(pid_file)
        except FileNotFoundError:
            # The pid file was removed concurrently (e.g. by the service
            # itself while shutting down); nothing is left to clean up.
            pass
    return False
def parse_service(s):
    """Expand a service specification into a list of service names.

    'all' yields the module-level all_services list, a comma separated
    string yields its stripped parts, and anything else yields a
    single-element list.
    """
    if s == 'all':
        return all_services
    if "," not in s:
        return [s]
    return [part.strip() for part in s.split(',')]
def start_gunicorn():
    """Spawn the Gunicorn WSGI HTTP server and return its Popen handle.

    The child inherits this script's stdout/stderr and runs with BASE_DIR as
    its working directory.
    """
    print("\n- Start Gunicorn WSGI HTTP Server")
    name = 'gunicorn'
    pid_file = get_pid_file_path(name)
    log_file = get_log_file_path(name)

    # https://flask-socketio.readthedocs.io/en/latest/#gunicorn-web-server
    #
    # Author's notes on the worker class (all variants share the options
    # below and differ only in --worker-class / --keep-alive):
    #   * For websocket, -w must be 1, because websocket requests cannot be
    #     dispatched to the same process.
    #   * gthread (--worker-class gthread, --keep-alive 3): the websocket
    #     async_mode must be "threading"; connections then always use
    #     transport=polling. Other worker classes were observed to make
    #     gunicorn report "[CRITICAL] WORKER TIMEOUT" while running ansible.
    #   * eventlet (--worker-class eventlet, --keep-alive 3): mind the versions
    #     gunicorn==19.7.1 and eventlet==0.23.0; async_mode must be "eventlet".
    #   * gevent (the variant used here):
    #     --worker-class geventwebsocket.gunicorn.workers.GeventWebSocketWorker;
    #     async_mode must be "gevent".
    options = (
        ('--bind', '{}:{}'.format(HTTP_HOST, HTTP_PORT)),
        ('--workers', str(WORKERS)),
        ('--worker-class', 'geventwebsocket.gunicorn.workers.GeventWebSocketWorker'),
        ('--threads', '10'),
        ('--max-requests', '4096'),
        ('--keep-alive', '30'),
        ('--pid', pid_file),
        ('--log-level', LOG_LEVEL),
        ('--access-logformat', '%(h)s %(t)s "%(r)s" %(s)s %(b)s '),
    )
    cmd = ['gunicorn', 'apps.manage:flask_app']
    for flag, value in options:
        cmd += [flag, value]

    # Daemon mode
    if DAEMON:
        cmd += ['--enable-stdio-inheritance', '--daemon', '--error-logfile', log_file]

    # Debug mode; outside debug mode no access log file is configured.
    if DEBUG:
        cmd += ['--reload', '--access-logfile', '-']

    return subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=BASE_DIR)
def start_celery():
    """Spawn the Celery worker and return its Popen handle.

    The child inherits this script's stdout/stderr and runs with BASE_DIR as
    its working directory.
    """
    print("\n- Start Celery as Distributed Task Queue")
    name = 'celery'
    pid_file = get_pid_file_path(name)
    log_file = get_log_file_path(name)

    cmd = ['celery', 'worker', '--app', 'apps.manage.celery_app']
    cmd += ['--pidfile', pid_file]
    # Fixed at "info" (not LOG_LEVEL) so that ansible output is emitted properly.
    cmd += ['--loglevel', 'info']
    cmd += ['--autoscale', '20,4']

    if DAEMON:
        cmd += ['--detach', '--logfile', log_file]

    return subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=BASE_DIR)
def start_beat():
    """Spawn the Celery beat scheduler and return its Popen handle.

    The child inherits this script's stdout/stderr and runs with BASE_DIR as
    its working directory.
    """
    print("\n- Start Beat as Periodic Task Scheduler")
    name = 'beat'
    pid_file = get_pid_file_path(name)
    log_file = get_log_file_path(name)

    cmd = ['celery', 'beat', '--app', 'apps.manage.celery_app']
    cmd += ['--pidfile', pid_file]
    cmd += ['--loglevel', LOG_LEVEL]
    cmd += ['--scheduler', 'redbeat.RedBeatScheduler']
    cmd += ['--max-interval', '60']

    if DAEMON:
        cmd += ['--logfile', log_file, '--detach']

    return subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=BASE_DIR)
def start_service(s):
    """Start the requested service(s) and, in foreground mode, supervise them.

    Services that are already running are only reported. After spawning, the
    function waits up to START_TIMEOUT seconds (measured from the end of the
    spawn loop) for every service to become running; on timeout all requested
    services are stopped again. In foreground mode (DAEMON is False) it then
    blocks until SIGTERM or Ctrl-C and stops the services; in daemon mode it
    prints the service status and returns.

    :param s: service specification as accepted by parse_service().
    """
    print(time.ctime())
    services_handler = {
        "gunicorn": start_gunicorn,
        "celery": start_celery,
        "beat": start_beat
    }
    services_set = parse_service(s)
    processes = []
    for i in services_set:
        if is_running(i):
            show_service_status(i)
            continue
        func = services_handler.get(i)
        p = func()
        # Reap the child as soon as it exits. Without this a killed child
        # stays a zombie of this process; signal 0 still succeeds on a zombie,
        # so is_running() keeps returning True and stop_service() would poll
        # forever.
        threading.Thread(target=p.wait, daemon=True).start()
        processes.append(p)

    now = int(time.time())
    for i in services_set:
        while not is_running(i):
            if int(time.time()) - now < START_TIMEOUT:
                time.sleep(1)
                continue
            print("Error: {} start error".format(i))
            stop_multi_services(services_set)
            return

    stop_event = threading.Event()
    if not DAEMON:
        signal.signal(signal.SIGTERM, lambda x, y: stop_event.set())
        # Poll with sleep() rather than stop_event.wait(): setting an Event
        # from a signal handler while the same thread is inside wait() can
        # deadlock on the Event's internal lock.
        while not stop_event.is_set():
            try:
                time.sleep(10)
            except KeyboardInterrupt:
                stop_event.set()
                break
        print("Stop services")
        for p in processes:
            p.terminate()
        for i in services_set:
            stop_service(i)
    else:
        print()
        show_service_status(s)
def _reap_if_child(pid):
    """Collect the exit status of *pid* if it is an exited child of this process."""
    try:
        os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # Not our child, or its status has already been collected.
        pass


def stop_service(s, sig=9):
    """Stop the requested service(s) by signalling the recorded pid.

    For every service the direct children of the recorded process are
    signalled first, then the process itself; the function then waits until
    the service is no longer reported as running. A failure for one service
    is printed and does not prevent the remaining services from being stopped.

    :param s: service specification as accepted by parse_service().
    :param sig: signal number to send (default 9, SIGKILL).
    """
    for name in parse_service(s):
        try:
            if not is_running(name):
                show_service_status(name)
                continue
            pid = get_pid(name)
            if pid <= 0:
                # pid 0 / negative pids address whole process groups; never
                # pass them to os.kill().
                raise ValueError("invalid pid in pid file")
            try:
                children = psutil.Process(pid).children()
            except psutil.NoSuchProcess:
                # The process exited on its own in the meantime.
                children = []
            # Kill the child processes first ...
            for child in children:
                try:
                    os.kill(child.pid, sig)
                except ProcessLookupError:
                    # Child already gone; keep going so the parent still
                    # gets signalled.
                    pass
            # ... then the main process.
            try:
                os.kill(pid, sig)
            except ProcessLookupError:
                pass
            while True:
                # A killed process that was spawned by this script stays a
                # zombie until it is waited for, and a zombie still answers
                # signal 0, so reap it before checking again.
                _reap_if_child(pid)
                if not is_running(name):
                    break
                time.sleep(1)
            print("Stop {} success".format(name))
        except Exception as exc:
            print("Stop {} failure: {}".format(name, exc))
def stop_multi_services(services):
    """Stop every service named in *services*, one after another."""
    for service_name in services:
        stop_service(service_name)
def show_service_status(s):
    """Print, for each requested service, whether it is running (with pid) or stopped."""
    for name in parse_service(s):
        if not is_running(name):
            print("{} is stopped".format(name))
            continue
        print("{} is running: {}".format(name, get_pid(name)))
# Command-line entry point: parse the action/service arguments, apply the
# -d/-w overrides to the module-level settings and dispatch to the matching
# start/stop/status function.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="""
        Bees service control tools;
        Example: \r\n
        %(prog)s start all -d;
        """
    )
    parser.add_argument(
        'action', type=str,
        choices=("start", "stop", "restart", "status"),
        help="Action to run"
    )
    # Optional positional; defaults to "all" when omitted.
    parser.add_argument(
        "service", type=str, default="all", nargs="?",
        choices=("all", "gunicorn", "celery", "beat"),
        help="The service to start",
    )
    # NOTE(review): nargs="?" lets -d/-w consume the next command-line token
    # as their value, so they are best placed after the positionals
    # (as in the example above).
    parser.add_argument('-d', '--daemon', nargs="?", const=1)
    parser.add_argument('-w', '--worker', type=int, nargs="?", const=4)
    args = parser.parse_args()
    # Override the module-level defaults read by the start_* functions.
    if args.daemon:
        DAEMON = True
    if args.worker:
        WORKERS = args.worker
    action = args.action
    srv = args.service
    if action == "start":
        start_service(srv)
    elif action == "stop":
        stop_service(srv)
    elif action == "restart":
        # A restart always brings the services back up in daemon mode.
        DAEMON = True
        stop_service(srv)
        time.sleep(5)
        start_service(srv)
    else:
        # "status"
        show_service_status(srv)
# The PYTHONOPTIMIZE / PYTHONUNBUFFERED environment variables set at the top
# of this file correspond to these interpreter flags:
# -O: PYTHONOPTIMIZE
# -u: PYTHONUNBUFFERED
# python3 -u -O bees.py start all
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/NovemberRain/bees.git
git@gitee.com:NovemberRain/bees.git
NovemberRain
bees
bees
master

搜索帮助