import asyncio
import cgi
from collections import namedtuple
import logging
import re
import time
import urllib.parse
from asyncio import Queue

import aiohttp

logger = logging.getLogger()


def set_logger():
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(process)d-%(threadName)s - '
                                  '%(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)


set_logger()


def lenient_host(host):
    parts = host.split('.')[-2:]
    return ''.join(parts)
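
# Illustrative sketch (the first host is the seed host used at the bottom of
# this script, the second is hypothetical): lenient_host keeps only the last
# two components of a host and joins them without a dot, so different
# subdomains of the same site compare equal:
#   lenient_host('movie.douban.com')  # -> 'doubancom'
#   lenient_host('book.douban.com')   # -> 'doubancom'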

def is_redirect(response):
    return response.status in (300, 301, 302, 303, 307)


"""
namedtuple vs tuple: more readable -- a tuple is accessed by numeric index,
while a namedtuple exposes each field through a meaningful name.
namedtuple vs dict: more memory-efficient -- a namedtuple stores its values
like a plain tuple, whereas a dict also carries a hash table; field access is
fast in both (namedtuple attributes map directly to tuple indices).
"""
FetchStatistic = namedtuple('FetchStatistic',
                            ['url',
                             'next_url',
                             'status',
                             'exception',
                             'size',
                             'content_type',
                             'encoding',
                             'num_urls',
                             'num_new_urls'])
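
# Illustrative sketch (the values below are made up): FetchStatistic fields are
# read by name rather than by position, which keeps the bookkeeping code
# self-documenting:
#   stat = FetchStatistic(url='http://example.com/', next_url=None, status=200,
#                         exception=None, size=1024, content_type='text/html',
#                         encoding='utf-8', num_urls=3, num_new_urls=2)
#   stat.status      # 200, instead of the opaque stat[2]
#   stat._asdict()   # handy when persisting the record somewhere else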


class Crawler:
    """Crawl a set of URLs.

    This manages two collections: 'seen_urls', the set of URLs seen so far,
    and 'done', a list of FetchStatistic records.
    """
    def __init__(self, roots,
                 exclude=None, strict=True,  # What to crawl.
                 max_redirect=2, max_tries=5,  # Per-url limits.
                 max_tasks=1, *, loop=None):
        """
        # With several concurrent tasks, every await hands control back to the
        # event loop, which makes step-by-step debugging harder to follow.
        :param roots: root (seed) URLs to start crawling from
        :param exclude: regex of URLs to exclude from the crawl
        :param strict: whether to match hosts strictly
        :param max_redirect: maximum number of 3xx redirects to follow per URL
        :param max_tries: maximum number of HTTP attempts per URL
        :param max_tasks: maximum number of concurrent worker coroutines
        :param loop: event loop to use (defaults to asyncio.get_event_loop())
        """
        self.loop = loop or asyncio.get_event_loop()
        self.roots = roots
        self.exclude = exclude
        self.strict = strict
        self.max_redirect = max_redirect
        self.max_tries = max_tries
        self.max_tasks = max_tasks
        # Coroutine-aware queue (note: the loop argument was removed in
        # Python 3.10; on newer versions use Queue() without it).
        self.q = Queue(loop=self.loop)
        # URLs already processed (a set cannot contain duplicates)
        self.seen_urls = set()
        self.done = []
        self.session = aiohttp.ClientSession()
        self.root_domains = set()
        for root in roots:
            parts = urllib.parse.urlparse(root)
            host, port = urllib.parse.splitport(parts.netloc)
            if not host:
                continue
            if re.match(r'\A[\d\.]*\Z', host):
                self.root_domains.add(host)
            else:
                host = host.lower()
                if self.strict:
                    self.root_domains.add(host)
                else:
                    self.root_domains.add(lenient_host(host))
        for root in roots:
            self.add_url(root)
        self.t0 = time.time()
        self.t1 = None

    def close(self):
        """Close resources (in aiohttp 3.x, ClientSession.close() is a
        coroutine and should be awaited)."""
        self.session.close()

    def host_okay(self, host):
        """Check if a host should be crawled.

        A literal match (after lowercasing) is always good. For hosts
        that don't look like IP addresses, some approximate matches
        are okay depending on the strict flag.
        """
        host = host.lower()
        if host in self.root_domains:
            return True
        if re.match(r'\A[\d\.]*\Z', host):
            return False
        if self.strict:
            return self._host_okay_strictish(host)
        else:
            return self._host_okay_lenient(host)

    def _host_okay_strictish(self, host):
        """Check if a host should be crawled, strict-ish version.

        This checks for equality modulo an initial 'www.' component.
        """
        host = host[4:] if host.startswith('www.') else 'www.' + host
        return host in self.root_domains

    def _host_okay_lenient(self, host):
        """Check if a host should be crawled, lenient version.

        This compares the last two components of the host.
        """
        return lenient_host(host) in self.root_domains
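
    # Illustrative sketch (only 'movie.douban.com' comes from this script; the
    # other hosts are hypothetical): with strict=True the root domain set holds
    # 'movie.douban.com', so host_okay accepts 'movie.douban.com' and
    # 'www.movie.douban.com' but rejects 'book.douban.com'; with strict=False
    # the set holds lenient_host('movie.douban.com') == 'doubancom', so any
    # host whose last two components are 'douban' and 'com'
    # (e.g. 'book.douban.com') is accepted.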

    def record_statistic(self, fetch_statistic):
        """Record the FetchStatistic for a completed / failed URL."""
        self.done.append(fetch_statistic)

    async def parse_links(self, response):
        """Return a FetchStatistic and list of links."""
        links = set()
        content_type = None
        encoding = None
        body = await response.read()  # read the raw body as bytes
        if response.status == 200:
            content_type = response.headers.get('content-type')
            pdict = {}
            if content_type:
                # Split e.g. 'text/html; charset=utf-8' into the MIME type and
                # its parameters (the cgi module is deprecated since Python 3.11).
                content_type, pdict = cgi.parse_header(content_type)
            encoding = pdict.get('charset', 'utf-8')
            if content_type in ('text/html', 'application/xml'):
                text = await response.text()
                # Replace href with (?:href|src) to follow image links.
                urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text))
                if urls:
                    logger.info('got %r distinct urls from %r',
                                len(urls), response.url)
                for url in urls:
                    # Handle URLs that are not absolute http(s) URLs.
                    if 'http' not in url:
                        try:
                            if ';' in url:
                                url = url.split(';')[0]
                            # response.url is a yarl.URL; urljoin needs a string.
                            normalized = urllib.parse.urljoin(str(response.url), url)
                            defragmented, frag = urllib.parse.urldefrag(normalized)
                            if self.url_allowed(defragmented):
                                links.add(defragmented)
                        except Exception:
                            logger.warning('Error url: %r', url)
                    else:
                        links.add(url)
        stat = FetchStatistic(
            url=response.url,
            next_url=None,
            status=response.status,
            exception=None,
            size=len(body),
            content_type=content_type,
            encoding=encoding,
            num_urls=len(links),
            num_new_urls=len(links - self.seen_urls))
        return stat, links
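
    # Illustrative sketch of the normalization above (the relative path is made
    # up): a link scraped from a page is resolved against the page URL and
    # stripped of its fragment before being queued:
    #   normalized = urllib.parse.urljoin('https://movie.douban.com/top250',
    #                                     '/subject/1292052/#comments')
    #   # -> 'https://movie.douban.com/subject/1292052/#comments'
    #   defragmented, frag = urllib.parse.urldefrag(normalized)
    #   # -> 'https://movie.douban.com/subject/1292052/', 'comments'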

    async def fetch(self, url, max_redirect):
        """Fetch one URL."""
        tries = 0
        exception = None
        while tries < self.max_tries:
            try:
                # aiohttp session.get with allow_redirects=False:
                # handle 3xx redirects ourselves instead of following them.
                response = await self.session.get(url, allow_redirects=False)
                if tries > 1:
                    logger.info('try %r for %r success', tries, url)
                break
            except aiohttp.ClientError as client_error:
                logger.info('try %r for %r raised %r', tries, url, client_error)
                exception = client_error
            tries += 1
        else:
            # We never broke out of the loop: all tries failed.
            logger.error('%r failed after %r tries',
                         url, self.max_tries)
            self.record_statistic(FetchStatistic(url=url,
                                                 next_url=None,
                                                 status=None,
                                                 exception=exception,
                                                 size=0,
                                                 content_type=None,
                                                 encoding=None,
                                                 num_urls=0,
                                                 num_new_urls=0))
            return

        try:
            if is_redirect(response):
                # The HTTP 'Location' header gives the redirect target.
                location = response.headers['location']
                next_url = urllib.parse.urljoin(url, location)
                self.record_statistic(FetchStatistic(url=url,
                                                     next_url=next_url,
                                                     status=response.status,
                                                     exception=None,
                                                     size=0,
                                                     content_type=None,
                                                     encoding=None,
                                                     num_urls=0,
                                                     num_new_urls=0))
                # Skip next_url if it has already been seen.
                if next_url in self.seen_urls:
                    return
                if max_redirect > 0:
                    logger.info('redirect to %r from %r', next_url, url)
                    self.add_url(next_url, max_redirect - 1)
                else:
                    logger.error('redirect limit reached for %r from %r',
                                 next_url, url)
            else:
                try:
                    # parse_links extracts every URL on the page:
                    # page A -> its URLs -> queue. Each fetched page is parsed
                    # and all of its links are queued for further crawling.
                    stat, links = await self.parse_links(response)
                    self.record_statistic(stat)
                    for link in links.difference(self.seen_urls):
                        self.q.put_nowait((link, self.max_redirect))
                    self.seen_urls.update(links)
                except Exception as e:
                    logger.warning(e)
        finally:
            res = await response.text()
            # Release the connection back to the connector.
            await response.release()
            return res

    async def work(self):
        """Process queue items forever."""
        try:
            while True:
                url, max_redirect = await self.q.get()
                assert url in self.seen_urls
                res = await self.fetch(url, max_redirect)
                # Persist the raw HTML here, e.g. insert it into MongoDB
                # (see the sketch after this method).
                print('res: ', res)
                self.q.task_done()
        except asyncio.CancelledError:
            pass
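
    # A minimal persistence sketch for the step above (assumes a local MongoDB
    # and the pymongo package; the 'crawler' database and 'pages' collection
    # names are made up):
    #   from pymongo import MongoClient
    #   client = MongoClient('mongodb://localhost:27017')
    #   client.crawler.pages.insert_one({'url': url, 'html': res})
    # pymongo is synchronous, so a real asyncio crawler would run it in an
    # executor or use an async driver such as motor.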

    def url_allowed(self, url):
        if self.exclude and re.search(self.exclude, url):
            return False
        parts = urllib.parse.urlparse(url)
        if parts.scheme not in ('http', 'https'):
            logger.debug('skipping non-http scheme in %r', url)
            return False
        host, port = urllib.parse.splitport(parts.netloc)
        if not self.host_okay(host):
            logger.debug('skipping non-root host in %r', url)
            return False
        return True

    def add_url(self, url, max_redirect=None):
        """Add a URL to the queue if not seen before."""
        if max_redirect is None:
            max_redirect = self.max_redirect
        logger.debug('adding %r %r', url, max_redirect)
        self.seen_urls.add(url)
        # Queue item: (url, remaining redirects), e.g. (http://xxx.com, 10)
        self.q.put_nowait((url, max_redirect))

    async def crawl(self):
        """Run the crawler until all finished."""
        # Direct Task instantiation is discouraged; the modern idiom is
        # self.loop.create_task(self.work()).
        workers = [asyncio.Task(self.work(), loop=self.loop)
                   for _ in range(self.max_tasks)]
        self.t0 = time.time()
        # Wait until every URL in the queue has been processed.
        await self.q.join()
        self.t1 = time.time()
        # Cancel the now-idle workers.
        for w in workers:
            w.cancel()


crawler = Crawler(['https://movie.douban.com/top250'], max_redirect=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(crawler.crawl())
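
# A minimal cleanup sketch (assumes the run above finished on this loop): in
# aiohttp 3.x, ClientSession.close() is a coroutine, so await it before closing
# the event loop to avoid "Unclosed client session" warnings:
#   loop.run_until_complete(crawler.session.close())
#   loop.close()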