# [hosting-page residue, not part of the program] "Code pull complete; the page will refresh automatically."
# -*- encoding:utf-8 -*-
import chardet, json, stat
import os, time, win32api, win32con, sys
import unittest, HTMLTestRunner
import datetime
from pywinauto import application
from extendScript.WriteDic import ExcelImpldic
from extendScript.ReadTextFile import ReadTestFile
from extendScript.DBAadaptor import *
import httplib
import time
reload(sys)
sys.setdefaultencoding("utf-8")
class DataKeeperGUI:
    """Automate the DataKeeper desktop client through pywinauto.

    Creating an instance loads the test configuration and, when the persisted
    start flag equals "1", launches the client and runs the whole scenario in
    startDataKeeper().  Every method taking ``dk`` expects the object returned
    by ``pywinauto.application.Application().start()``.
    """

    def __init__(self, listValue=None):
        """Load configuration and run the GUI scenario if it is still pending.

        :param listValue: test number handed to GetTestData.get_onedata_value().
        """
        self.gtd = GetTestData()
        self.listValue = self.gtd.get_onedata_value(listValue)
        # startDataKeeper() rewrites the flag to "5", so only the first
        # instance created after the flag was set to "1" drives the GUI.
        if ReadTestFile().getstart() == "1":
            self.startDataKeeper()

    def startDataKeeper(self):
        """Start DataKeeper and run every scenario step in order."""
        path = self.gtd.get_pz_data("DataKeeper")
        app = application.Application()
        dk = app.start(path)
        case = self.gtd
        HostIP = case.get_pz_data("HostIP")
        HostPort = case.get_pz_data("HostPort")
        QueryIP = case.get_pz_data("QueryIP")
        QueryPort = case.get_pz_data("QueryPort")
        Username = case.get_pz_data("user")
        Passwd = case.get_pz_data("password")
        self.login(dk, HostIP, HostPort, QueryIP, QueryPort, Username, Passwd)
        time.sleep(6)
        # Prevent later unittest fixtures from launching the program again.
        ReadTestFile().writestart("5")
        self.addService(dk, HostIP, HostPort)
        time.sleep(3)
        # Register the service ports, starting at 8080.
        port01 = 8080
        self.addServicePort(dk, port01)
        # Stop, then restart, the service ports.
        self.serviceStop(dk, HostIP, HostPort)
        self.serviceStart(dk, HostIP, HostPort)
        # Make sure the "vt:0" and "layer:6" nodes have a server configured.
        self.IsService(dk)
        # Upload the data sets, creating all hash segments.
        self.uploaddata(dk)
        # Export the hash configuration file.
        self.exportfile(dk)
        # Collect the per-layer record counts shown by the client.
        self.getdbnum(dk)
        # NOTE(review): presumably closes the client and confirms the prompt.
        dk['DataKeeper']["STATIC17"].Click()
        dk["FormMyMessageBox"]["STATIC2"].Click()

    def _probe_ports(self, ip, port, delay=0):
        """Query checkPortIsActive for ports 8080-8082 and summarise the replies.

        :param ip: host the HTTP connection is opened to.
        :param port: value concatenated into the request URL after ``ip``.
        :param delay: seconds to sleep before each request (0 = no pause).
        :returns: response body lengths, newest first, each followed by ",".
        """
        summary = ""
        for probe_port in range(8080, 8083):
            if delay:
                time.sleep(delay)
            # NOTE(review): there is no ':' between ip and port here and the
            # connection below ignores ``port``; left untouched because the
            # expected sizes checked by the test suite come from this request.
            url = "http://" + str(ip) + str(port) + "/cgi-bin/DEUDataPubCtrl?type=checkPortIsActive&port=" + str(probe_port)
            conn = httplib.HTTPConnection(ip)
            try:
                conn.request(method="GET", url=url)
                body = conn.getresponse().read()
            finally:
                # Always release the socket, even when the request fails.
                conn.close()
            summary = str(len(body)) + "," + summary
        return summary

    def serviceStop(self, dk, ip, port):
        """Stop all services and record the port-probe reply sizes."""
        dk['DataKeeper']['Static11'].Click()
        dk['FormStartOrStop'][u'全选'].Click()
        dk['FormStartOrStop'][u'停止服务'].Click()
        stopped = self._probe_ports(ip, port, delay=1)
        time.sleep(2)
        ReadTestFile().writestop(stopped)
        dk['FormStartOrStop'].Close()

    def uploaddata(self, dk):
        """Upload every enabled TileBuilder / PreProcessor data set.

        Cases flagged for single upload are posted immediately; cases flagged
        for batch upload are collected and posted together at the end.  The
        record counts read from the client are written to orinum.xlsx.
        """
        dk['DataKeeper']["Static2"].Click()
        batch_files = []
        single_counts = {}
        batch_counts = {}
        dataset_dir = os.getcwd() + "\\outputdata\\DataSet\\"
        for source in ("TileBuilder", "PreProcessor"):
            # get_alldata_value() returns None for an unknown data source.
            cases = self.gtd.get_alldata_value(type=source) or []
            if source == "TileBuilder":
                for tcase in cases:
                    if tcase[10] != u"是":
                        continue
                    tilename = tcase[0] + "_" + tcase[2] + "_" + str(tcase[3])
                    filename = tilename + ".idx"
                    filenum = str(tcase[7])
                    slnum = str(tcase[13])
                    filepath = dataset_dir + filename
                    if not os.path.exists(filepath):
                        continue
                    # Create the hash nodes for this data set.
                    self.addDataSet(dk, tilename, filenum, slnum)
                    if tcase[11] == u"单次":
                        # Open the local data set, post it, then read the count.
                        self.exportData(dk, filepath)
                        time.sleep(1)
                        self.postData(dk)
                        single_counts[tilename] = self.gettilenum(dk)
                        dk['DataKeeper']['STATIC18'].Click()
                    elif tcase[11] == u"批量":
                        self.exportData(dk, filepath)
                        batch_counts[tilename] = self.gettilenum(dk)
                        dk['DataKeeper']['STATIC18'].Click()
                        batch_files.append(filename)
            else:
                for tcase in cases:
                    if tcase[20] != u"是":
                        continue
                    tilename = tcase[0] + "_" + str(tcase[18])
                    filename = tilename + ".idx"
                    filenum = str(tcase[18])
                    slnum = str(tcase[23])
                    filepath = dataset_dir + filename
                    if not os.path.exists(filepath):
                        continue
                    # Create the hash nodes for this data set.
                    self.addDataSet(dk, tilename, filenum, slnum)
                    if tcase[21] == u"单次":
                        # Open the data set and record (displayed count - 1)
                        # before posting.
                        self.exportData(dk, filepath)
                        single_counts[tilename] = str(int(self.gettilenum(dk)) - 1)
                        time.sleep(1)
                        self.postData(dk)
                        dk['DataKeeper']['STATIC18'].Click()
                    elif tcase[21] == u"批量":
                        self.exportData(dk, filepath)
                        batch_counts[tilename] = str(int(self.gettilenum(dk)) - 1)
                        dk['DataKeeper']['STATIC18'].Click()
                        batch_files.append(filename)
        # Persist the record counts read from the data sets.
        orinumpath = os.getcwd() + "\\inputdata\\orinum.xlsx"
        ExcelImpldic(orinumpath).writeStatusSheet(single_counts, "single")
        time.sleep(1)
        ExcelImpldic(orinumpath).writeStatusSheet(batch_counts, "batch")
        if batch_files:
            self.batchExport(dk, batch_files)
            time.sleep(1)
            self.batchPost(dk)

    def getdbnum(self, dk):
        """Sum the record counts of every child node per data set and save them.

        The part of each top-level tree label before ":" is used as the key.
        """
        root = dk['DataKeeper']['SysTreeView32'].Root()
        totals = {}
        dataset_labels = [node.Text() for node in root.Children()]
        for label in dataset_labels:
            tilename = label.split(":")[0]
            dataset_node = root.GetChild(label)
            child_labels = [child.Text() for child in dataset_node.Children()]
            dbnum = 0
            for child_label in child_labels:
                child = dataset_node.GetChild(child_label)
                child.Click()
                child.ClickInput(button="left", double=True)
                dbnum += int(self.gettilenum(dk))
                dk['DataKeeper']['STATIC18'].Click()
            totals[tilename] = dbnum
        ExcelImpldic().writeStatusSheet(totals)

    def login(self, dk, HostIP, HostPort, QueryIP, QueryPort, Username, Passwd):
        """Fill in the login form and submit it."""
        dk['DataKeeper']['Static14'].Click()
        time.sleep(1)
        form = dk['FormLogin']
        form['Edit4'].SetEditText(HostIP)
        form['Edit6'].SetEditText(HostPort)
        form['Edit5'].SetEditText(QueryIP)
        form['Edit3'].SetEditText(QueryPort)
        form['Edit2'].SetEditText(Username)
        form['Edit1'].SetEditText(Passwd)
        form['Static4'].Click()

    def addService(self, dk, HostIP, HostPort):
        """Add a sub-server; tolerate the case where it already exists."""
        dk['DataKeeper']['Static13'].Click()
        dk['FormHostSet'][u'添加子服务器'].Click()
        dk['FormAddService']['Edit1'].SetEditText(HostIP)
        dk['FormAddService']['Edit2'].SetEditText(HostPort)
        dk['FormAddService']['Static6'].Click()
        message = dk['FormMyMessageBox']['Static1'].WindowText()
        dk['FormMyMessageBox']['Static2'].Click()
        if message != u"添加子服务器成功!":
            # Any other message (e.g. the sub-server already exists) leaves
            # the add dialog open, so dismiss it as well.
            dk['FormAddService']['Static1'].Click()
        dk['FormHostSet']['Static2'].Click()

    def addServicePort(self, dk, port):
        """Add three consecutive sub-service ports starting at ``port``."""
        dk['DataKeeper']['Static13'].Click()
        for offset in range(0, 3):
            dk['FormHostSet'][u'添加子服务端口'].Click()
            dk['FormAddService']['ComBoBox.'].Click()
            dk['FormAddService']['Edit2'].SetEditText(port + offset)
            dk['FormAddService'][u'确定'].Click()
            try:
                dk['FormMyMessageBox']['Static2'].Click()
                dk['FormAddService']['Static3'].Click()
            except Exception as e:
                # Best effort: these dialogs are not always present.
                print(e)
        dk['FormHostSet'][u'关闭'].Click()

    def serviceStart(self, dk, ip, port):
        """Start all services and record the port-probe reply sizes."""
        dk['DataKeeper']['Static11'].Click()
        dk['FormStartOrStop'][u'全选'].Click()
        dk['FormStartOrStop'][u'启动服务'].Click()
        started = self._probe_ports(ip, port)
        time.sleep(1)
        ReadTestFile().writerun(started)
        dk['FormStartOrStop'].Close()

    def exportData(self, dk, filepath):
        """Open the local data set ``filepath`` through the file dialog."""
        dk['DataKeeper']['Static10'].Click()
        dk[u'打开']['Edit1'].SetEditText(filepath)
        dk[u'打开'][u'打开(&O)'].Click()
        try:
            # A second click is attempted; failure here is only reported.
            dk[u'打开'][u'打开(&O)'].Click()
        except Exception as e:
            print(e)
        time.sleep(1)

    def gettilenum(self, dk):
        """Return the record count shown by the control whose title matches u"共".

        The first whitespace-separated token of each text is taken and its
        first and last characters are stripped; the value parsed from the last
        text is returned as a string.

        :raises ValueError: if the control exposes no text at all.
        """
        texts = dk['DataKeeper'].Window_(title_re=u"共").Texts()
        num = None
        for raw in texts:
            # Decode byte strings explicitly instead of relying on the
            # process-wide default encoding.
            text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
            token = text.split()[0]
            num = token[1:len(token) - 1]
        if num is None:
            raise ValueError("record counter control returned no text")
        return num

    def IsService(self, dk):
        """Ensure the "vt:0" and "layer:6" nodes have a server, then sync."""
        dk['DataKeeper']['Static12'].Click()
        root = dk['FormLoadBalancing']['SysTreeView32'].Root()
        # (top-level node, cursor offset passed to rightClick, pause afterwards)
        for node_name, offset, pause in (("vt:0", 15, 1), ("layer:6", 10, 0)):
            node = root.GetChild(node_name)
            segment = node.GetChild("0~100%")
            if len(segment.Children()) == 0:
                node.Click(double=True)
                segment.Click(double=True)
                segment.ClickInput(button="right")
                self.rightClick(offset, offset)
                dk['FormAddOrEditService'][u'确定'].Click()
                if pause:
                    time.sleep(pause)
        dk['FormLoadBalancing'][u'同步到服务器'].Click()
        try:
            dk['FormMyMessageBox']['Static2'].Click()
        except Exception as e:
            print(e)
        dk['FormLoadBalancing']['Static4'].Click()

    def _parse_segments(self, slnum):
        """Split "lo~hi:ip:port,..." into (upper bounds, "ip:port" list, ranges)."""
        upper_bounds = []
        servers = []
        ranges = []
        for item in slnum.split(","):
            parts = item.split(":")
            upper_bounds.append(parts[0].split("~")[1])
            servers.append(parts[1] + ":" + parts[2])
            ranges.append(parts[0])
        return upper_bounds, servers, ranges

    def addDataSet(self, dk, dataName=None, dataNumber=None, slnum=None):
        """Create data set ``dataName:dataNumber`` with its hash segments.

        :param slnum: comma-separated "lo~hi:ip:port" segment descriptions.

        Nothing is created when a node with the same label already exists.
        """
        dk['DataKeeper']['Static12'].Click()
        root = dk['FormLoadBalancing']['SysTreeView32'].Root()
        numlist, iplist, slqjlist = self._parse_segments(slnum)
        label = dataName + ":" + dataNumber
        existing = [node.Text() for node in root.Children()]
        if label in existing:
            dk['FormLoadBalancing']['Static4'].Click()
            return
        root.Click()
        root.ClickInput(button="right")
        time.sleep(1)
        self.rightClick(12, 12)
        dk['FormAddDataSet']['Edit1'].SetEditText(dataName)
        dk['FormAddDataSet']['Edit2'].SetEditText(dataNumber)
        dk['FormAddDataSet'][u'确定'].Click()
        dataset_node = root.GetChild(label)
        time.sleep(1)
        dataset_node.ClickInput(button="right")
        self.rightClick(10, 10)
        # Create one hash segment per configured server.
        for upper_bound in numlist:
            dk['FormAddSegment']["EDIT1"].SetEditText(upper_bound)
            time.sleep(1)
            dk['FormAddSegment'][u'确定'].Click()
            time.sleep(1)
        # Assign a server to every segment.
        for segment_range, server in zip(slqjlist, iplist):
            segment = dataset_node.GetChild(segment_range)
            segment.ClickInput(button="right")
            self.rightClick(10, 10)
            dk['FormAddOrEditService']['EDIT1'].SetEditText(server)
            dk['FormAddOrEditService'][u'确定'].Click()
            time.sleep(1)
        time.sleep(1)
        dk['FormLoadBalancing'][u'同步到服务器'].Click()
        try:
            dk['FormMyMessageBox']['Static2'].Click()
        except Exception as e:
            print(e)
        dk['FormLoadBalancing']['Static4'].Click()

    def postData(self, dk):
        """Submit the open data set and wait until the abort button is gone."""
        dk['DataKeeper'][u'提交'].Click()
        time.sleep(1)
        text = dk['DataKeeper'][u'终止'].WindowText()
        while text == u'终止':
            try:
                text = dk['DataKeeper'][u'终止'].WindowText()
                time.sleep(1)
            except Exception:
                # The control can no longer be resolved: stop polling.
                break

    def batchExport(self, dk, filelist):
        """Queue every existing file of ``filelist`` in the batch-upload form."""
        dataset_dir = os.getcwd() + "\\outputdata\\DataSet\\"
        idxfiles = [dataset_dir + name for name in filelist
                    if os.path.exists(dataset_dir + name)]
        dk['DataKeeper']['Static9'].Click()
        for idxfile in idxfiles:
            dk['FormUpMultiData'][u'文件'].Click()
            dk[u'请选择文件']['Edit1'].SetEditText(idxfile)
            dk[u'请选择文件'][u'打开(&O)'].Click()
            try:
                dk[u'请选择文件'][u'打开(&O)'].Click()
            except Exception as e:
                print(e)

    def batchPost(self, dk):
        """Submit the batch upload and poll until the completion box appears."""
        dk['FormUpMultiData'][u'提交'].Click()
        while True:
            time.sleep(1)
            try:
                mes = dk['FormMyMessageBox']['Static1'].WindowText()
                if mes == u"批量上传完成!":
                    dk['FormMyMessageBox'][u'确定'].Click()
                    dk['FormUpMultiData']['Static8'].Click()
                    break
            except Exception:
                # The message box is not there yet; keep polling.  Catching
                # Exception (not everything) keeps Ctrl-C able to stop the loop.
                continue

    def rightClick(self, a, b):
        """Move the cursor by (a, b) and left-click to pick a context-menu entry."""
        x, y = win32api.GetCursorPos()
        # NOTE(review): MOUSE_MOVED is used as the mouse_event flag here;
        # confirm it is meant to be the relative-move flag.
        win32api.mouse_event(win32con.MOUSE_MOVED, a, b, 0, 0)
        win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, x, y, 0, 0)
        win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0)

    def exportfile(self, dk):
        """Export the configuration to inputdata\\resdataset.txt, replacing it."""
        dk['DataKeeper']['Static12'].Click()
        time.sleep(3)
        dk['FormLoadBalancing'][u'导出配置文件'].Click()
        time.sleep(1)
        fl = os.getcwd() + "\\inputdata\\" + "resdataset.txt"
        if os.path.exists(fl):
            # Clear a read-only flag first so the stale export can be removed.
            os.chmod(fl, stat.S_IWRITE)
            os.remove(fl)
        dk[u"请选择导出文件路径"]["Edit1"].SetEditText(fl)
        dk[u"请选择导出文件路径"][u"保存"].Click()
        try:
            dk[u"请选择导出文件路径"][u"保存"].Click()
        except Exception as e:
            print(e)
        time.sleep(1)
        dk["FormMyMessageBox"][u"确定"].Click()
        time.sleep(1)
        dk['FormLoadBalancing']['Static4'].Click()
def toUnicode(s):
    """Decode the byte string ``s`` to unicode using chardet's detected encoding.

    Text that is already unicode is returned unchanged.  When chardet cannot
    name an encoding (it reports ``None``, e.g. for empty input) UTF-8 is used
    instead of passing ``None`` on to ``unicode()``.
    """
    if isinstance(s, unicode):
        return s
    encoding = chardet.detect(s).get("encoding") or "utf-8"
    return unicode(s, encoding)
class startTestGui(unittest.TestCase):
def setUp(self):
# self.verificationErrors = []
DataKeeperGUI()
self.result = ExcelImpldic(os.getcwd() + "\\inputdata\\" + "resnum.xlsx")
self.ori = ExcelImpldic(os.getcwd() + "\\inputdata\\" + "orinum.xlsx")
p = open("inputdata\\oridataset.txt", "r")
oristr = p.read()
self.orilist = oristr.split()
p.close()
# ori = json.loads(oridict)
r = open("inputdata\\resdataset.txt", "r")
resstr = r.read()
self.reslist = resstr.split()
r.close()
def test_startserver(self):
rs = ReadTestFile().getrun().split(",")
# print len(rs)
for i in range(0, len(rs) - 1):
self.assertEqual("18", rs[i])
def test_stopserver(self):
sl = ReadTestFile().getstop().split(",")
# print len(sl)
for i in range(0, len(sl) - 1):
self.assertEqual("31", sl[i])
def test_serverconfig(self):
oridict = self.orilist[1].strip()
resdict = self.reslist[1].strip()
self.assertItemsEqual(oridict, resdict)
def test_dataset(self):
oridict = self.orilist[0]
resdict = self.reslist[0]
self.assertItemsEqual(oridict, resdict)
def test_singleupload(self):
oritype = "single"
restype = "num"
res = self.result.gettestdic(restype)
ori = self.ori.gettestdic(oritype)
for key in ori:
self.assertEqual(ori[key], res[key])
def test_batchupload(self):
oritype = "batch"
restype = "num"
res = self.result.gettestdic(restype)
ori = self.ori.gettestdic(oritype)
for key in ori:
self.assertEqual(ori[key], res[key])
def tearDown(self):
pass
class initThisTester:
    """Housekeeping helpers that empty the report and output folders."""

    def __init__(self):
        pass

    def delReportAndOutdata(self):
        """Empty both the report folder and the generated data-set folder."""
        print(u"测试环境初始化,请稍后!")
        self.delReport()
        self.delOutdata()

    def delReport(self):
        """Delete the files in <cwd>\\ReportFile."""
        self.delAllfolder(os.getcwd() + "\\ReportFile")

    def delOutdata(self):
        """Delete the files in <cwd>\\outputdata\\DataSet."""
        self.delAllfolder(os.getcwd() + "\\outputdata\\DataSet")

    def delAllfolder(self, filepath):
        """Delete every file directly inside ``filepath``.

        A missing directory is treated as already clean, and sub-directories
        are left untouched because os.remove() cannot delete them.
        """
        if not os.path.isdir(filepath):
            return
        for entry in os.listdir(filepath):
            target = os.path.join(filepath, entry)
            if os.path.isdir(target):
                continue
            os.remove(target)
# def closePreProcessor(self):
# try:
# app = application.Application().connect(title_re="DataKeeper")
# wfMain = app.DataKeeper
#
# time.sleep(1)
#
# wfMain['关闭'].Click()
# app['WindowsForms10.Window.8.app.0.2bf8098_r11_ad1']['确定'].Click()
# except Exception as e:
# print e
# def jltext_datakeeper(self):
# ReadTestFile().writestart("1")
# suite = unittest.makeSuite(startTestGui)
# mm = datetime.datetime.now().strftime("%H-%M-%S")
# filename = os.getcwd() + "\\ReportFile\\" + "DataKeeper_" + mm + "_Report.html"
# fp = file(filename, 'wb')
# runner = HTMLTestRunner.HTMLTestRunner(stream=fp, title=u'自动化测试报告',
# description=u'自动化测试报告,测试用例')
# runner.run(suite)
class GetTestData:
    """Access to the program settings and the test-case tables.

    The settings come from the "FilePath" sheet of inputdata\\FilePath.xlsx.
    Its "dbsourcetype" entry selects where case data is read from:
    0 = Excel workbooks, 1 = MySQL.
    """

    def __init__(self):
        self.program = os.getcwd() + u'\\inputdata\\FilePath.xlsx'
        self.program_info = DB_Excel(self.program, u'FilePath').get_globe_dict()

    def get_pz_data(self, strOption=None):
        """Return one setting, or the whole settings dict when no key is given."""
        if strOption is None:
            return self.program_info
        return self.program_info[strOption]

    def _excel_source(self, tablename):
        """Return (workbook path, sheet name) configured for ``tablename``."""
        sheet = self.program_info["excel_" + tablename]
        return os.getcwd() + "\\inputdata\\" + sheet + ".xlsx", sheet

    def _mysql_source(self):
        """Return (host, username, password, database) from the settings."""
        info = self.program_info
        return (info["mysql_host"], info["mysql_username"],
                info["mysql_passwd"], info["mysql_dbname"])

    def get_alldata_value(self, type=None):
        """Return all case rows of table "TileBuilder" or "PreProcessor".

        Any other ``type`` prints a message and yields None; an unsupported
        "dbsourcetype" also yields None.
        """
        if type not in ("PreProcessor", "TileBuilder"):
            print("上传数据选取有误!!")
            return None
        source = self.program_info["dbsourcetype"]
        if source == 0:
            excelpath, sheet = self._excel_source(type)
            return self.get_excel_data(excelpath, sheet)
        if source == 1:
            host, username, passwd, dbname = self._mysql_source()
            return self.get_mysql_data(host, username, passwd, dbname, type)
        return None

    def get_onedata_value(self, strtestnum):
        """Return the PreProcessor case numbered ``strtestnum``.

        Yields None when "dbsourcetype" is neither 0 nor 1.
        """
        source = self.program_info["dbsourcetype"]
        if source == 0:
            excelpath, sheet = self._excel_source("PreProcessor")
            return self.get_excel_onedata(excelpath, sheet, strtestnum)
        if source == 1:
            host, username, passwd, dbname = self._mysql_source()
            return self.get_mysql_onedata(host, username, passwd, dbname,
                                          "PreProcessor", strtestnum)
        return None

    def get_excel_data(self, excelpath, tablename):
        """Read all case rows from an Excel sheet."""
        return DB_Excel(excelpath, tablename).get_OneTestCase_row_RList()

    def get_mysql_data(self, host, username, passwd, dbname, tablename):
        """Read all case rows from a MySQL table."""
        return DB_MySQL(host, username, passwd, dbname, tablename).get_OneTestCase_row_RList()

    def get_excel_program_info(self, excelpath):
        """Read the "FilePath" settings from an Excel workbook."""
        return DB_Excel(excelpath, 'FilePath').get_globe_dict()

    def get_mysql_program_info(self, host, username, passwd, dbname):
        """Read the "FilePath" settings from MySQL."""
        return DB_MySQL(host, username, passwd, dbname, 'FilePath').get_globe_dict()

    def get_mysql_onedata(self, host, username, passwd, dbname, tablename, strtestnum):
        """Read one case by test number from a MySQL table."""
        table = DB_MySQL(host, username, passwd, dbname, tablename)
        return table.get_OneTestCase_by_testNO(strtestnum)

    def get_excel_onedata(self, excelpath, tablename, strtestnum):
        """Read one case by test number from an Excel sheet."""
        return DB_Excel(excelpath, tablename).get_OneTestCase_by_testNO(strtestnum)
if __name__ == '__main__':
    # Script entry point: run the startTestGui suite and write an HTML report.
    # The cleanup helper is instantiated, but its cleanup calls are not invoked.
    inittter = initThisTester()
    # Flag "1" makes the first DataKeeperGUI() created by the suite launch the
    # client; startDataKeeper() rewrites it so later fixtures do not relaunch.
    ReadTestFile().writestart("1")
    suite = unittest.makeSuite(startTestGui)
    mm = datetime.datetime.now().strftime("%H-%M-%S")
    filename = os.getcwd() + "\\ReportFile\\" + "DataKeeper_" + mm + "_Report.html"
    # Close the report stream deterministically so the HTML is fully flushed.
    with open(filename, 'wb') as fp:
        runner = HTMLTestRunner.HTMLTestRunner(stream=fp, title=u'自动化测试报告',
                                               description=u'自动化测试报告,测试用例')
        runner.run(suite)
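# [hosting-page residue, not part of the program] "This content may be unsuitable for display, so the page does not show it. You can review and edit it with the relevant editing functions."
# [hosting-page residue, not part of the program] "If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, and nothing that violates applicable laws and regulations, you may click submit to appeal and we will handle it as soon as possible."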