本讲是 Go 并发编程专题的最后一个部分,在本讲中,我会介绍两个和并发安全相关的包。一个是 sync.Mutex,另一个是 sync.Atomic,它们分别用于锁和原子操作。

有的朋友可能会问:并发竟然还有不安全的,难道会烧毁电脑吗?我们不妨看看下面这段代码:

go
复制代码var testInt = 0
var syncWait sync.WaitGroup
func main() {
   syncWait.Add(2)
   go testFunc()
   go testFunc()
   syncWait.Wait()
   fmt.Println(testInt)
}
func testFunc() {
   defer syncWait.Done()
   for i := 0; i < 1000; i++ {
      testInt += 1
   }
}

通过前面的学习,这段代码理解起来并不难。main() 函数中开启了两个相同的协程任务,具体内容是对公共变量 testInt 进行自增 1 操作。每个协程任务都会自增 1000 次,两个任务并发,理应自增 2000 次,最终输出 testInt 的值应该是 2000。

不信你试试看,反复运行程序,果不其然还真有不是 2000 的时候。最为奇怪的是,计算结果居然还会有变化!这是为何呢?

其实,这就是并发“不安全”的体现了。由于 testInt 是公共变量,两个任务同时对其操作,导致数据竞争,计算出错误的结果。

代入实际的运行场景,假如某个时刻 testInt 的值为 5,两个 testFunc() 函数同时进行自增 1 操作。此时,具体的自增操作都会是 5+1=6。如此一来,虽然进行了 2 次自增操作,但最终结果和自增 1 次无异。

想象一下,两个人同时向第三个人的银行账户里转钱,如果发生类似的数据竞争问题,其结果可想而知(但如果是同时消费,似乎就赚到了)……

在大多数编程语言中,多线程/多进程/多协程都会存在类似的问题,规避它们的思路类似。都是通过锁或原子操作规避数据竞争,从而保护数据的安全。Go 语言也不例外。

# 互斥锁

互斥锁是 Go 语言中最为简单粗暴的锁,所以我们先从它学起。

从前文中的示例可以看出,发生不安全并发的根源在于公共变量 testInt,所以我们只需恰当地将其保护起来就行了。之所以说互斥锁简单粗暴,就是因为被它加锁的代码一旦运行,就必须等待其结束后才能再次运行。

它的使用方法也很简单粗暴,我们对上述示例代码稍加修改即可实现互斥锁保护了:

go
复制代码var testInt = 0
var syncWait sync.WaitGroup
var locker sync.Mutex
func main() {
   syncWait.Add(2)
   go testFunc()
   go testFunc()
   syncWait.Wait()
   fmt.Println(testInt)
}
func testFunc() {
	defer syncWait.Done()
	defer locker.Unlock()
	locker.Lock()
	for i := 0; i < 1000; i++ {
		testInt += 1
	}
}

大家可以看到,locker 是一开头就声明了的 sync.Mutex 类型变量,locker.Lock() 是加锁,locker.Unlock() 是解锁。在 testFunc() 函数中,一上来便执行了加锁操作,互斥锁“锁住”的代码是自增 1000 的逻辑。最后,为了确保后续代码顺利执行,使用断言执行解锁操作。

如此修改后,反复运行这段代码,控制台将始终输出 2000。

到此,计算结果总算是正确了。但大家想一想,如此加锁后,并发和串行执行似乎没什么区别。因为虽然并发了任务,但任务中的下次计算必须等上次计算完成后才能开始,这和串行执行任务并没有本质不同。有没有什么办法既能发挥并发优势,又能确保数据安全呢?当然有,那就是使用读写互斥锁。

# 读写互斥锁

想象一下,假如有一段庆祝生日的视频,大概 5GB 左右,现在要把它上传到百度网盘和阿里云盘中。假如同时开始上传任务,会有一方处于等待状态吗?显然,这通常是不会的。受整体带宽限制,虽然每个网盘上传的速度都变慢了,但上传还是会同时进行的。

再想象一下,当我们用网页和手机同时登录网银,同时查询账户余额时。作为服务器端,无需关心它们的顺序,只要将正确的返回值给到网页和手机客户端就行了。因为查询操作并不会改变金额,账户始终是安全的。

从上面两个例子中可以初步归纳出一个结论:只要共享的数据不发生改变,几乎不会用到锁。反之,如果强行对“读操作”加锁,反而会影响性能

据此规律,我们可以用 “读写互斥锁”充分发挥并发优势,只在写操作上串行,保证数据安全

具体说来,读写互斥锁的运行机制是这样的:

  • 当协程任务获得读操作锁后,读操作并发运行,写操作等待;
  • 当协程任务获得写操作锁后,考虑到数据可能发生变化,所以无论是读还是写操作都要等待。

使用读写互斥锁和使用简单的互斥锁很类似,不同的是需要声明 sync.Mutex 类型变量。写操作的方法依然 locker.Lock() 是加锁,locker.Unlock() 是解锁。读操作的方法则是 locker.RLock() 和 locker.RUnlock()。

下面用实际的代码示例来演示上述运行逻辑,以下代码模拟了读写文件的过程:

go
复制代码var syncWait sync.WaitGroup
var locker sync.RWMutex
func main() {
   syncWait.Add(3)
   go read5Sec()
   time.Sleep(time.Millisecond * 500)
   go read3Sec()
   go read1Sec()
   syncWait.Wait()
   fmt.Println("程序运行结束", time.Now().Format("15:04:05"))
}
func read5Sec() {
   defer syncWait.Done()
   defer locker.RUnlock()
   locker.RLock()
   fmt.Println("读文件耗时5秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 5)
   fmt.Println("读文件耗时5秒 结束", time.Now().Format("15:04:05"))
}
func read3Sec() {
   defer syncWait.Done()
   defer locker.RUnlock()
   locker.RLock()
   fmt.Println("读文件耗时3秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 3)
   fmt.Println("读文件耗时3秒 结束", time.Now().Format("15:04:05"))
}
func read1Sec() {
   defer syncWait.Done()
   defer locker.RUnlock()
   locker.RLock()
   fmt.Println("读文件耗时1秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 1)
   fmt.Println("读文件耗时1秒 结束", time.Now().Format("15:04:05"))
}

可以看到,read1Sec()、read3Sec() 和 read5Sec() 函数分别模拟了读文件的操作,所需时长分别是 1、3、5 秒。这 3 个函数中,都使用了读写互斥锁对等待时间进行了读操作的加锁和解锁。在 main() 函数中通过协程的方式首先启动了耗时 5 秒的任务,在 0.5 秒后,同时启动了剩余的 2 个任务。

程序运行后,可在控制台看到如下输出:

读文件耗时5秒 开始 10:42:25

读文件耗时1秒 开始 10:42:26

读文件耗时3秒 开始 10:42:26

读文件耗时1秒 结束 10:42:27

读文件耗时3秒 结束 10:42:29

读文件耗时5秒 结束 10:42:30

程序运行结束 10:42:30

显然,3个读操作的协程任务同时运行,实现了真正的并发。

💡 提示:上述代码中的 “15:04:05” 是为了格式化时间用的。Go 语言语法要求必须传入 2006 年1 月 2 日 15 时 04 分 05 秒 -0700 时区这个时间点(Go 语言的诞生时间)才能正常被格式化,并不是大多数编程语言中的 YMD HMS 格式。

接下来添加写操作协程,并修改 main() 函数,具体如下:

go
复制代码func main() {
	syncWait.Add(6)
	go write1Sec()
	time.Sleep(time.Second * 1)
	go read5Sec()
	time.Sleep(time.Second * 2)
	go read3Sec()
	time.Sleep(time.Millisecond * 500)
	go write3Sec()
	time.Sleep(time.Millisecond * 500)
	go read1Sec()
	go write5Sec()
	syncWait.Wait()
	fmt.Println("程序运行结束", time.Now().Format("15:04:05"))
}
func write5Sec() {
   defer syncWait.Done()
   defer locker.Unlock()
   locker.Lock()
   fmt.Println("写文件耗时5秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 5)
   fmt.Println("写文件耗时5秒 结束", time.Now().Format("15:04:05"))
}
func write3Sec() {
   defer syncWait.Done()
   defer locker.Unlock()
   locker.Lock()
   fmt.Println("写文件耗时3秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 3)
   fmt.Println("写文件耗时3秒 结束", time.Now().Format("15:04:05"))
}
func write1Sec() {
   defer syncWait.Done()
   defer locker.Unlock()
   locker.Lock()
   fmt.Println("写文件耗时1秒 开始", time.Now().Format("15:04:05"))
   time.Sleep(time.Second * 1)
   fmt.Println("写文件耗时1秒 结束", time.Now().Format("15:04:05"))
}

上述代码包含 3 个模拟写文件任务的函数,分别耗时 1、3、5 秒完成。此外,还包含了对 main() 函数的修改。

我们重点关注 main() 函数。程序启动后,首先开启了耗时 1 秒(10:51:35)的写文件任务。根据读写互斥锁的运行规律,当协程任务获得写操作锁后,考虑到数据可能发生变化,无论是读还是写操作都要等待

所以即使在 0.5 秒时启动了读文件的任务,也会等待到写文件完成才能干活。1 秒后(10:51:36),写文件任务完成,读文件操作开始并发执行。在这些操作开始后的 2.5 秒(大约10:51:40)时,写文件任务加入。

此时,由于协程任务获得读操作锁,读操作可以并发运行,写操作必须等待。 所以此时的写文件任务还要再等待 2.5 秒才能得到执行。2.5 秒后(10:51:41)读文件任务完成,写文件任务开始执行。

照此规律,此后 0.5 秒(大约10:51:42)时,又加入了读文件任务。这次的读文件需要等待写文件任务完成(10:51:44)后才能执行。随着此次读文件任务的开启,写文件任务也会同时开启。但由于读写之间互斥,只能等待其中一方运行结束后才能开始另一方的执行。

程序运行后,控制台将输出:

写文件耗时1秒 开始 10:51:35

写文件耗时1秒 结束 10:51:36

读文件耗时5秒 开始 10:51:36

读文件耗时3秒 开始 10:51:37

读文件耗时3秒 结束 10:51:40

读文件耗时5秒 结束 10:51:41

写文件耗时3秒 开始 10:51:41

写文件耗时3秒 结束 10:51:44

读文件耗时1秒 开始 10:51:44

读文件耗时1秒 结束 10:51:45

写文件耗时5秒 开始 10:51:45

写文件耗时5秒 结束 10:51:50

程序运行结束 10:51:50

简单对比,使用读写互斥锁运行上述任务,总耗时 15 秒。但如果使用简单的互斥锁,所有任务都会串行工作,没有任何并发优势(尽管使用了并发),耗时将长达 18 秒。

❗️ 注意:在使用锁或读写互斥锁时,一定要注意避免出现 A 等 B ,B 等 C,C 等 A 的情况。如此无限循环也会导致程序进入无限的循环等待中。

# 原子操作

所谓“原子操作”,简单理解就是指那些进行过程中不能被打断的操作。比如本讲一上来的示例中,对 testInt 的并发 2 次循环累加就可以使用原子操作来避免结果不准的问题。当然,原子操作也会使 2 次任务串行化,无法发挥并发的优势。但原子操作是无锁的,往往直接通过 CPU 指令直接实现,某些同步技术的实现恰恰基于原子操作。

Go 语言中的原子操作通过 sync/atomic 包实现,具体特性如下:

  • 原子操作都是非入侵式的;
  • 原子操作共有五种:增减、比较并交换、载入、存储、交换
  • 原子操作支持的类型类型包括 int32、int64、uint32、uint64、uintptr、unsafe.Pointer(在本讲末尾会有详细的附录列举)。

原子操作使用起来并不难,我们直接上代码。

下面的代码演示了使用原子操作实现对 testInt 的 2 次累加:

go
复制代码var testInt int32 = 0
var syncWait sync.WaitGroup
func main() {
   syncWait.Add(2)
   go testFunc()
   go testFunc()
   syncWait.Wait()
   fmt.Println(testInt)
}
func testFunc() {
   defer syncWait.Done()
   for i := 0; i < 1000; i++ {
      atomic.AddInt32(&testInt, 1)
   }
}

控制台会输出 2000。

实际上,原子操作不仅实现起来更方便,性能也比锁更快。感兴趣的朋友可以适当做压力测试,对比加锁和原子操作针对相同任务的耗时时长。

# 小结

🎉 恭喜,您完成了本次课程的学习!

📌 以下是本次课程的重点内容总结:

  1. 安全地使用 Go 并发
  • 互斥锁
  • 读写互斥锁
  • 原子操作

本讲是 Go 并发专题的收尾,作为最后一讲,我向大家介绍了如何安全地开启并发任务。

互斥锁是最简单粗暴的一种锁,只要加了锁,后续相同的任务只能排队进行。这也直接导致了并发的优势不复存在,加了锁的任务被改为串行进行。

为了更高效地加锁,同时不降低数据安全性,我们使用读写互斥锁。这种锁在性能和安全之间做到尽可能的平衡。当协程任务获得读操作锁后,读操作并发运行,写操作等待;当协程任务获得写操作锁后,考虑到数据可能发生变化,所以无论是读还是写操作都要等待。

当然,无论是互斥锁还是读写互斥锁,都要注意避免让程序陷入无限等待循环中。

最后,Go SDK 中的 sync/atomic 包提供了原子操作。与加锁相比,原子操作更易于实现,且性能更高。

➡️ 在下次课程中,我们会开始 Go 语言中反射的专题,具体内容如下:

  1. 反射的基本使用
  2. 反射的三个定律

# 附录一 原子操作常用函数一览表

操 作 分 类 函 数 声 明
读操作 func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) func LoadUint32(addr uint32) (val uint32) func LoadUint64(addr uint64) (val uint64) func LoadUintptr(addr uintptr) (val uintptr) func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer)
写操作 func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
修改操作 func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
交换操作 func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
比较并交换操作 func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)