本讲是 Go 并发编程专题的最后一个部分,在本讲中,我会介绍两个和并发安全相关的包。一个是 sync.Mutex,另一个是 sync.Atomic,它们分别用于锁和原子操作。
有的朋友可能会问:并发竟然还有不安全的,难道会烧毁电脑吗?我们不妨看看下面这段代码:
go
复制代码var testInt = 0
var syncWait sync.WaitGroup
func main() {
syncWait.Add(2)
go testFunc()
go testFunc()
syncWait.Wait()
fmt.Println(testInt)
}
func testFunc() {
defer syncWait.Done()
for i := 0; i < 1000; i++ {
testInt += 1
}
}
通过前面的学习,这段代码理解起来并不难。main() 函数中开启了两个相同的协程任务,具体内容是对公共变量 testInt 进行自增 1 操作。每个协程任务都会自增 1000 次,两个任务并发,理应自增 2000 次,最终输出 testInt 的值应该是 2000。
不信你试试看,反复运行程序,果不其然还真有不是 2000 的时候。最为奇怪的是,计算结果居然还会有变化!这是为何呢?
其实,这就是并发“不安全”的体现了。由于 testInt 是公共变量,两个任务同时对其操作,导致数据竞争,计算出错误的结果。
代入实际的运行场景,假如某个时刻 testInt 的值为 5,两个 testFunc() 函数同时进行自增 1 操作。此时,具体的自增操作都会是 5+1=6。如此一来,虽然进行了 2 次自增操作,但最终结果和自增 1 次无异。
想象一下,两个人同时向第三个人的银行账户里转钱,如果发生类似的数据竞争问题,其结果可想而知(但如果是同时消费,似乎就赚到了)……
在大多数编程语言中,多线程/多进程/多协程都会存在类似的问题,规避它们的思路类似。都是通过锁或原子操作规避数据竞争,从而保护数据的安全。Go 语言也不例外。
# 互斥锁
互斥锁是 Go 语言中最为简单粗暴的锁,所以我们先从它学起。
从前文中的示例可以看出,发生不安全并发的根源在于公共变量 testInt,所以我们只需恰当地将其保护起来就行了。之所以说互斥锁简单粗暴,就是因为被它加锁的代码一旦运行,就必须等待其结束后才能再次运行。
它的使用方法也很简单粗暴,我们对上述示例代码稍加修改即可实现互斥锁保护了:
go
复制代码var testInt = 0
var syncWait sync.WaitGroup
var locker sync.Mutex
func main() {
syncWait.Add(2)
go testFunc()
go testFunc()
syncWait.Wait()
fmt.Println(testInt)
}
func testFunc() {
defer syncWait.Done()
defer locker.Unlock()
locker.Lock()
for i := 0; i < 1000; i++ {
testInt += 1
}
}
大家可以看到,locker 是一开头就声明了的 sync.Mutex 类型变量,locker.Lock() 是加锁,locker.Unlock() 是解锁。在 testFunc() 函数中,一上来便执行了加锁操作,互斥锁“锁住”的代码是自增 1000 的逻辑。最后,为了确保后续代码顺利执行,使用断言执行解锁操作。
如此修改后,反复运行这段代码,控制台将始终输出 2000。
到此,计算结果总算是正确了。但大家想一想,如此加锁后,并发和串行执行似乎没什么区别。因为虽然并发了任务,但任务中的下次计算必须等上次计算完成后才能开始,这和串行执行任务并没有本质不同。有没有什么办法既能发挥并发优势,又能确保数据安全呢?当然有,那就是使用读写互斥锁。
# 读写互斥锁
想象一下,假如有一段庆祝生日的视频,大概 5GB 左右,现在要把它上传到百度网盘和阿里云盘中。假如同时开始上传任务,会有一方处于等待状态吗?显然,这通常是不会的。受整体带宽限制,虽然每个网盘上传的速度都变慢了,但上传还是会同时进行的。
再想象一下,当我们用网页和手机同时登录网银,同时查询账户余额时。作为服务器端,无需关心它们的顺序,只要将正确的返回值给到网页和手机客户端就行了。因为查询操作并不会改变金额,账户始终是安全的。
从上面两个例子中可以初步归纳出一个结论:只要共享的数据不发生改变,几乎不会用到锁。反之,如果强行对“读操作”加锁,反而会影响性能。
据此规律,我们可以用 “读写互斥锁”充分发挥并发优势,只在写操作上串行,保证数据安全。
具体说来,读写互斥锁的运行机制是这样的:
- 当协程任务获得读操作锁后,读操作并发运行,写操作等待;
- 当协程任务获得写操作锁后,考虑到数据可能发生变化,所以无论是读还是写操作都要等待。
使用读写互斥锁和使用简单的互斥锁很类似,不同的是需要声明 sync.Mutex 类型变量。写操作的方法依然 locker.Lock() 是加锁,locker.Unlock() 是解锁。读操作的方法则是 locker.RLock() 和 locker.RUnlock()。
下面用实际的代码示例来演示上述运行逻辑,以下代码模拟了读写文件的过程:
go
复制代码var syncWait sync.WaitGroup
var locker sync.RWMutex
func main() {
syncWait.Add(3)
go read5Sec()
time.Sleep(time.Millisecond * 500)
go read3Sec()
go read1Sec()
syncWait.Wait()
fmt.Println("程序运行结束", time.Now().Format("15:04:05"))
}
func read5Sec() {
defer syncWait.Done()
defer locker.RUnlock()
locker.RLock()
fmt.Println("读文件耗时5秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 5)
fmt.Println("读文件耗时5秒 结束", time.Now().Format("15:04:05"))
}
func read3Sec() {
defer syncWait.Done()
defer locker.RUnlock()
locker.RLock()
fmt.Println("读文件耗时3秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 3)
fmt.Println("读文件耗时3秒 结束", time.Now().Format("15:04:05"))
}
func read1Sec() {
defer syncWait.Done()
defer locker.RUnlock()
locker.RLock()
fmt.Println("读文件耗时1秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 1)
fmt.Println("读文件耗时1秒 结束", time.Now().Format("15:04:05"))
}
可以看到,read1Sec()、read3Sec() 和 read5Sec() 函数分别模拟了读文件的操作,所需时长分别是 1、3、5 秒。这 3 个函数中,都使用了读写互斥锁对等待时间进行了读操作的加锁和解锁。在 main() 函数中通过协程的方式首先启动了耗时 5 秒的任务,在 0.5 秒后,同时启动了剩余的 2 个任务。
程序运行后,可在控制台看到如下输出:
读文件耗时5秒 开始 10:42:25
读文件耗时1秒 开始 10:42:26
读文件耗时3秒 开始 10:42:26
读文件耗时1秒 结束 10:42:27
读文件耗时3秒 结束 10:42:29
读文件耗时5秒 结束 10:42:30
程序运行结束 10:42:30
显然,3个读操作的协程任务同时运行,实现了真正的并发。
💡 提示:上述代码中的 “15:04:05” 是为了格式化时间用的。Go 语言语法要求必须传入 2006 年1 月 2 日 15 时 04 分 05 秒 -0700 时区这个时间点(Go 语言的诞生时间)才能正常被格式化,并不是大多数编程语言中的 YMD HMS 格式。
接下来添加写操作协程,并修改 main() 函数,具体如下:
go
复制代码func main() {
syncWait.Add(6)
go write1Sec()
time.Sleep(time.Second * 1)
go read5Sec()
time.Sleep(time.Second * 2)
go read3Sec()
time.Sleep(time.Millisecond * 500)
go write3Sec()
time.Sleep(time.Millisecond * 500)
go read1Sec()
go write5Sec()
syncWait.Wait()
fmt.Println("程序运行结束", time.Now().Format("15:04:05"))
}
func write5Sec() {
defer syncWait.Done()
defer locker.Unlock()
locker.Lock()
fmt.Println("写文件耗时5秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 5)
fmt.Println("写文件耗时5秒 结束", time.Now().Format("15:04:05"))
}
func write3Sec() {
defer syncWait.Done()
defer locker.Unlock()
locker.Lock()
fmt.Println("写文件耗时3秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 3)
fmt.Println("写文件耗时3秒 结束", time.Now().Format("15:04:05"))
}
func write1Sec() {
defer syncWait.Done()
defer locker.Unlock()
locker.Lock()
fmt.Println("写文件耗时1秒 开始", time.Now().Format("15:04:05"))
time.Sleep(time.Second * 1)
fmt.Println("写文件耗时1秒 结束", time.Now().Format("15:04:05"))
}
上述代码包含 3 个模拟写文件任务的函数,分别耗时 1、3、5 秒完成。此外,还包含了对 main() 函数的修改。
我们重点关注 main() 函数。程序启动后,首先开启了耗时 1 秒(10:51:35)的写文件任务。根据读写互斥锁的运行规律,当协程任务获得写操作锁后,考虑到数据可能发生变化,无论是读还是写操作都要等待。
所以即使在 0.5 秒时启动了读文件的任务,也会等待到写文件完成才能干活。1 秒后(10:51:36),写文件任务完成,读文件操作开始并发执行。在这些操作开始后的 2.5 秒(大约10:51:40)时,写文件任务加入。
此时,由于协程任务获得读操作锁,读操作可以并发运行,写操作必须等待。 所以此时的写文件任务还要再等待 2.5 秒才能得到执行。2.5 秒后(10:51:41)读文件任务完成,写文件任务开始执行。
照此规律,此后 0.5 秒(大约10:51:42)时,又加入了读文件任务。这次的读文件需要等待写文件任务完成(10:51:44)后才能执行。随着此次读文件任务的开启,写文件任务也会同时开启。但由于读写之间互斥,只能等待其中一方运行结束后才能开始另一方的执行。
程序运行后,控制台将输出:
写文件耗时1秒 开始 10:51:35
写文件耗时1秒 结束 10:51:36
读文件耗时5秒 开始 10:51:36
读文件耗时3秒 开始 10:51:37
读文件耗时3秒 结束 10:51:40
读文件耗时5秒 结束 10:51:41
写文件耗时3秒 开始 10:51:41
写文件耗时3秒 结束 10:51:44
读文件耗时1秒 开始 10:51:44
读文件耗时1秒 结束 10:51:45
写文件耗时5秒 开始 10:51:45
写文件耗时5秒 结束 10:51:50
程序运行结束 10:51:50
简单对比,使用读写互斥锁运行上述任务,总耗时 15 秒。但如果使用简单的互斥锁,所有任务都会串行工作,没有任何并发优势(尽管使用了并发),耗时将长达 18 秒。
❗️ 注意:在使用锁或读写互斥锁时,一定要注意避免出现 A 等 B ,B 等 C,C 等 A 的情况。如此无限循环也会导致程序进入无限的循环等待中。
# 原子操作
所谓“原子操作”,简单理解就是指那些进行过程中不能被打断的操作。比如本讲一上来的示例中,对 testInt 的并发 2 次循环累加就可以使用原子操作来避免结果不准的问题。当然,原子操作也会使 2 次任务串行化,无法发挥并发的优势。但原子操作是无锁的,往往直接通过 CPU 指令直接实现,某些同步技术的实现恰恰基于原子操作。
Go 语言中的原子操作通过 sync/atomic 包实现,具体特性如下:
- 原子操作都是非入侵式的;
- 原子操作共有五种:增减、比较并交换、载入、存储、交换;
- 原子操作支持的类型类型包括 int32、int64、uint32、uint64、uintptr、unsafe.Pointer(在本讲末尾会有详细的附录列举)。
原子操作使用起来并不难,我们直接上代码。
下面的代码演示了使用原子操作实现对 testInt 的 2 次累加:
go
复制代码var testInt int32 = 0
var syncWait sync.WaitGroup
func main() {
syncWait.Add(2)
go testFunc()
go testFunc()
syncWait.Wait()
fmt.Println(testInt)
}
func testFunc() {
defer syncWait.Done()
for i := 0; i < 1000; i++ {
atomic.AddInt32(&testInt, 1)
}
}
控制台会输出 2000。
实际上,原子操作不仅实现起来更方便,性能也比锁更快。感兴趣的朋友可以适当做压力测试,对比加锁和原子操作针对相同任务的耗时时长。
# 小结
🎉 恭喜,您完成了本次课程的学习!
📌 以下是本次课程的重点内容总结:
- 安全地使用 Go 并发
- 互斥锁
- 读写互斥锁
- 原子操作
本讲是 Go 并发专题的收尾,作为最后一讲,我向大家介绍了如何安全地开启并发任务。
互斥锁是最简单粗暴的一种锁,只要加了锁,后续相同的任务只能排队进行。这也直接导致了并发的优势不复存在,加了锁的任务被改为串行进行。
为了更高效地加锁,同时不降低数据安全性,我们使用读写互斥锁。这种锁在性能和安全之间做到尽可能的平衡。当协程任务获得读操作锁后,读操作并发运行,写操作等待;当协程任务获得写操作锁后,考虑到数据可能发生变化,所以无论是读还是写操作都要等待。
当然,无论是互斥锁还是读写互斥锁,都要注意避免让程序陷入无限等待循环中。
最后,Go SDK 中的 sync/atomic 包提供了原子操作。与加锁相比,原子操作更易于实现,且性能更高。
➡️ 在下次课程中,我们会开始 Go 语言中反射的专题,具体内容如下:
- 反射的基本使用
- 反射的三个定律
# 附录一 原子操作常用函数一览表
操 作 分 类 | 函 数 声 明 |
---|---|
读操作 | func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) func LoadUint32(addr uint32) (val uint32) func LoadUint64(addr uint64) (val uint64) func LoadUintptr(addr uintptr) (val uintptr) func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer) |
写操作 | func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) |
修改操作 | func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) |
交换操作 | func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) |
比较并交换操作 | func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) |