# 代码拉取完成,页面将自动刷新
import json
import os
import requests
from flask import Flask, request
from time import sleep
import threading
class Logger:
    """Minimal console logger that prints only messages matching its level.

    The levels used in this file are ``'debug'`` and ``'trace'``. They are not
    hierarchical: ``DebugLog`` prints only while the level is ``'debug'`` and
    ``TraceLog`` prints only while the level is ``'trace'``; any other level
    silences both.
    """

    def __init__(self, level='debug'):
        """Create a logger.

        Args:
            level: Initial level name; compared case-insensitively.
        """
        # Normalise here as well, so Logger('DEBUG') behaves the same as
        # setDebugLevel('DEBUG').
        self.level = self._normalize(level)

    @staticmethod
    def _normalize(level):
        """Return *level* as a lower-case string so comparisons ignore case."""
        return str(level).lower()

    def DebugLog(self, *args):
        """Print *args* when the current level is ``'debug'``."""
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        """Print *args* when the current level is ``'trace'``."""
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        """Switch the active level.

        Args:
            level: New level name; stored lower-cased.
        """
        self.level = self._normalize(level)
class QQBot:
    """Thin HTTP client for the bot gateway listening at ``self.addr``.

    The endpoint names used below (``verify``, ``bind``, ``release``,
    ``fetchLatestMessage``, ``countMessage``, ``sendGroupMessage``,
    ``sendFriendMessage``) look like the mirai-api-http HTTP adapter.
    NOTE(review): confirm against the deployed gateway.

    Every gateway reply is expected to be a JSON object with a ``code`` field,
    where ``0`` means success.
    """

    # Seconds to wait for the gateway; without a timeout `requests` can block
    # forever on a stalled connection.
    REQUEST_TIMEOUT = 10

    # Supported message-element types mapped to the key holding their payload.
    _PAYLOAD_KEYS = {'Image': 'url', 'Plain': 'text', 'Face': 'faceId'}

    def __init__(self):
        self.addr = 'http://127.0.0.1:8888/'
        # Session key stored by the most recent successful bindSession() call.
        self.session = None

    def _post(self, path, payload):
        """POST *payload* as a JSON body to ``addr + path``; return the decoded reply."""
        res = requests.post(self.addr + path, data=json.dumps(payload),
                            timeout=self.REQUEST_TIMEOUT)
        return res.json()

    def _get(self, path, params):
        """GET ``addr + path`` with *params* as the query string; return the decoded reply."""
        res = requests.get(self.addr + path, params=params,
                           timeout=self.REQUEST_TIMEOUT)
        return res.json()

    def verifySession(self, auth_key):
        """Request a new session key from the gateway.

        Each session can be bound to only one bot, but one bot may own several
        sessions. A session key that is never verified is released
        automatically after a while.

        Args:
            auth_key: Value sent as ``verifyKey``.

        Returns:
            The session key on success (``code == 0``), otherwise ``None``.
        """
        res = self._post('verify', {"verifyKey": auth_key})
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """Verify and activate *session*, binding it to a logged-in bot.

        On success the session key is also remembered in ``self.session``.

        Args:
            session: Session key obtained from verifySession().
            qq: Account number of the bot to bind to.

        Returns:
            ``True`` when the gateway answers with ``code == 0``, else ``False``.
        """
        res = self._post('bind', {"sessionKey": session, "qq": qq})
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """Release a session that is no longer needed.

        Unused sessions should be released. A session left unused for a long
        time (30 minutes) is released automatically; otherwise it keeps storing
        the messages the bot receives, which leaks memory (with websocket
        enabled it is not released automatically).

        Args:
            session: Session key to release.
            qq: Account number the session is bound to.

        Returns:
            ``True`` when the gateway answers with ``code == 0``, else ``False``.
        """
        res = self._post('release', {"sessionKey": session, "qq": qq})
        logger.DebugLog(res)
        return res['code'] == 0

    def getMsgFromGroup(self, session):
        """Fetch up to 10 of the latest messages queued for *session*.

        Returns:
            The reply's ``data`` field on success, otherwise ``None``.
        """
        # Passing the values via `params` lets requests URL-encode them.
        res = self._get('fetchLatestMessage',
                        {'count': 10, 'sessionKey': session})
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        """Reduce raw gateway messages to the fields needed for forwarding.

        Only items whose ``type`` is ``'GroupMessage'`` are considered, and only
        the LAST element of each message chain is looked at. Supported element
        types are ``Image``, ``Plain`` and ``Face``; anything else is skipped
        with a trace log.

        Args:
            data: Iterable of message dicts, or ``None``.

        Returns:
            A list of dicts with keys ``text``, ``type``, ``name``, ``groupId``
            (as a string) and ``groupName``. Empty when *data* is ``None`` or
            empty.
        """
        res = []
        if not data:
            return res
        for item in data:
            if item['type'] != 'GroupMessage':
                continue
            chain = item['messageChain']
            if not chain:
                # Nothing to forward; indexing [-1] would raise IndexError.
                continue
            last = chain[-1]
            msg_type = last['type']
            payload_key = self._PAYLOAD_KEYS.get(msg_type)
            if payload_key is None:
                logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+msg_type)
                continue
            text = last[payload_key]
            sender = item['sender']
            res.append({
                'text': text,
                'type': msg_type,
                'name': sender['memberName'],
                'groupId': str(sender['group']['id']),
                'groupName': sender['group']['name'],
            })
        return res

    def getMessageCount(self, session):
        """Return how many messages are queued for *session* (0 on failure)."""
        res = self._get('countMessage', {'sessionKey': session})
        if res['code'] == 0:
            return res['data']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        """Forward one parsed message to *group*, prefixed with a relay header.

        Args:
            session: Active session key.
            group: Target group, sent as the ``group`` field of the request.
            msg: One entry produced by parseGroupMsg().

        Returns:
            The gateway's ``messageId`` on success; ``0`` when the type is not
            supported, the request fails, or the gateway reports an error.
        """
        text = msg['text']
        msg_type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        header = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + msg_type)
        if msg_type == 'Plain':
            # Plain text travels in the same element as the header.
            message = [{"type": msg_type, "text": header + str(text)}]
        elif msg_type == 'Image':
            message = [
                {"type": 'Plain', "text": header},
                {"type": msg_type, "url": text}]
        elif msg_type == 'Face':
            message = [
                {"type": 'Plain', "text": header},
                {"type": msg_type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+msg_type)
            return 0
        data = {
            "sessionKey": session,
            "group": group,
            "messageChain": message
        }
        logger.DebugLog(">> 消息内容:" + str(data))
        try:
            res = self._post('sendGroupMessage', data)
        except Exception as exc:
            # Best effort: one failed forward must not stop the relay loop.
            logger.DebugLog(">> 转发失败", exc)
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        """Forward every message that came from a watched group to all targets.

        Group IDs are compared as strings, so the configured lists may hold
        either strings or numbers. The group a message originated from is
        skipped so the message is not echoed back.

        Args:
            session: Active session key.
            receive_groups: IDs of the groups whose messages are relayed.
            send_groups: IDs of the groups that receive the relayed messages.
            msg_data: Entries produced by parseGroupMsg().
        """
        watched = {str(g) for g in receive_groups}
        for msg in msg_data:
            group_id = str(msg['groupId'])
            if group_id not in watched:
                continue
            for g in send_groups:
                logger.DebugLog(">> 当前群:"+str(g))
                if str(g) == group_id:
                    logger.DebugLog(">> 跳过此群")
                    continue
                res = self.sendMsgToGroup(session, g, msg)
                if res != 0:
                    logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        """Send the plain-text *msg* to the friend *qq*.

        Returns:
            The gateway's ``messageId`` on success; ``0`` when the request
            fails or the gateway reports an error.
        """
        data = {
            "sessionKey": session,
            "target": qq,
            "messageChain": [
                {"type": "Plain", "text": msg},
            ]
        }
        try:
            res = self._post('sendFriendMessage', data)
        except Exception as exc:
            # Best effort: report the failure to the caller as 0.
            logger.DebugLog(">> 发送失败", exc)
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0
# Shared logger used by QQBot and the functions below; qqTransfer() resets its
# level from the config file.
logger = Logger()
# Single bot client shared by the polling function and the HTTP handler.
bot = QQBot()
# Flask application that serves the /QQ/send endpoint.
app = Flask(__name__)
def qqTransfer(conf_path='conf.json'):
    """Poll the bot for group messages and relay them as configured.

    Reads the JSON config, obtains and binds a session, then loops forever:
    whenever the gateway reports queued messages they are fetched, parsed and
    forwarded from the ``receive_groups`` to the ``send_groups``. The session
    is released if the loop ever ends, for example through an exception.

    The config must provide the keys ``auth_key``, ``bind_qq``,
    ``sleep_time`` (seconds between polls), ``debug_level``,
    ``receive_groups`` and ``send_groups``.

    Args:
        conf_path: Path of the JSON config file.
    """
    # The file is only read, so plain read mode is enough.
    with open(conf_path, 'r', encoding="utf-8") as f:
        conf = json.load(f)
    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']
    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']
    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    # str() keeps this log line from raising when verification failed (None).
    logger.DebugLog(">> session: " + str(session))
    if session is None:
        return

    try:
        if not bot.bindSession(session, bind_qq):
            # Polling an unbound session cannot succeed.
            return
        while True:
            cnt = bot.getMessageCount(session)
            if cnt:
                logger.DebugLog('>> 有消息了 => {}'.format(cnt))
                logger.DebugLog('获取消息内容')
                data = bot.getMsgFromGroup(session)
                if data:
                    logger.DebugLog(data)
                    logger.DebugLog('解析消息内容')
                    data = bot.parseGroupMsg(data)
                    logger.DebugLog(data)
                    logger.DebugLog('转发消息内容')
                    bot.sendMsgToAllGroups(
                        session, receive_groups, send_groups, data)
                else:
                    # Covers both a failed fetch (None) and an empty list.
                    logger.DebugLog('消息为空')
            # Always pause between polls, including after an empty fetch, so
            # the gateway is never hit in a tight loop.
            sleep(sleep_time)
    finally:
        bot.releaseSession(session, bind_qq)
@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
    """Turn an HTTP GET request into a QQ friend message (similar to Qmsg).

    Query parameters:
        target: Recipient, passed through as the ``target`` of the message.
        msg: Text to send.

    Returns:
        ``'Hello World!'`` once a send has been attempted (the gateway result
        is not reported to the client); a 400 response when ``target`` or
        ``msg`` is missing or empty; a 503 response while no session has been
        bound yet.
    """
    qq = request.args.get('target')
    msg = request.args.get('msg')
    if not qq or not msg:
        return 'missing required query parameter: target and msg', 400
    if bot.session is None:
        # bindSession() has not succeeded yet, so nothing can be sent.
        return 'bot session is not ready', 503
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World!'
if __name__ == '__main__':
    # Run the relay loop in a daemon thread so it ends together with the web
    # server process. `daemon=True` replaces the deprecated Thread.setDaemon().
    t = threading.Thread(target=qqTransfer, daemon=True)
    t.start()
    # Serve the HTTP endpoint on all interfaces; the port is passed as an int.
    app.run(port=8080, host='0.0.0.0')
# 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
# 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。