1 Star 0 Fork 0

曹义/hik

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
hkServer.py 44.16 KB
一键复制 编辑 原始数据 按行查看 历史
曹义 提交于 2024-10-09 09:12 . 测试

# -*- coding: utf-8 -*-
# @Time : 2024/7/18 11:01
# @Author : sdk
import os, platform, ctypes, json, signal, sys, time, signal, sys
# import signal,sys
from datetime import datetime
from ctypes import *
from sdkService import *
import redis, pymysql # pip install redis, pip install pymysql
# 开发环境
# 海康服务配置
# publicIP = "192.168.1.8" # 公网IP
# listenIP = "192.168.1.8" # 本机IP - 监听IP
# winPath = "E:/python/hik/upload" # 要正确设置绝对路径,注意斜线方向
# luxPath = "/mnt/e/python/hik/upload" # 要正确设置绝对路径
# # 数据库配置
# DB_CONFIG = {
# "host": "127.0.0.1",
# "port": 3306,
# "user": "root",
# "password": '123aaa',
# "database": "dumc",
# "charset": "utf8mb4",
# "cursorclass": pymysql.cursors.DictCursor
# }
# 生产环境
# 海康服务配置
# publicIP = "171.217.254.52" #公网IP
# listenIP = "10.0.0.89" # 本机IP - 监听IP
# winPath = "/home/dumc/webman/public/upload" # 要正确设置绝对路径,注意斜线方向 - 生产服没有winPath
# luxPath = "/home/dumc/webman/public/upload" # 要正确设置绝对路径
# # 数据库配置
# DB_CONFIG = {
# "host":"10.0.0.201",
# "port":3306,
# "user": "webuser",
# "password": '123aaA@Bbb',
# "database": "dumc",
# "charset": "utf8mb4",
# "cursorclass": pymysql.cursors.DictCursor
# }
# 测试环境
# --- Test environment -------------------------------------------------------
# Hikvision service settings
publicIP = "159.75.164.28"  # public IP
listenIP = "10.1.20.7"  # local IP the services listen on
winPath = "/home/hik/upload"  # absolute upload directory used when running on Windows
luxPath = "/home/hik/upload"  # absolute upload directory used on other systems

# Database settings (expanded into pymysql.connect(**DB_CONFIG) at start-up)
DB_CONFIG = dict(
    host="127.0.0.1",
    port=3306,
    user="root",
    password='123aaA@Bbb',
    database="dumc",
    charset="utf8mb4",
    cursorclass=pymysql.cursors.DictCursor,
)

# --- Settings shared by every environment -----------------------------------
cmsPort = 7660  # central management (CMS) server port
amsPort = 7662  # alarm listening (AMS) server port
ssPort = 6011  # storage (SS) server port
logLevel = 0  # 1 - error log, 2 - debug log, 3 - info log
ssUserName = "tfcdznd"  # author's note: seems to have no effect
ssUserPws = "12345678"  # author's note: seems to have no effect

# Database connection (assigned in the __main__ section)
dbConn = None

# Redis connection and the name of the hash that caches picture paths
R = None
picHashKey = "pic"

# Set to True to make the main loop exit
proc_exit = False

dic_dev = {}  # login handle -> device name
dic_serial = {}  # login handle -> device serial number
dev_serial_map = {}  # device serial number -> device name
# Alarm SDK library handle; cmsClass.deviceRegisterCB uses it to set the device
# session key when it receives ENUM_EHOME50_DEV_SESSIONKEY (dwType=4).
amsDLL = None
# 修改全局
class cmsClass(object):
    '''Device registration (CMS) service built on the HCISUPCMS SDK library.

    Loads and initialises the SDK, starts/stops the registration listener and
    answers the registration callbacks issued by the SDK.
    '''

    def __init__(self):
        # Snapshot the module-level configuration on the instance.
        self.publicIP = publicIP
        self.listenIP = listenIP
        self.cmsPort = cmsPort
        self.amsPort = amsPort
        self.ssPort = ssPort
        self.logLevel = logLevel
        self.sysStr = platform.system()  # operating system name
        self.cmsDLL = self.loadDLL()  # loaded and initialised SDK library
        self.cmsHandle = -1  # listen handle, -1 while not listening
        # Keep the ctypes callback object referenced by the instance so it
        # stays alive for as long as the SDK may call it.
        self.fnDeviceRegisterCB = DEVICE_REGISTER_CB(self.deviceRegisterCB)

    @staticmethod
    def _now():
        '''Return the current local time formatted for log lines.'''
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def loadDLL(self):
        '''
        Load the CMS SDK library for the current operating system and
        initialise it.

        Libraries are looked up in ``<cwd>/lib``; SDK logs go to
        ``<cwd>/EHomeSDKLog``.

        :return: the loaded ``ctypes.CDLL`` object
        '''
        isWindows = self.sysStr == "Windows"
        # The path is encoded AND decoded with the same codec. The original
        # encoded with GBK on Windows but decoded as UTF-8, which fails for
        # any working directory containing non-ASCII characters.
        codec = "gbk" if isWindows else "utf-8"
        basePath = os.getcwd().encode(codec)
        if isWindows:
            sep = b'\\'
            sdkName, cryptoName, sslName = b'HCISUPCMS.dll', b'libeay32.dll', b'ssleay32.dll'
        else:
            sep = b'/'
            sdkName, cryptoName, sslName = b'libHCISUPCMS.so', b'libcrypto.so', b'libssl.so'
        libDir = basePath + sep + b'lib' + sep

        cmsDLL = ctypes.CDLL((libDir + sdkName).decode(codec))
        libeay32_p = ctypes.cast(ctypes.c_char_p(libDir + cryptoName), POINTER(c_void_p))
        ssleay32_p = ctypes.cast(ctypes.c_char_p(libDir + sslName), POINTER(c_void_p))
        strPathCom_p = ctypes.cast(ctypes.c_char_p(libDir + b'HCAapSDKCom'), POINTER(c_void_p))
        # Hold on to the path buffers; NOTE(review): it is not visible here
        # whether the SDK copies the strings or keeps the pointers.
        self._sdkPathRefs = (libeay32_p, ssleay32_p, strPathCom_p)

        # Call order is kept from the original: crypto/ssl paths, Init, then
        # the component path and the log configuration.
        cmsDLL.NET_ECMS_SetSDKInitCfg(0, libeay32_p)
        cmsDLL.NET_ECMS_SetSDKInitCfg(1, ssleay32_p)
        cmsDLL.NET_ECMS_Init()
        cmsDLL.NET_ECMS_SetSDKInitCfg(5, strPathCom_p)
        cmsDLL.NET_ECMS_SetLogToFile(self.logLevel, basePath + sep + b'EHomeSDKLog' + sep, False)
        return cmsDLL

    def startCmsListen(self):
        '''Start the registration listener on ``listenIP:cmsPort`` and log the outcome.'''
        struCMSListenPara = NET_EHOME_CMS_LISTEN_PARAM()  # listen parameters
        ip = ctypes.create_string_buffer(self.listenIP.encode("utf-8"))
        ctypes.memmove(struCMSListenPara.struAddress.szIP, ip, ctypes.sizeof(ip))
        struCMSListenPara.struAddress.wPort = self.cmsPort
        struCMSListenPara.fnCB = self.fnDeviceRegisterCB
        self.cmsHandle = self.cmsDLL.NET_ECMS_StartListen(ctypes.byref(struCMSListenPara))
        if self.cmsHandle < 0:
            print(self._now(), " ", "注册服务启动失败,error code: : ",
                  self.cmsDLL.NET_ECMS_GetLastError())
        else:
            print(self._now(), " ", "注册服务启动成功: ",
                  str(struCMSListenPara.struAddress.szIP, encoding="utf-8").rstrip('\x00') + ':' +
                  str(struCMSListenPara.struAddress.wPort))

    def stopCmsListen(self):
        '''Stop the listener (if it was started) and release the SDK.'''
        if self.cmsHandle > -1:
            # ctypes returns an int here, so test truthiness; ``is False``
            # could never match and the failure was never reported.
            if not self.cmsDLL.NET_ECMS_StopListen(self.cmsHandle):
                print(self._now(), " ", "注册服务停止失败,error code: : ",
                      self.cmsDLL.NET_ECMS_GetLastError())
        self.cmsDLL.NET_ECMS_Fini()

    def _forgetDevice(self, lUserID):
        '''Force-logout a login handle and drop it from the handle maps.'''
        if not self.cmsDLL.NET_ECMS_ForceLogout(lUserID):
            # Only logged: the maps are cleaned regardless so a handle that is
            # already gone on the SDK side does not linger here.
            print("强制注销失败,code:", self.cmsDLL.NET_ECMS_GetLastError())
        # pop() instead of del: the handle is only present after an
        # "online" (dwDataType=0) callback.
        dic_dev.pop(lUserID, None)
        dic_serial.pop(lUserID, None)

    def deviceRegisterCB(self, lUserID, dwDataType, pOutBuffer, dwOutLen, pInBuffer, dwInLen, pUser):
        '''
        Registration callback invoked by the SDK.

        Per the original author, a device coming online triggers dwDataType
        3 -> 4 -> 5 -> 0. Each branch reads the device info from
        ``pOutBuffer`` and, where a reply is needed, writes it to ``pInBuffer``.

        :return: always True (exceptions are logged, not propagated)
        '''
        try:
            # Device online, dwDataType 0
            if dwDataType == EHOME_REGISTER_TYPE.ENUM_DEV_ON.value:
                ams_address = ctypes.create_string_buffer(self.publicIP.encode())
                ss_address = ctypes.create_string_buffer(self.publicIP.encode())
                byClouldAccessKey = ctypes.create_string_buffer(ssUserName.encode())
                byClouldSecretKey = ctypes.create_string_buffer(ssUserPws.encode())
                pDevInfo = NET_EHOME_DEV_REG_INFO_V12()
                ctypes.memmove(pointer(pDevInfo), pOutBuffer, sizeof(pDevInfo))
                strEhomeServerInfo = NET_EHOME_SERVER_INFO_V50()
                # Alarm server address, port and type
                strEhomeServerInfo.dwAlarmServerType = 2  # MQTT
                memmove(strEhomeServerInfo.struTCPAlarmSever.szIP, ams_address, sizeof(ams_address))
                memmove(strEhomeServerInfo.struUDPAlarmSever.szIP, ams_address, sizeof(ams_address))
                strEhomeServerInfo.struTCPAlarmSever.wPort = self.amsPort
                strEhomeServerInfo.struUDPAlarmSever.wPort = self.amsPort
                memmove(strEhomeServerInfo.byClouldAccessKey, byClouldAccessKey, sizeof(byClouldAccessKey))
                memmove(strEhomeServerInfo.byClouldSecretKey, byClouldSecretKey, sizeof(byClouldSecretKey))
                # Picture storage server address, port and type
                memmove(strEhomeServerInfo.struPictureSever.szIP, ss_address, sizeof(ss_address))
                strEhomeServerInfo.struPictureSever.wPort = self.ssPort
                # 0-Tomcat, 1-VRB, 2-cloud storage, 3-KMS, 4-ISUP5.0
                strEhomeServerInfo.dwPicServerType = 2
                memmove(pInBuffer, pointer(strEhomeServerInfo), sizeof(strEhomeServerInfo))

                regInfo = pDevInfo.struRegInfo
                devName = str(ctypes.string_at(regInfo.byDeviceID), "utf-8")  # device name (id)
                devAddr = str(ctypes.string_at(regInfo.struDevAdd.szIP), "utf-8") + ":" + str(
                    regInfo.struDevAdd.wPort)
                devFw = str(ctypes.string_at(regInfo.byFirmwareVersion), "utf-8")  # firmware version
                devSerial = str(ctypes.string_at(regInfo.sDeviceSerial), "utf-8")  # serial number
                # Remember the device under its login handle and its serial.
                dic_dev[lUserID] = devName
                dic_serial[lUserID] = devSerial
                dev_serial_map[devSerial] = devName
                self.updateDeviceStatus(1, devName, devAddr, devFw, str(lUserID))
                print(self._now(), " ", "设备上线(dwDataType=0),设备名称: ", devName,
                      ",设备地址:" + devAddr + ",固件版本:" + devFw + ",登录句柄:", str(lUserID))
                return True
            # Device offline, dwDataType 1
            elif dwDataType == EHOME_REGISTER_TYPE.ENUM_DEV_OFF.value:
                print(self._now(), " ", "设备下线(dwDataType=1),登录句柄: ", str(lUserID))
                self._forgetDevice(lUserID)
                return True
            # EHome 5.0 device authentication, dwDataType 3
            elif dwDataType == EHOME_REGISTER_TYPE.ENUM_EHOME50_DEV_AUTH.value:
                pDevInfo = NET_EHOME_DEV_REG_INFO_V12()
                ctypes.memmove(ctypes.pointer(pDevInfo), pOutBuffer, ctypes.sizeof(pDevInfo))
                strDeviceID = str(pDevInfo.struRegInfo.byDeviceID, encoding="utf-8").rstrip('\x00')
                print(self._now(), " ", "设备注册(dwDataType=3),设备名称: ", strDeviceID)
                # Reply with the EHome key; it must match the login key configured
                # on the device (platform -> remote host -> ISUP -> login key).
                ISUPKey = "hik12345"
                keyBuf = ctypes.create_string_buffer(ISUPKey.encode())
                ctypes.memmove(pInBuffer, keyBuf, ctypes.sizeof(keyBuf))
                return True
            # EHome 5.0 session key, dwDataType 4
            elif dwDataType == EHOME_REGISTER_TYPE.ENUM_EHOME50_DEV_SESSIONKEY.value:
                pDevInfo = ctypes.cast(pOutBuffer, LPNET_EHOME_DEV_REG_INFO_V12).contents
                struSessionKey = NET_EHOME_DEV_SESSIONKEY()
                ctypes.memmove(struSessionKey.sDeviceID, pDevInfo.struRegInfo.byDeviceID,
                               ctypes.sizeof(pDevInfo.struRegInfo.byDeviceID))
                ctypes.memmove(struSessionKey.sSessionKey, pDevInfo.struRegInfo.bySessionKey,
                               ctypes.sizeof(pDevInfo.struRegInfo.bySessionKey))
                devName = str(ctypes.string_at(pDevInfo.struRegInfo.byDeviceID), "utf-8")
                sesskey = str(ctypes.string_at(pDevInfo.struRegInfo.bySessionKey), "utf-8")
                print(self._now(), " ", "设备登录(dwDataType=4),设备名称: ", devName,
                      ",sessionKey:", sesskey)
                if not self.cmsDLL.NET_ECMS_SetDeviceSessionKey(ctypes.byref(struSessionKey)):
                    print(self._now(), " ", '设置设备sessionKey失败,err_code:',
                          self.cmsDLL.NET_ECMS_GetLastError())
                # The alarm SDK lives in the module-level ``amsDLL``; this class
                # has no ``self.amsDLL`` (the original raised AttributeError here).
                if not amsDLL:
                    print(self._now(), " ", '设置设备sessionKey失败,报警服务SDK未初始化')
                elif not amsDLL.NET_EALARM_SetDeviceSessionKey(ctypes.byref(struSessionKey)):
                    print(self._now(), " ", '设置设备sessionKey失败,err_code:',
                          amsDLL.NET_EALARM_GetLastError())
                return True
            # EHome 5.0 redirect (DAS) request, dwDataType 5
            elif dwDataType == EHOME_REGISTER_TYPE.ENUM_EHOME50_DEV_DAS_REQ.value:
                pDevInfo = ctypes.cast(pOutBuffer, LPNET_EHOME_DEV_REG_INFO_V12).contents
                print(self._now(), " ", "设备重定向(dwDataType=5),设备名称: ",
                      str(ctypes.string_at(pDevInfo.struRegInfo.byDeviceID), "utf-8"))
                # NOTE(review): "Domain" is a fixed development address — confirm
                # whether it should follow publicIP.
                dasInfo = {"Type": "DAS",
                           "DasInfo": {"Address": self.publicIP,
                                       "Domain": "http://192.168.1.8",
                                       "ServerID": "das_" + self.publicIP + "_" + str(self.cmsPort),
                                       "Port": self.cmsPort,
                                       "UdpPort": self.cmsPort
                                       }}
                replyBuf = ctypes.create_string_buffer(json.dumps(dasInfo).encode())
                ctypes.memmove(pInBuffer, replyBuf, ctypes.sizeof(replyBuf))
                return True
            # EHome 5.0 key verification failed, dwDataType 9
            elif dwDataType == EHOME_REGISTER_TYPE.ENUM_DEV_DAS_EHOMEKEY_ERROR.value:
                pDevInfo = ctypes.cast(pOutBuffer, LPNET_EHOME_DEV_REG_INFO_V12).contents
                print(self._now(), " ", "强制注销设备(dwDataType=9),设备名称: ",
                      str(ctypes.string_at(pDevInfo.struRegInfo.byDeviceID), "utf-8"), ",userid:", str(lUserID))
                self._forgetDevice(lUserID)
                return True
            else:
                pDevInfo = ctypes.cast(pOutBuffer, LPNET_EHOME_DEV_REG_INFO_V12).contents
                print(self._now(), " ", "未知设备注册回调类型(dwDataType=",
                      str(dwDataType), "),设备名称: ",
                      str(ctypes.string_at(pDevInfo.struRegInfo.byDeviceID), "utf-8"))
        except Exception as e:
            # Never let an exception escape into the SDK's calling thread.
            print('error', e)
        return True

    def updateDeviceStatus(self, online, devName, devAddr, devFw, userid):
        '''Placeholder for persisting the device status; currently only returns True.'''
        return True
class amsClass(object):
    '''Alarm listening (AMS) service built on the HCISUPAlarm SDK library.

    Receives alarm callbacks, and for type-13 alarms stores the event in the
    ``hk_report`` table after renaming the picture saved by the storage service.
    '''

    def __init__(self):
        global amsDLL
        self.listenIP = listenIP
        self.amsPort = amsPort
        self.logLevel = logLevel
        self.sysStr = platform.system()  # operating system name
        # report() renames pictures below this directory. The original read
        # self.picPath without ever setting it on this class.
        self.picPath = winPath if self.sysStr == "Windows" else luxPath
        self.amsDLL = self.loadDLL()  # loaded SDK library, or False if init failed
        self.iUserID = set([])  # login handles
        self.amsHandle = -1  # listen handle, -1 while not listening
        # Keep the ctypes callback object referenced by the instance.
        self.fnMsgCb = EHomeMsgCallBack(self.deviceAlarmCB)
        # Publish the library so cmsClass can set device session keys on it.
        amsDLL = self.amsDLL

    @staticmethod
    def _now():
        '''Return the current local time formatted for log lines.'''
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _toEpoch(text):
        '''Convert an ISO-8601 timestamp such as ``2024-09-19T11:02:19+08:00`` to epoch seconds.

        Raises ValueError when the text cannot be parsed.
        '''
        try:
            # Honours whatever UTC offset the device sent.
            return int(datetime.fromisoformat(text).timestamp())
        except (AttributeError, ValueError):
            # Original parsing, kept as a fallback (fixed +08:00 suffix,
            # interpreted in the server's local time zone).
            return int(time.mktime(time.strptime(text, "%Y-%m-%dT%H:%M:%S+08:00")))

    def loadDLL(self):
        '''
        Load the alarm SDK library for the current operating system and
        initialise it.

        :return: the loaded ``ctypes.CDLL`` object, or False when
                 ``NET_EALARM_Init`` reports failure
        '''
        isWindows = self.sysStr == "Windows"
        # Same codec for encode and decode (the original mixed GBK and UTF-8
        # on Windows, which breaks on non-ASCII working directories).
        codec = "gbk" if isWindows else "utf-8"
        basePath = os.getcwd().encode(codec)
        if isWindows:
            sep = b'\\'
            sdkName, cryptoName, sslName = b'HCISUPAlarm.dll', b'libeay32.dll', b'ssleay32.dll'
        else:
            sep = b'/'
            sdkName, cryptoName, sslName = b'libHCISUPAlarm.so', b'libcrypto.so', b'libssl.so'
        libDir = basePath + sep + b'lib' + sep

        amsDLL = ctypes.CDLL((libDir + sdkName).decode(codec))
        libeay32_p = ctypes.cast(ctypes.c_char_p(libDir + cryptoName), POINTER(c_void_p))
        amsDLL.NET_EALARM_SetSDKInitCfg(0, libeay32_p)
        ssleay32_p = ctypes.cast(ctypes.c_char_p(libDir + sslName), POINTER(c_void_p))
        amsDLL.NET_EALARM_SetSDKInitCfg(1, ssleay32_p)
        if not amsDLL.NET_EALARM_Init():
            print("NET_EALARM_Init failed!")
            return False
        strPathCom_p = ctypes.cast(ctypes.c_char_p(libDir + b'HCAapSDKCom'), POINTER(c_void_p))
        amsDLL.NET_EALARM_SetSDKLocalCfg(5, strPathCom_p)
        amsDLL.NET_EALARM_SetLogToFile(self.logLevel, basePath + sep + b'EHomeSDKLog' + sep, False)
        return amsDLL

    def startAmsListen(self):
        '''Start the alarm listener on ``listenIP:amsPort``.

        :return: True when the listener started, False otherwise
        '''
        struAMSListenPara = NET_EHOME_ALARM_LISTEN_PARAM()
        addr = ctypes.create_string_buffer(self.listenIP.encode("utf-8"))
        ctypes.memmove(struAMSListenPara.struAddress.szIP, addr, ctypes.sizeof(addr))
        struAMSListenPara.struAddress.wPort = self.amsPort
        struAMSListenPara.fnMsgCb = self.fnMsgCb
        # Protocol: 0-TCP, 1-UDP, 2-MQTT. Per the original author, ISUP5.0
        # requires MQTT here and cloud storage (port 6011) on the SS side.
        struAMSListenPara.byProtocolType = 2
        struAMSListenPara.byUseCmsPort = 0  # 0 - do not reuse the CMS port
        self.amsHandle = self.amsDLL.NET_EALARM_StartListen(ctypes.byref(struAMSListenPara))
        if self.amsHandle < 0:
            print(self._now(), " ", "报警服务启动失败,error code: : ",
                  self.amsDLL.NET_EALARM_GetLastError())
            return False
        print(self._now(), " ", "报警服务启动成功: ",
              str(struAMSListenPara.struAddress.szIP, encoding="utf-8").rstrip('\x00') + ':' +
              str(struAMSListenPara.struAddress.wPort))
        return True

    def deviceAlarmCB(self, iHandle, pAlarmMsg, pUser):
        '''
        Alarm callback invoked by the SDK.

        Every alarm is appended to ``<cwd>/alarm.log``. Type 13 alarms carrying
        ISAPI data are passed to :meth:`report`; type 4 (GPS) alarms are
        appended to ``<cwd>/gps.log``.

        :return: always True
        '''
        Alarm_struct = ctypes.cast(pAlarmMsg, LPNET_EHOME_ALARM_MSG).contents
        dwType = Alarm_struct.dwAlarmType
        log = str(self._now()) + " 呼救告警:dwAlarmType = " + str(dwType)
        # Explicit encoding: the log line contains non-ASCII text.
        with open(os.getcwd() + "/alarm.log", 'a', encoding="utf-8") as f:
            f.write(log + "\n")
        print(log)

        if Alarm_struct.dwXmlBufLen != 0:
            xmlBytes = ctypes.string_at(Alarm_struct.pXmlBuf, Alarm_struct.dwXmlBufLen)
            print('# 告警回调输出参数:', str(xmlBytes, 'UTF-8').rstrip('\x00'))

        if dwType == 13:  # distress alarm (per the original comment) carrying ISAPI data
            if Alarm_struct.dwAlarmInfoLen > 0:
                strISAPIData = ctypes.cast(Alarm_struct.pAlarmInfo, LPNET_EHOME_ALARM_ISAPI_INFO).contents
                if strISAPIData.dwAlarmDataLen != 0:  # JSON or XML payload
                    payload = ctypes.string_at(strISAPIData.pAlarmData, strISAPIData.dwAlarmDataLen)
                    strOutBuffer = str(payload, 'UTF-8').rstrip('\x00')
                    devSerial = str(ctypes.string_at(Alarm_struct.sDeviceSerial), "utf-8")
                    print("报警回调设备序列号:", devSerial)
                    # A serial not seen by the registration service falls back
                    # to the serial itself instead of raising KeyError.
                    devName = dev_serial_map.get(devSerial, devSerial)
                    try:
                        self.report(devName, devSerial, strOutBuffer)
                    except Exception as e:
                        # Keep exceptions out of the SDK's calling thread.
                        print('error', e)
        elif dwType == 4:  # GPS alarm
            struGpsInfo = ctypes.cast(Alarm_struct.pAlarmInfo, POINTER(NET_EHOME_GPS_INFO)).contents
            log = str(self._now()) + " GPS上报,lng=" + str(
                struGpsInfo.dwLongitude) + ",lat= " + str(struGpsInfo.dwLatitude)
            print(log)
            with open(os.getcwd() + "/gps.log", 'a', encoding="utf-8") as f:
                f.write(log + "\n")
        return True

    def stopAmsListen(self):
        '''Stop the listener (if it was started) and release the SDK.'''
        if self.amsHandle > -1:
            # ctypes returns an int; ``is False`` could never match.
            if not self.amsDLL.NET_EALARM_StopListen(self.amsHandle):
                print(self._now(), " ", "报警服务停止失败,error code: : ",
                      self.amsDLL.NET_EALARM_GetLastError())
        self.amsDLL.NET_EALARM_Fini()

    def report(self, devname, devserial, jsonstr):
        '''
        Persist one alarm event.

        ``jsonstr`` is the alarm JSON. Fields read: ``dateTime``, ``deviceID``,
        ``channelID`` and, from the single ``Result`` entry, ``subEventType``,
        ``backgroundImageURL`` and ``DeviceGPS.longitude/latitude``.

        The picture referenced by ``backgroundImageURL`` is looked up in the
        Redis hash ``picHashKey``, renamed to
        ``<devname>-HH-MM-SS-<microsecond>.jpg`` under today's date directory,
        and the event is inserted into ``hk_report``.

        :param devname: device name, used for the picture name and log lines
        :param devserial: device serial number stored with the event
        :param jsonstr: raw alarm JSON text
        :return: True when the row was stored, False on any failure
        '''
        try:
            res = json.loads(jsonstr)
        except Exception:
            print(self._now(), " ", "报警服务alarmCB,数据解析失败。设备:", devname,
                  ",数据:", jsonstr)
            return False
        # The original wrote `"Result" in res is False`, a chained comparison
        # that is always False, so these guards never triggered.
        if not isinstance(res, dict) or "Result" not in res:
            print(self._now(), " ", "报警服务alarmCB,数据缺少Result。设备:", devname,
                  ",数据:", jsonstr)
            return False
        results = res['Result']
        if not isinstance(results, list) or len(results) != 1:
            print(self._now(), " ", "报警服务alarmCB,Result元素!=于1。设备:", devname,
                  ",数据:", jsonstr)
            return False
        event = results[0]
        if not isinstance(event, dict) or 'backgroundImageURL' not in event:
            print(self._now(), " ", "报警服务alarmCB,Result没有图片。设备:", devname,
                  ",数据:", jsonstr)
            return False

        # Validate every field before touching Redis or the file system, so a
        # malformed message leaves the cached picture untouched.
        try:
            saveData = {
                'occurtime': self._toEpoch(res['dateTime']),  # event time
                'report_at': int(time.time()),  # time of this report
                'devname': res['deviceID'],  # device id (actually its name)
                'devserial': devserial,
                'channel': res['channelID'],
                'eventType': event['subEventType'],
                # Raw values are divided by 360000 to get degrees.
                'lng': round(int(event['DeviceGPS']['longitude']) / 360000, 6),
                'lat': round(int(event['DeviceGPS']['latitude']) / 360000, 6),
                'pic': '',
                'json': jsonstr,  # original message
            }
        except (KeyError, TypeError, ValueError) as e:
            print(self._now(), " ", "报警服务alarmCB,数据字段缺失或格式错误。设备:", devname,
                  ",错误:", repr(e), ",数据:", jsonstr)
            return False

        # Locate the picture saved by the storage service. Try the bare name
        # after "pic?" first, then the full URL as a fallback key.
        imageUrl = event['backgroundImageURL']
        key = imageUrl.split('pic?')[-1]
        path = R.hget(picHashKey, key)
        if path is None and key != imageUrl:
            key = imageUrl
            path = R.hget(picHashKey, key)
        if path is None:
            print(self._now(), " ", "从redis取图片不存在,设备:", devname, ",图片:",
                  imageUrl)
            return False
        if isinstance(path, bytes):
            path = path.decode("utf-8")

        # Rename the stored picture to <devname>-HH-MM-SS-<microsecond>.jpg.
        now = datetime.now()
        strPath = now.strftime("/%Y/%m/%d")
        fileName = devname + now.strftime("-%H-%M-%S-") + str(now.microsecond) + ".jpg"
        picPath = strPath + "/" + fileName
        # Use the location recorded in Redis rather than rebuilding it from
        # today's date: the picture may have been stored before midnight.
        oldPicName = self.picPath + path
        newPicName = self.picPath + picPath
        try:
            os.makedirs(self.picPath + strPath, exist_ok=True)
            os.rename(oldPicName, newPicName)
        except OSError as e:
            print(self._now(), " ", "图片重命名失败,设备:", devname, ",图片:", oldPicName,
                  ",错误:", e)
            return False
        # Drop the cache entry only once the rename has succeeded.
        R.hdel(picHashKey, key)
        saveData["pic"] = picPath

        sql = "insert into hk_report(`occur_at`, `report_at`, `name`, `serial`, `channel`, `eventType`, `pic`, `lng`, `lat`, `json`) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        params = (
            saveData["occurtime"],
            saveData["report_at"],
            saveData["devname"],
            saveData["devserial"],
            saveData["channel"],
            saveData["eventType"],
            saveData["pic"],
            saveData["lng"],
            saveData["lat"],
            saveData["json"],
        )
        # NOTE(review): dbConn is one shared connection and this runs on the
        # SDK's callback thread — confirm callbacks are never concurrent.
        try:
            # Re-establish the connection if the server dropped it while idle.
            dbConn.ping(reconnect=True)
            # The context manager closes the cursor on failure as well.
            with dbConn.cursor() as cursor:
                cursor.execute(sql, params)
            dbConn.commit()
        except Exception as e:
            print(self._now(), " ", "数据入库失败。设备:", devname, ",数据:", saveData,
                  ",错误:", e)
            return False
        return True
class ssClass(object):
    '''Picture storage (SS) service built on the HCISUPSS SDK library.

    Receives uploaded files through SDK callbacks, writes them below
    ``picPath`` and records their location in the Redis hash ``picHashKey``.
    '''

    def __init__(self):
        self.publicIP = publicIP
        self.listenIP = listenIP
        self.ssPort = ssPort
        self.logLevel = logLevel
        self.sysStr = platform.system()  # operating system name
        self.ssDLL = self.loadDLL()  # loaded SDK library (also sets self.picPath)
        self.ssHandle = -1  # listen handle, -1 while not listening
        # Keep the ctypes callback objects referenced by the instance.
        self.fnSMsgCb = EHomeSSMsgCallBack(self.ssMsgCB)
        self.fnSStorageCb = EHomeSSStorageCallBack(self.ssStorageCB)
        self.fnSSRWCb = EHomeSSRWCallBack(self.ssRWCB)
        self.fnSSRWCbEx = EHomeSSRWCallBackEx(self.ssRWCbEx)

    @staticmethod
    def _now():
        '''Return the current local time formatted for log lines.'''
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _fileNameFromUrl(oriname):
        '''Return the bare file name for a name such as ``http://host/pic?ABC`` or ``ABC``.

        Everything up to the last ``?`` is dropped, then any directory part is
        removed so a device-supplied name cannot escape the upload directory.
        '''
        name = oriname.split("?")[-1]
        return os.path.basename(name.replace("\\", "/"))

    def loadDLL(self):
        '''
        Load the storage SDK library for the current operating system,
        initialise it and announce the public address.

        Also sets ``self.picPath`` to ``winPath`` on Windows and ``luxPath``
        elsewhere.

        :return: the loaded ``ctypes.CDLL`` object
        '''
        isWindows = self.sysStr == "Windows"
        # Same codec for encode and decode (the original mixed GBK and UTF-8
        # on Windows, which breaks on non-ASCII working directories).
        codec = "gbk" if isWindows else "utf-8"
        basePath = os.getcwd().encode(codec)
        if isWindows:
            sep = b'\\'
            sdkName, cryptoName, sslName = b'HCISUPSS.dll', b'libeay32.dll', b'ssleay32.dll'
            self.picPath = winPath
        else:
            sep = b'/'
            sdkName, cryptoName, sslName = b'libHCISUPSS.so', b'libcrypto.so', b'libssl.so'
            self.picPath = luxPath
        libDir = basePath + sep + b'lib' + sep

        ssDLL = ctypes.CDLL((libDir + sdkName).decode(codec))
        libeay32_p = ctypes.cast(ctypes.c_char_p(libDir + cryptoName), POINTER(c_void_p))
        ssDLL.NET_ESS_SetSDKInitCfg(4, libeay32_p)
        ssleay32_p = ctypes.cast(ctypes.c_char_p(libDir + sslName), POINTER(c_void_p))
        ssDLL.NET_ESS_SetSDKInitCfg(5, ssleay32_p)
        if not isWindows:
            # The sqlite library path is only configured off Windows, as in
            # the original.
            sqlite3_p = ctypes.cast(ctypes.c_char_p(libDir + b'libsqlite3.so'), POINTER(c_void_p))
            ssDLL.NET_ESS_SetSDKInitCfg(6, sqlite3_p)
        ssDLL.NET_ESS_Init()

        # Public address (relevant when the server sits behind port mapping).
        addr = NET_EHOME_IPADDRESS()
        ip = ctypes.create_string_buffer(self.publicIP.encode("utf-8"))
        ctypes.memmove(addr.szIP, ip, ctypes.sizeof(ip))
        addr.wPort = self.ssPort
        if not ssDLL.NET_ESS_SetSDKInitCfg(3, ctypes.byref(addr)):
            print("设置公网地址失败,error code: ", ssDLL.NET_ESS_GetLastError())
        ssDLL.NET_ESS_SetLogToFile(self.logLevel, basePath + sep + b'EHomeSDKLog' + sep, False)
        return ssDLL

    def startSSListen(self):
        '''Start the storage listener on ``listenIP:ssPort``.

        :return: True when the listener started, False otherwise
        '''
        struSSListenPara = NET_EHOME_SS_LISTEN_PARAM()
        ip = ctypes.create_string_buffer(self.listenIP.encode("utf-8"))
        ctypes.memmove(struSSListenPara.struAddress.szIP, ip, ctypes.sizeof(ip))
        struSSListenPara.struAddress.wPort = self.ssPort
        username = ctypes.create_string_buffer(ssUserName.encode("utf-8"))
        passwd = ctypes.create_string_buffer(ssUserPws.encode("utf-8"))
        ctypes.memmove(struSSListenPara.szKMS_UserName, username, ctypes.sizeof(username))
        ctypes.memmove(struSSListenPara.szKMS_Password, passwd, ctypes.sizeof(passwd))
        ctypes.memmove(struSSListenPara.szAccessKey, username, ctypes.sizeof(username))
        ctypes.memmove(struSSListenPara.szSecretKey, passwd, ctypes.sizeof(passwd))
        struSSListenPara.byHttps = 0
        struSSListenPara.bySecurityMode = 1
        struSSListenPara.fnSMsgCb = self.fnSMsgCb
        # NOTE(review): the original comments call these two callbacks mutually
        # exclusive, yet both are set — confirm which one the SDK honours.
        struSSListenPara.fnSStorageCb = self.fnSStorageCb
        struSSListenPara.fnSSRWCb = self.fnSSRWCb
        self.ssHandle = self.ssDLL.NET_ESS_StartListen(ctypes.byref(struSSListenPara))
        if self.ssHandle < 0:
            print(self._now(), " ", "存储服务启动失败,error code: : ",
                  self.ssDLL.NET_ESS_GetLastError())
            return False
        print(self._now(), " ", "存储服务启动成功: ",
              str(struSSListenPara.struAddress.szIP, encoding="utf-8").rstrip('\x00') + ':' +
              str(struSSListenPara.struAddress.wPort))
        return True

    def ssMsgCB(self, iHandle, enumType, pOutBuffer, dwOutLen, pInBuffer, dwInLen, pUser):
        '''
        Storage message callback.

        enumType 1: Tomcat server info (only logged).
        enumType 2: KMS user name / password check; answers with a single byte 1.
        enumType 3: ISUP 5.0 access key request; answers with the access key.

        :return: always True
        '''
        print(self._now(), " ", "存储服务ssMsgCB收到消息,iHandle=", str(iHandle),
              ",enumType=", str(enumType))
        if enumType == 1:
            # pOutBuffer is a pointer to a structure; the original str(...,
            # encoding=...) on it raised TypeError, so only the value is logged.
            print("ssMsgCB call:", str(enumType), ":", pOutBuffer)
        elif enumType == 2:
            # Intended effect per the original comment is "pInBuffer = 1". The
            # original passed a type to ctypes.pointer() (TypeError) and would
            # otherwise have copied from memory address 1.
            # NOTE(review): confirm the SDK expects a one-byte boolean here.
            ctypes.memmove(pInBuffer, ctypes.byref(ctypes.c_ubyte(1)), 1)
        elif enumType == 3:
            keyBuf = ctypes.create_string_buffer(
                "5e998dbbafb44ca783099afcdead40fa7A3Vf7Fh".encode("utf-8"))
            # Zero the reply buffer, then copy no more than the key buffer
            # holds: the original read dwInLen bytes from the shorter source.
            ctypes.memset(pInBuffer, 0, dwInLen)
            ctypes.memmove(pInBuffer, keyBuf, min(dwInLen, ctypes.sizeof(keyBuf)))
        return True

    def ssStorageCB(self, iHandle, pFileName, pFileBuf, dwFileLen, pFilePath, pUser):
        '''
        Storage callback: save the uploaded file and write its relative path
        to ``pFilePath``.

        :return: False for an empty file, True otherwise
        '''
        if dwFileLen < 1:
            return False
        fileName = ctypes.string_at(pFileName).decode("utf-8").rstrip('\x00')
        # NOTE(review): dic_dev is keyed by CMS login handle; whether iHandle
        # is the same handle is not visible here, so a miss must not raise.
        devName = dic_dev.get(iHandle, "")
        print(self._now(), " ", "存储服务StorageCB回调,devName=", devName,
              ",dwFileLen=", str(dwFileLen), ",pFileName=", fileName)
        url = self.saveFile(devName, pFileBuf, dwFileLen, fileName)
        # Copy exactly the encoded path plus its terminating NUL. The original
        # used str.__sizeof__(), the size of the Python object, as the length.
        urlBuf = ctypes.create_string_buffer(url.encode('utf-8'))
        memmove(pFilePath, urlBuf, ctypes.sizeof(urlBuf))
        return True

    def saveFile(self, devname, buff, size, oriname):
        '''
        Write ``size`` bytes from ``buff`` to ``<picPath>/YYYY/MM/DD/<name>``
        and record the location in Redis.

        :param devname: device name (currently unused)
        :param buff: pointer to the file content
        :param size: number of bytes to write
        :param oriname: name supplied by the device, a bare name or a URL such
                        as ``http://host/pic?A985...``
        :return: the stored path relative to ``picPath`` (no leading slash)
        '''
        now = datetime.now()
        strPath = now.strftime("/%Y/%m/%d")
        os.makedirs(self.picPath + strPath, exist_ok=True)
        fileName = self._fileNameFromUrl(oriname)
        if not fileName:
            # Nothing usable in the supplied name; fall back to a time-based one.
            fileName = now.strftime("%H%M%S") + str(now.microsecond)
        with open(self.picPath + strPath + '/' + fileName, "wb") as f:
            f.write(string_at(buff, size))
        # Keyed by the bare file name, which is what amsClass.report derives
        # from the alarm's image URL (the part after "pic?").
        R.hset(picHashKey, fileName, strPath + "/" + fileName)
        return strPath[1:] + "/" + fileName

    def ssRWCB(self, iHandle, byAct, pFileName, pFileBuf, dwFileLen, pFileUrl, pUser):
        '''Read/write callback; only logs the action and returns True.'''
        print("ssRWCB call:", str(byAct))
        return True

    def ssRWCbEx(self, iHandle, pRwParam, pExStruct):
        '''
        Extended read/write callback (not registered in startSSListen).

        Saves the file for a write action (byAct == 0) and copies the stored
        path into ``pRetIndex``.

        :return: True when a file was saved, False otherwise
        '''
        struRwParam = NET_EHOME_SS_RW_PARAM()
        ctypes.memmove(ctypes.byref(struRwParam), pRwParam, ctypes.sizeof(struRwParam))
        fileSize = struRwParam.dwFileLen.contents.value
        fileName = struRwParam.pFileName.decode("utf-8")
        fileUrl = struRwParam.pFileUrl.decode("utf-8")
        print(self._now(), " ", "存储服务ssRWCbEx回调,dwFileLen:", fileSize,
              ",filename:", fileName, ",fileurl:", fileUrl)
        if struRwParam.byAct != 0:  # only "write file" is handled
            return False
        if fileSize < 1:
            return False
        # The original omitted the required file-name argument (TypeError).
        url = self.saveFile(dic_dev.get(iHandle, ""), struRwParam.pFileBuf, fileSize, fileName)
        # 0 - do not use the caller-supplied index (pRetIndex), 1 - use it.
        # NOTE(review): struRwParam is a local copy, so this flag never reaches
        # the SDK's structure — confirm whether that matters.
        struRwParam.byUseRetIndex = 0
        c_url = ctypes.create_string_buffer(url.encode("utf-8"))
        ctypes.memmove(struRwParam.pRetIndex, byref(c_url), ctypes.sizeof(c_url))
        return True

    def stopSSListen(self):
        '''Stop the listener (if it was started) and release the SDK.'''
        if self.ssHandle > -1:
            # ctypes returns an int; ``is False`` could never match.
            if not self.ssDLL.NET_ESS_StopListen(self.ssHandle):
                print(self._now(), " ", "存储服务停止失败,error code: : ",
                      self.ssDLL.NET_ESS_GetLastError())
        self.ssDLL.NET_ESS_Fini()
# 关闭各种资源,结束程序,
def quit(signum, frame):
    """Signal handler: close the Redis connection and ask the main loop to exit.

    :param signum: signal number passed by the signal module (unused)
    :param frame: current stack frame passed by the signal module (unused)
    """
    global proc_exit
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(stamp, " ", "关闭服务 ...")
    R.close()
    # The main loop polls this flag once per second.
    proc_exit = True
if __name__ == '__main__':
    # Redis connection (the original had two identical per-platform branches).
    R = redis.Redis(host="localhost", port=6379, db=3, decode_responses=True)
    # Database connection
    try:
        dbConn = pymysql.connect(**DB_CONFIG)
    except Exception as e:
        # Chain the cause so the real connection error stays in the traceback.
        raise RuntimeError("数据库连接失败") from e
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " ", "启动服务 ...")
    # The alarm service is created first: amsClass() publishes the module-level
    # amsDLL that the registration callback uses.
    ams = amsClass()
    ams.startAmsListen()
    ss = ssClass()
    ss.startSSListen()
    cms = cmsClass()
    cms.startCmsListen()
    # Ctrl+C (SIGINT) sets proc_exit through quit().
    signal.signal(signal.SIGINT, quit)
    while not proc_exit:
        time.sleep(1)
    # Iterate over a snapshot: the registration callback may still remove
    # entries from dic_dev while devices are being logged out.
    for key in list(dic_dev):
        cms.cmsDLL.NET_ECMS_ForceLogout(key)
    cms.stopCmsListen()
    ss.stopSSListen()
    ams.stopAmsListen()
    # Close the database connection
    dbConn.close()
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " ", "服务停止")
    sys.exit(0)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/caoyiqiao/hik.git
git@gitee.com:caoyiqiao/hik.git
caoyiqiao
hik
hik
master

搜索帮助