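// Hosting-page notice captured along with the source (not part of the program):
// "Code pull complete; the page will refresh automatically."
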
package GPool
//
//import (
// "fmt"
// "log"
// "sync"
// "sync/atomic"
//)
//
//package GPool
//
import (
"log"
"sync"
"sync/atomic"
"time"
)
// tq is one node of the singly linked list that stores pending tasks.
type tq struct {
// T is the task carried by this node. NewPoolForTaskChain creates the list
// head with a nil T, and real tasks are linked in behind that head.
T *task
// next is the following node, or nil when this node is the last one.
next *tq
}
// PoolForTaskChain is a Pool implemented with a linked list of tasks.
// All tasks are stored in the linked list named taskQueue rather than in a
// channel.
// This approach is not well suited to adding tasks concurrently, because
// linking a task into the list requires a lock, and that lock hurts the
// performance of concurrent adds. Conversely, when tasks are added
// non-concurrently, a very large number of tasks will occupy a lot of memory.
type PoolForTaskChain struct {
// taskQueue is the head node of the task list. The constructor installs a
// node with a nil task; AddTaskToTQ and getTask work on its next pointer.
taskQueue *tq
// cap is the limit that getWorker compares running against before it adds
// another worker.
cap int32
running int32 // number of finders
workGroup sync.WaitGroup // tracks whether the submitted tasks have finished executing
// workerHead is the head node of the finder list (its own W is nil).
// Run_taskChain sets it to nil when it is done, which makes recoverWorker return.
workerHead *finder
// mux guards modifications of the finder list.
mux sync.Mutex
// isClose is initialised to 0 by the constructor and is not read in the
// visible code. NOTE(review): presumably a "pool closed" flag — confirm.
isClose int32
// workerCache receives the workers that recoverWorker unlinks; getWorker
// consults it before allocating a new worker.
workerCache sync.Pool
// recoveryTime is the heartbeat age at which recoverWorker reclaims a
// worker. The constructor sets it to one second.
recoveryTime time.Duration
}
// NewPoolForTaskChain builds an empty pool.
//
// cap is the worker limit; any value <= 0 is replaced by defaultSize.
// The returned pool starts with empty task and finder lists (each represented
// by a head node that carries no payload), zero running workers, and a
// recovery time of one second.
func NewPoolForTaskChain(cap int32) *PoolForTaskChain {
	if cap <= 0 {
		cap = defaultSize
	}

	// Every field not assigned below keeps its zero value, which is exactly
	// what an explicit 0 / sync.Mutex{} / sync.Pool{} initialiser would give.
	pool := new(PoolForTaskChain)
	pool.cap = cap
	pool.taskQueue = &tq{}      // payload-free head of the task list
	pool.workerHead = &finder{} // payload-free head of the finder list
	pool.recoveryTime = time.Second
	return pool
}
// AddTaskToTQ wraps target and args in a task and links it into the task list.
//
// target is the function the task will run; a nil target is ignored and
// nothing is added. args is stored on the task as-is (the slice is not copied).
//
// The new node is inserted directly behind the list head, so the task added
// most recently is the first one getTask hands out.
//
// The insertion is performed while holding p.mux so that goroutines adding
// tasks at the same time cannot overwrite each other's node.
func (p *PoolForTaskChain) AddTaskToTQ(target func(...interface{}), args []interface{}) {
	if target == nil {
		return
	}

	node := &tq{
		T: &task{
			workFunc: target,
			args:     args,
		},
	}

	p.mux.Lock()
	defer p.mux.Unlock()

	// Splicing in front of the current first node also covers the empty list:
	// head.next is nil there, so the new node simply becomes the only element.
	node.next = p.taskQueue.next
	p.taskQueue.next = node
}
// getTask removes the node that follows the task-list head and returns the
// task it carries.
//
// The boolean result is false (and the task nil) when the list is empty.
//
// No lock is taken here: the p.mux locking that once wrapped this body is
// disabled, so callers must make sure the list is not modified concurrently.
func (p *PoolForTaskChain) getTask() (*task, bool) {
	head := p.taskQueue
	first := head.next
	if first == nil {
		return nil, false
	}

	// Unlink the first node and clear its forward pointer in one step.
	head.next, first.next = first.next, nil
	return first.T, true
}
// recoverWorker is the background loop that reclaims idle workers.
//
// Once per second it walks the finder list under p.mux and, for every worker
// that is FREE and whose heartTime is at least p.recoveryTime old, unlinks the
// finder, gives the capacity slot back by decrementing p.running, and parks
// the worker in p.workerCache. heartTime is refreshed by getWorker when a
// worker is handed out, so age is measured from the last hand-out.
//
// Workers that are not FREE are never reclaimed: a task that runs longer than
// p.recoveryTime must not have its worker pulled out from under it.
//
// The loop returns as soon as it observes p.workerHead == nil.
func (p *PoolForTaskChain) recoverWorker() {
	for {
		p.mux.Lock()
		head := p.workerHead
		if head == nil {
			p.mux.Unlock()
			return
		}

		now := time.Now()
		prev := head
		// prev only advances when the node behind it is kept, so the node
		// that slides into place after a removal is examined as well.
		for cur := prev.next; cur != nil; cur = prev.next {
			if !p.idleTooLong(cur.W, now) {
				prev = cur
				continue
			}
			prev.next = cur.next
			cur.next = nil
			atomic.AddInt32(&p.running, -1)
			p.workerCache.Put(cur.W)
		}
		p.mux.Unlock()

		time.Sleep(1 * time.Second)
	}
}

// idleTooLong reports whether w is FREE and was last handed out at least
// p.recoveryTime before now. The caller must hold p.mux.
func (p *PoolForTaskChain) idleTooLong(w *worker, now time.Time) bool {
	// Unlocked peek first, as getWorker has always done, so that a busy
	// worker is skipped without touching its lock.
	// NOTE(review): worker.run is not visible here; if it keeps w.mux held
	// while the task executes, locking unconditionally would stall this loop.
	if w.status != FREE {
		return false
	}
	w.mux.Lock()
	defer w.mux.Unlock()
	return w.status == FREE && now.Sub(w.heartTime) >= p.recoveryTime
}

// addFinder links w into the finder list and counts it in p.running.
func (p *PoolForTaskChain) addFinder(w *worker) {
	p.mux.Lock()
	defer p.mux.Unlock()
	p.linkFinderLocked(w)
}

// linkFinderLocked inserts a finder for w directly behind the list head and
// increments p.running. The caller must hold p.mux.
func (p *PoolForTaskChain) linkFinderLocked(w *worker) {
	// running is read with atomic loads elsewhere, so it is written atomically too.
	atomic.AddInt32(&p.running, 1)
	p.workerHead.next = &finder{
		W:    w,
		next: p.workerHead.next,
	}
}

// getWorker returns a worker that is marked RUNNING and reserved for the caller.
//
// It first tries to claim a FREE worker from the finder list. If none is free
// and fewer than p.cap workers are linked, it takes one from p.workerCache (or
// allocates one) and links it. Otherwise it retries until one of the two
// succeeds; it never returns nil.
//
// Searching, the capacity check and linking all happen in one p.mux critical
// section, so two callers cannot claim the same worker and the number of
// linked workers cannot exceed p.cap.
func (p *PoolForTaskChain) getWorker() *worker {
	for {
		p.mux.Lock()
		w := p.claimIdleLocked()
		if w == nil && atomic.LoadInt32(&p.running) < p.cap {
			w = p.spawnLocked()
		}
		p.mux.Unlock()

		if w != nil {
			return w
		}
	}
}

// claimIdleLocked marks the first FREE worker in the finder list as RUNNING,
// refreshes its heartbeat and returns it; it returns nil when every linked
// worker is busy. The caller must hold p.mux.
func (p *PoolForTaskChain) claimIdleLocked() *worker {
	for f := p.workerHead.next; f != nil; f = f.next {
		w := f.W
		if w.status != FREE { // unlocked peek; confirmed under w.mux below
			continue
		}
		w.mux.Lock()
		if w.status == FREE {
			w.status = RUNNING
			w.heartTime = time.Now() // refresh the heartbeat
			w.mux.Unlock()
			return w
		}
		w.mux.Unlock()
	}
	return nil
}

// spawnLocked obtains a worker from p.workerCache, or allocates one when the
// cache is empty, marks it RUNNING with a fresh heartbeat and links it into
// the finder list. The caller must hold p.mux and must already have checked
// that capacity is available.
func (p *PoolForTaskChain) spawnLocked() *worker {
	var w *worker
	if cached := p.workerCache.Get(); cached != nil {
		w = cached.(*worker)
		// A recycled worker still carries the state it was parked with;
		// reset it so it is neither claimable by another caller nor
		// immediately reclaimed again because of a stale heartbeat.
		w.mux.Lock()
		w.status = RUNNING
		w.heartTime = time.Now()
		w.mux.Unlock()
	} else {
		w = &worker{
			status:    RUNNING,
			heartTime: time.Now(),
		}
	}
	p.linkFinderLocked(w)
	return w
}
// Run_taskChain executes every task currently in the task list and blocks
// until all of them have finished.
//
// It starts recoverWorker in the background, then repeatedly takes a task
// with getTask and launches one goroutine per task. Each goroutine obtains a
// worker from getWorker, assigns the task to it and calls the worker's run
// method. A panic raised while the task runs is recovered and logged.
//
// After all tasks are done, p.workerHead is set to nil, which is the signal
// recoverWorker uses to stop.
func (p *PoolForTaskChain) Run_taskChain() {
	go p.recoverWorker()

	for {
		t, ok := p.getTask()
		if !ok {
			break
		}

		p.workGroup.Add(1)
		go func() {
			w := p.getWorker()
			defer func() {
				if r := recover(); r != nil {
					log.Println(r)
				}
				// Release the worker before signalling completion, so that
				// when Wait returns every worker is already marked FREE.
				w.mux.Lock()
				w.status = FREE
				w.mux.Unlock()
				p.workGroup.Done()
			}()
			w.work = t
			w.run()
		}()
	}

	p.workGroup.Wait()

	// recoverWorker reads workerHead while holding p.mux; write it under the
	// same lock.
	p.mux.Lock()
	p.workerHead = nil
	p.mux.Unlock()
}
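// Hosting-page notice captured along with the source (not part of the program):
// "This location may contain content that is unsuitable for display, so the page does not show it. You can review and modify it yourself using the relevant editing functions."
// "If you confirm that the content involves no inappropriate language, pure advertising or traffic diversion, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible."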