go——并发

并发与并行的区别:

并发:逻辑上具备同时处理多个任务的能力

并行:物理上在同一时刻执行多个并发任务

通常都会说程序是并发设计的,也就是说它允许多个任务同时执行,但实际上并不一定真在同一时刻发生。

在单核处理器上,它们能以间隔方式切换执行

并行则依赖多核处理器等物理设备,让多个任务真正在同一时刻执行,它代表了当前程序运行状态。

简单点说,并行是并发设计的理想执行模式。

多线程或多进程是并行的基本条件,但单线程也可用协程(coroutine)做到并发

尽管协程在单个线程上通过主动切换来实现多任务并发,但它也有自己的优势。

除了将因阻塞而浪费的时间找回来外,还免去了线程切换开销,有着不错的执行效率。

协程上运行的多个任务本质上依旧串行,加上可控自主调度,所以并不需要做同步处理。

即使采用多线程也未必能并行。Python就因GIL限制,默认只能并发而不能并行,所以很多时候转而使用“多进程”+“协程”架构。

通常情况下,用多进程来实现分布式和负载均衡,减轻单进程垃圾回收压力;

用多线程(LWP)抢夺更多的处理器资源;用协程来提高处理器时间片利用率。

简单将goroutine归纳为协程并不合适。运行时会创建多个线程来执行并发任务,且任务单元可被调度到其它线程并行执行。

这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

只须在函数调用前添加go关键字即可创建并发任务。

go fmt.PrintLn("hello")

关键字go并非执行并发操作,而是创建一个并发任务单元。

新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。

当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。

相比系统默认MB级别的线程栈,goroutine自定义栈初始仅须2KB,所以才能创建成千上万的并发任务。

自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB规模。

与defer一样,goroutine也会因“延迟执行”而立即计算并复制执行参数。

package main

import (
        "fmt"
        "time"
)

var c int //初始为0

func counter() int {
        c++
        return c
}

func main() {
        a := 100

        go func(x, y int) { //匿名函数直接执行
                //hour min second  时分秒
                time.Sleep(time.Second) //休息一秒钟
                fmt.Println("go", x, y) //执行counter 100 1  //goroutine在main之后执行
        }(a, counter()) //立即计算并复制参数

        a += 100
        fmt.Println("main:", a, counter()) //200 2
        time.Sleep(time.Second * 3)        //等待goroute结束

}

/*
main: 200 2
go 100 1
为什么先打印main?
go创建一个并发单元,但是不会马上执行,而是会放置在队列中
*/

进程退出时不会等待并发任务结束,可用通道(channel)阻塞,然后发出退出信号。

package main

import (
        "fmt"
        "time"
)

func main() {
        // exit := make(chan struct{}) //创建通道。因为仅仅是通知,数据并没有实际意义。

        go func() {
                time.Sleep(time.Second)
                fmt.Println("goroutine done.")

                // close(exit) //关闭通道发出信号
        }()

        fmt.Println("main...")
        // <-exit //如通道关闭,立即解除阻塞。
        fmt.Println("main exit...")
}

<-exit接收操作符,如果exit代表了元素类型为byte的通道类型值,则此表达式就表示从exit中接收一个byte类型值的操作。

如果不创建通道,main直接结束之后进程就结束了,而不会等待并发任务的结束,这样就不会执行并发任务。

所以,我们可以创建一个通道,当通道关闭后才会解除阻塞,整个程序才会结束

除关闭通道外,写入数据也可解除阻塞。

如果等待多个任务结束,推荐使用sync.WaitGroup。

通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        var wg sync.WaitGroup

        for i := 0; i < 10; i++ {
                wg.Add(1) //累加计数,每循环以此加一

                go func(id int) { //创建多个任务单元
                        defer wg.Done() //递减计数,这个任务计数之前减一

                        time.Sleep(time.Second)
                        fmt.Println("goroutine", id, "done")
                }(i)
        }

        fmt.Println("main...")
        wg.Wait() //阻塞,直至wg归零
        fmt.Println("main exit")
}

/*
main...
goroutine 6 done
goroutine 1 done
goroutine 2 done
goroutine 4 done
goroutine 9 done
goroutine 8 done
goroutine 0 done
goroutine 7 done
goroutine 5 done
goroutine 3 done
main exit
*/

  

尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,wait已经退出。

go func(id int) {
        wg.Add(1)
        defer wg.Done()
}

  

可在多处使用wait阻塞,它们都可以接收到通知。

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        var wg sync.WaitGroup
        wg.Add(1)

        go func() {
                wg.Wait()
                fmt.Println("wait exit")
        }()

        go func() {
                time.Sleep(time.Second * 3)
                fmt.Println("done.")
                wg.Done()
        }()

        wg.Wait()
        fmt.Println("main exit.")
}

/*
done.
wait exit
main exit.
*/

  

GOMAXPROCS 线程数量

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。

该数量默认与处理器核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。

package main

import (
        "fmt"
        "math"
        "runtime"
        "sync"
        "time"
)

//测试目标函数
func count() {
        x := 0
        for i := 0; i < math.MaxUint32; i++ {
                x += 1
        }

        fmt.Println(x)
}

//循环执行
func test(n int) {
        for i := 0; i < n; i++ {
                count()
        }
}

//并发执行
func test2(n int) {
        var wg sync.WaitGroup
        wg.Add(n) //计数为4

        for i := 0; i < n; i++ {
                go func() {
                        count()
                        wg.Done()
                }()
        }

        wg.Wait()
}

func main() {
        n := runtime.GOMAXPROCS(0) //4
        fmt.Println(time.Now())
        // test(n)  //6s
        test2(n) //3s
        fmt.Println(time.Now())
}

  

Local Storage 局部存储器

与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。

package main

import (
        "fmt"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var gs [5]struct { //使用数组来进行局部存储
                id     int //编号
                result int //返回值
        }
        fmt.Println(gs)
        for i := 0; i < len(gs); i++ {
                wg.Add(1)

                go func(id int) {
                        defer wg.Done()

                        gs[id].id = id
                        gs[id].result = (id + 1) * 100
                }(i)
        }

        wg.Wait()
        fmt.Printf("%+v\n", gs)
}

/*
[{0 0} {0 0} {0 0} {0 0} {0 0}]
[{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]
*/

  

如果使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。

Gosched

暂停,释放线程去执行其它任务。当前任务被放回队列,等待下次调度时恢复执行

package main

import (
        "fmt"
        "runtime"
)

func main() {
        runtime.GOMAXPROCS(1)
        exit := make(chan struct{})

        go func() {  /任务a
                defer close(exit)

                go func() {  //任务b
                        fmt.Println("b")
                }()

                for i := 0; i < 4; i++ {
                        fmt.Println("a:", i)

                        if i == 1 {  
                                runtime.Gosched()  //让出当前线程,调度执行b
                        }
                }
        }()

        <-exit
}

/*
a: 0
a: 1
b  //这个b有可能打印不出来
a: 2
a: 3
*/

  

该函数很少被使用,因为运行时会主动向长时间运行的任务发出抢占调度。

Goexit

立即终止当前任务,运行时确保所有已注册延迟调用被执行。

该函数不会影响其它并发任务,不会引发panic,自然也就无法捕获。

package main

import (
        "fmt"
        "runtime"
)

func main() {
        exit := make(chan struct{})

        go func() {
                defer close(exit)
                defer fmt.Println("a") //执行3

                func() {
                        defer func() {
                                fmt.Println("b", recover() == nil) //defer总会执行 2
                        }()

                        func() {
                                fmt.Println("c")       //执行1
                                runtime.Goexit()       //立即终止当前任务
                                fmt.Println("c done.") //不会执行
                        }()

                        fmt.Println("b done.") //不会执行
                }()

                fmt.Println("a done.") //不会执行
        }()

        <-exit
        fmt.Println("main exit") //主程序
}

/*
c
b true
a
main exit
*/

  

如果在main里调用Goexit,它会等其它任务结束,然后让进程直接崩溃。

package main

import (
        "fmt"
        "runtime"
        "time"
)

func main() {
        for i := 0; i < 2; i++ {
                go func(x int) {
                        for n := 0; n < 2; n++ {
                                fmt.Printf("%c: %d\n", 'a'+x, n)
                                time.Sleep(time.Millisecond)
                        }
                }(i)
        }

        runtime.Goexit() //等待所有任务结束
        fmt.Println("main exit.")
}

/*
b: 0
a: 0
a: 1
b: 1
fatal error: no goroutines (main called runtime.Goexit) - deadlock!
*/

  

无论身处那一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。

标准库函数os.Exit可终止进程,但不会执行延迟调用。