在上一讲中,我用鸡生蛋,然后人们吃荷包蛋的例子介绍了 Go 并发通道(Channel)中的同步通道。同步通道又被称为无缓冲的通道,它要求数据的发送者和接收者必须配对存在。在实际开发中,我们会用到同步通道,也会用到带缓冲的通道。

和同步通道不同,带缓冲的通道有点类似于快递员(协程 A)和收件人(协程 B)的关系,在他们之间存在一个快递驿站(缓冲区)。寄送快递时,快递员会默认将快递放到驿站中,收件人可以找个合适的时间去驿站中取快递。当驿站放满快递时,新的快递便无法存入其中,必须等待旧的快递被取走。这个过程描述的便是带缓冲的通道的工作流程。

本讲我将会向大家介绍缓冲通道,以及如何构建更安全的通道。

# 缓冲通道

还记得上一讲中的示例吧,为了方便大家回忆,我把完整的示例代码拿了过来:

go
复制代码var syncWait sync.WaitGroup
// 创建通道类型变量,该通道将传送int类型数据
var intChan = make(chan int)
func main() {
   // 执行2个协程任务
   syncWait.Add(2)
   // 开启下蛋任务
   go layEggs()
   // 开启吃荷包蛋任务
   go eatEggs(intChan)
   // 等待协程任务完成
   syncWait.Wait()
}
func layEggs() {
   // 使用断言确保协程任务正常结束
   defer syncWait.Done()
   // 向通道传送int类型值
   intChan <- 1
   // 关闭通道
   close(intChan)
}
func eatEggs(intChan chan int) {
   // 使用断言确保协程任务正常结束
   defer syncWait.Done()
   // 从通道获取int类型值
   eggCounts := <-intChan
   // 输出结果
   fmt.Printf("吃%d个荷包蛋", eggCounts)
}

在这段代码中,intChan 就是构建的同步通道,通道内允许传送的数据类型是 int 型。main() 函数中开启了两个协程任务,分别是 layEggs() (产蛋)和 eatEggs() (吃蛋)。前者将会向同步通道中传出 1,表示产出 1 个蛋;后者从同步通道中读取值, 结果为 1,表示拿出 1 个蛋来吃。

在现实生活中,如果要统计一只鸡一周能产多少个鸡蛋,用上述同步方法就不是特别合适了。我们通常会用一个容器来存放这只鸡每天产下的鸡蛋,然后在 7 天后数容器内的鸡蛋的数量,便可得知这只鸡在这周产下的鸡蛋总数了。这里的“容器”其实就是缓冲通道中的缓冲区了。

若要实现这种统计并非难事,只需将上述代码稍加修改即可。

首要任务就是修改通道的创建模式,根据示例要求,需要统计 7 天的产蛋总量,我们便可将缓冲区的容量定为7。具体代码修改如下:

go
复制代码const DaysOfWeek = 7
var intChan = make(chan int, DaysOfWeek)

这里声明了一个常量,表示一周的天数,同时也规定了 intChan 通道的缓冲区大小就是 7。考虑到稍后在发送和接收时都需要用到缓冲区大小值,所以将该常量声明为全局可访问的。

请大家留意同步通道和缓冲通道在声明时的区别,只在于是否定义缓冲区的大小。当缓冲区大小的值为 0 时,通道的类型将为同步通道。

接下来只考虑产蛋的部分,即数据的发送端。假设这只鸡 7 天中每一天都会产下 1 个鸡蛋,且每天都将产下的鸡蛋拿到盛蛋的容器中。我们使用一个 for 循环结构来描述这个过程,将 layEggs() 函数修改如下:

go
复制代码func layEggs() {
	defer syncWait.Done()
	for i := 0; i < DaysOfWeek; i++ {
		time.Sleep(time.Millisecond * 500)
		intChan <- 1
         fmt.Println("产鸡蛋了")
	}
	close(intChan)
}

在每次 for 循环一开始,都延迟了 0.5 秒执行,表示 1 天。

接着再来考虑收集鸡蛋的过程,和产蛋类似,该过程每天都进行一次,因此也可使用 for 循环来描述,具体代码如下:

go
复制代码func collectEggs(intChan chan int) {
   defer syncWait.Done()
   var eggCounts int
   for i := 0; i < DaysOfWeek; i++ {
      eggCounts += <-intChan
      fmt.Println("鸡蛋被收集了")
   }
   fmt.Printf("本周共产%d个鸡蛋\n", eggCounts)
}

最后,保持 main() 函数不做修改,运行整个代码,可以观察到控制台如下输出:

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

本周共产7个鸡蛋

显然,“产鸡蛋了”和“鸡蛋被收集了”成对出现 7 次。表示这只鸡每天会产 1 个鸡蛋,这颗鸡蛋也会被按天收取。最终程序输出了一周产蛋总数为 7 颗,程序运行结束。

在使用同步通道时,一个强制性的要求便是每次发送和接收都必须成对存在。反之,在使用缓冲通道时则没有如此强制性的要求。就拿上例来说,虽然这只鸡在 7 天内每天都会产鸡蛋,但如果将收集鸡蛋的工作安排在周一至周五,即 5 天,就需要将 connectEggs() 函数体中循环的终止条件改为 i<5 。如此修改后,程序依然会正常运行,产鸡蛋的工作同样会执行 7 次,但收鸡蛋的工作只会执行 5 次。具体控制台输出将如下所示:

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

产鸡蛋了

鸡蛋被收集了

本周共产5个鸡蛋

产鸡蛋了

产鸡蛋了

可以看到,“产鸡蛋了”和“鸡蛋被收集了”成对出现 5 次。

如果更激进一些,从 main() 中去掉 collectEggs() 函数的调用,程序还能正常运行吗?

答案是:肯定的。试想一下,现实生活中的快递驿站并不会因某个人没取快递而关门,快递员也不会关心收件人是否取快递,只需将快递放到驿站就大功告成了。本例中的鸡产鸡蛋也是类似的道理,无论鸡蛋被怎样处理,它该下蛋还是会下蛋。

# 构建安全的通道

在实际开发中,协程的使用可以说是非常普遍且复杂,多个协程之间的通信更是常见。由于协程各为异步运行的状态,这就要求开发者在使用通道时多加注意,以防造成不可预期的结果。

在接下来的内容中,我将为大家介绍两个重要的有助于增强通道健壮性的方法,即通道的关闭和单向通道的构建。

# 通道的关闭

不知道大家注意到没有,小册中有关通道的示例在发送数据后都会调用内置的 close() 函数关闭通道。实际上,在关闭通道方面也是有一些讲究的。

正如前面的示例那样,我们应该只让某个通道的唯一发送者关闭该通道,这是关闭通道的原则之一。试想,如果多个发送者共用相同的通道,且都会在某种条件下关闭。那么一旦关闭了该通道,其它发送者就再也没有机会使用通道发送数据了。

从另一个角度讲,发送者最好使用各自的通道。当然,如果非要多个发送者共用一个通道,可以通过恢复机制来规避程序宕机。但这样做是不推荐的,因为它违反了关闭通道的原则。

❗️ 注意:通道关闭后,无法再通过它发送数据,但不会影响数据的接收。

除此之外,关闭通道还有一个原则是不允许关闭一个已经关闭了的通道。否则也会引发程序宕机,错误信息为:panic: close of closed channel。由此便引出一个问题:如何判断通道已经关闭了呢?

我们可以通过尝试从通道中接收值来判断通道是否关闭。我们将本讲示例代码中的 collectEggs() 函数稍加修改如下:

go
复制代码func collectEggs(intChan chan int) {
   defer syncWait.Done()
   var eggCounts int
   for i := 0; i < DaysOfWeek; i++ {
      eggCounts += <-intChan
      fmt.Println("鸡蛋被收集了")
   }
   _, isOpen := <-intChan
   if !isOpen {
      fmt.Printf("本周共产%d个鸡蛋\n", eggCounts)
   }
}

请各位留意最后 5 行代码。尝试从通道接收值时,除了可以得到值本身外,还可得到一个布尔类型的值。当这个布尔类型的值为 true 时,通道打开;反之,则表示通道已经被关闭了。

如上修改后,控制台输出结果不变。

另外,使用 for-range 循环结构可简化上述代码。当通道关闭后,for-range 循环会自动跳出。下面的代码与上面的代码具有同样的运行结果。

go
复制代码func collectEggs(intChan chan int) {
   defer syncWait.Done()
   var eggCounts int
   for intValue := range intChan {
      eggCounts += intValue
      fmt.Println("鸡蛋被收集了")
   }
   fmt.Printf("本周共产%d个鸡蛋\n", eggCounts)
}

# 单向通道的使用

本讲的最后,我们来聊聊单向通道的构建。

在实际项目中,有时候需要特别规定数据的流向,以确保其正确性。这有点类似于单行道和双向车道,前者只能按照规定的方向行驶,后者来去都是自由的。

本讲案例中, layEggs() 和 collectEggs() 都使用了 intChan 通道。但很明显,前者只负责数据的发送,后者只负责数据的接收。像这种情况,我们就可以基于 intChan ,构建只能发送的通道,用于 layEggs() 函数;构建只能接收的通道,用于 collectEggs() 函数。

💡 提示:单向通道不是凭空声明的,它需要基于已有的通道。

结合本例,下面的代码基于已有的 intChan 创建了名为 readOnlyIntChan 的只接收通道:

go
复制代码var readOnlyIntChan <-chan int = intChan

这句代码中, <-chan 表示只接收通道。与其相反, chan<- 则表示只发送通道。下面的代码基于已有的 intChan 创建了名为 sendOnlyIntChan 的只发送通道:

go
复制代码var sendOnlyIntChan chan<- int = intChan

# 小结

🎉 恭喜,您完成了本次课程的学习!

📌 以下是本次课程的重点内容总结:

  1. 缓冲通道的使用;
  2. 构建安全的通道;
    • 通道的关闭;
    • 单向通道的构建。

本讲继续上一讲的话题:通道(Channel)。首先介绍的就是缓冲通道的使用,示例还是鸡生蛋。不过这次加了缓冲区,即存放鸡蛋的容器。鸡产蛋后,先将鸡蛋放入缓冲区中,等待人来统计。缓冲区的大小被预先定义,超过缓冲区大小的数据无法放入其中。另外,即使没有数据的接收方,发送方也可以将数据放入缓冲区中,不影响其正常运行。这一点是与同步通道最大的区别。

另外,为了增强程序的健壮性,本讲介绍了关闭通道时的两个重要原则:即只让某个通道的唯一发送者关闭该通道不允许关闭一个已经关闭了的通道。由这两个原则引申出的两个知识点是:发送者最好使用各自的通道以及从通道中接收值来判断通道是否关闭

最后,为了确保数据的正确流向,本讲还介绍了单向通道的构建。值得一提的是,构建单向通道必须以某个已经存在的通道为基础。强制向只接收的单向通道中发送数据将导致程序运行出错,反之亦然。

➡️ 在下次课程中,我们会继续介绍 Go 语言中的并发,具体内容如下:

  1. 并发中的 Select 结构
  2. Go 语言中的定时器
    • Timer
    • Ticker