import local_config as config
import requests
import datetime
import json
import os
import sys
import time
import logging
from logging.handlers import RotatingFileHandler
import pickledb
from zk import ZK, const
EMPLOYEE_NOT_FOUND_ERROR_MESSAGE = "No Employee found for the given employee field value."
device_punch_values_IN = getattr(config, 'device_punch_values_IN', [0,4])
device_punch_values_OUT = getattr(config, 'device_punch_values_OUT', [1,5])
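# Illustrative local_config.py overrides (hypothetical values; the defaults
# above apply when these names are absent from the config file):
#
#   device_punch_values_IN = [0, 4]
#   device_punch_values_OUT = [1, 5]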
# Possible area of further development:
#   Real-time events - set up getting events pushed from the machine rather than polling.
#   - this is documented as 'Real-time events' in the ZKProtocol manual.
# Notes:
# Status Keys in status.json
# - lift_off_timestamp
# - mission_accomplished_timestamp
# - <device_id>_pull_timestamp
# - <device_id>_push_timestamp
# - <shift_type>_sync_timestamp
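# Illustrative status.json contents (hypothetical device id 'HO1' and shift
# name 'Day Shift'; values are str(datetime.datetime.now()) strings):
#
#   {"lift_off_timestamp": "2019-02-26 20:31:29.000000",
#    "HO1_pull_timestamp": "2019-02-26 20:31:45.000000",
#    "HO1_push_timestamp": "2019-02-26 20:32:05.000000",
#    "Day Shift_sync_timestamp": "2019-02-26 20:31:45.000000",
#    "mission_accomplished_timestamp": "2019-02-26 20:32:10.000000"}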
def main():
"""Takes care of checking if it is time to pull data based on config,
then calling the relevent functions to pull data and push to EPRNext.
"""
try:
last_lift_off_timestamp = _safe_convert_date(status.get('lift_off_timestamp'), "%Y-%m-%d %H:%M:%S.%f")
		if not last_lift_off_timestamp or last_lift_off_timestamp < datetime.datetime.now() - datetime.timedelta(minutes=config.PULL_FREQUENCY):
status.set('lift_off_timestamp', str(datetime.datetime.now()))
info_logger.info("Cleared for lift off!")
for device in config.devices:
device_attendance_logs = None
info_logger.info("Processing Device: "+ device['device_id'])
				# must match the dump file name written in get_all_attendance_from_device (device_id prefix included)
				dump_file = config.LOGS_DIRECTORY + '/' + device['device_id'] + "_" + device['ip'].replace('.', '_') + '_last_fetch_dump.json'
if os.path.exists(dump_file):
info_logger.error('Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data.')
with open(dump_file, 'r') as f:
file_contents = f.read()
if file_contents:
device_attendance_logs = list(map(lambda x: _apply_function_to_key(x, 'timestamp', datetime.datetime.fromtimestamp), json.loads(file_contents)))
try:
pull_process_and_push_data(device, device_attendance_logs)
status.set(f'{device["device_id"]}_push_timestamp', str(datetime.datetime.now()))
if os.path.exists(dump_file):
os.remove(dump_file)
info_logger.info("Successfully processed Device: "+ device['device_id'])
			except Exception:
				error_logger.exception('exception when calling pull_process_and_push_data function for device: ' + json.dumps(device, default=str))
if hasattr(config,'shift_type_device_mapping'):
update_shift_last_sync_timestamp(config.shift_type_device_mapping)
status.set('mission_accomplished_timestamp', str(datetime.datetime.now()))
info_logger.info("Mission Accomplished!")
	except Exception:
error_logger.exception('exception has occurred in the main function...')
def pull_process_and_push_data(device, device_attendance_logs=None):
""" Takes a single device config as param and pulls data from that device.
params:
device: a single device config object from the local_config file
		device_attendance_logs: fetching from the device is skipped if this param is passed. Used to restart failed fetches from previous runs.
"""
attendance_success_log_file = '_'.join(["attendance_success_log", device['device_id']])
attendance_failed_log_file = '_'.join(["attendance_failed_log", device['device_id']])
attendance_success_logger = setup_logger(attendance_success_log_file, '/'.join([config.LOGS_DIRECTORY, attendance_success_log_file])+'.log')
attendance_failed_logger = setup_logger(attendance_failed_log_file, '/'.join([config.LOGS_DIRECTORY, attendance_failed_log_file])+'.log')
if not device_attendance_logs:
device_attendance_logs = get_all_attendance_from_device(device['ip'], device_id=device['device_id'], clear_from_device_on_fetch=device['clear_from_device_on_fetch'])
if not device_attendance_logs:
return
	# for finding the last successful push and restarting from that point (or) from a set 'config.IMPORT_START_DATE' (whichever is later)
index_of_last = -1
last_line = get_last_line_from_file('/'.join([config.LOGS_DIRECTORY, attendance_success_log_file])+'.log')
import_start_date = _safe_convert_date(config.IMPORT_START_DATE, "%Y%m%d")
if last_line or import_start_date:
last_user_id = None
last_timestamp = None
if last_line:
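			# Success-log lines are tab-separated: asctime, levelname, then the
			# message fields (erpnext_message, uid, user_id, timestamp, punch,
			# status, raw json) - so columns 4 and 5 are user_id and the epoch timestamp.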
last_user_id, last_timestamp = last_line.split("\t")[4:6]
last_timestamp = datetime.datetime.fromtimestamp(float(last_timestamp))
if import_start_date:
if last_timestamp:
if last_timestamp < import_start_date:
last_timestamp = import_start_date
last_user_id = None
else:
last_timestamp = import_start_date
for i, x in enumerate(device_attendance_logs):
if last_user_id and last_timestamp:
if last_user_id == str(x['user_id']) and last_timestamp == x['timestamp']:
index_of_last = i
break
elif last_timestamp:
if x['timestamp'] >= last_timestamp:
index_of_last = i
break
for device_attendance_log in device_attendance_logs[index_of_last+1:]:
punch_direction = device['punch_direction']
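		# With the default lists above, punch values 0/4 map to IN and 1/5 to OUT;
		# any other punch value leaves log_type unset (None) for this check-in.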
if punch_direction == 'AUTO':
if device_attendance_log['punch'] in device_punch_values_OUT:
punch_direction = 'OUT'
elif device_attendance_log['punch'] in device_punch_values_IN:
punch_direction = 'IN'
else:
punch_direction = None
erpnext_status_code, erpnext_message = send_to_erpnext(device_attendance_log['user_id'], device_attendance_log['timestamp'], device['device_id'], punch_direction)
if erpnext_status_code == 200:
attendance_success_logger.info("\t".join([erpnext_message, str(device_attendance_log['uid']),
str(device_attendance_log['user_id']), str(device_attendance_log['timestamp'].timestamp()),
str(device_attendance_log['punch']), str(device_attendance_log['status']),
json.dumps(device_attendance_log, default=str)]))
else:
attendance_failed_logger.error("\t".join([str(erpnext_status_code), str(device_attendance_log['uid']),
str(device_attendance_log['user_id']), str(device_attendance_log['timestamp'].timestamp()),
str(device_attendance_log['punch']), str(device_attendance_log['status']),
json.dumps(device_attendance_log, default=str)]))
if EMPLOYEE_NOT_FOUND_ERROR_MESSAGE not in erpnext_message:
raise Exception('API Call to ERPNext Failed.')
def get_all_attendance_from_device(ip, port=4370, timeout=30, device_id=None, clear_from_device_on_fetch=False):
	# Sample Attendance Logs:
	# [{'punch': 255, 'user_id': '22', 'uid': 12349, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 29)},
	#  {'punch': 255, 'user_id': '7', 'uid': 7, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 36)}]
zk = ZK(ip, port=port, timeout=timeout)
conn = None
attendances = []
try:
conn = zk.connect()
x = conn.disable_device()
# device is disabled when fetching data
info_logger.info("\t".join((ip, "Device Disable Attempted. Result:", str(x))))
attendances = conn.get_attendance()
info_logger.info("\t".join((ip, "Attendances Fetched:", str(len(attendances)))))
status.set(f'{device_id}_push_timestamp', None)
status.set(f'{device_id}_pull_timestamp', str(datetime.datetime.now()))
if len(attendances):
			# keeping a backup before clearing data in case the program fails.
# if everything goes well then this file is removed automatically at the end.
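			# Timestamps are serialized as epoch floats (default=datetime.datetime.timestamp)
			# so that main() can restore them with datetime.datetime.fromtimestamp
			# when replaying a dump after a crash.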
dump_file_name = config.LOGS_DIRECTORY+'/' + device_id + "_" + ip.replace('.', '_') + '_last_fetch_dump.json'
with open(dump_file_name, 'w+') as f:
f.write(json.dumps(list(map(lambda x: x.__dict__, attendances)), default=datetime.datetime.timestamp))
if clear_from_device_on_fetch:
x = conn.clear_attendance()
info_logger.info("\t".join((ip, "Attendance Clear Attempted. Result:", str(x))))
x = conn.enable_device()
info_logger.info("\t".join((ip, "Device Enable Attempted. Result:", str(x))))
	except Exception:
error_logger.exception(str(ip)+' exception when fetching from device...')
raise Exception('Device fetch failed.')
finally:
if conn:
conn.disconnect()
return list(map(lambda x: x.__dict__, attendances))
def send_to_erpnext(employee_field_value, timestamp, device_id=None, log_type=None):
"""
Example: send_to_erpnext('12349',datetime.datetime.now(),'HO1','IN')
"""
url = config.ERPNEXT_URL + "/api/method/erpnext.hr.doctype.employee_checkin.employee_checkin.add_log_based_on_employee_field"
headers = {
'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET,
'Accept': 'application/json'
}
data = {
'employee_field_value' : employee_field_value,
		'timestamp' : str(timestamp),
'device_id' : device_id,
'log_type' : log_type
}
response = requests.request("POST", url, headers=headers, data=data)
if response.status_code == 200:
		return 200, response.json()['message']['name']
else:
		error_str = _safe_get_error_str(response)
		error_logger.error('\t'.join(['Error during ERPNext API Call.', str(employee_field_value), str(timestamp.timestamp()), str(device_id), str(log_type), error_str]))
		# TODO: send an email when EMPLOYEE_NOT_FOUND_ERROR_MESSAGE is in error_str?
		return response.status_code, error_str
def update_shift_last_sync_timestamp(shift_type_device_mapping):
"""
	### Algorithm for updating sync_current_timestamp:
	- get a list of devices to check
	- check that all the devices have a non-None push_timestamp
	- check whether the earliest pull timestamp is greater than sync_current_timestamp for each shift name
	- if so, update the shift with this minimum pull timestamp
"""
for shift_type_device_map in shift_type_device_mapping:
all_devices_pushed = True
pull_timestamp_array = []
for device_id in shift_type_device_map['related_device_id']:
if not status.get(f'{device_id}_push_timestamp'):
all_devices_pushed = False
break
pull_timestamp_array.append(_safe_convert_date(status.get(f'{device_id}_pull_timestamp'), "%Y-%m-%d %H:%M:%S.%f"))
if all_devices_pushed:
min_pull_timestamp = min(pull_timestamp_array)
if isinstance(shift_type_device_map['shift_type_name'], str): # for backward compatibility of config file
shift_type_device_map['shift_type_name'] = [shift_type_device_map['shift_type_name']]
for shift in shift_type_device_map['shift_type_name']:
try:
sync_current_timestamp = _safe_convert_date(status.get(f'{shift}_sync_timestamp'), "%Y-%m-%d %H:%M:%S.%f")
if (sync_current_timestamp and min_pull_timestamp > sync_current_timestamp) or (min_pull_timestamp and not sync_current_timestamp):
response_code = send_shift_sync_to_erpnext(shift, min_pull_timestamp)
if response_code == 200:
status.set(f'{shift}_sync_timestamp', str(min_pull_timestamp))
				except Exception:
					error_logger.exception('Exception in update_shift_last_sync_timestamp, for shift: ' + shift)
def send_shift_sync_to_erpnext(shift_type_name, sync_timestamp):
url = config.ERPNEXT_URL + "/api/resource/Shift Type/" + shift_type_name
headers = {
'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET,
'Accept': 'application/json'
}
data = {
"last_sync_of_checkin" : str(sync_timestamp)
}
try:
response = requests.request("PUT", url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
info_logger.info("\t".join(['Shift Type last_sync_of_checkin Updated', str(shift_type_name), str(sync_timestamp.timestamp())]))
else:
error_str = _safe_get_error_str(response)
error_logger.error('\t'.join(['Error during ERPNext Shift Type API Call.', str(shift_type_name), str(sync_timestamp.timestamp()), error_str]))
return response.status_code
	except Exception:
error_logger.exception("\t".join(['exception when updating last_sync_of_checkin in Shift Type', str(shift_type_name), str(sync_timestamp.timestamp())]))
def get_last_line_from_file(file):
	# concerns to address (maybe much later):
	# how will last-line lookup work with log rotation when a new file is created?
	# - will that new file be empty at any time? or will it have a partial line from the previous file?
line = None
if os.stat(file).st_size < 5000:
# quick hack to handle files with one line
with open(file, 'r') as f:
for line in f:
pass
else:
# optimized for large log files
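		# Seek to just before the final byte and walk backwards one byte at a
		# time until the previous newline, then read the last line forward.
		# (Assumes the file ends with a newline and contains at least one newline.)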
with open(file, 'rb') as f:
f.seek(-2, os.SEEK_END)
while f.read(1) != b'\n':
f.seek(-2, os.SEEK_CUR)
line = f.readline().decode()
return line
def setup_logger(name, log_file, level=logging.INFO, formatter=None):
if not formatter:
formatter = logging.Formatter('%(asctime)s\t%(levelname)s\t%(message)s')
handler = RotatingFileHandler(log_file, maxBytes=10000000, backupCount=50)
handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(level)
if not logger.hasHandlers():
logger.addHandler(handler)
return logger
def _apply_function_to_key(obj, key, fn):
obj[key] = fn(obj[key])
return obj
def _safe_convert_date(datestring, pattern):
try:
return datetime.datetime.strptime(datestring, pattern)
	except (TypeError, ValueError):
return None
def _safe_get_error_str(res):
try:
		error_json = json.loads(res.text)
		if 'exc' in error_json:  # this means a traceback is available
			error_str = json.loads(error_json['exc'])[0]
		else:
			error_str = json.dumps(error_json)
	except Exception:
error_str = str(res.__dict__)
return error_str
# setup logger and status
if not os.path.exists(config.LOGS_DIRECTORY):
os.makedirs(config.LOGS_DIRECTORY)
error_logger = setup_logger('error_logger', '/'.join([config.LOGS_DIRECTORY, 'error.log']), logging.ERROR)
info_logger = setup_logger('info_logger', '/'.join([config.LOGS_DIRECTORY, 'logs.log']))
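# pickledb.load(path, True) enables auto-dump, so every status.set() call
# persists status.json to disk immediately.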
status = pickledb.load('/'.join([config.LOGS_DIRECTORY, 'status.json']), True)
def infinite_loop(sleep_time=15):
print("Service Running...")
while True:
try:
main()
time.sleep(sleep_time)
		except KeyboardInterrupt:
			raise
		except Exception as e:
			print(e)
if __name__ == "__main__":
infinite_loop()