go并发思想
golang 提供goroutine机制实现并发编程,main函数为主goroutine结束后所有goroutine都将停止工作,goroutine实质为运行的代码片段,它被绑定到运行提供运行环境的Process上(队列),可有内核线程M进行运行。
go并发的核心是通过通信实现内存共享,而不是内存共享实现通信
当一个goroutine需要等待另一个goroutine工作完成后才能运行,则将工作交由自己做,避免复杂的机制
将是否并发的决定交由调用者
func ListDirectory(dir string, fn func(string))
ListDirectory("data",func(){
...
ch<-a
}
)
每次创建一个goroutine后需要指导什么时候它将结束(避免goroutine 泄漏)
func leak(){
ch := make(chan int)
go func(){
val := <- ch
fmt.Println("We receive a value:",val)
}()
}
在执行RPC调用或者sql查询时若由于顺序调用产生的延时不可忽略,需要提供中断方法(Never start a goroutine without knowning when it will stop)
go func() {
record, err := search(key)
ch <- result{record,err}
}
select {
case <-ctx.Done():
return errors.New("search canceled")
case res := <-ch:
if res.err !=nil {
return result.err
}
fmt.Println("Received:",result.record)
return nil
}
对于创建的一类任务,需要在程序停止前将所有的goroutine运行完成,避免工作的丢失(WaitGroup机制) sync.WaitGroup,再通过一层包装结束在一定时间内结束任务
type Track struct {
wg sync.WaitGroup
}
func(t *Tracker) Event(data string) {
//Increment counter so Shudown Knows to wait for this event
t.wg.Add(1)
go gun() {
defer t.wg.Done()
time.Sleep(time.Millisecond)
log.Println(data)
}
}
func (t *Tracker) Shutdown(ctx context.Context) error {
finish := make(chan struct{})
go func(){
t.wg.Wait()
close(ch)
}()
select {
case <-finish:
retrun nil
case <-ctx.Done():
return errors.New("timeout")
}
}
errgroup
- 通过syn.WaitGroup管理goroutine封装错误
- 提供并行流的处理
- 解决 goroutine没有错误的问题
- context的传播与取消
- 示例
sync.atomic 包
- 通过CAS原子操作更新数据
- 在单更新多读区场景下进行维护config更新,不需要使用额外的Mutex
func main() {
var config atomic.Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
}
- 对多更新场景下,并且读数据并发量很高+Mutex实现copy-on-write机制
func main() {
type Map map[string]string
var m atomic.Value
m.Store(make(Map))
var mu sync.Mutex // used only by writers
// read function can be used to read the data without further synchronization
read := func(key string) (val string) {
m1 := m.Load().(Map)
return m1[key]
}
// insert function can be used to update the data without further synchronization
insert := func(key, val string) {
mu.Lock() // synchronize with other potential writers
defer mu.Unlock()
m1 := m.Load().(Map) // load current value of the data structure
m2 := make(Map) // create a new value
for k, v := range m1 {
m2[k] = v // copy all data from the current object to the new one
}
m2[key] = val // do the update that we need
m.Store(m2) // atomically replace the current object with the new one
// At this point all new readers start working with the new version.
// The old version will be garbage collected once the existing readers
// (if any) are done with it.
}
_, _ = read, insert
}
sync.Pool
- 用来保存和复用临时对象,以减少内存的分配,降低GC压力
- 在创建Pool实例时通过设置创建对象的方法,后续GET、Put使用
- 注意:对象使用时需要重放置
channel机制
- 对于需要同步的goroutine间创建大小为0的goroutine(为完成信号时直接类型为strunct{})
- 使用非同步时,chanel使用为1
- channel只能由数据发送方进行关闭,关闭后的通道可以读取数据但通过range可以判定是关闭还是默认零值
context机制
- context的作用域是一个请求,在接受请求后创建,请求完成后结束
- 通过context管理请求链,WithCancel、WithDeadline等方式
- connext与select机制的同时使用可以实现goroutine的生命周期管理,防止leak
- context只能放在函数的第一个参数,并且只存储请求元数据(token等信息)
- context的nil值不能设置一般通过Backgroud或TODO设置
- context中存储的数据必须是安全immultable的,若需要更新使用copy-on-wirte技术与WithValue创建
- context机制实质为链形式机制,Value从当前context向parent context递归查询,而canceled()方式是通过Parent context向父子context传递,对于在监听context.Done的goroutine将直接返回,快速回收
- defer cancel()每次创建后都默认调用
- 对于超时机制是设置一个定时器,在超时时直接cancel掉
sync.once
- 只执行一次的方法