1 Star 0 Fork 0

何俊峰/SFTP的Server和Client

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
LogMiddleWarePro.py 8.28 KB
一键复制 编辑 原始数据 按行查看 历史
frederic he 提交于 2024-06-12 14:02 . 上传代码文件
# -*- coding: utf-8 -*-
import logging
import os
import platform
import posixpath
import sys
import time
from datetime import datetime
from logging import handlers
from pathlib import Path

from ProjectConfig import *

if system_type != "Windows":
    sys.path.append('/Library/Frameworks/Python.framework/Versions/3.9/lib')
import paramiko
class LogMiddlewareClient:
    """Upload ``*.tar`` log archives from a local folder to an SFTP server.

    Settings come from the module-level ``CONFIG`` mapping: the
    ``"local_Server"`` entry provides ``root_dir`` (the folder that is
    scanned) and the ``"server"`` entry provides ``hostname``, ``port``,
    ``username``, ``password`` and ``remote_folder_path``.  The name of every
    uploaded file is appended to ``UploadedFileName.csv`` so that it is not
    uploaded again on a later pass.
    """

    # Accepted textual log levels; unknown names fall back to DEBUG.
    _LOG_LEVELS = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    def __init__(self, log_name="LogMiddleware", log_level="INFO", is_console=True):
        """Set up logging, load the configuration sections and validate them.

        Args:
            log_name: Logger name; also the stem of the log file name.
            log_level: Level name (case-insensitive), e.g. ``"INFO"``.
            is_console: When true, log records are also written to the console.
        """
        self.logger = self.initialize_logger(log_name, log_level, is_console)
        self.config = CONFIG
        self.client_config = self.config.get("local_Server")
        self.server_config = self.config.get("server")
        self.uploaded_list_path = Path("UploadedFileName.csv")
        # SSH client behind the SFTP session returned by connect_ftp(); kept so
        # the underlying connection can be closed once the session is done.
        self._ssh_client = None
        self.check_config()

    def initialize_logger(self, log_name, log_level="DEBUG", is_console=True):
        """Return the logger ``log_name`` with a daily-rotating file handler.

        Handlers are attached only if the logger does not already have one of
        the same kind, so creating several clients with the same ``log_name``
        does not duplicate every log line.

        Args:
            log_name: Logger name; records are written to ``<log_name>.log``.
            log_level: Level name (case-insensitive); unknown names mean DEBUG.
            is_console: When true, also attach a console handler.

        Returns:
            The configured ``logging.Logger``.
        """
        level = self._LOG_LEVELS.get(str(log_level).upper(), logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s - %(filename)s:%(lineno)s - %(levelname)s - %(name)s - %(message)s")

        logger = logging.getLogger(log_name)
        logger.setLevel(level)

        has_file_handler = any(
            isinstance(existing, handlers.TimedRotatingFileHandler) for existing in logger.handlers
        )
        if not has_file_handler:
            handler = handlers.TimedRotatingFileHandler(f"{log_name}.log", when="D", interval=1, backupCount=0,
                                                        encoding='utf-8')
            handler.setFormatter(formatter)
            handler.suffix = "_%Y-%m-%d.log"
            logger.addHandler(handler)

        if is_console:
            # File handlers subclass StreamHandler, so exclude them explicitly.
            has_console_handler = any(
                isinstance(existing, logging.StreamHandler) and not isinstance(existing, logging.FileHandler)
                for existing in logger.handlers
            )
            if not has_console_handler:
                console = logging.StreamHandler()
                console.setLevel(level)
                console.setFormatter(formatter)
                logger.addHandler(console)
        return logger

    def check_config(self):
        """Log an error for every missing configuration section.

        Returns:
            ``True`` when both the server and the client section are present,
            ``False`` otherwise.
        """
        is_valid = True
        if not self.server_config:
            self.logger.error("Server config load fail")
            is_valid = False
        if not self.client_config:
            self.logger.error("Client config load fail")
            is_valid = False
        return is_valid

    def convert_path_for_ftp(self, local_path):
        """Convert a local-style path into the forward-slash form used for SFTP.

        Backslashes become ``/``.  On Windows a drive-letter path such as
        ``C:/logs`` additionally gets a leading ``/`` (``/C:/logs``).

        Args:
            local_path: Path as a string or ``os.PathLike``.

        Returns:
            The converted path string.

        Raises:
            RuntimeError: If ``os.name`` is neither ``'nt'`` nor ``'posix'``.
        """
        if os.name not in ('nt', 'posix'):
            raise RuntimeError(f"Unsupported operating system: {platform.system()}")
        sftp_path = os.fspath(local_path).replace("\\", "/")
        # Length check first: a one-character or empty path has no index 1.
        if os.name == 'nt' and len(sftp_path) > 1 and sftp_path[1] == ':':
            sftp_path = "/" + sftp_path
        return sftp_path

    def connect_ftp(self):
        """Open an SFTP session to the configured server.

        The SSH client that owns the session is remembered on the instance so
        that it can be closed together with the session (see ``process``).

        Returns:
            The SFTP client object, or ``None`` when connecting failed.

        Raises:
            RuntimeError: If the server configuration section is missing.
        """
        if not self.server_config:
            raise RuntimeError("Server config is missing; cannot open an SFTP connection")

        hostname = self.server_config.get("hostname")
        port = self.server_config.get("port") or 22  # 22 is the standard SSH port
        username = self.server_config.get("username")
        password = self.server_config.get("password")

        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys without
        # verification; consider loading known hosts for production use.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(hostname, port, username, password)
            sftp = ssh.open_sftp()
        except Exception as e:
            # Do not leave a half-open SSH connection behind.
            ssh.close()
            self.logger.error(f"FTP连接失败: {e}")
            return None

        self._ssh_client = ssh
        self.logger.info("连接成功")
        return sftp

    def _disconnect(self, sftp):
        """Close ``sftp`` and the SSH client remembered by ``connect_ftp``."""
        try:
            if sftp is not None:
                sftp.close()
        finally:
            ssh = getattr(self, "_ssh_client", None)
            self._ssh_client = None
            if ssh is not None:
                ssh.close()

    def check_and_create_remote_folder(self, sftp, local_folder_path):
        """Make sure the remote directory exists, creating it level by level.

        The path is converted with ``convert_path_for_ftp`` first and is then
        walked with POSIX separators, keeping a leading ``/`` so that absolute
        paths are created where they were requested rather than relative to
        the SFTP working directory.

        Args:
            sftp: Open SFTP client (needs ``stat`` and ``mkdir``).
            local_folder_path: Target directory as written in the config.

        Returns:
            ``True`` if the directory exists afterwards, ``False`` if checking
            or creating one of its levels failed.
        """
        remote_folder_path = self.convert_path_for_ftp(local_folder_path)
        try:
            sftp.stat(remote_folder_path)
        except Exception as e:
            self.logger.info(f"Directory {remote_folder_path} does not exist. Creating it...{e}")
        else:
            self.logger.info(f"Directory {remote_folder_path} already exists.")
            return True

        path = "/" if remote_folder_path.startswith("/") else ""
        for part in remote_folder_path.split("/"):
            if not part:
                continue
            path = posixpath.join(path, part)
            try:
                sftp.stat(path)
            except FileNotFoundError:
                try:
                    sftp.mkdir(path)
                except Exception as e:
                    self.logger.error(f"Error checking/creating directory {path}: {e}")
                    return False
                self.logger.info(f"Created directory: {path}")
            except Exception as e:
                self.logger.error(f"Error checking/creating directory {path}: {e}")
                return False
        return True

    def upload_file(self, sftp, local_file_path, remote_folder_path):
        """Upload one file into ``remote_folder_path`` and record its name.

        Args:
            sftp: Open SFTP client (needs ``put``).
            local_file_path: File to upload (string or ``os.PathLike``).
            remote_folder_path: Destination directory on the server.

        Returns:
            ``True`` when the transfer succeeded, ``False`` otherwise.
        """
        local_path = os.fspath(local_file_path)
        file_name = os.path.basename(local_path)
        # Remote paths always use '/', whatever the local platform is.
        remote_dir = os.fspath(remote_folder_path).replace('\\', '/')
        remote_file_path = posixpath.join(remote_dir, file_name)
        try:
            sftp.put(local_path, remote_file_path)
        except Exception as e:
            self.logger.error(f"Upload {local_file_path} to {remote_file_path} fail: {e}")
            return False

        self.logger.info(f"Upload {file_name} success")
        try:
            self.record_uploaded_file(file_name)
        except OSError as e:
            # The transfer itself worked; the file will simply be sent again
            # on the next pass because it is not in the uploaded list.
            self.logger.error(f"Failed to record uploaded file {file_name}: {e}")
        return True

    def process(self, log_file_path):
        """Connect, ensure the remote folder exists, upload one file, disconnect.

        The connection is always closed, including when a step fails.

        Args:
            log_file_path: Local file to upload.
        """
        sftp = self.connect_ftp()
        if sftp is None:
            return
        try:
            remote_folder_path = self.server_config.get("remote_folder_path")
            if not remote_folder_path:
                self.logger.error("Server config has no remote_folder_path")
                return
            if not self.check_and_create_remote_folder(sftp, remote_folder_path):
                return
            self.upload_file(sftp, log_file_path, self.convert_path_for_ftp(remote_folder_path))
        finally:
            self._disconnect(sftp)

    def record_uploaded_file(self, file_name):
        """Append ``file_name`` to the uploaded-files list on disk."""
        with open(self.uploaded_list_path, 'a') as f:
            f.write(f"{file_name}\n")

    def load_uploaded_list(self):
        """Return the set of file names recorded as already uploaded.

        Returns:
            A set of names; empty when the list file does not exist yet.
        """
        if not self.uploaded_list_path.exists():
            return set()
        with open(self.uploaded_list_path, 'r') as f:
            stripped = (line.strip() for line in f)
            return {name for name in stripped if name}

    def mainloop(self):
        """Run one pass: upload every ``*.tar`` in ``root_dir`` not yet uploaded.

        Raises:
            RuntimeError: If the client configuration or its ``root_dir``
                entry is missing.
        """
        root_dir = self.client_config.get('root_dir') if self.client_config else None
        if not root_dir:
            raise RuntimeError("Client config is missing 'root_dir'")

        local_folder_path = Path(root_dir)
        if not local_folder_path.exists():
            self.logger.error(
                f"Failed to find {local_folder_path} log folder, please check the shared folder from Beckhoff PLC")
            return

        uploaded_list = self.load_uploaded_list()
        # Sorted so that files are uploaded in a stable, predictable order.
        log_file_list = sorted(
            file for file in local_folder_path.glob("*.tar") if file.name not in uploaded_list
        )
        for log_file in log_file_list:
            # The file may have been removed between the glob and now.
            if not log_file.exists():
                self.logger.error(f"Failed to find {log_file}, please check the Clematis Log software in Beckhoff PLC")
                continue
            self.logger.info(f"现在准备上传 {log_file}")
            self.process(log_file)
            time.sleep(1)

    def timed_run(self):
        """Run ``mainloop`` repeatedly until it raises.

        Between passes the loop sleeps 3 seconds, or 60 seconds during the
        last minutes of the day (23:51-23:59).  An exception from ``mainloop``
        is logged with its traceback and ends the loop.
        """
        self.logger.info("Middleware V1.0 starts")
        while True:
            try:
                self.mainloop()
            except Exception as e:
                self.logger.error(f"An error occurred during the main loop: {e}", exc_info=True)
                return
            # Read the clock once so hour and minute belong to the same instant.
            now = datetime.now()
            if now.hour == 23 and now.minute > 50:
                time.sleep(60)
            else:
                time.sleep(3)
if __name__ == "__main__":
    # Script entry point: build a client with default settings and run its
    # polling loop until it stops.
    LogMiddlewareClient().timed_run()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/frederic-he/ServerClientOfSftp.git
git@gitee.com:frederic-he/ServerClientOfSftp.git
frederic-he
ServerClientOfSftp
SFTP的Server和Client
master

搜索帮助