Go 读写锁源码阅读

问题

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var lock sync.RWMutex
var wg *sync.WaitGroup

func Writer(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			lock.Lock()
			time.Sleep(5 * time.Millisecond)
			lock.Unlock()
		case <-ctx.Done():
			return
		}
	}

}

func Reader() {
	defer wg.Done()
	lock.RLock()
	time.Sleep(1000 * time.Millisecond)
	defer lock.RUnlock()

	lock.RLock()
	time.Sleep(100 * time.Millisecond)
	defer lock.RUnlock()
}

func main() {
	wg = new(sync.WaitGroup)
	wg.Add(1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go Writer(ctx)
	go Reader()
	wg.Wait()
}

执行上面的代码,会发生死锁,为什么?

初次使用可能意识不到,认为同一个协程两次获取读锁,也没什么大不了的,实际上 go 的读写锁是禁止递归获取读锁的。设想如果 Reader 先获取到读锁,然后 Writer 试图获取写锁,写锁会因为有 goroutine 持有读锁而堵塞在加锁的过程中,然后 Reader 尝试再次获取读锁,会因为有 goroutine 试图持有写锁而堵塞,从而形成死锁。

如果不了解读写锁的底层实现,看了上面的文字还是有可能领会不到为什么会出现死锁,为什么 Reader 再次获取读锁会仅仅应为 goroutine 试图持有写锁而堵塞,仅仅是试图都不行?

源码

go 的版本为 1.23.0

读写锁的结构如下:

1
2
3
4
5
6
7
type RWMutex struct {
	w           Mutex        // held if there are pending writers
	writerSem   uint32       // semaphore for writers to wait for completing readers
	readerSem   uint32       // semaphore for readers to wait for completing writers
	readerCount atomic.Int32 // number of pending readers
	readerWait  atomic.Int32 // number of departing readers
}

w 是多个写者尝试申请写锁时的互斥锁,用于选出一位优先的写者。 readerCount 是所有的读者数量(包括持有读锁和等待读锁的),readerWait 是写者尝试获取写锁时读者的数量(用于尽力实现公平调度)。

rwmutexMaxReaders 是包中的一个常量,大小 $2^{30}$,表示最多读者数量。

加写锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

关于 race 的部分可以忽略。可以看到首先加写者的竞争锁,然后 rw.readerCount.Add(-rwmutexMaxReaders),这是将 readerCount 置为负数,目的可以见加读锁的过程。用 r 获取到了所有读者的数量,之后将 readerWait 置为 r,然后等待其他 r 位读者释放读锁发送写信号量。

加读锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	if rw.readerCount.Add(1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

首先将读者数量加一,然后奇怪的事情来了——为什么要检查读者数量是否小于 0?可以看到注释锁小于 0 意味着有写者在前面,还记得写者申请写锁的时候 rw.readerCount.Add(-rwmutexMaxReaders) 将读者数量置为负数来表明有写者吗?这就呼应上了。如果前面有写者,尝试获取读者的协程会堵塞等待读信号量。这个信号量由谁来发送呢?可以带着这个疑问看写锁释放的过程。

但是为什么要这样(堵塞)做呢?公平性。如果不让新来的读者堵塞在写者后面,会出现写者饥饿,长时间获取不到锁。

释放读锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	if r := rw.readerCount.Add(-1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if rw.readerWait.Add(-1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

过程是:读者数量减 1,然后检查读者数量是否为负,如果为负,意味有写者在等待前面的读者释放,这个时候就要检查是否轮到写者上锁了。可以看到检查的过程 rUnlockSlow:首先将写者前面的读者数量 readerWait 减 1,然后检查是否为 0,为 0 意味没有读者了,可以给写者发送信号量了。

释放写锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	r := rw.readerCount.Add(rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		fatal("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

写锁释放需要做两个事情:

  1. 给排在写者后面的读者释放信号量
  2. 释放写者尝试加写者锁时获取到的互斥锁,供后续的写者获取

总结

回到刚才的问题,当 Reader 获取到第一个读锁时,Writer 尝试获取写锁,会堵塞在源码的:

1
2
3
if r != 0 && rw.readerWait.Add(r) != 0 {
	runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}

而当 Reader 尝试获取第二个读锁的时候,Reader 会堵塞在:

1
2
3
4
if rw.readerCount.Add(1) < 0 {
	// A writer is pending, wait for it.
	runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}

两者互相堵塞,导致死锁。

这一切的根源是什么呢?公平性,为了保证公平性,写者在没有获取到写锁的时候已经拥有了堵塞后续读者的权利,但是没有释放写锁来释放后续读者的权利。

说到公平性,可以思考一下 go 的读写锁是否实现了公平性?

公平性:读者和写者的调度顺序。 如果读者和写者按照来的顺序调度,多个相邻的读者可以同时调度,我们说是公平的

在大多数情况下,读写是公平的,但是在释放写锁的时候,首先是给读者释放信号量,其次才是释放尝试加写锁时获取到的互斥锁,这一先后顺序会导致这一批读者中有一部分读者(调度机制)会先于写者调度,但实际上新的写者有可能才是先来的那位(在所有读者和写者中)。

读写锁的优先级:

  1. 读操作优先锁:提供了最大并发性,但在锁竞争比较激烈的情况下,可能会导致写操作饥饿。这是由于只要还有一个读线程持锁,写线程就拿不到锁。多个读者可以立刻拿到锁,这意味着一个写者可能一直在等锁,期间新的读者一直可以拿到锁。极端情况下,写者线程可能会一直等锁,直到所有一开始就拿到锁的读者释放锁。
  2. 写操作优先锁:如果队列中有写者在等锁,则阻止任何读者拿锁,来避免了写操作饥饿的问题。一旦所有已经开始的读操作完成,等待的写操作立即获得锁。和读操作优先锁相比,写操作优先锁的不足在于在写者存在的情况下并发度低。内部实现需要两把互斥锁。
  3. 未指定优先级锁:不提供任何读/写的优先级保证。

go 的读写锁是写操作优先。

0%