1 Star 0 Fork 17

蔡学江/multimedia_histreamer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
1123 4.70 KB
一键复制 编辑 原始数据 按行查看 历史
蔡学江 提交于 2024-06-12 06:47 . 123
diff --git a/src/osal/task/pthread/pipeline_threadpool.cpp b/src/osal/task/pthread/pipeline_threadpool.cpp
index 7d99ad76..341f73de 100644
--- a/src/osal/task/pthread/pipeline_threadpool.cpp
+++ b/src/osal/task/pthread/pipeline_threadpool.cpp
@@ -92,15 +92,18 @@ std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(const std::string
void PipeLineThreadPool::DestroyThread(const std::string &groupId)
{
- AutoLock lock(mutex_);
- if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
- return;
- }
- std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId];
- for (auto thread : *threadList.get()) {
- thread->Exit();
+ std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList;
+ {
+ AutoLock lock(mutex_);
+ if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
+ return;
+ }
+ threadList = workerGroupMap[groupId];
+ workerGroupMap.erase(groupId);
}
- workerGroupMap.erase(groupId);
+ for (auto thread : *threadList.get()) {
+ thread->Exit();
+ }
}
PipeLineThread::PipeLineThread(std::string name, TaskType type, TaskPriority priority)
@@ -110,11 +113,12 @@ PipeLineThread::PipeLineThread(std::string name, TaskType type, TaskPriority pri
loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority));
std::string threadName = name + "_" + TaskTypeConvert(type);
loop_->SetName(threadName);
- threadExit_ = false;
threadExiting_ = false;
if (loop_->CreateThread([this] { Run(); })) {
+ threadExit_ = false;
} else {
MEDIA_LOG_E("task " PUBLIC_LOG_S " create failed", name.c_str());
+ threadExit_ = true;
}
}
@@ -126,7 +130,7 @@ PipeLineThread::~PipeLineThread()
void PipeLineThread::Exit()
{
AutoLock lock(mutex_);
- if (threadExit_.load()) {
+ if (threadExit_.load() || !loop_) {
return;
}
threadExiting_ = true;
@@ -137,7 +141,7 @@ void PipeLineThread::Exit()
if (IsRunningInSelf()) {
return;
}
- syncCond_.Wait(lock, [this] { return threadExit_.load(); });
+ loop_ = nullptr;
}
void PipeLineThread::Run()
@@ -174,7 +178,6 @@ void PipeLineThread::Run()
}
AutoLock lock(mutex_);
threadExit_ = true;
- syncCond_.NotifyAll();
}
void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task)
diff --git a/src/osal/task/pthread/taskInner.cpp b/src/osal/task/pthread/taskInner.cpp
index 2ad61227..c34e9ca2 100644
--- a/src/osal/task/pthread/taskInner.cpp
+++ b/src/osal/task/pthread/taskInner.cpp
@@ -63,13 +63,13 @@ void TaskInner::Init()
void TaskInner::DeInit()
{
MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit", name_.c_str());
+ pipelineThread_->RemoveTask(shared_from_this());
{
AutoLock lock1(jobMutex_);
AutoLock lock2(stateMutex_);
runningState_ = RunningState::STOPPED;
topProcessUs_ = -1;
}
- pipelineThread_->RemoveTask(shared_from_this());
MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit done", name_.c_str());
}
@@ -257,13 +257,13 @@ int64_t TaskInner::NextJobUs()
void TaskInner::HandleJob()
{
AutoLock lock(jobMutex_);
+ stateMutex_.lock();
+ if (topProcessUs_ == -1) {
+ MEDIA_LOG_W_T("HandleJob no need");
+ stateMutex_.unlock();
+ return;
+ }
if (singleLoop_) {
- stateMutex_.lock();
- if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
- topProcessUs_ = -1;
- stateMutex_.unlock();
- return;
- }
// unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
stateMutex_.unlock();
int64_t nextDelay = job_();
@@ -273,7 +273,6 @@ void TaskInner::HandleJob()
}
} else {
std::function<void()> nextJob;
- stateMutex_.lock();
if (topIsJob_) {
nextJob = std::move(jobQueue_.begin()->second);
jobQueue_.erase(jobQueue_.begin());
@@ -281,12 +280,10 @@ void TaskInner::HandleJob()
nextJob = std::move(msgQueue_.begin()->second);
msgQueue_.erase(msgQueue_.begin());
}
- {
- // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
- stateMutex_.unlock();
- nextJob();
- replyCond_.NotifyAll();
- }
+ // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
+ stateMutex_.unlock();
+ nextJob();
+ replyCond_.NotifyAll();
AutoLock lock(stateMutex_);
UpdateTop();
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/caihdl/multimedia_histreamer.git
git@gitee.com:caihdl/multimedia_histreamer.git
caihdl
multimedia_histreamer
multimedia_histreamer
master

搜索帮助

D67c1975 1850385 1daf7b77 1850385