代码拉取完成,页面将自动刷新
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import io
import os
import sys
import argparse
from sqlalchemy.schema import MetaData
from sqlalchemy.engine import create_engine
from sqlacodegen.codegen import CodeGenerator
from tornado.options import define
abs_file = os.path.abspath(sys.argv[0])
ROOT_PATH = abs_file[:abs_file.rfind('/')]
define('ROOT_PATH', ROOT_PATH)
sys.path.insert(0, '/Users/leeyi/workspace/py3/trest')
from trest.utils import func
from trest.db.dbalchemy import DBConfigParser
class ModelGenerator(CodeGenerator):
template = '''\
#!/usr/bin/env python
# -*- coding: utf-8 -*-
{imports}
from trest.db import Model as Base
{models}
'''
def render(self, outfile):
rendered_models = []
for model in self.models:
if isinstance(model, self.class_model):
rendered_models.append(self.render_class(model))
elif isinstance(model, self.table_model):
rendered_models.append(self.render_table(model))
imports = self.render_imports()
imports = imports.replace('from sqlalchemy.ext.declarative import declarative_base', '')
output = self.template.format(
imports=imports,
metadata_declarations=self.render_metadata_declarations(),
models=self.model_separator.join(rendered_models).rstrip('\n'))
# print(output, file=outfile)
fout = open(outfile, 'w', encoding='utf8')
# 写入文件内容
fout.write(output)
# 关闭文件
fout.close()
class ServiceGenerator:
template = '''\
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from trest.utils import utime
from trest.logger import SysLogger
from trest.config import settings
from trest.exception import JsonError
from applications.common.models.{classname_underline} import {classname}
class {classname}Service(object):
@staticmethod
def page_list(where, page, per_page):
"""列表记录
Arguments:
where dict -- 查询条件
page int -- 当前页
per_page int -- 每页记录数
return:
Paginate 对象 | None
"""
query = {classname}.Q
if 'id' in where.keys():
query = query.filter({classname}.id == where['id'])
if 'title' in where.keys():
query = query.filter({classname}.title == where['title'])
if 'status' in where.keys():
query = query.filter({classname}.status == where['status'])
else:
query = query.filter({classname}.status != -1)
pagelist_obj = query.paginate(page=page, per_page=per_page)
if pagelist_obj is None:
raise JsonError('暂无数据')
return pagelist_obj
@staticmethod
def get(id):
"""获取单条记录
[description]
Arguments:
id int -- 主键
return:
{classname} Model 实例 | None
"""
if not id:
raise JsonError('ID不能为空')
obj = {classname}.Q.filter({classname}.id == id).first()
return obj
@staticmethod
def update(id, param):
"""更新记录
[description]
Arguments:
id int -- 主键
param dict -- [description]
return:
True | JsonError
"""
columns = [i for (i, _) in {classname}.__table__.columns.items()]
param = {{k:v for k,v in param.items() if k in columns}}
if 'updated_at' in columns:
param['updated_at'] = utime.timestamp(3)
if not id:
raise JsonError('ID 不能为空')
try:
{classname}.Update.filter({classname}.id == id).update(param)
{classname}.session.commit()
return True
except Exception as e:
{classname}.session.rollback()
SysLogger.error(e)
raise JsonError('update error')
@staticmethod
def insert(param):
"""插入
[description]
Arguments:
id int -- 主键
param dict -- [description]
return:
True | JsonError
"""
columns = [i for (i, _) in {classname}.__table__.columns.items()]
param = {{k:v for k,v in param.items() if k in columns}}
if 'created_at' in columns:
param['created_at'] = utime.timestamp(3)
try:
obj = {classname}(**param)
{classname}.session.add(obj)
{classname}.session.commit()
return True
except Exception as e:
{classname}.session.rollback()
SysLogger.error(e)
raise JsonError('insert error')
'''
def render(self, classname, appname = 'admin'):
fname = func.hump2underline(classname)
app = f'{ROOT_PATH}/applications/{appname}'
app_s = f'{app}/services'
sfile = f'{app_s}/{fname}.py'
if os.path.exists(sfile):
return
if not os.path.exists(app):
os.mkdir(app)
if not os.path.exists(app_s):
os.mkdir(app_s)
fout = open(sfile, 'w', encoding='utf8')
output = self.template.format(
classname_underline=fname,
appname=appname,
classname=classname
)
# 写入文件内容
fout.write(output)
# 关闭文件
fout.close()
class AdminHandlerGenerator:
template = '''\
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""{classname}控制器
"""
from trest.router import get
from trest.router import put
from trest.router import post
from trest.router import delete
from trest.exception import JsonError
from applications.admin.utils import required_permissions
from applications.admin.utils import admin_required_login
from applications.admin.services.{classname_underline} import {classname}Service
from .common import CommonHandler
class {classname}Handler(CommonHandler):
@post('{classname_underline}')
@admin_required_login
@required_permissions()
def {classname_underline}_post(self, *args, **kwargs):
param = self.params()
{classname}Service.insert(param)
return self.success()
@get('{classname_underline}/(?P<id>[0-9]+)')
@admin_required_login
@required_permissions()
def {classname_underline}_get(self, id):
"""获取单个记录
"""
resp_data = {classname}Service.get(id)
return self.success(data=resp_data)
@put('{classname_underline}/(?P<id>[0-9]+)')
@admin_required_login
@required_permissions()
def {classname_underline}_put(self, id, *args, **kwargs):
param = self.params()
{classname}Service.update(id, param)
return self.success(data=param)
@delete('{classname_underline}/(?P<id>[0-9]+)')
@admin_required_login
@required_permissions()
def {classname_underline}_delete(self, id, *args, **kwargs):
param = {{
'status':-1
}}
{classname}Service.update(id, param)
return self.success()
class {classname}ListHandler(CommonHandler):
@get(['{classname_underline}','{classname_underline}/(?P<category>[a-zA-Z0-9_]*)'])
@admin_required_login
@required_permissions()
def {classname_underline}_list_get(self, category = '', *args, **kwargs):
"""列表、搜索记录
"""
page = int(self.get_argument('page', 1))
per_page = int(self.get_argument('limit', 10))
id = self.get_argument('id', None)
title = self.get_argument('title', None)
status = self.get_argument('status', None)
param = {{}}
if category:
param['category'] = category
if id:
param['id'] = id
if title:
param['title'] = title
if status:
param['status'] = status
resp_data = {classname}Service.page_list(param, page, per_page)
return self.success(data=resp_data)
'''
def render(self, classname, appname = 'admin'):
fname = func.hump2underline(classname)
outfile = f'{ROOT_PATH}/applications/{appname}/handlers/{fname}.py'
if os.path.exists(outfile):
return
output = self.template.format(
classname_underline=fname,
classname=classname
)
fout = open(outfile, 'w', encoding='utf8')
# 写入文件内容
fout.write(output)
# 关闭文件
fout.close()
class HandlerGenerator:
template = '''\
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""{classname}控制器
"""
import tornado
from trest.router import get
from trest.router import put
from trest.router import post
from trest.router import delete
from trest.exception import JsonError
from applications.{appname}.services.{classname_underline} import {classname}Service
from .common import CommonHandler
class {classname}Handler(CommonHandler):
@post('{classname_underline}')
@tornado.web.authenticated
def {classname_underline}_post(self, *args, **kwargs):
param = self.params()
{classname}Service.insert(param)
return self.success()
@get('{classname_underline}/(?P<id>[0-9]+)')
@tornado.web.authenticated
def {classname_underline}_get(self, id):
"""获取单个记录
"""
resp_data = {classname}Service.get(id)
return self.success(data=resp_data)
@put('{classname_underline}/(?P<id>[0-9]+)')
@tornado.web.authenticated
def {classname_underline}_put(self, id, *args, **kwargs):
param = self.params()
{classname}Service.update(id, param)
return self.success(data=param)
@delete('{classname_underline}/(?P<id>[0-9]+)')
@tornado.web.authenticated
def {classname_underline}_delete(self, id, *args, **kwargs):
param = {{
'status':-1
}}
{classname}Service.update(id, param)
return self.success()
class {classname}ListHandler(CommonHandler):
@get(['{classname_underline}','{classname_underline}/(?P<category>[a-zA-Z0-9_]*)'])
@tornado.web.authenticated
def {classname_underline}_list_get(self, category = '', *args, **kwargs):
"""列表、搜索记录
"""
page = int(self.get_argument('page', 1))
per_page = int(self.get_argument('limit', 10))
id = self.get_argument('id', None)
title = self.get_argument('title', None)
status = self.get_argument('status', None)
param = {{}}
if category:
param['category'] = category
if id:
param['id'] = id
if title:
param['title'] = title
if status:
param['status'] = status
resp_data = {classname}Service.page_list(param, page, per_page)
return self.success(data=resp_data)
'''
def render(self, classname, appname):
fname = func.hump2underline(classname)
app = f'{ROOT_PATH}/applications/{appname}'
app_s = f'{app}/handlers'
sfile = f'{app_s}/{fname}.py'
if os.path.exists(sfile):
return
if not os.path.exists(app):
os.mkdir(app)
if not os.path.exists(app_s):
os.mkdir(app_s)
fout = open(sfile, 'w', encoding='utf8')
output = self.template.format(
classname_underline=fname,
appname=appname,
classname=classname
)
# 写入文件内容
fout.write(output)
# 关闭文件
fout.close()
def get_metadata(tables):
# Use reflection to fill in the metadata
engines = DBConfigParser.parser_engines()
engine = create_engine(engines['default']['main'][0])
metadata = MetaData(engine)
schema = None
noviews = True
# tables = None
metadata.reflect(engine, schema, noviews, tables)
return metadata
def create_models(tables = None):
try:
outfile = f'{ROOT_PATH}/applications/common/models/{tables[0]}.py'
# if os.path.exists(outfile):
# return
metadata = get_metadata(tables)
noindexes = True
noconstraints = True
nojoined = True
noinflect = True
noclasses = False
nocomments = False
# Write the generated model code to the specified file or standard output
generator = ModelGenerator(metadata, noindexes, noconstraints, nojoined, noinflect, noclasses, nocomments=nocomments)
# print('generator ', type(generator), generator)
generator.render(outfile)
except Exception as e:
raise
def create_services(tables, appname = 'admin'):
try:
generator = ServiceGenerator()
for t in tables:
generator.render(func.underline2hump(t, True), appname)
except Exception as e:
raise
def create_handlers(tables, appname = 'admin'):
try:
generator = AdminHandlerGenerator() if appname == 'admin' else HandlerGenerator()
for t in tables:
generator.render(func.underline2hump(t, True), appname)
except Exception as e:
raise
if __name__ == "__main__":
appname = 'ishici'
# 创建命令行解析器句柄,并自定义描述信息
parser = argparse.ArgumentParser(description="py_admin code gen")
# 定义可选参数appname
parser.add_argument("--appname", "-a", help="appname must is a string")
# 定义可选参数module
parser.add_argument("--model", "-md", help="multiple models are separated by commas; overwrite the original file")
parser.add_argument("--service", "-sv", help="multiple services are separated by commas")
parser.add_argument("--handler", "-hd", help="multiple handlers are separated by commas")
args = parser.parse_args() # 返回一个命名空间
param = vars(args)
appname = param['appname']
param['service'] = param['service'] if param['service'] else ''
param['handler'] = param['handler'] if param['handler'] else ''
param['model'] = param['model'] if param['model'] else ''
if appname == 'admin':
metadata = get_metadata(None)
# print(dir(metadata))
for table in metadata.sorted_tables:
create_models([table.name])
create_services([table.name], appname)
create_handlers([table.name], appname)
else:
services = [m.strip() for m in param['service'].split(',') if m]
if appname and services:
create_services(services, appname)
print(f'create {appname} app, {services} service success')
handlers = [m.strip() for m in param['handler'].split(',') if m]
if appname and handlers:
create_handlers(handlers, appname)
print(f'create {appname} app, {handlers} handler success')
models = [m.strip() for m in param['model'].split(',') if m]
if models:
try:
[create_models([m]) for m in models]
except Exception as e:
raise
print(f'create {models} model success')
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。