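# NOTE: web-page residue captured with this file, not part of the program:
# "Code pull complete; the page will refresh automatically."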
import random
import json
import time
import sys
from websocket import create_connection
from barrage import MessageDecode
from flask import Flask, request, Response, jsonify
from operator import itemgetter
from flask_cors import CORS
import websocket
import requests
import os
import threading
import webbrowser
import traceback
# Shared module state. It is read and mutated by the crawler thread, the
# countdown thread and the Flask handlers below.
iid = '' # live-room id; filled in from config.json at startup
timeout = 0 # countdown; decremented once per loop iteration in timetick()
u_user = [] # names of users who have already voted
u_piao = [] # comment texts being monitored as votes (each configured user's 'content')
u_users = [] # streamers being voted on: config 'users' entries plus a running 'num' tally
# Day stamp (YYYYMMDD) used in the cache file name.
timestr = time.strftime("%Y%m%d", time.localtime())
cache_file = '' # path of the current cache file; set by load_cache()/save_cache()
# Flask application; files under ./images are served as static content.
app = Flask(__name__, static_folder='images')
CORS(app)
def go(iid, start_c=True):
    """Resolve the live stream of room *iid* and optionally start collecting.

    Two GraphQL requests are sent to ``https://live.kuaishou.com/m_graphql``:
    ``LiveDetail`` yields the ``liveStreamId`` and ``WebSocketInfoQuery``
    yields the websocket token and URL list.  Both requests are made even
    when ``start_c`` is false, which lets callers use this function as an
    "is the stream reachable" probe.

    Args:
        iid: Room id, sent as the ``principalId`` variable.
        start_c: When true, call ``start1`` with the first websocket URL,
            the token and the stream id.  ``start1`` loops until the global
            countdown expires, so this call blocks for that long.

    Returns:
        ``None`` on success, ``1`` if anything raised (network error, timeout,
        unexpected response shape, or an error escaping ``start1``).  Callers
        treat a truthy result as "streamer offline or unreachable".
    """
    graphql_url = 'https://live.kuaishou.com/m_graphql'
    # Without a timeout a stalled connection would block the calling thread
    # forever; the countdown thread calls this function periodically.
    request_timeout = 10
    # NOTE(review): headers and cookie look like they were copied from a
    # browser session; whether the endpoint requires them is not visible here.
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Host': 'live.kuaishou.com',
        'Pragma': 'no-cache',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Referer': 'https://live.kuaishou.com/',
        'Cookie': 'clientid=3; did=web_403d467ae64348958d48792621f6789e; client_key=65890b29; didv=1578388674000; Hm_lvt_86a27b7db2c5c0ae37fee4a8a35033ee=1578883494,1578883507; kuaishou.live.bfb1s=477cb0011daca84b36b3a4676857e5a1',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-User': '?1',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
        'content-Type': 'application/json;charset=UTF-8',
    }

    def post_graphql(payload):
        """POST one GraphQL payload and return the decoded JSON body."""
        return requests.post(
            graphql_url,
            data=json.dumps(payload),
            headers=headers,
            timeout=request_timeout,
        ).json()

    try:
        page = post_graphql({
            "operationName": "LiveDetail",
            "variables": {"principalId": iid},
            "query": "query LiveDetail($principalId: String) { webLiveDetail(principalId: $principalId) { liveStream feedInfo { pullCycleMillis __typename } watchingInfo { likeCount watchingCount __typename } noticeList { feed options __typename } fastComments commentColors moreRecommendList { user { id avatar name __typename } watchingCount poster coverUrl caption id playUrls { quality url __typename } quality gameInfo { category name pubgSurvival type kingHero __typename } hasRedPack liveGuess expTag __typename } __typename }}"
        })
        steam_id = page['data']['webLiveDetail']['liveStream']['liveStreamId']

        infos = post_graphql({
            "operationName": "WebSocketInfoQuery",
            "variables": {"liveStreamId": steam_id},
            "query": "query WebSocketInfoQuery($liveStreamId: String) {\n webSocketInfo(liveStreamId: $liveStreamId) {\n token\n webSocketUrls\n __typename\n }\n}\n"
        })
        socket_info = infos['data']['webSocketInfo']
        token = socket_info['token']
        wss_url = socket_info['webSocketUrls'][0]

        if start_c:
            start1(wss_url, token, steam_id)
    except Exception as e:
        # Deliberately broad: this is the boundary where every failure mode
        # is folded into the single "1" status the callers check for.
        print(e)
        return 1
    return None
def _get_page_id():
    """Return a page id: 16 random charset characters, "_", epoch milliseconds."""
    charset = "bjectSymhasOwnProp-0123456789ABCDEFGHIJKLMNQRTUVWXYZ_dfgiklquvxz"
    prefix = ''.join(random.choice(charset) for _ in range(16))
    return '{}_{}'.format(prefix, int(time.time() * 1000))


def _build_enter_room_frame(token, steamId):
    """Assemble the binary frame sent right after the websocket connects.

    NOTE(review): the fixed byte prefixes look like hand-encoded protobuf
    tags and lengths (0x98 0x01 = 152 before the token, 0x0b = 11 before the
    stream id, 0x1e = 30 before the page id, 0xc8 0x01 = 200 for the whole
    body).  If that reading is right, a token or stream id of any other
    length yields a malformed frame - confirm against the protocol.
    """
    return b''.join((
        b'\x08\xc8\x01\x1a\xc8\x01\n\x98\x01',
        token.encode(),
        b'\x12\x0b',
        steamId.encode(),
        b':\x1e',
        _get_page_id().encode(),
    ))


def _tally_votes(comments):
    """Count each monitored comment once per user name.

    A comment counts when its text is in ``u_piao`` and its author is not yet
    in ``u_user``; the author is then recorded and every ``u_users`` entry
    whose 'content' equals the text gets its 'num' incremented.

    NOTE(review): the original indentation was lost; "one vote per user" is
    inferred from ``u_user`` being kept as the list of users who already
    voted - confirm.
    """
    for comment in comments:
        name = comment['user']['userName']
        content = comment['content']
        if name == '':
            continue
        try:
            if content not in u_piao or name in u_user:
                continue
            u_user.append(name)
            for candidate in u_users:
                if content == candidate['content']:
                    candidate['num'] += 1
        except Exception as e:
            print(e, '解码出错?')


def _collect_barrage(wssUrl, token, steamId, error_tag=None):
    """Read comments from the websocket until the global countdown expires.

    Args:
        wssUrl: Websocket URL to connect to.
        token: Token embedded in the enter-room frame.
        steamId: Live stream id embedded in the enter-room frame.
        error_tag: Optional text printed after the exception when one loop
            iteration fails.

    Raises:
        Whatever ``create_connection`` or the first ``send`` raises; errors
        inside the loop are printed and the loop continues.
    """
    enter_frame = _build_enter_room_frame(token, steamId)
    heartbeat = b'\x08\x01\x1a\x07\x08\xb0\xc4\xf7\x9e\x89.'
    binary = websocket.ABNF.OPCODE_BINARY

    ws = create_connection(wssUrl)
    try:
        ws.send(enter_frame, opcode=binary)
        while timeout > 0:
            try:
                if not ws.connected:
                    # Reconnect and re-enter the room after a dropped link.
                    ws = create_connection(wssUrl)
                    ws.send(enter_frame, opcode=binary)
                ws.send(heartbeat, opcode=binary)
                decoder = MessageDecode(list(ws.recv()))
                if decoder.decode():
                    decoder.feed_decode()
                    comments = decoder.message.get('user')
                    if comments:
                        _tally_votes(comments)
            except Exception as e:
                if error_tag is None:
                    print(e)
                else:
                    print(e, error_tag)
                if not ws.connected:
                    # Avoid a tight reconnect loop while the link is down.
                    time.sleep(1)
    finally:
        # Close the socket even when the initial send or the loop aborts.
        ws.close()


def start1(wssUrl, token, steamId):
    """Collect votes from the live-room websocket (variant called by ``go``).

    Blocks until the global ``timeout`` countdown is no longer positive.
    """
    _collect_barrage(wssUrl, token, steamId)


def start(wssUrl, token, steamId):
    """Same as ``start1`` but tags loop errors with '发送出错?' when printing."""
    _collect_barrage(wssUrl, token, steamId, error_tag='发送出错?')
def save(a, b):
    """Append ``str(a) + str(b)`` followed by a newline to ``test.txt``.

    The file is opened relative to the current working directory and is
    created if it does not exist.
    """
    with open('test.txt', 'a+', encoding='utf8') as f:
        f.write(f'{a!s}{b!s}\n')
def is_connected():
    """Return True if an HTTP HEAD request to www.baidu.com completes.

    The request uses a 2 second timeout.  Any ordinary exception (connection
    error, timeout, ...) is reported as ``False``; ``KeyboardInterrupt`` and
    ``SystemExit`` are no longer swallowed.
    """
    try:
        requests.head("http://www.baidu.com", timeout=2)
    except Exception:
        # Best-effort probe: it must never raise into the countdown loop.
        return False
    return True
def timetick():
    """Run the countdown: persist the cache, decrement, probe, sleep.

    While the global ``timeout`` is positive, each iteration saves the cache,
    decrements ``timeout`` by one and sleeps 0.9 s.  Whenever the remaining
    count is a multiple of 5 it also probes the stream (``go(iid, False)``)
    and the network (``is_connected()``).  When the loop ends with
    ``timeout == 0`` the function waits 5 s and terminates the whole process
    with ``os._exit(0)``.
    """
    global timeout
    # Number of failed probes.  NOTE: the shutdown that used to trigger once
    # this reached 10 is disabled, so the counter is informational only.
    lost = 0
    while timeout > 0:
        try:
            save_cache()
        except OSError as e:
            # A failed cache write must not kill the countdown thread,
            # otherwise the process would never reach its exit below.
            print('保存缓存出错:', e)
        timeout = timeout - 1

        # Every 5 ticks, check whether the streamer is still live.
        if timeout % 5 == 0:
            try:
                if go(iid, False) == 1:
                    lost += 1
            except Exception as e1:
                print('检测主播状态出错:', e1)
                lost += 1
            try:
                if not is_connected():
                    lost += 1
            except Exception as e2:
                print('检测网络状态出错:', e2)
                lost += 1
        time.sleep(0.9)

    if timeout == 0:
        # Grace period before the hard exit.
        time.sleep(5)
        os._exit(0)
def save_cache():
    """Write the current voting state to the per-room, per-day cache file.

    File layout (one record per line, key and value separated by ``#``):

    * ``users#<name>|<name>|...`` - users who already voted
    * ``timeout#<n>`` - remaining countdown, omitted when it is 0
    * ``<content>#<num>`` - one line per ``u_users`` entry

    When the local date has changed since ``timestr`` was set, a new cache
    file name is chosen and the list of users who already voted is reset.
    The cache directory is created if it is missing.

    NOTE(review): names or contents containing ``#``, ``|`` or a newline
    would not survive a round trip through ``load_cache``.
    """
    global cache_file, u_user, timestr
    today = time.strftime("%Y%m%d", time.localtime())
    if timestr != today:
        # New day: switch files and let everybody vote again.
        timestr = today
        cache_file = './cache/{}_{}.txt'.format(iid, today)
        u_user = []
    if not os.path.exists(cache_file):
        cache_file = './cache/{}_{}.txt'.format(iid, timestr)

    # Build the payload before touching the file so a formatting error
    # cannot leave a truncated cache behind.
    lines = ['users#' + '|'.join(u_user)]
    if timeout != 0:
        lines.append('timeout#' + str(timeout))
    lines.extend('{}#{}'.format(str(v['content']), str(v['num'])) for v in u_users)
    payload = '\n'.join(lines) + '\n'

    cache_dir = os.path.dirname(cache_file)
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
    with open(cache_file, 'w', encoding='utf8') as f:
        # The encode/decode round trip drops characters that cannot be
        # represented in UTF-8 (e.g. lone surrogates) instead of raising.
        f.write(payload.encode('UTF-8', 'ignore').decode('UTF-8'))
def load_cache():
    """Restore voting state from today's cache file, if one exists.

    Sets the global ``cache_file`` to ``./cache/<iid>_<timestr>.txt`` and, when
    that file exists, applies each ``key#value`` line:

    * ``users`` - replaces ``u_user`` with the ``|``-separated names
      (empty names are dropped);
    * ``timeout`` - replaces the global countdown;
    * a key listed in ``u_piao`` - sets 'num' on every ``u_users`` entry whose
      'content' equals the key.

    Lines that do not split into exactly two fields are skipped, and lines
    whose numeric value cannot be parsed are reported and skipped.
    """
    global u_user, timeout, cache_file
    cache_file = './cache/{}_{}.txt'.format(iid, timestr)
    if not os.path.exists(cache_file):
        return

    with open(cache_file, encoding='utf8') as f:
        lines = f.read().split("\n")

    for line in lines:
        fields = line.split("#")
        if len(fields) != 2:
            continue
        key, value = fields

        if key == 'users':
            # ''.split('|') yields [''], which is not a real user name.
            u_user = [name for name in value.split('|') if name]

        if key == 'timeout' or key in u_piao:
            try:
                number = int(value)
            except ValueError:
                print('忽略无效的缓存行:', line)
                continue
            if key == 'timeout':
                timeout = number
            if key in u_piao:
                for candidate in u_users:
                    if key == candidate['content']:
                        candidate['num'] = number
@app.route('/barrage')
def barrage():
    """Return the remaining countdown and the tally sorted by ascending 'num'.

    Response JSON: ``{"timeout": <countdown>, "data": [<u_users entries>]}``.
    """
    ranking = list(u_users)
    ranking.sort(key=itemgetter('num'))
    return jsonify({'timeout': timeout, 'data': ranking})
@app.route('/start_ticker', methods=['POST'])
def start_ticker():
    """Run the countdown inside the request and report the outcome as JSON.

    ``timetick()`` is called synchronously, so the request blocks until the
    countdown loop finishes; when the countdown reaches exactly 0,
    ``timetick`` terminates the process itself and no response is sent.

    Returns:
        ``{"message": "success"}`` if ``timetick`` returned, otherwise
        ``{"message": "<error text>"}``.
    """
    try:
        timetick()
    except Exception as e:
        # An exception object is not JSON serialisable; send its text.
        return jsonify({'message': str(e)})
    return jsonify({'message': 'success'})
if __name__ == "__main__":
    # Script entry point: load config.json, verify the stream is reachable,
    # restore cached state, start the crawler and countdown threads, open the
    # local web page and serve the Flask API.
    host = '127.0.0.1'
    port = 8000

    # Read and validate the configuration.
    with open('./config.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    base = data.get('base') or {}
    if base.get('iid', '') == '':
        print('请输入主播id')
        os._exit(0)
    if base.get('timeout', '') == '':
        print('请输入倒计时')
        os._exit(0)
    if not data.get('users'):
        print('请输入要爬取的主播信息')
        os._exit(0)

    iid = base['iid']
    try:
        # The config may carry the countdown as a string; the countdown loop
        # compares it with integers, so normalise it here.
        timeout = int(base['timeout'])
    except (TypeError, ValueError):
        print('请输入倒计时')
        os._exit(0)

    u_users = data.get('users')
    for u in u_users:
        u['num'] = 0
        # Turn the configured avatar path into a URL served by this app.
        u['avatar'] = 'http://{}:{}{}'.format(host, port, u['avatar'])
        if u['content'] not in u_piao:
            u_piao.append(u['content'])

    # Check that the streamer is live; go() returns 1 on any failure.
    if go(iid, False):
        print('主播已下播,即将关闭程序...')
        time.sleep(3)
        os._exit(0)

    # Load this room's cache for today; it may override the countdown, the
    # list of users who already voted and the tallies.  The cache is rewritten
    # on every countdown tick.
    # NOTE(review): the original comment also said the cache is deleted at
    # the end, but no code in view deletes it.
    load_cache()

    # Start the crawler.
    tt = threading.Thread(target=go, args=(iid,), daemon=True)
    tt.start()

    # Start the countdown.
    hh = threading.Thread(target=timetick, daemon=True)
    hh.start()

    # Open the front-end page.
    webbrowser.open('file://' + os.path.realpath('web/index.html'))

    # Start the HTTP server (blocks).
    app.run(host=host, port=port)
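# NOTE: web-page residue captured with this file, not part of the program:
# "This section may contain content that is unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing function."
# "If you confirm the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / falsehood / worthless content, or content that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible."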