From bd32dc01000126d593c188d47404cfdbe1df343e Mon Sep 17 00:00:00 2001
From: zhuofeng <zhuofeng2@huawei.com>
Date: Thu, 12 Sep 2024 11:29:01 +0800
Subject: [PATCH 1/2] add collect module to sysSentry
---
config/collector.conf | 7 +
service/sentryCollector.service | 12 +
service/sysSentry.service | 2 +-
src/python/sentryCollector/__init__.py | 0
src/python/sentryCollector/__main__.py | 17 ++
src/python/sentryCollector/collect_config.py | 118 ++++++++
src/python/sentryCollector/collect_io.py | 239 ++++++++++++++++
src/python/sentryCollector/collect_plugin.py | 276 ++++++++++++++++++
src/python/sentryCollector/collect_server.py | 285 +++++++++++++++++++
src/python/sentryCollector/collectd.py | 99 +++++++
src/python/setup.py | 4 +-
11 files changed, 1057 insertions(+), 2 deletions(-)
create mode 100644 config/collector.conf
create mode 100644 service/sentryCollector.service
create mode 100644 src/python/sentryCollector/__init__.py
create mode 100644 src/python/sentryCollector/__main__.py
create mode 100644 src/python/sentryCollector/collect_config.py
create mode 100644 src/python/sentryCollector/collect_io.py
create mode 100644 src/python/sentryCollector/collect_plugin.py
create mode 100644 src/python/sentryCollector/collect_server.py
create mode 100644 src/python/sentryCollector/collectd.py
diff --git a/config/collector.conf b/config/collector.conf
new file mode 100644
index 0000000..9baa086
--- /dev/null
+++ b/config/collector.conf
@@ -0,0 +1,7 @@
+[common]
+modules=io
+
+[io]
+period_time=1
+max_save=10
+disk=default
\ No newline at end of file
diff --git a/service/sentryCollector.service b/service/sentryCollector.service
new file mode 100644
index 0000000..4ee07d5
--- /dev/null
+++ b/service/sentryCollector.service
@@ -0,0 +1,12 @@
+[Unit]
+Description = Collection module added for sysSentry and kernel lock-free collection
+
+[Service]
+ExecStart=/usr/bin/python3 /usr/bin/sentryCollector
+ExecStop=/bin/kill $MAINPID
+KillMode=process
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy = multi-user.target
diff --git a/service/sysSentry.service b/service/sysSentry.service
index 4d85a6c..1d8338f 100644
--- a/service/sysSentry.service
+++ b/service/sysSentry.service
@@ -2,7 +2,7 @@
Description=EulerOS System Inspection Frame
[Service]
-ExecStart=/usr/bin/syssentry
+ExecStart=/usr/bin/python3 /usr/bin/syssentry
ExecStop=/bin/kill $MAINPID
KillMode=process
Restart=on-failure
diff --git a/src/python/sentryCollector/__init__.py b/src/python/sentryCollector/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/python/sentryCollector/__main__.py b/src/python/sentryCollector/__main__.py
new file mode 100644
index 0000000..9c2ae50
--- /dev/null
+++ b/src/python/sentryCollector/__main__.py
@@ -0,0 +1,17 @@
+# coding: utf-8
+# Copyright (c) 2023 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+main
+"""
+from . import collectd
+
+collectd.main()
diff --git a/src/python/sentryCollector/collect_config.py b/src/python/sentryCollector/collect_config.py
new file mode 100644
index 0000000..b6cc75c
--- /dev/null
+++ b/src/python/sentryCollector/collect_config.py
@@ -0,0 +1,118 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+Read and save collector.conf value.
+"""
+import configparser
+import logging
+import os
+import re
+
+
+COLLECT_CONF_PATH = "/etc/sysSentry/collector.conf"
+
+CONF_COMMON = 'common'
+CONF_MODULES = 'modules'
+
+# io
+CONF_IO = 'io'
+CONF_IO_PERIOD_TIME = 'period_time'
+CONF_IO_MAX_SAVE = 'max_save'
+CONF_IO_DISK = 'disk'
+CONF_IO_PERIOD_TIME_DEFAULT = 1
+CONF_IO_MAX_SAVE_DEFAULT = 10
+CONF_IO_DISK_DEFAULT = "default"
+
+class CollectConfig:
+ def __init__(self, filename=COLLECT_CONF_PATH):
+
+ self.filename = filename
+ self.modules = []
+ self.module_count = 0
+ self.load_config()
+
+ def load_config(self):
+ if not os.path.exists(self.filename):
+ logging.error("%s does not exist", self.filename)
+ return
+
+ try:
+ self.config = configparser.ConfigParser()
+ self.config.read(self.filename)
+ except configparser.Error:
+ logging.error("collectd configure file read failed")
+ return
+
+ try:
+ common_config = self.config[CONF_COMMON]
+ modules_str = common_config[CONF_MODULES]
+ # remove space
+ modules_list = modules_str.replace(" ", "").split(',')
+ except KeyError as e:
+ logging.error("read config data failed, %s", e)
+ return
+
+ pattern = r'^[a-zA-Z0-9-_]+$'
+ for module_name in modules_list:
+ if not re.match(pattern, module_name):
+ logging.warning("module_name: %s is invalid", module_name)
+ continue
+ if not self.config.has_section(module_name):
+ logging.warning("module_name: %s config is incorrect", module_name)
+ continue
+ self.modules.append(module_name)
+
+ def load_module_config(self, module_name):
+ module_name = module_name.strip().lower()
+ if module_name in self.modules and self.config.has_section(module_name):
+ return {key.lower(): value for key, value in self.config[module_name].items()}
+ else:
+ raise ValueError(f"Module '{module_name}' not found in configuration")
+
+ def get_io_config(self):
+ result_io_config = {}
+ io_map_value = self.load_module_config(CONF_IO)
+ # period_time
+ period_time = io_map_value.get(CONF_IO_PERIOD_TIME)
+ if period_time and period_time.isdigit() and int(period_time) >= 1 and int(period_time) <= 300:
+ result_io_config[CONF_IO_PERIOD_TIME] = int(period_time)
+ else:
+ logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+ CONF_IO, CONF_IO_PERIOD_TIME, CONF_IO_PERIOD_TIME_DEFAULT)
+ result_io_config[CONF_IO_PERIOD_TIME] = CONF_IO_PERIOD_TIME_DEFAULT
+ # max_save
+ max_save = io_map_value.get(CONF_IO_MAX_SAVE)
+ if max_save and max_save.isdigit() and int(max_save) >= 1 and int(max_save) <= 300:
+ result_io_config[CONF_IO_MAX_SAVE] = int(max_save)
+ else:
+ logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
+ CONF_IO, CONF_IO_MAX_SAVE, CONF_IO_MAX_SAVE_DEFAULT)
+ result_io_config[CONF_IO_MAX_SAVE] = CONF_IO_MAX_SAVE_DEFAULT
+ # disk
+ disk = io_map_value.get(CONF_IO_DISK)
+ if disk:
+ disk_str = disk.replace(" ", "")
+ pattern = r'^[a-zA-Z0-9-_,]+$'
+ if not re.match(pattern, disk_str):
+ logging.warning("module_name = %s section, field = %s is incorrect, use default %s",
+ CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT)
+ disk_str = CONF_IO_DISK_DEFAULT
+ result_io_config[CONF_IO_DISK] = disk_str
+ else:
+ logging.warning("module_name = %s section, field = %s is incorrect, use default %s",
+ CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT)
+ result_io_config[CONF_IO_DISK] = CONF_IO_DISK_DEFAULT
+ logging.info("config get_io_config: %s", result_io_config)
+ return result_io_config
+
+ def get_common_config(self):
+ return {key.lower(): value for key, value in self.config['common'].items()}
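
The config reader above checks each io field against a fixed range and falls back to the built-in default when a value is missing or out of range. As an illustration only (not part of this patch), here is a minimal sketch that exercises CollectConfig against a throwaway config file; it assumes the sentryCollector package is importable, and the out-of-range period_time is chosen deliberately to show the fallback:

    import os
    import tempfile
    from sentryCollector.collect_config import CollectConfig

    # period_time=999 is outside the accepted 1-300 range, so it falls back to 1.
    conf_text = "[common]\nmodules=io\n\n[io]\nperiod_time=999\nmax_save=10\ndisk=sda,sdb\n"
    with tempfile.NamedTemporaryFile("w", suffix=".conf", delete=False) as f:
        f.write(conf_text)
        path = f.name

    cfg = CollectConfig(filename=path)
    print(cfg.modules)          # ['io']
    print(cfg.get_io_config())  # {'period_time': 1, 'max_save': 10, 'disk': 'sda,sdb'}
    os.unlink(path)
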
diff --git a/src/python/sentryCollector/collect_io.py b/src/python/sentryCollector/collect_io.py
new file mode 100644
index 0000000..b826dc4
--- /dev/null
+++ b/src/python/sentryCollector/collect_io.py
@@ -0,0 +1,239 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+collect module
+"""
+import os
+import time
+import logging
+import threading
+
+from .collect_config import CollectConfig
+
+Io_Category = ["read", "write", "flush", "discard"]
+IO_GLOBAL_DATA = {}
+IO_CONFIG_DATA = []
+
+class IoStatus():
+ TOTAL = 0
+ FINISH = 1
+ LATENCY = 2
+
+class CollectIo():
+
+ def __init__(self, module_config):
+
+ io_config = module_config.get_io_config()
+
+ self.period_time = io_config['period_time']
+ self.max_save = io_config['max_save']
+ disk_str = io_config['disk']
+
+ self.disk_map_stage = {}
+ self.window_value = {}
+
+ self.loop_all = False
+
+ if disk_str == "default":
+ self.loop_all = True
+ else:
+ self.disk_list = disk_str.strip().split(',')
+
+ self.stop_event = threading.Event()
+
+ IO_CONFIG_DATA.append(self.period_time)
+ IO_CONFIG_DATA.append(self.max_save)
+
+ def get_blk_io_hierarchy(self, disk_name, stage_list):
+ stats_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/stats'.format(disk_name)
+ try:
+ with open(stats_file, 'r') as file:
+ lines = file.read()
+ except FileNotFoundError:
+ logging.error("The file %s does not exist", stats_file)
+ return -1
+ except Exception as e:
+ logging.error("An error occurred3: %s", e)
+ return -1
+
+ curr_value = lines.strip().split('\n')
+
+ for stage_val in curr_value:
+ stage = stage_val.split(' ')[0]
+ if (len(self.window_value[disk_name][stage])) >= 2:
+ self.window_value[disk_name][stage].pop(0)
+
+ curr_stage_value = stage_val.split(' ')[1:-1]
+ self.window_value[disk_name][stage].append(curr_stage_value)
+ return 0
+
+ def append_period_lat(self, disk_name, stage_list):
+ for stage in stage_list:
+ if len(self.window_value[disk_name][stage]) < 2:
+ return
+ curr_stage_value = self.window_value[disk_name][stage][-1]
+ last_stage_value = self.window_value[disk_name][stage][-2]
+
+ for index in range(len(Io_Category)):
+ # read=0, write=1, flush=2, discard=3
+ if (len(IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save:
+ IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].pop()
+
+ curr_lat = self.get_latency_value(curr_stage_value, last_stage_value, index)
+ curr_iops = self.get_iops(curr_stage_value, last_stage_value, index)
+ curr_io_length = self.get_io_length(curr_stage_value, last_stage_value, index)
+ curr_io_dump = self.get_io_dump(disk_name, stage, index)
+
+ IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops])
+
+ def get_iops(self, curr_stage_value, last_stage_value, category):
+ try:
+ finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+ except ValueError as e:
+ logging.error("get_iops convert to int failed, %s", e)
+ return 0
+ value = finish / self.period_time
+ if value.is_integer():
+ return int(value)
+ else:
+ return round(value, 1)
+
+ def get_latency_value(self, curr_stage_value, last_stage_value, category):
+ try:
+ finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+ lat_time = (int(curr_stage_value[category * 3 + IoStatus.LATENCY]) - int(last_stage_value[category * 3 + IoStatus.LATENCY]))
+ except ValueError as e:
+ logging.error("get_latency_value convert to int failed, %s", e)
+ return 0
+ if finish <= 0 or lat_time <= 0:
+ return 0
+ value = lat_time / finish / 1000 / 1000
+ if value.is_integer():
+ return int(value)
+ else:
+ return round(value, 1)
+
+ def get_io_length(self, curr_stage_value, last_stage_value, category):
+ try:
+ finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH])
+ except ValueError as e:
+ logging.error("get_io_length convert to int failed, %s", e)
+ return 0
+ value = finish / self.period_time / 1000 / 1000
+ if value.is_integer():
+ return int(value)
+ else:
+ return round(value, 1)
+
+ def get_io_dump(self, disk_name, stage, category):
+ io_dump_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/io_dump'.format(disk_name, stage)
+ count = 0
+ try:
+ with open(io_dump_file, 'r') as file:
+ for line in file:
+ count += line.count('.op=' + Io_Category[category])
+ except FileNotFoundError:
+ logging.error("The file %s does not exist.", io_dump_file)
+ return count
+ except Exception as e:
+ logging.error("An error occurred1: %s", e)
+ return count
+ return count
+
+ def extract_first_column(self, file_path):
+ column_names = []
+ try:
+ with open(file_path, 'r') as file:
+ for line in file:
+ parts = line.strip().split()
+ if parts:
+ column_names.append(parts[0])
+ except FileNotFoundError:
+ logging.error("The file %s does not exist.", file_path)
+ except Exception as e:
+ logging.error("An error occurred2: %s", e)
+ return column_names
+
+ def task_loop(self):
+ if self.stop_event.is_set():
+ logging.info("collect io thread exit")
+ return
+
+ for disk_name, stage_list in self.disk_map_stage.items():
+ if self.get_blk_io_hierarchy(disk_name, stage_list) < 0:
+ continue
+ self.append_period_lat(disk_name, stage_list)
+
+ threading.Timer(self.period_time, self.task_loop).start()
+
+ def main_loop(self):
+ logging.info("collect io thread start")
+ base_path = '/sys/kernel/debug/block'
+ for disk_name in os.listdir(base_path):
+ if not self.loop_all and disk_name not in self.disk_list:
+ continue
+
+ disk_path = os.path.join(base_path, disk_name)
+ blk_io_hierarchy_path = os.path.join(disk_path, 'blk_io_hierarchy')
+
+ if not os.path.exists(blk_io_hierarchy_path):
+ logging.error("no blk_io_hierarchy directory found in %s, skipping.", disk_name)
+ continue
+
+ for file_name in os.listdir(blk_io_hierarchy_path):
+ file_path = os.path.join(blk_io_hierarchy_path, file_name)
+
+ if file_name == 'stats':
+ stage_list = self.extract_first_column(file_path)
+ self.disk_map_stage[disk_name] = stage_list
+ self.window_value[disk_name] = {}
+ IO_GLOBAL_DATA[disk_name] = {}
+
+ if len(self.disk_map_stage) == 0:
+ logging.warning("no disks meet the requirements. the thread exits")
+ return
+
+ for disk_name, stage_list in self.disk_map_stage.items():
+ for stage in stage_list:
+ self.window_value[disk_name][stage] = []
+ IO_GLOBAL_DATA[disk_name][stage] = {}
+ for category in Io_Category:
+ IO_GLOBAL_DATA[disk_name][stage][category] = []
+
+ while True:
+ start_time = time.time()
+
+ if self.stop_event.is_set():
+ logging.info("collect io thread exit")
+ return
+
+ for disk_name, stage_list in self.disk_map_stage.items():
+ if self.get_blk_io_hierarchy(disk_name, stage_list) < 0:
+ continue
+ self.append_period_lat(disk_name, stage_list)
+
+ elapsed_time = time.time() - start_time
+ sleep_time = self.period_time - elapsed_time
+ if sleep_time < 0:
+ continue
+ while sleep_time > 1:
+ if self.stop_event.is_set():
+ logging.info("collect io thread exit")
+ return
+ time.sleep(1)
+ sleep_time -= 1
+ time.sleep(sleep_time)
+
+ # set stop event, notify thread exit
+ def stop_thread(self):
+ logging.info("collect io thread is preparing to exit")
+ self.stop_event.set()
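
get_latency_value(), get_iops() and get_io_length() all work on the difference between the two most recent samples of a stage line from blk_io_hierarchy/stats: each I/O category (read, write, flush, discard) occupies three consecutive columns, and the code indexes them with category * 3 plus IoStatus.TOTAL/FINISH/LATENCY. A self-contained sketch of that delta arithmetic on two synthetic samples (the numbers are made up; the column layout is taken from the indexing in the code above, not from kernel documentation):

    # Two synthetic samples of one stage's counters, 3 columns per category.
    IO_CATEGORY = ["read", "write", "flush", "discard"]
    FINISH, LATENCY = 1, 2  # same offsets as IoStatus in collect_io.py

    last = ["100", "95", "4000000", "50", "48", "2000000", "0", "0", "0", "0", "0", "0"]
    curr = ["160", "150", "9500000", "80", "76", "4800000", "0", "0", "0", "0", "0", "0"]

    for idx, name in enumerate(IO_CATEGORY[:2]):
        finish = int(curr[idx * 3 + FINISH]) - int(last[idx * 3 + FINISH])
        lat_ns = int(curr[idx * 3 + LATENCY]) - int(last[idx * 3 + LATENCY])
        lat_ms = lat_ns / finish / 1000 / 1000 if finish > 0 else 0
        print(f"{name}: {finish} requests finished, avg latency {round(lat_ms, 1)} ms")
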
diff --git a/src/python/sentryCollector/collect_plugin.py b/src/python/sentryCollector/collect_plugin.py
new file mode 100644
index 0000000..49ce0a8
--- /dev/null
+++ b/src/python/sentryCollector/collect_plugin.py
@@ -0,0 +1,276 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+collect plugin
+"""
+import json
+import socket
+import logging
+import re
+
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+
+# data length param
+CLT_MSG_HEAD_LEN = 9 #3+2+4
+CLT_MSG_PRO_LEN = 2
+CLT_MSG_MAGIC_LEN = 3
+CLT_MSG_LEN_LEN = 4
+
+CLT_MAGIC = "CLT"
+RES_MAGIC = "RES"
+
+# disk limit
+LIMIT_DISK_CHAR_LEN = 32
+LIMIT_DISK_LIST_LEN = 10
+
+# stage limit
+LIMIT_STAGE_CHAR_LEN = 20
+LIMIT_STAGE_LIST_LEN = 15
+
+#iotype limit
+LIMIT_IOTYPE_CHAR_LEN = 7
+LIMIT_IOTYPE_LIST_LEN = 4
+
+#period limit
+LIMIT_PERIOD_MIN_LEN = 1
+LIMIT_PERIOD_MAX_LEN = 300
+
+# interface protocol
+class ClientProtocol():
+ IS_IOCOLLECT_VALID = 0
+ GET_IO_DATA = 1
+ PRO_END = 3
+
+class ResultMessage():
+ RESULT_SUCCEED = 0
+ RESULT_UNKNOWN = 1 # unknown error
+ RESULT_NOT_PARAM = 2 # the parameter does not exist or the type does not match.
+ RESULT_INVALID_LENGTH = 3 # invalid parameter length.
+ RESULT_EXCEED_LIMIT = 4 # the parameter length exceeds the limit.
+ RESULT_PARSE_FAILED = 5 # parse failed
+ RESULT_INVALID_CHAR = 6 # invalid char
+
+Result_Messages = {
+ ResultMessage.RESULT_SUCCEED: "Succeed",
+ ResultMessage.RESULT_UNKNOWN: "Unknown error",
+ ResultMessage.RESULT_NOT_PARAM: "The parameter does not exist or the type does not match",
+ ResultMessage.RESULT_INVALID_LENGTH: "Invalid parameter length",
+ ResultMessage.RESULT_EXCEED_LIMIT: "The parameter length exceeds the limit",
+ ResultMessage.RESULT_PARSE_FAILED: "Parse failed",
+ ResultMessage.RESULT_INVALID_CHAR: "Invalid char"
+}
+
+
+def client_send_and_recv(request_data, data_str_len, protocol):
+ """client socket send and recv message"""
+ try:
+ client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ except socket.error:
+ print("collect_plugin: client create socket error")
+ return None
+
+ try:
+ client_socket.connect(COLLECT_SOCKET_PATH)
+ except OSError:
+ client_socket.close()
+ print("collect_plugin: client connect error")
+ return None
+
+ req_data_len = len(request_data)
+ request_msg = CLT_MAGIC + str(protocol).zfill(CLT_MSG_PRO_LEN) + str(req_data_len).zfill(CLT_MSG_LEN_LEN) + request_data
+
+ try:
+ client_socket.send(request_msg.encode())
+ res_data = client_socket.recv(len(RES_MAGIC) + CLT_MSG_PRO_LEN + data_str_len)
+ res_data = res_data.decode()
+ except (OSError, UnicodeError):
+ client_socket.close()
+ print("collect_plugin: client communicate error")
+ return None
+
+ res_magic = res_data[:CLT_MSG_MAGIC_LEN]
+ if res_magic != "RES":
+ print("res msg format error")
+ return None
+
+ protocol_str = res_data[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN]
+ try:
+ protocol_id = int(protocol_str)
+ except ValueError:
+ print(f"recv msg protocol id is invalid {protocol_str}")
+ return None
+
+ if protocol_id >= ClientProtocol.PRO_END:
+ print("protocol id is invalid")
+ return None
+
+ try:
+ res_data_len = int(res_data[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:])
+ res_msg_data = client_socket.recv(res_data_len)
+ res_msg_data = res_msg_data.decode()
+ return res_msg_data
+ except (OSError, ValueError, UnicodeError):
+ print("collect_plugin: client recv res msg error")
+ finally:
+ client_socket.close()
+
+ return None
+
+def validate_parameters(param, len_limit, char_limit):
+ ret = ResultMessage.RESULT_SUCCEED
+ if not param:
+ print("param is invalid")
+ ret = ResultMessage.RESULT_NOT_PARAM
+ return [False, ret]
+
+ if not isinstance(param, list):
+ print(f"{param} is not list type.")
+ ret = ResultMessage.RESULT_NOT_PARAM
+ return [False, ret]
+
+ if len(param) <= 0:
+ print(f"{param} length is 0.")
+ ret = ResultMessage.RESULT_INVALID_LENGTH
+ return [False, ret]
+
+ if len(param) > len_limit:
+ print(f"{param} length more than {len_limit}")
+ ret = ResultMessage.RESULT_EXCEED_LIMIT
+ return [False, ret]
+
+ pattern = r'^[a-zA-Z0-9_-]+$'
+ for info in param:
+ if len(info) > char_limit:
+ print(f"{info} length more than {char_limit}")
+ ret = ResultMessage.RESULT_EXCEED_LIMIT
+ return [False, ret]
+ if not re.match(pattern, info):
+ print(f"{info} is invalid char")
+ ret = ResultMessage.RESULT_INVALID_CHAR
+ return [False, ret]
+
+ return [True, ret]
+
+def is_iocollect_valid(period, disk_list=None, stage=None):
+ result = inter_is_iocollect_valid(period, disk_list, stage)
+ error_code = result['ret']
+ if error_code != ResultMessage.RESULT_SUCCEED:
+ result['message'] = Result_Messages[error_code]
+ return result
+
+def inter_is_iocollect_valid(period, disk_list=None, stage=None):
+ result = {}
+ result['ret'] = ResultMessage.RESULT_UNKNOWN
+ result['message'] = ""
+
+ if not period or not isinstance(period, int):
+ result['ret'] = ResultMessage.RESULT_NOT_PARAM
+ return result
+ if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+ result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+ return result
+
+ if not disk_list:
+ disk_list = []
+ else:
+ res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+ if not res[0]:
+ result['ret'] = res[1]
+ return result
+
+ if not stage:
+ stage = []
+ else:
+ res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+ if not res[0]:
+ result['ret'] = res[1]
+ return result
+
+ req_msg_struct = {
+ 'disk_list': json.dumps(disk_list),
+ 'period': period,
+ 'stage': json.dumps(stage)
+ }
+ request_message = json.dumps(req_msg_struct)
+ result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.IS_IOCOLLECT_VALID)
+ if not result_message:
+ print("collect_plugin: client_send_and_recv failed")
+ return result
+
+ try:
+ json.loads(result_message)
+ except json.JSONDecodeError:
+ print("is_iocollect_valid: json decode error")
+ result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+ return result
+
+ result['ret'] = ResultMessage.RESULT_SUCCEED
+ result['message'] = result_message
+ return result
+
+def get_io_data(period, disk_list, stage, iotype):
+ result = inter_get_io_data(period, disk_list, stage, iotype)
+ error_code = result['ret']
+ if error_code != ResultMessage.RESULT_SUCCEED:
+ result['message'] = Result_Messages[error_code]
+ return result
+
+def inter_get_io_data(period, disk_list, stage, iotype):
+ result = {}
+ result['ret'] = ResultMessage.RESULT_UNKNOWN
+ result['message'] = ""
+
+ if not isinstance(period, int):
+ result['ret'] = ResultMessage.RESULT_NOT_PARAM
+ return result
+ if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+ result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+ return result
+
+ res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+ if not res[0]:
+ result['ret'] = res[1]
+ return result
+
+ res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+ if not res[0]:
+ result['ret'] = res[1]
+ return result
+
+ res = validate_parameters(iotype, LIMIT_IOTYPE_LIST_LEN, LIMIT_IOTYPE_CHAR_LEN)
+ if not res[0]:
+ result['ret'] = res[1]
+ return result
+
+ req_msg_struct = {
+ 'disk_list': json.dumps(disk_list),
+ 'period': period,
+ 'stage': json.dumps(stage),
+ 'iotype' : json.dumps(iotype)
+ }
+
+ request_message = json.dumps(req_msg_struct)
+ result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.GET_IO_DATA)
+ if not result_message:
+ print("collect_plugin: client_send_and_recv failed")
+ return result
+ try:
+ json.loads(result_message)
+ except json.JSONDecodeError:
+ print("get_io_data: json decode error")
+ result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+ return result
+
+ result['ret'] = ResultMessage.RESULT_SUCCEED
+ result['message'] = result_message
+ return result
+
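
is_iocollect_valid() and get_io_data() are the client-side entry points that other sysSentry plugins call over the unix socket. A minimal usage sketch, assuming the sentryCollector service is already running and /var/run/sysSentry/collector.sock exists; the disk name "sda" and stage name "bio" are placeholders, not values guaranteed by this patch:

    import json
    from sentryCollector.collect_plugin import (
        is_iocollect_valid, get_io_data, ResultMessage)

    # Ask which of the requested disks/stages the collector is tracking.
    valid = is_iocollect_valid(period=1, disk_list=["sda"], stage=["bio"])
    if valid["ret"] == ResultMessage.RESULT_SUCCEED:
        print("collectable:", json.loads(valid["message"]))
    else:
        print("error:", valid["message"])

    # Fetch the most recent window of data for the same selection.
    data = get_io_data(period=1, disk_list=["sda"], stage=["bio"],
                       iotype=["read", "write"])
    if data["ret"] == ResultMessage.RESULT_SUCCEED:
        print(json.loads(data["message"]))
    else:
        print("error:", data["message"])
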
diff --git a/src/python/sentryCollector/collect_server.py b/src/python/sentryCollector/collect_server.py
new file mode 100644
index 0000000..fa49781
--- /dev/null
+++ b/src/python/sentryCollector/collect_server.py
@@ -0,0 +1,285 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+listen module
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+import threading
+import time
+
+from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+
+# socket param
+CLT_LISTEN_QUEUE_LEN = 5
+SERVER_EPOLL_TIMEOUT = 0.3
+
+# data length param
+CLT_MSG_HEAD_LEN = 9 #3+2+4
+CLT_MSG_PRO_LEN = 2
+CLT_MSG_MAGIC_LEN = 3
+CLT_MSG_LEN_LEN = 4
+
+# data flag param
+CLT_MAGIC = "CLT"
+RES_MAGIC = "RES"
+
+# interface protocol
+class ServerProtocol():
+ IS_IOCOLLECT_VALID = 0
+ GET_IO_DATA = 1
+ PRO_END = 3
+
+class CollectServer():
+
+ def __init__(self):
+
+ self.io_global_data = {}
+
+ self.stop_event = threading.Event()
+
+ def is_iocollect_valid(self, data_struct):
+
+ result_rev = {}
+ self.io_global_data = IO_GLOBAL_DATA
+
+ if len(IO_CONFIG_DATA) == 0:
+ logging.error("the collect thread is not started, the data is invalid. ")
+ return json.dumps(result_rev)
+
+ period_time = IO_CONFIG_DATA[0]
+ max_save = IO_CONFIG_DATA[1]
+
+ disk_list = json.loads(data_struct['disk_list'])
+ period = int(data_struct['period'])
+ stage_list = json.loads(data_struct['stage'])
+
+ if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+ logging.error("is_iocollect_valid: period time: %d is invalid", period)
+ return json.dumps(result_rev)
+
+ for disk_name, stage_info in self.io_global_data.items():
+ if len(disk_list) > 0 and disk_name not in disk_list:
+ continue
+ result_rev[disk_name] = []
+ if len(stage_list) == 0:
+ result_rev[disk_name] = list(stage_info.keys())
+ continue
+ for stage_name, stage_data in stage_info.items():
+ if stage_name in stage_list:
+ result_rev[disk_name].append(stage_name)
+
+ return json.dumps(result_rev)
+
+ def get_io_data(self, data_struct):
+ result_rev = {}
+ self.io_global_data = IO_GLOBAL_DATA
+
+ if len(IO_CONFIG_DATA) == 0:
+ logging.error("the collect thread is not started, the data is invalid. ")
+ return json.dumps(result_rev)
+ period_time = IO_CONFIG_DATA[0]
+ max_save = IO_CONFIG_DATA[1]
+
+ period = int(data_struct['period'])
+ disk_list = json.loads(data_struct['disk_list'])
+ stage_list = json.loads(data_struct['stage'])
+ iotype_list = json.loads(data_struct['iotype'])
+
+ if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+ logging.error("get_io_data: period time: %d is invalid", period)
+ return json.dumps(result_rev)
+
+ collect_index = period // period_time - 1
+ logging.debug("period: %d, collect_index: %d", period, collect_index)
+
+ for disk_name, stage_info in self.io_global_data.items():
+ if disk_name not in disk_list:
+ continue
+ result_rev[disk_name] = {}
+ for stage_name, iotype_info in stage_info.items():
+ if len(stage_list) > 0 and stage_name not in stage_list:
+ continue
+ result_rev[disk_name][stage_name] = {}
+ for iotype_name, iotype_info in iotype_info.items():
+ if iotype_name not in iotype_list:
+ continue
+ if len(iotype_info) < collect_index:
+ continue
+ result_rev[disk_name][stage_name][iotype_name] = iotype_info[collect_index]
+
+ return json.dumps(result_rev)
+
+ def msg_data_process(self, msg_data, protocal_id):
+ """message data process"""
+ logging.debug("msg_data %s", msg_data)
+ protocol_name = msg_data[0]
+ try:
+ data_struct = json.loads(msg_data)
+ except json.JSONDecodeError:
+ logging.error("msg data process: json decode error")
+ return "Request message decode failed"
+
+ if protocal_id == ServerProtocol.IS_IOCOLLECT_VALID:
+ res_msg = self.is_iocollect_valid(data_struct)
+ elif protocal_id == ServerProtocol.GET_IO_DATA:
+ res_msg = self.get_io_data(data_struct)
+
+ return res_msg
+
+ def msg_head_process(self, msg_head):
+ """message head process"""
+ ctl_magic = msg_head[:CLT_MSG_MAGIC_LEN]
+ if ctl_magic != CLT_MAGIC:
+ logging.error("recv msg head magic invalid")
+ return None
+
+ protocol_str = msg_head[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN]
+ try:
+ protocol_id = int(protocol_str)
+ except ValueError:
+ logging.error("recv msg protocol id is invalid")
+ return None
+
+ data_len_str = msg_head[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:CLT_MSG_HEAD_LEN]
+ try:
+ data_len = int(data_len_str)
+ except ValueError:
+ logging.error("recv msg data len is invalid %s", data_len_str)
+ return None
+
+ return [protocol_id, data_len]
+
+ def server_recv(self, server_socket: socket.socket):
+ """server receive"""
+ try:
+ client_socket, _ = server_socket.accept()
+ logging.debug("server_fd listen ok")
+ except socket.error:
+ logging.error("server accept failed, %s", socket.error)
+ return
+
+ try:
+ msg_head = client_socket.recv(CLT_MSG_HEAD_LEN)
+ logging.debug("recv msg head: %s", msg_head.decode())
+ head_info = self.msg_head_process(msg_head.decode())
+ except (OSError, UnicodeError):
+ client_socket.close()
+ logging.error("server recv HEAD failed")
+ return
+
+ protocol_id = head_info[0]
+ data_len = head_info[1]
+ logging.debug("msg protocol id: %d, data length: %d", protocol_id, data_len)
+ if protocol_id >= ServerProtocol.PRO_END:
+ client_socket.close()
+ logging.error("protocol id is invalid")
+ return
+
+ if data_len < 0:
+ client_socket.close()
+ logging.error("msg head parse failed")
+ return
+
+ try:
+ msg_data = client_socket.recv(data_len)
+ msg_data_decode = msg_data.decode()
+ logging.debug("msg data %s", msg_data_decode)
+ except (OSError, UnicodeError):
+ client_socket.close()
+ logging.error("server recv MSG failed")
+ return
+
+ res_data = self.msg_data_process(msg_data_decode, protocol_id)
+ logging.debug("res data %s", res_data)
+
+ # server send
+ res_head = RES_MAGIC
+ res_head += str(protocol_id).zfill(CLT_MSG_PRO_LEN)
+ res_data_len = str(len(res_data)).zfill(CLT_MSG_LEN_LEN)
+ res_head += res_data_len
+ logging.debug("res head %s", res_head)
+
+ res_msg = res_head + res_data
+ logging.debug("res msg %s", res_msg)
+
+ try:
+ client_socket.send(res_msg.encode())
+ except OSError:
+ logging.error("server recv failed")
+ finally:
+ client_socket.close()
+ return
+
+ def server_fd_create(self):
+ """create server fd"""
+ if not os.path.exists(SENTRY_RUN_DIR):
+ logging.error("%s not exist, failed", SENTRY_RUN_DIR)
+ return None
+
+ try:
+ server_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ server_fd.setblocking(False)
+ if os.path.exists(COLLECT_SOCKET_PATH):
+ os.remove(COLLECT_SOCKET_PATH)
+
+ server_fd.bind(COLLECT_SOCKET_PATH)
+ os.chmod(COLLECT_SOCKET_PATH, 0o600)
+ server_fd.listen(CLT_LISTEN_QUEUE_LEN)
+ logging.debug("%s bind and listen", COLLECT_SOCKET_PATH)
+ except socket.error:
+ logging.error("server fd create failed")
+ server_fd = None
+
+ return server_fd
+
+
+ def server_loop(self):
+ """main loop"""
+ logging.info("collect server thread start")
+ server_fd = self.server_fd_create()
+ if not server_fd:
+ return
+
+ epoll_fd = select.epoll()
+ epoll_fd.register(server_fd.fileno(), select.EPOLLIN)
+
+ logging.debug("start server_loop loop")
+ while True:
+ if self.stop_event.is_set():
+ logging.info("collect server thread exit")
+ server_fd = None
+ return
+ try:
+ events_list = epoll_fd.poll(SERVER_EPOLL_TIMEOUT)
+ for event_fd, _ in events_list:
+ if event_fd == server_fd.fileno():
+ self.server_recv(server_fd)
+ else:
+ continue
+ except socket.error:
+ pass
+
+ def stop_thread(self):
+ logging.info("collect server thread is preparing to exit")
+ self.stop_event.set()
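
The server and the client in collect_plugin.py share one framing convention: a request is the 3-byte magic "CLT", a 2-digit zero-padded protocol id, a 4-digit zero-padded payload length, then the JSON payload; the response swaps the magic for "RES". A small sketch of building and parsing such a frame, mirroring client_send_and_recv() and msg_head_process() (illustration only):

    import json

    CLT_MAGIC = "CLT"
    PRO_LEN, LEN_LEN = 2, 4

    def build_request(protocol_id, payload_dict):
        # Frame a request the way client_send_and_recv() does.
        payload = json.dumps(payload_dict)
        return (CLT_MAGIC + str(protocol_id).zfill(PRO_LEN)
                + str(len(payload)).zfill(LEN_LEN) + payload)

    def parse_head(frame):
        # Split the 9-byte header the way msg_head_process() does.
        magic = frame[:3]
        protocol_id = int(frame[3:3 + PRO_LEN])
        data_len = int(frame[3 + PRO_LEN:3 + PRO_LEN + LEN_LEN])
        return magic, protocol_id, data_len

    msg = build_request(1, {"disk_list": json.dumps(["sda"]), "period": 1,
                            "stage": json.dumps([]), "iotype": json.dumps(["read"])})
    print(parse_head(msg))  # ('CLT', 1, <payload length>)
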
diff --git a/src/python/sentryCollector/collectd.py b/src/python/sentryCollector/collectd.py
new file mode 100644
index 0000000..b77c642
--- /dev/null
+++ b/src/python/sentryCollector/collectd.py
@@ -0,0 +1,99 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+main loop for collect.
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+
+import threading
+
+from .collect_io import CollectIo
+from .collect_server import CollectServer
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+SENTRY_RUN_DIR_PERM = 0o750
+
+COLLECT_LOG_FILE = "/var/log/sysSentry/collector.log"
+Thread_List = []
+Module_Map_Class = {"io" : CollectIo}
+
+def remove_sock_file():
+ try:
+ os.unlink(COLLECT_SOCKET_PATH)
+ except FileNotFoundError:
+ pass
+
+def sig_handler(signum, _f):
+ if signum not in (signal.SIGINT, signal.SIGTERM):
+ return
+ for i in range(len(Thread_List)):
+ Thread_List[i][0].stop_thread()
+
+ remove_sock_file()
+ sys.exit(0)
+
+def main():
+ """main
+ """
+ if not os.path.exists(SENTRY_RUN_DIR):
+ os.mkdir(SENTRY_RUN_DIR)
+ os.chmod(SENTRY_RUN_DIR, mode=SENTRY_RUN_DIR_PERM)
+
+ logging.basicConfig(filename=COLLECT_LOG_FILE, level=logging.INFO)
+ os.chmod(COLLECT_LOG_FILE, 0o600)
+
+ try:
+ signal.signal(signal.SIGINT, sig_handler)
+ signal.signal(signal.SIGTERM, sig_handler)
+ signal.signal(signal.SIGHUP, sig_handler)
+
+ logging.info("finish main parse_args")
+
+ module_config = CollectConfig()
+ module_list = module_config.modules
+
+ # listen thread
+ cs = CollectServer()
+ listen_thread = threading.Thread(target=cs.server_loop)
+ listen_thread.start()
+ Thread_List.append([cs, listen_thread])
+
+ # collect thread
+ for info in module_list:
+ class_name = Module_Map_Class.get(info)
+ if not class_name:
+ logging.info("class corresponding to %s does not exist", info)
+ continue
+ cn = class_name(module_config)
+ collect_thread = threading.Thread(target=cn.main_loop)
+ collect_thread.start()
+ Thread_List.append([cn, collect_thread])
+
+ for i in range(len(Thread_List)):
+ Thread_List[i][1].join()
+
+ except Exception:
+ logging.error('%s', traceback.format_exc())
+ finally:
+ pass
+
+ logging.info("All threads have finished. Main thread is exiting.")
\ No newline at end of file
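
Module_Map_Class is the dispatch table that maps a module name from collector.conf to its collector class; main() instantiates each entry with the shared CollectConfig and expects it to expose main_loop() (the thread target) and stop_thread() (called from the signal handler). A purely hypothetical sketch of the shape a new collector would take; the name CollectCpu and the "cpu" key are placeholders, not part of this patch:

    import logging
    import threading

    class CollectCpu:
        # Hypothetical collector following the same contract as CollectIo.
        def __init__(self, module_config):
            self.stop_event = threading.Event()
            # a real module would read its own config section here

        def main_loop(self):
            while not self.stop_event.is_set():
                # sample and store data here
                self.stop_event.wait(1)
            logging.info("collect cpu thread exit")

        def stop_thread(self):
            self.stop_event.set()

    # Registration would then be: Module_Map_Class["cpu"] = CollectCpu
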
diff --git a/src/python/setup.py b/src/python/setup.py
index f96a96e..c28c691 100644
--- a/src/python/setup.py
+++ b/src/python/setup.py
@@ -31,7 +31,9 @@ setup(
'console_scripts': [
'cpu_sentry=syssentry.cpu_sentry:main',
'syssentry=syssentry.syssentry:main',
- 'xalarmd=xalarm.xalarm_daemon:alarm_process_create'
+ 'xalarmd=xalarm.xalarm_daemon:alarm_process_create',
+ 'sentryCollector=sentryCollector.collectd:main',
+ 'avg_block_io=sentryPlugins.avg_block_io.avg_block_io:main'
]
},
)
--
2.33.0