【go学习】goroutine并发学习总结

go最大的特性就是并发了,所以这一块是go的重点知识,我自己花了一些时间,好好学习这个基础知识。

声明

文章内容为个人学习理解,所以文章如果有不对之处,非常感谢指出。

goroutine

说起go并发,一般都会指go协程,即goroutine,而实现goroutin的关键字就是 go

我学习go并发用到的关键字总结

  • go //异步执行
  • chan // make channel 用于各协程间通信 (重点)
    • ch := make(chan 数据类型, 缓冲区容量) // 格式
  • sync.Mutex // 锁
    • m.Lock()// 实例化的sync.Mutex对象 即var m sync.Mutex
    • m.Unlock()
  • sync.Once // 只执行一次函数
    • var once sync.Once once.DO(func(){})
  • sync.WaitGroup // 等待所有协程执行完
    • wg.Add(协程数量) // 添加执行的协程数量
    • wg.Done() // 每执行完一个单元协程就执行一次Done 把上面添加的数量 -1
    • wg.Wait() // 等待所有的协程执行完
  • runtime.NumGoroutine() // 查看当前运行的协程数量

最基础的一个demo

package main

import (
        "fmt"
        "time"
)

func main() {
        fmt.Println("Hello World")
        // 异步执行协程函数
        go func() {
                fmt.Println("goroutine 执行了")
        }()
        // sleep 1 秒的目的是为了 等待协执行完看到结果 不至于 主程序执行完了 协程还没结束
        time.Sleep(1*time.Second)
}

channel 协程间数据通信

go里面协程间可以使用channel进行通信,不推荐直接使用变量交互数据,避免数据操作混乱。

协程数据相当于一个 管道队列机构 先进先出

无缓冲区

无缓冲区时,必须同时读取才行,否则会报错 fatal error: all goroutines are asleep - deadlock!

错误的例子

func main(){
       // 定义一个int类型的channel  channel 只能使用make创建
        c1 := make(chan int)
       // 无缓冲区 单独存 或者取 或者 先后执行存取都会报错
        // c1 <- 1
        n := <- c1
        fmt.Println(n)
}

正确的例子

func main(){
        c1 := make(chan int)
        go func() {
                c1 <- 1
        }()
        n := <- c1
       // 输出1
        fmt.Println(n)
}

补充说明一下,当缓冲区还没有数据进来时,读取操作会被阻塞, 比如以下例子

func main(){
        c1 := make(chan int)
        go func() {
                // sleep 5秒
                time.Sleep(5*time.Second)
                c1 <- 1
        }()
       // 先输出 ----
        fmt.Println("-----")
        n := <- c1
        // 等待5秒缓冲区有数据了  程序才会接着往下执行
        fmt.Println("+++++")
        fmt.Println(n)
}

有缓冲区

func main(){
      // 声明一个channel Buffer为3 表示可以存储 3个 int 数据
      c := make(chan int, 3)
      c <- 1
      c <- 2
      c <- 8
      // 取出一个数据(可以选择不接收) 队列顺序
      <-c
      // 查看channel的 Buffer 容量
      t.Log(cap(c))
      // 也可以多次取值
      x, y := <-c, <-c
      t.Log(fmt.Sprintf("%d + %d = %d", x, y, x+y))
}

上面的例子中,一个个的写入和读取channel中的数据,特别麻烦,于是就有循环写入和读取。需要注意的是使用range循环读取时,channel必须得用close来结束,

否则就回因为range无法判断是否结束,而导致异常

func main(){
      // 声明一个 string chan
      c := make(chan string, 3)

      // 协程异步调用 添加到chan
      go func(n int, c chan string) {
              for i := 0; i < n; i++ {
                    // 循环添加到
                    c <- fmt.Sprintf("--- %d ---", i)
              }
              // 结束添加
              close(c)
      }(cap(c), c)

      // 循环取出
      for v := range c {
              t.Log(v)
      }
}

多个channnel操作

使用sync.WaitGroup等待多个goroutine同时执行完,使用selectcase随机执行返回值

// 两个协程任务分别往不同的channel存入数据
func AsyncCh1(n int, c chan string, wg *sync.WaitGroup) {
        for i := 0; i < n; i++ {
                c <- fmt.Sprintf("++ %d ++", i)
        }
       // Done() 结束时会把之前添加的协程数量减一
        wg.Done()
}

func AsyncCh2(n int, c chan string, wg *sync.WaitGroup) {
        for i := n; i > 0; i-- {
                c <- fmt.Sprintf("-- %d --", i)
        }
        wg.Done()
}

func main() {
        var wg sync.WaitGroup

        ch1 := make(chan string, 5)
        ch2 := make(chan string, 5)
       
       // 添加协程执行数量 
        wg.Add(2)
        go AsyncCh1(cap(ch1), ch1, &wg)
        //wg.Add(1)
        go AsyncCh2(cap(ch2), ch2, &wg)

        // 等待协程全部执行完
        wg.Wait()
            
        for i := 0; i < 10; i++ {
                // 随机从任意的channel中取值 如果有就回立即返回
                select {
                case ret1 := <-ch1:
                        t.Log(ret1)
                case ret2 := <-ch2:
                        t.Log(ret2)
                // 设置的超时  如果任意的协程超过100毫秒就回报错
                case <-time.After(time.Millisecond * 100):
                        t.Error("time out")
                }
        }
}

模拟实现超时操作,把上面的例子改造一下,然后添加超时操作。

func AsyncCh1(n int, c chan string) {
       // 添加3秒sleep 
        time.Sleep(3*time.Second)
        for i := 0; i < n; i++ {
                c <- fmt.Sprintf("++ %d ++", i)
        }
       // 去掉等待结束完成 
        //wg.Done()
}

func AsyncCh2(n int, c chan string) {
        time.Sleep(3*time.Second)
        for i := n; i > 0; i-- {
                c <- fmt.Sprintf("-- %d --", i)
        }
}

func TestSelect(t *testing.T) {
        // 去掉等待
        //var wg sync.WaitGroup

        ch1 := make(chan string, 5)
        ch2 := make(chan string, 5)

        go AsyncCh1(cap(ch1), ch1)
        go AsyncCh2(cap(ch2), ch2)

        for i := 0; i < 10; i++ {
                select {
                case ret1 := <-ch1:
                        t.Log(ret1)
                case ret2 := <-ch2:
                        t.Log(ret2)
                // 添加1秒超时检测  结果就是前两个数据会打印time out
                case <-time.After(time.Second * 1):
                        t.Error("time out")
                }
        }
}

模拟生产和消费

如果是使用了close关闭chan,那么channel取值其实是有两个返回值的,相当于close发出了一个信号

v, ok := <-c;if ok {
        fmt.Println(fmt.Sprintf("有数据  %d", v))
} else {
        fmt.Println("没有数据")
}

如下完整例子

// 不断的生产
func Producer1(c chan int, wg *sync.WaitGroup) {

        for i := 0; i <= 10; i++ {
                fmt.Println(fmt.Sprintf("生产了++++++ %d", i))
                time.Sleep(time.Millisecond * 100)
                c <- i
        }

        // 关闭channel
        close(c)
        //c <- 22  // 关闭后就不能发了  panic: send on closed channel

        wg.Done()
}

// 不断的从chanel里面拿
func Consumer(c chan int, wg *sync.WaitGroup) {

        for {
                //time.Sleep(time.Millisecond*800)
                // 判断生产者是否已经停止了
                v, ok := <-c
                if ok {
                        fmt.Println(fmt.Sprintf("-------消费了 %d", v))
                } else {
                        fmt.Println("结束")
                        break
                }
        }
        wg.Done()

}

func main() {
        var wg sync.WaitGroup
        c := make(chan int, 20)
        wg.Add(2)
        go Producer1(c, &wg)
        go Consumer(c, &wg)
        wg.Wait()

}

仅读和仅写的channel

func Producer5(writeC chan<- int) {
        for i := 0; i < 10; i++ {
                fmt.Printf("生产+++%d\n", i)
                writeC <- i
        }
}
func Consumer5(redC <-chan int) {
        for i := 0; i < 10; i++ {
                fmt.Printf("-----------------消费 %d \n", <-redC)
        }
}

func main() {

        c := make(chan int, 15)

        // 只读
        var redC <-chan int = c
        // 只写
        var writeC chan<- int = c

        // 生产
        go Producer5(writeC)
        // 消费
        Consumer5(redC)

}

只执行一次的方法 sync.Once


var once sync.Once

func NormalFunc(i int) {
        timeStr := time.Now().Format("2006-01-02 15:04:05")

        fmt.Printf(" %d 测试函数 %s \n", i, timeStr)
}

func SingleFunc(i int) {
        fmt.Printf("单例测试函数执行++ %d \n", i)

        once.Do(func() {
                // 这里面只执行一次
                timeStr := time.Now().Format("2006-01-02 15:04:05")

                fmt.Printf("%d------单例子测试函数 只执行一次 %s \n", i, timeStr)
        })
}

func main() {
        var wg sync.WaitGroup

        for i := 0; i < 10; i++ {
                wg.Add(1)

                go func(i int) {
                        NormalFunc(i)
                        SingleFunc(i)

                        wg.Done()
                }(i)
        }
        wg.Wait()
}

利用channel构建对象池

// 新建一个空结构体 相当于对象
type Tool struct {
        name string
}

// 对象池 用于存储 Tool对象
type ToolsBox struct {
        // 属性是一个 channel 内容是 Tool 结构体指针
        bufChan chan *Tool
}

// 获取工具 给结构体绑定方法
func (p *ToolsBox) GetTool(timeout time.Duration) (*Tool, error) {
        select {
        case tool := <-p.bufChan:
                return tool, nil
        case <-time.After(timeout): //超时控制
                return nil, errors.New("time out")
        }
}

// 用完归还(释放)
func (p *ToolsBox) ReleaseTool(tool *Tool) error {
        select {
        case p.bufChan <- tool:
                return nil
        default:
                return errors.New("overflow")
        }
}

// new一个 ToolBox对象
func NewToolsBox(poolNum int) *ToolsBox {
        objPool := ToolsBox{}
        objPool.bufChan = make(chan *Tool, poolNum)

        for i := 0; i < poolNum; i++ {

                // 生成一个 工具结构体
                tool := &Tool{fmt.Sprintf("????--%d", i)}
                // 存入对象池
                objPool.bufChan <- tool
        }

        return &objPool
}

func main() {

        pool := NewToolsBox(5)

        for i := 0; i < 8; i++ {
                tool, err := pool.GetTool(time.Second * 1)

                if err != nil {
                        t.Log(fmt.Sprintf("---取出有问题 %s 当前容量%d", tool, len(pool.bufChan)))
                } else {
                        // 取出没问题
                        t.Log(fmt.Sprintf("----取出一个 %s 当前容量%d", tool, len(pool.bufChan)))

                        // 接着就释放 和判断写在一起
                        if err := pool.ReleaseTool(tool); err != nil {
                                t.Log("释放有问题")
                        } else {
                                t.Log(fmt.Sprintf("释放一个 +++ %s 当前容量%d", tool, len(pool.bufChan)))
                        }
                }

        }

        t.Log("结束")
}

代码地址

github