seekerrc/actiondet
ucf_thow_adaptive.py
from efficientnet_pytorch import EfficientNet
import numpy as np
import cv2
import os
import time
from PIL import Image
import io
import copy
import torch
from models.experimental import attempt_load
from utils.torch_utils import select_device
from utils.general import (
check_img_size, non_max_suppression, apply_classifier, scale_coords,
xyxy2xywh, xywh2xyxy, strip_optimizer)
from torchvision import transforms
# import random
import ucf_TSM_Module
import random
from torch.nn import functional as F
import logging
def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True):
# Resize and pad image to a stride-multiple (64) rectangle https://github.com/ultralytics/yolov3/issues/232
shape = img.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better test mAP)
r = min(r, 1.0)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, 64), np.mod(dh, 64) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return img, ratio, (dw, dh)
def plot_one_box(x, img, color=None, label=None, line_thickness=None):
# Plots one bounding box on image img
tl = line_thickness or round(
0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
color = color or [random.randint(0, 255) for _ in range(3)]
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
if label:
tf = max(tl - 1, 1) # font thickness
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) # filled
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3,
[225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
def plot_action(img, color=None, line_thickness=None, action=None):
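    # Plots the clip-level action label in a filled box near the top-left corner of img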
tl = line_thickness or round(
0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
color = color or [random.randint(0, 255) for _ in range(3)]
tf = max(tl - 1, 1) # font thickness
t_size1 = cv2.getTextSize(action, 0, fontScale=tl / 3, thickness=tf)[0]
c3 = (10, 10)
# c3 = (1, img.shape[0] - 1)
c4 = (c3[0] + t_size1[0], c3[1] - t_size1[1] - 3)
cv2.rectangle(img, c3, c4, color, -1, cv2.LINE_AA) # filled
cv2.putText(img, action, (c3[0], c3[1] - 2), 0, tl / 3,
[225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
class TemporalProposal:
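    # Temporal action proposal pipeline: sampled frames are screened with YOLOv5 person
    # detection and an EfficientNet-b5 pose classifier; when the averaged fall/lie/throw
    # "actionness" exceeds the threshold, the buffered clip is classified by the TSM module.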
def __init__(self, threshold, target, targetPath):
logging.basicConfig(filename = "./log/actionness.txt", level=logging.DEBUG)
weights = 'weights/yolov5x.pt'
self.imgsize = 640
self.confthres = 0.4
self.iouthres = 0.5
self.threshold = threshold
self.minFrame = 8
self.maxFrame = 24
self.device = select_device('0')
self.half = self.device.type != 'cpu'
self.yolo = attempt_load(weights, map_location=self.device)
if self.half:
self.yolo.half()
        self.imgsize = check_img_size(self.imgsize, s=self.yolo.stride.max())  # verify imgsize is a multiple of the model stride
# image data list
self.frames = []
# result list
self.bbox = []
self.pose = []
self.actionness = []
# detected index
self.detected = []
self.proposalFlag = False
self.poseNames = ["bend", "fall", "jump", "lie", "run",
"sit", "squat", "stand", "throw", "walk"]
self.posemodel = EfficientNet.from_pretrained('efficientnet-b5',
weights_path='weights/pose1215/pose.best.pth.tar',
num_classes=10, load_fc=True)
self.posemodel.to(self.device)
self.posemodel.eval()
if self.half:
self.posemodel.half() # to FP16
# transform after crop
self.tfms = transforms.Compose(
[transforms.Resize((224, 224)), transforms.ToTensor()])
self.detectAct = 0
self.detectOther = 0
self.target = target
self.targetPath = targetPath
self.isWorthy = False
self.proposalLen = self.minFrame
self.step = 2
def run(self, videoPath, adaptive = False):
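        # Read the video and analyze every 6th frame, in adaptive or fixed-window mode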
cap = cv2.VideoCapture(videoPath)
fNUMS = cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = cap.get(cv2.CAP_PROP_FPS)
print("FPS is ", fps)
frameCount = 0
print("Starting...")
start = time.time()
while cap.isOpened():
ret, frame = cap.read()
if ret:
frameCount += 1
if frameCount % 6 == 0:
if adaptive:
self.adaptiveAnalysis(frame)
else:
self.frameAnalysis(frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
else:
break
cap.release()
end = time.time()
print("all done")
print("processing time : {0:.1f}, total : {1}, act : {2}".format(
float(end - start), self.detectAct + self.detectOther, self.detectAct))
def adaptiveAnalysis(self, img0):
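        # Adaptive mode: grow the proposal window by self.step frames while fall/lie/throw
        # actionness stays above the threshold; when it drops or self.maxFrame is exceeded,
        # sample self.minFrame frames from the window and run clip-level detection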
if len(self.detected) < self.minFrame:
imgFrame = copy.deepcopy(img0)
bboxList, poseList = self.detect(img0)
poseOutput, actionnessOutput = self.poseAnalysis(poseList)
self.appendCache(imgFrame, bboxList, poseOutput, actionnessOutput)
return
if not self.isWorthy and len(self.detected) == self.minFrame:
self.generateBase()
if self.isWorthy:
self.proposalLen += self.step
# isContinue = True
else:
self.delCache(4)
imgFrame = copy.deepcopy(img0)
bboxList, poseList = self.detect(img0)
poseOutput, actionnessOutput = self.poseAnalysis(poseList)
self.appendCache(imgFrame, bboxList, poseOutput, actionnessOutput)
if len(self.detected) == self.proposalLen:
fall = [x[1] for x in self.actionness[-2:] if len(x) > 0]
lie = [x[3] for x in self.actionness[-2:] if len(x) > 0]
throw = [x[8] for x in self.actionness[-2:] if len(x) > 0]
fallconf = sum(fall) / self.step
lieconf = sum(lie) / self.step
throwconf = sum(throw) / self.step
isContinue = fallconf > self.threshold or lieconf > self.threshold or throwconf > self.threshold
if isContinue:
self.proposalLen += self.step
if not isContinue or self.proposalLen > self.maxFrame:
# sample 8 frames
self.sample(self.minFrame)
self.actionDetect()
self.clearCache()
self.proposalLen = self.minFrame
self.isWorthy = False
def sample(self, num):
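        # Uniformly subsample the cached frames and per-frame results down to num entries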
length = len(self.detected)
step = length / num
index = [round(x * step) for x in range(num)]
self.frames = [self.frames[i] for i in index]
self.bbox = [self.bbox[i] for i in index]
self.pose = [self.pose[i] for i in index]
self.actionness = [self.actionness[i] for i in index]
self.detected = [self.detected[i] for i in index]
# if self.isWorthy:
# logging.info("num: " + str(self.detectAct))
# logging.info("fall: " + str(fall))
# logging.info("lie: " + str(lie))
# logging.info("throw: " + str(throw))
def generateBase(self):
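        # Decide whether the first minFrame frames are worth extending,
        # based on their mean fall/lie/throw scores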
fall = [x[1] for x in self.actionness if len(x) > 0]
lie = [x[3] for x in self.actionness if len(x) > 0]
throw = [x[8] for x in self.actionness if len(x) > 0]
fallconf = sum(fall) / self.minFrame
lieconf = sum(lie) / self.minFrame
throwconf = sum(throw) / self.minFrame
self.isWorthy = fallconf > self.threshold or lieconf > self.threshold or throwconf > self.threshold
# if self.isWorthy:
# logging.info("num: " + str(self.detectAct))
# logging.info("fall: " + str(fall))
# logging.info("lie: " + str(lie))
# logging.info("throw: " + str(throw))
def frameAnalysis(self, img0):
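        # Fixed-window mode: keep a sliding window of minFrame frames and trigger
        # clip-level detection whenever its mean fall/lie/throw actionness passes the threshold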
imgFrame = copy.deepcopy(img0)
bboxList, poseList = self.detect(img0)
poseOutput, actionnessOutput = self.poseAnalysis(poseList)
if len(self.detected) >= self.minFrame:
self.popCache(0)
self.appendCache(imgFrame, bboxList, poseOutput, actionnessOutput)
if len(self.actionness) == self.minFrame:
self.generateProposal()
if self.proposalFlag:
self.actionDetect()
self.clearCache()
def generateProposal(self):
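        # Set proposalFlag when the window's mean fall/lie/throw score exceeds the threshold,
        # and log the per-frame scores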
fall = [x[1] for x in self.actionness if len(x) > 0]
lie = [x[3] for x in self.actionness if len(x) > 0]
throw = [x[8] for x in self.actionness if len(x) > 0]
fallconf = sum(fall) / self.minFrame
lieconf = sum(lie) / self.minFrame
throwconf = sum(throw) / self.minFrame
self.proposalFlag = fallconf > self.threshold or lieconf > self.threshold or throwconf > self.threshold
if self.proposalFlag:
fallLog = [round(x[1], 2) for x in self.actionness if len(x) > 0]
lieLog = [round(x[3], 2) for x in self.actionness if len(x) > 0]
throwLog = [round(x[8], 2) for x in self.actionness if len(x) > 0]
logging.info("num: " + str(self.detectAct))
logging.info("fall: " + str(fallLog))
logging.info("lie: " + str(lieLog))
logging.info("throw: " + str(throwLog))
def clearCache(self):
self.frames.clear()
self.bbox.clear()
self.pose.clear()
self.actionness.clear()
self.detected.clear()
self.proposalFlag = False
def delCache(self, index):
del self.frames[0:index]
del self.bbox[0:index]
del self.pose[0:index]
del self.actionness[0:index]
del self.detected[0:index]
def popCache(self, index):
self.frames.pop(index)
self.bbox.pop(index)
self.pose.pop(index)
self.actionness.pop(index)
self.detected.pop(index)
def appendCache(self, imgFrame, bboxList, poseOutput, actionnessOutput):
self.frames.append(imgFrame)
self.bbox.append(bboxList)
self.pose.append(poseOutput)
self.actionness.append(actionnessOutput)
if len(bboxList):
self.detected.append(1)
else:
self.detected.append(0)
def actionDetect(self, plotInfo = True):
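        # Classify the buffered clip with the TSM module; if the prediction matches self.target,
        # annotate the frames with boxes, pose labels and the action label, and save them to self.targetPath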
tsmPred, tsmConf = ucf_TSM_Module.alertAction(self.frames)
actionLabel = f'{tsmPred} {tsmConf:.2f}%'
# or tsmPred == 'JavelinThrow'
# if tsmPred == 'BaseballPitch':
# plotInfo = False
if tsmPred == self.target:
self.detectAct += 1
for index, actFrame in enumerate(self.frames):
if plotInfo:
for (resultBox, resultPose) in zip(self.bbox[index], self.pose[index]):
label = f'{resultPose[0]} {resultPose[1]:.2f} {resultBox[3] - resultBox[1]}'
plot_one_box(resultBox, actFrame, label=label)
plot_action(actFrame, action=actionLabel)
savepath = os.path.join(self.targetPath, '{0}_{1}_{2}_detected{3}.jpg'.format(
tsmPred, self.detectAct, index, self.detected[index]))
cv2.imwrite(savepath, actFrame)
else:
self.detectOther += 1
def poseAnalysis(self, poseList):
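        # Batch the cropped person images through EfficientNet; return a [pose, probability]
        # pair per detection and, as actionness, the per-class maximum probability over all detections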
poseOutput = []
actionness = []
# posestart = time.time()
# poseNames = ["bend", "fall", "jump", "lie", "ride", "run", "sit", "squat", "stand", "throw", "walk"]
if len(poseList):
posemodelinput = torch.stack(poseList, dim=0)
posemodelinput = posemodelinput.to(self.device)
posemodelinput = posemodelinput.half() if self.half else posemodelinput.float()
with torch.no_grad():
output = self.posemodel(posemodelinput)
output = F.softmax(output, dim=1)
prob, pred = output.topk(1, 1, True, True)
for i in range(len(pred)):
poseOutput.append(
[self.poseNames[pred[i].item()], prob[i].item()])
conf, _ = output.topk(1, 0, True, True)
actionness = [x.item() for x in conf[0]]
# poseend = time.time()
# print('pose single frame time: {:.3f}'.format(float(poseend - posestart)))
return poseOutput, actionness
def detect(self, img0):
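        # YOLOv5 person detection on a single frame: return xyxy boxes for persons taller
        # than 50 px and the matching 224x224 crops for the pose classifier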
        height, width = img0.shape[:2]
img = letterbox(img0, new_shape=self.imgsize)[0]
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to 3x416x416
img = np.ascontiguousarray(img)
img = torch.from_numpy(img).to(self.device)
img = img.half() if self.half else img.float() # uint8 to fp16/32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# convert to PIL image
pilimg = Image.fromarray(cv2.cvtColor(img0, cv2.COLOR_BGR2RGB))
# detection process
pred = self.yolo(img, augment=False)[0]
pred = non_max_suppression(
pred, self.confthres, self.iouthres, classes=None, agnostic=False)
result = []
poseimglist = []
for i, det in enumerate(pred):
if det is not None and len(det):
det[:, :4] = scale_coords(
img.shape[2:], det[:, :4], img0.shape).round()
for *xyxyTensor, conf, cls in reversed(det):
                    xyxy = torch.tensor(xyxyTensor).view(-1).tolist()
height = xyxy[3] - xyxy[1]
if cls == 0 and height >= 50:
# if cls == 0:
# cropstart = time.time()
cropleft, croptop, cropright, cropbottom = xyxy
cropped = pilimg.crop(
(cropleft, croptop, cropright, cropbottom))
cropped = self.tfms(cropped)
poseimglist.append(cropped)
# cropend = time.time()
# print('crop single box time: {:.3f}'.format(float(cropend - cropstart)))
result.append(xyxy)
return result, poseimglist
if __name__ == "__main__":
os.system('rm -rf data/ucftest/javelin/*')
videoPath = 'data/thumos14/javelin/video_validation_0000413.mp4'
threshold = 0.4
target = 'JavelinThrow'
targetPath = 'data/ucftest/javelin'
detector = TemporalProposal(threshold, target, targetPath)
# detector.run(videoPath, True)
detector.run(videoPath, False)