# [Web page notice, not part of the program] Code pull completed; the page will refresh automatically.
#!/usr/bin/env python
# encoding: utf-8
# ===============================================================================
#
# FILE:
#
# USAGE:
#
# DESCRIPTION:
#
# OPTIONS: ---
# REQUIREMENTS: ---
# BUGS: ---
# NOTES: ---
# AUTHOR: YOUR NAME (),
# COMPANY:
# VERSION: 1.0
# CREATED:
# REVISION: ---
# ===============================================================================
from flask import Flask, request, Response, abort
import lib.MyLog as Log
from lib.Dingding import DRobot
from flask_cors import CORS
from cfg import token_list, dingding_db, dingding_secret, dingding_token
from lib.tools import is_internal_ip
import json
# Address and port handed to app.run() when this file is executed directly.
listen = '0.0.0.0'
port = 10080

# Flask application object, wrapped by flask_cors.
app = Flask(__name__)
CORS(app)

# Configure the project logger (lib.MyLog) with a log file under /tmp.
# NOTE(review): console=True presumably also echoes records to the console - confirm in lib.MyLog.
Log.set_logger(
    filename="/tmp/PrometheusAlertToDingDing.log",
    level='INFO',
    console=True,
)

# Shared DingTalk robot client; token, secret and db file are read from cfg.
dingding_robot = DRobot(
    robot_url='https://oapi.dingtalk.com/robot/send',
    is_sign=True,
    token=dingding_token,
    secret=dingding_secret,
    db_file=dingding_db,
)
@app.route("/")
def root():
    """Answer requests for the site root with a 404 whose description is 'Nothing'."""
    # abort() raises the HTTP error itself, so there is no value to return.
    abort(404, 'Nothing')
# for Prometheus metrics
@app.route("/metrics")
def metrics():
    """Expose alert counters in the Prometheus text exposition format.

    Requests whose remote address is rejected by ``is_internal_ip`` are
    refused with a 403.

    Returns:
        A ``Response`` with one ``<metric name> <value>`` sample per line.
    """
    if not is_internal_ip(request.remote_addr):
        # abort() raises the HTTP error, so no return is needed.
        abort(403, 'Not allow')

    # TODO: these values are hard-coded placeholders, not real counters.
    samples = (
        ('dingding_alert_total', 3),
        ('dingding_alert_success_total', 4),
        ('dingding_alert_fail_total', 5),
    )
    # The exposition format requires every line, including the last one,
    # to end with a line feed.
    body = ''.join('%s %s\n' % sample for sample in samples)
    # Flask defaults to text/html; declare the Prometheus text format instead.
    return Response(body, content_type='text/plain; version=0.0.4; charset=utf-8')
def _format_alert(alert_item):
    """Render one entry of the payload's ``alerts`` list as a text paragraph.

    Raises:
        KeyError: a required field is missing from the entry.
        TypeError: the entry (or its labels/annotations) is not a mapping.
    """
    labels = alert_item['labels']
    annotations = alert_item['annotations']
    status = alert_item['status']
    # 'endsAt' is only read for resolved alerts; every other status shows 'NA'.
    end = alert_item['endsAt'] if status == 'resolved' else 'NA'
    return "[{status}]-[{severity}]-[{fp}]: {name}\nstart:{start}, end:{end}\n{summary}, {desc}\n\n".format(
        status=status,
        severity=labels['severity'],
        fp=alert_item['fingerprint'],
        name=labels['alertname'],
        start=alert_item['startsAt'],
        end=end,
        summary=annotations['summary'],
        desc=annotations['description'],
    )


# for alert manager
@app.route("/alert", methods=['POST'])
def alert():
    """Forward a posted alert payload to DingTalk as a single text message.

    The caller must pass ``is_internal_ip`` and supply a ``token`` query
    argument contained in ``token_list``; otherwise the request is refused
    with a 403.

    Each entry of ``data['alerts']`` becomes one paragraph of the message.
    If the payload does not have the expected structure, the problem is
    logged and a message containing the raw payload is sent instead.

    Returns:
        A ``Response`` wrapping whatever ``dingding_robot.send_text``
        returned, labelled as ``application/json``.
    """
    token = request.args.get('token')
    if not is_internal_ip(request.remote_addr) or token not in token_list:
        # abort() raises the HTTP error, so no return is needed.
        abort(403, 'Not allow')

    data = request.json
    try:
        msg = ''.join(_format_alert(item) for item in data['alerts'])
    except (KeyError, TypeError):
        # TypeError covers a missing body (None), a non-dict JSON document
        # and non-dict alert entries, which previously surfaced as a 500.
        Log.error('key error, raw: %s' % (data,))
        msg = "prometheus input data error, raw: %s" % (data,)

    ret = dingding_robot.send_text(msg, msg_from='prometheus')
    return Response(ret, content_type='application/json')
if __name__ == '__main__':
    from os import environ

    # Debug mode is enabled only when APP_DEBUG is exactly 'True' or 'true'.
    debug = environ.get('APP_DEBUG') in ['True', 'true']
    app.run(host=listen, port=port, debug=debug, threaded=True)
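# [Web page notice, not part of the program] This location may contain content unsuitable for display, so the page does not show it. You can review and modify it yourself using the relevant editing function.
# If you confirm that the content does not involve inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content violating national laws and regulations, you may click submit to appeal, and it will be handled as soon as possible.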