代码拉取完成,页面将自动刷新
同步操作将从 liweimin/抖音视频上传助手 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# -*- coding: utf-8 -*-
import base64
from threading import Thread
import time
import random
import zlib
import requests
import logging
from io import BytesIO
import http.cookiejar as cookielib
from PIL import Image
from uuid import uuid4
import os, sys
# Turn off urllib3 warning output (reached through requests.packages).
requests.packages.urllib3.disable_warnings()
# Module-wide logger; initloger() attaches its handlers.
logger = logging.getLogger(__name__)
# Candidate browser User-Agent strings; one is chosen once, when `headers` is built.
agent = [
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0'
]
# Base headers passed to the sso.douyin.com / creator.douyin.com requests in this module.
# NOTE: this dict is shared and mutated at runtime (uploadtoken() adds 'x-csrf-token').
headers = {
'User-Agent' : random.choice(agent),
'Referer' : 'https://creator.douyin.com/',
'Origin' : 'https://creator.douyin.com'
}
def initloger():
    """Configure the module logger with a daily log file and console output.

    Sets the logger to DEBUG and attaches two handlers sharing one format:
    a file handler appending to ``./logs/<YYYYMMDD>.log`` (UTF-8) and a
    stream handler. The ``./logs`` directory is created when missing.

    Calling this more than once is safe: if the logger already has handlers
    nothing is added, so log lines are not duplicated.
    """
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        # Already configured; attaching again would emit every record twice.
        return

    # exist_ok avoids the exists()/mkdir() race of a separate existence check.
    os.makedirs('./logs', exist_ok=True)

    formatter = logging.Formatter("%(asctime)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S')

    # time.strftime() without a time tuple formats the current local time.
    fh = logging.FileHandler('./logs/{}.log'.format(time.strftime('%Y%m%d')), encoding='utf-8', mode='a')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    sh = logging.StreamHandler()
    sh.setFormatter(formatter)
    logger.addHandler(sh)
# Configure the module logger at import time, before any function below logs.
initloger()
def generate_random_str(randomlength=11):
    """Return a random string of lowercase ASCII letters and digits.

    Args:
        randomlength: Number of characters to generate (default 11).
            Zero or a negative value yields an empty string.

    Returns:
        A ``str`` of ``randomlength`` characters drawn from ``a-z0-9``.
    """
    base_str = 'abcdefghijklmnopqrstuvwxyz0123456789'
    # join + random.choice avoids manual index arithmetic and repeated
    # string concatenation.
    return ''.join(random.choice(base_str) for _ in range(randomlength))
class showpng(Thread):
    """Thread that decodes in-memory image bytes and shows them with PIL."""

    def __init__(self, data):
        """Store the raw image bytes to display once the thread runs."""
        super().__init__()
        self.data = data

    def run(self):
        """Open the stored bytes as an image and call its ``show()``."""
        buffer = BytesIO(self.data)
        picture = Image.open(buffer)
        picture.show()
def islogin(userid):
    """Load the cached cookies for ``userid`` and check whether they still log in.

    The cookie jar lives at ``./cache/<userid>.txt`` (created empty when
    missing). The stored cookies are sent to
    ``https://sso.douyin.com/check_login/`` to ask whether they are valid.

    Args:
        userid: Identifier used to name the cookie file.

    Returns:
        ``(status, session)`` where ``status`` is 1 when the response's
        ``has_login`` field is truthy and 0 otherwise; ``session`` is the
        ``requests`` session backed by the cookie file.
    """
    # exist_ok avoids the exists()/mkdir() race of a separate existence check.
    os.makedirs('./cache', exist_ok=True)
    cookiefile = './cache/%s.txt' % userid
    if not os.path.exists(cookiefile):
        with open(cookiefile, 'w') as f:
            f.write('')

    session = requests.session()
    session.cookies = cookielib.LWPCookieJar(filename=cookiefile)
    try:
        session.cookies.load(ignore_discard=True)
    except Exception:
        # Deliberate best-effort: an empty or unreadable cookie file simply
        # means there is nothing to restore; the check below reports status 0.
        pass

    res = session.get('https://sso.douyin.com/check_login/', headers=headers).json()
    # .get(): a response without 'has_login' is treated as "not logged in"
    # instead of raising KeyError.
    if res.get('has_login'):
        return 1, session
    logger.info('Cookies值失效,请重新扫码登录!')
    return 0, session
def _show_qrcode(session):
    """Request a login QR code, display it in a viewer thread, return its token."""
    urldata = session.get('https://sso.douyin.com/get_qrcode/', headers=headers).json()
    # The QR image arrives base64-encoded.
    qrcode_png = base64.b64decode(urldata['data']['qrcode'])
    showpng(qrcode_png).start()
    return urldata['data']['token']


def login(userid):
    """Return a logged-in session for ``userid``, prompting a QR scan if needed.

    If the cached cookies are still valid (see ``islogin``) that session is
    reused. Otherwise a QR code is displayed and
    ``https://sso.douyin.com/check_qrconnect/`` is polled every 3 seconds
    until the scan is confirmed (status 3). When the code expires (status 5)
    a new one is requested and displayed, so the loop cannot poll a dead
    token forever.

    Args:
        userid: Identifier used to name the cookie file ``./cache/<userid>.txt``.

    Returns:
        The ``requests`` session; its cookies are saved to the cookie file.
    """
    status, session = islogin(userid)
    if status == 0:
        session = requests.session()
        cookiefile = './cache/%s.txt' % userid
        session.cookies = cookielib.LWPCookieJar(filename=cookiefile)
        token = _show_qrcode(session)
        while True:
            tokendata = session.get('https://sso.douyin.com/check_qrconnect/?token=%s' % token, headers=headers).json()
            qr_status = int(tokendata['data']['status'])
            if qr_status == 5:
                logger.info('登录二维码已过期,请重新打开')
                # Fetch and show a replacement code; the old token can never
                # reach the confirmed state.
                token = _show_qrcode(session)
            elif qr_status == 3:
                # Follow the redirect and touch the creator pages so their
                # cookies are set on this session.
                session.get(tokendata['data']['redirect_url'], headers=headers)
                session.get('https://creator.douyin.com/', headers=headers)
                session.get('https://creator.douyin.com/web/api/media/user/info/', headers=headers)
                logger.info('已确认,登录成功')
                break
            time.sleep(3)
    # islogin() loads with ignore_discard=True; save with the same flag so
    # session cookies are written too and can be restored next run.
    session.cookies.save(ignore_discard=True)
    return session
def getaccount(session):
    """Fetch the account's id, nickname and avatar from the creator API.

    Args:
        session: Logged-in ``requests`` session.

    Returns:
        ``(douyin_unique_id, nick_name, avatar_url)`` taken from the
        response's ``douyin_user_verify_info`` when ``status_code`` is 0,
        otherwise ``False``.
    """
    payload = session.get('https://creator.douyin.com/aweme/v1/creator/user/info/', headers=headers).json()
    if payload['status_code'] != 0:
        return False
    return (
        payload['douyin_user_verify_info']['douyin_unique_id'],
        payload['douyin_user_verify_info']['nick_name'],
        payload['douyin_user_verify_info']['avatar_url'],
    )
def getcookies(session):
    """Return the session's cookies as a plain ``{name: value}`` dict.

    Args:
        session: Object whose ``cookies`` attribute iterates over cookie
            objects exposing ``name`` and ``value``.

    Returns:
        Dict mapping cookie name to value; when a name occurs more than
        once the last one iterated wins.
    """
    return {ck.name: ck.value for ck in session.cookies}
def delvideo(session, item_id):
    """Delete one published work through the creator API.

    Args:
        session: Logged-in ``requests`` session.
        item_id: Id of the work to delete; sent as a string.

    Returns:
        ``True`` when the response's ``status_code`` is 0, ``False``
        otherwise (an explicit bool rather than an implicit ``None``).
    """
    # (None, value) tuples make requests send plain multipart form fields.
    data = {
        'item_id': (None, str(item_id))
    }
    res = session.post('https://creator.douyin.com/web/api/media/aweme/delete/', files=data, headers=headers).json()
    return res['status_code'] == 0
def workslist(userid):
    """Return the first page (up to 12) of works posted by ``userid``.

    Args:
        userid: Identifier whose cached cookies are used (see ``islogin``).

    Returns:
        ``False`` when the cached login is invalid or the API reports a
        non-zero ``status_code``; otherwise a list with one dict per work
        holding ``workId``, ``title``, ``cover``, ``uploadTime``,
        ``playCount``, ``likeCount`` and ``commentCount``.
    """
    status, session = islogin(userid)
    if status == 0:
        return False
    res = session.get('https://creator.douyin.com/web/api/media/aweme/post/?scene=star_atlas&status=0&count=12&max_cursor=0', headers=headers).json()
    if res['status_code'] != 0:
        return False

    works = []
    # `or []` tolerates a missing or null list when there are no works.
    for work in res.get('aweme_list') or []:
        stats = work['statistics']
        works.append({
            'workId': work['aweme_id'],
            'title': work['desc'],
            # No cover value is extracted from the response; left blank.
            # TODO(review): fill in once the cover field is known.
            'cover': '',
            'uploadTime': work['create_time'],
            'playCount': stats['play_count'],
            'likeCount': stats['digg_count'],
            'commentCount': stats['comment_count'],
        })
    return works
def uploadtoken(session, fsize):
    """Obtain upload credentials and upload parameters for one video file.

    Step 1 fetches an access key and auth string from the creator API.
    Step 2 uses them to request upload parameters from the video service.

    Side effect: stores the session's ``csrf_token`` cookie in the shared
    module-level ``headers`` dict as ``x-csrf-token``, so later requests made
    with ``headers`` carry it too.

    Args:
        session: Logged-in ``requests`` session.
        fsize: Size of the file to upload, in bytes.

    Returns:
        ``(vid, oid, tos_host, tos_sign, token, upload_id, akey, auth)`` on
        success, ``False`` if either step reports an error.

    Raises:
        KeyError: If the session has no ``csrf_token`` cookie.
    """
    # Step 1: fetch the upload access key and auth string.
    cookies = getcookies(session)
    headers['x-csrf-token'] = cookies['csrf_token']
    res = session.get('https://creator.douyin.com/web/api/media/upload/auth/', headers=headers).json()
    if res['status_code'] != 0:
        logger.info('获取上传token错误')
        # Stop here: without ak/auth the next request cannot be built.
        return False
    akey = res['ak']
    auth = res['auth']

    # Step 2: request the upload parameters.
    headersx = {
        'Authorization': auth,
        'X-TT-Access': akey,
        'Content-Type': 'application/json'
    }
    rdmstr = generate_random_str()
    res = session.get('https://vas-lf-x.snssdk.com/video/openapi/v1/?action=GetVideoUploadParams&s=%s&use_edge_node=1&file_size=%s' % (rdmstr, fsize), headers=headersx).json()
    if res['message'] != 'ok':
        logger.info('申请上传错误 - %s' % res['message'])
        return False

    params = res['data']
    return (
        params['vid'],
        params['oid'],
        params['tos_host'],
        params['tos_sign'],
        params['token'],
        params['upload_id'],
        akey,
        auth,
    )
def upload(session, flen, chunk, i, tos_sign, upload_id, upload_url):
    """POST one chunk of the video to the storage host.

    Args:
        session: ``requests`` session used for the POST.
        flen: Total file size in bytes; files of at most 5242880 bytes are
            sent to ``upload_url`` directly, larger ones as numbered parts.
        chunk: Bytes of this chunk.
        i: Part number, used in the part URL and in the failure log.
        tos_sign: Value for the ``Authorization`` header.
        upload_id: Upload id placed in the part URL.
        upload_url: Base URL of the object being uploaded.

    Returns:
        ``None``. A response whose ``success`` field is not 0 is logged.
    """
    # Lower-case hex CRC32 of the chunk, without zero padding.
    checksum = '%x' % (zlib.crc32(chunk) & 0xffffffff)
    part_headers = {
        'Authorization': tos_sign,
        'Content-CRC32': checksum,
        'Content-Disposition': 'attachment; filename="undefined"',
        'Content-Length': str(len(chunk)),
        'Content-Type': 'application/octet-stream'
    }
    if flen <= 5242880:
        put_url = upload_url
    else:
        put_url = '%s?partNumber=%s&uploadID=%s' % (upload_url, i, upload_id)
    reply = session.post(put_url, data=chunk, headers=part_headers).json()
    if reply['success'] != 0:
        logger.info('分块上传失败 - %s' % i)
def upload_check(session, tos_sign, upload_id, upload_url, vid, oid, token, akey, auth):
    """Ask the video service to finalize the upload and report video info.

    Args:
        session: ``requests`` session used for the POST.
        tos_sign, upload_id, upload_url: Accepted but not used here.
        vid, oid, token: Upload identifiers sent in the JSON body.
        akey: Value for the ``X-TT-Access`` header.
        auth: Value for the ``Authorization`` header.

    Returns:
        ``(width, height, poster_oid)`` when the response ``message`` is
        ``'ok'``, otherwise ``False`` after logging the message.
    """
    request_headers = {
        'Authorization': auth,
        'X-TT-Access': akey,
        'Content-Type': 'application/json'
    }
    payload = {
        'vid': vid,
        'oid': oid,
        'token': token,
        'poster_ss': 0,
        'is_exact_poster': True,
        'user_reference':''
    }
    reply = session.post(
        'https://vas-lf-x.snssdk.com/video/openapi/v1/?action=UpdateVideoUploadInfos',
        json=payload,
        headers=request_headers,
    ).json()
    if reply['message'] != 'ok':
        logger.info('校验上传出错 - %s' % reply['message'])
        return False
    return reply['data']['video']['width'], reply['data']['video']['height'], reply['data']['poster']['oid']
def submit(session, vid, poster_oid, caption = '', longitude = '', latitude = ''):
    """Publish an uploaded video through the creator ``aweme/create`` endpoint.

    Args:
        session: Logged-in ``requests`` session.
        vid: Video id sent as ``video_id``.
        poster_oid: Poster object id sent as ``poster``.
        caption: Text sent as the ``text`` field.
        longitude, latitude: Accepted but not used; the POI id and name in
            the form are fixed values.

    Returns:
        1 when the response ``status_code`` is 0, otherwise 0 after logging
        the response's ``status_msg``.
    """
    # Field order is kept as-is; each value is wrapped as (None, value) so
    # requests sends it as a plain multipart form field.
    fields = (
        ('video_id', vid),
        ('text', caption),
        ('record', 'null'),
        ('source_info', ''),
        ('text_extra', '[]'),
        ('challenges', '[]'),
        ('mentions', '[]'),
        ('hashtag_source', ''),
        ('visibility_type', '1'),
        ('download', '0'),
        ('upload_source', '1'),
        ('mix_id', ''),
        ('mix_order', ''),
        ('is_preview', '0'),
        ('hot_sentence', ''),
        ('cover_text_uri', ''),
        ('cover_text', ''),
        ('poster', poster_oid),
        ('poster_delay', '0'),
        ('poi_id', '6601244105802516494'),
        ('poi_name', '华润大厦'),
        ('music_source', '0'),
        ('music_id', ''),
    )
    form = {key: (None, value) for key, value in fields}
    reply = session.post('https://creator.douyin.com/web/api/media/aweme/create/', files=form, headers=headers).json()
    if reply['status_code'] != 0:
        logger.info('发布出错 - %s' % reply['status_msg'])
        return 0
    return 1
def upvideo(session, filePath, title='', longitude='', latitude=''):
    """Upload one video file in 5 MiB chunks and publish it.

    Args:
        session: Logged-in ``requests`` session.
        filePath: Path of the video file to upload.
        title: Caption passed to ``submit``.
        longitude, latitude: Passed through to ``submit``.

    Returns:
        1 when the video was published, 0 when obtaining credentials,
        verifying the upload, or publishing failed.
    """
    chunk_size = 5242880  # 5 MiB per part
    with open(filePath, 'rb') as f:
        # Seek to the end to learn the file size.
        f.seek(0, 2)
        flen = f.tell()

        # Obtain upload credentials; uploadtoken() returns False on failure,
        # which must not be unpacked.
        credentials = uploadtoken(session, flen)
        if not credentials:
            return 0
        vid, oid, tos_host, tos_sign, token, upload_id, akey, auth = credentials

        # Loop-invariant, and also needed after the loop even for an empty file.
        upload_url = 'https://%s/%s' % (tos_host, oid)

        # Back to the start of the file, then send it chunk by chunk
        # (part numbers start at 1).
        f.seek(0, 0)
        for i, chunk in enumerate(iter(lambda: f.read(chunk_size), b''), 1):
            upload(session, flen, chunk, i, tos_sign, upload_id, upload_url)

    try:
        # Verify the upload; upload_check() logs and returns False on failure.
        checked = upload_check(session, tos_sign, upload_id, upload_url, vid, oid, token, akey, auth)
        if not checked:
            return 0
        width, height, poster_oid = checked
        # Publish.
        result = submit(session, vid, poster_oid, title, longitude, latitude)
    except Exception as e:
        result = 0
        logger.info('Error#' + str(e))
    return result
def publish(session, filePath, title, longitude='', latitude='', unlink = 0):
    """Upload and publish one video, optionally deleting the local file.

    Args:
        session: Logged-in ``requests`` session.
        filePath: Path of the video file.
        title: Caption for the video.
        longitude, latitude: Passed through to ``upvideo``.
        unlink: When 1, the local file is removed after a successful upload.

    Returns:
        The result of ``upvideo`` (1 on success, 0 on failure).
    """
    logger.info('开始上传 - %s < %s' % (title, filePath))
    result = upvideo(session, filePath, title, longitude, latitude)
    if result != 1:
        return result

    suffix = ''
    if unlink == 1:
        os.remove(filePath)
        suffix = '> 缓存文件已删除'
    logger.info('上传完成 - %s %s' % (title, suffix))
    return result
def worker(curpath, unlink = False):
    """Log in and upload every ``.mp4`` found under ``curpath``.

    The user id (which names the cookie file) is the final component of
    ``curpath``. Files are uploaded in sorted path order with a 3 second
    pause before each one; the file name without its extension is the title.

    Args:
        curpath: Directory that is walked recursively for ``.mp4`` files.
        unlink: When truthy, each file is deleted after it has been
            uploaded successfully.
    """
    # basename(abspath(...)) works with either path separator and with
    # trailing separators or relative paths such as './'.
    userid = os.path.basename(os.path.abspath(curpath))
    session = login(userid)

    # Sorted so the upload order is deterministic.
    videos = sorted(
        os.path.join(root, name)
        for root, _dirs, files in os.walk(curpath)
        for name in files
        if name.endswith('.mp4')
    )
    logger.info('%s共%s个视频' % (curpath, str(len(videos))))

    for index, fname in enumerate(videos, 1):
        time.sleep(3)
        logger.info('正在上传%s_%s_%s' % (curpath, str(index), fname))
        # Publish with the file name (minus '.mp4') as the title.
        title = os.path.basename(fname)[:-4]
        result = upvideo(session, fname, title)
        # upvideo() returns 1/0, never the bool True, so compare by value.
        if result == 1 and unlink:
            os.remove(fname)
def traceexcept(type, value, trace):
    """Log an exception and the file/line of each traceback frame.

    Takes the three arguments of ``sys.excepthook``.

    Args:
        type: Exception class.
        value: Exception instance.
        trace: Traceback object (or ``None``); walked via ``tb_next``.
    """
    parts = ['Trace#' + str(type) + '::' + str(value)]
    frame = trace
    while frame:
        parts.append(' File ' + str(frame.tb_frame.f_code.co_filename) + ', Line ' + str(frame.tb_lineno))
        frame = frame.tb_next
    # One log record, one line per frame.
    logger.error(os.linesep.join(parts))
if __name__ == '__main__':
    # Route uncaught exceptions to the log.
    sys.excepthook = traceexcept
    cli_args = sys.argv[1:]
    # First argument: folder holding the videos (defaults to the current directory).
    curpath = cli_args[0] if cli_args else './'
    # Any second argument turns on deletion of files after upload.
    unlink = len(cli_args) > 1
    worker(curpath, unlink)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。