背景
最近学习了Go中Context的源码,Context是在Go 1.7中引入的新特性,主要用于groutine之间信息的传递,属于Go的特有属性,下面内容主要是对源码的一些解读以及对于Context使用的一些感悟,和社区的伙伴们交流交流~😁
源码解读
Context是context包中重要的接口之一,先来看下它的结构:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
- DeadLine() => 返回Context被cancel的时间,就相当于是Context的有效期
- Done() => 返回一个channel,注意这个channel是一个只读的channel,也就是在context.Context中并不会向该channel发送数据,从这个channel中读出数据的唯一方式是将它close掉,这个特性是groutine之间进行通信的关键,后面对于源码的解读对这块儿的说明,伙伴们认真看哦
- Err() => 返回一个错误,这个错误在context.Context中是一组固定的错误,后面会讲到
- Value() => context可以用于存储数据,这个函数可以通过存储的key获得对应的value
内置错误类型
在Context中总共定义了两种错误类型,一种是Canceled
,用在某个Context被取消之后,另外一种是DeadlineExceeded
,字面意思就可以看出来,这个用在某个Context过期之后,在context包中的定义如下:
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded error = deadlineExceededError{}
type deadlineExceededError struct{}
func (deadlineExceededError) Error() string { return "context deadline exceeded" }
func (deadlineExceededError) Timeout() bool { return true }
func (deadlineExceededError) Temporary() bool { return true }
内置Context类型
context.Context中定义了4种类型,这里从简到繁,对这些类型进行一一阐述
- emptyCtx
emptyCtx就是Context定义的一个空类型,平常项目中经常使用的context.Background()以及context.TODO()就来自这个类型,这两个方法分别对应Context中定义的background以及todo,从这里可以看出,在同一个Go项目中,background以及todo都是共用的
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
- valueCtx
valueCtx主要用于值存储和读取的一类Context,先来看看它的结构吧:
type valueCtx struct {
Context
key, val interface{}
}
它将Context(interface)嵌入,这样即使它没有实现Context接口的所有方法,它依旧属于一个Context(interface),这个是Go语言的特性
与valueCtx初始化相关的方法定义如下:
func WithValue(parent Context, key, val interface{}) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
该方法最终返回的是一个内嵌父Context的valueCtx,需要注意传递的key需要可比较,否则就不可以通过key相等来判断某个key=>value对儿是否存在了
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
上面两个是valueCtx结构体的方法,String()方法主要用于打印,这里不赘述,valueCtx最为关键的是对于Value()方法的设计,由于每一个实例valueCtx最多存储一对儿key=>value,所以当调用者调用Value方法获取某个key对应的value时,会产生这样的调用效果:
整个过程是一个递归的过程,从当前的Context不断向上寻找,直到找到emptyCtx(Background/TODO)才停下,具体的value方法实现如下:
func value(c Context, key interface{}) interface{} {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}
可以看到,在value方法中,声明了一个for循环,循环的终止条件是在某个父Context中找到key或者走到了emptyCtx
在说接下来的两类Context之前,我们先来看一个接口,因为另外两个Context都实现了该接口,所以这里需要提前说一下,它的定义是这样的:
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
实现了该接口定义方法的Context就是一个可以被cancel的Context
- cancelCtx
type cancelCtx struct {
Context
mu sync.Mutex
done atomic.Value
children map[canceler]struct{}
err error
}
mu的作用主要是并发场景下对于children以及done涉及channel的保护,下来我们一一看看与cancelCtx相关的所有方法吧
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
cancelCtx的初始化相比于valueCtx多出来一步,就是调用propagateCancel
,这个方法的作用主要是将当前创建的cancelCtx挂载到父节点上如果父节点是可取消的话,这样可以在父节点取消的时候同步信号给当前cancelCtx,让它也取消
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
在propagateCancel
中,首先看当前cancelCtx的父Context是否可以被取消,如果不可以,当前的cancelCtx就没有必要挂载到父Context上,接下来如果父Context可被取消并且已经取消,则直接将当前的cancelCtx取消,否则,调用parentCancelCtx
方法判断当前父Context是否是【标准类型cancelCtx】,从而进行相应逻辑的处理
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
我们重点分析一下这个方法,刚开始看的时候不知道为什么会有这么多的判断条件,不知道小伙伴们是不是一样的感受
我们分情况讨论下调用该方法时Context存在的几种情况:
- 当前的父Context为cancelCtx类型
- 当前的父Context为timerCtx类型
- 当前的父Context为自定义的Context类型
第一个条件
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
这个条件是看当前父Context是否已经关闭,如果是的话,则直接返回
第二个条件
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
这个判断条件主要是判断当前Context是否包含cancelCtx,当Context类型为1,2情况时,会直接返回true(timerCtx内嵌了cancelCtx,属于cancelCtx类型),那么当Context类型为自定义的时候,如果才能满足这个判断条件呢?我给出两个demo,相信机智的伙伴看一下就懂了😊,这两个demo都可以通过条件2
demo one:
type MyContext1 struct {
Context context.Context
}
func (my *MyContext1) Deadline() (deadline time.Time, ok bool) {
return
}
func (my *MyContext1) Done() <-chan struct{} {
return make(chan struct{})
}
func (my *MyContext1) Err() error {
return nil
}
func (my *MyContext1) Value(key interface{}) interface{} {
return my.Context
}
func (*MyContext1) cancel(removeFromParent bool, err error) {
return
}
func main() {
parentCtx, _ := context.WithCancel(context.Background())
cur := &MyContext1{parentCtx}
childCtx, _ := context.WithCancel(cur)
fmt.Println(childCtx)
}
demo two:
type MyContext2 struct {
context.Context
}
func main() {
parentCtx, _ := context.WithCancel(context.Background())
cur := &MyContext2{parentCtx}
childCtx, _ := context.WithCancel(cur)
}
第三个条件
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
对于自定义的Context,像demo one实现的那样,这个case是不能通过的,但是对demo two来说是可以的,因为它实现的MyContext并没有自定义Done方法,也就是对于context.Context来说还是可控的
如果发现当前的父Context不是【标准类型cancelCtx】,也就是对于context.Context来说不可控,则会启一个协程,监听该父Context的Done方法,以及当前cancelCtx的Done方法,这里为什么还要监听当前cancelCtx的Done方法呢?是为了防止父Context一直没有Done,导致该goroutine泄漏
接下来我们看cancelCtx中最重要的方法: cancel
方法
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
cancel
方法的流程为close掉当前Context的done(只读 channel),然后遍历当前Context的所有子Context,将它们一一取消,最后将自己从父Context中移除
这里有细心的伙伴可能发现在调用child.cancel(false, err)
传递的参数为false,而在withCancel
中传递的参数为true,这个是因为调用child.cacel(false, err)
说明父Context已经取消了,而且之后父Context会将children置为nil,所以传递true没有必要,而在withCancel
中设置为true指的是如果子Context自己调用cancel方法,父Context没有cancel,那么该子Context需要将自己从父Context中移除
- timerCtx
先来看下它的定义
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
timerCtx内置了cancelCtx,也就是可以当作cancelCtx来使用,同时增加了timer计时器以及过期时间deadline
timerCtx的作用是设置一个超时的Context,比如如果下游1s后没有返回结果,那么该Context就会cancel,声明timerCtx的方法为:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
由于该方法的主体其实是WithDeadline
,所以这里我们直接看这个方法:
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
方法中首先看当前timerCtx的超时时间是否比父Context的超时时间延后,如果是的话,其实和声明一个cancelCtx没有差别了,也就是当前timerCtx的取消取决于父Context的过期时间,之后同样需要将该timerCtx绑定到父Context,然后判断当前时间是否早于当前时间,如果是,则直接取消该timerCtx,否则通过time.AfterFunc设置一个定时任务,到达时间之后执行该timerCtx的cancel方法即可
timerCtx也有自己的cancel方法,与cancelCtx的cancel相比只是多了一个将timerCtx的timer停止掉
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
使用姿势
从Context的类型可以看出,在项目中的使用上我们可以利用valueCtx的属性在各个goroutine中同步数据,可以利用cancelCtx的属性防止goroutine泄漏,利用timerCtx的属性设置接口的响应超时时间,我一一展示下自己使用这些特性的方式
在各个goroutine之间同步数据
func Process(ctx context.Context) {
name := ctx.Value("name").(string)
fmt.Println(name)
}
func main() {
rootCtx := context.Background()
childCtx := context.WithValue(rootCtx, "name", "Jack")
Process(childCtx)
}
传值的这个方式我就不多说了,相信伙伴们都已经灵活的掌握
防止goroutine泄漏
func Process(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel happen")
return
default:
time.Sleep(1 * time.Second)
fmt.Println("hello jhd")
}
}
}
func main() {
rootCtx := context.Background()
parentCtx, parentCancel := context.WithCancel(rootCtx)
childCtx, _ := context.WithCancel(parentCtx)
go Process(childCtx)
time.Sleep(2 * time.Second)
parentCancel()
time.Sleep(3 * time.Second)
}
这里通过父groutine衍生出了一个子goroutine,保证父goroutine取消之后,子goroutine会跟着一起取消,不会发生goroutine的泄漏
设置接口的响应超时时间
func AnoProcess() chan struct{} {
c := make(chan struct{}, 1)
defer func() {
c <- struct{}{}
}()
time.Sleep(time.Second * 6)
return c
}
func Process(ctx context.Context) {
res := make(chan struct{}, 1)
go func() {
res = AnoProcess()
}()
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
case <-res:
fmt.Println("hello jhd")
return
}
}
func main() {
rootCtx := context.Background()
ctx, _ := context.WithTimeout(rootCtx, time.Second*3)
fmt.Println("start exec func at ", time.Now().Second())
Process(ctx)
fmt.Println("end exec func at ", time.Now().Second())
}
在这里给AnoProcess
方法设置了3s的超时时间,3s之后,如果AnoProcess
没有执行完,则直接返回
结束语
好啦,这就是我这次分享的全部内容啦,社区的小伙伴有任何的意见、问题都可以放到评论区,希望可以得到大家的反馈,你们的反馈是我继续创作的最大动力!!