typeRWMutexstruct{wMutex// held if there are pending writers
writerSemuint32// semaphore for writers to wait for completing readers
readerSemuint32// semaphore for readers to wait for completing writers
readerCountatomic.Int32// number of pending readers
readerWaitatomic.Int32// number of departing readers
}
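w 是多个写者同时申请写锁时使用的互斥锁,保证同一时刻只有一位写者能够继续执行后续的加锁流程。readerCount 是所有读者的数量(包括已持有读锁和正在等待读锁的读者);当有写者等待或持有写锁时,该值会被减去 rwmutexMaxReaders 而变为负数。readerWait 是写者申请写锁时仍持有读锁、写者必须等待其释放的读者数量(写者只需等待先于它到达的读者,后到的读者会排在写者之后,以此尽力实现公平调度、避免写者饥饿)。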
rwmutexMaxReaders 是包中的一个常量,大小为 $2^{30}$(即 1 << 30),表示支持的最大读者数量。
加写锁
func(rw*RWMutex)Lock(){ifrace.Enabled{_=rw.w.staterace.Disable()}// First, resolve competition with other writers.
rw.w.Lock()// Announce to readers there is a pending writer.
r:=rw.readerCount.Add(-rwmutexMaxReaders)+rwmutexMaxReaders// Wait for active readers.
ifr!=0&&rw.readerWait.Add(r)!=0{runtime_SemacquireRWMutex(&rw.writerSem,false,0)}ifrace.Enabled{race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))race.Acquire(unsafe.Pointer(&rw.writerSem))}}
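关于 race 的部分可以忽略。可以看到首先加写者之间的竞争锁 rw.w,然后执行 rw.readerCount.Add(-rwmutexMaxReaders),这是将 readerCount 变为负数,目的可以见加读锁的过程。再加回 rwmutexMaxReaders 后得到的 r 就是此刻的读者数量。如果 r 不为 0,就把 r 累加到 readerWait 上(这里用的是 Add 而不是直接赋值,因为在写者宣告之后、累加之前,可能已经有读者在 RUnlock 中把 readerWait 减成了负数);若累加结果仍不为 0,写者就阻塞在 writerSem 上,直到最后一位离开的读者把 readerWait 减到 0 并释放写信号量将其唤醒。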
加读锁
func(rw*RWMutex)RLock(){ifrace.Enabled{_=rw.w.staterace.Disable()}ifrw.readerCount.Add(1)<0{// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem,false,0)}ifrace.Enabled{race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))}}
func(rw*RWMutex)RUnlock(){ifrace.Enabled{_=rw.w.staterace.ReleaseMerge(unsafe.Pointer(&rw.writerSem))race.Disable()}ifr:=rw.readerCount.Add(-1);r<0{// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)}ifrace.Enabled{race.Enable()}}func(rw*RWMutex)rUnlockSlow(rint32){ifr+1==0||r+1==-rwmutexMaxReaders{race.Enable()fatal("sync: RUnlock of unlocked RWMutex")}// A writer is pending.
ifrw.readerWait.Add(-1)==0{// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem,false,1)}}
func(rw*RWMutex)Unlock(){ifrace.Enabled{_=rw.w.staterace.Release(unsafe.Pointer(&rw.readerSem))race.Disable()}// Announce to readers there is no active writer.
r:=rw.readerCount.Add(rwmutexMaxReaders)ifr>=rwmutexMaxReaders{race.Enable()fatal("sync: Unlock of unlocked RWMutex")}// Unblock blocked readers, if any.
fori:=0;i<int(r);i++{runtime_Semrelease(&rw.readerSem,false,0)}// Allow other writers to proceed.
rw.w.Unlock()ifrace.Enabled{race.Enable()}}