#!/usr/bin/env python
# coding:utf-8
import re
import json
import os
import threading
import time
import requests
from requests.exceptions import RequestException
from bs4 import BeautifulSoup, element
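# Third-party dependencies (assumed): requests, beautifulsoup4, and lxml
# (lxml is the parser name passed to BeautifulSoup in get_proble_content).
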
class LeetCode:
    def get_problems_set(self):
        """
        Fetch the full problem list.
        """
        try:
            url = "https://leetcode-cn.com/api/problems/all/"
            response = requests.get(url)
            if response.status_code == 200:
                data = json.loads(response.text)
                problems = self.filter_exist_problems(data["stat_status_pairs"])
                return problems
            print("get_problems_set response error. status_code", response.status_code)
        except RequestException as e:
            print("get_problems_set error.", e)
        return []
    def exist_problems(self):
        """
        Return the title slugs of problems already saved under ../problems/.
        """
        pbm_path = "../problems/"
        res = []
        if os.path.isdir(pbm_path):
            # Saved files are named "<questionId>.<title-slug>.html".
            pattern = re.compile(r"\d+\.[ ]*(.*?)\.html$", re.S)
            for item in os.listdir(pbm_path):
                match_data = re.match(pattern, item)
                if match_data:
                    res.append(match_data.group(1))
        return res
    def filter_exist_problems(self, problems):
        """
        Filter out problems that have already been downloaded.
        """
        old_pbms = self.exist_problems()
        new_pbms = []
        for problem in problems:
            title = problem["stat"]["question__title_slug"]
            if title in old_pbms:
                old_pbms.remove(title)
            else:
                new_pbms.append(title)
        return new_pbms
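
    # The JSON returned by /api/problems/all/ is (roughly) shaped like:
    #   {"stat_status_pairs": [{"stat": {"question__title_slug": "two-sum", ...}, ...}, ...]}
    # Only the title slug is kept here; everything else about a problem is
    # fetched later through the GraphQL endpoint in get_proble_content.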

    class ProblemParser:
        def __init__(self, leetcode):
            self.base_url = "https://leetcode-cn.com/problems/"
            self.leetcode = leetcode

        def parse_set(self, titles):
            for title in titles:
                self.parse_single(title)

        def parse_single(self, title):
            url = self.generate_url(title)
            self.leetcode.get_proble_content(url, title)

        def generate_url(self, title):
            return self.base_url + title

    class MutliThreadParser(ProblemParser):
        def parse_set(self, titles):
            for title in titles:
                t = threading.Thread(target=self.parse_single, args=(title,))
                t.start()
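
    # MutliThreadParser spawns one thread per problem, which can mean hundreds
    # of threads at once; ThreadPoolParser below instead keeps a fixed pool of
    # workers that pop titles from a shared list until it is empty.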

    class ThreadPoolParser(ProblemParser):
        def __init__(self, leetcode):
            LeetCode.ProblemParser.__init__(self, leetcode)
            self.mutex = threading.Lock()
            self.thread_num = 28

        def parse_set(self, titles):
            for i in range(self.thread_num):
                t = threading.Thread(target=self.parse, args=(titles,))
                t.start()

        def parse(self, titles):
            """
            Worker loop: keep fetching single problems until the title list is empty.
            """
            while True:
                title = self.get_title(titles)
                if not title:
                    break
                self.parse_single(title)

        def get_title(self, titles):
            """
            Pop one pending title from the shared list (None when exhausted).
            """
            self.mutex.acquire()
            try:
                title = titles.pop()
            except IndexError:
                title = None
            finally:
                self.mutex.release()
            return title

    def save_problem(self, title, content):
        """
        Save the problem content to a file.
        """
        filename = title + ".html"
        with open(filename, 'w+', encoding="utf-8") as f:
            f.write(content)

    def get_proble_content(self, problemUrl, title):
        """
        Fetch the problem content (English and Chinese) and save it as HTML.
        """
        response = requests.get(problemUrl)
        setCookie = response.headers["Set-Cookie"]
        try:
            # The csrftoken issued with the problem page is required by the GraphQL API.
            pattern = re.compile("csrftoken=(.*?);.*?", re.S)
            csrftoken = re.search(pattern, setCookie)
            url = "https://leetcode-cn.com/graphql"
            data = {
                "operationName": "questionData",
                "variables": {"titleSlug": title},
                "query": "query questionData($titleSlug: String!) {\n question(titleSlug: $titleSlug) {\n questionId\n questionFrontendId\n boundTopicId\n title\n titleSlug\n content\n translatedTitle\n translatedContent\n isPaidOnly\n difficulty\n likes\n dislikes\n isLiked\n similarQuestions\n contributors {\n username\n profileUrl\n avatarUrl\n __typename\n }\n langToValidPlayground\n topicTags {\n name\n slug\n translatedName\n __typename\n }\n companyTagStats\n codeSnippets {\n lang\n langSlug\n code\n __typename\n }\n stats\n hints\n solution {\n id\n canSeeDetail\n __typename\n }\n status\n sampleTestCase\n metaData\n judgerAvailable\n judgeType\n mysqlSchemas\n enableRunCode\n enableTestMode\n envInfo\n __typename\n }\n}\n"
            }
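            # "questionData" appears to be the same GraphQL query the leetcode-cn
            # web UI issues; of the fields requested, only questionId, title,
            # translatedTitle, content, translatedContent, and difficulty are
            # actually used below.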
            headers = {
                'x-csrftoken': csrftoken.group(1),
                'referer': problemUrl,
                'content-type': 'application/json',
                'origin': 'https://leetcode-cn.com',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'
            }
            cookies = {
                '__cfduid': 'd9ce37537c705e759f6bea15fffc9c58b1525271602',
                '_ga': 'GA1.2.5783653.1525271604',
                '_gid': 'GA1.2.344320119.1533189808',
                'csrftoken': csrftoken.group(1),
                '_gat': '1'
            }
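            # The __cfduid/_ga/_gid/_gat values are stale Cloudflare/analytics
            # cookies copied from a browser session and are likely not required;
            # csrftoken (paired with the x-csrftoken header) is the value the
            # endpoint appears to check.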
            dumpJsonData = json.dumps(data)
            response = requests.post(url, data=dumpJsonData, headers=headers, cookies=cookies)
            dictInfo = json.loads(response.text)
            title = dictInfo["data"]["question"]["questionId"] + "." + title
            content = self.format_html(dictInfo["data"]["question"])
            soup = BeautifulSoup(content, 'lxml')
            self.save_problem(title, soup.prettify(formatter=element.HTML5Formatter()))
        except Exception as e:
            print(e)
            print("Error:", problemUrl)

    def format_html(self, question):
        """
        Format the problem (English and translated versions) as an HTML page.
        """
        text = '<html><head><link href="./leetcode-problem.css" rel="stylesheet" type="text/css"></head>'
        text += "<body><div class='question_difficulty'>难度:" + question["difficulty"] + "</div>"
        text += "<div><h1 class='question_title'>" + question["questionId"] + ". " + question["title"] + "</h1>"
        text += question["content"] + "</div>"
        text += "<div><h1 class='question_title'>" + question["questionId"] + ". " + question["translatedTitle"] + "</h1>"
        text += question["translatedContent"] + "</div></body>"
        return text
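
# Hypothetical single-problem usage, based only on the classes defined above:
#   ltcd = LeetCode()
#   LeetCode.ProblemParser(ltcd).parse_single("two-sum")
# This requests https://leetcode-cn.com/problems/two-sum, queries the GraphQL
# API, and writes "<questionId>.two-sum.html" into the current directory.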

def main():
    """
    Fetch the problem list and download every problem not yet saved.
    """
    ltcd = LeetCode()
    problems = ltcd.get_problems_set()
    # LeetCode.MutliThreadParser(ltcd).parse_set(problems)
    LeetCode.ThreadPoolParser(ltcd).parse_set(problems)

if __name__ == '__main__':
    if os.path.exists("problems"):
        os.chdir("problems")
    else:
        os.mkdir("problems")
        os.chdir("problems")
    main()
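
# Run the script from the repository root (assumed layout): it creates or
# reuses a ./problems directory, chdirs into it, and writes one
# "<questionId>.<slug>.html" file per newly downloaded problem.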