1 Star 0 Fork 0

WZY99/统一图像处理平台后端

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
utils.py 4.76 KB
一键复制 编辑 原始数据 按行查看 历史
WZY99 提交于 2022-09-19 22:50 . 修改UUID及文件存储
import os
import time
from loguru import logger
from cacheout import Cache
from config import settings
from uuid import uuid4
import cv2
from multitimer import MultiTimer
import mimetypes
with open('settings.toml', 'r') as f:
for line in f:
logger.info(line.strip())
class FileStorage:
def __init__(self, root='temp', duration=0, sub_file_modules=[]):
self.root = root
self.duration = duration
self.files = {}
self.timer = None
self.sub_file_module = {}
self.init()
self.clear()
self.regester_sub_file_module(sub_file_modules)
def regester_sub_file_module(self, modules):
for module in modules:
model = module(self)
self.sub_file_module[model.get_mime_type()] = model
def init(self):
if not os.path.exists(self.root):
os.mkdir(self.root)
for file in os.listdir(self.root):
modified_time = os.path.getmtime(os.path.join(self.root, file))
uuid = file.split('.')[0]
ext = file.split('.')[1]
self.files[uuid] = {'modified_time': modified_time, 'ext': ext}
def clear(self, duration=None):
if duration is None:
duration = self.duration
if duration > 0:
new_files = {}
for file in self.files:
if time.time() - self.files[file]['modified_time'] > duration * 3600:
os.remove(os.path.join(self.root, file + '.' + self.files[file]['ext']))
else:
new_files[file] = self.files[file]
self.files = new_files
def save(self, bytes, ext):
uuid = str(uuid4())
file_name = uuid + '.' + ext
file_path = os.path.join(self.root, file_name)
with open(file_path, 'wb') as f:
f.write(bytes)
self.files[uuid] = {'modified_time': time.time(), 'ext': ext}
return uuid
def add(self, obj, ext):
uuid = str(uuid4())
if ext == 'jpg':
cv2.imwrite(os.path.join(self.root, uuid + '.jpg'), obj)
else:
raise Exception('not support file type')
self.files[uuid] = {'modified_time': time.time(), 'ext': ext}
return uuid
def get(self, uuid):
if uuid in self.files:
self.files[uuid]['modified_time'] = time.time()
return os.path.join(self.root, uuid + '.' + self.files[uuid]['ext'])
else:
if '_' in uuid:
parent_uuid = uuid.split('_')[0]
parent_file = self.get(parent_uuid)
mime_type = mimetypes.guess_type(parent_file)[0].split('/')[0]
info = uuid.split('_')[1:]
return self.sub_file_module[mime_type].get(parent_uuid, parent_file, uuid, info)
else:
raise Exception('{} not found'.format(uuid))
def start_auto_clear(self, interval=60):
if self.timer is None:
self.timer = MultiTimer(interval, self.clear, args=[settings.TEMP_FILE_HOURS])
self.timer.start()
def stop_auto_clear(self):
if self.timer is not None:
self.timer.stop()
self.timer = None
class SubFile:
def __init__(self, storage: FileStorage=None):
self.storage = storage
self.mime_type = ''
def add(self, sub_file_uuid, ext):
self.storage.files[sub_file_uuid] = {'modified_time': time.time(), 'ext': ext}
def get_mime_type(self):
return self.mime_type
def set_storage(self, storage):
self.storage = storage
class VideoSubFile(SubFile):
def __init__(self, storage: FileStorage=None):
super().__init__(storage)
self.cache = Cache(maxsize=settings.VIDEO_CACHE_SIZE, ttl=0, timer=time.time, default=None)
self.mime_type = 'video'
def get_data(self, video_uuid:str, video_path:str, frame_uuid, info:list):
assert len(info) > 0
frame = int(info[0])
video = self.cache.get(video_uuid)
if video is None:
video = cv2.VideoCapture(video_path)
self.cache.set(video_uuid, video)
cur_frame = int(video.get(cv2.CAP_PROP_POS_FRAMES))
if cur_frame == frame:
ret, img = video.read()
else:
video.set(cv2.CAP_PROP_POS_FRAMES, frame)
ret, img = video.read()
return img
def get(self, video_uuid:str, video_path:str, frame_uuid, info:list):
img = self.get_data(video_uuid, video_path, frame_uuid, info)
img_path = os.path.join(self.storage.root, frame_uuid + '.jpg')
cv2.imwrite(img_path, img)
self.add(frame_uuid, 'jpg')
return img_path
file_storage = FileStorage(duration=settings.TEMP_FILE_HOURS * 3600, sub_file_modules=[VideoSubFile])
file_storage.start_auto_clear()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wzy-99/udip-backend.git
git@gitee.com:wzy-99/udip-backend.git
wzy-99
udip-backend
统一图像处理平台后端
master

搜索帮助