Go语言无锁队列组件的实现 ,chan/interface/select

go代码中要实现异步很简单,go funcName()。

但是进程需要控制协程数量在合理范围内,对应大批量任务可以使用“协程池 + 无锁队列”实现。

2. golang无锁队列实现思路

  • Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。无锁队列使用带buff的chan存储数据。
  • interface{} (类似c++的void*, java的Object)可以与任意类型互转。无锁队列使用interface{}作为数据存储类型。
  • select可以处理多个信号, 可以用来解决channel阻塞问题。

3. 代码实现

package main

import (
        "fmt"
        "time"
)

type DataContainer struct {
        Queue chan interface{}
}

func NewDataContainer(max_queue_len int) (dc *DataContainer){
        dc = &DataContainer{}
        dc.Queue = make(chan interface{}, max_queue_len)
        return dc
}

//非阻塞push
func (dc *DataContainer) Push(data interface{}, waittime time.Duration) bool{
        click := time.After(waittime)
        select {
        case dc.Queue <- data:
                return true
        case <- click:
                return false
        }
}

//非阻塞pop
func (dc *DataContainer) Pop(waittime time.Duration) (data interface{}){
        click := time.After(waittime)
        select {
        case data =<-dc.Queue:
                return data
        case <- click:
                return nil
        }
}

//test
var MAX_WAIT_TIME = 10 *time.Millisecond
func main(){
        type dataItem struct {
                name string
                age int
        }

        datacotainer := NewDataContainer(2)
        //add
        fmt.Printf("res=%v\n", datacotainer.Push(&dataItem{"zhangsan",25}, MAX_WAIT_TIME))
        fmt.Printf("res=%v\n", datacotainer.Push(&dataItem{"lisi",30}, MAX_WAIT_TIME))
        fmt.Printf("res=%v\n", datacotainer.Push(&dataItem{"wangwu",28}, MAX_WAIT_TIME))

        //get
        var item interface{}
        item = datacotainer.Pop(MAX_WAIT_TIME)
        if item != nil{
                if tmp,ok := item.(*dataItem); ok{      //interface转为具体类型
                        fmt.Printf("item name:%v, age:%v\n", tmp.name, tmp.age)
                }
        }
}