3 Star 0 Fork 0

Jason-Zhang/speed

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
startyFFmpeg.py 17.33 KB
一键复制 编辑 原始数据 按行查看 历史
ZZL 提交于 2024-11-22 09:12 . d
# -*- coding: utf-8 -*-
import asyncio
import base64
import concurrent
import os
import subprocess
import sys
import time
from datetime import datetime
from io import BytesIO
import json
import cv2
import numpy as np
import requests
import torch
from PIL import Image, ImageDraw, ImageFont
from fastapi import FastAPI, HTTPException
from fastapi import WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from minio import Minio, S3Error
from pydantic import BaseModel
from starlette.responses import FileResponse
from models.experimental import attempt_load
from utils.datasets import letterbox
from utils.general import non_max_suppression, scale_coords
from utils.plots import plot_one_box
from video_inspection.SnowflakeGenerator import SnowflakeGenerator
sys.path.append(os.path.abspath('/home/feitian/LM/speed'))
app = FastAPI()
# 配置跨域允许的来源
origins = ["*"] # 在生产环境中建议用实际来源替代 "*"
app.add_middleware(
CORSMiddleware,
allow_origins=origins, # 允许所有来源
allow_credentials=True,
allow_methods=["*"], # 允许所有请求方法
allow_headers=["*"], # 允许所有请求头
)
# 初始化 MinIO 客户端
minio_client = Minio(
"125.74.93.87:9000", # MinIO服务器的URL,例如 "127.0.0.1:9000"
access_key="admin", # 你的访问密钥
secret_key="Sqgs50z@2023!", # 你的密钥ID
secure=False # 如果使用HTTP,则设置为False;如果使用HTTPS,则设置为True
)
# 定义存储桶名称
bucket_name = "visual"
# 创建存储桶
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
hls_output_dir = "/home/feitian/LM/speed/video_inspection/uploads"
if not os.path.exists(hls_output_dir):
os.makedirs(hls_output_dir)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 选择设备
# 加载多个 YOLOv7 模型
weights_paths = ['/home/feitian/LM/speed/weights/yolov7.pt', '/home/feitian/LM/speed/smoke/best.pt',
'/home/feitian/LM/speed/cracks/best.pt','/home/feitian/LM/speed/phone/best.pt',
'/home/feitian/LM/speed/dljs/best.pt','/home/feitian/LM/speed/hysb/best.pt'
,'/home/feitian/LM/speed/dlsg/best.pt','/home/feitian/LM/speed/aqm/best.pt'
,'/home/feitian/LM/speed/laji/best.pt','/home/feitian/LM/speed/kzsb/best.pt'
,'/home/feitian/LM/speed/hdpfw/best.pt','/home/feitian/LM/speed/sleep/best.pt'
] # 不同模型权重文件路径
models = []
for weights in weights_paths:
model = attempt_load(weights, map_location=device) # 加载模型
model.eval() # 设置模型为评估模式
models.append(model)
names_list = [model.names if hasattr(model, 'names') else ['class_{}'.format(i) for i in range(1000)] for model in
models]
# 初始化雪花算法生成器
generator = SnowflakeGenerator(datacenter_id=1, worker_id=1)
# 加载模型的类别名称
names = model.names if hasattr(model, 'names') else ['class_{}'.format(i) for i in range(1000)]
exit_flags = {} # 存储各线程的退出标志
# 初始化线程池
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
# 定义请求数据结构
class M3U8Input(BaseModel):
m3u8_url: str # 输入视频流的URL
names_dict: str #
class Exlfag(BaseModel):
thread_id: str
class FileUrlInput(BaseModel):
file_url: str
names_dict: str
# 上传文件到 MinIO
def upload_to_minio(unique_id, object_name, img_data):
try:
minio_client.put_object(bucket_name, object_name, BytesIO(img_data), len(img_data))
except S3Error as e:
print(f"上传到 MinIO 失败: {e}")
async def start_streaming(input_stream_url: str, unique_id: str, names_dict: str):
m3u8_output_path = os.path.join(hls_output_dir, f"{unique_id}.m3u8")
# 打开摄像头
cap = cv2.VideoCapture(input_stream_url)
if not cap.isOpened():
print("无法打开摄像头")
return
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 获取总帧数
# 判断是否为 MP4 类型的视频(假定 MP4 视频帧数较多)
is_mp4_like = total_frames > fps # 如果帧数大于帧率,认为是长视频
frame_count = 0
ffmpeg_command = [
'ffmpeg',
'-y',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', f"{frame_width}x{frame_height}",
'-r', str(20),
'-threads', '12', # 根据实际CPU核数配置,通常2-4
'-i', '-',
'-c:v', 'libx264',
'-g', '40', # 设置较短的关键帧间隔,增加编码稳定性
'-preset', 'veryfast',
'-bufsize', '3000k',
'-tune', 'zerolatency',
'-f', 'hls',
'-hls_time', '4', # 每个 TS 文件的时长
'-hls_list_size', '6', # 保留播放列表中的 TS 文件数
'-hls_flags', 'delete_segments', # 删除旧 TS 文件,动态更新 m3u8
'-hls_base_url', f"http://125.74.7.201:8089/stream/{unique_id}/", # 修改 base_url
m3u8_output_path
]
# 启动 FFmpeg 推流进程
process = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE)
exit_flags[unique_id] = process
try:
# 读取摄像头视频流并推送到 HLS
while True:
ret, frame = cap.read()
if not ret:
print("无法读取视频帧")
break
# 处理每一帧
frame = await process_frame(unique_id, frame, names_dict)
# 写入帧数据到 FFmpeg 输入流
process.stdin.write(frame.tobytes())
process.stdin.flush()
frame_count += 1
# 计算实际处理时间并调整等待时间
asyncio.sleep(0.01) # 稍微增加等待时间
except Exception as e:
print(f"推流中出现错误: {e}")
finally:
# 关闭摄像头和 FFmpeg
if cap is not None:
cap.release()
if process is not None:
process.stdin.close()
process.wait()
async def process_frame(unique_id, frame, names_dict):
# 统计类别数量的字典
class_count = {}
# 在这里添加处理逻辑,例如推理和绘制
img = letterbox(frame, 640, stride=32)[0]
img = img[:, :, ::-1].transpose(2, 0, 1)
img = np.ascontiguousarray(img)
img = torch.from_numpy(img).to(device)
img = img.float() / 255.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# 对每个模型进行推理
for model, names in zip(models, names_list):
with torch.no_grad():
pred = model(img)[0]
pred = non_max_suppression(pred, 0.25, 0.45, agnostic=False)
for det in pred:
if len(det):
det[:, :4] = scale_coords(img.shape[2:], det[:, :4], frame.shape).round()
for *xyxy, conf, cls in det:
file_id = generator.next_id()
class_id = int(cls)
label_name = names[class_id]
chinese_label_name = Label_Names.get(label_name, label_name)
color = Label_Color.get(label_name, (0, 255, 0))
label = f'{chinese_label_name} {conf:.2f}'
match_found = False
for target in names_dict.split(','):
if label_name == target:
match_found = True
break
if match_found:
# # 更新类别计数
class_count[chinese_label_name] = class_count.get(chinese_label_name, 0) + 1
frame = put_chinese_text(color, frame, label, (int(xyxy[0]), int(xyxy[1] - 30)), 21)
plot_one_box(xyxy, frame, label="", color=color, line_thickness=2)
# 将帧编码为 JPEG 格式
ret, img_data = cv2.imencode('.jpg', frame)
# 按日期创建文件夹
today = datetime.now().strftime('%Y-%m-%d')
object_name = f'{today}/{unique_id}/识别帧_{file_id}.jpg'
executor.submit(upload_to_minio(unique_id, object_name, img_data))
# 在左上角显示类别统计信息
text_position = (10, 30)
for category, count in class_count.items():
text = f'{category}: {count}'
frame = put_chinese_text((255, 51, 58), frame, text, text_position, 26) # 白色字体
text_position = (text_position[0], text_position[1] + 30) # 每行向下移动
return frame
def put_chinese_text(color, image, text, position, font_size):
# 创建 PIL 图像
pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
# 选择一个支持中文的字体
font = ImageFont.truetype("/home/feitian/LM/speed/Alimama_ShuHeiTi_Bold.ttf", font_size)
draw.text(position, text, font=font, fill=color)
# 转换回 OpenCV 格式
return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
def convert_to_base64(result):
# 将图片转换为二进制数据
_, buffer = cv2.imencode('.jpg', result)
# 将二进制数据编码为 Base64
base64_str = base64.b64encode(buffer).decode('utf-8')
# 添加图片前缀
base64_image = f"data:image/jpeg;base64,{base64_str}"
return base64_image
@app.post("/generate-link")
async def generate_link(m3u8_input: M3U8Input):
if not m3u8_input:
raise HTTPException(status_code=400, detail="Missing input_stream_url parameter")
input_stream_url = m3u8_input.m3u8_url
names_dict = m3u8_input.names_dict
unique_id = str(generator.next_id())
# 在主线程中使用事件循环来运行协程
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, lambda: asyncio.run(start_streaming(input_stream_url, unique_id, names_dict)))
# 立即返回生成的 m3u8 URL
m3u8_url = f"/stream/{unique_id}.m3u8"
return {"m3u8_url": m3u8_url, "file_id": unique_id}
@app.post("/exit")
async def exit_detection(unique_id: Exlfag):
unique_id = unique_id.thread_id
if unique_id not in exit_flags:
raise HTTPException(status_code=200, detail="未找到线程ID!")
# 从字典中获取进程
process = exit_flags.get(unique_id)
# 检查进程是否存在
if process is not None:
try:
process.terminate() # 终止进程
process.wait(timeout=5) # 等待进程退出,设置超时
except subprocess.TimeoutExpired:
process.kill() # 强制杀死进程
finally:
# 移除线程记录
# 清理退出的线程信息
del exit_flags[unique_id]
else:
print(f"No stream process found for thread: {unique_id}")
return {"message": f"线程 {unique_id} 已停止!"}
@app.get("/stream/{unique_id}.m3u8")
async def get_m3u8(unique_id: str):
m3u8_file_path = os.path.join(hls_output_dir, f"{unique_id}.m3u8")
if os.path.exists(m3u8_file_path):
return FileResponse(m3u8_file_path)
raise HTTPException(status_code=404, detail="m3u8 file not found")
@app.get("/stream/{unique_id}/{ts_filename}")
async def get_ts(ts_filename: str):
ts_file_path = os.path.join(hls_output_dir, f"{ts_filename}") # 假设 TS 文件直接存放在 hls_output_dir 中
if os.path.exists(ts_file_path):
return FileResponse(ts_file_path)
raise HTTPException(status_code=404, detail="TS file not found")
@app.post("/detect-image")
async def detect_image(File_url_input: FileUrlInput):
file_url = File_url_input.file_url
names_dict = File_url_input.names_dict
# 读取文件内容为字节流
response = requests.get(file_url)
# file_bytes = await file.read()
# 将字节流转换为 NumPy 数组,并解码为图像
nparr = np.frombuffer(response.content, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# 检查是否成功读取图片
if image is None:
raise HTTPException(status_code=500, detail="无法解码上传的图像文件.")
unique_id = generator.next_id()
result = await process_frame(unique_id, image, names_dict)
base64 = convert_to_base64(result)
# 根据 result 的具体内容进行判断
if result is not None and result.any(): # 或使用 result.all()
return {"message": "图像处理成功", "cbase64": base64}
else:
# 如果没有检测到目标,返回成功状态但不保存文件
return {"message": "未检测到目标,未保存文件"}
@app.websocket("/ws/getFiles/{thread_id}")
async def websocket_get_files(websocket: WebSocket, thread_id: str):
await websocket.accept()
today = time.strftime("%Y-%m-%d")
prefix = f"{today}/{thread_id}/"
no_file_found_count = 0
max_attempts = 30
try:
while no_file_found_count < max_attempts:
# 获取对象列表
objects = minio_client.list_objects(bucket_name="visual", prefix=prefix)
found_files = False
for obj in objects:
object_url = f"http://125.74.93.87:9000/visual/{obj.object_name}"
message = {
"status": "success",
"data": object_url
}
await websocket.send_text(json.dumps(message))
await asyncio.sleep(2) # 每次查询的延迟
# 删除文件
try:
minio_client.remove_object(bucket_name="visual", object_name=obj.object_name)
except S3Error as e:
await websocket.send_text(f"Error removing {obj.object_name}: {str(e)}")
found_files = True
# 如果没有找到文件,增加计数器
if not found_files:
no_file_found_count += 1
else:
no_file_found_count = 0
# 每次查询之间的延迟
await asyncio.sleep(2)
if no_file_found_count >= max_attempts:
message = {
"status": "error",
"data": "尝试30次后未找到文件,停止搜索"
}
await websocket.send_text(json.dumps(message))
except Exception as e:
print("WebSocket disconnected")
finally:
await websocket.close()
Label_Names = {
'person': '人',
'smoke': '吸烟',
'smog': '烟雾',
'garbage': '垃圾',
'face_mask': '口罩',
'normal': '工作',
'play': '玩',
'sleep': '睡觉',
'can': '塑料易拉罐垃圾',
'foam': '泡沫',
'plastic': '塑料',
'plastic bottle': '塑料瓶',
'unknow': '未知漂浮物',
'fire': '火源',
'head': '未佩戴头盔',
'helmet': '戴头盔',
'accident': '事故',
'non-accident': '无事故',
'bicycle': '自行车',
'car': '汽车',
'cracks': '裂缝',
'puddle': '积水',
'motorcycle': '电动车',
'airplane': '飞机',
'bus': '公交车',
'train': '火车',
'truck': '卡车',
'boat': '船',
'traffic light': '红绿灯',
'fire hydrant': '消防栓',
'stop sign': '停车标志',
'parking meter': '停车计时器',
'bench': '长椅',
'bird': '鸟',
'cat': '猫',
'dog': '狗',
'horse': '马',
'sheep': '羊',
'cow': '牛',
'elephant': '大象',
'bear': '熊',
'zebra': '斑马',
'giraffe': '长颈鹿',
'backpack': '背包',
'umbrella': '雨伞',
'handbag': '手提包',
'tie': '领带',
'suitcase': '行李箱',
'frisbee': '飞盘',
'skis': '滑雪板',
'snowboard': '滑雪板',
'sports ball': '运动球',
'kite': '风筝',
'baseball bat': '棒球棒',
'baseball glove': '棒球手套',
'skateboard': '滑板',
'surfboard': '冲浪板',
'tennis racket': '网球拍',
'bottle': '瓶子',
'wine glass': '酒杯',
'cup': '杯子',
'fork': '叉子',
'knife': '刀',
'spoon': '勺子',
'bowl': '碗',
'banana': '香蕉',
'apple': '苹果',
'sandwich': '三明治',
'orange': '橙子',
'broccoli': '西兰花',
'carrot': '胡萝卜',
'hot dog': '热狗',
'pizza': '披萨',
'donut': '甜甜圈',
'cake': '蛋糕',
'chair': '椅子',
'couch': '沙发',
'potted plant': '盆栽植物',
'bed': '床',
'dining table': '餐桌',
'toilet': '厕所',
'tv': '电视',
'laptop': '笔记本电脑',
'mouse': '鼠标',
'remote': '遥控器',
'keyboard': '键盘',
'cell phone': '手机',
'phone': '手机',
'microwave': '微波炉',
'oven': '烤箱',
'toaster': '烤面包机',
'sink': '水槽',
'refrigerator': '冰箱',
'book': '书',
'clock': '时钟',
'vase': '花瓶',
'scissors': '剪刀',
'teddy bear': '泰迪熊',
'hair drier': '吹风机',
'toothbrush': '牙刷'
}
Label_Color = {
'car': (30, 144, 255), # 红色
'person': (0, 255, 0), # 绿色
'smoke': (220, 20, 60),
'helmet': (123, 104, 238),
'head': (65, 105, 225),
'Smoke': (0, 191, 255),
'Fire': (176, 224, 230),
'bicycle': (0, 206, 209),
'motorcycle': (255, 192, 203),
'airplane': (0, 128, 128),
'bus': (255, 165, 0),
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="10.165.11.5", port=6005)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/zhang-zhiliang666/speed.git
git@gitee.com:zhang-zhiliang666/speed.git
zhang-zhiliang666
speed
speed
master

搜索帮助