代码拉取完成,页面将自动刷新
同步操作将从 Ikaros/AI-Vtuber 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/bin/python
# coding:utf-8
# @FileName: liveMan.py
# @Time: 2024/1/2 21:51
# @Author: bubu
# @Project: douyinLiveWebFetcher
import gzip
import random
import re
import string
import requests
import websocket
from protobuf.douyin import *
def generateMsToken(length=107):
"""
产生请求头部cookie中的msToken字段,其实为随机的107位字符
:param length:字符位数
:return:msToken
"""
random_str = ''
base_str = string.ascii_letters + string.digits + '=_'
_len = len(base_str) - 1
for _ in range(length):
random_str += base_str[random.randint(0, _len)]
return random_str
def generateTtwid():
"""
产生请求头部cookie中的ttwid字段,访问抖音网页版直播间首页可以获取到响应cookie中的ttwid
:return: ttwid
"""
url = "https://live.douyin.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except Exception as err:
print("【X】request the live url error: ", err)
else:
return response.cookies.get('ttwid')
class DouyinLiveWebFetcher:
def __init__(self, live_id):
"""
直播间弹幕抓取对象
:param live_id: 直播间的直播id,打开直播间web首页的链接如:https://live.douyin.com/261378947940,
其中的261378947940即是live_id
"""
self.__ttwid = None
self.__room_id = None
self.is_connected = None
self.live_id = live_id
self.live_url = "https://live.douyin.com/"
self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/120.0.0.0 Safari/537.36"
def send_heartbeat(self, ws):
import time, threading
def heartbeat():
while True:
time.sleep(15)#每15秒发送一次心跳
if self.is_connected:
ws.send("hi")#使用实际的心跳消息格式
else:
print( "Connection lost, stopping heartbeat.")
return
threading.Thread(target=heartbeat).start()
def start(self):
self._connectWebSocket()
def stop(self):
self.ws.close()
@property
def ttwid(self):
"""
产生请求头部cookie中的ttwid字段,访问抖音网页版直播间首页可以获取到响应cookie中的ttwid
:return: ttwid
"""
if self.__ttwid:
return self.__ttwid
headers = {
"User-Agent": self.user_agent,
}
try:
response = requests.get(self.live_url, headers=headers)
response.raise_for_status()
except Exception as err:
print("【X】Request the live url error: ", err)
else:
self.__ttwid = response.cookies.get('ttwid')
return self.__ttwid
@property
def room_id(self):
"""
根据直播间的地址获取到真正的直播间roomId,有时会有错误,可以重试请求解决
:return:room_id
"""
if self.__room_id:
return self.__room_id
url = self.live_url + self.live_id
headers = {
"User-Agent": self.user_agent,
"cookie": f"ttwid={self.ttwid}&msToken={generateMsToken()}; __ac_nonce=0123407cc00a9e438deb4",
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except Exception as err:
print("【X】Request the live room url error: ", err)
else:
match = re.search(r'roomId\\":\\"(\d+)\\"', response.text)
if match is None or len(match.groups()) < 1:
print("【X】No match found for roomId")
self.__room_id = match.group(1)
return self.__room_id
def _connectWebSocket(self):
"""
连接抖音直播间websocket服务器,请求直播间数据
"""
# wss = f"wss://webcast3-ws-web-lq.douyin.com/webcast/im/push/v2/?" \
# f"app_name=douyin_web&version_code=180800&webcast_sdk_version=1.3.0&update_version_code=1.3.0" \
# f"&compress=gzip" \
# f"&internal_ext=internal_src:dim|wss_push_room_id:{self.room_id}|wss_push_did:{self.room_id}" \
# f"|dim_log_id:202302171547011A160A7BAA76660E13ED|fetch_time:1676620021641|seq:1|wss_info:0-1676" \
# f"620021641-0-0|wrds_kvs:WebcastRoomStatsMessage-1676620020691146024_WebcastRoomRankMessage-167661" \
# f"9972726895075_AudienceGiftSyncData-1676619980834317696_HighlightContainerSyncData-2&cursor=t-1676" \
# f"620021641_r-1_d-1_u-1_h-1" \
# f"&host=https://live.douyin.com&aid=6383&live_id=1" \
# f"&did_rule=3&debug=false&endpoint=live_pc&support_wrds=1&" \
# f"im_path=/webcast/im/fetch/&user_unique_id={self.room_id}&" \
# f"device_platform=web&cookie_enabled=true&screen_width=1440&screen_height=900&browser_language=zh&" \
# f"browser_platform=MacIntel&browser_name=Mozilla&" \
# f"browser_version=5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20" \
# f"like%20Gecko)%20Chrome/110.0.0.0%20Safari/537.36&" \
# f"browser_online=true&tz_name=Asia/Shanghai&identity=audience&" \
# f"room_id={self.room_id}&heartbeatDuration=0&signature=00000000"
wss = "wss://webcast5-ws-web-lq.douyin.com/webcast/im/push/v2/?app_name=douyin_web&version_code=180800&webcast_sdk_version=1.0.14-beta.0&update_version_code=1.0.14-beta.0&compress=gzip&device_platform=web&cookie_enabled=true&screen_width=2048&screen_height=1152&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_version=5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/126.0.0.0%20Safari/537.36%20Edg/126.0.0.0&browser_online=true&tz_name=Etc/GMT-8&cursor=h-7383323426352862262_t-1719063974519_r-1_d-1_u-1&internal_ext=internal_src:dim|wss_push_room_id:7383264938631973686|wss_push_did:7293153952199050788|first_req_ms:1719063974385|fetch_time:1719063974519|seq:1|wss_info:0-1719063974519-0-0|wrds_v:7383323492227230262&host=https://live.douyin.com&aid=6383&live_id=1&did_rule=3&endpoint=live_pc&support_wrds=1&user_unique_id=7293153952199050788&im_path=/webcast/im/fetch/&identity=audience&need_persist_msg_count=15&insert_task_id=&live_reason=&room_id=7383264938631973686&heartbeatDuration=0&signature=6DJMtCOOuubiYZP4"
headers = {
"cookie": f"ttwid={self.ttwid}",
'user-agent': self.user_agent,
}
self.ws = websocket.WebSocketApp(wss,
header=headers,
on_open=self._wsOnOpen,
on_message=self._wsOnMessage,
on_error=self._wsOnError,
on_close=self._wsOnClose)
try:
self.ws.run_forever()
except Exception:
self.stop()
raise
def _wsOnOpen(self, ws):
"""
连接建立成功
"""
print("WebSocket connected.")
self.is_connected = True
def _wsOnMessage(self, ws, message):
"""
接收到数据
:param ws: websocket实例
:param message: 数据
"""
# 根据proto结构体解析对象
package = PushFrame().parse(message)
response = Response().parse(gzip.decompress(package.payload))
# 返回直播间服务器链接存活确认消息,便于持续获取数据
if response.need_ack:
ack = PushFrame(log_id=package.log_id,
payload_type='ack',
payload=response.internal_ext.encode('utf-8')
).SerializeToString()
ws.send(ack, websocket.ABNF.OPCODE_BINARY)
# 根据消息类别解析消息体
for msg in response.messages_list:
method = msg.method
try:
{
'WebcastChatMessage': self._parseChatMsg, # 聊天消息
'WebcastGiftMessage': self._parseGiftMsg, # 礼物消息
'WebcastLikeMessage': self._parseLikeMsg, # 点赞消息
'WebcastMemberMessage': self._parseMemberMsg, # 进入直播间消息
'WebcastSocialMessage': self._parseSocialMsg, # 关注消息
'WebcastRoomUserSeqMessage': self._parseRoomUserSeqMsg, # 直播间统计
'WebcastFansclubMessage': self._parseFansclubMsg, # 粉丝团消息
'WebcastControlMessage': self._parseControlMsg, # 直播间状态消息
'WebcastEmojiChatMessage': self._parseEmojiChatMsg, # 聊天表情包消息
'WebcastRoomStatsMessage': self._parseRoomStatsMsg, # 直播间统计信息
'WebcastRoomMessage': self._parseRoomMsg, # 直播间信息
'WebcastRoomRankMessage': self._parseRankMsg, # 直播间排行榜信息
}.get(method)(msg.payload)
except Exception:
pass
def _wsOnError(self, ws, error):
print("WebSocket error: ", error)
self.is_connected = False
def _wsOnClose(self, ws):
print("WebSocket connection closed.")
self.is_connected = False
def _parseChatMsg(self, payload):
'''聊天消息'''
message = ChatMessage().parse(payload)
user_name = message.user.nick_name
user_id = message.user.id
content = message.content
print(f"【聊天msg】[{user_id}]{user_name}: {content}")
def _parseGiftMsg(self, payload):
'''礼物消息'''
message = GiftMessage().parse(payload)
user_name = message.user.nick_name
gift_name = message.gift.name
gift_cnt = message.combo_count
print(f"【礼物msg】{user_name} 送出了 {gift_name}x{gift_cnt}")
def _parseLikeMsg(self, payload):
'''点赞消息'''
message = LikeMessage().parse(payload)
user_name = message.user.nick_name
count = message.count
print(f"【点赞msg】{user_name} 点了{count}个赞")
def _parseMemberMsg(self, payload):
'''进入直播间消息'''
message = MemberMessage().parse(payload)
user_name = message.user.nick_name
user_id = message.user.id
gender = ["女", "男"][message.user.gender]
print(f"【进场msg】[{user_id}][{gender}]{user_name} 进入了直播间")
def _parseSocialMsg(self, payload):
'''关注消息'''
message = SocialMessage().parse(payload)
user_name = message.user.nick_name
user_id = message.user.id
print(f"【关注msg】[{user_id}]{user_name} 关注了主播")
def _parseRoomUserSeqMsg(self, payload):
'''直播间统计'''
message = RoomUserSeqMessage().parse(payload)
current = message.total
total = message.total_pv_for_anchor
print(f"【统计msg】当前观看人数: {current}, 累计观看人数: {total}")
def _parseFansclubMsg(self, payload):
'''粉丝团消息'''
message = FansclubMessage().parse(payload)
content = message.content
print(f"【粉丝团msg】 {content}")
def _parseEmojiChatMsg(self, payload):
'''聊天表情包消息'''
message = EmojiChatMessage().parse(payload)
emoji_id = message.emoji_id
user = message.user
common = message.common
default_content = message.default_content
print(f"【聊天表情包id】 {emoji_id},user:{user},common:{common},default_content:{default_content}")
def _parseRoomMsg(self, payload):
message = RoomMessage().parse(payload)
common = message.common
room_id = common.room_id
print(f"【直播间msg】直播间id:{room_id}")
def _parseRoomStatsMsg(self, payload):
message = RoomStatsMessage().parse(payload)
display_long = message.display_long
print(f"【直播间统计msg】{display_long}")
def _parseRankMsg(self, payload):
message = RoomRankMessage().parse(payload)
ranks_list = message.ranks_list
print(f"【直播间排行榜msg】{ranks_list}")
def _parseControlMsg(self, payload):
'''直播间状态消息'''
message = ControlMessage().parse(payload)
if message.status == 3:
print("直播间已结束")
self.stop()
if __name__ == '__main__':
live_id = '386798490464'
DouyinLiveWebFetcher(live_id).start()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。