yulei/railway

main.py
import random            # random number generation
import time              # time-related utilities
import requests          # HTTP requests
import logging           # logging output
import re                # regular expressions
import urllib.request
from urllib.parse import urljoin   # URL joining
from openpyxl import Workbook      # read/write xlsx Excel files
# https://www.python51.com/jc/272.html — workaround for urlopen SSL certificate errors
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
# Log level and format: timestamp + level + message
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s-%(levelname)s:%(message)s')
BASE_URL = 'http://www.xiaguanzhan.com'                       # site root URL
PAGE_URL = 'http://www.xiaguanzhan.com/Pro.asp?classId=121'   # base URL of the listing pages
# e.g. http://www.xiaguanzhan.com/Pro.asp?classId=107&Page=2
# i.e. each listing page is http://www.xiaguanzhan.com/Pro.asp?classId=107 + "&Page=" + page number
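# Applied to the category scraped here (classId=121), the listing pages would thus
# look like this (illustrative only):
#   http://www.xiaguanzhan.com/Pro.asp?classId=121&Page=1
#   http://www.xiaguanzhan.com/Pro.asp?classId=121&Page=2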
TOTAL_PAGE = 3            # total number of listing pages to scrape
total_scrape_pages = 0
total_success_pages = 0
'''
At its core the scraper sends a request for a URL to the server, and the server
wraps the response content into a Response object that is returned to us.
This is done via requests.get(). The Response object we get back exposes four
commonly used attributes: status_code, content, text and encoding.
'''
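# A minimal illustration of those attributes (a sketch, not part of the scraping
# flow; it assumes the site root defined above is reachable):
#   r = requests.get(BASE_URL)
#   r.status_code   # e.g. 200 on success
#   r.encoding      # the declared/guessed charset, set to 'gbk' for this site below
#   r.text          # response body decoded with r.encoding
#   r.content       # raw response bytes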
headers = {
    'User-Agent': "Mozilla/5.2 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82"
}
def scrape_page_bak(url):
    logging.info('scraping %s......', url)  # log the URL being scraped
    try:
        response = requests.get(url)
        response.encoding = 'gbk'  # set the encoding to avoid garbled Chinese text
        if response.status_code == 200:  # status code 200 means the request succeeded
            return response.text  # return the page text
        logging.error('get invalid status code %s while scraping %s', response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)
def scrape_page(url):
    # logging.info('scraping %s...', url)  # log the URL being scraped
    response = requests.get(url, headers=headers)
    response.encoding = 'gbk'  # set the encoding to avoid garbled Chinese text
    if response.status_code == 200:  # status code 200 means the request succeeded
        response.close()
        return response.text  # return the page text
    else:
        logging.error('get invalid status code %s while scraping %s', response.status_code, url)
        return None
def scrape_index(page):
    if page == 0:
        index_url = f'{PAGE_URL}'  # the first listing page has no Page parameter
    else:
        index_url = f'{PAGE_URL}&Page={page}'  # build the URL of the given listing page
    logging.info('scraping %s...', index_url)  # log the URL being scraped
    return scrape_page(index_url)  # fetch the page and return its HTML
def create_picture_url(url):
    index_url = f'{BASE_URL}/{url}'  # build the absolute URL of the picture
    return index_url
# <a href="ProView.asp?ProId=19297" target="_blank">和谐电1-0002 HXD1-0002 太局湖段</a>
def parse_index(html):
    # compile the regex into a pattern object; it matches detail-page links like the anchor above
    pattern = re.compile('<span class="bottombr"><a.*?href="(ProView.*?)".*?</a></span>')
    items = re.findall(pattern, html)  # find all matches of pattern in html, returned as a list
    # print(items)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        # logging.info('get detail url %s ', detail_url)
        yield detail_url
def scrape_detail(url):
    return scrape_page(url)
def parse_detail(html):
    picture_pattern = re.compile(
        '<td.*?<a href="uploadfiles.*?"></a><img src="(.*?)" alt=.*?</td>', re.S)
    info_pattern = re.compile(
        '<P><FONT face=Verdana>(.*?)</FONT></P>.*?<P><FONT face=Verdana>(.*?)</FONT></P></td>', re.S)
    # logging.info('get html %s', html)
    '''
    1. group() is a bit like slicing a string with awk: group(0) is the whole matched string,
       group(1) is what the first parenthesised group captured; group(1) cannot be used if the
       pattern has no parentheses.
    2. basic_info holds the basic information, shoot_info the shooting information;
       the (.*?) groups capture the fields we want.
    '''
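    # Illustrative only (a sketch of how group() behaves, not part of the parsing flow):
    #   m = re.search(r'机车名称:(.*?)<BR>', '机车名称:HXD1<BR>车号:0002')
    #   m.group(0)  # '机车名称:HXD1<BR>' -- the whole match
    #   m.group(1)  # 'HXD1'              -- the first captured group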
    # extract the basic information block
    basic_info = re.search(info_pattern, html).group(
        1).strip() if re.search(info_pattern, html) else None
    if basic_info is None:
        return None
    #basic_info = basic_info.replace("&nbsp", "")  # strip non-breaking-space entities
    # extract the shooting information block
    shoot_info = re.search(info_pattern, html).group(
        2).strip() if re.search(info_pattern, html) else None
    if shoot_info is None:
        return None
    #shoot_info = shoot_info.replace(":", ":")  # replace full-width colons
    # print(shoot_info)
    # extract the picture link
    picture_info = re.search(picture_pattern, html).group(
        1).strip() if re.search(picture_pattern, html) else None
    picture_info = create_picture_url(picture_info) if picture_info else None  # build the absolute picture URL
    # how to split these blocks depends on the format of the text extracted above
    basic_info_pattern = re.compile(
        '机车名称:(.*?)<BR>车.*号:(.*?)<BR>生产厂商:(.*?)<BR>运行时速:(.*)<BR>轨.*距:(.*)', re.S)
    shoot_info_pattern = re.compile(
        '拍摄日期:(.*?)<BR>拍摄配属:(.*?)<BR>拍摄作者:(.*)', re.S)
    # extract the individual fields
    name = re.search(basic_info_pattern, basic_info).group(
        1).strip() if re.search(basic_info_pattern, basic_info) else None
    number = re.search(basic_info_pattern, basic_info).group(
        2).strip() if re.search(basic_info_pattern, basic_info) else None
    factory = re.search(basic_info_pattern, basic_info).group(
        3).strip() if re.search(basic_info_pattern, basic_info) else None
    speed = re.search(basic_info_pattern, basic_info).group(
        4).strip() if re.search(basic_info_pattern, basic_info) else None
    distance = re.search(basic_info_pattern, basic_info).group(
        5).strip() if re.search(basic_info_pattern, basic_info) else None
    shoot_date = re.search(shoot_info_pattern, shoot_info).group(
        1).strip() if re.search(shoot_info_pattern, shoot_info) else None
    shoot_attach = re.search(shoot_info_pattern, shoot_info).group(
        2).strip() if re.search(shoot_info_pattern, shoot_info) else None
    shoot_author = re.search(shoot_info_pattern, shoot_info).group(
        3).strip() if re.search(shoot_info_pattern, shoot_info) else None
    return {
        'name': name,
        'number': number,
        'factory': factory,
        'speed': speed,
        'distance': distance,
        'shoot_date': shoot_date,
        'shoot_attach': shoot_attach,
        'shoot_author': shoot_author,
        'picture_info': picture_info,
    }
# data storage function
def save(inputData, outPutFile):
    Label = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I']
    wb = Workbook()      # create an Excel workbook
    sheet = wb.active    # get the active worksheet
    sheet.title = "Sheet1"
    item_0 = inputData[0]  # take one record to derive the header row
    i = 0
    for key in item_0.keys():
        sheet[Label[i] + str(1)].value = key  # when i=0 this writes sheet['A1']; str(1) turns the row number into a string appended to Label[i]
        i = i + 1
    j = 1
    for item in inputData:
        k = 0
        for key in item:
            sheet[Label[k] + str(j + 1)].value = item[key]  # fill row j+1 cell by cell (A2, B2, C2, ...)
            k = k + 1  # move to the next column (B, C, D, ...)
        j = j + 1      # move to the next row (2, 3, ...)
    wb.save(outPutFile)
    print('Data written to file!')
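# Resulting sheet layout (a sketch, given the record dict built in parse_detail):
# row 1 holds the dict keys as headers (A1='name', B1='number', ..., I1='picture_info'),
# and each record from inputData then fills one row starting at row 2, in the same key order.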
def data():
    df = []
    global total_scrape_pages
    global total_success_pages
    for page in range(1, TOTAL_PAGE + 1):
        pause = random.random() * 10
        time.sleep(pause)  # sleep a random interval to avoid being rate-limited by the site
        index_html = scrape_index(page)  # fetch one listing page
        if index_html is None:
            continue
        detail_urls = parse_index(index_html)  # extract all detail links on the current page
        detail_urls = list(detail_urls)  # materialise the generator into a list
        # print(detail_urls)  # print every locomotive detail URL found
        logging.info('detail urls %s', detail_urls)
        for i in range(len(detail_urls)):
            detail_url = detail_urls[i]
            detail_html = scrape_detail(detail_url)
            total_scrape_pages = total_scrape_pages + 1
            if detail_html is None:
                continue
            total_success_pages = total_success_pages + 1
            data = parse_detail(detail_html)
            # logging.info('get detail data %s', data)
            if data is not None:
                df.append(data)  # append each record to df
    return df
if __name__ == '__main__':
    df = data()             # collect the data
    save(df, 'data.xlsx')   # save the data to Excel
    print("total_scrape_pages:", total_scrape_pages)
    print("total_success_pages:", total_success_pages)