通过插图学习Go的并发

你很可能或多或少听说过Go。 它越来越受欢迎。 Go快速,简单,并且拥有一个很棒的社区。 并发模型是学习这门语言最令人兴奋的方面之一。 Go的并发原语使创建并发的多线程程序变得简单而有趣。 我将通过插图介绍Go的并发原语,希望能够将这些概念讲清楚以供将来学习。 本文适用于Go的新手,以及想要了解Go的并发原语:go routine 和 channel 的学习者。

单线程与多线程程序

你可能以前编写过多个单线程程序。编程中的一个常见模式是具有执行特定任务的多个函数,但是直到程序的前一部分为下一个函数准备好数据时,才会调用这些函数。

这就是我们最初设定的第一个例子,即开采矿石。 本例中的函数执行:寻找矿石、开采矿石和冶炼矿石。 在我们的例子中,矿和矿石被表示为一个字符串数组,每个函数接收并返回一个“已处理的”字符串数组。 对于单线程应用程序,程序设计如下。

有3个主要函数。 finderminersmelter。 在这个版本的程序中,我们的函数在单个线程上运行,一个接一个地运行 -- 而这个单线程(名为Gary的土拨鼠)需要完成所有工作。

1
2
3
4
5
6
func main() {
theMine := [5]string{“rock”, “ore”, “ore”, “rock”, “ore”}
foundOre := finder(theMine)
minedOre := miner(foundOre)
smelter(minedOre)
}

在每个函数的末尾打印得到的“矿石”数组,我们得到以下输出:

1
2
3
4
5
From Finder: [ore ore ore]

From Miner: [minedOre minedOre minedOre]

From Smelter: [smeltedOre smeltedOre smeltedOre]

这种编程方式具有易于设计的优点,但是当你希望利用多线程并执行彼此独立的函数时,会发生什么情况呢?这就是并发编程发挥作用的地方。

这种采矿设计效率要高得多。现在多线程( 土拨鼠们 )独立工作;因此,整个过程并不全由Gray这土拨鼠来做。有一个地鼠寻找矿石,有一个土拨鼠在开采矿石,另一个土拨鼠在冶炼矿石——可能所有这些都是同时发生的。

为了将这种功能引入我们的代码,我们需要两件事:一是创建独立工作的土拨鼠,二是土拨鼠相互通信(发送矿石 )的方式。这就是Go的并发原语: goroutine 和 channel。

Go routine

Go routine可以被认为是轻量的线程。创建一个go routine只需在调用函数的前面加上 go 这个关键字如此简单。

举个简单的例子,让我们创建两个寻找矿石的函数,使用go关键字调用它们,并让它们在每次发现矿井中的“矿石”时把他们打印出来。

1
2
3
4
5
6
func main() {
theMine := [5]string{“rock”, “ore”, “ore”, “rock”, “ore”}
go finder1(theMine)
go finder2(theMine)
<-time.After(time.Second * 5) //现在你可以忽略这行代码
}

以下是我们程序的输出结果:

1
2
3
4
5
6
Finder 1 found ore!
Finder 2 found ore!
Finder 1 found ore!
Finder 1 found ore!
Finder 2 found ore!
Finder 2 found ore!

从上面的输出可以看出,寻找矿石函数同时运行。谁先找到矿石没有真正的顺序,当多次运行时,顺序并不总是一样的。

这是很大的进步!现在我们有一个简单的方法来建立多线程(多个土拨鼠 )程序,但是当我们需要独立的goroutine来相互通信时会发生什么呢?欢迎来到神奇的channel世界。

Channel

channel允许goroutine相互通信。你可以将channel视为管道,goroutine可以从管道发送和接收来自其他goroutine的信息。

1
myFirstChannel := make(chan string)

goroutine可以在一个channel上发送和接收数据。这是通过使用指向数据方向的箭头( <- )来实现的。

1
2
myFirstChannel <- "hello" // 发送数据
myVariable := <- myFirstChannel // 接收数据

现在通过使用一个channel,我们可以让我们的寻找矿石的土拨鼠立即将它们发现的矿石发送给我们的矿石采集土拨鼠,而无需等待发现所有矿石后才将矿石送给矿石采集土拨鼠。

我已经更新了示例代码,以便将寻找矿石函数和采矿函数设置为未命名的函数。 如果你从来没有见过lambda函数就先不需要太关注程序的那部分,只要知道每个函数都是用go关键字调用的,所以它们就运行在自己的goroutine里。 重要的是要注意如何使用叫做oreChanchannel来相互传递数据。 别担心,我会在最后解释未命名的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
theMine := [5]string{“ore1”, “ore2”, “ore3”}
oreChan := make(chan string)

// Finder
go func(mine [5]string) {
for _, item := range mine {
oreChan <- item //发送数据
}
}(theMine)

// Ore Breaker
go func() {
for i := 0; i < 3; i++ {
foundOre := <-oreChan //接收数据
fmt.Println(“Miner: Received “ + foundOre + “ from finder”)
}
}()
<-time.After(time.Second * 5) // 现在依然可以不用管这行代码
}

在下面的输出中,你可以看到我们的矿工通过三次读取叫做“oreChan”的channel,每一次收到一块“矿石”。

1
2
3
4
5
Miner: Received ore1 from finder

Miner: Received ore2 from finder

Miner: Received ore3 from finder

太好了,现在我们可以在程序中的不同goroutine( 土拨鼠 )之间发送数据了。在我们开始编写带有channel的复杂程序之前,让我们先介绍一些理解channel属性的关键内容。

Channel的阻塞

在各种情况下,channel会阻塞goroutine。这就让我们的goroutine在各自独立快乐的道路上同步了一会儿。

发送端阻塞

一旦一个goroutine(土拨鼠)在channel上发送数据,这个发送数据的goroutine就会阻塞,直到另一个goroutinechannel接收到发送的数据。

接收端阻塞

类似于在channel上发送数据之后的阻塞,goroutine可以阻塞在等待从没有任何数据的channel上获取数据。

一开始阻塞这个概念可能有点让人不好理解,但你可以把它看作是两个goroutine( 土拨鼠 )之间的事务。无论一个土拨鼠是在等钱还是在送钱,它都要等到交易中的另一个伙伴出现。

现在我们已经了解了goroutine在通过channel进行通信时阻塞的不同方式,让我们讨论两种不同类型的channel:无缓冲channel和缓冲channel。选择你使用的channel类型可以改变程序的行为方式。

无缓冲channel

在前面的例子中,我们一直使用无缓冲channel。使它们独一无二的是,一次只有一条数据适合通过channel

缓冲channel

在并发程序中,时序并不总是完美的。 在我们的采矿案例中,我们可能会遇到这样一种情况:我们的寻找矿石土拨鼠可以在采矿土拨鼠处理一块矿石的时间内找到三块矿石。 为了不让寻找矿石的土拨鼠将大部分时间花费在发送矿石给采矿土拨鼠并一直等待它接收处理完成,我们可以使用缓冲channel。 让我们开始做一个容量为3的缓冲channel

1
bufferedChan := make(chan string, 3)

缓冲channel的工作原理与非缓冲channel相似,只是有一点需要注意:我们可以在需要另外的goroutine读取channel之前将多条数据发送到channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bufferedChan := make(chan string, 3)

go func() {
bufferedChan <- "first"
fmt.Println("Sent 1st")
bufferedChan <- "second"
fmt.Println("Sent 2nd")
bufferedChan <- "third"
fmt.Println("Sent 3rd")
}()

<-time.After(time.Second * 1)

go func() {
firstRead := <- bufferedChan
fmt.Println("Receiving..")
fmt.Println(firstRead)
secondRead := <- bufferedChan
fmt.Println(secondRead)
thirdRead := <- bufferedChan
fmt.Println(thirdRead)
}()

我们两个goroutine之间的打印顺序是:

1
2
3
4
5
6
7
Sent 1st
Sent 2nd
Sent 3rd
Receiving..
first
second
third

为了简单起见,我们不会在最终示例程序中使用缓冲channel,但了解并发工具中可用的channel类型很重要。

注意:使用缓冲channel不会阻止发生阻塞。 例如,如果寻矿土拨鼠比采矿土拨鼠快10倍,并且他们通过大小为2的缓冲channel进行通信,则寻矿土拨鼠仍将在程序中多次被阻塞。

把这些概念放在一起

现在通过goroutinechannel的强大功能,我们可以编写一个程序,使用Go的并发原语充分利用多个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
theMine := [5]string{"rock", "ore", "ore", "rock", "ore"}
oreChannel := make(chan string)
minedOreChan := make(chan string)

// Finder
go func(mine [5]string) {
for _, item := range mine {
if item == "ore" {
oreChannel <- item //发送数据给 oreChannel
}
}
}(theMine)

// Ore Breaker
go func() {
for i := 0; i < 3; i++ {
foundOre := <-oreChannel //从 oreChannel 读取数据
fmt.Println("From Finder: ", foundOre)
minedOreChan <- "minedOre" //发送数据给 minedOreChan
}
}()

// Smelter
go func() {
for i := 0; i < 3; i++ {
minedOre := <-minedOreChan //从 minedOreChan 读取数据
fmt.Println("From Miner: ", minedOre)
fmt.Println("From Smelter: Ore is smelted")
}
}()

<-time.After(time.Second * 5) // 依然可以忽略这行代码

上述代码的输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
From Finder:  ore

From Finder: ore

From Miner: minedOre

From Smelter: Ore is smelted

From Miner: minedOre

From Smelter: Ore is smelted

From Finder: ore

From Miner: minedOre

From Smelter: Ore is smelted

这比我们原来的例子有了很大的改进!现在,我们的每个函数都是独立运行的。而且,每次有一块矿石被加工,它就进入我们采矿线的下一个阶段。

为了将注意力集中在理解channelgoroutine上,我上面没有提到一些重要的信息——如果你不知道这些信息,在你开始编程时可能会引起一些麻烦。现在你已经了解了goroutinechannel的工作方式,让我们先看一看你应该知道的一些信息,然后再开始使用goroutinechannel进行编程。

在你出发前,你应该知道..

匿名goroutine

与我们如何用关键字 go 设置一个函数运行在它自己的goroutine里相似,我们可以用如下格式来创建一个匿名函数运行在它自己的goroutine里:

1
2
3
4
// 匿名`goroutine`
go func() {
fmt.Println("I'm running in my own go routine")
}()

如此一来,如果我们只需要调用一次函数,我们可以将它放在它自己的goroutine中运行,而不用创建正式的函数声明。

主函数是一个goroutine

主函数确实在其自己的goroutine中运行! 更重要的是要知道,一旦主函数返回,它将关闭当前正在运行的其他所有goroutine。 这就是为什么我们在主函数底部有一个定时器 -- 它创建了一个channel,并在5秒后发送了一个值。

1
<-time.After(time.Second * 5) //5秒后从channel获得一个值

还记得一个goroutine会如何在一个channel读取数据的时候一直阻塞到有数据发送给了这个channel吗? 通过添加上面的代码,主函数会阻塞,给我们其他的goroutine5秒额外的时间来运行。

现在有更好的方法来处理阻塞主函数,直到所有其他的goroutine完成。 通常的做法是创建 done channel,主函数在它上面读取数据从而被阻塞。 一旦你完成你的工作,写入这个channel,程序将结束。

1
2
3
4
5
6
7
8
9
10
func main() {
doneChan := make(chan string)

go func() {
// Do some work…
doneChan <- “I’m all done!”
}()

<-doneChan // 阻塞到上面的goroutine给这个doenChan写入数据
}

你可以在channel上使用range

在前面的一个例子中,我们让矿工从for循环中经过3次迭代从channel中读取数据。如果我们不知道到底有多少矿石会从发现者那里送过来,会发生什么?好吧,类似于在集合上使用range,你可以在一个channel使用range

更新我们以前的矿工函数,我们可以写成这样:

1
2
3
4
5
6
// Ore Breaker
go func() {
for foundOre := range oreChan {
fmt.Println(“Miner: Received “ + foundOre + “ from finder”)
}
}()

因为矿工需要读取寻矿者发送给他的所有内容,所以通过在这里的channel使用range可以确保我们收到发送的所有内容。

注意:在一个channel上使用range将被阻塞,直到在channel上发送了另一个数据。在所有需要发送的数据发送完后,停止goroutine被阻塞的唯一方法是用“close(channel)”关闭channel

你可以在channel上进行非阻塞读取

但你不是刚刚告诉我们channel如何阻塞goroutine的吗?! 确实如此,但有一种技术可以使用Go的select case结构在channel上进行非阻塞式读取。 通过使用下面的结构,如果有某些事情发生,你的goroutine将从channel中读取,或运行默认情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
myChan := make(chan string)

go func(){
myChan <- “Message!”
}()

select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println(“No Msg”)
}

<-time.After(time.Second * 1)

select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println(“No Msg”)
}

运行后,上述例子的输出如下:

1
2
No Msg
Message!

你也可以在channel上进行非阻塞发送

非阻塞发送使用相同的select case结构来执行它们的非阻塞操作,唯一的区别是我们的情况看起来像发送而不是接收。

1
2
3
4
5
6
select {
case myChan <- “message”:
fmt.Println(“sent the message”)
default:
fmt.Println(“no message sent”)
}

下一步学习

有很多讲座和博客文章涵盖channelgoroutine的更详细内容。 既然你对这些工具的目的和应用有了深刻的理解,那么你应该能够充分利用以下文章和讲座。

Google I/O 2012 — Go Concurrency Patterns
Rob Pike — ‘Concurrency Is Not Parallelism’
GopherCon 2017: Edward Muller — Go Anti-Patterns

谢谢你抽出时间来阅读这篇文章。我希望你能够了解goroutinechannel以及它们为编写并发程序带来的好处。

原文链接: https://medium.com/@trevor4e/learning-gos-concurrency-through-illustrations-8c4aff603b3