1 Star 0 Fork 0

nowtd/stl-simple_tcp_tunnel

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
stl-cli.py 9.68 KB
一键复制 编辑 原始数据 按行查看 历史
libkodi 提交于 2022-03-14 11:09 . 44
#!/usr/bin/python3
import signal
import socket
import ssl
import struct
import sys
import time
import selectors
from threading import Thread
from logzero import logger
from utils import package
from utils.common import close_socket, usage, parse_args, save_conf
from utils.package import read_count
from utils.stlobject import StlObject
class StlClient(StlObject):
    '''
    Tunnel client.

    Keeps one main (control) connection to the server. When the server asks
    for a tunnel, the client opens a second connection to the server plus a
    connection to the local service named in ``conf['transfer']`` and hands
    both sockets to ``transfer_data``.

    NOTE(review): ``conf``, ``_select``, ``_thread_pool``, ``_quit``,
    ``_config`` and ``transfer_data`` are not defined in this class; they are
    presumably provided by ``StlObject`` - confirm against that class.
    '''

    @classmethod
    def config(cls, filename: str):
        '''
        Create an instance and load its settings from a configuration file.

        :param filename: path of the configuration file
        :return: whatever ``_config`` returns (expected: the StlClient)
        '''
        return cls()._config(filename)

    @classmethod
    def create(cls, conf: dict):
        '''
        Create an instance from an already-built configuration mapping.

        :param conf: configuration data
        :return: whatever ``_config`` returns (expected: the StlClient)
        '''
        return cls()._config(None, conf)

    def __init__(self):
        '''
        Initialise the client state.
        '''
        super(StlClient, self).__init__()
        self.__sock = None               # main (control) connection
        self._ssl_context = None         # TLS context for server connections
        self.__local_ssl_context = None  # lazily built TLS context for the local service
        self.__alive = True              # cleared before each heartbeat, set again by the reply
        self.__retry_connect = 0         # reconnect attempts left

    def __del__(self):
        super(StlClient, self).__del__()
        self.quit()

    def quit(self):
        '''
        Stop the client and close the main connection.
        '''
        super(StlClient, self).quit()
        if self.__sock is not None:
            close_socket(self.__sock)

    def __wait_message(self, sock: socket.socket, mask: int) -> None:
        '''
        Selector callback: read and handle one packet from the server.

        A packet is a 4-byte big-endian length followed by that many bytes.

        :param sock: main connection that became readable
        :param mask: selector event mask (unused)
        '''
        (l, len_data) = read_count(sock, 4)  # packet length prefix
        if l != 4:
            logger.warning('Failed to get data length: {0}'.format(l))
            try:
                self._select.unregister(sock)
            except Exception as e:
                logger.error(e)
            # While the heartbeat thread still has reconnect attempts left it
            # owns the recovery; otherwise the client shuts down.
            if not (self.__alive is False and self.__retry_connect > 0):
                self.quit()
            # Never fall through: len_data is incomplete and cannot be unpacked.
            return

        length = struct.unpack('!I', len_data)[0]
        (l1, data) = read_count(sock, length)  # packet payload
        if l1 != length:
            logger.error('Failed to read package: {0}'.format(data))
            self.quit()
            return

        (ptype, pdata) = package.parse(data)
        if ptype == package.PKG_TUNNEL:  # server requests a data tunnel
            self.__handle_tunnel_request(sock, pdata)
        elif ptype == package.PKG_HEART:  # heartbeat reply
            self.__alive = True
            self.__retry_connect = self.conf['keep-alive']['retry']

    def __handle_tunnel_request(self, sock: socket.socket, pdata: bytes) -> None:
        '''
        Open both ends of a tunnel, report the outcome and start forwarding.

        :param sock: main connection used to send the reply
        :param pdata: tunnel request payload, echoed back in the reply
        '''
        logger.info('The server requests a tunnel: {0}'.format(pdata))
        tunnel_sock = self.__new_connect(pdata)
        local_sock = self.__open_local_connect()
        established = tunnel_sock is not None and local_sock is not None

        if established:
            resp_data = package.stringify(package.PKG_TUNNEL, pdata)
        else:
            resp_data = package.stringify(package.PKG_DISCONNECT, pdata)
            if local_sock is not None:
                close_socket(local_sock)
            if tunnel_sock is not None:
                close_socket(tunnel_sock)

        try:
            sock.sendall(resp_data)
        except Exception:
            # The reply never reached the server, so nobody will use the pair.
            if established:
                close_socket(local_sock)
                close_socket(tunnel_sock)
            raise

        # Forward data only when both ends really exist; on failure the
        # sockets above are already closed (or were never opened).
        if established:
            self._thread_pool.submit(self.transfer_data, tunnel_sock, local_sock, self.__transfer_error, None)

    def __open_local_connect(self):
        '''
        Connect to the local service described by ``conf['transfer']``.

        :return: connected (optionally TLS-wrapped) socket, or None on failure
        '''
        local_sock = None
        try:
            transfer = self.conf['transfer']
            addr = (transfer['host'], transfer['port'])
            local_sock = socket.create_connection(addr, timeout=transfer['timeout'])
            local_sock.settimeout(None)
            if transfer.get('ssl'):
                if self.__local_ssl_context is None:
                    if 'cert' in transfer:
                        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                        context.load_verify_locations(transfer['cert'])
                    else:
                        context = ssl.create_default_context()
                    # Cache only a fully configured context so a failed
                    # certificate load is retried on the next request.
                    self.__local_ssl_context = context
                local_sock = self.__local_ssl_context.wrap_socket(local_sock, server_hostname=transfer['host'])
        except Exception as e:
            logger.error(e)
            # A failed handshake must not leave a plaintext socket behind that
            # would be reported to the server as a working tunnel.
            if local_sock is not None:
                close_socket(local_sock)
                local_sock = None
        return local_sock

    def run(self):
        '''
        Connect to the server and dispatch selector events until quit.
        '''
        self.__create_main_connect()
        if self.__sock is None:
            return
        if 'keep-alive' in self.conf:  # start the heartbeat thread
            self.__retry_connect = self.conf['keep-alive']['retry']
            th = Thread(target=self.__start_heart_thread, daemon=True)
            th.start()
        while not self._quit:
            try:
                # Truthiness also covers get_map() returning None once the
                # selector has been closed.
                if self._select.get_map():
                    for key, mask in self._select.select():
                        callback = key.data
                        callback(key.fileobj, mask)
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(e)

    def __transfer_error(self, tunnel: socket.socket, local: socket.socket, args: tuple) -> None:
        '''
        Callback invoked when data forwarding fails or finishes.

        :param tunnel: tunnel connection to the server
        :param local: connection to the local service
        :param args: extra arguments (unused)
        '''
        peer = None
        if tunnel is not None:
            try:
                peer = tunnel.getpeername()
            except OSError:
                # The socket may already be disconnected or closed.
                peer = None
        logger.info('tunnel closed: {0}'.format(peer))
        if local is not None:
            close_socket(local)
        if tunnel is not None:
            close_socket(tunnel)

    def __new_connect(self, id: bytes = None) -> socket.socket:
        '''
        Open a connection to the server and perform the initial handshake.

        :param id: tunnel id; when None a main connection is requested using
                   ``conf['server']['key']``
        :return: connected socket, or None when anything fails
        '''
        if 'server' not in self.conf:
            return None
        server = self.conf['server']
        addr = (server['host'], server['port'])
        sock = None
        try:
            sock = socket.create_connection(addr, timeout=server['timeout'])
            sock.settimeout(None)
            if self._ssl_context is not None:
                sock = self._ssl_context.wrap_socket(sock, server_hostname=addr[0])
            if id is None:
                data = package.stringify(package.PKG_CONNECT, server['key'].encode('utf-8'))
            else:
                data = package.stringify(package.PKG_TUNNEL, id)
            try:
                sock.sendall(data)
            except Exception as e:
                logger.error('Failed to send package: {0}'.format(e.args))
            else:
                try:
                    (length, resp) = read_count(sock, 6)
                    # Validate the size before unpacking the type field.
                    if length != 6:
                        raise Exception('error from check response')
                    pkg_type = struct.unpack('!H', resp[4:])[0]
                    if pkg_type not in (package.PKG_CONNECT, package.PKG_TUNNEL):
                        raise Exception('error from check response')
                    if id is None:
                        logger.info('connect to main service success')
                    else:
                        logger.info('tunnel connect success')
                    return sock
                except Exception as e:
                    logger.error(e)
        except Exception as e:
            logger.error('Failed to connect: {0} {1}'.format(e.args, addr))
        if sock is not None:
            close_socket(sock)
        return None

    def __create_main_connect(self) -> None:
        '''
        (Re)create the main connection and register it with the selector.
        '''
        previous = self.__sock
        if previous is not None:
            try:
                self._select.unregister(previous)
            except Exception:
                # Best effort: the socket may already have been unregistered
                # by __wait_message after a failed read.
                pass
            # Release the old connection instead of leaking it on reconnect.
            close_socket(previous)
        self.__sock = self.__new_connect()
        if self.__sock is not None:
            self._select.register(self.__sock, selectors.EVENT_READ, self.__wait_message)

    def __start_heart_thread(self) -> None:
        '''
        Heartbeat thread body.

        Every ``interval`` seconds: if the previous heartbeat got no reply,
        reconnect (quitting once the retry budget is used up); then mark the
        connection as not alive and send a new heartbeat. The reply handler in
        ``__wait_message`` marks it alive again.
        '''
        max_retry = self.conf['keep-alive']['retry']
        interval = self.conf['keep-alive']['interval']
        while not self._quit:
            time.sleep(interval)
            try:
                if self.__alive is False:
                    logger.info('reconnect server({0})...'.format(max_retry - self.__retry_connect))
                    self.__create_main_connect()
                    self.__retry_connect = self.__retry_connect - 1
                    # "<=" so a zero or already negative budget still stops.
                    if self.__retry_connect <= 0:
                        self.quit()
                        return
                self.__alive = False
                if self.__sock is not None:
                    heart_data = package.stringify(package.PKG_HEART, b'')
                    self.__sock.sendall(heart_data)
            except Exception as e:
                logger.error(e)
                self.__alive = False
if __name__ == '__main__':
    # Script entry point: parse the command line, then either write a
    # configuration file, run the client, or print the usage text.
    cli: StlClient = None

    def cli_quit(signum: int, frame: object):
        '''
        Signal handler: stop the client (if one exists) and exit.

        :param signum: received signal number (unused)
        :param frame: current stack frame (unused)
        '''
        global cli
        if cli is not None:
            cli.quit()
        sys.exit(0)

    conf = parse_args()
    if conf['conf-file'] is None and conf['conf-write'] is not None:
        # Only asked to write the given options out as a configuration file.
        save_conf(conf)
    else:
        try:
            has_key = 'key' in conf['server'] and conf['server']['key'] is not None
            has_conf_file = 'conf-file' in conf and conf['conf-file'] is not None
            if (has_key or has_conf_file) and conf['help'] is False:
                if conf['conf-file'] is not None:
                    cli = StlClient.config(conf['conf-file'])
                else:
                    cli = StlClient.create(conf)
                signal.signal(signal.SIGINT, cli_quit)
                signal.signal(signal.SIGTERM, cli_quit)
                cli.run()
            else:
                usage()
        except Exception as e:
            logger.error(e)
        finally:
            # Release the client on every exit path: normal return from
            # run(), an error, or SystemExit raised by the signal handler.
            if cli is not None:
                cli.quit()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/nowtd/stl-simple_tcp_tunnel.git
git@gitee.com:nowtd/stl-simple_tcp_tunnel.git
nowtd
stl-simple_tcp_tunnel
stl-simple_tcp_tunnel
master

搜索帮助