1 Star 0 Fork 0

三鹿/collect_cool_pythonc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
WebSocket.py 2.08 KB
一键复制 编辑 原始数据 按行查看 历史
三鹿 提交于 2022-03-07 16:16 . 文章自定义获取修复
# -*- coding: utf-8 -*-
import asyncio
import websockets
import index
import json
import time
import multiprocessing as mp
# 检测客户端权限,用户名密码通过才能退出循环
async def check_permit(websocket):
while True:
recv_str = await websocket.recv()
cred_dict = recv_str.split(":")
if cred_dict[0] == "admin" and cred_dict[1] == "123456":
response_str = "是啊,我好帅啊"
await websocket.send(response_str)
return True
else:
response_str = "不好意思,密码不对~"
await websocket.send(response_str)
# 接收客户端消息并处理,
async def recv_msg(websocket):
while True:
recv_text = await websocket.recv()
# print(recv_text);
if not recv_text:
await websocket.send("msg:错误编码,无内容")
else:
recv_text_list = json.loads(recv_text)
KEYWORDLIST = index.GetKeyWordlist(recv_text_list['KEYWORDLIST'])
for KEYWORD in KEYWORDLIST:
await websocket.send("服务器正在检索关键字:\"" + KEYWORD + "\"可能存在的文章")
# time.sleep(1)
WORDURLLIST = index.GetBaiDuUrlList(KEYWORD)
await websocket.send("已得到包含:\"" + KEYWORD + "\"的文章链接共计:" + str(len(WORDURLLIST)) +"条")
for WORDURL in WORDURLLIST:
await websocket.send("已得到新闻链接:\"" + WORDURL)
print(WORDURLLIST)
print("收到消息:%s" % (recv_text))
return recv_text
# tim = await input("回复:")
# if tim == "q":
# break
# else:
# await websocket.send(tim)
# 服务器端主逻辑
async def main_logic(websocket, path):
await check_permit(websocket)
await recv_msg(websocket)
# await print_msg(websocket)
if __name__ == "__main__":
start_server = websockets.serve(main_logic, '0.0.0.0', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/sanlu/collect_cool_pythonc.git
git@gitee.com:sanlu/collect_cool_pythonc.git
sanlu
collect_cool_pythonc
collect_cool_pythonc
master

搜索帮助