MIT 6.824学习笔记3 Go语言并发解析 - Pentium.Labs

MIT 6.824学习笔记3 Go语言并发解析

之前看过一个go语言并发的介绍:https://www.cnblogs.com/pdev/p/10936485.html 但这个太简略啦。下面看点深入的

还记得https://www.cnblogs.com/pdev/p/11095475.html中我们写过一个简单的爬虫。这里面就用到了Go的两种并发方式:


1. Go routines和Go channels(ConcurrentChannel),这是Go语言特有的一种并发方式,可以简化编程

1.1 Go routines

Goroutines 可以看作是轻量级线程。创建一个 goroutine 非常简单,只需要把 go 关键字放在函数调用语句前。为了说明这有多么简单,我们创建两个 finder 函数,并用 go 调用,让它们每次找到 "ore" 就打印出来。

package main
import (
    "fmt"
    "time"
    "math/rand"
)

func finder(mines [5]string, coreid int) {
    <-time.After(time.Second * time.Duration(coreid))
    rand.Seed(time.Now().UnixNano())
    idx := rand.Intn(5)
    fmt.Println(time.Now(), coreid, mines[idx])
}

func main() {
    theMine := [5]string{"rock", "ore", "gold", "copper", "sliver"}
    go finder(theMine, 1)
    go finder(theMine, 2)
    <-time.After(time.Second * 3) //you can ignore this for now
    fmt.Println(time.Now(), "END")
}

程序的输出如下:
F:\My Drive\19summer\6824>go run gor.go 2019-08-01 17:45:41.0985917 -0500 CDT m=+1.001057201 1 ore 2019-08-01 17:45:42.0986489 -0500 CDT m=+2.001114401 2 ore 2019-08-01 17:45:43.0987061 -0500 CDT m=+3.001171601 END

从执行时间可以看出,两个finder是并发运行的

但这两个线程是彼此独立的。如果他们需要交流信息呢?就需要Go channel了。

1.2 Go Channel

Channels 允许 go routines 之间相互通信。你可以把 channel 看作管道,goroutines 可以往里面发消息,也可以从中接收其它 go routines 的消息。

myFirstChannel := make(chan string)

Goroutines 可以往 channel 发送消息,也可以从中接收消息。这是通过箭头操作符 (<-) 完成的,它指示 channel 中的数据流向。

myFirstChannel <-"hello" // Send

myVariable := <- myFirstChannel // Receive

再来看一个程序:

package main
import (
    "fmt"
    "time"
)

func main() {
    theMine := [5]string{"ore1", "ore2", "ore3", "ore4", "ore5"}
    oreChan := make(chan string)

    // Finder
    go func(mine [5]string) {
        for _, item := range mine {
            oreChan <- item //send
            fmt.Println("Miner: Send " + item + " to breaker")
        }
    }(theMine)

    // Ore Breaker
    go func() {
        for i := 0; i < 5; i++ {
            foundOre := <-oreChan //receive
            <-time.After(time.Nanosecond * 10)
            fmt.Println("Miner: Receive " + foundOre + " from finder")
        }
    }()

    <-time.After(time.Second * 5) // Again, ignore this for now
}

程序的输出如下:

F:\My Drive\19summer\6824>go run gor2.go
Miner: Send ore1 to breaker
Miner: Receive ore1 from finder
Miner: Send ore2 to breaker
Miner: Receive ore2 from finder
Miner: Send ore3 to breaker
Miner: Receive ore3 from finder
Miner: Send ore4 to breaker
Miner: Receive ore4 from finder
Miner: Send ore5 to breaker
Miner: Receive ore5 from finder

可以看到已经可以通过go channel在线程之间进行通信啦!

在receive和fmt.Println之间的<-time.After(time.Nanosecond * 10)是为了方便在命令行查看输出,否则因为cpu运行程序太快了,命令行打印顺序会和实际运行顺序不一样。

1.3 阻塞的Go Channel

默认的,信道的存消息和取消息都是阻塞的 (叫做无缓冲的信道)。也就是说, 无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好。Channels 阻塞 goroutines 发生在各种情形下。这能在 goroutines 各自欢快地运行之前,实现彼此之间的短暂同步。

Blocking on a Send:一旦一个 goroutine(gopher) 向一个 channel 发送了数据,它就被阻塞了,直到另一个 goroutine 从该 channel 取走数据。

Blocking on a Receive:和发送时情形类似,当channel是空的时,一个 goroutine 可能阻塞着等待从一个 channel 获取数据。

一开始接触阻塞的概念可能令人有些困惑,但你可以把它想象成两个 goroutines(gophers) 之间的交易。 其中一个 gopher 无论是等着收钱还是送钱,都需要等待交易的另一方出现。

既然已经了解 goroutine 通过 channel 通信可能发生阻塞的不同情形,让我们讨论两种不同类型的 channels: unbuffered 和 buffered 。选择使用哪一种 channel 可能会改变程序的运行表现。

Unbuffered Channels:在前面的例子中我们一直在用 unbuffered channels,它们与众不同的地方在于每次只有一份数据可以通过。无论如何,我们测试到的无缓冲信道的大小都是0 (len(channel))

Buffered Channels:在并发程序中,时间协调并不总是完美的。在挖矿的例子中,我们可能遇到这样的情形:开矿 gopher 处理一块矿石所花的时间,寻矿 gohper 可能已经找到 3 块矿石了。为了不让寻矿 gopher 浪费大量时间等着给开矿 gopher 传送矿石,我们可以使用 buffered channel。我们先创建一个容量为 3 的 buffered channel。

bufferedChan := make(chan string, 3)

buffered 和 unbuffered channels 工作原理类似,但有一点不同—在需要另一个 gorountine 取走数据之前,我们可以向 buffered channel 发送3份数据,而在buffer满之前都不会发生阻塞,而当第4份数据发过来时就会发生阻塞。也就是说,缓冲信道会在满容量的时候加锁。

无缓冲区的channel可以理解为make(chan string, 0)

例如下面的程序:

package main
import (
    "fmt"
    "time"
)

func main() {
    bufferedChan := make(chan string, 3)

    go func() {
        bufferedChan <-"first"
        fmt.Println("Sent 1st")
        bufferedChan <-"second"
        fmt.Println("Sent 2nd")
        bufferedChan <-"third"
        fmt.Println("Sent 3rd")
    }()

    <-time.After(time.Second * 1)

    go func() {
        firstRead := <- bufferedChan
        fmt.Println("Receiving..")
        fmt.Println(firstRead)
        secondRead := <- bufferedChan
        fmt.Println(secondRead)
        thirdRead := <- bufferedChan
        fmt.Println(thirdRead)
    }()

    <-time.After(time.Second * 5) // Again, ignore this for now
}

输出结果如下:

F:\My Drive\19summer\6824>go run gor2.go
Sent 1st
Sent 2nd
Sent 3rd
Receiving..
first
second
third

相比最初的例子,已经有了很大改进!现在每个函数都独立地运行在各自的 goroutines 中。此外,每次处理完一块矿石,它就会被带进挖矿流水线的下一个阶段。

其实,缓冲信道是先进先出的,我们可以把缓冲信道看作为一个线程安全的队列:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

1.4 其他一些概念

匿名的 Goroutines

我们可以用如下方式创建一个匿名函数并运行在它的 goroutine 中。如果只需要调用一次函数,通过这种方式我们可以让它在自己的 goroutine 中运行,而不需要创建一个正式的函数声明。

go func() {
    fmt.Println("I\'m running in my own go routine")
}()

和匿名函数的定义非常像

main 函数是一个 goroutine

main 函数确实运行在自己的 goroutine 中!更重要的是要知道,一旦 main 函数返回,它将关掉当前正在运行的其他 goroutines。这就是为什么我们在 main 函数的最后设置了一个定时器—它创建了一个 channel,并在 5 秒后发送一个值。通过添加上面这行代码,main routine 会阻塞,以给其他 goroutines 5 秒的时间来运行。否则主线程就会过早结束,导致finder没有机会执行

<-time.After(time.Second * 5) // Receiving from channel after 5 sec

可是采用等待的办法并不好,如果能像Python一样有个thread.join()来阻塞主线程,等待所有子线程跑完就好了。

有一种方法可以阻塞 main 函数直到其他所有 goroutines 都运行完。通常的做法是创建一个 done channel, main 函数在等待读取它时被阻塞。一旦完成工作,向这个 channel 发送数据,程序就会结束了。

func main() {
    doneChan := make(chan string)

    go func() {
        // Do some work…
        doneChan <- "I\'m all done!"
    }()

    <-doneChan // block until go routine signals work is done
}

可以遍历 channel

在前面的例子中我们让 miner 在 for 循环中迭代 3 次从 channel 中读取数据。如果我们不能确切知道将从 finder 接收多少块矿石呢?

类似于对集合数据类型 (注: 如 slice) 进行遍历,你也可以遍历一个 channel。更新前面的 miner 函数,我们可以这样写:

// Ore Breaker
go func() {
    for foundOre := range oreChan {
        fmt.Println("Miner: Received " + foundOre + " from finder")
    }
}()

由于 miner 需要读取 finder 发送给它的所有数据,遍历 channel 能确保我们接收到已经发送的所有数据。

注意遍历 channel 会阻塞,直到有新数据被发送到 channel。下面这个程序就会发生死锁:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    for v := range ch {
        fmt.Println(v)
    }
}

原因是range不等到信道关闭是不会结束读取的。也就是如果 缓冲信道干涸了,那么range就会阻塞当前goroutine, 所以死锁咯。在所有数据发送完之后避免 go routine 阻塞的唯一方法就是用 "close(channel)" 关掉 channel。如下程序

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// 显式地关闭信道
close(ch)

for v := range ch {
    fmt.Println(v)
}

被关闭的信道会禁止数据流入, 是只读的。我们仍然可以从关闭的信道中取出数据,但是不能再写入数据了。

对 channel 进行非阻塞读写(不用担心channel空/满造成阻塞)

有一个技巧,利用 Go 的 select case 语句可以实现对 channel 的非阻塞读。通过使用这这种语句,如果 channel 有数据,goroutine 将会从中读取,否则就执行默认的分支。

myChan := make(chan string)

go func(){
    myChan <- "Message!"
}()

select { case msg := <- myChan: fmt.Println(msg) default: fmt.Println("No Msg") }
<-time.After(time.Second * 1) select { case msg := <- myChan: fmt.Println(msg) default: fmt.Println("No Msg") } 程序输出如下: No Msg Message!

非阻塞写也是使用同样的 select case 语句来实现,唯一不同的地方在于,case 语句看起来像是发送而不是接收。

select {
    case myChan <- "message":
        fmt.Println("sent the message")
    default:
        fmt.Println("no message sent")
}

1.5 并发和并行

默认地, Go所有的goroutines只能在一个线程(一个cpu核心)里跑 。 也就是说, 两个go routine不是并行的,但是是并发的。在同一个原生线程里,如果当前goroutine不发生阻塞,它是不会让出CPU时间给其他同线程的goroutines的,这是Go运行时对goroutine的调度,我们也可以使用runtime包来手工调度。

前面带有sleep的程序看时间像是“并行”的,是因为sleep函数则阻塞掉了 当前goroutine, 当前goroutine主动让其他goroutine执行, 所以形成了逻辑上的并行, 也就是并发。而对于下面这段程序,两个goroutine是一个一个进行的,打印的结果总是一样的:

var quit chan int = make(chan int)

func loop() {
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", i)
    }
    quit <- 0
}

func main() {
    go loop()
    go loop()

    for i := 0; i < 2; i++ {
        <- quit
    }
}

F:\My Drive\19summer\6824>go run gor2.go
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

还有一个很有意思的例子:https://segmentfault.com/q/1010000000207474

为了能实现真正的多核并行,我们需要用到runtime包(runtime包是goroutine的调度器),来显式的指定要用两个核心。有两种实现方案:

1. 指定要用几个核

package main
import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(coreid int) {
    for i := 0; i < 1000; i++ { //为了观察,跑多些
        fmt.Printf("%d-%d ", coreid, i)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多使用2个核

    go loop(0)
    go loop(1)

    for i := 0; i < 2; i++ {
        <- quit
    }
}
这种输出将会是不规律的两个线程交替输出,达到了真正的并行

2. 显式地让出CPU时间( 其实这种主动让出CPU时间的方式仍然是在单核里跑。但手工地切换goroutine导致了看上去的“并行”。)

package main
import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(coreid int) {
    for i := 0; i < 10; i++ { //为了观察,跑多些
        runtime.Gosched() // 显式地让出CPU时间给其他goroutine
        fmt.Printf("%d-%d ", coreid, i)
    }
    quit <- 0
}

func main() {
    go loop(0)
    go loop(1)

    for i := 0; i < 2; i++ {
        <- quit
    }
}
输出是非常有规律的交替进行:
F:\My Drive\19summer\6824>go run gor2.go
1-0 0-0 1-1 0-1 1-2 0-2 1-3 0-3 1-4 0-4 1-5 0-5 1-6 0-6 1-7 0-7 1-8 0-8 1-9 0-9

关于runtime包几个函数:

  • Gosched 让出cpu
  • NumCPU 返回当前系统的CPU核数量
  • GOMAXPROCS 设置最大的可同时使用的CPU核数
  • Goexit 退出当前goroutine(但是defer语句会照常执行)

我们知道“进程是资源分配的最小单位,线程是CPU调度的最小单位”。那么go routine和线程有什么关系呢?可以看go官方文档中的一段话(https://golang.org/doc/faq#goroutines):

Why goroutines instead of threads?

Goroutines are part of making concurrency easy to use. The idea, which has been around for a while, is to multiplex independently executing functions—coroutines(协程)—onto a set of threads. When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won\'t be blocked. The programmer sees none of this, which is the point. The result, which we call goroutines, can be very cheap: they have little overhead beyond the memory for the stack, which is just a few kilobytes.

To make the stacks small, Go\'s run-time uses resizable, bounded stacks. A newly minted goroutine is given a few kilobytes, which is almost always enough. When it isn\'t, the run-time grows (and shrinks) the memory for storing the stack automatically, allowing many goroutines to live in a modest amount of memory. The CPU overhead averages about three cheap instructions per function call. It is practical to create hundreds of thousands of goroutines in the same address space. If goroutines were just threads, system resources would run out at a much smaller number.

协程可以理解为同一个线程通过上下文切换来“超线程”,并发执行两个工作。( https://www.liaoxuefeng.com/wiki/897692888725344/923057403198272 )

对于 进程、线程,都是有内核进行调度,有 CPU 时间片的概念,进行 抢占式调度(有多种调度算法)
对于 协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的, 因为是由用户程序自己控制,那么就很难像抢占式调度那样做到强制的 CPU 控制权切换到其他进程/线程,通常只能进行 协作式调度,需要协程自己主动把控制权转让出去之后,其他协程才能被执行到。
本质上,goroutine 就是协程。 不同的是,Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装
和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的CPU (P) 转让出去,让其他 goroutine
能被调度并执行,也就是 Golang 从语言层面支持了协程。Golang 的一大特色就是从语言层面原生支持协程,在函数或
者方法前面加 go关键字就可创建一个协程。
https://www.cnblogs.com/liang1101/p/7285955.html

假设我们开了三个Goroutine,但只分配了两个核(两个线程),会发生什么呢?写段程序来试验一下:

package main

import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(id int) { // id: 该goroutine的标号
    for i := 0; i < 100; i++ { //打印10次该goroutine的标号
        fmt.Printf("%d ", id)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多同时使用2个核

    for i := 0; i < 3; i++ { //开三个goroutine
        go loop(i)
    }

    for i := 0; i < 3; i++ {
        <- quit
    }
}


输出结果有很多种:
F:\My Drive\19summer\6824>go run gor2.go
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
F:\My Drive\19summer\6824>go run gor2.go
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
F:\My Drive\19summer\6824>
  • 有时会发生抢占式输出(说明Go开了不止一个原生线程,达到了真正的并行)
  • 有时会顺序输出, 打印完0再打印1, 再打印2(说明Go开一个原生线程,单线程上的goroutine不阻塞不松开CPU)

那么,我们还会观察到一个现象,无论是抢占地输出还是顺序的输出,都会有那么两个数字表现出这样的现象:一个数字的所有输出都会在另一个数字的所有输出之前

原因是, 3个goroutine分配到至多2个线程上,就会至少两个goroutine分配到同一个线程里,单线程里的goroutine 不阻塞不放开CPU, 也就发生了顺序输出。

Ref:

Go并发的一些应用:https://blog.csdn.net/kjfcpua/article/details/18265475

https://stackoverflow.com/questions/13107958/what-exactly-does-runtime-gosched-do

https://studygolang.com/articles/13875

https://blog.csdn.net/kjfcpua/article/details/18265441

https://blog.csdn.net/kjfcpua/article/details/18265461


2. 基于共享变量的并发(ConcurrentMutex),可以理解成传统的使用加锁/解锁和信号量来手动处理并发

1111