Go语言实现一个协程池
作者:
acw_zxh
,
2025-04-01 16:08:30
·北京
,
所有人可见
,
阅读 2
package main
import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
// Task is a unit of work that can be queued on a Pool.
type Task struct {
	// f is the function a worker calls when it picks the task up.
	f func() error
}
// NewTask wraps funcArg in a freshly allocated Task and returns a pointer to it.
func NewTask(funcArg func() error) *Task {
	task := new(Task)
	task.f = funcArg
	return task
}
// Pool is a goroutine pool: tasks are queued on JobCh and executed by worker
// goroutines whose number is bounded by Capacity.
type Pool struct {
	// RunningWorkers counts the workers that have been started and have not
	// yet exited. It is read and written only through sync/atomic.
	RunningWorkers int64
	// Capacity is the worker limit that AddTask checks before starting a new
	// worker.
	Capacity int64
	// JobCh is the queue from which workers receive tasks.
	JobCh chan *Task
	// Mutex is locked by AddTask around its check-and-start of a new worker.
	sync.Mutex
}
// NewPool returns a Pool that starts at most capacity workers and buffers up
// to taskNum queued tasks.
//
// A capacity below 1 is raised to 1: with no worker allowed, nothing would
// ever receive from JobCh, so queued tasks would never run and AddTask would
// block forever once the buffer was full. A negative taskNum is treated as 0
// (an unbuffered queue), because make panics on a negative channel size.
func NewPool(capacity int64, taskNum int) *Pool {
	if capacity < 1 {
		capacity = 1
	}
	if taskNum < 0 {
		taskNum = 0
	}
	return &Pool{
		Capacity: capacity,
		JobCh:    make(chan *Task, taskNum),
	}
}
// GetCap reports the pool's Capacity, the worker limit AddTask compares
// against.
func (p *Pool) GetCap() int64 {
	limit := p.Capacity
	return limit
}
// incRunning atomically adds one to the running-worker counter.
func (p *Pool) incRunning() {
	counter := &p.RunningWorkers
	atomic.AddInt64(counter, 1)
}
// decRunning atomically subtracts one from the running-worker counter.
func (p *Pool) decRunning() {
	counter := &p.RunningWorkers
	atomic.AddInt64(counter, -1)
}
// GetRunningWorkers returns an atomic snapshot of the running-worker counter.
func (p *Pool) GetRunningWorkers() int64 {
	running := atomic.LoadInt64(&p.RunningWorkers)
	return running
}
// run starts one worker goroutine that executes tasks received from JobCh
// until the channel is closed and drained.
//
// The running counter is incremented before the goroutine is launched, so a
// caller that reads GetRunningWorkers right after run returns already sees
// the new worker. The counter is decremented when the worker exits.
func (p *Pool) run() {
	p.incRunning()
	go func() {
		defer p.decRunning()
		for task := range p.JobCh {
			// A nil task or nil function would panic here, and a panic on a
			// worker goroutine takes the whole process down. Skip it instead.
			if task == nil || task.f == nil {
				continue
			}
			// The pool has no way to report a task's error to anyone, so the
			// result is dropped deliberately; tasks must handle their own
			// failures.
			_ = task.f()
		}
	}()
}
// AddTask queues task for execution, first starting a new worker if fewer
// than GetCap workers are currently running. It blocks while JobCh is full.
func (p *Pool) AddTask(task *Task) {
	// The mutex guards only the check-then-start sequence, so that concurrent
	// callers cannot both see a free slot and exceed the capacity. It is
	// released before the send: a send on a full channel can block for an
	// unbounded time and must not happen with the lock held.
	p.Lock()
	if p.GetRunningWorkers() < p.GetCap() {
		p.run()
	}
	p.Unlock()

	p.JobCh <- task
}
// main queues 30 print tasks on a pool limited to 3 workers and waits for
// them to finish, giving up after one second.
func main() {
	pool := NewPool(3, 10)

	var wg sync.WaitGroup
	for i := 0; i < 30; i++ {
		// Per-iteration copy: before Go 1.22 every closure would share the
		// single loop variable, racing with the loop and printing wrong
		// values.
		i := i
		wg.Add(1)
		pool.AddTask(NewTask(func() error {
			defer wg.Done()
			fmt.Println("I'm zxh", i)
			return nil
		}))
	}
	// Nothing more will be sent; closing the queue lets the workers exit once
	// they have drained it.
	close(pool.JobCh)

	// Wait for the tasks themselves instead of sleeping for a fixed time, but
	// keep the original one-second upper bound as a safety timeout.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(time.Second):
	}
}