代码拉取完成,页面将自动刷新
import asyncio
import logging
import multiprocessing
import os
import signal
import sys
import threading
from multiprocessing.context import SpawnProcess
from socket import AF_INET
from socket import SO_REUSEADDR
from socket import SO_REUSEPORT
from socket import socket
from socket import SOL_SOCKET
from types import FrameType
from typing import Callable
from typing import List
from typing import Optional
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s][%(processName)s:%(process)d:%(threadName)s][%(filename)s:%(lineno)d] - %(message)s",
)
logger = logging.getLogger(__name__)
# reload: HUP
# reopen: USR1
# stop: TERM (立即退出)
# quit: QUIT (优雅停止)
HANDLED_SIGNALS = (
signal.SIGINT, # Ctrl+C
signal.SIGTERM, # kill <pid>
# SIGKILL, SIGSTOP cannot be caught, blocked, or ignored (from man 7 signal)
)
class Process(object):
def __init__(self, target: Callable):
self.spawn_process = SpawnProcess(
name="WorkerProcess",
target=self.process_entrypoint,
kwargs={"target": target},
)
self.parent_conn, self.child_conn = multiprocessing.Pipe()
self.liveness_probe_timeout = 5
def process_entrypoint(self, target: Callable, **kwargs):
probe_thread = threading.Thread(
name="probe_thread", target=self.probe_thread_entrypoint, daemon=True
)
probe_thread.start()
target(**kwargs)
def start(self):
self.spawn_process.start()
def terminate(self):
self.spawn_process.terminate()
self.parent_conn.close()
self.child_conn.close()
def join(self):
self.spawn_process.join()
def is_alive(self) -> bool:
if not self.spawn_process.is_alive():
return False
return self.liveness_probe()
def liveness_probe(self) -> bool:
self.parent_conn.send(b".")
ready = self.parent_conn.poll(self.liveness_probe_timeout)
if not ready:
return False
self.parent_conn.recv()
return True
def probe_thread_entrypoint(self):
while True:
self.child_conn.recv()
self.child_conn.send(b".")
class ProcessManager(object):
def __init__(self, target: Callable):
self.target = target
self.processes: List[Process] = []
self.should_exit_or_reload = threading.Event()
self.should_reload = False
self.pid = os.getpid()
self.n_workers = 2
def signal_handler(self, sig: int, frame: Optional[FrameType]):
if sig == signal.SIGHUP:
self.should_reload = True
self.should_exit_or_reload.set()
def reload(self):
# TODO: reload module, config or something else
self.shutdown()
self.startup()
def handle_reload(self):
if self.should_reload:
logger.debug("reloading ...")
self.reload()
self.should_reload = False
self.should_exit_or_reload.clear()
def register_signal(self):
for sig in HANDLED_SIGNALS:
signal.signal(sig, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
# 由于有probe_thread, 当探活失败, 会执行kill_worker. 所以不必要处理signal.SIGCHLD信号
# 如果处理CHLD, 不需要额外的probe线程, 但可能有一些情况进程无法响应但是还没退出(发送CHLD), 所以用probe_thread的方式适合更多的场景
def startup(self):
for _ in range(self.n_workers):
process = Process(self.target)
self.processes.append(process)
process.start()
def wait_signal(self):
while True:
if self.should_exit_or_reload.wait(2):
if not self.should_reload:
break
self.handle_reload()
else:
self.manage_workers()
def manage_workers(self):
for worker in self.processes:
if not worker.is_alive():
self.kill_worker(worker)
self.processes.remove(worker)
n_missing = self.n_workers - len(self.processes)
assert n_missing >= 0
for _ in range(n_missing):
process = Process(self.target)
self.processes.append(process)
process.start()
def kill_worker(self, worker: Process):
worker_pid = worker.spawn_process.pid
logger.debug("killing worker: %d", worker_pid)
worker.terminate()
logger.debug("killed worker: %d", worker_pid)
def shutdown(self):
for process in self.processes:
process.terminate()
for process in self.processes:
process.join()
self.processes.clear()
def run(self):
logger.info(f"主进程开始运行(pid={self.pid})")
self.register_signal()
self.startup()
self.wait_signal()
self.shutdown()
logger.info(f"主进程运行结束(pid={self.pid})")
class TcpServer(object):
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.should_exit = False
self.servers: List[asyncio.base_events.Server] = []
self.sockets: List[socket] = [self.bind_socket()]
def add_protocol(self, protocol_class):
self.protocol_class = protocol_class
def add_handler(self, handler_func):
self.protocol_handler = handler_func
def bind_socket(self) -> socket:
sock = socket(family=AF_INET)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
sock.set_inheritable(True)
try:
sock.bind((self.host, self.port))
except OSError as exc:
logger.error(exc)
sys.exit(1)
logger.info(f"Listening on {self.host}:{self.port} ...")
return sock
def run_forever(self):
manager = ProcessManager(target=self.run_in_each_worker)
manager.run()
def run_in_each_worker(self):
return asyncio.run(self.serve())
async def serve(self):
process_id = os.getpid()
self.install_signal_handlers()
logger.info(f"worker进程启动[pid={process_id}] ...")
await self.startup()
await self.mainloop()
await self.shutdown()
logger.info(f"worker进程退出[pid={process_id}] ...")
async def startup(self):
def protocol_factory():
return self.protocol_class(self.protocol_handler)
loop = asyncio.get_running_loop()
for sock in self.sockets:
server = await loop.create_server(protocol_factory, sock=sock)
self.servers.append(server)
async def mainloop(self):
while not self.should_exit:
await asyncio.sleep(1)
async def shutdown(self):
for server in self.servers:
server.close()
for sock in self.sockets:
sock.close()
for server in self.servers:
await server.wait_closed()
def handle_exit(self, sig: int):
logger.info(f"handling signal [{sig}] ...")
self.should_exit = True
def install_signal_handlers(self):
if threading.current_thread() is not threading.main_thread():
# 只有主线程才处理信号
return
loop = asyncio.get_event_loop()
for sig in HANDLED_SIGNALS:
loop.add_signal_handler(sig, self.handle_exit, sig)
# 不能放到 if __name__ == "__main__" 里面
async def test_handler(channel):
print(await channel.recv())
await channel.send(b"hi 1")
print(await channel.recv())
await channel.send(b"hi 2")
print(await channel.recv())
await channel.send(b"hi 3")
await channel.close()
if __name__ == "__main__":
from channel.server import ServerProtocol
server = TcpServer("localhost", 8000)
server.add_protocol(ServerProtocol)
server.add_handler(test_handler)
server.run_forever()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。