go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器

一、使用场景

大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。

所以需要对多个协程进行控制。

二、使用知识

1. 从一个未初始化的管道读会阻塞

2.从一个关闭的管道读不会阻塞

利用两个管道和select 进行控制

三、上代码

控制器代码

package util

import (
        "errors"
        "sync"
)

const (
        //STOP 停止
        STOP = iota
        //START 开始
        START
        //PAUSE 暂停
        PAUSE
)

//Control 控制器
type Control struct {
        ch1  chan struct{}
        ch2  chan struct{}
        stat int64
        lock sync.RWMutex
}

var (
        //ErrStat 错误状态
        ErrStat = errors.New("stat error")
)

//NewControl 获得一个新Control
func NewControl() *Control {
        return &Control{
                ch1:  make(chan struct{}),
                ch2:  nil,
                stat: START,
                lock: sync.RWMutex{},
        }
}

//Stop 停止
func (c *Control) Stop() error {
        c.lock.Lock()
        defer c.lock.Unlock()
        if c.stat == START {
                c.ch2 = nil
                close(c.ch1)
                c.stat = STOP
        } else if c.stat == PAUSE {
                ch2 := c.ch2
                c.ch2 = nil
                close(c.ch1)
                close(ch2)
                c.stat = STOP
        } else {
                return ErrStat
        }
        return nil
}

//Pause 暂停
func (c *Control) Pause() error {
        c.lock.Lock()
        defer c.lock.Unlock()
        if c.stat == START {
                c.ch2 = make(chan struct{})
                close(c.ch1)
                c.stat = PAUSE
        } else {
                return ErrStat
        }
        return nil
}

//Start 开始
func (c *Control) Start() error {
        c.lock.Lock()
        defer c.lock.Unlock()
        if c.stat == PAUSE {
                c.ch1 = make(chan struct{})
                close(c.ch2)
                c.stat = START
        } else {
                return ErrStat
        }
        return nil
}

//C 控制管道
func (c *Control) C() <-chan struct{} {
        c.lock.RLock()
        defer c.lock.RUnlock()
        return c.ch1
}

//Wait 等待
func (c *Control) Wait() bool {
        c.lock.RLock()
        ch2 := c.ch2
        c.lock.RUnlock()
        if ch2 == nil {  //通过赋值nil 发送停止推出命令
                return false
        }
        <-ch2  //会进行阻塞
        return true
}

使用代码

     for {
                select {
                case part, ok := <-c.Partitions():
                        if !ok {
                                conf.Logger.Error("get kafka Partitions not ok", regular.Name)
                                return
                        }
                        go readFromPart(c, part, regular, respChan)
                case <-regular.C():   //regular 为Control 类
                        if !regular.Wait() {
                                conf.Logger.Debug("Stop! ")
                                return
                        }
                        conf.Logger.Debug("Start! ")
                }
        }

这样就可以随时随地的控制工程中的协程

regular  := util.NewControl()
regular.Pause()
regular.Start()
regular.Stop()