2.7 Go channel 代码示例

channel简单示例

package main

import (
    "fmt"
    "time"
)

//channel的创建,发送,接收
func channe1(){
    //创建,channel是有类型的
    c1 := make(chan int)
    
    
    //接收,在这段程序中接收方必须是一个goroutine,因为只在主程序中发送而不接收,程序会报deadlock
    //通常使用匿名函数开一个与主程序同时执行的内部方法,即并行执行
    go func(){
        fmt.Println("接收数据准备")
        //这里接收channel使用了io输出功能,io是可以被抢占控制权的,即IO的特性
        fmt.Println(<- c1)
        fmt.Println("接收数据完成")
        
        //关闭,不显式关闭时,channel会随主程序(即main)的运行结束而结束
        //如果“接收”处理数据的时间较长,就会出现主程序已经结束,但接收方还没处理完的情况
        //此时可以让主程序sleep一段时间,等待接收方把数据处理完毕再关闭
        close(c1)
        
        fmt.Println("接收结束")
    }()
    
    //发送数据,“接收”程序要在发送之前准备,
    //意思就是发送数据之前,要先为channel准备好接收;
    //否则,执行<- 1将1发送到channel时,go发现没有人接收,会报deadlock
    c1 <- 1
    
    //接收方与主程序同时执行
    //主程序在此停止1毫秒,就相当于主程序等了接收方一毫秒
    time.Sleep(time.Millisecond)
}

func main(){
    channe1()
    fmt.Println("主程序结束")
}

输出

# go run chan1.go 
接收数据准备
1
接收数据完成
接收结束
主程序结束

channel同步

如果一个动作会触发另外一个动作,那么这个行为通常被称为事件(event);如果这个事件不附带信息,那么此类事件又通常被用于同步。

channel有发送、接收、关闭三个操作;

发送触发接收,如果一个channel不发送,那么接收将处于阻塞。这种同步,可用于消息通知。

package main

import (
    "fmt"
    "time"
)

func test(){
    c := make(chan struct{})
    
    go func(){
        fmt.Println("我要花两分钟去看看园子里的花还活着吗")
        time.Sleep(7*time.Second)
        c <- struct{}{}
    }()
    
    //程序会在这里等待7秒
    <- c
    //然后打印出下面这句话
    fmt.Println("这花从屋里移值出去后,活得比以前更好了")
}

func main(){
    test()
}

channel数组

package main

import (
    "fmt"
    "sync"
)

func channel2(){
    //WaitGroup的wait(在主程序中调用)与done(在与主程序并行执行的“接收”方中调用)的交互,
    //可以达到等待所有channel运行完毕,再让主程序运行的效果
    //而不是程序员猜想channel“接收”需要多少时间运行,
    //然后去主程序中设置time.Sleep让主程序等待
    var wtg sync.WaitGroup
    
    //channel数组
    var workers [8]worker
    for i := 0; i < 8; i++{
        //使用引用的方式传送参数,所有的channel公用一个WaitGroup
        workers[i] = createWorker(i,&wtg)
    }
    //要一次性添加完要等待执行的channel个数
    wtg.Add(16)
    for i,worker := range workers {
        worker.in <- 'a' + i
        //wtg.Add(1)   //这种方式会报错
    }
    
    for i,worker := range workers {
        worker.in <- 'A' + i
        //wtg.Add(1)
    }
    //等待所有channel执行完毕,否则一直阻塞
    wtg.Wait()
    fmt.Println("所有channel执行完毕")
}

func createWorker(id int, wtg *sync.WaitGroup) worker{
    //channel作为struct的一个属性
    wk := worker{
        //chan<-表示channel只用于发送数据,即输入
        in :  make(chan int),
        done: func(){
            wtg.Done()
        },
    }
    //channel创建之后,就开始以并行的方式建立“接收”方
    go wk.doWork(id)
    return wk
}


type worker struct {
    in   chan int
    done func()
}

//“接收”方程序
func (wk *worker) doWork(id int){
    
    //接收的确是按数组顺序顺序打印出来的,但这只是程序第一次运行的情况
    //接收是在发送之前就以并行的方式运行起来了,之后数据中每个channel都一直处于阻塞等待状态
    //也就是说数组中的每个channel谁先打印出数据,就表示该谁先发送数据(忽略channel传送数据时长差异)
    for n := range wk.in {
        fmt.Printf("第 %d 次接收的信息为 %c\n",id,n)
        
        //通知主程序工作处理完毕
        wk.done()
    }
}

func main(){
    channel2()
    fmt.Println("主程序结束")
}

输出

# go run chann.go 
第 0 次接收的信息为 a
第 1 次接收的信息为 b
第 2 次接收的信息为 c
第 3 次接收的信息为 d
第 4 次接收的信息为 e
第 5 次接收的信息为 f
第 6 次接收的信息为 g
第 6 次接收的信息为 G
第 7 次接收的信息为 h
第 7 次接收的信息为 H
第 0 次接收的信息为 A
第 1 次接收的信息为 B
第 2 次接收的信息为 C
第 3 次接收的信息为 D
第 4 次接收的信息为 E
第 5 次接收的信息为 F
所有channel执行完毕
主程序结束

这个例子除了介绍buffer channel,即channel数组的使用外,还涉及一个GO语言的重要思想,引用GO创始人的原话:

Don't communicate by sharing memory;share memory by communicating.

不要通过共享内存来通信;通过通信来共享内存。

通过共享内存来通信:程序A运行完的结果返回给标识flag,如果flag为true运行程序B,返之运行程序C;

通过通信来共享内存:当一个channel处理完毕时,不是修改一个变量告诉发送方我处理完了,而通过channel来传达这个信息; 可以再定义一个channel来实现这个功能,此处使用了WaitGroup。

select调度

最简单的select调度

package main

import (
    "fmt"
)

func main(){
    var c1,c2 chan int //nil
    
    //select可以在channel为nil的时候就去接收数据,
    //哪个channel有数据发送过来就进行接收
    select {
        case n:= <- c1:
            fmt.Println("来自c1:",n)
            
        case n:= <- c2:
            fmt.Println("来自c2:",n)
        //如果所有channel都无数据
        default:
            fmt.Println("无数据可接收")
    }

}

输出

# go run chan3.go 
无数据可接收

select调度是多选一,是阻塞的

select是阻塞的

package main

import (
    "fmt"
    "tools"
)

func main() {

    var c1,c2  chan int

    i:= 0

    for {
        i++
        fmt.Println(i)
        tools.SleepBySec(1)

        select {
        case <- c1:
            fmt.Println(1)

        case <- c2:
            fmt.Println(2)
        }
    }
    fmt.Println("over")
}

以上这段代码会输出一次1,然后就被阻塞住,select一直监听着两个case的channel是否有数据过来,阻塞着程序,阻塞着for循环。

package main

import (
    "fmt"
)

func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)
        /* 这里注释掉了,为什么呢?
        这是因为如果不注释掉,大部分程序就会走default
        因为在一个时间段内,发送数据只是占用了部分时间片,而不是所有时间 
        该行注释掉之后,select只有两个选择,输出c1与c2发送过来的数据
  
            //如果所有channel都无数据
            default:
                fmt.Println("无数据可接收")
        */
        }
    }

}

输出,只取一部分输出数据

来自c2: 17529
来自c2: 17530
来自c2: 17531
来自c2: 17532
来自c2: 17533
来自c2: 17534
来自c2: 17535
来自c2: 17536
来自c2: 17537
来自c1: 9834
来自c1: 9835
来自c1: 9836
来自c1: 9837
来自c1: 9838
来自c1: 9839
来自c1: 9840
来自c1: 9841
来自c1: 9842
来自c1: 9843

如果select中的default不注释

package main

import (
    "fmt"
)

func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    i :=0 
    j :=0
    for{
        i++
        fmt.Println("循环:",i)
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)
                
            default:
                j++
                fmt.Println("无数据可接收",j)
        }
    }

}

输出,12755次循环中有12635次走了无数据

循环: 12753
无数据可接收 12633
循环: 12754
无数据可接收 12634
循环: 12755
无数据可接收 12635

无default

package main

import (
    "fmt"
)

func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    i :=0 
    j :=0
    t :=0
    for{
        i++
        fmt.Println("循环:",i)
        t = i - j
        fmt.Println("来自c1:",t)
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                j++
                fmt.Println("来自c2:",n)
        }
    }
}

输出

循环: 10218
来自c1: 4132
来自c2: 6086
循环: 10219
来自c1: 4132
来自c2: 6087
循环: 10220
来自c1: 4132
来自c2: 6088
循环: 10221
来自c1: 4132
来自c2: 6089
package main
import (
    "fmt"
    "time"
)

func ch1() chan int{
    fmt.Println("接收程序开始")
    
    c1 := make(chan int)
    
    go func(){
        i:=0
        for {
            c1 <- i
        }
    }()

    time.Sleep(time.Millisecond)
    return c1
}

func main(){
    a,b,i := 0,0,0
    c1,c2 := ch1(),ch1()
    
    for{
        i++
        
        select{
            case  <- c1:
                a++
            case  <- c2:
                b++
        }
        d := a+b
        fmt.Println("all:",i,d,a,b)    
    }
    
    fmt.Println("主程序结束")
}
all: 74333 74333 36912 37421
all: 74334 74334 36913 37421
all: 74335 74335 36913 37422
all: 74336 74336 36914 37422
all: 74337 74337 36914 37423
all: 74338 74338 36914 37424
all: 74339 74339 36915 37424
all: 74340 74340 36915 37425
all: 74341 74341 36916 37425
all: 74342 74342 36917 37425
all: 74343 74343 36917 37426
all: 74344 74344 36918 37426
all: 74345 74345 36918 37427
all: 74346 74346 36918 37428

有default时,select几乎99%都选择了default,这说明每一次循环(每一个时间片)中,有数据的情况只是占这个时间片的一小部分,是极小的一部分,无数据的情况占绝大部分(12635/12755);

无default时,select在多个channel中的选择机会基本均等(36918/37428),并且,总的循环次数=所有case条件执行次数之和,

这意味着没有任何一次for循环是轮空的,尽管case有数据的情况只是占用一次循环时间中极小的一部分;

意味着每一次的for循环,即每一次的select执行,必会等待一个case满足条件,本次select才会结束;

意味着select是阻塞的,即至少有一个case满足条件时,select才会结束,否则就等待

再次验证一下

package main
import (
    "fmt"
    "time"
)

func ch1() chan int{
    fmt.Println("接收程序开始")
    
    c1 := make(chan int)
    
    go func(){
        i:=0
        for {
            c1 <- i
            time.Sleep(time.Millisecond*3000)
        }
    }()

    
    return c1
}

func main(){
    a,b,i := 0,0,0
    c1,c2 := ch1(),ch1()
    
    for{
        i++
        
        select{
            case  <- c1:
                a++
            case  <- c2:
                b++
        }
        d := a+b
        fmt.Println("all:",i,d,a,b)    
    }
    
    fmt.Println("主程序结束")
}

每发送一次数据行,等待3秒;select每次等待3秒输出

接收程序开始
接收程序开始
all: 1 1 0 1
all: 2 2 1 1
all: 3 3 2 1
all: 4 4 2 2
all: 5 5 3 2
all: 6 6 3 3
all: 7 7 4 3
all: 8 8 4 4

如果两个channel同时到达select,那么select二选一,即随机选择一个优先输出

如果不同时到达,则谁先达就选择谁,代码如下

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)

        }
    }

}

输出

来自c2: 0
来自c1: 0
来自c2: 1
来自c1: 1
来自c2: 2
来自c1: 2
来自c2: 3
来自c1: 3
来自c2: 4
来自c1: 4
来自c2: 5
来自c1: 5
来自c1: 6
来自c2: 6
来自c2: 7
来自c2: 8
来自c1: 7
来自c1: 8

前面有说,一个channel必须先定义接收处理的程序,才能开始发送数据,这而代码却是先发送数据,之后才是select接收,这不是与之前所说的相矛盾吗?

那来验证一下,使用最简单的方式先发送再让select接收试一试

package main

import (
    "fmt"
    //"math/rand"
    //"time"
)
/*
func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}
*/

func main(){
    //var c1,c2 = channel3(),channel3()
    var c1,c2 = make(chan int),make(chan int)
    c1 <- 1
    c2 <- 2
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)

        }
    }
}

输出错误deadlock

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /usr/local/automng/src/goapp/src/test/channel3/chan3.go:27 +0x92
exit status 2

那为什么开始的代码没有报错?

channel是goroutine与goroutine之间的通信

这是因为发送方位于一个goroutine中(go func(){...}())中,goroutine处理了这个问题,

前面有程序中只有一个go,也就是只有一个goroutine,为什么也能运行?

这是因为go程序中,main方法本身也是一个goroutine

下面的代码就是正确的

package main

import (
    "fmt"
    //"math/rand"
    //"time"
)
/*
func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}
*/

func main(){
    //var c1,c2 = channel3(),channel3()
    var c1,c2 = make(chan int),make(chan int)
    
    go func(){
        for {
            c1 <- 1
            c2 <- 2
            //time.Sleep(time.Millisecond)
        }
    }()
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)

        }
    }
}

输出,不断地发送1和2,然后又不断地输出1和2

来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1
来自c2: 2
来自c1: 1

这里的输出与之前的输出又不一样,之前的输出一片c1或c2,而这里是c1与c2交替式输出,一个c1下来接着就是c2,这样的区别很明显;下面的这段话是个人推测,不保证百分之百是这样哈

首先,在计算机中我们看到一个功能正在运行,比如复制一个电影,但实际上“一个”CPU还在同时做很多工作,比如你还打开了一个网页在看视频,只是这个CPU处理任务的速度非常快,至少在我们看来,这两个工作都在进行着;但对计算机来说,这些工作是在不同的时间片中完成的,同时IO流也在不同的设备上快速切换着。

GO程序是非抢占式的,就是GO程序拿到CPU资源之后,自己不释放别人不可以抢走;GO中也有可以抢占式的程序,即别人可以抢走程序执行的控制权,比如IO操作(fmt打印就是输出IO到屏幕),这种操作会中断GO对资源的占用,我们可以认为这是系统需要这样,而GO是运行在系统上才不得不这样;比如一个GO主程序正在执行代码,到fmt的时候,在一些时间片上系统抢了GO程序的资源干了别的事,然后又把资源还给了GO程序。

channel操作也是非抢占式的,先给哪个channel发送数据,那么这个channel如果不被中断,那么它就会先输出,先输出就会被select调度先输出;for中先c1,然后 fmt中断一次,再c2再fmt中断一次,然后下一轮for循环,所以保持了c1,c2,c1,c2……的输出顺序

而成片输出c1或c2的代码中,channel的创建是在方法中,数据发送则是在两个不同的goroutine中,这两个goroutine是并行执行,而不像for里面c1与c2,

for {
            c1 <- 1
            c2 <- 2
            //time.Sleep(time.Millisecond)
        }

程序必定是先c1再c2这么有顺序,下面将变量i变成全局变量,再次执行之前的代码

文章有些长了,再次重贴一下之前的代码,重运行看效果

package main

import (
    "fmt"
    //"math/rand"
    //"time"
)

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //time.Sleep(time.Duration(rand.Intn(1000))*
            //    time.Millisecond)
            //time.Sleep(100*time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)

        }
    }
}

输出,可以看到成片的c2中有一个c1

来自c2: 5445
来自c2: 5446
来自c2: 5447
来自c1: 5448
来自c2: 5448
来自c2: 5449
来自c2: 5450
来自c2: 5451
来自c2: 5452
来自c2: 5453
来自c2: 5454
来自c2: 5455
来自c2: 5456

重点是,有两个5448,这里i是全局变量,每一次取i之后,程序都对代码进行了加1(i++),理论上不应该出现相同的数据; 但实际上在并行处理共享资源的时候都会有这个问题,这个问题暂时跳过,我们先讨论非共享资源的处理; 处理的思路不是加锁,而是前面提到的那个重要理念“不要通过共享内存来通信;通过通信来共享内存。”,我们建立一个channel来处理这个共享资源,这里先跳过这个问题。

[root@itoracle channel3]# go run chan3.go > /tmp/a.txt
^Z
[10]+  Stopped                 go run chan3.go > /tmp/a.txt

[root@itoracle channel3]# cat /tmp/a.txt |grep c1 |wc -l
298250
[root@itoracle channel3]# cat /tmp/a.txt |grep c2 |wc -l
274977

之所以成片,是因为计算机处理速度太快了,将结果输出到文本中进行统计c1与c2的数量差不多,而对于计算机来说,一个很短的时间片却可以处理很多工作

我们在发送数据的时候等待1毫秒,输出结果就不成片连接了

package main

import (
    "fmt"
    //"math/rand"
    //"time"
)

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //time.Sleep(time.Duration(rand.Intn(1000))*
            //    time.Millisecond)
            time.Sleep(100*time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    
    for{
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)

        }
    }
}

输出

来自c1: 0
来自c2: 1
来自c2: 2
来自c1: 3
来自c1: 4
来自c2: 5
来自c2: 6
来自c1: 7
来自c1: 8
来自c2: 9
来自c2: 10
来自c1: 11
来自c1: 12
来自c2: 13
来自c2: 14
来自c1: 15
来自c1: 16
来自c2: 17
来自c2: 18

未完,准备出去吃个午饭……

select只收不发会怎么样

package main

import (
    "fmt"
)

func channel3() chan int{
    out := make(chan int)
    go func(){
        i := 1
        out <- i
        i++
        close(out)
    }()
    return out
}


func main(){
    var c1,c2 = channel3(),channel3()
    
    for{
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)
        }
    }
}

输出,除了第一次输出正常外,之后的全输出的是0

来自c2: 1
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0
来自c2: 0

这意味着,如果发送方不是一直在发送数据的话,接收方就会输出0;我们必须在接收方处理这个问题。当有数据发送的时候,接收方才接收。

select不是一直发数据,接收会怎么处理

package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    i := 0
    
    //创建一个channel,并接收数据
    wk := createWorker(i)
    for{
        i++
        fmt.Printf("第%d次 \n",i)
        select {
            case n:= <- c1:
                wk <- n
                
            case n:= <- c2:
                wk <- n
        }
    }
}

输出,不是每次for循环中select都能收到数据,没有收到数据时,就跳过了for循环

第13次 
worker 0 接收的信息为 11
第14次 
worker 0 接收的信息为 12
worker 0 接收的信息为 13
第15次 
第16次 
worker 0 接收的信息为 14
worker 0 接收的信息为 15
第17次 
第18次 
worker 0 接收的信息为 16
worker 0 接收的信息为 17
第19次 
第20次 
worker 0 接收的信息为 18
worker 0 接收的信息为 19
第21次 
第22次 
worker 0 接收的信息为 20
worker 0 接收的信息为 21

这里解读一下这段代码的意思,为后面更复杂的程序作铺垫

select {
            case n:= <- c1:
                wk <- n
                
            case n:= <- c2:
                wk <- n
        }

在select的代码块中,如果c1 这个channel有数据发送过来,那么就将数据赋值给变量n;如果c2这个channel有数据发送过来,那么就将数据赋值给变量n;n是一个channel,然后又将这个channel传递给了wk(wk也是一个channel),由wk负责将传递过来的数据输出; 如果有一轮for循环中(一个时间 片段中),c1和c2都没有数据传递过来,那么就轮空或者阻塞。这里是两个channel往一个channel中写数据。

select中不仅可以channel输出,还可以有channel输入

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    i := 0
    
    //创建一个channel,并接收数据
    wk := createWorker(i)
    hasDAta := false
    n := 0
    for{
        i++
        fmt.Printf("第%d次 \n",i)
        
        var datawk chan<- int //nil
        if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
        }
        
        select {
            case n = <- c1:
                hasDAta = true //有数据来了
            case n = <- c2:
                hasDAta = true //有数据来了
            case datawk <- n:
                hasDAta = false//数据被转移走了(被输出了)
        }
    }
}

输出

第1次 
第2次 
第3次 
worker 0 接收的信息为 0
第4次 
第5次 
worker 0 接收的信息为 1
第6次 
第7次 
worker 0 接收的信息为 2
第8次 
第9次 
worker 0 接收的信息为 3
第10次 
第11次 
worker 0 接收的信息为 4
第12次 
第13次 
worker 0 接收的信息为 5

channel是一种缓冲,是一种通信缓存;张三向李四抛了一个鸡蛋,如果李四还没准备好接这个鸡蛋,那么鸡蛋就会摔碎在地上,当然张三没有这么笨,他会等李四准备好之后再抛;如果有一个缓冲,张三不用管李四是否准备好,张三有鸡蛋就往缓冲里放,李四发现缓冲有鸡蛋就去取,以通信的方式来达到资源共享,这就是channel。

channel中如果发送数据的速度大于接收的速度,则数据丢失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    i := 0
    
    //创建一个channel,并接收数据
    wk := createWorker(i)
    hasDAta := false
    n := 0
    for{
        i++
        fmt.Printf("第%d次 \n",i)
        
        var datawk chan<- int //nil
        if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
        }
        
        select {
            case n = <- c1:
                hasDAta = true //有数据来了
            case n = <- c2:
                hasDAta = true //有数据来了
            case datawk <- n:
                hasDAta = false//数据被转移走了(被输出了)
        }
    }
}

接收时等待1秒再接收数据,发送是连接发送0,1,2,3,……,输出结果中显示很多数据丢失了

第1次 
第2次 
第3次 
第4次 
第5次 
第6次 
第7次 
worker 0 接收的信息为 0
第8次 
第9次 
第10次 
第11次 
第12次 
worker 0 接收的信息为 4
第13次 
第14次 
第15次 
第16次 
第17次 
第18次 
第19次 
worker 0 接收的信息为 8
第20次 

数据的丢失,并不是因为channel没有输出,而是我们在等待1秒后去打印n的时候,已经是n被多次赋值后的结果了,我们并不是n每次变化就打印n; 同时也表示轮空并不是n没有被赋值,channel没有输出。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    i := 0
    
    //创建一个channel,并接收数据
    wk := createWorker(i)
    hasDAta := false
    n := 0
    for{
        i++
        fmt.Printf("第%d次 \n",i)
        
        var datawk chan<- int //nil
        if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
        }
        
        select {
            case n = <- c1:
                hasDAta = true //有数据来了
                fmt.Printf("c1 n=%d \n",n)
            case n = <- c2:
                hasDAta = true //有数据来了
                fmt.Printf("c2 n=%d \n",n)
            case datawk <- n:
                hasDAta = false//数据被转移走了(被输出了)
        }
    }
}

输出

第1次 
c1 n=0 
第2次 
第3次 
c2 n=1 
第4次 
c1 n=2 
第5次 
c2 n=3 
第6次 
c1 n=4 
第7次 
worker 0 接收的信息为 0
第8次 
c2 n=5 
第9次 
c1 n=6 
第10次 
c2 n=7 
第11次 
c1 n=8 
第12次 
worker 0 接收的信息为 4
第13次 
c2 n=9 
第14次 

为channel接收方加上缓存处理

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    i := 0
    
    //创建一个channel,并接收数据
    wk := createWorker(i)
    n := 0
    
    var values []int
    for{
        i++
        fmt.Printf("第%d次 \n",i)
        
        var datawk chan<- int //nil
        var currentValue int
        if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
            currentValue = values[0]
        }
        
        select {
            case n = <- c1:
                values = append(values,n)
            case n = <- c2:
                values = append(values,n)
            case datawk <- currentValue: //消费掉列表中的第一个值
                values = values[1:]      //列表中的数据前移一位
        }
    }
}

输出

第31次 
第32次 
worker 0 接收的信息为 4
第33次 
第34次 
第35次 
第36次 
worker 0 接收的信息为 5
第37次 
第38次 
第39次 
第40次 
第41次 
worker 0 接收的信息为 6
第42次 
第43次 
第44次 
第45次 
worker 0 接收的信息为 7
第46次 

这样可以看到接收到的数据是连接的,不再断续;轮空代表着select循环的次数,也代表着计算机最快的处理速度,之后的程序不再输出这个。

设置channel超时时间

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    
    //创建一个channel,并接收数据
    wk := createWorker(0)
    n := 0
    
    //时间channel
    end := time.After(10*time.Second)
    i := 0 
    var values []int
    for{

        var datawk chan<- int //nil
        var currentValue int
        if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
            currentValue = values[0]
        }
        
        select {
            case n = <- c1:
                values = append(values,n)
            case n = <- c2:
                values = append(values,n)
            case datawk <- currentValue: //消费掉列表中的第一个值
                values = values[1:]      //列表中的数据前移一位
                
            //一定时间内程序未接收到数据就提示timeout
            case <- time.After(450 * time.Millisecond):
                i++
                fmt.Println("time out ",i)
            case <- end:
                fmt.Println("game over")
                return
        }
    }
}

输出

time out  1
worker 0 接收的信息为 0
worker 0 接收的信息为 1
time out  2
worker 0 接收的信息为 2
worker 0 接收的信息为 3
worker 0 接收的信息为 4
worker 0 接收的信息为 5
worker 0 接收的信息为 6
time out  3
worker 0 接收的信息为 7
worker 0 接收的信息为 8
game over

显出channel缓存队列数据积压程度

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func createWorker(id int) chan<- int{
    c := make(chan int)
    //创建channel之后,就将接收处理放入了goroutine中
    go doWork(id,c)
    return c
}

func doWork(id int,c chan int){
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("worker %d 接收的信息为 %d\n",id,n)
    }
}

var i int = 0
func channel3() chan int{
    out := make(chan int)
    go func(){
        for {
            //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
            time.Sleep(time.Duration(rand.Intn(1000))*
                time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}


func main(){
    //创建两个channel,并发送数据
    var c1,c2 = channel3(),channel3()
    
    //创建一个channel,并接收数据
    wk := createWorker(0)
    n := 0
    
    //时间channel
    end := time.After(10*time.Second)
    i := 0 
    var values []int
    
    //Tick会每过一个时间间隔,发送一个值到channel
    tick := time.Tick(2*time.Second)
    for{

        var datawk chan<- int //nil
        var currentValue int
        if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
            datawk = wk
            currentValue = values[0]
        }
        
        select {
            case n = <- c1:
                values = append(values,n)
            case n = <- c2:
                values = append(values,n)
            case datawk <- currentValue: //消费掉列表中的第一个值;如果该条件被轮空跳过,那么数据会写入队列,而不是因覆盖而消失,所以数据不会丢;并且当前值永远是队列的第一个值,也不会变
                values = values[1:]      //列表中的数据前移一位;只有消费一次,当前值才会变化,不因每次for循环datawk都会被置为nil而出现误差。
                
            case <- tick:
                fmt.Println("队列当前长度为:",len(values))
                
            //一定时间内程序未接收到数据就提示timeout
            case <- time.After(600 * time.Millisecond):
                i++
                fmt.Println("time out ",i)
            case <- end:
                fmt.Println("game over")
                return
        }
    }
}

输出

time out  1
worker 0 接收的信息为 0
队列当前长度为: 7
worker 0 接收的信息为 1
worker 0 接收的信息为 2
队列当前长度为: 16
worker 0 接收的信息为 3
worker 0 接收的信息为 4
队列当前长度为: 23
worker 0 接收的信息为 5
worker 0 接收的信息为 6
队列当前长度为: 28
worker 0 接收的信息为 7
worker 0 接收的信息为 8
game over

基础条件:

case的条件谁先成立,就先执行谁;同时成立,则按一定算法处理,可简单认为随机处理。

这次未被处理的条件,下次条件必定为真,但若同时还有为真的条件,它未必就一定会优先处理,依然有一定随机性。

这是前面数据丢失的原因,正在基于这一点,后面设计了缓存队列。

case在进行条件判断的同时,也是一次数据处理,将currentValue传入datawk channel,而datawk chanel中一旦有数据,就会进行处理,这里是打印输出。

case datawk <- currentValue

原理

func createWorker(id int) chan<- int,这一步创建一个通道,并将这个通道的地址返回给主程序,同时,开启一个goroutine尝试从这个通道取数据;主程序就可以不断地向这个通道存放数据,不存放时候,获取数据的goroutine将处理阻塞状态。

其他

wk <- 1 为一个channel赋值,然后goroutine再对其处理,处理的过程是不会阻塞主程序的执行的,可能会出现goroutine还没处理完,主程序就已经执行完的情况。

文件写:

为每个文件创建一个列表

多个channel往列表中放数据

每个列表有一个channel负责从列表中取出数据写入文件