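# [Hosting-page notice, not part of the program] 代码拉取完成,页面将自动刷新 (Code pull complete; the page will refresh automatically.)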
#!/usr/bin/python3
import signal
import socket
import ssl
import struct
import sys
import time
import selectors
from threading import Thread
from logzero import logger
from utils import package
from utils.common import close_socket, usage, parse_args, save_conf
from utils.package import read_count
from utils.stlobject import StlObject
class StlClient(StlObject):
    '''
    Tunnel client.

    Keeps one main (control) connection to the server.  When the server
    requests a tunnel, the client opens a second connection to the server and
    a connection to the local service, then hands both to ``transfer_data``.
    If ``keep-alive`` is configured, a heartbeat thread pings the server and
    reconnects when no heartbeat response arrives.

    NOTE(review): ``conf``, ``_select``, ``_thread_pool``, ``_quit``,
    ``_config`` and ``transfer_data`` are not defined here; they are presumed
    to come from ``StlObject`` - confirm against that class.
    '''

    @classmethod
    def config(cls, filename: str):
        '''
        Create an instance and load its configuration from a file.
        :param filename: path of the configuration file
        :return: the result of ``_config`` (expected to be the StlClient)
        '''
        return cls()._config(filename)

    @classmethod
    def create(cls, conf: dict):
        '''
        Create an instance from an already parsed configuration.
        :param conf: configuration dictionary
        :return: the result of ``_config`` (expected to be the StlClient)
        '''
        return cls()._config(None, conf)

    def __init__(self):
        '''
        Initialise the client state.
        '''
        super().__init__()
        # Main (control) connection to the server; None until connected.
        self.__sock = None
        # TLS context for server connections; None means plain TCP.
        self._ssl_context = None
        # TLS context for the local service, created lazily on first use.
        self.__local_ssl_context = None
        # Cleared before each heartbeat is sent, set again by the response.
        self.__alive = True
        # Remaining reconnect attempts before the client gives up.
        self.__retry_connect = 0

    def __del__(self):
        super().__del__()
        self.quit()

    def quit(self):
        '''
        Stop the client and close the main connection.
        '''
        super().quit()
        if self.__sock is not None:
            close_socket(self.__sock)

    def __wait_message(self, sock: socket.socket, mask: int) -> None:
        '''
        Selector callback: read one package from the main connection and
        dispatch it.
        :param sock: main connection
        :param mask: selector event mask (unused)
        '''
        (count, len_data) = read_count(sock, 4)  # 4-byte length prefix
        if count != 4:
            logger.warning('Failed to get data length: {0}'.format(count))
            try:
                self._select.unregister(sock)
            except Exception as e:
                logger.error(e)
            # While the heartbeat thread still has retries left it owns the
            # reconnect; in every other case the client shuts down.
            if not (self.__alive is False and self.__retry_connect > 0):
                self.quit()
            # There is no complete header to parse, so never fall through.
            return

        length = struct.unpack('!I', len_data)[0]
        (read_len, data) = read_count(sock, length)  # package payload
        if read_len != length:
            logger.error('Failed to read package: {0}'.format(data))
            self.quit()
            return

        (ptype, pdata) = package.parse(data)
        if ptype == package.PKG_TUNNEL:  # the server asks for a data tunnel
            self.__handle_tunnel_request(sock, pdata)
        elif ptype == package.PKG_HEART:  # heartbeat response
            self.__alive = True
            self.__retry_connect = self.conf['keep-alive']['retry']

    def __handle_tunnel_request(self, sock: socket.socket, pdata: bytes) -> None:
        '''
        Open both ends of a tunnel and report the outcome to the server.
        :param sock: main connection, used for the reply
        :param pdata: tunnel id sent by the server, echoed back in the reply
        '''
        logger.info('The server requests a tunnel: {0}'.format(pdata))
        tunnel_sock = self.__new_connect(pdata)
        local_sock = self.__connect_local()

        if tunnel_sock is not None and local_sock is not None:
            sock.sendall(package.stringify(package.PKG_TUNNEL, pdata))
            self._thread_pool.submit(self.transfer_data, tunnel_sock, local_sock, self.__transfer_error, None)
            return

        # One side failed: release whatever was opened and tell the server.
        # No transfer task is started because there is nothing to relay.
        if local_sock is not None:
            close_socket(local_sock)
        if tunnel_sock is not None:
            close_socket(tunnel_sock)
        sock.sendall(package.stringify(package.PKG_DISCONNECT, pdata))

    def __connect_local(self):
        '''
        Connect to the local service described by ``conf['transfer']``.
        :return: the connected (optionally TLS-wrapped) socket, or None if
                 any step of the setup failed
        '''
        local_sock = None
        try:
            transfer = self.conf['transfer']
            addr = (transfer['host'], transfer['port'])
            local_sock = socket.create_connection(addr, timeout=transfer['timeout'])
            local_sock.settimeout(None)  # the timeout applies to connecting only
            if 'ssl' in transfer and transfer['ssl']:
                if self.__local_ssl_context is None:
                    if 'cert' in transfer:
                        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                        context.load_verify_locations(transfer['cert'])
                    else:
                        context = ssl.create_default_context()
                    # Cache the context only once it is fully set up, so a
                    # failed certificate load is retried on the next request.
                    self.__local_ssl_context = context
                local_sock = self.__local_ssl_context.wrap_socket(local_sock, server_hostname=transfer['host'])
            return local_sock
        except Exception as e:
            logger.error(e)
            # A socket whose setup failed half-way must not be used for the
            # tunnel, so close it and report the failure.
            if local_sock is not None:
                close_socket(local_sock)
            return None

    def run(self):
        '''
        Connect to the server and dispatch selector events until the client
        is asked to quit.  Returns immediately if the main connection cannot
        be established.
        '''
        self.__create_main_connect()
        if self.__sock is None:
            return
        if 'keep-alive' in self.conf:  # start the heartbeat thread
            self.__retry_connect = self.conf['keep-alive']['retry']
            Thread(target=self.__start_heart_thread, daemon=True).start()
        while not self._quit:
            try:
                if self._select.get_map():
                    # A bounded wait lets the loop re-check the quit flag
                    # even when no socket becomes readable.
                    for key, mask in self._select.select(timeout=1):
                        callback = key.data
                        callback(key.fileobj, mask)
                else:
                    # Nothing registered (e.g. while reconnecting).
                    time.sleep(1)
            except Exception as e:
                logger.error(e)

    def __transfer_error(self, tunnel: socket.socket, local: socket.socket, args: tuple) -> None:
        '''
        Callback invoked when a data exchange fails or ends; closes both
        sockets.
        :param tunnel: data tunnel to the server
        :param local: connection to the local service
        :param args: extra arguments (unused)
        '''
        peer = None
        if tunnel is not None:
            try:
                peer = tunnel.getpeername()
            except OSError:
                # The socket is already closed or disconnected; logging the
                # peer must not prevent the cleanup below.
                peer = None
        logger.info('tunnel closed: {0}'.format(peer))
        if local is not None:
            close_socket(local)
        if tunnel is not None:
            close_socket(tunnel)

    def __new_connect(self, id: bytes = None) -> socket.socket:
        '''
        Open a connection to the server and perform the handshake.
        :param id: tunnel id for a data tunnel; None for the main connection,
                   which authenticates with ``conf['server']['key']``
        :return: the connected socket, or None if ``conf`` has no 'server'
                 entry or connecting / the handshake failed
        '''
        if 'server' not in self.conf:
            return None
        server = self.conf['server']
        addr = (server['host'], server['port'])
        sock = None
        try:
            sock = socket.create_connection(addr, timeout=server['timeout'])
            sock.settimeout(None)  # the timeout applies to connecting only
            if self._ssl_context is not None:
                sock = self._ssl_context.wrap_socket(sock, server_hostname=addr[0])
            if id is None:
                data = package.stringify(package.PKG_CONNECT, server['key'].encode('utf-8'))
            else:
                data = package.stringify(package.PKG_TUNNEL, id)
            try:
                sock.sendall(data)
                try:
                    # The reply is 6 bytes; the package type is read from the
                    # last two, the first four are not inspected.
                    (length, resp) = read_count(sock, 6)
                    if length == 6:
                        pkg_type = struct.unpack('!H', resp[4:])[0]
                        if pkg_type == package.PKG_CONNECT or pkg_type == package.PKG_TUNNEL:
                            if id is None:
                                logger.info('connect to main service success')
                            else:
                                logger.info('tunnel connect success')
                            return sock
                    raise Exception('error from check response')
                except Exception as e:
                    logger.error(e)
            except Exception as e:
                logger.error('Failed to send package: {0}'.format(e.args))
        except Exception as e:
            logger.error('Failed to connect: {0} {1}'.format(e.args, addr))
        # Every failure path ends here: release the socket if one was opened.
        if sock is not None:
            close_socket(sock)
        return None

    def __create_main_connect(self) -> None:
        '''
        (Re)create the main connection and register it with the selector.
        :return:
        '''
        old_sock = self.__sock
        if old_sock is not None:
            try:
                self._select.unregister(old_sock)
            except Exception as e:
                # Usually already unregistered by __wait_message after a
                # read failure; nothing else to undo.
                logger.debug(e)
            # The replaced connection is no longer used, so release it.
            close_socket(old_sock)
        self.__sock = self.__new_connect()
        if self.__sock is not None:
            self._select.register(self.__sock, selectors.EVENT_READ, self.__wait_message)

    def __start_heart_thread(self) -> None:
        '''
        Heartbeat thread body: every ``interval`` seconds send a heartbeat,
        and reconnect first if the previous one was never answered.
        '''
        max_retry = self.conf['keep-alive']['retry']
        interval = self.conf['keep-alive']['interval']
        while not self._quit:
            time.sleep(interval)
            try:
                if self.__alive is False:
                    # The previous heartbeat got no response: reconnect.
                    logger.info('reconnect server({0})...'.format(max_retry - self.__retry_connect))
                    self.__create_main_connect()
                    self.__retry_connect = self.__retry_connect - 1
                    if self.__retry_connect <= 0:
                        # Retry budget exhausted; stop instead of sending on
                        # a connection that has just been closed.
                        self.quit()
                        return
                # Cleared here and set again by the heartbeat response.
                self.__alive = False
                if self.__sock is not None:
                    heart_data = package.stringify(package.PKG_HEART, b'')
                    self.__sock.sendall(heart_data)
            except Exception as e:
                logger.error(e)
                self.__alive = False
if __name__ == '__main__':
    # Script entry point: parse the command line, then either save the
    # configuration, start the client, or print the usage text.
    cli: StlClient = None

    def cli_quit(signum: int, frame: object):
        '''
        Signal handler: shut the client down and exit the process.
        :param signum: number of the received signal (unused)
        :param frame: interrupted stack frame (unused)
        '''
        global cli
        if cli is not None:
            cli.quit()
        sys.exit(0)

    conf = parse_args()
    if conf['conf-file'] is None and conf['conf-write'] is not None:
        # No configuration file to load, but one should be written.
        save_conf(conf)
    else:
        try:
            has_key = conf['server'].get('key') is not None
            has_conf_file = conf.get('conf-file') is not None
            if (has_key or has_conf_file) and conf['help'] is False:
                # A configuration file takes precedence over command-line
                # settings.
                if conf['conf-file'] is not None:
                    cli = StlClient.config(conf['conf-file'])
                else:
                    cli = StlClient.create(conf)
                signal.signal(signal.SIGINT, cli_quit)
                signal.signal(signal.SIGTERM, cli_quit)
                cli.run()
            else:
                usage()
        except Exception as e:
            logger.error(e)
            if cli is not None:
                cli.quit()
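# [Hosting-page notice, not part of the program] 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。 (This section may contain content unsuitable for display, so the page does not show it. You can review and edit it yourself with the editing tools.)
# [Hosting-page notice, not part of the program] 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违反国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。 (If you are sure the content contains no inappropriate language, advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything that violates national laws and regulations, you can submit an appeal and it will be handled as soon as possible.)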