13 Star 44 Fork 17

enmotech/compat-tools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test.py 8.22 KB
一键复制 编辑 原始数据 按行查看 历史
洪日华 提交于 2022-07-26 14:23 . 增加对 MogDB 3.0.0 版本的测试
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""
Copyright by HongyeDBA
"""
# Docker is required.
import os
import sys
import time
import platform
# ------------------------------------------------------------------------------
# Set Env and Variables
# ------------------------------------------------------------------------------
CURRENT_DIR = os.path.abspath(os.curdir)
GS_PASSWORD = "MogDB@123"
IMAGE_TYPE = "arm" if platform.machine() == "aarch64" else "amd"
DB_IMAGES = {
f"swr.cn-north-4.myhuaweicloud.com/mogdb/mogdb:3.0.0_{IMAGE_TYPE}": {
"DB_TYPE": "MogDB",
"DB_VERSION": "3.0.0"},
f"swr.cn-north-4.myhuaweicloud.com/mogdb/mogdb:2.1.0_{IMAGE_TYPE}": {
"DB_TYPE": "MogDB",
"DB_VERSION": "2.1.0"},
f"swr.cn-north-4.myhuaweicloud.com/mogdb/mogdb:2.0.0_{IMAGE_TYPE}": {
"DB_TYPE": "MogDB",
"DB_VERSION": "2.0.0"},
f"enmotech/opengauss:3.0.0": {
"DB_TYPE": "openGauss",
"DB_VERSION": "3.0.0"},
f"enmotech/opengauss:2.1.0": {
"DB_TYPE": "openGauss",
"DB_VERSION": "2.1.0"},
f"enmotech/opengauss:2.0.0": {
"DB_TYPE": "openGauss",
"DB_VERSION": "2.0.0"},
}
# Get platform architecture (aarch64, x86_64)
GAUSS_ENV = ['export GAUSSHOME=/usr/local/mogdb',
'export PATH=$GAUSSHOME/bin:$PATH',
'export LD_LIBRARY_PATH=$GAUSSHOME/lib',
]
# ------------------------------------------------------------------------------
# FUNCTION: Check script files
# ------------------------------------------------------------------------------
def check_compat_dir():
missing_files = list()
for file in ('Oracle_Functions.sql',
'Oracle_Functions.sql',
'Oracle_Views.sql',
'Oracle_Packages.sql',
'MySQL_Functions.sql',
'DB2_Functions.sql',
'runMe.sql'):
if not os.path.exists(os.path.join(CURRENT_DIR, file)):
missing_files.append(file)
if missing_files:
print("[ERROR] The script file is incompleted")
print('Missing files: {}'.format(','.join(missing_files)))
# ------------------------------------------------------------------------------
# FUNCTION: Execute OS Command
# ------------------------------------------------------------------------------
def exec_command(command: str) -> str:
print('Execute: [{}]'.format(command))
cmd_result = os.popen(command).read().strip()
print('--> Result: [{}]'.format(cmd_result))
return cmd_result
# ------------------------------------------------------------------------------
# FUNCTION: test compat-tools in MogDB/openGauss
# ------------------------------------------------------------------------------
def test_in_database(db_image: str):
print('Docker pull')
exec_command('docker pull {} > /dev/null'.format(db_image))
print('Docker run')
docker_cmd = "docker run -d -e GS_PASSWORD={} -v '{}':/tmp/compat_tools {}".format(
GS_PASSWORD, CURRENT_DIR, db_image
)
container_name = exec_command(docker_cmd)
print('Docker name [{}]'.format(container_name))
# Get env GAUSSHOME
# -------------------------------------------------------------------------
GAUSS_ENV[0] = exec_command(
"docker exec -u omm {} bash -c 'cat ~/.bashrc | grep GAUSSHOME='".format(container_name)
)
print('Read Env [{}]'.format(GAUSS_ENV[0]))
gsql_cmd = "docker exec -u omm -w '/tmp/compat_tools' {} bash -c '{} && gsql -d {{}} {{}}'".format(
container_name, '; '.join(GAUSS_ENV)
)
# Wait for DB started
# -------------------------------------------------------------------------
wait_start = time.time()
print("Waiting for DB to start...")
check_cnt = 0
while check_cnt <= 4:
db_start = exec_command(gsql_cmd.format('postgres', '-q -c "SELECT 1234567890" 2>/dev/null'))
if db_start.find('1234567890') >= 0:
check_cnt += 1
elif time.time() - wait_start > 90:
print("[ERROR] Timed out waiting for postgres to start!")
sys.exit(1)
else:
print('DB does not started in [{}] seconds'.format(time.time() - wait_start))
time.sleep(1)
print("DB is online in container [{}]".format(container_name))
# Check
# -------------------------------------------------------------------------
check_result = {'A': 'SUCCESS', 'B': 'SUCCESS', 'PG': 'SUCCESS'}
# check_result = {'A': 'SUCCESS', }
# cmd_db = gsql_cmd.format('postgres', """-c "\\copy (select name,case when boot_val is not null then boot_val else setting end setting,short_desc ,vartype,min_val,max_val from pg_catalog.pg_settings) to 'pg_setting_{}.log' with csv;" """.format((db_image.split(':'))[-1]))
# exec_command(cmd_db)
for db_compatible in check_result.keys():
db_name = "test_for_{C}".format(C=db_compatible).lower()
cmd_db = gsql_cmd.format('postgres', '-q -f db_creation.sql 2>&1')
cmd_runme = gsql_cmd.format(db_name, "-f runMe.sql 2>&1 | grep -i error")
cmd_result = gsql_cmd.format(db_name, '-q -c "select * from compat_tools.compat_testing where test_ok is null or (not test_ok)" 2>&1')
# Prepare create-db scripts
with open('db_creation.sql', 'w') as DBF:
DBF.write("CREATE DATABASE {} with DBCOMPATIBILITY='{}';\n".format(db_name, db_compatible))
# Create target database
print('Create database')
db_creation = exec_command(cmd_db)
if not db_creation.startswith('total time'):
check_result[db_compatible] = db_creation
continue
# Run scripts
print('Run scripts')
script_error = exec_command(cmd_runme)
if script_error:
check_result[db_compatible] = script_error
# Check result
print('Check results')
failed_list = exec_command(cmd_result)
if not failed_list.endswith('(0 rows)'):
check_result[db_compatible] = failed_list
# Destroy docker
# -------------------------------------------------------------------------
print('Stop and remove docker image [{}]'.format(container_name))
exec_command('docker kill {} 1>/dev/null'.format(container_name))
exec_command('docker rm -f {} 1>/dev/null'.format(container_name))
os.unlink('db_creation.sql')
# Return result
# -------------------------------------------------------------------------
return check_result
# ------------------------------------------------------------------------------
# Main
# ------------------------------------------------------------------------------
if __name__ == '__main__':
if os.path.exists('test_result.md'):
os.unlink('test_result.md')
print("Check current script directory")
check_compat_dir()
# Execute testing
db_result = dict()
failed_cnt = dict()
for image_name in DB_IMAGES:
print("Testing {} version {}".format(DB_IMAGES[image_name]['DB_TYPE'], DB_IMAGES[image_name]['DB_VERSION']))
db_result[image_name] = test_in_database(image_name)
failed_cnt[image_name] = sum([0 if x == 'SUCCESS' else 1 for x in db_result[image_name].values()])
# Make result
with open('test_result.md', 'w') as RESF:
all_failed_cnt = sum(failed_cnt.values())
if all_failed_cnt > 0:
RESF.write("""
## Test Summary
- DB Type & Version: {}
- DB Compatible: A, B, PG
- Test Result: FAILED in [{}] running
## Failed Result\n """.format(', '.join(['{}({})'.format(x['DB_TYPE'], x['DB_VERSION']) for x in DB_IMAGES.values()]),
all_failed_cnt))
for image_name in db_result:
if failed_cnt[image_name] > 0:
RESF.write('\n### DB Type & Version: {} ({})\n'.format(DB_IMAGES[image_name]['DB_TYPE'], DB_IMAGES[image_name]['DB_VERSION']))
for db_comp, db_result in db_result[image_name].items():
if db_result != 'SUCCESS':
RESF.write('\n#### DB Compatible: {}\n\n```\n{}\n```\n'.format(db_comp, db_result))
else:
RESF.write("""
## Test Summary
- DB Type & Version: {}
- DB Compatible: A, B, PG
- Test Result: SUCCEED\n""".format(', '.join(['{}({})'.format(x['DB_TYPE'], x['DB_VERSION']) for x in DB_IMAGES.values()]))
)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
SQL
1
https://gitee.com/enmotech/compat-tools.git
git@gitee.com:enmotech/compat-tools.git
enmotech
compat-tools
compat-tools
master

搜索帮助