1 Star 0 Fork 0

banjiaojuhao/cds-downloader

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
query-executor.py 7.28 KB
一键复制 编辑 原始数据 按行查看 历史
banjiaojuhao 提交于 2021-05-30 11:45 . format output time usage
import base64
import json
import logging
import os.path
import time
import redis
import requests
import logger as my_log
import series_key
# Redis keys used by the query executor.
# redisKeyNewTaskId: incremental id for the next uploaded grib file (also used
# as the task id). Not referenced anywhere else in this file.
redisKeyNewTaskId = "client-backend:query:new-task-id"
# redisKeyInsertTaskListAll: [set] of all task ids. The update-state script
# checks membership in this set to detect deleted tasks.
redisKeyInsertTaskListAll = "client-backend:query:task-list:all"
# redisKeyInsertTaskListDispatched: hash (task_id -> heartbeat) of dispatched
# tasks that need to be watched.
redisKeyInsertTaskListDispatched = "client-backend:query:task-list:dispatched"
# redisKeyInsertTaskListWait: list (task_id) of tasks waiting to be dispatched.
# main() blocks on this list to pick up work.
redisKeyInsertTaskListWait = "client-backend:query:task-list:wait"
# redisKeyInsertTaskStateMessage: hash (task_id -> JSON string) holding the
# state of each task. The JSON includes size, progress, status and message.
redisKeyInsertTaskStateMessage = "client-backend:query:task-state:message"
# Project-local logging setup; must run before the logger below is used.
# NOTE(review): presumably installs handlers/formatters - confirm in logger.py.
my_log.init()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Redis client created with the library's default connection settings.
rdb = redis.Redis()
# Lua script that claims a waiting task.
#
# It reads the task's JSON state from the hash in KEYS[3] (field ARGV[1]):
#   * returns "" when the hash has no entry for the task (task deleted);
#   * returns "" when the stored "state" is not 0 (someone else changed it);
#   * otherwise sets "state" to 1 and "message" to "task has been dispatched",
#     writes the JSON back and returns the updated JSON string.
# KEYS[1] and KEYS[2] are listed in the script header but are not referenced
# by the script body.
redisGetTaskStr = """
-- KEYS[1]: task-list:all
-- KEYS[2]: task-list:dispatched
-- KEYS[3]: task-state:message
-- ARGV[1]: task-id
local stateStr = redis.call("hget", KEYS[3], ARGV[1])
if stateStr == false
then
-- return when task is deleted
return ""
end
local stateObject = cjson.decode(stateStr)
local state = stateObject["state"]
if state ~= 0
then
-- task state changed by other routine(may be paused by user)
-- insert executor need to fetch other tasks
return ""
else
-- when state is set to running by this routine, task can be executed.
stateObject["state"] = 1
stateObject["message"] = "task has been dispatched"
stateStr = cjson.encode(stateObject)
redis.call("hset", KEYS[3], ARGV[1], stateStr)
return stateStr
end
"""
# Callable wrapper around the script; invoked as redisGetTask(keys=[...], args=[...]).
redisGetTask = rdb.register_script(redisGetTaskStr)
# Lua script that overwrites the progress/message/state of a task.
#
# Original note: "ignore pause" - the script does not look at the task's
# current state before overwriting it.
#
# If ARGV[1] is a member of the set in KEYS[1], the task's JSON state (hash
# KEYS[3]) gets its "progress", "message" and "state" replaced by ARGV[2..4];
# "finish_csv_path" is set only when a fifth argument is supplied. The script
# then returns "continue". If the task id is no longer in the set it returns
# "delete" and changes nothing.
# KEYS[2] is listed in the script header but is not referenced by the body.
# NOTE(review): the hget result is passed to cjson.decode without a check for
# a missing hash entry - confirm the set and the hash are always kept in sync.
redisUpdateStateStr = """
-- KEYS[1]: task-list:all
-- KEYS[2]: task-list:dispatched
-- KEYS[3]: task-state:message
-- ARGV[1]: task-id
-- ARGV[2]: progress
-- ARGV[3]: message
-- ARGV[4]: state
-- ARGV[5]: csv-result-path
if redis.call("SISMEMBER", KEYS[1], ARGV[1]) == 1
then
local stateStr = redis.call("hget", KEYS[3], ARGV[1])
local stateObject = cjson.decode(stateStr)
stateObject["progress"] = tonumber(ARGV[2])
stateObject["message"] = ARGV[3]
stateObject["state"] = tonumber(ARGV[4])
if ARGV[5] ~= nil
then
stateObject["finish_csv_path"] = ARGV[5]
end
stateStr = cjson.encode(stateObject)
redis.call("hset", KEYS[3], ARGV[1], stateStr)
return "continue"
else
return "delete"
end
"""
# Callable wrapper around the script; invoked as redisUpdateState(keys=[...], args=[...]).
redisUpdateState = rdb.register_script(redisUpdateStateStr)
def query(task_state: dict) -> None:
    """Run one query task and write the merged result as a CSV file.

    The function asks the lookup service on localhost:8081 which datanodes
    hold the requested key range, queries every datanode in turn, concatenates
    the CSV bodies under a single header line and finally marks the task as
    finished (state 3) with the path of the written file.

    Args:
        task_state: Decoded task state. The keys read here are ``id``,
            ``variable``, ``pressure``, ``start_time``, ``stop_time``,
            ``lat_start``, ``lat_stop``, ``long_start`` and ``long_stop``.

    Raises:
        RuntimeError: If the datanode lookup or any datanode answers with a
            non-200 status, or if the lookup returns no datanode address.
            Raising (instead of returning silently) lets the caller mark the
            task as failed rather than leaving it stuck in the running state.
        Exception: Errors from ``requests``, redis or file I/O propagate
            unchanged.
    """
    task_started_at = time.time()
    task_id = task_state['id']
    script_keys = [redisKeyInsertTaskListAll,
                   redisKeyInsertTaskListDispatched,
                   redisKeyInsertTaskStateMessage]

    start_key = series_key.get_series_key_by_id(
        task_state['variable'], task_state['pressure'], task_state['start_time'])
    stop_key = series_key.get_series_key_by_id(
        task_state['variable'], task_state['pressure'], task_state['stop_time'])
    # NOTE(review): none of the HTTP calls below sets a timeout, so a hung
    # service blocks this worker indefinitely - consider adding one.
    response = requests.get("http://localhost:8081/query", params={
        "start_key": base64.b64encode(start_key).decode(),
        "stop_key": base64.b64encode(stop_key).decode(),
    })
    if response.status_code != 200:
        raise RuntimeError(
            f"failed to get datanode of data block: [{response.status_code}] {response.text}")

    # The lookup answers with a comma separated address list. Drop blanks and
    # duplicates while keeping the order stable, so repeated runs query the
    # datanodes (and lay out the CSV) in the same order.
    datanode_list = list(dict.fromkeys(
        ip.strip() for ip in response.text.split(",") if ip.strip()))
    if not datanode_list:
        raise RuntimeError("datanode lookup returned no datanode address")

    csv_title = "longitude,latitude,value,timestamp"
    title_len = len(csv_title)
    csv_parts = [csv_title]
    for i, datanode_ip in enumerate(datanode_list):
        progress = i * 100 / len(datanode_list)
        message = f'query from {datanode_ip}'
        # State 1 keeps the task marked as running while progress is reported.
        action = redisUpdateState(
            keys=script_keys,
            args=[task_id, progress, message, 1]).decode()
        logger.info(action)
        if action == "delete":
            # The task was removed while running: stop here and do not leave
            # a partial CSV behind for a task that no longer exists.
            logger.info("task deleted before finish")
            return
        logger.info(message)

        request_started_at = time.time()
        response = requests.get(f"http://{datanode_ip}:8082/query", params={
            "variable": task_state['variable'],
            "pressure": task_state['pressure'],
            "start_time": task_state['start_time'],
            # NOTE(review): +1 presumably turns an inclusive stop time into
            # the exclusive bound the datanode expects - confirm.
            "stop_time": task_state['stop_time'] + 1,
            "lat_start": task_state['lat_start'],
            "lat_stop": task_state['lat_stop'],
            "long_start": task_state['long_start'],
            "long_stop": task_state['long_stop'],
        })
        logger.info(
            f"datanode {datanode_ip} answered [{response.status_code}] "
            f"in {time.time() - request_started_at:.2f} seconds")
        if response.status_code != 200:
            # An error body must never be spliced into the result CSV.
            raise RuntimeError(
                f"datanode {datanode_ip} query failed: "
                f"[{response.status_code}] {response.text}")
        # Every datanode reply starts with the CSV title; keep only the rows.
        csv_parts.append(response.text[title_len:])

    finish_task_time = time.time()
    csv_path = os.path.join(output_csv_path, f'{task_id}.csv')
    with open(csv_path, 'w', encoding='utf-8') as f:
        f.write("".join(csv_parts))

    message = f"query finished, used {finish_task_time - task_started_at:.2f} seconds"
    logger.info(message)
    # State 3 marks the task as finished; the CSV path is stored with it.
    redisUpdateState(
        keys=script_keys,
        args=[task_id, 100, message, 3, csv_path])
def main():
    """Worker loop: pop waiting task ids forever and execute each query.

    For every id taken from the wait list the task is claimed through the
    get-task script. An empty reply means the task was deleted or its state
    was changed by someone else, so it is skipped. Any exception raised while
    running the query is logged with its traceback and recorded on the task
    as state 2 with progress 0. The function never returns.
    """
    script_keys = [redisKeyInsertTaskListAll,
                   redisKeyInsertTaskListDispatched,
                   redisKeyInsertTaskStateMessage]
    while True:
        # blpop is called without a timeout, so it blocks until an id is
        # available; element 1 of its reply is the popped value.
        task_id = rdb.blpop(redisKeyInsertTaskListWait)[1].decode()
        logger.info(f"[1/2] got task {task_id}")
        task_state = redisGetTask(keys=script_keys, args=[task_id]).decode()
        if task_state == "":
            logger.info(f'task {task_id} may be delete')
            continue
        logger.info(f"[2/2] got task {task_id}")
        try:
            query(json.loads(task_state))
        except Exception as err:
            # Top-level boundary of the worker: keep the loop alive, but keep
            # the traceback in the log so the failure can be diagnosed.
            message = f"query failed, err: {err}"
            logger.exception(message)
            redisUpdateState(
                keys=script_keys,
                args=[task_id, 0, message, 2])
def test():
    """Manually run query() once with a fixed sample task.

    The sample asks for variable 'Temperature' at pressure level '1' for the
    single timestamp 283996800, restricted to latitude 0..10 and
    longitude 0..10.
    """
    # insert {'variable': 'Temperature', 'pressure_level': '1', 'timestamp': 283996800}
    sample_timestamp = 283996800
    sample_task = {
        'id': '1',
        'progress': 0,
        'message': "",
        'state': 0,
        'variable': series_key.variable_name2id['Temperature'],
        'pressure': series_key.pressure_name2id['1'],
        'start_time': sample_timestamp,
        'stop_time': sample_timestamp,
        'lat_start': 0,
        'lat_stop': 10,
        'long_start': 0,
        'long_stop': 10,
    }
    query(sample_task)
# Directory the result CSV files are written to by query().
output_csv_path = "/usr/share/nginx/html/download"
if __name__ == '__main__':
    # Prepare the output directory BEFORE starting the worker: main() loops
    # forever, so nothing placed after the call would ever run.
    # makedirs(exist_ok=True) also creates missing parent directories and is
    # a no-op when the directory already exists.
    os.makedirs(output_csv_path, exist_ok=True)
    print(f'result will write to {output_csv_path}')
    main()
    # For a one-off manual run, call test() here instead of main().
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/banjiaojuhao/cds-downloader.git
git@gitee.com:banjiaojuhao/cds-downloader.git
banjiaojuhao
cds-downloader
cds-downloader
master

搜索帮助