1 Star 0 Fork 0

scgg/GPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
pool2.go 3.63 KB
一键复制 编辑 原始数据 按行查看 历史
scgg 提交于 2021-12-22 17:24 . 负载生成器
package GPool
//
//import (
// "fmt"
// "log"
// "sync"
// "sync/atomic"
//)
//
//package GPool
//
import (
"log"
"sync"
"sync/atomic"
"time"
)
// tq is one node of the singly linked list that holds pending tasks.
type tq struct {
	// T is the task carried by this node. It is nil on the sentinel head
	// node created by NewPoolForTaskChain.
	T *task
	// next points to the following node, or is nil at the end of the list.
	next *tq
}
// PoolForTaskChain is a pool implemented on top of a linked list of tasks.
//
// All tasks are stored in the linked list named taskQueue rather than in a
// channel. This design is not suited to adding tasks concurrently: putting a
// task on the list requires a lock, and that lock hurts the performance of
// concurrent adds. Conversely, when tasks are added non-concurrently, a very
// large number of tasks will occupy a lot of memory.
type PoolForTaskChain struct {
	// taskQueue is the sentinel head of the task list; real tasks hang off
	// taskQueue.next.
	taskQueue *tq
	// cap is the upper bound that getWorker compares running against before
	// creating another worker.
	cap int32
	// running is the number of finders; addFinder increments it.
	running int32
	// workGroup tracks whether the submitted tasks have finished executing;
	// Run_taskChain waits on it.
	workGroup sync.WaitGroup
	// workerHead is the sentinel head of the finder (worker) list.
	// Run_taskChain sets it to nil once all tasks are done, which is the
	// condition recoverWorker uses to exit.
	workerHead *finder
	// mux is held by recoverWorker and addFinder while they modify the
	// finder list.
	mux sync.Mutex
	// isClose is initialised to 0 by NewPoolForTaskChain and is not read by
	// any code visible in this file.
	// NOTE(review): presumably a "pool closed" flag (see the commented-out
	// isPoolClosed check in getWorker) - confirm.
	isClose int32
	// workerCache stores workers that recoverWorker removed from the finder
	// list so that getWorker can reuse them.
	workerCache sync.Pool
	// recoveryTime is the heartbeat age at which recoverWorker recycles a
	// worker; NewPoolForTaskChain sets it to one second.
	recoveryTime time.Duration
}
// NewPoolForTaskChain builds an empty pool that may hold up to cap workers.
//
// A cap that is zero or negative is replaced by defaultSize. The returned
// pool starts with sentinel head nodes for both the task list and the worker
// list, no running workers, and a recovery time of one second.
func NewPoolForTaskChain(cap int32) *PoolForTaskChain {
	limit := cap
	if limit <= 0 {
		limit = defaultSize
	}

	// Every field not assigned below keeps its zero value, which is exactly
	// what an explicit literal with nil / 0 / empty structs would produce.
	pool := new(PoolForTaskChain)
	pool.taskQueue = new(tq)
	pool.workerHead = new(finder)
	pool.cap = limit
	pool.recoveryTime = time.Second
	return pool
}
// AddTaskToTQ wraps target and args in a task and links it in directly after
// the sentinel head of the task list, so the most recently added task is the
// first one getTask hands out.
//
// A nil target is ignored. No lock is taken here, so calls must not run
// concurrently with each other or with getTask.
func (p *PoolForTaskChain) AddTaskToTQ(target func(...interface{}), args []interface{}) {
	if target == nil {
		return
	}

	// Inserting at the head covers the empty-list case as well: the new
	// node's next is simply nil then.
	p.taskQueue.next = &tq{
		T:    &task{workFunc: target, args: args},
		next: p.taskQueue.next,
	}
}
// getTask detaches the first node after the sentinel head of the task list
// and returns its task.
//
// The boolean is false (and the task nil) when the list is empty. The pool
// mutex is not taken, so the caller must be the only goroutine touching the
// task list.
func (p *PoolForTaskChain) getTask() (*task, bool) {
	node := p.taskQueue.next
	if node == nil {
		return nil, false
	}

	// Unlink the node and clear its forward pointer in one step.
	p.taskQueue.next, node.next = node.next, nil
	return node.T, true
}
// recoverWorker periodically sweeps the worker list and moves workers that
// have been idle for at least p.recoveryTime into p.workerCache.
//
// It loops, sleeping one second between sweeps, until p.workerHead is nil
// (Run_taskChain clears it once all tasks have finished).
//
// Compared with a naive sweep this version:
//   - only recycles workers whose status is FREE, so a worker that is busy
//     with a long task is never handed to the cache while still in use;
//   - decrements p.running for every recycled worker, so the capacity it
//     occupied becomes available again;
//   - re-examines the successor after a removal instead of skipping it;
//   - reads p.workerHead and the worker's status/heartTime under their locks.
//
// Lock order is p.mux first, then worker.mux.
// NOTE(review): this assumes worker.mux only guards status/heartTime and is
// not held for the duration of worker.run - confirm against worker.
func (p *PoolForTaskChain) recoverWorker() {
	for {
		p.mux.Lock()
		if p.workerHead == nil {
			p.mux.Unlock()
			return
		}

		now := time.Now()
		prev := p.workerHead
		for prev.next != nil {
			cur := prev.next
			w := cur.W

			w.mux.Lock()
			expired := w.status == FREE && now.Sub(w.heartTime) >= p.recoveryTime
			if expired {
				// Mark the worker as taken before it leaves the list so that
				// nobody still holding a pointer to this node can claim it
				// while it sits in the cache; whoever fetches it from the
				// cache receives it already flagged as in use.
				w.status = RUNNING
			}
			w.mux.Unlock()

			if !expired {
				prev = cur
				continue
			}

			// Unlink cur; prev stays put so the new prev.next is checked on
			// the next iteration.
			prev.next = cur.next
			cur.next = nil
			atomic.AddInt32(&p.running, -1)
			p.workerCache.Put(w)
		}
		p.mux.Unlock()

		time.Sleep(1 * time.Second)
	}
}
// addFinder registers w with the pool: it bumps the running counter and
// links a new finder node for w directly after the sentinel head of the
// worker list. The pool mutex is held for the whole operation.
//
// The counter is updated with atomic.AddInt32 because getWorker reads it via
// atomic.LoadInt32 without holding the mutex; a plain increment would race
// with that read. Inserting at the head also covers the empty-list case, so
// no separate branch is needed for it.
func (p *PoolForTaskChain) addFinder(w *worker) {
	p.mux.Lock()
	defer p.mux.Unlock()

	atomic.AddInt32(&p.running, 1)
	p.workerHead.next = &finder{
		W:    w,
		next: p.workerHead.next,
	}
}
// getWorker returns a worker that is marked RUNNING and owned exclusively by
// the caller. It retries until one is available, i.e. it busy-waits while
// the pool is saturated (every worker busy and running == cap).
func (p *PoolForTaskChain) getWorker() *worker {
	//if isPoolClosed(p) {
	//	return nil
	//}
	for {
		if w := p.tryGetWorker(); w != nil {
			return w
		}
	}
}

// tryGetWorker makes a single attempt to obtain a worker and returns nil if
// the pool is saturated.
//
// The whole attempt runs under p.mux so that
//   - the worker list cannot be modified while it is being walked,
//   - two callers cannot claim the same FREE worker, and
//   - the capacity check and the insertion of a new worker are atomic, which
//     keeps the number of workers from exceeding p.cap.
//
// The new finder is linked in here rather than through addFinder, because
// addFinder acquires p.mux itself and would deadlock if called with the
// mutex already held.
//
// Lock order is p.mux first, then worker.mux.
// NOTE(review): this assumes worker.mux only guards status/heartTime and is
// not held for the duration of worker.run - confirm against worker.
func (p *PoolForTaskChain) tryGetWorker() *worker {
	p.mux.Lock()
	defer p.mux.Unlock()

	// First choice: reuse a worker that is already registered and idle.
	for h := p.workerHead.next; h != nil; h = h.next {
		w := h.W
		w.mux.Lock()
		if w.status == FREE {
			w.status = RUNNING
			w.heartTime = time.Now() // refresh the heartbeat
			w.mux.Unlock()
			return w
		}
		w.mux.Unlock()
	}

	if atomic.LoadInt32(&p.running) >= p.cap {
		return nil
	}

	// Second choice: take a recycled worker from the cache, or create one.
	w, ok := p.workerCache.Get().(*worker)
	if ok && w != nil {
		// A cached worker carries whatever state it had when it was
		// recycled; reset it so it is not seen as FREE by another caller or
		// immediately recycled again because of a stale heartbeat.
		w.mux.Lock()
		w.status = RUNNING
		w.heartTime = time.Now()
		w.mux.Unlock()
	} else {
		w = &worker{
			status:    RUNNING,
			work:      nil,
			heartTime: time.Now(),
		}
	}

	atomic.AddInt32(&p.running, 1)
	p.workerHead.next = &finder{
		W:    w,
		next: p.workerHead.next,
	}
	return w
}
// Run_taskChain drains the task list and blocks until every task has run.
//
// It starts the background recoverWorker sweep, then pops tasks one by one
// and launches a goroutine per task. Each goroutine obtains a worker via
// getWorker, assigns the task to it and calls the worker's run method. A
// panic raised while the task runs is recovered and logged.
//
// When all tasks are done p.workerHead is set to nil, which is the signal
// recoverWorker uses to stop; the pool's worker list is gone after that.
func (p *PoolForTaskChain) Run_taskChain() {
	go p.recoverWorker()

	for {
		t, ok := p.getTask()
		if !ok {
			break
		}

		p.workGroup.Add(1)
		go func() {
			oneWorker := p.getWorker()
			defer func(w *worker) {
				if err := recover(); err != nil {
					log.Println(err)
				}
				// Release the worker before signalling completion, so that
				// by the time Wait returns no goroutine is still touching
				// its worker.
				w.mux.Lock()
				w.status = FREE
				w.mux.Unlock()
				p.workGroup.Done()
			}(oneWorker)

			oneWorker.work = t
			oneWorker.run()
		}()
	}

	p.workGroup.Wait()

	// recoverWorker walks the list under p.mux; clear the head under the
	// same lock so the write cannot race with a sweep in progress.
	p.mux.Lock()
	p.workerHead = nil
	p.mux.Unlock()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/scgg/gpool.git
git@gitee.com:scgg/gpool.git
scgg
gpool
GPool
master

搜索帮助