Go语言完整解析Go!Go!Go!,一数据类型 之 Channel & Goroutine

一:概述

如果说goroutine是Golang的并发体的话,那么channel就是他们之间的通信机制,可以让goroutine通过channel给另一个goroutine发送信息

channel本身是一种数据类型,同时他还会细分是哪一种channel,这种细分表示在channel中发送的数据的类型

wxy:望文生义,channel就是一个管道,管道本身代表着一种数据类型,同时管道中流淌的是什么类型的数据,则又是更进一层的细分

goroutine中若有channel且阻塞了,则称goroutine泄露,这种goroutine是不会被垃圾回收的。

channel类型源码

路径:go\src\runtime
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements.由元素构成的是一块循环链表 elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index 发送索引,用来标记 recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several fields in sudogs blocked on this channel. // Do not change another G's status while holding this lock (in particular, do not ready a G), as this can deadlock with stack shrinking. lock mutex }
type waitq struct {
first *sudog
last *sudog
}
解析:
go\src\go\types
type Chan struct { dir ChanDir elem Type }
// A ChanDir value indicates a channel direction.
type ChanDir int

二,channel的基本信息

1. 双向 channel(full chan)

make会创建一个底层的结构,得到的是一个对底层的"引用", 如下:

var ch chan int          //声明一个int类型的双向channel
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3

特点:

1). channel的拷贝是引用的拷贝,即二者使用的是一同一个底层对象;

2). channel的零值是nil;

3). 相同(底层)类型的channel可以使用==运算符做比较,具体比较的内容是底层的对象;

4). 有两种方式可以创建不带缓存的channel;

5). 可以指定channel的大小,代表这个管道中可以有几个底层对象;

2. 双向channel的两个主要操作:发送和接收(SendRecv)

ch <- x  // a send statement 
x = <-ch // a receive expression in an assignment statement 
<-ch     // a receive statement; result is discarded

Tips:

1. 发送和接收都是使用 " <-" 来表示;

2. 根据channel所在的位置来确认是发送还是接收:

官方说: The <- operator associates with the leftmost chan

即:

首先,<- 代表数据的流向,本着 <- 从属于其左边的原则,

然后,核心对象是channel,发送 还是 接收 是针对channel来操作的,即“发送给"channel or ”接收自“channel;

最后,ch <- x, 代表把x的值给ch, 所以是send一个数据给ch

x = <- ch 和 <-ch, 代表将ch的值取出来, 即接收来自ch的内容赋值给在左则为承受者

3. 单向 channel(single chan)

当channel作为函数的参数时,一般总是专门用于发送或者接收,为了表明这种意图并防止被滥用,

Go预研提供了单方向的channel,分别用于只发送(SendOnly)或只接收(RecvOnly)。

官方定义:

chan T          // can be used to send and receive values of type T,代表双向channel
chan<- float64  // can only be used to send float64s, 只用于发送一个float64类型的数据给channel
<-chan int      // can only be used to receive ints, 只用于从channel中接收一个int类型的数据到某个(leftmost的)变量

常用场景:

/*将接收类型的channel作为入参,进入函数体后接收赋值*/
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x //naturals作为接收channel,依次接收0-99的数据 } close(out) //接收完毕,关闭这个channle,也相当于给其他使用该channel的goroutine一个信号 } /*发送类型的channel: in作为入参, 管道内的值将在函数中被取出来(理解成从channel中发送出去),
而out, 为接收类型的channel,也作为入参用来承载计算结果(理解成channel接收计算结果)*/ func squarer(out chan<- int, in <-chan int) { for v := range in { //从natural中取出来自counter塞入的数据,进行计算,然后将计算结果填充到square这个channel中,一旦natural被close,则循环结束 out <- v * v } close(out) //一旦跳出了循环,表示counter()中的填充行为已经结束,所以本计算函数也结束 } /*发送类型channel in, 将在函数内部发射(送)内部元素值出来*/ func printer(in <-chan int) { for v := range in { fmt.Println(v) } }
naturals := make(chan int) //双向channel squares := make(chan int) //双向channel go counter(naturals) //双向channel隐式转换成单向channel(接收类型), 之后会在go routine中被填充数据 go squarer(squares, naturals) //双向channel分别隐式转换成接收类型和发送类型的单向channel, 后者(发送类型)channel将在go routine中发送内部数据出去, 前者(接收类型)则接收计算结果 printer(squares) //squares再上一次调用中被注入可计算结果,在本函数中作为发送类型, 被打印出来

解析:

1) 发送或接收的对象是channel, 结合箭头的方向,"chan<-"代表向chan中发送数据,所以称为发送channel;

"<-chan"代表从chan中获取数据,所以称为接收channel;

wxy: 鉴于这两个概念容易用混,我的经验是只要遵循mostleft原则, 知道这个channel是做什么的,就不要纠结他是"发送channel"还是"接收channel"

2) 双向channel可以直接隐式地转换成只发送类型的channel或只接收类型channel, 反之则不支持。

三, channel的用法

1. channel的关闭-close(ch)

关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,而对于一个只接收channel调用close, 将会报出编译错误。

例1: 一个常用的用法,利用规范channel为单方向的,确保重复关闭,关闭只能有一处,即只

type Manager struct {
internalStop <-chan struct{} //不能用来关闭
internalStopper chan<- struct{} //可以用来关闭
}
func NewManager() *Manager {
stop := make(chan struct{})
return &Manager{
internalStop: stop,
internalStopper: stop,
}
}

需要注意的是:

type Monitor struct {
managerStopper chan<- struct{}
}
测试逻辑:
systemStopCh := SetupSignalHandler() go func(stop <-chan struct{}){ for { time.Sleep(time.Second * 2) //4.等待 m := NewManager() //1.5先初始化manager,并将stopper交给monitor monitor.managerStopper = m.internalStopper //阻塞着,知道外面的Stopper select { case <-m.internalStop: //3.被关闭,重新启动一次循环 fmt.Println("Receive internal stop signal received, so begin to another [crd contrller] loop!") case <-stop: fmt.Println("Receive stop signal received, so close crd controller and return!") } } }(systemStopCh) go func(){ time.Sleep(time.Second * 3) //2.monitor关闭channel close(monitor.managerStopper) time.Sleep(time.Second * 1) close(monitor.managerStopper) //5.再次关闭 }()
结果:

Receive internal stop signal received, so begin to another [crd contrller] loop!

panic: close of closed channel

解析: 多goroutine时,注意数据安全

解决办法1:

type Monitor struct {
    clustersLock sync.RWMutex
    managerStopperSetter bool
    managerStopper     chan<- struct{}
}


func (mo *Monitor) SetMGRStopper(stopper chan<- struct{}) {
    mo.clustersLock.Lock()
    defer mo.clustersLock.Unlock()
    mo.managerStopperSetter = true
    mo.managerStopper = stopper
}


func (mo *Monitor) StopMGRStopper() {
    mo.clustersLock.Lock()
    defer mo.clustersLock.Unlock()
    if mo.managerStopperSetter {
        close(mo.managerStopper)
        mo.managerStopperSetter = false
    }
}

2.无缓存channel

3. select的用法

var ch3 chan<- interface{} //声明一个单向channel, 初始零值为nil
for {
select {   case <-ch1:   // ...   case x, ok := <-ch2: //显式判断ch2是否被关闭,如果关闭, 则跳出循环
  if !ok { break;}   // ...use x..
  ch3 = xxxx //为ch3赋值,激活第三个case   case ch3 <- y: //初始会阻塞, 待激活后可被选择   // ...   default:   // ...   }
}

4. range的用法

测试2:

func squarer(out chan<- int, in <-chan int) {
    for v := range in {   //从in中不断读取数据
        out <- v * v      //计算后添加到out中
    }
    //close(out)     //注释掉这里,即squares这个channel一直不关闭
}
...
printer(squares)
结果
报错:fatal error: all goroutines are asleep - deadlock!

测试3:

go printer(squares)
for{
time.Sleep(time.Second * 600) //10min后
break
}
close(squares)

结果:正常打印,10min后,打印"print over"

解析:

1. select会等待case中有能够执行的case时去执行。即只有其中有条件满足时,select才会去通信(channel操作)并执行case之后的 语句;

这时候其它case语句则再没有机会执行。一个没有任何case的select语句写作select{},会永远地等待下 去。

多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被 select的机会。

2. channel的零值是nil, 对一个nil的channel发送和接收都会永远阻塞,所以在select语句中操作nil值的channel时,永远都不会被select到。

可以用nil类型的channel来激活或者禁用case, 比如用于处理输入和输出事件时,超时和取消的逻辑。

3. select是一个一次性的操作,所以如果想要循环检查直到得到结果,就需要在外层使用for循环。

4. 如果想要能够监测到channel是否已经关闭,则需要结合ok pattern使用。

5. 当range用于channel的时候,会一直不断的扫描channel读取(可以写入么?)其中的元素,没有就阻塞有就进入执行体,直到这channel被关闭

待学习参考链接:

https://blog.csdn.net/u010853261/article/details/85231944