0 Star 3 Fork 0

gin/presto-etl

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
db_util.py 3.37 KB
一键复制 编辑 原始数据 按行查看 历史
gin 提交于 2019-02-17 16:06 . first commit
import os.path
import pymysql
import prestodb
import time
__presto_config_file = os.path.join(os.path.dirname(__file__), 'etl-presto.config')
__mysql_config_file = os.path.join(os.path.dirname(__file__), 'etl-mysql.config')
class Properties(object):
def __init__(self, file_name):
self.fileName = file_name
self.properties = {}
with open(self.fileName, 'r') as pro_file:
for line in pro_file.readlines():
line = line.strip().replace('\n', '')
if line.find("#") != -1:
line = line[0:line.find('#')]
if line.find('=') > 0:
strs = line.split('=')
strs[1] = line[len(strs[0]) + 1:]
self.properties[strs[0].strip()] = strs[1].strip()
def get_properties(self):
return self.properties
def to_string(self):
print(self.properties)
def get_presto_con():
config = Properties(__presto_config_file).get_properties()
return prestodb.dbapi.connect(
host=config['presto.host'],
port=config['presto.port'],
user=config['presto.user'],
catalog=config['presto.catalog']
)
def get_mysql_con():
config = Properties(__mysql_config_file).get_properties()
return pymysql.connect(
host=config['mysql.host'],
port=int(config['mysql.port']),
user=config['mysql.user'],
password=config['mysql.pass'],
db='bigdata',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor
)
# 批量一次执行presto-sql
def exec_presto_sqls(sqls):
with get_presto_con() as con:
cur = con.cursor()
if sqls.__len__() >= 1:
rs = []
for sql in sqls:
if sql and sql.strip() != '':
print("执行SQL:", sql)
cur.execute(sql.strip())
r = cur.fetchone()
print(r)
# rs = rs + cur.fetchall() # 返回sql文件中所有sql
# print(rs)
return rs
else:
return None
# 批量一次执行presto-sql
def exec_presto_sqls_sleep(sqls):
with get_presto_con() as con:
cur = con.cursor()
if sqls.__len__() >= 1:
rs = []
for sql in sqls:
if sql and sql.strip() != '':
print("执行SQL:", sql)
cur.execute(sql.strip())
r = cur.fetchone()
time.sleep(3)
print(r)
# rs = rs + cur.fetchall() # 返回sql文件中所有sql
# print(rs)
return rs
else:
return None
def exec_presto_sql(sql):
with get_presto_con() as con:
cur = con.cursor()
if sql and sql.strip() != '':
print("执行sql:", sql)
cur.execute(sql.strip())
r = cur.fetchone()
print("结果:", r)
return r
def exec_mysql_sql(sql):
if sql:
con = get_mysql_con()
try:
with con.cursor() as cur:
print("执行sql:", sql)
cur.execute(sql.strip())
con.commit()
r = cur.fetchone()
print("结果:", r)
return r
except:
con.rollback()
finally:
con.close()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/lonelygin/presto-etl.git
git@gitee.com:lonelygin/presto-etl.git
lonelygin
presto-etl
presto-etl
master

搜索帮助