代码拉取完成,页面将自动刷新
# 做一个处理消息的文件
import socket
import json
import random
from datetime import datetime,timezone,timedelta
import traceback
import requests
import sys
# 添加一个日志函数
def getTimeStr():
utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc)
bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
return bj_dt.strftime("%Y-%m-%d %H:%M:%S")
# 输出调试信息,并及时刷新缓冲区
def log(*args,**kw_args):
print(f'{getTimeStr()} :',*args,**kw_args)
sys.stdout.flush()
# 我们创建一个MsgServer 类来专门处理信息的手法
class MsgServer:
# 初始化
def __init__(self):
# 消息发送配置
self.sender = None # 是一个socket对象
self.send_ip = '127.0.0.1'
self.send_port = 5710
# 消息接收配置
self.receiver = None # 是一个socket对象
self.recv_ip = '127.0.0.1'
self.recv_port = 5711
# 公共配置
self.encoding = 'utf-8' #编码和解码格式
self.message =json.loads('{}') #我们把收到的最新消息存在这里 json 格式
# ====================接收器函数==========================
def receiver_start_server(self):
"""
运行消息接收器
"""
self.sender = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.receiver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.receiver.bind(('127.0.0.1', 5711)) #监听5711,是因为,我们把go-cqhttp的消息发送到了本机的5711端口
self.receiver.listen(100)
self.i_got_it_3q = "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
# 用无限循环来接受消息
while True:
self.message = self.recv_msg()
log(f'接受消息:{self.message}')
self.auto_reply() # 自动回复来应答
def get_json(self,msg):
"""
获得消息中的json数据
"""
for i in range(len(msg)):
if msg[i]=="{":
return json.loads(msg[i:])
return None
def recv_msg(self):
"""
接收消息的
"""
conn, Address = self.receiver.accept()
Request = conn.recv(1024).decode(encoding=self.encoding)
rev_json=self.get_json(Request)
log(rev_json)
conn.sendall ( (self.i_got_it_3q).encode(encoding=self.encoding) )
conn.close()
return rev_json
# =====================发送消息函数=========================
def send_msg(self,id,msg,send_type = 'private'):
"""
发送消息
"""
if send_type == 'group':
payload = f"GET /send_group_msg?group_id={str(id)}&message={msg} HTTP/1.1\r\nHost: {self.send_ip}:{self.send_port}\r\nConnection: close\r\n\r\n"
elif send_type == 'private':
payload = f"GET /send_private_msg?user_id={str(id)}&message={msg} HTTP/1.1\r\nHost: {self.send_ip}:{self.send_port}\r\nConnection: close\r\n\r\n"
# log(f'paload={payload}')
self.sender = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sender.connect((self.send_ip,self.send_port))
self.sender.send(payload.encode("utf-8"))
self.sender.close()
def get_message_type(self):
# 获取信息类型 群聊/私聊 group/private
return self.message['message_type']
def get_number(self):
# 获取群号/私聊qq号
if self.get_message_type() == 'group':
return self.message['group_id']
elif self.get_message_type() == 'private':
return self.message['user_id']
else:
log(f'error: 消息格式错误! 找不到群号/QQ号 \n{self.message}')
exit()
def get_raw_message(self):
#获取收到的信息
return self.message['raw_message']
def call_tuling_api(self,uid:int, text: str,image:str=''):
url = 'http://openapi.tuling123.com/openapi/api/v2'
payload = {
"reqType":0,
"perception": {
"inputText": {
"text": text
},
"inputImage": {
"url": image
},
"selfInfo": {
"location": {
"city": "上海",
"province": "上海",
"street": "海泉路"
}
}
},
"userInfo": {
"apiKey": 'e7e9e6fdb1f14a758c8ffe29d2c3f5ac',
"userId": f"{uid}"
}
}
try:
resp=requests.post(url, data=json.dumps(payload))
result = json.loads(resp.text)
log(result)
return result['results'][0]['values']['text']
except:
log(traceback.format_exc())
return None
def response_api(self,uid,q):
# 如果有ai的话,用这个函数根据发送者的消息给融来给出回复
# 先做个位置,我们暂时用复读
a = self.call_tuling_api(uid=uid,text=q)
return a
def auto_reply(self):
"""
自动回复
"""
# 获取发送者的消息类型 私聊/群聊
msg_type = self.get_message_type()
# 获取发送者的qq号
qq = self.get_number()
# 组织语言
q = self.get_raw_message()
a = self.response_api(uid=qq,q=q)
# 发送回答
log(f'answer = {a}')
self.send_msg(
id = qq,# 获得发送者的qq
msg = a,
send_type= msg_type
)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。