go基础第六篇:并发之channel

go不推荐使用共享内存机制,而是推荐使用CSP并发模型机制。

CSP全称是Communicating Sequential Processes,可以翻译成通信顺序进程(Communicating翻译成通信的意思)。简单解释就是,CSP模型由并发执行的实体组成,实体之间通过发送消息进行通信,发送消息时使用的是通道。CSP模型的关键是通道,而不是发送消息的实体。口诀是Do not communicate by sharing memory,instead,share memory by communicating。不要以共享内存的方式来通信,相反,要通过通信来共享内存。

go的CSP并发模型,是通过goroutine和通道channel实现的。goroutine是并发执行的实体,底层使用协程(coroutine)实现并发。coroutine运行在用户态,从而避免了内核态和用户态的切换导致的成本。

channel可分为两种:

一种是普通channel,也称为非缓冲channel,通道里无法存储数据,消费者必须先于生产者准备好,否则生产者就会阻塞。

另一种是buffered channel,称为缓冲channel,创建时要指定通道的大小,在未达到指定大小时,生产者可以任意往通道中放数据,不会阻塞,直到达到指定大小后,阻塞。

新建channel也是用make关键字:

新建普通channel,语法是make(chan type)或者make(chan type, 0),如ch := make(chan string),通道想放什么类型的数据都行,如int、string,甚至可以是interface{}。

新建buffered channel,语法是make(chan type, value),value是个大于0的整数,如ch := make(chan string, 10),即只需要在make函数中添加第二个参数,指定channel的大小。

往通道中放数据、从通道中取数据,都要用到一个特殊的操作符<-,小于号后面跟一个中横线,好像一个左箭头,箭头的指向就是数据的流向,往通道中放数据,箭头要指向通道,通道<-,从通道中取数据,箭头要背向通道,<-通道。

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    fmt.Println(service())
    otherTask()
    fmt.Println("cost", time.Since(start))
}

上例中,service函数需要执行1s,otherTask函数需要执行2s,上面这个程序,会先执行service函数,再执行otherTask函数,总耗时在3s。

现在我们要求总耗时2s,且能够在主协程中获取service函数的返回值。观察otherTask函数和service函数,otherTask函数和service函数的返回值没有关系,所以可以并行执行service函数和otherTask。把service函数另起一个协程执行,那么如何在主协程中获取子协程中的值呢?建个channel就好了,子协程往channel中放,主协程从channel中取。

改造如下:

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func asyncService(ch chan string) {
    go func() {
        result := service()
        ch <- result
    }()
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    ch := make(chan string)
    asyncService(ch)
    otherTask()
    result := <-ch
    fmt.Println("result= " + result)
    fmt.Println("cost", time.Since(start))
}

如上,在主协程中创建了一个channel,在子协程中执行service函数,并将返回值放入主协程创建的channel,在主协程中就可以从channel中取数据了。用的channel是一个普通channel,子协程执行完service函数后,把service函数返回值放到channel时会阻塞,因为otherTask函数还没执行完,主协程还不会从channel中取数据,直到otherTask函数执行完,主协程从channel中取数据,子协程才能把service函数返回值放到channel,主协程取出并使用。

我们还可以优化一下,把子协程解放出来,没必要阻塞一段时间,占用资源。用buffered channel替换普通channel,优化后代码如下:

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func asyncService(ch chan string) {
    go func() {
        result := service()
        fmt.Println("service执行完毕")
        ch <- result
        fmt.Println("channel放入完毕")
    }()
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    ch := make(chan string, 1)
    asyncService(ch)
    otherTask()
    result := <-ch
    fmt.Println("result= " + result)
    fmt.Println("cost", time.Since(start))
}

只改动了一点,即把ch := make(chan string)改为了ch := make(chan string, 1)

channel关闭 close(ch)。重复关闭同一个channel会panic。

v, ok := <-ch

ok为true时,表示channel未关闭,可正常从通道中取数据;ok为false时,表示通道已关闭,取出来的数据为相应数据类型的零值,如0,空字符串,nil等等。

向一个已关闭的channel发送消息会panic,但是从已关闭的channel读取消息不会panic,且能读出channel中还未被读取的消息,此时ok为true,取到消息。若消息均已被读取,则ok为false,取到该类型的零值。

channel关闭的广播机制

所有的channel接收者在channel关闭时都会立刻从阻塞等待中返回,且上述ok值为false。由此,可用于同时向多个订阅者发送信号,如退出信号。

可以用range遍历channel,在遍历前要确保channel处于关闭状态,否则会panic。示例如下:

func main() {
    c := make(chan int, 3)

    c <- 1
    c <- 2
    c <- 3

    close(c)
    for k := range c {
        fmt.Println(k)
    }
}

所有任务完成之后,才往下执行。示例如下:

func main() {
    fmt.Println("A")
    num := 10
    c := make(chan string, num)
    for i := 0; i < num; i++ {
        i := i
        go func() {
            defer func() {
                c <- ""
            }()
            printI(i)
        }()
    }
    for i := 0; i < num; i++ {
        <-c
    }
    fmt.Println("B")
}

func printI(i int) {
    fmt.Println(i)
}

channel在Go中是一等公民,它是线程安全的,面对并发问题,应首先想到channel。