代码拉取完成,页面将自动刷新
#!/usr/bin/env python
# -*- coding : utf-8 -*-
# @Time : 2024/3/21 10:02
# @Author : 我是Rain呀 -- (。♥ᴗ♥。)
# @File : OracleMonitoring.py
# @Software : PyCharm
import argparse
import cx_Oracle
import json
from datetime import datetime
class ConnOracle(object):
# 连接Oracle And 获取数据类
def __init__(self, oracleUser: str, oraclePassword: str, oracleHost: str, oraclePort: int, oracleServiceName: str):
self.username = oracleUser
self.password = oraclePassword
self.host = oracleHost
self.port = oraclePort
self.service_name = oracleServiceName
self.connection = self.connect()
def connect(self) -> object:
try:
dsn = cx_Oracle.makedsn(self.host, self.port, service_name=self.service_name)
connection = cx_Oracle.connect(self.username, self.password, dsn)
return connection
except cx_Oracle.Error:
return None
def sqlExecute(self, query: str) -> list:
cursor = self.connection.cursor()
try:
cursor.execute(query)
rows = cursor.fetchall()
return rows
except cx_Oracle.Error:
pass
finally:
# 释放游标
cursor.close()
self.disconnect()
def disconnect(self):
# 断开连接
if self.connection:
self.connection.close()
class OracleIndex(ConnOracle):
# 自动发现规则
def __init__(self, username, password, Host, Port, service_name):
super().__init__(username, password, Host, Port, service_name)
def tableSpaceIndex(self) -> json:
# 获取所有表空间的名字
if self.connection is not None:
sql = "SELECT TABLESPACE_NAME FROM DBA_TABLESPACES"
result = self.sqlExecute(sql)
return json.dumps([{"{#INDEX}": index, "{#TABLESPACE_NAME}": row[0]}
for index, row in enumerate(result, start=1)])
return json.dumps([])
def userNameIndex(self) -> json:
# 获取所有用户的名字
if self.connection is not None:
sql = "SELECT USERNAME, ACCOUNT_STATUS FROM DBA_USERS"
result = self.sqlExecute(sql)
return json.dumps([{"{#INDEX}": index, "{#USERNAME}": row[0], "{#ACCOUNT_STATUS}": row[1]}
for index, row in enumerate(result, start=1)])
return json.dumps([])
def userSegmentIndex(self) -> json:
# 获取 Segments 下面所有的 Schema,为后续统计用户大小传递数据 Schema discovery
if self.connection is not None:
sql = "SELECT DISTINCT OWNER FROM DBA_SEGMENTS"
result = self.sqlExecute(sql)
return json.dumps([{"{#INDEX}": index, "{#USERNAME}": row[0]} for index, row in enumerate(result, start=1)])
return json.dumps([])
def asmDiskGroupIndex(self) -> json:
# 获取所ASM磁盘组的名字
if self.connection is not None:
sql = "SELECT NAME FROM V$ASM_DISKGROUP"
result = self.sqlExecute(sql)
return json.dumps([{"{#INDEX}": index, "{#ASMNAME}": row[0]} for index, row in enumerate(result, start=1)])
return json.dumps([])
class OracleNorm(ConnOracle):
# 自动发现的监控项原型
def __init__(self, username, password, Host, Port, service_name):
super().__init__(username, password, Host, Port, service_name)
def tablespaceUsed(self, *args) -> float:
# 磁盘空间已使用空间 GB 为单位
if self.connection is not None:
sql = f"SELECT ROUND(USED_SPACE * 8 / 1024 / 1024, 2) USED_GB FROM DBA_TABLESPACE_USAGE_METRICS " \
f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
return float(result[0][0])
return 0.0
def tablespaceMax(self, *args) -> float:
# 磁盘空间最大空间 GB 为单位
if self.connection is not None:
sql = f"SELECT ROUND(TABLESPACE_SIZE * 8 / 1024 / 1024, 2) MAX_GB FROM DBA_TABLESPACE_USAGE_METRICS " \
f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
return float(result[0][0])
return 0.0
def tablespaceFree(self, *args) -> float:
# 磁盘空间剩余空间 GB 为单位
if self.connection is not None:
sql = f"SELECT ROUND((TABLESPACE_SIZE - USED_SPACE) * 8 / 1024 / 1024, 2) FREE_GB " \
f"FROM DBA_TABLESPACE_USAGE_METRICS WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
return float(result[0][0])
return 0.0
def tablespaceUsedPer(self, *args) -> float:
# 磁盘空间已使用百分比 % 为单位
if self.connection is not None:
sql = f"SELECT ROUND(USED_PERCENT, 2) AS USED_PERCENT FROM DBA_TABLESPACE_USAGE_METRICS " \
f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
return float(result[0][0])
return 0.0
def userExpired(self, *args) -> int:
# 检查用户还有多少天到期
# 19999 - 表示无期限
# 29999 - 表示没有这个用户或已删除
# 39999 - 表示数据库连接失败
if self.connection is not None:
sql = f"SELECT EXPIRY_DATE FROM DBA_USERS WHERE USERNAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
if result[0][0] is not None:
given_time = result[0][0]
current_time = datetime.now()
time_difference = given_time - current_time
days_difference = time_difference.days
return int(days_difference)
return 19999
return 29999
return 39999
def asmTotal(self, *args) -> int:
# 查看ASM 指定名称的总空间大小 (MB)
# 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
# 299999999 - 表示连接失败,无法连接到数据库
if self.connection is not None:
sql = f"SELECT TOTAL_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
return int(result[0][0])
return 199999999
return 299999999
def asmFree(self, *args) -> int:
# 查看ASM 指定名称的 剩余大小 (MB)
# 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
# 299999999 - 表示连接失败,无法连接到数据库
if self.connection is not None:
sql = f"SELECT FREE_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
return int(result[0][0])
return 199999999
return 299999999
def asmUsed(self, *args) -> int:
# 查看ASM 指定名称的 已使用大小 (MB)
# 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
# 299999999 - 表示连接失败,无法连接到数据库
if self.connection is not None:
sql = f"SELECT COLD_USED_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
return int(result[0][0])
return 199999999
return 299999999
def asmUsedPre(self, *args) -> float:
# 查看ASM 指定名称的 已使用百分比 (%)
# 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
# 299999999 - 表示连接失败,无法连接到数据库
# 触发器逻辑: xxx >= 85 and xxx <= 100
if self.connection is not None:
sql = f"SELECT ROUND(COLD_USED_MB / TOTAL_MB * 100, 2) FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
return float(result[0][0])
return 199999999
return 299999999
def schemaSize(self, *args) -> int:
# 统计Schema 总大小,单位为 B
# 0 - 表示没有该用户,或该用户已删除
# 1 - 表示连接失败,无法连接到数据库
if self.connection is not None:
sql = f"SELECT SUM(BYTES) FROM DBA_SEGMENTS OWNER = UPPER('{args[0]}')"
result = self.sqlExecute(sql)
if result:
return int(result[0][0])
return 0
return 1
class OracleStatic(ConnOracle):
# 单独指标项,非自动发现
def __init__(self, username, password, Host, Port, service_name):
super().__init__(username, password, Host, Port, service_name)
def OracleInstanceStatus(self) -> str:
# Oracle Instance 状态
# 触发器逻辑: 非OPEN状态告警
if self.connection is not None:
sql = "SELECT STATUS FROM V$INSTANCE"
result = self.sqlExecute(sql)
return str(result[0][0])
return "Connect Failed"
def OracleInstanceSessionPer(self) -> float:
# Session 连接占用百分比
# 触发器逻辑: 连接百分比 >= 80
if self.connection is not None:
sql = "SELECT ROUND(B.VALUE/A.VALUE*100, 2) AS SESSION_USE FROM (SELECT VALUE FROM V$PARAMETER " \
"WHERE NAME='sessions') A,(SELECT COUNT(*) AS VALUE FROM V$SESSION) B"
result = self.sqlExecute(sql)
return float(result[0][0])
return float(0.0)
def OracleVersion(self) -> str:
# Oracle数据库版本
if self.connection is not None:
sql = "SELECT * FROM V$VERSION WHERE BANNER LIKE 'Oracle%'"
result = self.sqlExecute(sql)
return str(result[0][0])
return "Connect Failed"
def OracleActiveSession(self) -> int:
# 检查活跃Session数量
if self.connection is not None:
sql = "SELECT COUNT(*) FROM V$SESSION WHERE STATUS = 'ACTIVE'"
result = self.sqlExecute(sql)
return int(result[0][0])
return 0
def OracleInvalidObject(self) -> int:
# 检查无效对象数量
if self.connection is not None:
sql = "SELECT COUNT(*) FROM DBA_OBJECTS WHERE STATUS = 'INVALID'"
result = self.sqlExecute(sql)
return int(result[0][0])
return 0
def OracleHitCacheRatio(self) -> float:
# 高速缓存命中率
# 缓存命中率越高越好,如果返回 0 则为数据库连接失败
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Buffer Cache Hit Ratio' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def OracleCursorCacheRatio(self) -> float:
# 游标缓存命中率
# 缓存命中率越高越好,如果返回 0 则为数据库连接失败
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Cursor Cache Hit Ratio' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def OraclePgaCacheHit(self) -> float:
# PGA缓存命中率 %
# 返回 0 可能是数据库连接失败
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='PGA Cache Hit %' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def OracleSoftParseHit(self) -> float:
# SQL软解析比例 %
# 返回 0 可能是数据库连接失败
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Soft Parse Ratio' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def OracleGlobalName(self) -> str:
# 全局数据库名称
if self.connection is not None:
sql = "SELECT PROPERTY_VALUE FROM DATABASE_PROPERTIES WHERE PROPERTY_NAME='GLOBAL_DB_NAME'"
result = self.sqlExecute(sql)
return str(result[0][0])
return "Connect Failed"
def OracleSharePoolFree(self) -> float:
# 共享池可用百分比 %
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Shared Pool Free %' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def OracleHostCpuUtilization(self) -> float:
# 主机CPU使用率 %
# 返回 0 可能是数据库连接失败
if self.connection is not None:
sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Host CPU Utilization (%)' AND GROUP_ID=2"
result = self.sqlExecute(sql)
return round(float(result[0][0]), 2)
return 0
def main():
def create_instance(class_name, *args):
return class_name(*args)
# 根据监控类型选择对应的类
class_dict = {
'index': OracleIndex,
'space': OracleNorm,
'statis': OracleStatic
}
# 定义参数信息的字典
args_info = [
{'name': '--host', 'type': str, 'help': 'Oracle database host/ipaddr', 'required': True},
{'name': '--user', 'type': str, 'default': 'zabbix',
'help': 'Oracle database username, The default value is zabbix'},
{'name': '--passwd', 'type': str, 'default': 'zabbix',
'help': 'Oracle database password, The default value is zabbix'},
{'name': '--port', 'type': int, 'default': 1521, 'help': 'Oracle database port, The default value is 1521'},
{'name': '--instance_name', 'type': str, 'help': 'Oracle database instance name', 'required': True},
{'name': '--type', 'type': str, 'help': 'Monitoring type', 'choices': class_dict.keys(), 'required': True},
{'name': 'function', 'type': str, 'help': 'Function to execute'},
{'name': 'args', 'nargs': '*', 'help': 'Arguments for the function'}
]
# 创建 ArgumentParser 对象
parser = argparse.ArgumentParser()
# 循环添加参数
for arg_info in args_info:
arg_name = arg_info.pop('name')
parser.add_argument(arg_name, **arg_info)
# 解析命令行参数
arges = parser.parse_args()
# 获取参数值
host, user, passwd, port, instance_name, monitoring_type, function_to_execute, function_args = (
arges.host, arges.user, arges.passwd, arges.port, arges.instance_name, arges.type, arges.function, arges.args
)
if monitoring_type in class_dict:
# 创建相应的类实例
instance = create_instance(class_dict[monitoring_type], user, passwd, host, port, instance_name)
# 调用指定函数
if hasattr(instance, function_to_execute):
print(getattr(instance, function_to_execute)(*function_args))
else:
print("Invalid function name:", function_to_execute)
else:
print("Invalid monitoring type:", monitoring_type)
if __name__ == '__main__':
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。