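# NOTE: web-page notice captured with the file (not part of the program):
# "Code pull complete; the page will refresh automatically."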
import os
import time
from loguru import logger
from cacheout import Cache
from config import settings
from uuid import uuid4
import cv2
from multitimer import MultiTimer
import mimetypes
# Echo the settings file into the log at import time, one stripped line per
# log record, so the effective configuration is visible in the output.
with open('settings.toml', 'r') as settings_file:
    for entry in map(str.strip, settings_file):
        logger.info(entry)
class FileStorage:
    """Directory-backed store of temporary files addressed by a uuid string.

    Every managed file lives directly under ``root`` and is named
    ``<uuid>.<ext>``.  ``self.files`` maps each uuid to a dict holding its
    ``modified_time`` (epoch seconds, refreshed on access) and ``ext``.
    Identifiers of the form ``<parent>_<info>...`` that are not indexed yet
    are resolved through the sub-file module registered for the major MIME
    type of the parent file.
    """

    def __init__(self, root='temp', duration=0, sub_file_modules=None):
        """Index ``root``, purge expired files and register sub-file modules.

        Args:
            root: Directory holding the files; created when missing.
            duration: Default retention in hours used by ``clear``; a value
                <= 0 disables purging.
            sub_file_modules: Iterable of callables (typically classes) that
                are invoked with this storage and return a module object.
                ``None`` means no modules.
        """
        self.root = root
        self.duration = duration
        self.files = {}
        self.timer = None
        self.sub_file_module = {}
        self.init()
        self.clear()
        # ``None`` replaces the former shared mutable ``[]`` default.
        self.regester_sub_file_module(() if sub_file_modules is None else sub_file_modules)

    def regester_sub_file_module(self, modules):
        """Instantiate each module with this storage and key it by MIME type.

        The (misspelled) method name is kept so existing callers keep working.
        """
        for module in modules:
            model = module(self)
            self.sub_file_module[model.get_mime_type()] = model

    def init(self):
        """Create ``root`` if needed and index the files already inside it.

        Only regular files named ``<stem>.<ext>`` with a non-empty stem and
        extension are indexed; directories, dot-files and extension-less
        entries are ignored.  The name is split at the LAST dot so the path
        rebuilt from ``uuid + '.' + ext`` always matches the file on disk.
        """
        # makedirs also creates missing parents and tolerates an existing dir.
        os.makedirs(self.root, exist_ok=True)
        for file in os.listdir(self.root):
            uuid, dot, ext = file.rpartition('.')
            path = os.path.join(self.root, file)
            if not (dot and uuid and ext) or not os.path.isfile(path):
                continue
            try:
                modified_time = os.path.getmtime(path)
            except OSError:
                # The entry vanished between listing and stat; nothing to index.
                continue
            self.files[uuid] = {'modified_time': modified_time, 'ext': ext}

    def clear(self, duration=None):
        """Delete files whose last recorded access is older than ``duration`` hours.

        Args:
            duration: Retention in hours; defaults to ``self.duration``.
                Values <= 0 make this a no-op.
        """
        if duration is None:
            duration = self.duration
        if duration <= 0:
            return
        deadline = time.time() - duration * 3600
        # Iterate over a snapshot and delete in place: the auto-clear timer may
        # run this while other code is adding entries (presumably from another
        # thread), and swapping in a rebuilt dict could drop those entries.
        for uuid, meta in list(self.files.items()):
            if meta['modified_time'] >= deadline:
                continue
            try:
                os.remove(os.path.join(self.root, uuid + '.' + meta['ext']))
            except FileNotFoundError:
                # Already gone on disk; only the stale index entry remains.
                pass
            self.files.pop(uuid, None)

    def save(self, bytes, ext):
        """Write raw ``bytes`` to a new ``<uuid>.<ext>`` file and return the uuid."""
        uuid = str(uuid4())
        file_name = uuid + '.' + ext
        file_path = os.path.join(self.root, file_name)
        with open(file_path, 'wb') as f:
            f.write(bytes)
        self.files[uuid] = {'modified_time': time.time(), 'ext': ext}
        return uuid

    def add(self, obj, ext):
        """Store an image object as a new file and return its uuid.

        Only ``ext == 'jpg'`` is supported; ``obj`` is handed to
        ``cv2.imwrite``.

        Raises:
            Exception: If ``ext`` is not ``'jpg'``.
        """
        if ext != 'jpg':
            raise Exception('not support file type')
        uuid = str(uuid4())
        cv2.imwrite(os.path.join(self.root, uuid + '.jpg'), obj)
        self.files[uuid] = {'modified_time': time.time(), 'ext': ext}
        return uuid

    def get(self, uuid):
        """Return the path of the file identified by ``uuid``.

        An indexed uuid has its access time refreshed.  An unindexed uuid
        containing ``_`` is treated as ``<parent>_<info>...`` and delegated
        to the sub-file module registered for the parent's major MIME type.

        Raises:
            Exception: If the uuid (or its parent) is unknown, or no
                sub-file module can handle the parent file.
        """
        entry = self.files.get(uuid)
        if entry is not None:
            entry['modified_time'] = time.time()
            return os.path.join(self.root, uuid + '.' + entry['ext'])
        if '_' not in uuid:
            raise Exception('{} not found'.format(uuid))
        parent_uuid, *info = uuid.split('_')
        parent_file = self.get(parent_uuid)
        # guess_type returns (None, None) for unknown extensions.
        guessed = mimetypes.guess_type(parent_file)[0]
        mime_type = guessed.split('/')[0] if guessed else None
        module = self.sub_file_module.get(mime_type)
        if module is None:
            raise Exception('no sub file module for {}'.format(uuid))
        return module.get(parent_uuid, parent_file, uuid, info)

    def start_auto_clear(self, interval=60):
        """Start the periodic purge timer unless it is already running.

        Args:
            interval: Value passed to ``MultiTimer`` as its interval.

        Note:
            The timer calls ``clear`` with ``settings.TEMP_FILE_HOURS``,
            not with ``self.duration``.
        """
        if self.timer is None:
            self.timer = MultiTimer(interval, self.clear, args=[settings.TEMP_FILE_HOURS])
            self.timer.start()

    def stop_auto_clear(self):
        """Stop and discard the periodic purge timer, if one is running."""
        if self.timer is not None:
            self.timer.stop()
            self.timer = None
class SubFile:
    """Base class for modules that produce sub-files of a stored parent file.

    ``FileStorage.regester_sub_file_module`` instantiates a module with the
    storage and keys it by the value returned from ``get_mime_type``.
    Subclasses set ``mime_type`` to the key they want to be registered under.
    """

    def __init__(self, storage: FileStorage=None):
        """Remember the owning storage; the base class has an empty MIME type."""
        self.mime_type = ''
        self.storage = storage

    def add(self, sub_file_uuid, ext):
        """Record ``sub_file_uuid`` in the storage index, stamped with the current time."""
        record = {'modified_time': time.time(), 'ext': ext}
        self.storage.files[sub_file_uuid] = record

    def get_mime_type(self):
        """Return the MIME type string this module is registered under."""
        return self.mime_type

    def set_storage(self, storage):
        """Replace the storage this module writes its index entries to."""
        self.storage = storage
class VideoSubFile(SubFile):
    """Sub-file module that extracts single frames of a stored video as JPEGs.

    Registered under the ``'video'`` MIME type.  ``info[0]`` of a request is
    parsed as the frame index to extract.
    """

    def __init__(self, storage: FileStorage=None):
        """Set up the per-video capture cache and the MIME type key."""
        super().__init__(storage)
        # Opened captures are cached by video uuid so repeated frame requests
        # for the same video do not reopen the file.
        self.cache = Cache(maxsize=settings.VIDEO_CACHE_SIZE, ttl=0, timer=time.time, default=None)
        self.mime_type = 'video'

    def get_data(self, video_uuid:str, video_path:str, frame_uuid, info:list):
        """Return the decoded frame ``int(info[0])`` of the video, or ``None``.

        Args:
            video_uuid: Cache key for the opened capture.
            video_path: Path handed to ``cv2.VideoCapture`` on a cache miss.
            frame_uuid: Identifier of the requested frame (used in errors).
            info: Request parts; the first item is the frame index.

        Returns:
            The frame image, or ``None`` when the video cannot be opened or
            the frame cannot be read.

        Raises:
            ValueError: If ``info`` is empty or its first item is not an
                integer.
        """
        # Explicit check instead of ``assert``, which is stripped under -O.
        if not info:
            raise ValueError('frame index is missing for {}'.format(frame_uuid))
        frame = int(info[0])
        video = self.cache.get(video_uuid)
        if video is None:
            video = cv2.VideoCapture(video_path)
            if not video.isOpened():
                # Do not cache a capture that failed to open, otherwise every
                # later request for this video would keep failing.
                video.release()
                return None
            self.cache.set(video_uuid, video)
        # Seek only when the capture is not already positioned on the frame.
        if int(video.get(cv2.CAP_PROP_POS_FRAMES)) != frame:
            video.set(cv2.CAP_PROP_POS_FRAMES, frame)
        ret, img = video.read()
        return img if ret else None

    def get(self, video_uuid:str, video_path:str, frame_uuid, info:list):
        """Write the requested frame to ``<root>/<frame_uuid>.jpg`` and index it.

        Returns:
            The path of the written JPEG.

        Raises:
            Exception: If the frame cannot be read or the image cannot be
                written; nothing is added to the storage index in that case.
        """
        img = self.get_data(video_uuid, video_path, frame_uuid, info)
        if img is None:
            raise Exception('{} not found'.format(frame_uuid))
        img_path = os.path.join(self.storage.root, frame_uuid + '.jpg')
        # imwrite reports failure through its return value; do not index a
        # file that was never written.
        if not cv2.imwrite(img_path, img):
            raise Exception('failed to write {}'.format(img_path))
        self.add(frame_uuid, 'jpg')
        return img_path
# Module-level storage shared by importers of this module.
# FileStorage.clear() multiplies its duration by 3600, i.e. it expects hours,
# so the retention is passed in hours here; pre-multiplying by 3600 made the
# start-up purge threshold 3600 times longer than configured.
file_storage = FileStorage(duration=settings.TEMP_FILE_HOURS, sub_file_modules=[VideoSubFile])
# Begin the periodic purge of expired temporary files.
file_storage.start_auto_clear()
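# NOTE: web-page moderation notice captured with the file (not part of the program):
# "This section may contain content unsuitable for display, so the page does not show it. You can review and edit it using the relevant editing function."
# "If you confirm the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything violating applicable national laws and regulations, you may click submit to appeal and it will be handled as soon as possible."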