1 Star 0 Fork 0

李春鹏/python_test

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
log_aiter.py 1.97 KB
一键复制 编辑 原始数据 按行查看 历史
import asyncio
import websockets
import websockets_routes
import datetime
from kubernetes import client, config
import json
import pytz
import asyncio
import websockets
# Location of the kubeconfig file used to authenticate against the cluster.
KUBECONFIG_PATH = "/root/.kube/config"

# Load the cluster credentials once at import time, then build the API
# clients shared by the rest of this module.
config.kube_config.load_kube_config(config_file=KUBECONFIG_PATH)
core = client.CoreV1Api()  # core/v1 client; used below to read pod logs
api = client.AppsV1Api()  # apps/v1 client; not referenced in the code shown here
class AsyncLog1:
    """Expose a plain iterable through the asynchronous iteration protocol.

    ``__aiter__`` is written as an async generator, so ``async for`` over an
    instance produces the wrapped items one by one, in their original order.
    """

    def __init__(self, items):
        """Store ``items``, the iterable to be replayed asynchronously."""
        self.items = items

    async def __aiter__(self):
        """Yield every element of ``self.items`` in iteration order."""
        for entry in self.items:
            yield entry
class AsyncLog2:
    """Adapt a synchronous iterable to the async-iterator protocol.

    The instance is its own async iterator: ``__aiter__`` returns ``self``
    and ``__anext__`` pulls the next element from the wrapped iterator.
    """

    def __init__(self, seq):
        """Create the underlying synchronous iterator over ``seq``."""
        self.iter = iter(seq)

    def __aiter__(self):
        """Return the instance itself as the async iterator."""
        return self

    async def __anext__(self):
        """Return the next element, or raise ``StopAsyncIteration`` at the end."""
        try:
            upcoming = next(self.iter)
        except StopIteration:
            # Translate the synchronous end-of-iteration signal into the
            # asynchronous one expected by ``async for``.
            raise StopAsyncIteration
        else:
            return upcoming
def read_pod_container_log_stream(namespace, name, container="", number=0):
    """Open a streaming, follow-mode log read for a pod.

    Args:
        namespace: Namespace the pod belongs to.
        name: Name of the pod.
        container: Container name, forwarded unchanged to the API call
            (defaults to the empty string).
        number: Forwarded as ``tail_lines`` (defaults to ``0``).

    Returns:
        The object returned by ``.stream()`` on the non-preloaded response
        of ``core.read_namespaced_pod_log``.
    """
    request_options = {
        "name": name,
        "namespace": namespace,
        "container": container,
        "follow": True,
        "pretty": True,
        # Ask the client for the raw response object instead of a fully
        # read body, so that ``.stream()`` is available on the result.
        "_preload_content": False,
        "timestamps": True,
        "tail_lines": number,
    }
    raw_response = core.read_namespaced_pod_log(**request_options)
    return raw_response.stream()
class AsyncLog3:
    """Asynchronous iterator over the followed log stream of a pod.

    The underlying stream (from ``read_pod_container_log_stream``) is a
    synchronous iterator whose ``next`` blocks until more log data arrives.
    Each pull is therefore executed in the event loop's default executor so
    that waiting for log output does not stall other coroutines (for example
    other WebSocket connections served by the same loop).
    """

    # Marker returned by ``next`` once the stream is exhausted.
    # ``StopIteration`` cannot be raised into a Future, so the end of the
    # stream is reported from the worker thread by value, not by exception.
    _EXHAUSTED = object()

    def __init__(self, namespace, name):
        """Open the log stream of pod ``name`` in ``namespace``."""
        self.iter = iter(read_pod_container_log_stream(namespace, name))

    def __aiter__(self):
        """Return the instance itself as the async iterator."""
        return self

    async def __anext__(self):
        """Return the next chunk of the stream without blocking the loop.

        Raises:
            StopAsyncIteration: when the underlying stream is exhausted.
        """
        loop = asyncio.get_running_loop()
        # NOTE: if the awaiting task is cancelled, the worker thread keeps
        # waiting on the blocking read until the stream yields or ends.
        chunk = await loop.run_in_executor(
            None, next, self.iter, self._EXHAUSTED
        )
        if chunk is self._EXHAUSTED:
            raise StopAsyncIteration
        return chunk
async def ws_handler(websocket):
    """Stream the log of a fixed pod to a connected WebSocket client.

    Every non-empty chunk read from the pod's log stream is decoded as
    UTF-8, has its ``"\n"`` line endings rewritten to ``"\r\n"`` and is sent
    to the client as a text message.  Once the log stream ends, one message
    is read from the client and printed.

    Args:
        websocket: The connection object handed over by ``websockets.serve``.
    """
    import codecs  # local import: only this handler needs it

    # Chunk boundaries of the stream are not guaranteed to fall between
    # characters.  A stateful decoder keeps the leading bytes of a split
    # multi-byte character until the rest arrives, where a per-chunk
    # ``bytes.decode`` would raise UnicodeDecodeError.  Bytes that are not
    # valid UTF-8 at all are replaced rather than aborting the stream.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    async for i in AsyncLog3("default", "hello-python-5885fc4968-788s8"):
        if not i:
            continue
        text = decoder.decode(i)
        if not text:
            # The chunk only carried part of a multi-byte character.
            continue
        # presumably the client is a terminal-style widget that needs CRLF
        # line endings -- confirm against the front end.
        await websocket.send(text.replace("\n", "\r\n"))

    # Flush whatever the decoder is still holding once the stream is over.
    leftover = decoder.decode(b"", final=True)
    if leftover:
        await websocket.send(leftover.replace("\n", "\r\n"))

    # NOTE(review): the original indentation of this file was lost; the
    # receive/print pair is assumed to run once after the log loop rather
    # than after every sent chunk -- confirm against the upstream source.
    command = await websocket.recv()
    print(command)
async def main():
    """Serve ``ws_handler`` on ``0.0.0.0:8765`` until the process is stopped."""
    server = websockets.serve(ws_handler, "0.0.0.0", 8765)
    async with server:
        # Nothing ever resolves this future, so awaiting it parks the
        # coroutine indefinitely while the server keeps running.
        never_done = asyncio.Future()
        await never_done
# Script entry point: start an event loop and run the server coroutine.
# NOTE(review): there is no ``__main__`` guard, so importing this module
# also starts the server.
asyncio.run(main())
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/cplinux98/python_test.git
git@gitee.com:cplinux98/python_test.git
cplinux98
python_test
python_test
master

搜索帮助