28 Star 55 Fork 12

琪花瑶草工作室/MyETC

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
txffpOperate.py 25.88 KB
一键复制 编辑 原始数据 按行查看 历史
莫莫 提交于 2018-08-23 09:55 . 20180823
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Date : 2018-04-19 19:23:49 5月8日 modified
# @Author : kyle (you@example.org)
# @Link : http://example.org
# @Version : $Id$
import argparse
import datetime
import os
import re
import sys
import logging
import logging.handlers
import time
import random
import requests
import json
from lxml import etree
from com.myDatetimeUtil import myDatetimeUtil
# Directory that contains this script. cookie.txt, txffp.log and the "inv"
# output folder are all resolved relative to it.
# The original passed the string literal "__file__" to abspath(), which
# resolves against the current working directory instead of the script path.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)

# Default headers sent with every request to pss.txffp.com.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.117 Safari/537.36",
    "Host": "pss.txffp.com",
    "Origin": "https://pss.txffp.com",
}

# NOTE(review): not referenced anywhere in the visible code.
INVOICE_EMAIL = "7@qq.com"
LOG_LEVEL = logging.INFO

# The session cookie is read once at import time; a missing cookie.txt is a
# hard error because no request can succeed without it.
with open(os.path.join(BASE_DIR, "cookie.txt"), "r", encoding="utf-8") as f:
    COOKIE = f.read().strip()

# NOTE(review): not referenced anywhere in the visible code.
REQUEST_DICT = {
    "cookie_dict": {},
    "headers": {},
    "cookie_text": "",
}
class BaseException(Exception):
    """Root of the exceptions defined by this module.

    NOTE: this name shadows Python's built-in ``BaseException`` for every
    statement in this module that follows the definition.
    """


class TypeException(BaseException):
    """Raised when a value does not have the required type."""
class BaseHandler(object):
    """Shared HTTP plumbing: cookie bookkeeping, headers, logging, requests.

    The session cookie is kept both as raw header text and as a dict. After
    every successful response the cookies returned by the server are merged
    into the dict and the ``Cookie`` header is rebuilt from it.
    """

    # Seconds to wait for the server before giving up on a request, so a
    # stalled connection cannot hang the whole run.
    REQUEST_TIMEOUT = 30

    def __init__(self, cookie="", headers=None, req_sleep=False, logger=None, log_level=logging.INFO):
        """Set up cookie state, headers and logging.

        Args:
            cookie: Raw ``Cookie`` header text, e.g. ``"a=1; b=2"``.
            headers: Base headers merged into every request; copied, so the
                caller's dict is never modified.
            req_sleep: When true, sleep 1-3 seconds before each API request.
            logger: Logger to use; when None one is built by ``_logger``.
            log_level: Level handed to ``_logger`` when no logger is given.
        """
        self.__cookie_dict = {}
        self.__cookie_text = cookie
        self.headers = {}
        self.req_sleep = req_sleep
        self.logger = self._logger(log_level) if logger is None else logger
        self._headers = dict(headers) if headers else {}
        self.__init_headers()

    def __cookiedict_update(self):
        """Parse the raw cookie text into the cookie dict."""
        if not self.__cookie_text:
            return
        for piece in self.__cookie_text.split(";"):
            # Split on the FIRST "=" only: cookie values (base64 tokens and
            # the like) may themselves contain "=".
            name, sep, value = piece.strip().partition("=")
            if not sep or not name:
                continue
            self.__cookie_dict[name] = value

    def __cookie_update(self, new_cookie):
        """Merge ``new_cookie`` into the stored cookies and refresh headers.

        Raises:
            TypeException: if ``new_cookie`` is not a dict.
        """
        if not isinstance(new_cookie, dict):
            raise TypeException("new cookie必须为字典类型数据")
        self.__cookie_dict.update(new_cookie)
        self.__cookie_text = "; ".join(
            "%s=%s" % (name, value) for name, value in self.__cookie_dict.items()
        )
        self.__flush_headers()

    def __flush_headers(self):
        """Rebuild ``self.headers`` from the base headers and cookie text."""
        self.headers.update(self._headers)
        self.headers["Cookie"] = self.__cookie_text

    def __init_headers(self):
        """Initialise the cookie dict and the request headers."""
        self.__cookiedict_update()
        self.__flush_headers()
        self.logger.info("初始化请求头...")

    def _logger(self, level=logging.INFO):
        """Return the root logger with console and rotating-file output.

        The handlers are attached only once per process. The original added a
        fresh pair on every instantiation, which duplicated every log line.
        """
        logger = logging.getLogger()
        logger.setLevel(level)
        if getattr(logger, "_txffp_configured", False):
            return logger
        formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S")
        console = logging.StreamHandler()
        logfile = logging.handlers.RotatingFileHandler(
            os.path.join(BASE_DIR, "txffp.log"),
            maxBytes=1024 * 1024 * 1,
            backupCount=5,
            encoding="utf-8",
        )
        for handler in (console, logfile):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger._txffp_configured = True
        return logger

    def set_header(self, key, value):
        """Set or add a key/value pair in the request headers."""
        self.headers[key] = value

    def del_header(self, key):
        """Delete a key from the request headers (KeyError if absent)."""
        del self.headers[key]

    def api_handler(self, url, headers="", data="", method="post"):
        """Call an API endpoint and return the decoded response body.

        Args:
            url: Endpoint URL.
            headers: Headers for this request; empty means none.
            data: Form data, sent for POST requests only.
            method: ``"post"`` or ``"get"`` (case-insensitive).

        Returns:
            The body decoded as UTF-8, or None when the method is unsupported,
            the request fails, the status is not 200, or decoding fails.
            A 404 response terminates the program via ``sys.exit``.
        """
        if self.req_sleep:
            time.sleep(random.randint(1, 3))
        self.logger.info("请求api接口: %s" % url)

        verb = method.lower() if isinstance(method, str) else method
        if verb not in ("post", "get"):
            self.logger.error("错误的或不支持的请求方式[%s]" % method)
            return None

        try:
            if verb == "post":
                response = requests.post(
                    url, data, headers=headers or None, timeout=self.REQUEST_TIMEOUT)
            else:
                response = requests.get(
                    url, headers=headers or None, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            # Headers are deliberately left out of the message: they carry
            # the session cookie, which should not end up in the log file.
            self.logger.exception(
                "api接口查询失败(method: %s):\n\turl: %s\n\tdata: %s" % (method, url, data))
            return None

        if response.status_code == 404:
            self.logger.error("得到了一个404响应,可能是cookie没有及时更新导致或者cookie过期等")
            sys.exit("结束程序")
        if response.status_code != 200:
            self.logger.error("api接口信息获取失败(method:%s),状态码: [%s],"
                              "错误信息: [%s]" % (method, response.status_code, response.reason))
            return None

        try:
            html_text = response.content.decode("utf-8")
        except UnicodeDecodeError:
            self.logger.error("解码api响应内容失败")
            return None

        # Keep the cookies the server just handed back.
        self.__cookie_update(response.cookies.get_dict())
        self.logger.info("得到应答")
        return html_text
class APIHandler(BaseHandler):
    """Client for the invoice endpoints of pss.txffp.com.

    Each endpoint in ``APIS`` has a thin ``api_*`` method that fills in the
    form data and Referer header. Two workflows are built on top of them:
    applying for invoices (``submit_apply`` / ``submit_apply_all``) and
    downloading invoice files (``inv_download`` / ``inv_download_all``).
    """

    # Endpoint table: logical name -> URL and HTTP method.
    APIS = {
        "inv_manage": {
            "url": "https://pss.txffp.com/pss/app/login/invoice/consumeTrans/manage",
            "method": "post",
        },
        "inv_apply": {
            "url": "https://pss.txffp.com/pss/app/login/invoice/consumeTrans/apply",
            "method": "post",
        },
        "inv_subapply": {
            "url": "https://pss.txffp.com/pss/app/login/invoice/consumeTrans/submitApply",
            "method": "post",
        },
        "card_list": {
            "url": "https://pss.txffp.com/pss/app/login/cardList/manage",
            "method": "post",
        },
        "query_card": {
            "url": "https://pss.txffp.com/pss/app/login/invoice/query/card",
            "method": "post",
        },
        "query_apply": {
            "url": "https://pss.txffp.com/pss/app/login/invoice/query/queryApply",
            "method": "post",
        },
    }
    # Paged loops request pages 1 .. MAX_PAGE_NUM - 1.
    MAX_PAGE_NUM = 6
    SITE_ROOT = "https://pss.txffp.com/"
    # Seconds to wait for a file download before giving up.
    DOWNLOAD_TIMEOUT = 60
    _MANAGE_REFERER = "https://pss.txffp.com/pss/app/login/invoice/consumeTrans/manage/%s/COMPANY"

    def __init__(self, cookie="", headers=None, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseHandler.__init__``."""
        super(APIHandler, self).__init__(cookie, headers, *args, **kwargs)

    # ------------------------------------------------------------------
    # small shared helpers
    # ------------------------------------------------------------------
    def _page_numbers(self):
        """Return the page numbers a paged loop may request.

        Always includes page 1. Using a bounded range, instead of a
        ``while True`` with ``continue``, guarantees that failed pages cannot
        turn the loop into an endless stream of requests.
        """
        return range(1, max(self.MAX_PAGE_NUM, 2))

    @staticmethod
    def _plate_text(car_num):
        """Return the licence plate as a plain string.

        Accepts a string as well as the 1-tuple produced by
        ``re.match(...).groups()``, which earlier versions passed around.
        """
        if isinstance(car_num, (tuple, list)):
            return car_num[0] if car_num else ""
        return car_num or ""

    def _call_api(self, name, referer, data, send_file_cookie=True):
        """Set the per-endpoint headers and call endpoint ``name``.

        When ``send_file_cookie`` is true the cookie read from cookie.txt is
        also placed under the lowercase ``cookie`` key, exactly as the
        individual endpoint methods did before this helper existed.
        """
        self.set_header(key="Referer", value=referer)
        if send_file_cookie:
            self.set_header(key="cookie", value=COOKIE)
        api = self.APIS[name]
        return self.api_handler(
            api["url"], headers=self.headers, data=data, method=api["method"])

    # ------------------------------------------------------------------
    # file download
    # ------------------------------------------------------------------
    def file_write(self, data, filepath):
        """Write the bytes ``data`` to ``filepath``, replacing any old file."""
        with open(filepath, "wb") as f:
            f.write(data)

    def download_handler(self, url, save_path, filename):
        """Download ``url`` and store it as ``save_path/filename``.

        Failures (network error, non-200 status, empty body) are logged and
        the method returns without writing anything.
        """
        if self.req_sleep:
            time.sleep(random.randint(1, 3))
        self.logger.info("开始下载文件[%s]: %s" % (filename, url))
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self.DOWNLOAD_TIMEOUT)
        except requests.RequestException:
            self.logger.exception("文件下载过程中出现异常,url: %s" % url)
            return
        if response.status_code != 200:
            self.logger.error("文件下载失败,状态码: %s" % response.status_code)
            return
        if not response.content:
            self.logger.error("返回内容为空")
            return
        self.file_write(response.content, os.path.join(save_path, filename))

    # ------------------------------------------------------------------
    # endpoint wrappers
    # ------------------------------------------------------------------
    def api_inv_manage(self, id, month, page_num=1, tradeid_list="", title_id="", invoice_mail="", user_type=""):
        """Fetch one page of a card's toll records for ``month``."""
        data = {
            "id": id,
            "tradeIdList": tradeid_list,
            "titleId": title_id,
            "invoiceMail": invoice_mail,
            "userType": user_type,
            "month": month,
            "pageNo": page_num,
        }
        return self._call_api("inv_manage", self._MANAGE_REFERER % id, data)

    def api_inv_apply(self, id, month, tradeid_list, title_id="", invoice_mail="", user_type=""):
        """Post the selected trade ids to the ``apply`` endpoint.

        ``tradeid_list`` is an iterable of strings; it is sent comma-joined.
        """
        data = {
            "id": id,
            "tradeIdList": ",".join(tradeid_list),
            "titleId": title_id,
            "invoiceMail": invoice_mail,
            "userType": user_type,
            "month": month,
        }
        return self._call_api("inv_apply", self._MANAGE_REFERER % id, data)

    def api_inv_subapply(self, apply_id, id, user_type="COMPANY"):
        """Submit the invoice application identified by ``apply_id``."""
        data = {
            "applyId": apply_id,
            "id": id,
            "userType": user_type,
        }
        return self._call_api("inv_subapply", self._MANAGE_REFERER % id, data)

    def api_card_list(self, page_num=1, user_type="COMPANY", type="invoiceApply", change_view="card", query_str=""):
        """Fetch one page of the card list shown on the invoicing page."""
        data = {
            "userType": user_type,
            "type": type,
            "changeView": change_view,
            "queryStr": query_str,
            "pageNo": page_num,
        }
        # NOTE(review): the Referer says PERSONAL while user_type defaults to
        # COMPANY; kept exactly as it was.
        return self._call_api(
            "card_list",
            "https://pss.txffp.com/pss/app/login/cardList/manage/invoiceApply/PERSONAL",
            data,
        )

    def api_query_card(self, page_num, user_type="COMPANY", query_str="", change_view="card"):
        """Fetch one page of the card list shown on the invoice query page."""
        data = {
            "userType": user_type,
            "queryStr": query_str,
            "changeView": change_view,
            "pageNo": page_num,
        }
        # This endpoint never set the lowercase "cookie" header; preserved.
        return self._call_api(
            "query_card",
            "https://pss.txffp.com/pss/app/login/invoice/query/card/COMPANY",
            data,
            send_file_cookie=False,
        )

    def api_query_apply(self, card_id, month, page_size=6, user_type="COMPANY", title_name="", station_name=""):
        """Fetch the invoice applications of ``card_id`` for ``month``."""
        data = {
            "pageSize": page_size,
            "cardId": card_id,
            "userType": user_type,
            "month": month,
            "titleName": title_name,
            "stationName": station_name,
        }
        return self._call_api(
            "query_apply",
            "https://pss.txffp.com/pss/app/login/invoice/query/queryApply/%s/COMPANY" % card_id,
            data,
        )

    # ------------------------------------------------------------------
    # workflows
    # ------------------------------------------------------------------
    def submit_apply(self, id, month, invoice_mail="", car_num=""):
        """Collect matching trade ids for one card and apply for an invoice.

        A toll record is selected only when its exit time equals the
        ``outtime`` of an entry in the local ETC settlement file for this
        plate and month.

        Args:
            id: Card id used by the site.
            month: Target month, e.g. ``"201804"``.
            invoice_mail: E-mail address sent along with the requests.
            car_num: Licence plate, as a string or a 1-tuple holding it.
        """
        plate = self._plate_text(car_num)
        self.logger.info("开始对[%s %s]进行开票操作" % (plate, month))

        # The settlement list depends only on month and plate, so it is read
        # once instead of once per page.
        etc_trades = self.__get_etcTrades_bycarid(month, plate)
        if not etc_trades:
            self.logger.info("未找到[%s %s]的ETC结算记录,跳过开票" % (plate, month))
            return

        tradeids = []
        for page_num in self._page_numbers():
            html = self.api_inv_manage(id, month, page_num=page_num, invoice_mail=invoice_mail)
            if not html:
                continue
            xphtml = etree.HTML(html)
            # Called for its log/print output only; the result is not used.
            self.__get_tradeid(xphtml)
            tradeids.extend(self.__compare_outtime(etc_trades, xphtml))
            if not self.__has_next_page(xphtml):
                break

        if not tradeids:
            return
        print(tradeids)
        self._dump_tradeids(month, plate, tradeids)

        # Stage 1: obtain the applyId for the selected trades.
        apply_html = self.api_inv_apply(id, month, tradeids, invoice_mail=invoice_mail)
        if not apply_html:
            self.logger.error("获取apply id信息失败,response: %s" % apply_html)
            return
        apply_id, id, user_type = self.__get_applyid(apply_html)
        if not apply_id:
            # Without an applyId there is nothing valid to submit.
            self.logger.error("获取apply id信息失败,response: %s" % apply_html)
            return

        # Stage 2: final submission.
        submit_html = self.api_inv_subapply(apply_id, id, user_type)
        if submit_html is None:
            self.logger.error("[%s %s]开票提交失败" % (plate, month))
            return
        self.logger.info("%s %s 开票结果: %s" % (plate, month, submit_html.strip()))

    def _dump_tradeids(self, month, plate, tradeids):
        """Append the selected trade ids to ``inv/<month>/`` under BASE_DIR."""
        out_dir = os.path.join(BASE_DIR, "inv", "%s" % (month))
        os.makedirs(out_dir, exist_ok=True)
        target = os.path.join(out_dir, "%s_%s_开票交易号.txt" % (month, plate))
        with open(target, "a", encoding="utf-8") as f:
            print(tradeids, file=f)

    def submit_apply_all(self, month, invoice_mail="", *args, **kwargs):
        """Run ``submit_apply`` for every card in the card list.

        Extra positional and keyword arguments go to ``api_card_list``.
        """
        for page_num in self._page_numbers():
            html = self.api_card_list(page_num, *args, **kwargs)
            if not html:
                continue
            for card_id, plate in self.__get_cardid(html):
                # invoice_mail used to be dropped here.
                self.submit_apply(card_id, month, invoice_mail=invoice_mail, car_num=plate)
            if not self.__has_next_page(etree.HTML(html)):
                break

    def inv_download(self, cardid, month, car_num, save_path, page_size=6):
        """Download the invoice files of one card for ``month``."""
        seen_urls = set()
        for page_num in self._page_numbers():
            # NOTE(review): api_query_apply sends no page number, so every
            # iteration asks for the same page. Confirm the paging field of
            # this endpoint. Until then, already-downloaded URLs are skipped
            # and the loop stops once a page yields nothing new.
            html = self.api_query_apply(cardid, month, page_size)
            if not html:
                self.logger.warning("响应数据为空,不执行解析")
                continue
            new_files = 0
            for invinfo in self.__parse_query_apply(html):
                if invinfo["dwurl"] in seen_urls:
                    continue
                seen_urls.add(invinfo["dwurl"])
                new_files += 1
                filename = self.__create_filename(invinfo, car_num)
                self.download_handler(invinfo["dwurl"], save_path, filename)
            if not self.__has_next_page(etree.HTML(html)):
                self.logger.info("所有分页内容项目下载完毕,共%s页" % page_num)
                break
            if not new_files:
                break

    def inv_download_all(self, month, save_path, *args, **kwargs):
        """Run ``inv_download`` for every card on the invoice query page.

        Extra positional and keyword arguments go to ``api_query_card``.
        """
        for page_num in self._page_numbers():
            html = self.api_query_card(page_num, *args, **kwargs)
            if not html:
                continue
            for cardid, car_num in self.__get_query_cardid(html):
                self.inv_download(cardid, month, car_num, save_path)
            if not self.__has_next_page(etree.HTML(html)):
                break

    def set_max_page_num(self, max_page_num):
        """Override the page cap for this instance."""
        self.MAX_PAGE_NUM = max_page_num

    # ------------------------------------------------------------------
    # HTML parsing
    # ------------------------------------------------------------------
    def __create_filename(self, invinfo, car_num, extention="zip"):
        """Build the local file name for one invoice download.

        Raises:
            ValueError: if ``invinfo["datetime"]`` is not in
                ``%Y-%m-%d %H:%M:%S`` format.
        """
        stamp = datetime.datetime.strptime(
            invinfo["datetime"], "%Y-%m-%d %H:%M:%S")
        template = "%(car_num)s_%(datetime)s_金额%(amount)s_数量%(count)s.%(ext)s"
        return template % {
            "car_num": car_num,
            "datetime": stamp.strftime("%Y%m%d_%H%M%S"),
            "amount": invinfo["amount"],
            "count": invinfo["count"],
            "ext": extention,
        }

    def __parse_query_apply(self, html):
        """Return one dict per invoice table found in ``html``.

        Each dict has the keys ``datetime``, ``type``, ``count``, ``amount``
        and ``dwurl``. Tables that do not match the expected layout are
        logged and skipped instead of aborting the whole run.
        """
        inv_info = []
        for inv in etree.HTML(html).xpath("//table[@class='table_wdfp']"):
            try:
                amount_text = inv.xpath(
                    "./tr[1]/td/table/tr[1]/th[2]/span/text()")[0].replace(",", "")
                href = inv.xpath("./tr[1]/td/table/tr/th[4]/a[2]")[0].get("href")
                info = {
                    # [7:] drops a 7-character prefix - presumably a caption
                    # in front of the timestamp; confirm against the page.
                    "datetime": inv.xpath("./tr[1]/td/table/tr[1]/th[1]/text()")[0][7:],
                    "type": inv.xpath("./tr[1]/td/table/tr[1]/th[3]/text()")[0],
                    "count": inv.xpath("./tr[2]/td/table/tr/td[3]/span/text()")[0],
                    "amount": re.findall(r"-?\d+\.?\d*", amount_text)[0],
                    # Plain concatenation: os.path.join is for filesystem
                    # paths and is the wrong tool for building URLs.
                    "dwurl": self.SITE_ROOT + href.lstrip("/"),
                }
            except (IndexError, AttributeError):
                self.logger.warning("发票条目解析失败,已跳过")
                continue
            inv_info.append(info)
            self.logger.info("获得发票目标数据: %s" % str(info))
        return inv_info

    def __get_query_cardid(self, html):
        """Return ``(card_id, plate)`` tuples from the invoice query page."""
        cardid_list = []
        for card in etree.HTML(html).xpath("//dl[@class='etc_card_dl']/div/a"):
            href = card.get("href")
            plate_nodes = card.xpath("./dd[2]/text()")
            if not href or not plate_nodes:
                continue
            # The id sits between a 40-character prefix and an 8-character
            # suffix of the href - presumably
            # "/pss/app/login/invoice/query/queryApply/<id>/COMPANY"; confirm.
            card_id = href[40:-8]
            car_num = plate_nodes[0].strip()[-7:]
            cardid_list.append((card_id, car_num))
            self.logger.info("获得[%s]对应id: %s" % (car_num, card_id))
        return cardid_list

    def __get_cardid(self, html):
        """Return ``(card_id, plate)`` tuples from the invoicing card list.

        The plate is a plain string: everything after the first ``:`` in the
        card's second ``<dd>``.
        """
        cardid_list = []
        for card in etree.HTML(html).xpath("//dl[@class='etc_card_dl']/div/a"):
            id_match = re.match(r"[^(]*\('([\w]*)'\)", card.get("onclick") or "")
            label_nodes = card.xpath("dd[2]/text()")
            plate_match = re.match("[^:]*:(.*)", label_nodes[0].strip()) if label_nodes else None
            if not id_match or not plate_match:
                self.logger.warning("卡片信息解析失败,已跳过")
                continue
            card_id = id_match.group(1)
            car_num = plate_match.group(1)
            cardid_list.append((card_id, car_num))
            self.logger.info("获得车牌号[%s]的id: %s" % (car_num, card_id))
        return cardid_list

    def __get_applyid(self, html):
        """Return ``[apply_id, id, user_type]`` read from the check form.

        A field that is missing from the form yields ``""``.
        """
        xphtml = etree.HTML(html)
        values = []
        for field in ("applyId", "id", "userType"):
            nodes = xphtml.xpath(
                "//form[@id='checkForm']/input[@id='%s']" % field)
            values.append(nodes[0].get("value") if nodes else "")
        self.logger.info("获得applyId: [%s], id: [%s], user_type: [%s]" %
                         (values[0], values[1], values[2]))
        return values

    def __get_tradeid(self, xphtml):
        """Return the trade ids of all checkbox rows on a records page."""
        tradeid_list = []
        boxes = xphtml.xpath(
            '//tr/td[@class="tab_tr_td10"]/input[@class="check_one"]')
        for box in boxes:
            raw = box.get("value")
            if not raw:
                continue
            # The trade id is the part before the first underscore.
            trade_id = raw.partition("_")[0]
            if not trade_id:
                continue
            print("trade id:", trade_id)
            tradeid_list.append(trade_id)
        self.logger.info("获得[%s]条tradeid信息" % len(tradeid_list))
        return tradeid_list

    def __compare_outtime(self, etcList, xphtml):
        """Return trade ids whose exit time appears in ``etcList``.

        Each row contributes its id at most once, and rows without the
        expected cells are skipped.
        """
        wanted = {trade.get("outtime") for trade in etcList}
        tradeid_list = []
        for row in xphtml.xpath('//tr[@style="cursor: pointer;"]'):
            values = row.xpath('./td[@class="tab_tr_td10"]/input[@name="ids"]/@value')
            times = row.xpath('./td[4]/text()')
            if not values or not times:
                continue
            trade_id = values[0].partition("_")[0]
            if trade_id and times[0] in wanted:
                tradeid_list.append(trade_id)
        return tradeid_list

    def __get_etcTrades_bycarid(self, month, carid):
        """Return the ETC settlement details for a plate, or ``[]``.

        Reads ``<month>.json`` from the current working directory. When
        ``month`` is the month before last, last month's file is used.
        Unreadable or malformed files are logged and treated as "no data".
        """
        plate = self._plate_text(carid)
        try:
            # NOTE(review): the original author left an open question here:
            # ETC data lives in one file but has to be compared against two
            # months of site records.
            month_before_last = myDatetimeUtil.getMonthbeforeLast()
            last_month = myDatetimeUtil.getLastMonth()
            etcfile = (last_month if month == month_before_last else month) + '.json'
            with open(etcfile, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for card_info in data:
                if plate == card_info['carid'][-7:]:
                    return card_info['details']
        except (OSError, ValueError, KeyError, TypeError, IndexError, AttributeError) as exc:
            self.logger.warning("读取[%s %s]的ETC结算记录失败: %s" % (plate, month, exc))
        return []

    def __has_next_page(self, xphtml):
        """Return True when the page's has-more label reads ``"true"``."""
        has_more = xphtml.xpath('//label[@id="taiji_search_hasMore"]/text()')
        return bool(has_more) and has_more[0] == "true"
def run(argv=None):
    """Parse the command line and run the requested task.

    Args:
        argv: Argument list without the program name; None means
            ``sys.argv[1:]``.

    Exactly one of ``-d`` (download invoice files) or ``-i`` (apply for
    invoices) is required. Without ``-c`` the task covers all cards; with
    ``-c`` it covers that single card. Invalid input ends the program
    through ``parser.error`` (usage text on stderr, exit status 2).
    """
    description = "如果请求失败,请更新你的cookie信息。\r\n如果在网络请求中出现异常等程序中断,可等待网络恢复后重试。"
    parser = argparse.ArgumentParser(description=description)
    unique_opts = parser.add_mutually_exclusive_group(required=True)
    unique_opts.add_argument("-d", "--download", action="store_true", dest="download", help="下载发票文件,需要指定对象,以及月份和保存目录")
    unique_opts.add_argument("-i", "--invoice", action="store_true", dest="invoice", help="开票,需要指定开票月和对象")
    unique_opts_ = parser.add_mutually_exclusive_group(required=False)
    # default=False: with default=True the flag was always set and the
    # single-card (-c) branches could never run. "All" is now simply what
    # happens when -c is absent.
    unique_opts_.add_argument("-a", "--all", action="store_true", default=False, dest="all", help="执行全部")
    unique_opts_.add_argument("-c", "--cardid", action="store", dest="cardid", help="指定车辆编号,注意:cardid不是指车牌号(不推荐)")
    parser.add_argument("-e", "--email", action="store", dest="email", help="开票时的发票文件接收邮箱地址")
    parser.add_argument("-m", "--month", action="store", dest="month", help="目标年月份,例如2018年4月为:201804", required=True)
    parser.add_argument("-n", "--carnum", action="store", dest="carnum", help="车牌号,配合 -c 使用")
    parser.add_argument("-s", "--savedir", action="store", dest="savedir", help="发票文件保存路径")
    parser.add_argument("-w", "--wait", action="store_true", default=False, dest="wait", help="是否在每次进行网络请求间进行睡眠(默认睡眠1-3秒),可减轻对方服务器鸭梨")
    options = parser.parse_args(argv)

    # Validate everything before building the handler. Months run 01-12;
    # the old pattern also accepted "00".
    if not re.fullmatch(r"20[0-3]\d(0[1-9]|1[0-2])", options.month):
        parser.error("月份信息格式错误")
    if options.download:
        if not options.savedir:
            parser.error("你需要指定一个保存路径")
        if not os.path.isdir(options.savedir):
            parser.error("错误的目标路径")

    event_handler = APIHandler(COOKIE, HEADERS, req_sleep=options.wait)

    if options.download:
        if options.cardid:
            # The plate is only used to name the files; fall back to the
            # card id when no plate was given.
            event_handler.inv_download(
                options.cardid, options.month,
                options.carnum or options.cardid, options.savedir)
        else:
            event_handler.inv_download_all(options.month, options.savedir)
    else:
        email = options.email or ""
        if options.cardid:
            event_handler.submit_apply(
                options.cardid, options.month, email, car_num=options.carnum or "")
        else:
            event_handler.submit_apply_all(options.month, email)
    event_handler.logger.info("任务完成")
def main():
    """Script entry point: hand control to ``run``."""
    run()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/imoomoo/MyETC.git
git@gitee.com:imoomoo/MyETC.git
imoomoo
MyETC
MyETC
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385