代码拉取完成,页面将自动刷新
#!/usr/bin/python
# -*-coding:utf-8-*-
"""微信公众号历史文章获取"""
import asyncio
from pyppeteer import launch
import logging
from pyppeteer.errors import TimeoutError
import re
import common
# 插入数据库
import sys
import requests
import json
from pyppeteer_stealth import stealth
import qrcode
import zxing
import pandas as pd
import numpy as np
import time
import pymongo
import os
# Open the MongoDB client once at import time; the rest of the script reads
# the module-level ``mongo_collection`` name.  MONGOD_HOST may be a plain host
# name or a full URI such as mongodb://admin:123456@192.168.221.128:27017.
try:
    # Environment variables are always strings, while MongoClient only accepts
    # an integer port.  An unset/empty variable is passed as None so that
    # pymongo falls back to its default port.
    _mongo_port = os.getenv('MONGOD_PORT')
    mongo_collection = pymongo.MongoClient(
        os.getenv('MONGOD_HOST'),
        int(_mongo_port) if _mongo_port else None,
    )
    # Issue a real command so that a wrong configuration is detected here
    # instead of at the first query.
    mongo_collection.server_info()
except Exception:
    # Any failure (bad port value, unreachable server, bad credentials) is
    # reported with the same message and stops the script.
    print("数据库配置异常,请检查...")
    sys.exit()
def draw(type=1):
    """Decode ``qrcode.png`` and print the same QR code in the terminal.

    :param type: 1 for the account/password login flow, any other value for
        the direct scan login.  The value only selects the log line that is
        printed; the decoding is identical for both flows.  (The name shadows
        the builtin but is kept so existing callers keep working.)
    :return: None
    :raises ValueError: if no quoted value can be found in the decoded result,
        e.g. because the image contained no readable barcode.
    """
    reader = zxing.BarCodeReader()
    barcode_url = reader.decode('qrcode.png')

    print("账号密码登录" if type == 1 else "直接扫码登录")

    # The payload is taken from the textual form of the decode result: the
    # text that follows the first "='" up to the next "',".
    pieces = str(barcode_url).split("='")
    if len(pieces) < 2:
        raise ValueError(
            "could not extract a QR payload from qrcode.png: %r" % (barcode_url,))
    good_url = pieces[1].split("',")

    # Render the payload as a new QR code made of terminal characters.
    qr = qrcode.QRCode(version=1)
    qr.add_data(good_url[0])
    # invert=True gives dark modules on a light background; some apps cannot
    # read the inverted (light on dark) form.
    qr.print_ascii(invert=True)
# Save data
async def save(data):
    """Hand ``data`` to ``common.saveData`` with ``"data_tencent"`` as second argument.

    :param data: value forwarded unchanged to ``common.saveData``.
    :return: None
    """
    common.saveData(data, "data_tencent")
# Matches any ``<...>`` tag unless the text right after ``<`` is ``img`` or
# ``p``.  Compiled once because the pattern never changes.
_HTML_TAG_PATTERN = re.compile(r'(?!<(img|p).*?>)<.*?>')


# Filter HTML tags
async def filterHtml(html):
    """Remove HTML tags from ``html`` except opening ``<img...>`` / ``<p...>`` tags.

    Closing tags (including ``</p>``) are removed too, because they start with
    ``</`` and therefore do not match the exclusion.  A tag that contains a
    line break is left untouched since ``.`` does not match newlines here.

    :param html: markup to clean.
    :return: the cleaned string.
    """
    return _HTML_TAG_PATTERN.sub("", html)
class Tencent:
    """Scrape article lists of WeChat official accounts into MongoDB.

    Creating an instance runs the whole job: it logs in to
    https://mp.weixin.qq.com/ with a headless browser (``main``), keeps the
    session token and cookies, and then pages through the article list of
    each configured account with plain HTTP requests (``wx_request``).
    """

    # Seconds to wait for one article-list HTTP response.
    REQUEST_TIMEOUT = 3
    # Paging of one account stops once this many articles have been covered.
    MAX_ARTICLES = 20

    def __init__(self):
        """Load the proxy list, log in, then fetch every configured account."""
        # One proxy per line; surrounding whitespace and blank lines are dropped.
        with open("proxy.txt", "r") as f:
            self.proxy = [line.strip() for line in f if line.strip()]
        # Filled in by main() after a successful login.
        self.token = 0
        self.cookies = ''
        # 1 once the browser session is logged in, otherwise 0.
        self.is_login = 0
        self.keyword_url = "https://mp.weixin.qq.com/"
        # NOTE(review): credentials are hard-coded in the source file; they
        # should be moved to the environment or a secret store.
        self.user = "674514904@qq.com"
        self.pwd = "wkf674514904"
        asyncio.get_event_loop().run_until_complete(self.main())
        # Accounts to fetch, and the display name stored with each article.
        fakeid = ["MzkwNzIxMjQwMQ==", "MzA4NTYwNjQxMg==", "MzI1MDE4NjAxNA==", "MzkxNDM4NDA4OQ=="]
        self.fakeid_dic = {"MzU1NDAxMzIzOA==": "男士穿搭技巧", "MzkwNzIxMjQwMQ==": "男装搭配潮人", "MzA4NTYwNjQxMg==": "男士穿搭博主",
                           "MzI1MDE4NjAxNA==": "男士穿衣搭配杂志", "MzkxNDM4NDA4OQ==": "一棵图集"}
        for account_id in fakeid:
            begin = 0
            # Pages are 5 articles wide; wx_request returns 0 when paging
            # of this account should stop.
            while begin <= 25:
                is_end = self.wx_request(account_id, begin)
                print(is_end)
                begin += 5
                if is_end == 0:
                    break

    def _random_proxies(self):
        """Return a ``proxies`` mapping built from one random entry of ``self.proxy``.

        :return: ``{'http': <proxy>}``, or ``None`` when no proxy is configured.
        """
        if not self.proxy:
            return None
        # NOTE(review): only the 'http' scheme is mapped while the request URL
        # is https, so requests may not route the call through the proxy at
        # all — confirm whether an 'https' entry is intended.
        return {'http': self.proxy[np.random.randint(0, len(self.proxy))]}

    @staticmethod
    def _has_article_list(payload):
        """Tell whether a decoded response carries the ``app_msg_list`` key."""
        return isinstance(payload, dict) and "app_msg_list" in payload

    @classmethod
    def _is_last_page(cls, total, page_size, begin):
        """Decide whether paging of one account is finished.

        :param total: ``app_msg_cnt`` reported by the server.
        :param page_size: number of articles in the current page.
        :param begin: offset the current page was requested with.
        :return: True when everything was fetched, the page is empty, or
            ``MAX_ARTICLES`` articles have been covered.
        """
        fetched = page_size + begin
        return int(total) == int(fetched) or page_size == 0 or fetched >= cls.MAX_ARTICLES

    @staticmethod
    def _parse_token(href):
        """Extract the value of the ``token`` query parameter from ``href``.

        :param href: link target read from the logged-in page.
        :return: the token as a string.
        :raises RuntimeError: if ``href`` carries no ``token`` parameter.
        """
        match = re.search(r"[?&]token=([^&]*)", href or "")
        if match is None:
            raise RuntimeError("no token parameter found in %r" % (href,))
        return match.group(1)

    def wx_request(self, fakeid, begin=0, count=5):
        """Fetch one page of an account's article list and store new articles.

        :param fakeid: account identifier; must be a key of ``self.fakeid_dic``.
        :param begin: offset of the first article of the page.
        :param count: number of articles requested per page.
        :return: 0 when paging of this account should stop, 1 when another
            page should be requested.
        """
        print("进入request获取")
        print(self.token)
        print(self.cookies)
        mongo_collection_model = mongo_collection['article']['man']
        url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_16_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3542.0 Safari/537.36',
            'cookie': self.cookies,
        }
        proxies = self._random_proxies()
        print(proxies)
        params = {
            "token": self.token,  # session token obtained by main()
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
            "action": "list_ex",
            "begin": begin,
            "count": count,
            "query": "",
            "fakeid": fakeid,
            "type": "9",
        }
        # Random pause before every request.
        time.sleep(np.random.randint(20, 40))
        # NOTE(review): verify=False disables TLS certificate checking.
        content_json = requests.get(url, headers=headers, params=params, proxies=proxies, verify=False,
                                    timeout=self.REQUEST_TIMEOUT).json()
        print("爬取成功第0个")
        print(content_json)
        if not self._has_article_list(content_json):
            # A rejected query looks like
            # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}};
            # try once more with another proxy.
            proxies = self._random_proxies()
            content_json = requests.get(url, headers=headers, params=params, proxies=proxies, verify=False,
                                        timeout=self.REQUEST_TIMEOUT).json()
            print("第二次" + str(content_json))
            if not self._has_article_list(content_json):
                # Still rejected: stop paging this account instead of
                # failing on the missing key.
                print("重试后仍未获取到文章列表,停止抓取该公众号")
                return 0
        articles = content_json["app_msg_list"]
        for it in articles:
            info = {'title': it["title"], 'link': it["link"], 'time1': it['create_time'], 'status': 0,
                    'type': "微信公众号", 'article_create': it['create_time'], "fakeid_name": self.fakeid_dic[fakeid]}
            # Insert only articles that are not stored yet.
            ret2find = mongo_collection_model.find_one({"title": it["title"], "link": it["link"]})
            print(ret2find)
            if ret2find is None:
                print("不存在数据")
                mongo_collection_model.insert_one(info)
        cnt = content_json['app_msg_cnt']
        get_cnt = len(articles) + begin
        print(cnt)
        print(get_cnt)
        # An exhausted list looks like
        # {'app_msg_cnt': 104, 'app_msg_list': [], 'base_resp': {'err_msg': 'ok', 'ret': 0}}
        return 0 if self._is_last_page(cnt, len(articles), begin) else 1

    async def _login_with_password(self, page):
        """Submit account and password, then show the verification QR code."""
        # Switch the form from scan login to account/password login.
        await page.click(
            "#header > div.banner > div > div > div.login__type__container.login__type__container__scan > a")
        await asyncio.sleep(2)
        await page.type(
            '#header > div.banner > div > div > div.login__type__container.login__type__container__account > form > div.login_input_panel > div:nth-child(1) > div > span > input',
            self.user)
        await page.type(
            '#header > div.banner > div > div > div.login__type__container.login__type__container__account > form > div.login_input_panel > div:nth-child(2) > div > span > input',
            self.pwd, {'delay': 120})
        await page.click(".btn_login")
        # Give the page time to show the identity-check QR code.
        await asyncio.sleep(2)
        await page.waitForSelector(".weui-desktop-qrcheck__img-area")
        # Save the QR code image and print it in the terminal for scanning.
        img = await page.querySelector('.weui-desktop-qrcheck__img-area>img')
        await img.screenshot(path='qrcode.png')
        draw(1)

    async def _login_with_qrcode(self, page):
        """Show the scan-login QR code of the landing page."""
        await asyncio.sleep(1)
        await page.waitForSelector(".login__type__container__scan__qrcode")
        # Save the QR code image and print it in the terminal for scanning.
        img = await page.querySelector('.login__type__container__scan__qrcode')
        await img.screenshot(path='qrcode.png')
        draw(2)

    async def _wait_for_login(self, page):
        """Block until the logged-in page logo is present, then set ``is_login``."""
        while True:
            # NOTE(review): this wait sits outside the try block, so a timeout
            # here propagates to the caller instead of triggering the reload
            # below — confirm that this is intended.
            await page.waitForSelector(".weui-desktop-logo__inner")
            try:
                logo_html = await page.querySelectorAllEval(".weui-desktop-logo__inner",
                                                            'nodes => nodes.map(node => node.innerHTML)')
                print(logo_html)
                await page.screenshot(path='login.png')
                self.is_login = 1
                print("跳出循环")
                break
            except Exception:
                # Only ordinary errors are retried; cancellation and
                # keyboard interrupts are allowed to propagate.
                print("继续循环")
                self.is_login = 0
                await page.reload()

    async def _collect_cookies(self, page):
        """Return the cookie header value built from the current page.

        The result is ``document.cookie`` followed by every cookie reported
        by the browser, each written as ``name=value;``.
        """
        js_page_cookie = await page.evaluate("\nfunction get_cookie(){\nreturn document.cookie}\n")
        print("js_page_cookie:{}".format(js_page_cookie))
        print("---------------------")
        page_cookie = await page.cookies()
        cookies = ''.join(
            '{0}={1};'.format(cookie.get('name'), cookie.get('value')) for cookie in page_cookie)
        print(cookies)
        return js_page_cookie + ";" + cookies

    async def main(self, type=1):
        """Log in with a headless browser and store the token and cookies.

        :param type: 1 for account/password login followed by a QR check,
            any other value for direct QR scan login.  (The name shadows the
            builtin but is kept for compatibility with existing callers.)
        :return: None; the results are stored in ``self.token``,
            ``self.cookies`` and ``self.is_login``.
        :raises RuntimeError: if no token can be read after login.
        """
        browser = await launch(headless=True, userDataDir='./userdata', args=['--disable-infobars', '--no-sandbox'],
                               dumpio=True)
        try:
            page = await browser.newPage()
            await stealth(page)
            await page.goto(self.keyword_url)
            title = await page.title()
            print(title)
            print(await browser.pages())
            # The page title equals "微信公众平台" when a login is required.
            self.is_login = 0 if title == "微信公众平台" else 1
            if self.is_login == 0:
                if type == 1:
                    await self._login_with_password(page)
                else:
                    await self._login_with_qrcode(page)
            # NOTE(review): the original nesting of this wait was ambiguous;
            # it is run for already-logged-in sessions as well — confirm.
            await self._wait_for_login(page)
            await page.screenshot(path='img.png')
            print("登录成功 状态:" + str(self.is_login))
            nickname_info = await page.querySelectorAllEval(".weui-desktop-account__nickname",
                                                            'nodes => nodes.map(node => node.href)')
            print(nickname_info)
            print("正在解析token...")
            if not nickname_info:
                raise RuntimeError("account nickname link not found; cannot read the session token")
            # The link target carries the session token as a query parameter.
            token = self._parse_token(nickname_info[0])
            print("token=" + str(token))
            self.token = token
            self.cookies = await self._collect_cookies(page)
            await page.close()
        finally:
            # Always shut the browser down, also when login fails.
            await browser.close()
# Start the scraper only when the file is executed as a script, so that
# importing the module does not open a browser session.
if __name__ == "__main__":
    Tencent()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。