Go语言并发编程总结

Golang :不要通过共享内存来通信,而应该通过通信来共享内存。

这句风靡在Go社区的话,说的就是 goroutine中的 channel .......

他在go并发编程中充当着 类型安全的管道作用。

1、通过golang中的 goroutine 与sync.Mutex进行 并发同步

import(

"fmt"

"sync"

"runtime"

)

var count int =0;

func counter(lock * sync.Mutex){

lock.Lock()

count++

fmt.Println(count)

lock.Unlock()

}

func main(){

lock:=&sync.Mutex{}

for i:=0;i<10;i++{

//传递指针是为了防止 函数内的锁和 调用锁不一致

go counter(lock)

}

for{

lock.Lock()

c:=count

lock.Unlock()

///把时间片给别的goroutine 未来某个时刻执行该routine

runtime.Gosched()

if c>=10{

fmt.Println("goroutine end")

break

}

}

}

2、goroutine之间通过 channel进行通信,channel是和类型相关的 能够理解为 是一种类型安全的管道。

简单的channel 使用

package main

import "fmt"

func Count(ch chan int) {

ch <- 1

fmt.Println("Counting")

}

func main() {

chs := make([]chan int, 10)

for i := 0; i < 10; i++ {

chs[i] = make(chan int)

go Count(chs[i])

fmt.Println("Count",i)

}

for i, ch := range chs {

<-ch

fmt.Println("Counting",i)

}

}

3、Go语言中的select是语言级内置 非阻塞

select {

case <-chan1: // 假设chan1成功读到数据,则进行该case处理语句

case chan2 <- 1: // 假设成功向chan2写入数据,则进行该case处理语句

default: // 假设上面都没有成功,则进入default处理流程

}

能够看出。select不像switch。后面并不带推断条件,而是直接去查看case语句。每一个

case语句都必须是一个面向channel的操作。

比方上面的样例中,第一个case试图从chan1读取

一个数据并直接忽略读到的数据,而第二个case则是试图向chan2中写入一个整型数1,假设这

两者都没有成功。则到达default语句。

4、channel 的带缓冲读取写入

之前我们示范创建的都是不带缓冲的channel,这样的做法对于传递单个数据的场景能够接受,

但对于须要持续传输大量数据的场景就有些不合适了。接下来我们介绍怎样给channel带上缓冲。

从而达到消息队列的效果。

要创建一个带缓冲的channel,事实上也很easy:

c := make(chan int, 1024)

在调用make()时将缓冲区大小作为第二个參数传入就可以,比方上面这个样例就创建了一个大小

为1024的int类型channel,即使没有读取方,写入方也能够一直往channel里写入。在缓冲区被

填完之前都不会堵塞。

从带缓冲的channel中读取数据能够使用与常规非缓冲channel全然一致的方法,但我们也可

以使用range关键来实现更为简便的循环读取:

for i := range c {

fmt.Println("Received:", i)

}

5、用goroutine模拟生产消费者

package main

import "fmt"

import "time"

func Producer (queue chan<- int){

for i:= 0; i < 10; i++ {

queue <- i

}

}

func Consumer( queue <-chan int){

for i :=0; i < 10; i++{

v := <- queue

fmt.Println("receive:", v)

}

}

func main(){

queue := make(chan int, 1)

go Producer(queue)

go Consumer(queue)

time.Sleep(1e9) //让Producer与Consumer完毕

}

6、 通过make 创建通道

make(c1 chan int) 创建的是 同步channel ...读写全然相应

make(c1 chan int ,10) 闯进带缓冲的通道 上来能够写10次

7、随机向通道中写入0或者1

package main

import "fmt"

import "time"

func main(){

ch := make(chan int, 1)

for {

///不停向channel中写入 0 或者1

select {

case ch <- 0:

case ch <- 1:

}

//从通道中取出数据

i := <-ch

fmt.Println("Value received:",i)

time.Sleep(1e8)

}

}

8、带缓冲的channel

之前创建的都是不带缓冲的channel,这样的做法对于传递单个数据的场景能够接受,

但对于须要持续传输大量数据的场景就有些不合适了。接下来我们介绍怎样给channel带上缓冲,

从而达到消息队列的效果。

要创建一个带缓冲的channel,事实上也很easy:

c := make(chan int, 1024)

在调用make()时将缓冲区大小作为第二个參数传入就可以,比方上面这个样例就创建了一个大小

为1024的int类型channel,即使没有读取方。写入方也能够一直往channel里写入,在缓冲区被

填完之前都不会堵塞。

从带缓冲的channel中读取数据能够使用与常规非缓冲channel全然一致的方法。但我们也可

以使用range关键来实现更为简便的循环读取:

for i := range c {

fmt.Println("Received:", i)

}

////////////////////////////////////////以下是測试代码////////////////////////////////////

package main

import "fmt"

import "time"

func A(c chan int){

for i:=0;i<10;i++{

c<- i

}

}

func B(c chan int){

for val:=range c {

fmt.Println("Value:",val)

}

}

func main(){

chs:=make(chan int,10)

//仅仅要有通道操作一定要放到goroutine中否则 会阻塞当前的主线程 而且导致程序退出

//对于同步通道 或者带缓冲的通道 一定要封装成函数 使用 goroutine 包装

go A(chs)

go B(chs)

time.Sleep(1e9)

}

9、关于创建多个goroutine详细到go语言会创建多少个线程

import "os"

func main() {

for i:=0; i<20; i++ {

go func() {

for {

b:=make([]byte, 10)

os.Stdin.Read(b) // will block

}

}()

}

select{}

}

会产生21个线程:

runtime scheduler(src/pkg/runtime/proc.c)会维护一个线程池。当某个goroutine被block后,scheduler会创建一个新线程给其它ready的goroutine

GOMAXPROCS控制的是未被堵塞的全部goroutine被multiplex到多少个线程上执行

10、在channel中也是能够传递channel的,Go语言的channel和map slice等一样都是原生类型

须要注意的是,在Go语言中channel本身也是一个原生类型,与map之类的类型地位一样,因

此channel本身在定义后也能够通过channel来传递。

我们能够使用这个特性来实现*nix上很常见的管道(pipe)特性。管道也是使用很广泛

的一种设计模式。比方在处理数据时,我们能够採用管道设计,这样能够比較easy以插件的方式

添加数据的处理流程。

以下我们利用channel可被传递的特性来实现我们的管道。 为了简化表达。 我们如果在管道中

传递的数据仅仅是一个整型数,在实际的应用场景中这一般会是一个数据块。

首先限定主要的数据结构:

type PipeData struct {

value int

handler func(int) int

next chan int

}

然后我们写一个常规的处理函数。我们仅仅要定义一系列PipeData的数据结构并一起传递给

这个函数,就能够达到流式处理数据的目的:

func handle(queue chan *PipeData) {

for data := range queue {

data.next <- data.handler(data.value)

}

}

11、我们默认创建的是双向通道,单向通道没有意义,可是我们却能够通过强制转换 将双向通道 转换成为单向通道 。

var ch1 chan int // ch1是一个正常的channel,不是单向的

var ch2 chan<- float64// ch2是单向channel,仅仅用于写float64数据

var ch3 <-chan int // ch3是单向channel,仅仅用于读取int数据

channel是一个原生类型,因此不仅 支持被传递,还支持类型转换。仅仅有在介绍了单向channel的概念后。读者才会明确类型转换对于

channel的意义:就是在单向channel和双向channel之间进行转换。

示比例如以下:

ch4 := make(chan int)

ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel

ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel

基于ch4。我们通过类型转换初始化了两个单向channel:单向读的ch5和单向写的ch6。

从设计的角度考虑,全部的代码应该都遵循“最小权限原则” ,

从而避免不是必需地使用泛滥问题, 进而导致程序失控。 写过C++程序的读者肯定就会联想起const 指针的使用方法。

非const指针具备const指针的全部功能。将一个指针设定为const就是明白告诉

函数实现者不要试图对该指针进行改动。

单向channel也是起到这种一种契约作用。

以下我们来看一下单向channel的使用方法:

func Parse(ch <-chan int) {

for value := range ch {

fmt.Println("Parsing value", value)

}

}

除非这个函数的实现者无耻地使用了类型转换。否则这个函数就不会由于各种原因而对ch 进行写,避免在ch中出现非期望的数据。从而非常好地实践最小权限原则。

12、仅仅读仅仅写 单向 channel 代码样例 遵循权限最小化的原则

package main

import "fmt"

import "time"

//接受一个參数 是仅仅同意读取通道 除非直接强制转换 要么你仅仅能从channel中读取数据

func sCh(ch <-chan int){

for val:= range ch {

fmt.Println(val)

}

}

func main(){

//创建一个带100缓冲的通道 能够直接写入 而不会导致 主线程阻塞

dch:=make(chan int,100)

for i:=0;i<100;i++{

dch<- i

}

//传递进去 仅仅读通道

go sCh(dch)

time.Sleep(1e9)

}

13、channel的关闭,以及推断channel的关闭

关闭channel很easy,直接使用Go语言内置的close()函数就可以:

close(ch)

在介绍了怎样关闭channel之后。我们就多了一个问题:怎样推断一个channel是否已经被关

闭?我们能够在读取的时候使用多重返回值的方式:

x, ok := <-ch

这个使用方法与map中的按键获取value的过程比較类似,仅仅须要看第二个bool返回值就可以,如

果返回值是false则表示ch已经被关闭。

14、Go的多核并行化编程 高性能并发编程 必须设置GOMAXPROCS 为最大核数目 这个值由runtime.NumCPU()获取

在运行一些昂贵的计算任务时, 我们希望可以尽量利用现代server普遍具备的多核特性来尽

量将任务并行化,从而达到减少总计算时间的目的。此时我们须要了解CPU核心的数量。并针对

性地分解计算任务到多个goroutine中去并行执行。

以下我们来模拟一个全然能够并行的计算任务:计算N个整型数的总和。我们能够将全部整

型数分成M份,M即CPU的个数。让每一个CPU開始计算分给它的那份计算任务。最后将每一个CPU

的计算结果再做一次累加,这样就能够得到全部N个整型数的总和:

type Vector []float64

// 分配给每一个CPU的计算任务

func (v Vector) DoSome(i, n int, u Vector, c chan int) {

for ; i < n; i++ {

v[i] += u.Op(v[i])

}

c <- 1

// 发信号告诉任务管理者我已经计算完毕了

}

const NCPU = 16

// 如果总共同拥有16核

func (v Vector) DoAll(u Vector) {

c := make(chan int, NCPU) // 用于接收每一个CPU的任务完毕信号

for i := 0; i < NCPU; i++ {

go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)

}

// 等待全部CPU的任务完毕

for i := 0; i < NCPU; i++ {

<-c // 获取到一个数据,表示一个CPU计算完毕了

}

// 到这里表示全部计算已经结束

}

这两个函数看起来设计很合理。DoAll()会依据CPU核心的数目对任务进行切割,然后开

辟多个goroutine来并行运行这些计算任务。

能否够将总的计算时间降到接近原来的1/N呢?答案是不一定。

假设掐秒表(正常点的话,

应该用7.8节中介绍的Benchmark方法) ,会发现总的执行时间没有明显缩短。

再去观察CPU执行

状态, 你会发现虽然我们有16个CPU核心, 但在计算过程中事实上仅仅有一个CPU核心处于繁忙状态,

这是会让非常多Go语言刚開始学习的人迷惑的问题。

官方的答案是,这是当前版本号的Go编译器还不能非常智能地去发现和利用多核的优势。

尽管

我们确实创建了多个goroutine,而且从执行状态看这些goroutine也都在并行执行。但实际上全部

这些goroutine都执行在同一个CPU核心上。 在一个goroutine得到时间片执行的时候, 其它goroutine

都会处于等待状态。从这一点能够看出。尽管goroutine简化了我们写并行代码的过程。但实际上

总体执行效率并不真正高于单线程程序。

在Go语言升级到默认支持多CPU的某个版本号之前。我们能够先通过环境变量设置

GOMAXPROCS的值来控制使用多少个CPU核心。

详细操作方法是通过直接环境变量设置

GOMAXPROCS的值。或者在代码中启动goroutine之前先调用下面这个语句以设置使用16个CPU

核心:

runtime.GOMAXPROCS(16)

究竟应该设置多少个CPU核心呢,事实上runtime包中还提供了另外一个函数NumCPU()来获

取核心数。

能够看到,Go语言事实上已经感知到全部的环境信息。下一版本号中全然能够利用这些

信息将goroutine调度到全部CPU核心上。从而最大化地利用server的多核计算能力。抛弃

GOMAXPROCS仅仅是个时间问题。

15、主动出让时间片给其它 goroutine 在未来的某一时刻再来运行当前goroutine

我们能够在每一个goroutine中控制何时主动出让时间片给其它goroutine,这能够使用runtime

包中的Gosched()函数实现。

实际上,假设要比較精细地控制goroutine的行为,就必须比較深入地了解Go语言开发包中

runtime包所提供的详细功能。

16、Go中的同步

倡导用通信来共享数据。而不是通过共享数据来进行通信,但考虑

到即使成功地用channel来作为通信手段。还是避免不了多个goroutine之间共享数据的问题,Go

语言的设计者尽管对channel有极高的期望。但也提供了妥善的资源锁方案。

17、Go中的同步锁

倡导用通信来共享数据。而不是通过共享数据来进行通信,但考虑

到即使成功地用channel来作为通信手段。还是避免不了多个goroutine之间共享数据的问题。Go

语言的设计者尽管对channel有极高的期望。但也提供了妥善的资源锁方案。

对于这两种锁类型。 不论什么一个Lock()或RLock()均须要保证相应有Unlock()或RUnlock()

调用与之相应,否则可能导致等待该锁的全部goroutine处于饥饿状态,甚至可能导致死锁。锁的

典型使用模式例如以下:

var l sync.Mutex

func foo() {

l.Lock()

//延迟调用 在函数退出 而且局部资源被释放的时候 调用

defer l.Unlock()

//...

}

这里我们再一次见证了Go语言deferkeyword带来的优雅

18、全局唯一操作 sync.Once.Do() sync.atomic原子操作子包

对于从全局的角度仅仅须要执行一次的代码。比方全局初始化操作,Go语言提供了一个Once

类型来保证全局的唯一性操作。详细代码例如以下:

var a string

var once sync.Once

func setup() {

a = "hello, world"

}

func doprint() {

once.Do(setup)

print(a)

}

func twoprint() {

go doprint()

go doprint()

}

假设这段代码没有引入Once, setup()将会被每个goroutine先调用一次, 这至少对于这个

样例是多余的。

在现实中,我们也常常会遇到这种情况。Go语言标准库为我们引入了Once类

型以解决问题。once的Do()方法能够保证在全局范围内仅仅调用指定的函数一次(这里指

setup()函数) 。并且全部其它goroutine在调用到此语句时。将会先被堵塞。直至全局唯一的

once.Do()调用结束后才继续。

这个机制比較轻巧地攻克了使用其它语言时开发人员不得不自行设计和实现这样的Once效果的

难题,也是Go语言为并发性编程做了尽量多考虑的一种体现。

假设没有once.Do()。我们非常可能仅仅能加入一个全局的bool变量,在函数setup()的最后

一行将该bool变量设置为true。在对setup()的全部调用之前,须要先推断该bool变量是否已

经被设置为true,假设该值仍然是false,则调用一次setup(),否则应跳过该语句。实现代码

var done bool = false

func setup() {

a = "hello, world"

done = true

}

func doprint() {

if !done {

setup()

}

print(a)

}

这段代码初看起来比較合理。 可是细看还是会有问题。 由于setup()并非一个原子性操作,

这样的写法可能导致setup()函数被多次调用。从而无法达到全局仅仅运行一次的目标。这个问题的

复杂性也更加体现了Once类型的价值。

为了更好地控制并行中的原子性操作。sync包中还包括一个atomic子包,它提供了对于一

些基础数据类型的原子操作函数。比方以下这个函数:

func CompareAndSwapUint64(val *uint64, old, new uint64) (swapped bool)

就提供了比較和交换两个uint64类型数据的操作。这让开发人员无需再为这种操作专门加入

Lock操作。