谈谈自己对GO的Mutex的理解

语言: CN / TW / HK

目前 GO 已经更新到了 1.14 的版本

咱们一般人如果直接去看 mutex 的源码的话,其实是比较难理解为什么写成了现在这个样子,尤其是加锁里面的各种逻辑判断太多了,各种位运算一脸懵逼,其实我们只要掌握它最初的设计思想,那么后面新增的逻辑,理解起来都很简单了。

Mutex最初版本

Mutex第一版代码加上注释不过才109行。非常精简,下面介绍一下我对第一版 Mutex 源码的理解

// Mutex有state和sema两个成员变量,这一点是在1.14没有变化的
// 其中 state 字段代表当前锁的状态,sema是控制锁状态的信号量,主要关注state就行
//
// state 比较复杂,state一共32位
// 最低位代表 locked状态, 0表示未上锁,1表示上锁
// 倒数第二位 woken状态,0 表示未唤醒,1表示已唤醒
// 剩余30位用于表示当前有多少个goroutine等待互斥锁的释放,代表最多支持2^30个goroutine
type Mutex struct {
    state int32
    sema  uint32
}

我们接下来看它的 Lock 方法

func (m *Mutex) Lock() {
    // 首先直接CAS尝试获取锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        // 上锁成功后,直接返回
        return
    }

    // CAS获取锁失败
    // awoke 默认是未唤醒状态
    awoke := false
    for {
        // 当前state赋值给old
        old := m.state
        // 给old上锁
        new := old | mutexLocked
        // 如果old本身就已经上了锁的话
        if old&mutexLocked != 0 {
            // goroutine等待数 + 1
            new = old + 1<<mutexWaiterShift
        }
        // 如果当前g被唤醒了
        if awoke {
            // 把woken标记清除掉
            new &^= mutexWoken
        }
        // 更新一下当前锁的状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果old本身就是解锁状态
            if old&mutexLocked == 0 {
                // 那么代表抢锁成功,直接退出for循环
                break
            }
            // 不是解锁状态
            // 尝试获取信号量,进入等待队列,等待被唤醒
            runtime_Semacquire(&m.sema)
            // 被唤醒,awoke设置true,继续for循环
            awoke = true
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

简单总结一下 Lock 的逻辑,分几种情况说明一下

第一种情况:第一次上锁的时候,直接走第一步 CAS 上锁,成功返回

第二种情况: Mutex 已经被另一个 g 上锁,那么 stateg 等待数+1,更新当前的锁状态,然后就进入队列,等待被唤醒,等到另个一 g 调用了 Unlock 方法之后,当前 g 被唤醒,然后设置 awoken=true ,再执行一遍 for 循环,此时 locked 位就是未上锁状态(0), new 就是代表上锁,然后清除 woken 位,然后再 CAS 更新 newstate 上,因为之前的锁是未上锁状态,那么就代表抢锁成功, break ,返回

第三种情况:和第二种一样,只不过,在 CAS 更新 newstate 上时,有其他 g 先改掉了 state 的值,那么就继续for循环,然后重复到第二种情况。

接下来看下 Unlock 方法

func (m *Mutex) Unlock() {
    if raceenabled {
        _ = m.state
        raceRelease(unsafe.Pointer(m))
    }

    // 一开始也是直接去掉加锁状态
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 判断一下是否解锁了一个未加锁的Mutex
    if (new+mutexLocked)&mutexLocked == 0 {
        // 直接panic
        panic("sync: unlock of unlocked mutex")
    }

    // 把解锁后的值赋值给old
    old := new
    for {
        // 如果此时没有需要等待获取锁的G
        // 或者当前Mutex已经被抢锁成功或者已经有被唤醒的G,那么就可以直接返回
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        
        // g等待数-1,然后设置唤醒标记位
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        // 更新Mutex的state的值
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 手动唤醒一个被runtime_Semacquire阻塞的G
            runtime_Semrelease(&m.sema)
            // 返回
            return
        }
        // 更新state失败,说明有其他G修改了state的值,那么,重新赋值一下,再进行下一次循环
        old = m.state
    }
}

Unlock要比 Lock 简单很多,所以这里不总结了,看注释就能明白

到这里,最初版本的 Mutex 源码已经分析完了,关键还是在上锁的方法里面。上锁逻辑非常简单粗暴,直接 CAS 获取锁,失败就 G 等待数+1,然后进入队列,等待被唤醒。

那么,如果仔细想想,就会发现性能上还是有可以改进的地方。

我们应用Mutex的时候,肯定把锁粒度控制的越小越好,那么这样的话就很可能会出现这么一个问题,当第一次上锁CAS失败的时候,mutex已经被其他G解锁了,但是当前G就还是直接进入队列,等待被唤醒,这样的话其实就会带来额外的调度开销。

所以, Mutex 后面引进了自旋锁的概念 自旋锁提交代码

Mutex 引入自旋锁

Currently sync.Mutex is fully cooperative. That is, once contention is discovered,

the goroutine calls into scheduler. This is suboptimal as the resource can become

free soon after (especially if critical sections are short). Server software

usually runs at ~~50% CPU utilization, that is, switching to other goroutines

is not necessary profitable.

This change adds limited active spinning to sync.Mutex if:

  1. running on a multicore machine and
  2. GOMAXPROCS>1 and
  3. there is at least one other running P and
  4. local runq is empty. As opposed to runtime mutex we don't do passive spinning, because there can be work on global runq on on other

Ps.

简单概括一下,就是为了解决锁粒度非常小的时候,给系统带来的不必要的调度开销

不过自旋要先满足几个条件

首先程序要跑在多核的机器上,然后GOMAXPROCS要大于1,并且此时有至少一个P的local runq是空的,才能进入到自旋的状态

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻

看一下更新之后的 Lock 方法

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0 // 自旋的次数( <= 4)
    for {
        old := m.state
        new := old | mutexLocked
        // 没有解锁
        if old&mutexLocked != 0 {
            // 判断是否满足自旋的状态
            if runtime_canSpin(iter) {
                // 当woken标记位没有被设置,而且等待G数量不等于0,并设置woken标记位成功
                // 这里设置woken标记位的原因是,通知Unlock不用去唤醒等待队列里面的G了
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    // 标记awoke=true
                    awoke = true
                }
                // runtime_doSpin -> sync_runtime_doSpin
                // 每次自旋30个时钟周期,最多120个周期
                runtime_doSpin()
                iter++
                // 再次执行for循环
                continue
            }
            // 自旋结束之后,G等待数量+1
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // 这里多了个判断woken状态不一致的逻辑
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0 // 重置iter
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

相比于第一版的 Mutex ,这里只在加锁的方法里面增加了自旋锁的逻辑

Mutex 已经上锁的时候,当前 G 在满足自旋条件下,进入自旋状态,在自旋中,其他 G 解锁了 Mutex ,那么当前 G 就设置了 woken 标记位,这样其他 GUnlock 的时候就不会去等待队列里面唤醒 G 了,然后当前 G 就顺理成章的抢到了锁

这样自旋锁在锁粒度非常小的场景下的能对其性能带来一定的优化。

引入自旋锁之后,又带来了一个问题。就是 G 等待队列的长尾问题。因为从等待队列里面被唤醒,然后再去抢锁,对本身就在执行的 G 来说,被唤醒的 G 其实是很难抢过当前执行的 G 的,这样的话,等待队列里面的 G ,就会被饿死(长时间获取不到锁),这样对等待队列的 G 来说其实是不公平的。

所以Mutex后面引入了饥饿模式 饥饿模式代码

Mutex引入饥饿模式

本次代码变动还是挺大的

先看下提交者的介绍

Add new starvation mode for Mutex.

In starvation mode ownership is directly handed off from

unlocking goroutine to the next waiter. New arriving goroutines

don't compete for ownership.

Unfair wait time is now limited to 1ms.

Also fix a long standing bug that goroutines were requeued

at the tail of the wait queue. That lead to even more unfair

acquisition times with multiple waiters.

Performance of normal mode is not considerably affected.

简单概括一下,就是解决了等待G队列的长尾问题

饥饿模式下,直接由unlock把锁交给等待队列中排在第一位的G,同时,饥饿模式下,新进来的G不会参与抢锁也不会进入自旋状态,会直接进入等待队列的尾部。

饥饿模式的触发条件,当一个G等待锁时间超过1毫秒时,Mutex切换到饥饿模式

饥饿模式的取消条件,当一个G获取到锁且在等待队列的末尾,或者这个G获取锁的等待时间在1ms内,那么Mutex切换回正常模式

带来的改变

Mutex.state的倒数第三位,变成了mutexStarving标记位,0表示正常模式,1表示饥饿模式,与此同时,支持的最大等待G数量从2^30^个 变成了2^29^个

接下来还是主要关注 Lock 方法,我只在新增的逻辑上添加注释了,我直接贴1.14的 Lock 代码,较1.9的版本没什么改变

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    // 这里封装了一下
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 默认是正常模式
    awoke := false
    iter := 0
    old := m.state
    for {
        // 当前Mutex在饥饿模式下已经被锁了的话,当前G不进入自旋
        // 只有Mutex在正常模式且被锁了的情况下,并且满足自旋的条件,才会进入到自旋逻辑里面
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // 如果当前不是饥饿模式
        if old&mutexStarving == 0 {
            // 加锁
            new |= mutexLocked
        }
        // 如果Mutex已经被锁,或者是在饥饿模式
        if old&(mutexLocked|mutexStarving) != 0 {
            // 等待的G数量+1
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        // 如果已经是饥饿模式,并且Mutex是被锁的状态
        if starving && old&mutexLocked != 0 {
            // 切换成饥饿模式
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // 更新state值
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 非饥饿模式下抢锁成功
            if old&(mutexLocked|mutexStarving) == 0 {
                // 退出
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            // 如果之前已经设置过waitStartTime的话,queueLifo就是true了
            queueLifo := waitStartTime != 0
            // 没有设置过,获取下运行时间
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 阻塞,等待被唤醒
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 如果等待时间超过1ms,设置starving = true,否则就是false
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果Mutex已经是饥饿模式
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                // 如果当前G是在饥饿模式下被唤醒的
                // 加个判断state是否正确设置的逻辑
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // delta = -7 (1..... 0111)
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 退出饥饿模式
                    delta -= mutexStarving
                }
                // 更新state
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

Unlock方法改动就非常小了

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // 不是饥饿模式
    if new&mutexStarving == 0 {
        old := new
        for {
            // G等待队列==0,直接返回
            // (或者,处于woken模式,直接返回
            // 或者,处于locked模式,直接返回
            // 或者处于饥饿模式,直接返回)
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 唤醒G等待队列的首个G
        runtime_Semrelease(&m.sema, true, 1)
    }
}

总结

Mutex经过两次演进,都解决了不同的问题。 Mutex 用法非常简单,里面的原理不感兴趣的话其实没必要深究,知道个大概的逻辑就行了。

补充:

mutex的等待G队列的顺序是FIFO

饥饿模式下,性能其实很低,主要就是为了解决长尾问题的

分享到: