代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
import pymysql # 用于存储
from selenium import webdriver
from selenium.webdriver.common.by import By
import time # 用于延时
from parsel import Selector # 解析html
from datetime import datetime # 记录每条记录的存储时间,估算总耗时和平均耗时
import re # 用于短信号码和验证码提取
import requests # 用于短信验证码api请求
from retrying import retry # 验证码登录有一定几率失败,用于重试
from loguru import logger # 记录匹配不到的企业
import pandas as pd # 用于sql>excel以及省市区地址字段的拆分
import cpca # 省市区地址字段的拆分的一个现成轮子
# 初始化及配置信息
HOST = 'cq13292957303.mysql.rds.aliyuncs.com'
USER = 'qianqian'
PASSWORD = 'Chenqian1234'
DATABASE = 'test1'
UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
url = 'https://www.qcc.com/'
url_login = 'https://www.qcc.com/weblogin'
logger.add("qcc_runtime.log")
db = pymysql.connect(host=HOST, user=USER,
password=PASSWORD, database=DATABASE)
options = webdriver.ChromeOptions()
options.add_argument('user-agent=' + UA) # 修改请求头
options.add_experimental_option(
'useAutomationExtension', False) # 去掉那个bar..自动化提示
options.add_experimental_option('excludeSwitches', ['enable-automation']) # 同上
options.add_argument('blink-settings=imagesEnabled=false') # 不加载图像
browser = webdriver.Chrome(options=options)
browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { # 修改navi,反反爬检测
"source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
})
num = 1 # 用于记录数据条数
fail_number = 0 # 用于记录失败数据条数
cursor = db.cursor() # 建立数据库游标
browser.implicitly_wait(10) # 隐式等待10s
# 获取号码的api(米云平台)
url_mobilephone = 'http://api.miyun.pro/api/get_mobile?token=6ab95acdea93-257376&project_id=10572'
# 获取验证码的api
url_captcha = 'http://api.miyun.pro/api/get_message?token=6ab95acdea93-257376&project_id=10572&phone_num={phone}'
'''
sql = 'create table firm(name varchar(50) not null,\ # 公司名
address varchar(100) not null,\ # 地址
zczb varchar(50) not null,\ # 注册资本
represent varchar(50) not null,\ # 法人
self_danger varchar(50) not null,\ # 自身风险 建表语句,首次运行时使用
rel_danger varchar(50) not null,\ # 相关风险
hos_danger varchar(50) not null,\ # 历史风险
sensive varchar(50) not null,\ # 敏感舆情
tip varchar(50) not null,\ # 提示信息
creaTime datetime,\ # 数据创建时间
primary key(name)) # 主键
'
cursor.execute(sql)
db.commit()
db.close()
'''
def get_firms():
''' 比较并计算得到初始表与存储表中尚未存储的公司名称
return:
frims:尚未得到存储的公司名称,可以断点续传,及时保存。
'''
sql = " SELECT a.企业名称 from firm_origin a LEFT JOIN firm b on a.企业名称=b.name where b.name is null "
cursor.execute(sql)
db.commit()
data = cursor.fetchall()
firms = []
for i in data:
firms.append(i[0]) # 因为数据库中取出的数据是二维,所以要用[0]
return firms
def spider(firm_name):
'''爬虫的主体程序,引用了登录,解析,保存三个方法,一条龙服务
arg:
firm_name:公司名称
'''
global fail_number
browser.get(url=url)
try:
browser.find_element(
by=By.XPATH, value='//*[@name="key"]').send_keys(firm_name) # 尝试找到输入框,否则进入登录流程
except:
login()
time.sleep(10) # 登录完后歇个10s
try:
browser.find_element(
by=By.XPATH, value='//div[contains(@class,"once-modal")]//a[@class="close"]').click() # 如果遇见了弹窗,找到关闭按钮关之
except:
pass
browser.find_element(
by=By.XPATH, value='//*[@name="key"]').send_keys(firm_name) # 尝试找到输入框
time.sleep(2) # 睡眠2s
try:
browser.find_element(
by=By.XPATH, value='//*[contains(text(),"查一下")]').click() # 尝试点击搜索按钮,报错则刷新页面
except:
browser.refresh()
firm_page_name = browser.find_element(
by=By.XPATH, value='(//a[contains(@class,"title")])[1]//span').text # 寻找首个搜索出来的结果
firm_url = browser.find_element(
by=By.XPATH, value='(//a[contains(@class,"title")])[1]').get_attribute('href') # 找到链接
if firm_page_name == firm_name: # 如果搜索出来的结果与要寻找的名称一致
browser.get(url=firm_url)
html = browser.page_source
data = detail_parse(html) # 解析
save(data) # 保存
time.sleep(10) # 睡眠10s
else:
logger.info(f'{firm_page_name}:不是查询目标') # 记录匹配不到的企业名称
fail_number += 1
@retry(stop_max_attempt_number=3) # 报错重试装饰器,最大重试次数3
def login():
'''用快捷登录方式登录,手机号+验证码
'''
browser.get(url=url_login)
mobilephone = requests.get(url_mobilephone).text
phone = re.search(r'"mobile":"(\d+)', mobilephone).group(1)
logger.info(f'手机号是:{phone}')
browser.find_element(
by=By.XPATH, value='//div[@class="quick-login_wrapper"]//input[@name="phone-number"]').send_keys(phone) # 填写手机号
browser.find_element(
by=By.XPATH, value='//a[contains(text(),"获取验证码")]').click() # 点击获取验证码的按钮
for i in range(20): # 每个5s请求一次验证码,超过20次,引发报错,进入重试
password_api = requests.get(url_captcha.format(phone=phone)).text
password = re.search(r'"code":"(\d+)', password_api)
logger.info(f'密码是:{phone}')
if password: # 如果密码存在不为空,则跳出循环,输出密码
password = password.group(1)
break
time.sleep(5)
if i == 19:
raise Exception('验证码超时,即将重试!')
browser.find_element(
by=By.XPATH, value='//*[@placeholder="短信验证码"]').send_keys(password) # 填入验证码
browser.find_element(
by=By.XPATH, value='//*[contains(text(),"登录/注册")]/parent::button').click() # 点击登录
def detail_parse(html):
'''解析html,拿到目标数据
arg:
html:企业的详情页html
return:
data:企业的目标数据
'''
global num # 声明全局变量,用于在局部上下文修改
select = Selector(text=html)
address = select.xpath(
'//span[contains(text(),"地址")]//*[@class="copy-value"]//text()').get() # 地址
name = select.xpath( # 公司名称
'//h1//text()').get()
zczb = select.xpath( # 注册资本
'//td[contains(text(),"注册资本")]/following-sibling::td[1]//text()').get()
represent = select.xpath( # 法人
'//td[contains(text(),"法定代表人")]/following-sibling::td[1]/descendant::a[1]//text()').get()
self_danger = select.xpath( # 自身风险
'//*[contains(text(),"自身风险")]/following-sibling::div//text()').get()
rel_danger = select.xpath( # 相对风险
'//*[contains(text(),"关联风险")]/following-sibling::div//text()').get()
hos_danger = select.xpath( # 历史风险
'//*[contains(text(),"历史风险")]/following-sibling::div//text()').get()
sensive = select.xpath( # 敏感舆情
'//*[contains(text(),"敏感舆情")]/following-sibling::div//text()').get()
tip = select.xpath( # 提示信息
'//*[contains(text(),"提示信息")]/following-sibling::div//text()').get()
creaTime = datetime.now().strftime(
"%Y-%m-%d, %H:%M:%S") # 数据创建时间
data = [name, address, zczb, represent, self_danger,
rel_danger, hos_danger, sensive, tip, creaTime]
print(f'{num}:{data}')
num += 1
return data
def save(data):
'''保存数据至mysql中
arg:
data:目标数据
'''
try: # 插入数据,如果数据已经存在,就替换它
sql_save = 'insert into firm(name, address, zczb, represent, self_danger,rel_danger, hos_danger, sensive, tip,creaTime)\
values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) on duplicate key update name=%s, address=%s, \
zczb=%s, represent=%s, self_danger=%s,rel_danger=%s, hos_danger=%s, sensive=%s, tip=%s,creaTime=%s'
db.ping(reconnect=True) # 如果数据库没连上,就自动重连
cursor.execute(sql_save, data*2)
db.commit()
logger.info('data has saved')
except:
logger.info('data_save failed!')
def trans_toxlsx():
'''从数据库中抽出数据并处理地址列,然后转化为excel输出'''
sql_save = 'select * from firm'
cursor.execute(sql_save)
db.commit()
data = cursor.fetchall()
data = pd.DataFrame(data, columns=[
'公司名称', '地址', '注册资金', '法人', '自身风险', '关联风险', '历史风险', '敏感舆情', '提示信息', '创建时间'])
data.drop(['创建时间'], axis=1, inplace=True)
address = data['地址']
df = cpca.transform(address)
data['省'] = df['省']
data['市'] = df['市']
data['区'] = df['区']
data['详细地址'] = df['地址']
data.drop(['地址'], axis=1, inplace=True)
data.to_excel('企业企查查信息.xlsx', header=True, index=False)
def main():
'''方法调度'''
firms = get_firms()
if len(firms) > fail_number: # 如果还有公司未被爬取存储
for firm_name in firms:
spider(firm_name)
else:
trans_toxlsx()
print('all over!')
browser.close()
if __name__ == "__main__":
main()
# 测试结果:497个公司名称,2个公司找不到,剩余495个,总耗时2h13min,平均27s一个
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。