1 Star 0 Fork 4

abc-edb-fund/edb-nbs-rsc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
06-rsc-sheet-parse-E.py 15.31 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python 3.6.4
# -*- coding: utf-8 -*-
# @Time : 2018/9/14 12:28
# @Author : Orient
# @File : 06-rsc-sheet-parse-E.py
# @Software: PyCharm
from config import *
import xlrd,xlwt
# Path of the category workbook. Its first sheet is searched for the row
# matching file_code to look up values such as 'file_path' and 'file_name'.
cate_path = r'.\category\目录景气月报2018-04-30.xls'
# Code passed to excel_index.get_row_index to select this file's row in the
# category sheet.
file_code = '65'
class get_id_name_list(object):
    """Build the indicator-name template workbook from a source .xls file.

    The pipeline is: read every sheet (file_data), keep only the text-only
    title rows (filter_blank_row), locate each table inside a sheet
    (real_data_split), combine the stacked title rows into one name per
    column (id_name_group) and write the names to a template workbook
    (write_back_to_model).
    """

    # A row whose joined text contains any of these markers is dropped
    # (source / note footers).
    _SKIP_MARKERS = ('资料来源', 'Source', '注', 'Note')
    # Value of the first cell that marks the first title row of a table.
    _AREA_START_LABEL = '年'

    def __init__(self):
        super(get_id_name_list, self).__init__()

    def file_data(self):
        """Read every sheet of the source workbook.

        The source path is looked up in the first sheet of the category
        workbook (cate_path), at the file_code row / 'file_path' column.

        Returns:
            list: one xlrd sheet object per sheet of the source workbook.
        """
        index = excel_index()
        # TODO: move reading of the category file into its own class.
        with xlrd.open_workbook(cate_path) as cate_data:
            data = cate_data.sheet_by_index(0)
            file_path = data.cell_value(
                index.get_row_index(data, file_code),
                index.get_column_index(data, 'file_path'))
        with xlrd.open_workbook(file_path) as file_data:
            return [file_data.sheet_by_index(i)
                    for i in range(file_data.nsheets)]

    def filter_blank_row(self):
        """Return the cleaned title rows of every sheet.

        Spaces and newlines are removed from every text cell. A row is
        kept only when all of its cells are text, the joined text is not
        empty, and it contains none of the footer markers. Rows holding
        any non-text cell (for example numeric data rows) are dropped.

        Returns:
            list: one list of rows (each a list of str) per sheet.
        """
        real_data_list = []
        for sheet in self.file_data():
            real_data = []
            for row_idx in range(sheet.nrows):
                row = [
                    cell.replace(' ', '').replace('\n', '')
                    if isinstance(cell, str) else cell
                    for cell in sheet.row_values(row_idx)
                ]
                # Explicit form of the former ''.join() TypeError: any
                # non-text cell disqualifies the whole row.
                if not all(isinstance(cell, str) for cell in row):
                    continue
                joined = ''.join(row)
                if joined == '':
                    continue
                if any(marker in joined for marker in self._SKIP_MARKERS):
                    continue
                real_data.append(row)
            real_data_list.append(real_data)
        return real_data_list

    def real_data_split(self, real_data_list=None):
        """Locate every table ("area") when a sheet holds several tables.

        An area starts at each row whose first cell equals '年' and ends
        on the row before the next area of the same sheet, or on the last
        row of the sheet for the final area.

        Args:
            real_data_list: rows as returned by filter_blank_row(). When
                omitted they are read via filter_blank_row().

        Returns:
            list: one dict per area with keys 'sheet_code' (0-based sheet
            index), 'area_code' (1-based position in the sheet),
            'head_row', 'area_count' (areas in that sheet) and 'tail_row'
            (inclusive row indices into the filtered rows).
        """
        if real_data_list is None:
            real_data_list = self.filter_blank_row()
        id_name_area_info_list = []
        for sheet_code, real_data in enumerate(real_data_list):
            sheet_areas = []
            for head_row, data in enumerate(real_data):
                if data[0] == self._AREA_START_LABEL:
                    sheet_areas.append({
                        'sheet_code': sheet_code,
                        'area_code': len(sheet_areas) + 1,
                        'head_row': head_row,
                    })
            area_count = len(sheet_areas)
            last_row = len(real_data) - 1
            for pos, info in enumerate(sheet_areas):
                info['area_count'] = area_count
                if pos + 1 < area_count:
                    info['tail_row'] = sheet_areas[pos + 1]['head_row'] - 1
                else:
                    info['tail_row'] = last_row
            id_name_area_info_list.extend(sheet_areas)
        return id_name_area_info_list

    @staticmethod
    def _fill_merged_cells(row):
        """Fill empty cells from column 2 onwards with the value to their left."""
        if len(row) <= 2:
            return
        last_value = row[2]
        for col in range(3, len(row)):
            if row[col]:
                last_value = row[col]
            else:
                row[col] = last_value

    def id_name_group(self):
        """Combine the stacked title rows of every area into indicator names.

        For each area the empty title cells are filled from the left, the
        upper title rows are joined per column as '<a><b>...', and each of
        the two bottom rows of the area is appended as a final '<...>'
        part, giving two name lists per area.

        Returns:
            list: per area, a list of two lists of names (one per bottom
            row), each covering the columns from index 2 onwards.
        """
        # Read the workbook once and share the rows with real_data_split.
        real_data_list = self.filter_blank_row()
        id_name_area_info_list = self.real_data_split(real_data_list)
        id_name_sheets = []
        for id_name_area_info in id_name_area_info_list:
            sheet_data = real_data_list[id_name_area_info['sheet_code']]
            head_row = id_name_area_info['head_row']
            tail_row = id_name_area_info['tail_row']
            for data in sheet_data[head_row:tail_row + 1]:
                self._fill_merged_cells(data)
            # Raises IndexError when the area is too short to have any
            # upper title row, as the original code did.
            col_count = len(sheet_data[head_row:tail_row - 1][0])
            # NOTE(review): the row at tail_row - 2 is not part of the
            # combined name - confirm this is intended.
            title_rows = sheet_data[head_row:tail_row - 2]
            mainhead = [
                ''.join(f'<{row[col]}>' for row in title_rows)
                for col in range(2, col_count)
            ]
            # The two bottom rows of the area each yield their own list.
            subhead = [sheet_data[tail_row - 1][2:], sheet_data[tail_row][2:]]
            id_name_lists = [
                [f'{main}<{head[pos]}>' for pos, main in enumerate(mainhead)]
                for head in subhead
            ]
            id_name_sheets.append(id_name_lists)
        return id_name_sheets

    def write_back_to_model(self):
        """Write the indicator names to the template workbook.

        Row 0 holds the column titles; every following row holds a running
        code in column 0 and one indicator name in column 1. The file is
        saved as './rsc-model-E/<fst_cate><sed_cate><file_name>.xls', the
        three parts being read from the file_code row of the category
        workbook.
        """
        id_name_sheets = self.id_name_group()
        workbook = xlwt.Workbook(encoding='utf-8')
        worksheet = workbook.add_sheet('sheet1')
        sty = style_head()
        style_h = sty.style_h()
        style_d = sty.style_d()
        row_name = ['code', 'id_name', 'target_id_code', 'target_id_name',
                    'fst_cate', 'sed_cate', 'file_name']
        for col, name in enumerate(row_name):
            worksheet.write(0, col, name, style_h)
        # The output row number doubles as the value of the 'code' column.
        out_row = 1
        for id_name_lists in id_name_sheets:
            for id_name_list in id_name_lists:
                for name in id_name_list:
                    worksheet.write(out_row, 0, out_row, style_d)
                    worksheet.write(out_row, 1, name, style_d)
                    out_row += 1
        # xlwt measures column width in 1/256 of a character width.
        for col, width in enumerate([8, 118, 20]):
            worksheet.col(col).width = 256 * width
        with xlrd.open_workbook(cate_path) as file_data:
            sheet_data = file_data.sheet_by_index(0)
            index = excel_index()
            cate_row = index.get_row_index(sheet_data, file_code)
            fst_cate = sheet_data.cell_value(
                cate_row, index.get_column_index(sheet_data, 'fst_cate'))
            sed_cate = sheet_data.cell_value(
                cate_row, index.get_column_index(sheet_data, 'sed_cate'))
            file_name = sheet_data.cell_value(
                cate_row, index.get_column_index(sheet_data, 'file_name'))
        file_name = fst_cate + sed_cate + file_name
        file_path = f'./rsc-model-E/{file_name}.xls'
        workbook.save(file_path)
# Runs whenever this file is executed: build the indicator-name template
# workbook for the file selected by file_code.
test = get_id_name_list()
#test.real_data_split()
#test.id_name_group()
test.write_back_to_model()
from pandas.core.frame import DataFrame
import pandas as pd
class get_data_area(object):
    """Locate and extract the numeric data areas of one source sheet."""

    # Quarter labels accepted in the second column of an area's first row.
    _QUARTER_TAGS = ('I', 'II', 'III', 'IV', 'I-II', 'I-III', 'I-IV')
    # Year value expected in the first column of an area's first row.
    _FIRST_YEAR = 2014.0
    # Number of sheet rows taken for every data area.
    _AREA_ROWS = 19

    def __init__(self):
        super(get_data_area, self).__init__()

    def file_data(self, i_base):
        """Read one sheet of the source .xls file.

        The source path is looked up in the first sheet of the category
        workbook (cate_path), at the file_code row / 'file_path' column.

        Args:
            i_base: 0-based index of the sheet to read.

        Returns:
            tuple: (row count, column count, xlrd sheet object).
        """
        index = excel_index()
        # TODO: move reading of the category file into its own class.
        with xlrd.open_workbook(cate_path) as cate_data:
            data = cate_data.sheet_by_index(0)
            file_path = data.cell_value(
                index.get_row_index(data, file_code),
                index.get_column_index(data, 'file_path'))
        # Opened with a context manager like every other workbook in this
        # file; the sheet is returned after the workbook is closed.
        with xlrd.open_workbook(file_path) as book:
            sheet_base = book.sheet_by_index(i_base)
        return sheet_base.nrows, sheet_base.ncols, sheet_base

    def _find_data_areas(self, sheet, nrows):
        """Return the head/tail rows of every data area found in *sheet*."""
        areas = []
        for row_idx in range(nrows):
            row = sheet.row_values(row_idx)
            if (len(row) >= 2 and row[0] == self._FIRST_YEAR
                    and row[1] in self._QUARTER_TAGS):
                areas.append({
                    'head_row': row_idx,
                    'tail_row': row_idx + self._AREA_ROWS,
                })
        return areas

    def data_area(self, i_base):  # TODO: extend to support more time types
        """Find the numeric data areas of a sheet.

        An area starts at every row whose first cell equals 2014.0 and
        whose second cell is one of the quarter labels; it spans the 19
        rows from there.

        Args:
            i_base: 0-based index of the sheet to scan.

        Returns:
            list: one dict per area with 'head_row' (first row) and
            'tail_row' (head_row + 19, exclusive end).
        """
        nrows, _, sheet1 = self.file_data(i_base)
        return self._find_data_areas(sheet1, nrows)

    def data_area_as_pd(self, i_base):
        """Return all data areas of a sheet side by side as one DataFrame.

        For every area the cells from column 2 onwards are collected row by
        row; the per-area frames are then concatenated column-wise.

        Args:
            i_base: 0-based index of the sheet to read.

        Returns:
            pandas.DataFrame: the concatenated data areas.

        Raises:
            ValueError: raised by pandas.concat when the sheet has no data
                area.
        """
        # Read the workbook once and reuse the sheet for both the area
        # search and the cell extraction.
        nrows, _, sheet1 = self.file_data(i_base)
        frames = []
        for area in self._find_data_areas(sheet1, nrows):
            real_data = [
                sheet1.row_values(row_idx)[2:]
                for row_idx in range(area['head_row'], area['tail_row'])
            ]
            frames.append(DataFrame(real_data))
        return pd.concat(frames, axis=1)
#test = get_data_area()
#test.data_area_as_pd(0)
# 时间标准化
import re
class time_trans(get_data_area):  # TODO: extend to support more time types
    """Turn the year / quarter columns of a data area into date strings."""

    # Quarter label pattern -> month-day of the quarter's last day. Labels
    # that match none of the patterns (for example 'I-II') are left as-is.
    _QUARTER_END_DATES = (
        (re.compile('^I$'), '03-31'),
        (re.compile('^II$'), '06-30'),
        (re.compile('^III$'), '09-30'),
        (re.compile('^IV$'), '12-31'),
    )

    def __init__(self):
        super(time_trans, self).__init__()

    def trans(self, i_base):
        """Return one '<year>-<month-day>' string per row of the first area.

        The first two cells of every row of the sheet's first data area are
        read as year and quarter label. An empty year cell takes the year
        of the closest non-empty cell above it.

        Args:
            i_base: 0-based index of the sheet to read.

        Returns:
            list: date strings such as '2014-03-31', in row order.

        Raises:
            IndexError: if the sheet contains no data area.
        """
        _, _, sheet1 = self.file_data(i_base)
        first_area = self.data_area(i_base)[0]
        date_time = []
        current_year = None
        for row_idx in range(first_area['head_row'], first_area['tail_row']):
            year_cell, month = sheet1.row_values(row_idx)[0:2]
            # Carry the year forward in plain Python instead of assigning
            # into a DataFrame column through chained indexing.
            if year_cell:
                current_year = year_cell
            for pattern, end_date in self._QUARTER_END_DATES:
                month = pattern.sub(end_date, month)
            date_time.append(f'{int(current_year)}-{month}')
        return date_time
#test = time_trans()
#test.trans(0)
# 读取源文件为标准文件
class mk_format_file(time_trans, get_data_area):
    """Write the source data as a flat id_code / id_time / id_data workbook."""

    def __init__(self):
        super(mk_format_file, self).__init__()

    def model_parse(self):
        """Read the target indicator codes from the model workbook.

        Returns:
            list: the values of column 2 of the first sheet of
            './rsc-model-B/test.xls', skipping the title row.
        """
        with xlrd.open_workbook(r'./rsc-model-B/test.xls') as model_data:
            model_sheet = model_data.sheet_by_index(0)
            return [model_sheet.cell_value(row_idx, 2)
                    for row_idx in range(1, model_sheet.nrows)]

    @staticmethod
    def _header_style():
        """Build the xlwt style of the title row."""
        style = xlwt.XFStyle()
        # Centre the text horizontally and vertically.
        al = xlwt.Alignment()
        al.horz = 0x02
        al.vert = 0x01
        style.alignment = al
        # Solid background colour.
        pattern = xlwt.Pattern()
        pattern.pattern = xlwt.Pattern.SOLID_PATTERN
        pattern.pattern_fore_colour = xlwt.Style.colour_map['sea_green']
        style.pattern = pattern
        # Border on all four sides, all with the same colour index.
        borders = xlwt.Borders()
        borders.left = 1
        borders.left_colour = 0x09
        borders.right = 1
        borders.right_colour = 0x09
        borders.top = 1
        borders.top_colour = 0x09
        borders.bottom = 1
        borders.bottom_colour = 0x09
        style.borders = borders
        # Bold font.
        font = xlwt.Font()
        font.name = u'Arial'
        font.colour_index = 1
        font.bold = True
        style.font = font
        return style

    @staticmethod
    def _data_style():
        """Build the xlwt style of the data rows."""
        style = xlwt.XFStyle()
        font = xlwt.Font()
        font.name = u'微软雅黑'
        font.colour_index = 0
        font.bold = False
        style.font = font
        return style

    def format_file(self):
        """Write every numeric cell of sheet 0 as one output row.

        Column i of the data area is paired with the i-th target code and
        row j with the j-th date string. Cells that cannot be converted to
        float are skipped. The workbook is saved to
        '.\\rsc-format-file\\test.xls'.
        """
        data_area = self.data_area_as_pd(0)
        target_id_list = self.model_parse()
        time_list = self.trans(0)
        workbook = xlwt.Workbook(encoding='utf-8')
        worksheet = workbook.add_sheet('sheet1', cell_overwrite_ok=True)
        header_style = self._header_style()
        data_style = self._data_style()
        for col, name in enumerate(['id_code', 'id_time', 'id_data']):
            worksheet.write(0, col, name, header_style)
        out_row = 1
        for i, id_code in enumerate(target_id_list):
            for j, id_time in enumerate(time_list):
                # Only the conversion is guarded, so a ValueError raised by
                # xlwt while writing is no longer silently swallowed.
                try:
                    id_data = float(data_area.iloc[j, i])
                except ValueError:
                    continue
                worksheet.write(out_row, 0, id_code, data_style)
                worksheet.write(out_row, 1, id_time, data_style)
                worksheet.write(out_row, 2, id_data, data_style)
                out_row += 1
        # xlwt measures column width in 1/256 of a character width.
        for col, width in enumerate([13, 12, 12]):
            worksheet.col(col).width = 256 * width
        # TODO: define the naming rule for the standard output file.
        workbook.save(r'.\rsc-format-file\test.xls')
#test = mk_format_file()
#test.format_file()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/edbmapping/edb-nbs-rsc.git
git@gitee.com:edbmapping/edb-nbs-rsc.git
edbmapping
edb-nbs-rsc
edb-nbs-rsc
master

搜索帮助