4 Star 21 Fork 13

didiplus/script

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
network_contet.py 4.60 KB
一键复制 编辑 原始数据 按行查看 历史
didiplus 提交于 2020-06-17 10:07 . 优化巡检脚本
'''
@文件 :network_contet.py
@说明 :
@时间 :2020/04/01 17:37:36
@作者 :didiplus
@版本 :1.0
'''
from telnetlib import Telnet
import time
from datetime import datetime
import paramiko
import re
import os
import threading
class MyThreading(threading.Thread):
def __init__(self, func, args=()):
super(MyThreading, self).__init__()
self.func = func
self.args = args
def run(self):
try:
self.result = self.func(*self.args)
except Exception as e:
print(e)
def get_result(self):
# threading.Thread.join(self)
try:
return self.result
except Exception as e:
print(e)
return None
class BaseClass(object):
def __init__(self, host, username: str, password: str, superpassword=None):
self.host = host
self.login_dict = {
'host': host,
'username': username,
'password': password,
'super': superpassword
}
def savefile(self):
str_path = datetime.now().strftime("%Y%m%d")
if os.path.exists(str_path):
pass
else:
os.makedirs(str_path)
file = open(str_path + '\\' + self.host + "_" +
datetime.now().strftime("%H%M%S") + ".txt", "w")
file.write(self.result_str)
file.close()
class SSHtool(BaseClass):
def ssh_on(self, commands: str):
self.client = paramiko.SSHClient() # 实例化一个SSH连接客户端
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.login_dict['host'], 22, self.login_dict['username'], self.login_dict['password'],
timeout=6) # 客户端实例化一个连接
self.remote_session = self.client.invoke_shell() # 为客户端实例化一个输入输出shell
time.sleep(5)
self.buff = self.remote_session.recv(65535).decode()
if self.login_dict['super']:
self.remote_session.send('\b super \n')
self.buff = self.remote_session.recv(65535).decode()
self.remote_session.send(
self.login_dict['super'].encode('ascii') + b'\n')
self.result_list = []
for command in commands:
self.remote_session.send(command.encode('ascii') + b'\n')
time.sleep(4)
while True:
self.buff = self.remote_session.recv(
65535).decode() # 接受最大65535个字节的返回
self.result_list.append(self.buff)
if '---- More ----' in self.buff.strip():
self.remote_session.send(b' ')
time.sleep(1)
else:
break
self.data = [c.replace('---- More ----', '') for c in self.result_list]
self.data1 = [
f.replace(' ', '') for f in self.data]
self.result_str = '\n'.join(self.data1)
self.savefile()
return self.result_str
class Telnettool(BaseClass):
def telnet_on(self, commands: str):
tn = Telnet(self.login_dict['host'], port=23, timeout=10)
tn.read_until(b"Username:")
tn.write(self.login_dict['username'].encode('ascii') + b'\n')
tn.read_until(b"Password:")
tn.write(self.login_dict['password'].encode('ascii') + b'\n')
if self.login_dict['super']:
tn.write(b'super \n')
tn.read_until(b" Password:")
tn.write(self.login_dict['super'].encode('ascii') + b'\n')
tn.write(b'sys \n')
result_list = []
for command in commands:
tn.write(command.encode('ascii') + b'\n')
time.sleep(2)
while (True):
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if '---- More ----' in command_result.strip():
tn.write(b' ')
time.sleep(0.5)
else:
break
data = [c.replace('---- More ----', '') for c in result_list]
data1 = [
f.replace(' ', '') for f in data]
self.result_str = '\n'.join(data1)
self.savefile()
return self.result_str
if __name__ == '__main__':
commands = ['display ip interface brief', 'display cpu-usage']
ip_list = ['192.168.56.10', '192.168.56.20']
for ip in ip_list:
ssh_obj = SSHtool(host=ip, username='admin', password='admin')
t = threading.Thread(target=ssh_obj.ssh_on, args=(commands,))
t.start()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/didiplus/script.git
git@gitee.com:didiplus/script.git
didiplus
script
script
master

搜索帮助