代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
import json
# import pandas as pd
import requests
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning) # 消除FutureWarning提示
import datetime
import time
from tenacity import retry, stop_after_attempt, wait_fixed
import get_and_check_proxy_ip, get_request_conf, redis_client
def change_yi_wan(num):
if abs(num) > 100000000: # 处理负数
n = round(num/100000000, 2)
new_num = str(n) + '亿'
else:
n = round(num/10000)
new_num = str(n) + '万'
return new_num
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1)) # 每隔1妙重试一次,一共20次
def get_kpl_datas():
url = 'https://apphq.longhuvip.com/w1/api/index.php'
# 获取随机的header
headers = get_request_conf.get_random_header()
# print(headers)
params = {
'Index': '0',
'Order': '1',
'PhoneOSNew': '2',
'Type': '1',
'VerSion': '5.12.0.3',
'ZSType': '7',
'a': 'RealRankingInfo',
'apiv': 'w34',
'c': 'ZhiShuRanking',
'st': '80'
}
# 获取可用的代理ip
proxy_ip = get_and_check_proxy_ip.get_and_check_ip()
# 发送POST请求
response = requests.post(url, params=params, headers=headers, proxies=proxy_ip, timeout=5)
# 将编码设置为当前编码
response.encoding = response.apparent_encoding
# 解析JSON数据
data = json.loads(response.text)
# print(data)
top_code_list = []
data_json = {}
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f'-----------------> {now_time} <-----------------')
print('板块代码', ' ', '板块名称', ' ', '板块强度', ' ','净额\t\t', ' ', '成交额')
n = 10
for item in data.get('list'):
# print(item)
bankuaiId = item[0]
top_code_list.append(bankuaiId)
bk_name = item[1]
strong = item[2]
mainPure = item[6]
mainPure = change_yi_wan(int(mainPure))
mainAll = item[5]
mainAll = change_yi_wan(int(mainAll))
data_json[bankuaiId] = {'bk_name': bk_name, 'strong': strong, 'mainPure': mainPure, 'mainAll': mainAll}
if n > 1:
if len(bk_name) < 3:
bk_name = bk_name + '\t'
if len(mainPure) < 5:
mainPure = mainPure + '\t'
print(bankuaiId, '\t', bk_name, '\t', strong, '\t', mainPure, '\t', mainAll)
# print(bankuaiId, '\t', bk_name, '\t', strong, '\t', mainPure, '\t', mainAll)
n -= 1
print('---------------------------------------------------------')
# print(data_json)
return top_code_list, data_json
# 根据索引位置,获取强度榜不同数据
@retry(stop=stop_after_attempt(20), wait=wait_fixed(1)) # 每隔1妙重试一次,一共20次
def get_kpl_with_page(index=0, page_num=80):
url = 'https://apphq.longhuvip.com/w1/api/index.php'
headers = get_request_conf.get_random_header()
params = {
# 'Date': '2024-01-05', # 可选参数,根据日期查询强度数据
'Index': '0',
'Order': '1',
'PhoneOSNew': '2',
'Type': '1',
'VerSion': '5.13.0.3',
'ZSType': '7',
'a': 'RealRankingInfo',
'apiv': 'w35',
'c': 'ZhiShuRanking',
'st': '20'
}
params['Index'] = index
params['st'] = page_num
# 获取可用的代理ip
proxy_ip = get_and_check_proxy_ip.get_and_check_ip()
# 发送POST请求
response = requests.post(url, params=params, headers=headers, proxies=proxy_ip, timeout=3)
# 将编码设置为当前编码
response.encoding = response.apparent_encoding
# 解析JSON数据
data = json.loads(response.text)
return data
# 根据接口返回数据,循环遍历组装数据
def do_for(resp_data, now_time, top_code_list, data_json):
for item in resp_data.get('list'):
bankuaiId = item[0]
top_code_list.append(bankuaiId)
bk_name = item[1]
strong = item[2]
mainPure = item[6]
# mainPure = change_yi_wan(int(mainPure))
mainAll = item[5]
# mainAll = change_yi_wan(int(mainAll))
bk_datas = []
if now_time not in data_json: # 这个判断没有意义
time_data = []
time_data.append(now_time)
time_data.append(strong)
time_data.append(mainPure)
time_data.append(mainAll)
bk_datas.append(time_data)
data_json[bankuaiId] = {'bk_name': bk_name, 'bk_datas': bk_datas}
# print(data_json)
return top_code_list, data_json
# 获取强度榜某一时刻所有的数据,共240条,分3次获取
def get_all_strong_datas():
top_code_list = []
data_json = {}
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
i = 0
top_10 = []
while i < 240:
# print(i)
resp_data = get_kpl_with_page(index=i, page_num=80)
top_code_list, data_json = do_for(resp_data=resp_data, now_time=now_time, top_code_list=top_code_list, data_json=data_json)
# 获取强度榜单前10板块id
if i == 0:
if len(top_code_list) > 10:
top_10 = top_code_list[:10]
else:
top_10 = top_code_list
i+=80
# print(type(data_json)) # 如果不存储数据,可以给接口出数据了,只不过没有历史数据
return data_json, top_10
# 实时将当日强度数据存入redis,hash_name=当天日期
# 1、检查有没有已有的,没有直接薪增
# 2、已有则,新数据追加进字典(按板块颗粒度)
# 3、写入redis
def save_strong_datas_in_redis():
now_date = datetime.datetime.now().strftime("%Y-%m-%d") # 当天日期,作为hash_name
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
# 判断是否处于开盘期间,不是就不退出,不做任何操作
# if now_time > now_date+' 15:00' or now_date+' 12:00' < now_date < now_date+' 13:01' or now_date < now_date+' 9:30':
# print('当前非开盘时间,不存储任何数据')
# return
client = redis_client.Redis_Client()
data_json, top_10 = get_all_strong_datas()
print(top_10)
# 存入强度榜top 10 数据,主键是top,与板块id同级
client.save_hash(hash_name=now_date, hash_key="top10", hash_value=top_10)
for hash_key, val in data_json.items(): # 遍历dict需要加上items()
# 根据板块代码主键,获取新增的list
new_list = val['bk_datas'][0]
# 根据板块代码主键,获取redis已存数据
old_data = client.get_hash_2_dict(hash_name=now_date, hash_key=hash_key)
# 判断是否为空
if old_data:
# 判断新增数据是否在数据列表中,根据分钟数据判断
# print(new_list[0], old_data['bk_datas'][-1][0])
if new_list[0] != old_data['bk_datas'][-1][0]:
old_data['bk_datas'].append(new_list)
save = client.save_hash(hash_name=now_date, hash_key=hash_key, hash_value=old_data)
# print(f'新增数据:{hash_key}:{new_list} 写入成功。')
else:
client.save_hash(hash_name=now_date, hash_key=hash_key, hash_value=val)
# print(f'初始数据:{val} 写入成功。')
print('数据写入完毕。')
# 关闭redis连接池
client.close_redis_pool()
def get_all_strong_datas_from_redis(some_date, bk_num):
client = redis_client.Redis_Client()
data_dict = client.get_hash_2_dict(some_date, bk_num)
client.close_redis_pool()
return data_dict
def make_graph(title, x, y1, y2, y3):
import matplotlib.pyplot as plt
plt.title(f'{title}')
plt.rcParams['font.sans-serif'] = ['SimHei'] # 显示汉字
plt.xlabel('时间') # x轴标题
plt.ylabel('差值') # y轴标题
plt.plot(x, y1, marker='o', markersize=3) # 绘制折线图,添加数据点,设置点的大小
plt.plot(x, y2, marker='o', markersize=3)
# plt.plot(x, y3, marker='o', markersize=3)
for a, b in zip(x, y1):
plt.text(a, b, b, ha='center', va='bottom', fontsize=10) # 设置数据标签位置及大小,数据太多就密密麻麻,不好看
for a, b in zip(x, y2):
plt.text(a, b, b, ha='center', va='bottom', fontsize=10)
# for a, b in zip(x, y3):
# plt.text(a, b, b, ha='center', va='bottom', fontsize=10)
plt.legend(['强度', '板块净额']) # 设置折线名称
plt.show() # 显示折线图 多图会阻塞
if __name__ == '__main__':
# d = get_kpl_datas()
# d = get_kpl_with_page()
# d = get_all_strong_datas()
d = save_strong_datas_in_redis()
# now_date = datetime.datetime.now().strftime("%Y-%m-%d") # 当天日期,作为hash_name
# # now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
# # if now_time > now_date + ' 15:03': # 可以直接判断
# # print('fuck')
# print(now_date)
# data_dict = get_all_strong_datas_from_redis('2024-01-09', '801735')
# print(data_dict)
# x = []
# y1 = []
# y2 = []
# y3 = []
# for item in data_dict['bk_datas']:
# x.append(item[0])
# y1.append(item[1])
# y2.append(float(item[2]))
# y3.append(float(item[3]))
# make_graph(data_dict['bk_name'], x, y1, y2, y3)
# l = [['2024-01-08 17:09', 3590, -23232, 4234234234], '2024-01-08 17:10']
# if '2024-01-08 17:10' in l:
# print(type('2024-01-08 17:10'))
# d = get_proxy_ip()
# print(len('火电灵活改造'))
# print(len('NMN'))
# print(len('港口'))
# i = 0
# while i < 240:
# print(i, )
# i+=80
# l = [[1,2], [2,3]]
# l2 = [[1,2,2]]
# l2.append(l)
# print(l2)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。