1 Star 0 Fork 0

linjian/getCGYX

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
data_cgyx.py 5.62 KB
一键复制 编辑 原始数据 按行查看 历史
linjian 提交于 2024-04-02 16:15 . 优化项目结构
import os
import requests
from bs4 import BeautifulSoup
from pip._vendor import requests
import public_params
import popup_util
import utils
# Rows collected by the most recent get_data() run (shared module state).
table_list_result = []
# Maximum number of pages to fetch; a negative value means "no limit".
limit_page = -1
def get_data(sheet_title: str, input_value: int):
    """
    Fetch the procurement-intention list page by page and export it to Excel.

    The collected rows are stored in the module-level ``table_list_result``
    and the page limit in the module-level ``limit_page`` (which
    ``get_https_data`` reads when reporting progress).

    :param sheet_title: title of the Excel sheet to export to
    :param input_value: maximum number of pages to fetch; a negative value
        fetches every available page
    :return: None
    """
    global table_list_result, limit_page
    url = 'https://www.ccgp-hubei.gov.cn:9040/quSer/search'
    params = {
        'queryInfo.type': 'cgyx',
        'queryInfo.key': '',
        'queryInfo.xmmc': '',
        'queryInfo.cgdw': '',
        'queryInfo.city': '宜昌市',
        'queryInfo.qybm': 420582,
        'queryInfo.district': '当阳市',
        'queryInfo.je1': '',
        'queryInfo.begin': '',
        'queryInfo.end': '',
        'queryInfo.pageNo': 0,
        'queryInfo.pageSize': 15,
        # Placeholder so the first request is always made; get_https_data
        # overwrites it with the value found in the response form.
        'queryInfo.pageTotle': 1,
    }
    table_list_result = []
    # Caller-imposed page limit (-1 by default: fetch everything).
    limit_page = input_value
    headers = public_params.get_headers(public_params.get_session())

    # Stop at the last available page. The bound is the total page count
    # ('queryInfo.pageTotle'), not the number of rows per page
    # ('queryInfo.pageSize'): comparing against the page size made the loop
    # always try to fetch exactly 15 pages regardless of how many exist.
    while int(params['queryInfo.pageNo']) < int(params['queryInfo.pageTotle']) and (
            limit_page < 0 or int(params['queryInfo.pageNo']) < limit_page):
        params['queryInfo.pageNo'] += 1
        table_list_result.extend(get_https_data(url, params, headers))

    # Prepend the header row; the number in parentheses after each title is
    # presumably a column width consumed by the exporter -- TODO confirm.
    header_row = {
        'index': '序号(100)',
        'title': '意向公告标题(800)',
        'content': '采购项目(700)',
        'amount': '项目金额(万元)(240)',
        'time': '发布时间(240)',
        'desc_address': '详情地址(1200)',
    }
    table_list_result.insert(0, header_row)

    # Export to Excel.
    public_params.export_to_excel(table_list_result, sheet_title)
# Hidden paging fields of the result form whose values must be integers.
_PAGING_FIELDS = ('queryInfo.pageNo', 'queryInfo.pageSize', 'queryInfo.pageTotle')
# Row keys filled from plain cell text, in column order (columns 1 to 4).
_TEXT_COLUMNS = ('title', 'content', 'amount', 'time')
# Seconds to wait for the server before giving up instead of hanging forever.
_REQUEST_TIMEOUT_SECONDS = 30


def _sync_form_params(soup, params: dict) -> None:
    """Copy the hidden inputs of the ``noticeForm`` form into *params*."""
    form_tag = soup.find('form', id='noticeForm')
    for child in form_tag.find_all('input', type='hidden', recursive=False):
        if not child.has_attr('name'):
            continue
        input_name = child['name']
        input_value = child.get('value')
        if input_name in _PAGING_FIELDS:
            if input_value is None:
                # No value supplied: keep the paging value we already have.
                continue
            input_value = int(input_value)
        elif input_value is None:
            # An input without a value is stored as an empty string.
            input_value = ''
        params[input_name] = input_value


def _parse_row(cells, index_offset: int) -> dict:
    """Build one result record from the ``td`` cells of a table row."""
    record = dict()
    for position, cell in enumerate(cells):
        if position == 0:
            # The page shows a per-page sequence number; make it global.
            record["index"] = int(cell.text) + index_offset
        elif position <= len(_TEXT_COLUMNS):
            record[_TEXT_COLUMNS[position - 1]] = cell.text
        elif position == 5:
            link = cell.find('a')
            record["desc_address"] = link.get('href') if link is not None else None
    return record


def _report_progress(row_index: int, page_size: int, total_page: int, total_num: int) -> None:
    """Report the fraction of expected rows collected so far."""
    if 0 < limit_page < total_page:
        # The caller's page limit cuts the run short of the full result set.
        expected_rows = limit_page * page_size
    else:
        expected_rows = total_num
    if expected_rows:
        popup_util.update_progress(round(row_index / expected_rows, 2))


def get_https_data(url: str, params: dict, headers: dict) -> list:
    """
    Request one result page and parse its rows.

    *params* is updated in place with the hidden inputs of the page's
    ``noticeForm`` form, so the caller sees the paging values sent by the
    server. Progress is reported through ``popup_util.update_progress`` for
    every collected row, based on the module-level ``limit_page``.

    :param url: address to request
    :param params: form parameters sent with the request (mutated in place)
    :param headers: request headers
    :return: list of row dicts with the keys ``index``, ``title``,
        ``content``, ``amount``, ``time`` and ``desc_address``
    :raises AttributeError: if the summary block or the form is missing from
        the returned page
    """
    # NOTE(review): this is a relative path, so it depends on the current
    # working directory -- confirm that is intended.
    os.environ['REQUESTS_CA_BUNDLE'] = "certifi/cacert.pem"
    response = requests.post(url, data=params, headers=headers, verify=True,
                             timeout=_REQUEST_TIMEOUT_SECONDS)
    response.encoding = 'utf-8'
    # html5lib is used instead of html.parser because it is more tolerant of
    # malformed markup.
    soup = BeautifulSoup(response.text, 'html5lib')

    # Total number of results, read from the summary block; the number is
    # presumably wrapped in curly quotes there -- verify against the page.
    summary_tag = soup.find("div", class_="serach-page-state")
    total_text = summary_tag.find('font', attrs={"color": "#106AB2"}).get_text()
    total_num = int(utils.get_between_strings_regex(total_text, "“", "”"))

    # Pick up the paging state the server embedded in the form.
    _sync_form_params(soup, params)
    current_page_num = params['queryInfo.pageNo']
    page_size = params['queryInfo.pageSize']
    total_page = params['queryInfo.pageTotle']
    index_offset = (current_page_num - 1) * page_size

    table_list = list()
    for table in soup.find_all('table'):
        for row in table.find_all('tr'):
            # Header rows use <th>, so they yield no <td> cells and no record.
            record = _parse_row(row.find_all('td'), index_offset)
            if "index" not in record:
                continue
            table_list.append(record)
            _report_progress(record["index"], page_size, total_page, total_num)
    return table_list
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/linjian7628/get-cgyx.git
git@gitee.com:linjian7628/get-cgyx.git
linjian7628
get-cgyx
getCGYX
master

搜索帮助