sync包
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine
可以访问共享资源。Go语言中使用sync
包的Mutex
类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:
package main
import (
"fmt"
"sync"
)
var x = 0
const N = 5000000
var wg sync.WaitGroup
var lock sync.Mutex //原子锁
func add() {
for i := 0; i < N; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync
包中的RWMutex
类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine
如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine
获取写锁之后,其他的goroutine
无论是获取读锁还是写锁都会等待。
读写锁示例:
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
const N = 10
const M = 1000
func read() {
defer wg.Done()
rwlock.RLock()
//lock.Lock()
fmt.Println(x)
time.Sleep(time.Millisecond)
//lock.Unlock()
rwlock.RUnlock()
}
func write() {
defer wg.Done()
rwlock.Lock()
//lock.Lock()
x = x + 1
time.Sleep(time.Millisecond * 50)
//lock.Unlock()
rwlock.Unlock()
}
func main() {
start := time.Now()
for i := 0; i < N; i++ {
go write()
wg.Add(1)
}
for i := 0; i < M; i++ {
go read()
wg.Add(1)
}
wg.Wait()
fmt.Println(time.Now().Sub(start))
}
需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
sync.WaitGroup
在代码中生硬的使用time.Sleep
肯定是不合适的,Go语言中可以使用sync.WaitGroup
来实现并发任务的同步。 sync.WaitGroup
有以下几个方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup
内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
我们利用sync.WaitGroup
将上面的代码优化一下:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
需要注意sync.WaitGroup
是一个结构体,传递的时候要传递指针。
sync.Once
说在前面的话:这是一个进阶知识点。
在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync
包中提供了一个针对只执行一次场景的解决方案–sync.Once
。
sync.Once
只有一个Do
方法,其签名如下:
func (o *Once) Do(f func()) {}
备注:如果要执行的函数f
需要传递参数就需要搭配闭包来使用。
加载配置文件示例
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多个goroutine
并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine
都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在这种情况下就会出现即使判断了icons
不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons
的时候不会被其他的goroutine
操作,但是这样做又会引发性能问题。
所引发的问题可能会有:
-
可能多个goroutine执行时会多次访问
if icons == nil
而多次执行初始化 -
可能因编译器重排后的缘故使得还没完全初始化时造成的访问异常
go
icons = make(map[string]image.Image)
// 中间有一个goroutine 在初始化后直接访问'left'
icons["left"] = loadIcon("left.png")
使用sync.Once
改造的示例代码如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
并发安全的单例模式
下面是借助sync.Once
实现的并发安全的单例模式:
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Once
其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
注意
- Once 中的锁是一个结构体,也就是值类型,如果调用的时候应该传引用,或者全局调用
once.Do()
括号内必须是函数调用,如果执行语句则要改写成闭包或者匿名函数
sync.Map
Go语言中内置的map不是并发安全的。请看下面的示例:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上面的代码开启少量几个goroutine
的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes
错误。
像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync
包中提供了一个开箱即用的并发安全版map–sync.Map
。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map
内置了诸如Store
、Load
、LoadOrStore
、Delete
、Range
等操作方法。
package main
import (
"fmt"
"strconv"
"sync"
)
var m1 = make(map[string]int)
var lock sync.Mutex
var wg sync.WaitGroup
func get(key string) int {
return m1[key]
}
func set(key string, value int) {
m1[key] = value
}
func Test1() {
//fatal error: concurrent map writes
//出现其现象是因为 map本身不支持并发安全
//解决办法1:直接加互斥锁
for i := 0; i < 20; i++ {
wg.Add(1)
go func(x int) {
key := strconv.Itoa(x)
lock.Lock()
set(key, x)
lock.Unlock()
lock.Lock()
fmt.Printf("key:%v,value:%v\n", key, get(key))
lock.Unlock()
wg.Done()
}(i)
}
wg.Wait()
}
//解决办法2: 直接使用sync中所提供的sync.Map中内置函数
var m2 = sync.Map{}
func Test2() {
//fatal error: concurrent map writes
for i := 0; i < 20; i++ {
wg.Add(1)
go func(x int) {
key := strconv.Itoa(x)
//必须使用 sync.Map内置Store方法设置键值对
m2.Store(key, x) //set(key, x)
//必须使用 sync.Map内置的Load方法进行查看
value, _ := m2.Load(key)
fmt.Printf("key:%v,value:%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
func main() {
//Test1()
Test2()
}
原子操作
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic
提供。
atomic包
方法 | 解释 |
---|---|
func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) 等 | 读取操作 |
func StoreInt32(addr int32, val int32) func StoreInt64(addr int64, val int64) 等 | 写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) 等 | 修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) 等 | 交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)等 | 比较并交换操作 |
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var wg sync.WaitGroup
var lock sync.Mutex
var x int64 = 0
func Add() {
defer wg.Done()
// 方法一 :直接加互斥锁
//lock.Lock()
//x = x + 1
//lock.Unlock()
// 方法二 : 使用内置的原子操作函数
atomic.AddInt64(&x, 1)
}
func main() {
wg.Add(100000)
for i := 0; i < 100000; i++ {
go Add()
}
wg.Wait()
fmt.Println(x)
}