# 代码拉取完成,页面将自动刷新
# 同步操作将从 Jemmy/COVID-19_notify 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
# 确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/local/bin/python3
# _*_ coding: utf-8 _*_
# 持久化
import pymysql
import Config
import time
import json
from kafka import KafkaProducer
import redis
class PersistenceMysql():
    """MySQL persistence helper.

    Connection settings (host, user, password, database, table_name) are
    read from ``Config.PERSISTENCE["mysql"]`` and a connection is opened
    as soon as the object is constructed.
    """

    def __init__(self):
        self.host = Config.PERSISTENCE["mysql"]["host"]
        self.username = Config.PERSISTENCE["mysql"]["user"]
        self.password = Config.PERSISTENCE["mysql"]["password"]
        self.database = Config.PERSISTENCE["mysql"]["database"]
        self.table_name = Config.PERSISTENCE["mysql"]["table_name"]
        self.conn = pymysql.connect(host=self.host, user=self.username,
                                    password=self.password,
                                    database=self.database, charset="utf8")
        # PyMySQL escapes every argument to a SQL literal string and then
        # applies "%" formatting, so "%s" is the only usable placeholder;
        # "%d" would raise TypeError when the statement is executed.
        self.insert_sql = "insert into " + self.table_name + \
            "(create_time,value) values(%s,%s)"
        self.block_sql = ""

    @staticmethod
    def pingpingping(self=None):
        """Placeholder; currently does nothing.

        ``self`` is optional so that both ``instance.pingpingping()`` and
        the explicit ``PersistenceMysql.pingpingping(instance)`` form work
        on this static method.
        """
        # TODO: check whether the table exists and create it if it does not.
        pass

    def item_serialize(self, item):
        """Return ``item`` encoded as a JSON string."""
        return json.dumps(item)

    def get_current_time(self):
        """Return the current Unix time truncated to whole seconds."""
        return int(time.time())

    # Save information about a residential block visited by an infected person.
    def save_infected_blocks(self, item):
        """Insert one row into ``inflected_block`` built from ``item``.

        ``item`` is read with ``.get`` for the keys id, province, city,
        district, community, sourceurl, citylong, citylat, communitylong
        and communitylat; a missing key is passed to the driver as None.

        Failures are printed rather than raised, and the transaction is
        rolled back on a best-effort basis.
        """
        # The table name "inflected_block" is kept exactly as the original
        # query spelled it.
        sql = (
            "insert into inflected_block(id, p_name, c_name, county_name, "
            "block_name, source_url, city_lng, city_lat, block_lng, block_lat) "
            "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        )
        # Values are handed to the driver separately so that quotes in the
        # data cannot break or alter the statement.
        params = (
            item.get("id"),
            item.get("province"),
            item.get("city"),
            item.get("district"),
            item.get("community"),
            item.get("sourceurl"),
            item.get("citylong"),
            item.get("citylat"),
            item.get("communitylong"),
            item.get("communitylat"),
        )
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            print(e)
            # Report the same keys that were used to build the row.
            print("插入小区数据 {}:{}:{}:{}:{} 失败".format(
                item.get("id"), item.get("province"), item.get("city"),
                item.get("district"), item.get("community")))
            try:
                self.conn.rollback()
            except Exception as rollback_error:
                # Keep this method non-raising even when rollback fails.
                print(rollback_error)

    # Get the total number of rows in the table.
    def get_infected_block_count(self):
        """Return the number of rows in ``inflected_block``."""
        sql = "select count(*) from inflected_block"
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchone()[0]
class KafkaWork():
    """Kafka notifier that publishes to one configured topic.

    The topic and bootstrap server are read from
    ``Config.PERSISTENCE["kafka"]``; a ``KafkaProducer`` is created when the
    object is constructed.
    """

    def __init__(self):
        self.topic = Config.PERSISTENCE["kafka"]["topic"]
        self.server = Config.PERSISTENCE["kafka"]["server"]
        self.producer = KafkaProducer(bootstrap_servers=self.server)

    @staticmethod
    def _encode(text):
        """Return ``text`` as UTF-8 bytes; None and bytes pass through unchanged."""
        if text is None or isinstance(text, bytes):
            return text
        return bytes(text, encoding="utf8")

    def send(self, key=None, value=None):
        """Publish ``value`` (and ``key``, when given) and wait for the broker.

        Waits up to 10 seconds for the send to be acknowledged.

        Returns the result of ``on_success`` (True) when the send is
        acknowledged, or False when encoding, sending or the acknowledgement
        fails; the error is printed rather than raised.
        """
        try:
            # Encoding and the send call are inside the try block so that
            # every failure is reported the same way instead of escaping.
            future = self.producer.send(
                self.topic, key=self._encode(key), value=self._encode(value))
            meta = future.get(timeout=10)
            return self.on_success(meta)
        except Exception as e:
            print(e)
            print("kafka 通知失败")
            return False

    def on_success(self, meta):
        """Hook called with the send result after a successful send; returns True."""
        return True
class PersistenceRedis():
    """Redis persistence helper for the ``wxopensvr:*`` keys.

    Connection settings (host, port, index, password) are read from
    ``Config.PERSISTENCE["redis"]``.
    """

    def __init__(self):
        self.tip_key = "wxopensvr:tip"
        self.country_key = "wxopensvr:country"
        self.city_key = "wxopensvr:city"
        self.tc_total_key = "wxopensvr:tc:total"
        self.tc_add_key = "wxopensvr:tc:add"
        self.tc_day_key = "wxopensvr:tc:day_list"
        self.tc_day_add_key = "wxopensvr:tc:day_add_list"
        self.host = Config.PERSISTENCE["redis"]["host"]
        self.port = Config.PERSISTENCE["redis"]["port"]
        self.index = Config.PERSISTENCE["redis"]["index"]
        self.password = Config.PERSISTENCE["redis"]["password"]
        self.conn = redis.Redis(
            host=self.host, port=self.port, password=self.password, db=self.index)

    @staticmethod
    def _confirmed_count(entry):
        """Score an entry by its ``confirmedCount`` field."""
        return float(entry.get("confirmedCount"))

    @staticmethod
    def _date_score(entry):
        """Score an entry by its ``date`` field with the dots removed ("01.20" -> 120.0)."""
        return float(int("".join(entry.get("date").split("."))))

    def _replace_sorted_set(self, key, items, score_of):
        """Replace the sorted set at ``key`` with the JSON-encoded ``items``.

        The delete and every zadd are queued on one pipeline and sent with a
        single ``execute()``, so the old contents are not removed ahead of
        the new ones being written.
        """
        pipe = self.conn.pipeline()
        pipe.delete(key)
        for entry in items:
            pipe.zadd(key, {json.dumps(entry): score_of(entry)})
        pipe.execute()

    # Save the tip text.
    def save_tips(self, tip):
        """Store ``tip`` under the tip key."""
        self.conn.set(self.tip_key, tip)

    # Save per-country data.
    def save_countries(self, ll):
        """Replace the country sorted set with ``ll``, scored by confirmedCount."""
        self._replace_sorted_set(self.country_key, ll, self._confirmed_count)

    # Save per-city data for China.
    def save_cities(self, ll):
        """Replace the city sorted set with ``ll``, scored by confirmedCount."""
        self._replace_sorted_set(self.city_key, ll, self._confirmed_count)

    # Save the Tencent News totals.
    def save_tc_total(self, item):
        """Store ``item`` under the Tencent News total key."""
        self.conn.set(self.tc_total_key, item)

    # Save the Tencent News "new since yesterday" figures.
    def save_tc_add(self, item):
        """Store ``item`` under the Tencent News daily-increase key."""
        self.conn.set(self.tc_add_key, item)

    # Save the Tencent News "epidemic trend" series.
    def save_tc_day_list_item(self, ll):
        """Replace the daily-trend sorted set with ``ll``, scored by date."""
        self._replace_sorted_set(self.tc_day_key, ll, self._date_score)

    # Save the Tencent News "epidemic trend, new cases" series.
    def save_tc_day_list_add_item(self, ll):
        """Replace the daily-new-cases sorted set with ``ll``, scored by date."""
        self._replace_sorted_set(self.tc_day_add_key, ll, self._date_score)
if __name__ == "__main__":
    # Manual smoke test: publish two sample notifications through KafkaWork.
    worker = KafkaWork()
    for title, body in (("标题", "内容"), ("title", "content")):
        worker.send("央视新闻", json.dumps({"name": title, "content": body}))
# 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
# 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。