代码拉取完成,页面将自动刷新
#!/usr/bin/python3
import json
import selectors
import signal
import socket
import struct
import sys
import time
from threading import Thread
from logzero import logger
from utils import package
from utils.common import close_socket, start_tcp_server, connectid, parse_args, save_conf, usage
from utils.package import read_count
from utils.stlobject import StlObject
class StlServer(StlObject):
'''
服务器端
'''
@classmethod
def config(cls, filename:str):
'''
类方法,创建实例并加载配置文件
:param filename: 配置文件路径
:return: StlServer
'''
return cls()._config(filename)
@classmethod
def create(cls, conf:dict):
'''
指定配置信息启动
:param conf: 配置信息
:return: StlServer
'''
return cls()._config(None, conf)
def __init__(self) -> None:
'''
初始化部分参数,避免后面调用出错
'''
super(StlServer, self).__init__()
self.__sock = None
self._ssl_context = None
self.__sub_services = {}
self.__tunnel_wait = {}
def __del__(self):
'''
程序时执行quit的代码
:return:
'''
super(StlServer, self).__del__()
self.quit()
def quit(self) -> None:
'''
退出程序时的处理,关闭一些打开与连接过来的主要连接
'''
super(StlServer, self).quit()
for (key, sub_service) in self.__sub_services.items():
if 'in' in sub_service and sub_service['in'] is not None:
close_socket(sub_service['in'])
if 'out' in sub_service and sub_service['out'] is not None:
close_socket(sub_service['out'])
if self.__sock is not None:
close_socket(self.__sock)
def __wait_client(self, conn:socket.socket, mask:int) -> None:
'''
等待客户端连接
:param conn: 主socket
'''
try:
client, addr = conn.accept()
self.__client_connect(client, addr)
except Exception as e:
logger.error(e)
def __wait_connect(self, conn:socket.socket, mask:int) -> None:
'''
等待用户连接
:param conn: 映射端口socket
'''
try:
client, addr = conn.accept()
key = None
for k, val in self.__sub_services.items():
if conn is val['in']:
key = k
break
if key is None:
close_socket(client)
else:
self.__user_connect(key, client, addr)
except Exception as e:
logger.error(e)
def __remove_timeout_connect(self) -> None:
'''
线程函数,检测是否有连接没有被处理(与客户端失去连接时有用户连接的情况下)
'''
while not self._quit:
try:
time.sleep(5)
now = int(time.time())
keys = list(self.__tunnel_wait.keys())
for key in keys:
if key in self.__tunnel_wait:
item = self.__tunnel_wait[key]
t = item['time']
if now - t > 60:
sock = item['in']
del self.__tunnel_wait[key]
close_socket(sock)
except Exception as e:
logger.error(e)
def run(self) -> None:
'''
开始运行,创建socket与监听连接
'''
self.__start_main_service()
if self.__sock is not None:
th = Thread(target=self.__remove_timeout_connect, daemon=True)
th.start()
self.__start_sub_service()
while not self._quit:
events = self._select.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
def __start_main_service(self) -> None:
'''
初始化主连接,与客户端连接用
'''
self.__sock = start_tcp_server(self.conf['listen']['host'], self.conf['listen']['port'], self.conf['listen']['backlog'])
self.__sock = self.__upgrade_ssl(self.__sock)
self._select.register(self.__sock, selectors.EVENT_READ, self.__wait_client)
logger.info('stl started: {0}:{1}'.format(self.conf['listen']['host'], self.conf['listen']['port']))
def __start_sub_service(self) -> None:
'''
初始化子服务,用户连接入口, 客户端映射出口
'''
infos = self.conf['inbound']
for info in infos:
if 'key' in info:
key = info['key']
else:
key = connectid()
sock = start_tcp_server('0.0.0.0', info['port'], info['backlog'])
if sock is not None:
sock = self.__upgrade_ssl(sock)
self.__sub_services[key] = {
'in': sock,
'out': None,
'port': info['port']
}
self._select.register(sock, selectors.EVENT_READ, self.__wait_connect)
logger.info('sub service started: {0}:{1}'.format(key, info['port']))
else:
logger.warning('Failed to start sub service: port: {0}, key: {1}'.format(info['port'], key))
def __upgrade_ssl(self, sock:socket.socket) -> socket.socket:
'''
将socket转为ssl连接
:param sock: socket连接
:return: ssl套字节连接
'''
if sock is None:
raise Exception('Failed to upgrade ssl: socket is None')
if self._ssl_context is None:
return sock
return self._ssl_context.wrap_socket(sock, server_side=True)
def __client_connect(self, client:socket.socket, addr:tuple) -> None:
'''
处理客户端连接
:param client: 客户端连接对象
:param addr: 客户端连接地址
'''
try:
(dlen, data) = read_count(client, 4) # 获取需要读取的数据包的长度
if dlen != 4:
logger.error('Error connect: {0}'.format(addr))
close_socket(client)
return
else:
try:
length = struct.unpack('!I', data)[0]
except Exception as e:
logger.error('{0} {1}'.format(e, addr))
close_socket(client)
return
try:
(pkg_len, pkg_data) = read_count(client, length) # 读取数据包
(pkg_type, pkg_con) = package.parse(pkg_data)
except Exception as e:
logger.error('Failed to parse package: {0}'.format(e.args))
close_socket(client)
return
if pkg_type == package.PKG_CONNECT: # 客户端连接
sub_key = pkg_con.decode('utf-8')
if sub_key in self.__sub_services:
if self.__sub_services[sub_key]['out'] is not None:
try:
self._select.unregister(self.__sub_services[sub_key]['out'])
except Exception as e:
logger.error(e)
logger.warning('disconnect: {0}'.format(self.__sub_services[sub_key]['out'].getpeername()))
close_socket(self.__sub_services[sub_key]['out'])
self.__sub_services[sub_key]['out'] = client
conn_resp = package.stringify(package.PKG_CONNECT, b'')
client.send(conn_resp)
self._select.register(client, selectors.EVENT_READ, self.__client_process)
logger.info('client connect to: {0} {1}'.format(sub_key, addr))
else:
raise Exception('error from client connect what key is not found.')
elif pkg_type == package.PKG_TUNNEL: # 数据通道申请的响应, 通道建立完成,可以进行数据交换
resp_data = package.stringify(package.PKG_TUNNEL, b'')
client.send(resp_data)
key = pkg_con.decode('utf-8')
if key in self.__tunnel_wait: # 将需要交换数据的两个socket丢进线程池中处理
self._thread_pool.submit(self.transfer_data, client, self.__tunnel_wait[key]['in'], self.__transfer_error, None)
del self.__tunnel_wait[key]
else:
close_socket(client)
except Exception as e:
logger.error('error from client connect: {0}'.format(e.args))
close_socket(client)
def __transfer_error(self, tunnel:socket.socket, user:socket.socket, args) -> None:
'''
数据交换出错/结束的回调
:param tunnel: 数据通道
:param user: 用户连接
:param args: 其它参数
'''
logger.info('tunnel closed: {0}'.format(tunnel.getpeername()))
logger.info('user closed: {0}'.format(user.getpeername()))
close_socket(tunnel)
close_socket(user)
def __user_connect(self, key:str, client:socket.socket, addr:tuple) -> None:
'''
处理用户端连接
:param key: 用户连接的子服务的key
:param client: 用户连接对象
:param addr: 用户连接地址
'''
out = self.__sub_services[key]['out']
if out is None:
close_socket(client)
logger.warning('No client for this service({0}).'.format(key))
else:
id = connectid()
data = package.stringify(package.PKG_TUNNEL, id.encode('utf-8'))
try:
self.__tunnel_wait[id] = {
'time': int(time.time()),
'in': client
}
out.send(data)
except Exception as e:
logger.warning('Lost contact with client: {0} ==> {1}'.format(out.getpeername(), e.args))
if id in self.__tunnel_wait:
del self.__tunnel_wait[id]
self.__sub_services[key]['out'] = None
try:
self._select.unregister(out)
except Exception as e:
logger.error(e)
close_socket(out)
def __client_process(self, conn:socket.socket, mask:int) -> None:
'''
客户端数据处理
:param conn: 客户端对应的socket
'''
key = None
try:
for k, val in self.__sub_services.items():
if conn is val['out']:
key = k
break
if key is None:
raise Exception('Invalid client session')
(l, ldata) = read_count(conn, 4)
if l == 0:
raise Exception('client down: {0}'.format(conn.getpeername()))
elif l != 4:
raise Exception('Failed to read package: {0} ==> {1}'.format(conn.getpeername(), ldata))
else:
(l1, pdata) = read_count(conn, struct.unpack('!I', ldata)[0])
(pkg_type, pkg_data) = package.parse(pdata)
if pkg_type == package.PKG_DISCONNECT:
k = pkg_data.decode('utf-8')
if k in self.__tunnel_wait:
logger.warning('Failed to get a tunnel: {0}'.format(self.__tunnel_wait[k]['in'].getpeername()))
close_socket(self.__tunnel_wait[k]['in'])
del self.__tunnel_wait[k]
elif pkg_type == package.PKG_HEART:
heart_data = package.stringify(package.PKG_HEART, b'')
conn.send(heart_data)
except Exception as e:
try:
self._select.unregister(conn)
except Exception as e1:
logger.warning(e1)
if key is not None and self.__sub_services[key]['out'] is conn:
self.__sub_services[key]['out'] = None
close_socket(conn)
logger.warning(e)
if __name__ == '__main__':
serv:StlServer = None
def serv_quit(signum:int, frame:object):
'''
信号退出函数
'''
global serv
if serv is not None:
serv.quit()
sys.exit(0)
conf = parse_args(server_side=True)
if conf['conf-file'] is None and conf['conf-write'] is not None:
save_conf(conf)
else:
try:
if (('inbound' in conf and len(conf['inbound']) > 0) or ('conf-file') in conf and conf['conf-file'] is not None) and conf['help'] is False:
if conf['conf-file'] is not None:
serv = StlServer.config(conf['conf-file'])
else:
serv = StlServer.create(conf)
signal.signal(signal.SIGINT, serv_quit)
signal.signal(signal.SIGTERM, serv_quit)
serv.run()
else:
usage(server_side=True)
except Exception as e:
logger.error(e)
if serv != None:
try:
serv.quit()
except Exception as e1:
logger.debug(e1)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。