# NOTE(review): removed Gitee web-UI page text ("代码拉取完成…" / force-sync
# warning) that was scrape residue accidentally captured above the shebang —
# it is not source code and made the file unparseable.
#!/usr/local/bin/python3
# _*_ coding: utf-8 _*_
import Config
from util import ColorUtil,RedisConn
from notify import DingDing,ServerChan
from persistence import PersistenceRedis,KafkaWork,PersistenceMysql
import schedule
import requests
from bs4 import BeautifulSoup
import js2py
import time
import json
class DingXiang():
    """Scraper for the DXY (丁香园) pneumonia page.

    Polls the mobile page, extracts the inline-JS data blobs, persists
    snapshots (Redis/Kafka), and pushes previously-unseen timeline items to
    DingDing / ServerChan. A Redis set of item ids is used for de-duplication.
    """

    def __init__(self):
        # Mobile DXY page that embeds the epidemic data as inline <script> blobs.
        self.source_url = "https://3g.dxy.cn/newh5/view/pneumonia?from=groupmessage&isappinstalled=0"
        self.run_time = 0            # number of completed scrape cycles
        self.schedule_minutes = 3    # scheduler interval in minutes (old comment said 5; the value is 3)
        self.sleep_time = 5          # seconds to pause between notifications within one batch
        self.dingding = DingDing()
        self.server_chan = ServerChan()
        self.cache = RedisConn()     # de-dup cache of already-notified item ids
        self.soup = None             # BeautifulSoup of the most recent successful fetch
        self.persis_redis = PersistenceRedis()
        self.persis_kafka = KafkaWork()

    def get_soup(self):
        """Fetch the source page and cache its parsed soup on ``self.soup``.

        On a non-200 response ``self.soup`` is left untouched (possibly stale
        or None); downstream getters guard against that.
        """
        # BUG FIX: no timeout meant a stalled connection could hang the
        # scheduler forever.
        res = requests.get(self.source_url, timeout=30)
        if res.status_code != 200:
            print("{}获取源数据错误!{}".format(ColorUtil.get_red(), ColorUtil.get_reset()))
            return
        res.encoding = "utf-8"
        self.soup = BeautifulSoup(res.text, "lxml")

    def get_info_internal(self, id):
        """Extract and evaluate the inline JS blob stored under element ``id``.

        Returns a js2py object (caller converts with ``.to_list()`` /
        ``.to_dict()``) or None when the page/blob is unavailable.
        """
        # BUG FIX: guard against get_soup() never having succeeded.
        if self.soup is None:
            return None
        data = self.soup.find_all(id=id)
        if len(data) == 0:
            print("{}获取数据错误!{}".format(ColorUtil.get_red(), ColorUtil.get_reset()))
            return None
        # The blob looks like: try { window.<id> = <literal> }catch(e){}
        # Strip the wrapper and evaluate just the literal.
        script = data[0].text.replace("try { window." + id + " =", "").replace("}catch(e){}", "").strip()
        return js2py.eval_js("var a = " + script)

    # Map/daily statistics scraped from Tencent News.
    def get_daily_info(self):
        """Return the Tencent ``disease_h5`` payload as a dict, or None on failure."""
        res = requests.get(
            "https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5&callback=jQuery34109910769782031732_1580626695799&_=1580626695800",
            timeout=30,
        )
        if res.status_code != 200:
            print("获取疫情新增人数失败")
            return None
        # Response is JSONP: jQuery...( {...} ) — strip the callback wrapper.
        idx = res.text.find("(")
        if idx == -1:
            print("疫情新增无数据")
            return None
        try:
            # "data" is itself a JSON-encoded string, hence the double loads.
            data = json.loads(json.loads(res.text[idx + 1:-1])["data"])
            return data
        except Exception as e:
            print(e)
            return None

    # Real-time broadcast timeline.
    def get_source(self):
        """Return the live-broadcast items as a list of dicts.

        Each item carries keys such as id, title, summary, infoSource,
        pubDate/pubDateStr, sourceUrl, provinceId/provinceName.
        Returns [] when the blob could not be fetched (original crashed with
        AttributeError on None here).
        """
        data = self.get_info_internal("getTimelineService")
        resource = [] if data is None else data.to_list()
        print("{}第{}次刷新,本次获取数据{}条{}\n".format(
            ColorUtil.get_green_with_back(), self.run_time, len(resource), ColorUtil.get_reset()))
        return resource

    # Per-country statistics.
    def get_country(self):
        """Return per-country stats (confirmed/cured/dead counts etc.) as a list; [] on failure."""
        data = self.get_info_internal("getListByCountryTypeService2")
        return [] if data is None else data.to_list()

    # Overall statistics / tips.
    def get_tip(self):
        """Return the overall statistics dict (counts, remarks, image URLs); None on failure."""
        data = self.get_info_internal("getStatisticsService")
        return None if data is None else data.to_dict()

    # Per-province (China) statistics.
    def get_city(self):
        """Return per-province area stats as a list; [] on failure."""
        data = self.get_info_internal("getAreaStat")
        return [] if data is None else data.to_list()

    # Scheduler loop.
    def scheduler(self):
        """Run :meth:`task` every ``self.schedule_minutes`` minutes, forever."""
        schedule.every(self.schedule_minutes).minutes.do(self.task)
        while True:
            schedule.run_pending()
            # BUG FIX: the original spun in a tight loop at 100% CPU;
            # schedule only needs to be polled about once a second.
            time.sleep(1)

    # Mark an item id as already notified.
    def set_is_read(self, id):
        """Add ``id`` to the de-dup cache; True if it was newly added."""
        return self.cache.add(id) == 1

    # Check whether an item id was already notified.
    def check_is_read(self, id):
        """Return True if ``id`` is already in the de-dup cache."""
        return self.cache.check_is_in(id)

    # Push one item to the configured notification channels.
    def notify(self, item):
        """Send ``item`` to DingDing/ServerChan/Kafka per Config.NOTIFY_OPTION.

        Returns True only if every enabled push channel (DingDing, ServerChan)
        succeeded; Kafka is fire-and-forget.
        """
        res1 = True
        res2 = True
        if Config.NOTIFY_OPTION["dingding"]:
            res1 = self.dingding.send(item)
            if res1:
                print("{}ID:{} Content:【{}】{} 钉钉通知成功".format(
                    ColorUtil.get_green(), item.get("id"), item.get("title"), ColorUtil.get_reset()))
            else:
                print("{}ID:{} Content:【{}】{} 钉钉通知失败".format(
                    ColorUtil.get_red(), item.get("id"), item.get("title"), ColorUtil.get_reset()))
        if Config.NOTIFY_OPTION["server_chan"]:
            res2 = self.server_chan.send(item)
            if res2:
                print("{}ID:{} Content:【{}】{} Server酱通知成功".format(
                    ColorUtil.get_green(), item.get("id"), item.get("title"), ColorUtil.get_reset()))
            else:
                print("{}ID:{} Content:【{}】{} Server酱通知失败".format(
                    ColorUtil.get_red(), item.get("id"), item.get("title"), ColorUtil.get_reset()))
        if Config.NOTIFY_OPTION["kafka"]:
            self.persis_kafka.send(item.get("id"), json.dumps(item))
        return res1 and res2

    # Persist all snapshots.
    def persistence(self):
        """Persist country/city/tip snapshots and Tencent daily stats to Redis."""
        if Config.NOTIFY_OPTION["redis"]:
            # Country-level stats.
            self.persis_redis.save_countries(self.get_country())
            # Chinese province/city stats.
            self.persis_redis.save_cities(self.get_city())
            # Overall tips/statistics.
            tips = self.get_tip()
            self.persis_redis.save_tips(json.dumps(tips))
            # Tencent News daily figures.
            news = self.get_daily_info()
            # BUG FIX: get_daily_info() may return None; the original then
            # crashed on news.get(...).
            if news is not None:
                self.persis_redis.save_tc_total(json.dumps(news.get("chinaTotal")))
                self.persis_redis.save_tc_add(json.dumps(news.get("chinaAdd")))
                self.persis_redis.save_tc_day_list_item(news.get("chinaDayList"))
                self.persis_redis.save_tc_day_list_add_item(news.get("chinaDayAddList"))

    # Turn "01.30" into 130, "10.01" into 1001.
    def from_date_to_int(self, d):
        """Collapse a dotted date string into an int, e.g. "01.30" -> 130."""
        return int("".join(d.split(".")))

    # One scheduled cycle.
    def task(self):
        """One scrape cycle: refresh page, persist snapshots, notify unseen items."""
        self.get_soup()
        self.persistence()
        # Real-time broadcast items.
        resource = self.get_source()
        # BUG FIX: the original called sorted() and discarded its return
        # value, so items were NOT actually processed newest-first.
        resource.sort(key=lambda s: s["id"], reverse=True)
        for item in resource:
            # 1. Skip — and stop — once we hit an already-notified item.
            if self.check_is_read(item.get("id")):
                print("{}ID:{} Content:【{}】{} 已在缓存中\n".format(
                    ColorUtil.get_blue_with_back(), item.get("id"), item.get("title"), ColorUtil.get_reset()))
                break  # ids are descending: everything older was read too
            # 2. Notify, then mark as read.
            if self.notify(item):
                if self.set_is_read(item.get("id")):
                    # BUG FIX: a misplaced parenthesis passed only 2 args to a
                    # 3-placeholder format (IndexError) and fed get_reset() to
                    # print() as a second positional argument.
                    print("{}{}加入已读缓存成功{}".format(
                        ColorUtil.get_green(), item.get("id"), ColorUtil.get_reset()))
                else:
                    print("{}{}加入缓存失败!{}".format(
                        ColorUtil.get_red_with_back(), item.get("id"), ColorUtil.get_reset()))
            time.sleep(self.sleep_time)  # throttle between notifications in one batch
        self.run_time += 1

    # Entry point for continuous operation.
    def run(self):
        """Start the scraper and block in the scheduler loop."""
        print("{}爬虫启动...{}".format(ColorUtil.get_green(), ColorUtil.get_reset()))
        self.scheduler()
# Entry point for one-shot execution via cron: run a single scrape cycle.
def cron_run():
    # BUG FIX: the original body was ``self.task()``, but ``self`` does not
    # exist at module level — every cron invocation raised NameError.
    DingXiang().task()
class InfectedBlocks():
    """Loads the "communities visited by infected people" dataset into MySQL."""

    def __init__(self):
        self.pers_mysql = PersistenceMysql()

    # Fetch the dataset from the upstream API.
    def get_block_infos(self):
        """Fetch the infected-community records; return the payload (presumably
        a list — TODO confirm against the API) or None on failure."""
        url = "https://www.xiaoyusan.com/activity/pneumonia/infectedcommunities"
        res = requests.post(url, timeout=30)
        if res.status_code != 200:
            print("获取感染者到访小区数据失败")
            return None
        # BUG FIX: the original computed the payload but never returned it,
        # so run() always saw None and silently did nothing.
        return res.json().get("data")

    def run(self):
        """Sync the dataset into MySQL unless the record count is unchanged."""
        data = self.get_block_infos()
        if data is None:
            return
        count = self.pers_mysql.get_infected_block_count()
        # Cheap change detection: same record count => assume nothing new.
        if len(data) == count:
            print("数据没变")
            return
        for record in data:
            self.pers_mysql.save_infected_blocks(record)
        print("感染者到访小区数据 加载完毕")
# NOTE(review): removed Gitee web-UI moderation footer text that was scrape
# residue appended after the code — it is not part of the source.