1 Star 0 Fork 0

abc-edb-fund/edb_mapping_customs

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
customs.py 19.42 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/6/29 15:05
# @Author : Orient
# @File : customs_type2.py
# @Software: PyCharm
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from config import *
import xlrd
import xlwt
import time
import re
# --- Target web application (EDB mapping console) ---------------------------
# Login page opened first by Mapping.login().
url = 'http://114.55.107.24:8088/index'
# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration or environment variables.
usr = 'lidongfang'
pwd = '123456'
# Model management page (not referenced elsewhere in this file).
modelurl = 'http://114.55.107.24:8088/model/manage'
# Job management page that Mapping.login() navigates to after signing in.
joburl = 'http://114.55.107.24:8088/job/manage'
# Value of the `data_source` field used to filter titles in MongoDB.
datasource = '海关总署'
# Both values are read interactively at import time and used as module globals
# by the Mapping methods. Mapping.currPrase() maps currency '0' to '人民币' and
# any other value to ''.
title = input('title: ')
currency = input('currency: ')
# Presumably example values for the prompts above:
# title: 2018年3月进出口商品关别总值表(人民币)
# indicator_frequency: 月 (monthly)
# MongoConnCT and conn_data_center come from `from config import *`.
my_conn = MongoConnCT()
# `cursor` is used by Mapping.getTargetID() to run SQL queries.
cursor = conn_data_center()
class Mapping():
    """Derive a mapping-model configuration from one stored table and upload it.

    The table is looked up in the MongoDB collection ``edb_mapping_table`` by
    the module-level ``title``. Row/column header patterns, standard names and
    target indicator ids are derived from its cells and then typed into the
    job-management web UI through Selenium.

    Most methods read module-level names (``title``, ``currency``, ``my_conn``,
    ``cursor``, ``dict_regex``, ``dict_std``, ``dict_head``, ``value_type``)
    rather than instance attributes.
    """

    # Number of row / column header patterns entered on the model form; the
    # form's minimum-match field is filled with the same number.
    _CONFIG_FIELD_COUNT = 3

    def __init__(self, title=None, data=None, row=None, col=None, format_time=None):
        """Store the optional table description on the instance.

        All arguments default to ``None`` so that ``Mapping()`` keeps working;
        the other methods do not read these attributes.
        """
        self.name = title
        self.data = data
        self.row = row
        self.col = col
        self.FormatTime = format_time

    # The initializer was originally spelled ``_init_`` (and therefore never
    # ran as a constructor); keep that name callable for existing callers.
    _init_ = __init__

    # ------------------------------------------------------------------ helpers
    @staticmethod
    def _period_key(period):
        """Return ``(year, month)`` as integers for a 'YYYY年M月' string."""
        year, month = re.findall(r'[0-9]+', period)[:2]
        return int(year), int(month)

    @staticmethod
    def _escape(text):
        """Apply every ``dict_regex`` replacement (symbol -> value) to *text*."""
        for symbol, replacement in dict_regex.items():
            text = text.replace(symbol, replacement)
        return text

    @staticmethod
    def _normalizeValueType(value):
        """Map the value type '当月值' to '当期值'; return other values unchanged."""
        return '当期值' if value == '当月值' else value

    @staticmethod
    def _fill(driver, by, locator, text):
        """Clear the located input and type *text* into it."""
        driver.find_element(by, locator).clear()
        driver.find_element(by, locator).send_keys(text)

    def _tableCells(self):
        """Return the ``data`` cell list of the raw table document.

        Raises:
            ValueError: if no document exists for the current ``title``.
        """
        document = self.getRawData()
        if document is None:
            raise ValueError('no edb_mapping_table document found for title %r' % title)
        return document['data']

    def _fillConfigFields(self, driver, field_name, add_button_id, values):
        """Ensure enough inputs named *field_name* exist, then fill them.

        Clicks the element with id *add_button_id* until there are
        ``_CONFIG_FIELD_COUNT`` inputs, then fills at most that many inputs
        with the leading entries of *values*.
        """
        existing = driver.find_elements(By.NAME, field_name)
        for _ in range(self._CONFIG_FIELD_COUNT - len(existing)):
            driver.execute_script('document.getElementById("%s").click()' % add_button_id)
        fields = driver.find_elements(By.NAME, field_name)
        # zip() stops at the shorter side, so fewer than three values no
        # longer raises IndexError.
        for field, value in zip(fields[:self._CONFIG_FIELD_COUNT], values):
            field.clear()
            field.send_keys(value)

    # ------------------------------------------------------------------- titles
    def getTitles(self):
        """Print and return the most recent title of every table type.

        Titles are read from MongoDB for the module-level ``datasource``. A
        title's type is the title with its leading 'YYYY年M月' period replaced
        by a regex and its parentheses replaced by '.*'. Titles that do not
        start with a period are skipped.

        Returns:
            list of dicts with keys ``title``, ``datetime`` and ``type``, in
            order of first appearance of each type.
        """
        titles = my_conn.db['edb_mapping_table'].distinct('title', {'data_source': datasource})
        period_pattern = re.compile(r'[0-9]{4}年[0-9]{1,2}月')
        latest_by_type = {}
        for name in titles:
            found = period_pattern.match(name)
            if found is None:
                continue
            period = found.group(0)
            kind = period_pattern.sub('[0-9]{4}年[0-9]{1,2}月', name)
            kind = re.sub(r'\(|\)|\(|\)', '.*', kind)
            current = latest_by_type.get(kind)
            # Compare (year, month) numerically: as plain strings
            # '2018年10月' would sort before '2018年9月'.
            if current is None or self._period_key(period) > self._period_key(current['datetime']):
                latest_by_type[kind] = {'title': name, 'datetime': period, 'type': kind}
        title_filter = list(latest_by_type.values())
        for item in title_filter:
            print(item)
        return title_filter

    # Fetch the raw data
    def getRawData(self):
        """Return the first document whose title equals ``title``, or ``None``.

        Only the ``title`` and ``data`` fields are projected.
        """
        documents = my_conn.db['edb_mapping_table'].find(
            {'title': title}, {'_id': 0, 'title': 1, 'data': 1})
        for document in documents:
            return document
        return None

    # Title configuration
    def titleConfig(self):
        """Return ``title`` as a pattern: symbols escaped, period generalized."""
        escaped = self._escape(title)
        return re.sub(r'[0-9]{1,4}年[0-9]{1,2}月', '[0-9]{1,4}年[0-9]{1,2}月', escaped)

    # ------------------------------------------------------------ row indicators
    def rowRaw(self):
        """Return the combined header names built from table rows 1 and 2.

        Row-1 cells (column >= 1) are repeated ``colSpan`` times and paired
        positionally with the row-2 cells; each name is head + tail. A head
        without a matching row-2 cell gets an empty tail.
        """
        heads = []
        tails = []
        for cell in self._tableCells():
            if cell['row'] == 1 and cell['column'] >= 1:
                heads.extend([cell['text']] * cell['colSpan'])
            if cell['row'] == 2 and cell['column'] >= 0:
                tails.append(cell['text'])
        names = []
        for index, head in enumerate(heads):
            tail = tails[index] if index < len(tails) else ''
            names.append(head + tail)
        return names

    def rowConfig(self):
        """Return one '$'-anchored pattern per header name from ``rowRaw``.

        Symbols are replaced via ``dict_regex`` and every run of one or two
        digits is replaced by the text '[0-9]{1,2}'.
        """
        patterns = []
        for name in self.rowRaw():
            generalized = re.sub(r'[0-9]{1,2}', '[0-9]{1,2}', self._escape(name))
            patterns.append('%s$' % generalized)
        return patterns

    # Standard names
    def rowStd(self):
        """Return the header names after applying every ``dict_std`` regex."""
        standard = []
        for name in self.rowRaw():
            for pattern, replacement in dict_std.items():
                name = re.sub(pattern, replacement, name)
            standard.append(name)
        return standard

    # --------------------------------------------------------- column indicators
    def colRaw(self):
        """Return ``{'head': text, 'tail': ''}`` for each column-0 cell in rows >= 3."""
        return [
            {'head': cell['text'], 'tail': ''}
            for cell in self._tableCells()
            if cell['row'] >= 3 and cell['column'] == 0
        ]

    def colConfig(self):
        """Return the column names with ``dict_regex`` replacements applied."""
        return [self._escape(item['head'] + item['tail']) for item in self.colRaw()]

    def colStd(self):
        """Return the column names with each ``dict_regex`` key backslash-escaped."""
        names = []
        for item in self.colRaw():
            name = item['head'] + item['tail']
            # Literal replacement: the keys are plain symbols, not patterns.
            for symbol in dict_regex:
                name = name.replace(symbol, '\\' + symbol)
            names.append(name)
        return names

    # ---------------------------------------------------------- indicator parsing
    def indicatorPrase(self):
        """Describe every (column name, row name) combination of the table.

        Returns:
            list of dicts, ordered column by column, with the keys
            ``resource_indicator_name`` ('<column>:<row>'), ``keyword_head``,
            ``keyword_tail`` (the column name), ``value_type``,
            ``indicator_frequency``, ``unit`` and ``type``.
        """
        cells = self._tableCells()
        row_names = self.rowStd()
        col_names = [col['head'] + col['tail'] for col in self.colRaw()]

        # Table-wide unit from cell (0, 0); '' when that cell is absent.
        base_unit = ''
        for cell in cells:
            if cell['row'] == 0 and cell['column'] == 0:
                base_unit = cell['text'].replace('单位:', '')

        parsed = []
        for col_name in col_names:
            for row_name in row_names:
                id_name = col_name + ':' + row_name
                info = {
                    'resource_indicator_name': id_name,
                    'keyword_head': '',
                    'keyword_tail': col_name,
                    'value_type': '',
                    'indicator_frequency': '月',
                }
                # Head keyword: first entry of dict_head found in the name.
                for keyword in dict_head:
                    if keyword in id_name:
                        info['keyword_head'] = keyword
                        break
                # Value type: last entry of value_type found in the name.
                for candidate in value_type:
                    if candidate in id_name:
                        info['value_type'] = candidate
                info['unit'] = base_unit
                if '数量' in id_name:
                    info['type'] = '数量'
                    # Quantity unit is the column-1 cell of the row whose text
                    # equals the column name (last such row wins).
                    unit_row = None
                    for cell in cells:
                        if cell['text'] == col_name:
                            unit_row = cell['row']
                    if unit_row is not None:
                        for cell in cells:
                            if cell['row'] == unit_row and cell['column'] == 1:
                                info['unit'] = cell['text']
                else:
                    info['type'] = '金额'
                # NOTE(review): the original indentation was lost; the '%'
                # override is applied to both quantity and amount rows here.
                # Confirm it was not meant for amount rows only.
                if '比' in info['value_type']:
                    info['unit'] = '%'
                parsed.append(info)
        return parsed

    def addNewIndicator(self):
        """Return one new-indicator record per entry of ``indicatorPrase``."""
        records = []
        for item in self.indicatorPrase():
            keyword_head = item['keyword_head']
            keyword_tail = item['keyword_tail']
            value_kind = self._normalizeValueType(item['value_type'])
            records.append({
                'indicator_name_cn': '%s:%s金额:目的地:%s' % (keyword_tail, keyword_head, value_kind),
                'indicator_name_en': '',
                'area': '亚洲',
                'country': '中国',
                'province': '',
                'city': '',
                'county': '',
                'frequency': '月',
                'unit': item['unit'],
                'data_source': '海关总署',
                'main_name': '%s:%s金额:目的地' % (keyword_tail, keyword_head),
                'value_type': value_kind,
            })
        return records

    def creadTable(self):
        """Write the records from ``addNewIndicator`` to a timestamped .xls file."""
        records = self.addNewIndicator()
        workbook = xlwt.Workbook(encoding='utf-8')
        style = xlwt.easyxf('font: name 宋体, color-index black')
        sheet = workbook.add_sheet('Sheet1', cell_overwrite_ok=True)
        # (header text, record key) for each spreadsheet column, left to right.
        columns = [
            ('指标中文名称', 'indicator_name_cn'),
            ('指标英文名称', 'indicator_name_en'),
            ('统计区域', 'area'),
            ('国家', 'country'),
            ('省份', 'province'),
            ('地级市', 'city'),
            ('县区', 'county'),
            ('指标频率', 'frequency'),
            ('指标单位', 'unit'),
            ('数据源名称', 'data_source'),
            ('主指标名', 'main_name'),
            ('值类型', 'value_type'),
        ]
        for col_index, (header, _) in enumerate(columns):
            sheet.write(0, col_index, header, style)
        for row_index, record in enumerate(records, 1):
            for col_index, (_, key) in enumerate(columns):
                sheet.write(row_index, col_index, record[key], style)
        now = time.strftime("%d%m%Y%H%M%S")
        filepath = 'D:/json/CUSTOMS/配置文件/value批量导入模板' + now + '.xls'
        workbook.save(filepath)

    def currPrase(self):
        """Return '人民币' when the module-level ``currency`` is '0', else ''."""
        return '人民币' if currency == '0' else ''

    def getTargetID(self):
        """Look up the target indicator for every parsed source indicator.

        Runs one SQL query per indicator through the module-level ``cursor``.
        A target is accepted only when ``cursor.execute`` reports exactly one
        row.

        Returns:
            list of dicts with ``resource_indicator_name``, ``target_id`` and
            ``target_name``; the last two are '' when there is no unique match.
        """
        # Placeholders instead of string interpolation, so quotes or
        # backslashes in names cannot break or alter the statement.
        # NOTE(review): assumes a DB-API driver with the '%s' paramstyle
        # (e.g. PyMySQL / MySQLdb) - confirm against conn_data_center().
        sql = '''
        select a.id,a.indicator_name from edb_indicator_value a
        join edb_indicator_category b on a.category_id = b.category_id
        where indicator_name like %s
        and indicator_name REGEXP %s
        and indicator_frequency like %s
        and indicator_unit = %s
        and b.category_id = '10004003073'
        AND a.status = '0'
        order by id
        '''
        targets = []
        for item in self.indicatorPrase():
            keyword_head = item['keyword_head']
            keyword_tail = item['keyword_tail']
            value_kind = self._normalizeValueType(item['value_type'])
            # LIKE gets the raw name; REGEXP gets the escaped name so regex
            # metacharacters in the name are matched literally.
            name_regex = (self._escape(keyword_tail) + ':?' + keyword_head
                          + '金额:?目的地:?' + value_kind)
            params = (keyword_tail + '%', name_regex, item['indicator_frequency'], item['unit'])
            matched = cursor.execute(sql, params)
            rows = cursor.fetchall()
            target = {'resource_indicator_name': item['resource_indicator_name']}
            if matched == 1:
                target['target_id'] = rows[0][0]
                target['target_name'] = rows[0][1]
            else:
                target['target_id'] = ''
                target['target_name'] = ''
            targets.append(target)
            print(target)
        return targets

    # ------------------------------------------------------------------ selenium
    def isElementExist(self, xpath):
        """Return True when the global ``driver`` finds an element at *xpath*."""
        # Local import keeps the file's import block untouched.
        from selenium.common.exceptions import NoSuchElementException
        try:
            driver.find_element(By.XPATH, xpath)
        except NoSuchElementException:
            return False
        return True

    # Log in and open the job management page
    def login(self):
        """Start Chrome, sign in with ``usr``/``pwd`` and open ``joburl``.

        The driver is also stored in the module-level name ``driver``.

        Returns:
            the Selenium WebDriver instance.
        """
        global driver
        driver = webdriver.Chrome()
        driver.get(url)
        self._fill(driver, By.XPATH, '//form/input[1]', usr)
        self._fill(driver, By.XPATH, '//form/input[2]', pwd)
        driver.find_element(By.XPATH, '//*[@id="login"]').click()
        WebDriverWait(driver, 20, 0.5).until(
            EC.presence_of_element_located((By.CLASS_NAME, 'J_menuItem')))
        driver.get(joburl)
        return driver

    def uploadConfig(self):
        """Type the derived configuration for ``title`` into the web UI.

        Steps: log in, search the job by title, open its first result, fill
        the model name and title pattern, the row/column header patterns, the
        standard names, and finally the source/target indicator mapping.
        """
        driver = self.login()
        title_config = self.titleConfig()
        row_config = self.rowConfig()
        row_std = self.rowStd()
        col_config = self.colConfig()
        col_raw = self.colRaw()
        targets = self.getTargetID()

        # Search the job list for the title and open the first result.
        self._fill(driver, By.XPATH, '//*[@id="jobname"]', title)
        driver.find_element(
            By.XPATH, '/html/body/div[1]/div/div/div/div[1]/div[13]/button').click()
        edit_link = '//*[@id="exampleTable"]/tbody/tr/td[10]/a[1]'
        # NOTE(review): the original indentation was lost, so it is unclear
        # how much of the method was guarded by this check; without the link
        # none of the following fields can be reached, so stop here.
        if not self.isElementExist(edit_link):
            print('no job entry found for title: %s' % title)
            return
        time.sleep(1)
        driver.find_element(By.XPATH, edit_link).click()

        # Model name, data type and title pattern.
        self._fill(driver, By.XPATH, '//*[@id="modelName"]', title_config)
        time.sleep(2)
        driver.execute_script('document.getElementById("datatype").title="海关总署,";')
        driver.execute_script('document.getElementById("datatype").value="海关总署,";')
        self._fill(driver, By.XPATH,
                   '//*[@id="exampleTable"]/tbody/tr[4]/td[2]/div[1]/input', title_config)

        # Row / column header patterns and the minimum number of matches.
        self._fillConfigFields(driver, 'targetConfig', 'targetConfigAdd', row_config)
        self._fillConfigFields(driver, 'columnTargetConfig', 'columnTargetConfigAdd', col_config)
        self._fill(driver, By.ID, 'indexMatching', str(self._CONFIG_FIELD_COUNT))

        # Standard names tab: one entry per row pattern.
        driver.find_element(By.XPATH, '//*[@id="tab-1"]/a').click()
        standard_boxes = driver.find_elements(By.CSS_SELECTOR, '.table-box.standardName')
        add_box = driver.find_element(By.ID, 'standardNameAdd')
        for _ in range(len(row_config) - len(standard_boxes)):
            driver.execute_script('$(arguments[0]).click()', add_box)
        for index in range(len(row_config)):
            name_xpath = '//*[@id="exampleTable2"]/tbody/tr[%s]/td[2]/div[1]/input' % (index + 3)
            self._fill(driver, By.XPATH, name_xpath, row_std[index])
        for index in range(len(row_config)):
            pattern_xpath = ('//*[@id="exampleTable2"]/tbody/tr[%s]/td[2]/table/tbody/tr[1]/td[2]/div[1]/input'
                             % (index + 3))
            self._fill(driver, By.XPATH, pattern_xpath, row_config[index])

        # Mapping tab: first the source (column) names ...
        driver.find_element(By.XPATH, '//*[@id="tab-2"]/a').click()
        for col_index, col in enumerate(col_raw):
            cell_xpath = '//*[@id="exampleTable3"]/tbody/tr[%s]/td[1]/div' % (col_index + 2)
            resource = driver.find_element(By.XPATH, cell_xpath)
            # The column's own name replaces the former data[j*9] / data[j*8]
            # lookups, which only lined up for tables with exactly 9 (or 8)
            # row headers.
            col_name = col['head'] + col['tail']
            # Text is passed as a script argument so quotes need no escaping.
            driver.execute_script(
                '$(arguments[0]).text(arguments[1])', resource, self._escape(col_name))
            print(col_name)

        # ... then one target cell per (column, row) pair, in the same
        # column-major order in which getTargetID() lists its results.
        count = 0
        for i in range(len(col_config)):
            for j in range(len(row_config)):
                target_xpath = ('//*[@id="exampleTable3"]/tbody/tr[%s]/td[2]/div[%s]/div[2]'
                                % (i + 2, j + 2))
                target = driver.find_element(By.XPATH, target_xpath)
                target_id = targets[count]['target_id']
                target_name = targets[count]['target_name']
                # Only uniquely matched indicators carry an integer id.
                if isinstance(target_id, int):
                    label = '%s[%s]' % (target_name, target_id)
                    driver.execute_script(
                        '$(arguments[0]).attr("data-id", arguments[1])', target, target_id)
                    driver.execute_script(
                        '$(arguments[0]).attr("data-title", arguments[1])', target, target_name)
                    driver.execute_script(
                        '$(arguments[0]).text(arguments[1])', target, label)
                count += 1

    def getProfile(self):
        """Read titles, model types, job statuses and currencies from the workbook.

        Returns:
            tuple ``(title, model_type, job_status, currency)`` taken from
            columns 2-5 of the sheet at index 5; all but the titles are passed
            through ``read_as_int``.
        """
        # Raw string: same path as before, without the invalid '\d' / '\D'
        # escape sequences.
        file_path = r'C:/Users\dfli.abcft\Desktop/01-EDB项目组/17-EDB自生产/03-海关总署/edb000041-EDB表格聚类梳理汇总-海关总署.xlsx'
        with xlrd.open_workbook(file_path) as book:
            table = book.sheet_by_index(5)
            titles = table.col_values(2)
            # Model classification
            model_type = read_as_int(table.col_values(3))
            # Self-production test status
            job_status = read_as_int(table.col_values(4))
            # Currency
            currencies = read_as_int(table.col_values(5))
        return titles, model_type, job_status, currencies
# Script entry point: runs at import time, after the interactive prompts above.
test = Mapping()
# Individual steps, left commented out; presumably kept for manual debugging.
#test.rowRaw()
#test.rowConfig()
#test.rowStd()
#test.colRaw()
#test.colConfig()
#test.colStd()
#test.indicatorPrase()
#test.creadTable()
#test.getTargetID()
# Log in and upload the full configuration for the entered title.
test.uploadConfig()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/edbmapping/edb_mapping_customs.git
git@gitee.com:edbmapping/edb_mapping_customs.git
edbmapping
edb_mapping_customs
edb_mapping_customs
TYPE5

搜索帮助