go标准库的学习-sync互斥

https://studygolang.com/pkgdoc

导入方法:

import "sync"

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

本包的类型的值不应被拷贝。

Locker

type Locker interface {
    Lock()   //用来设置互斥量的上锁
    Unlock() //用来解锁互斥量
}

Locker接口代表一个可以加锁和解锁的对象。

Once

type Once struct {
    m    Mutex
    done uint32
}

Do

func (o *Once) Do(f func())

Do方法当且仅当第一次被调用时才执行函数f。换句话说,给定变量:

var once Once

如果once.Do(f)被多次调用,只有第一次调用会执行f,即使f每次调用Do 提供的f值不同。需要给每个要执行仅一次的函数都建立一个Once类型的实例。

Do用于必须刚好运行一次的初始化。因为f是没有参数的,因此可能需要使用闭包来提供给Do方法调用:

config.once.Do(func() { config.init(filename) })

因为只有f返回后Do方法才会返回,f若引起了Do的调用,会导致死锁。

举例:

package main

import (
    "fmt"
    "sync"
)

func main() {
    //声明once变量
    var once sync.Once
    //用于记录once中的onceBody函数到底调用了几次
    var count int
    onceBody := func() {
        count++
        fmt.Println("Only once", count)
    }
    done := make(chan int)
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody) //虽然被多次调用,但是只有第一次调用会执行onceBody函数
            done <- i //返回都为10,是因为只有i为10后才能到下面的接收通道处,通道的阻塞才会打开
        }()
    }
    for i := 0; i < 10; i++ {
        fmt.Println(<-done)
    }
}

返回:

userdeMBP:go-learning user$ go run test.go
Only once 1  //由此可见的确是只执行了一次
10
10
10
10
10
10
10
10
10
10

Mutex

//Mutex是一个互斥锁
// 零值为解锁状态
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema  uint32
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。

Lock

func (m *Mutex) Lock()

Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。

Unlock

func (m *Mutex) Unlock()

Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。

举例:

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine 可以访问共享资源

package main

import (
    "fmt"
    "sync"
)

var (
    // 逻辑中使用的某个变量
    count int

    // 与变量对应的使用互斥锁,一般情况下,建议将互斥锁的粒度设置得越小越好,降低因为共享访问时等待的时间
    countGuard sync.Mutex //保证修改 count 值的过程是一个原子过程,不会发生并发访问冲突
)

func GetCount() int {

    // 锁定,此时如果另外一个 goroutine 尝试继续加锁时将会发生阻塞,直到这个 countGuard 被解锁
    countGuard.Lock()

    // 在函数退出时解除锁定
    defer countGuard.Unlock()

    return count
}

func SetCount(c int) {
    countGuard.Lock()
    count = c
    countGuard.Unlock()
}

func main() {

    // 可以进行并发安全的设置
    SetCount(1)

    // 可以进行并发安全的获取
    fmt.Println(GetCount())

}

返回:

userdeMBP:go-learning user$ go run -race test.go
1
userdeMBP:go-learning user$ go run test.go
1

RWMutex

type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

RWMutex是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

Lock

func (rw *RWMutex) Lock()

Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入。

Unlock

func (rw *RWMutex) Unlock()

Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误。

RLock

func (rw *RWMutex) RLock()

RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取。

RUnlock

func (rw *RWMutex) RUnlock()

Runlock方法解除rw的读取锁状态,如果m未加读取锁会导致运行时错误。

RLocker

func (rw *RWMutex) RLocker() Locker

Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。

举例:

将上面互斥锁例子中的一部分代码修改为读写互斥锁:

var (
    // 逻辑中使用的某个变量
    count int

    // 与变量对应的使用读写互斥锁,差别就是当另一个goroutine也要读取改数据时,不会发生阻塞
    countGuard sync.RWMutex
)

func GetCount() int {

    // 锁定
    countGuard.RLock()

    // 在函数退出时解除锁定
    defer countGuard.RUnlock()

    return count
}

Cond 条件变量

cond的主要作用就是获取锁之后,wait()方法会等待一个通知,来进行下一步锁释放等操作,以此控制锁合适释放,释放频率。即使用Wait()的方法来控制何时去竞争锁,就像是使用了一个“双开关”一样

条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。

type Cond struct {
    // 在观测或更改条件时L会冻结
    L Locker
    // 包含隐藏或非导出字段
}

Cond实现了一个构建在锁的基础上的条件变量,一个线程集合地,供线程等待或者宣布某事件的发生。

每个Cond实例都有一个相关的锁Locker(一般是*Mutex或*RWMutex类型的值),它必须在改变条件时或者调用Wait方法时保持锁定,即在调用Wait()等方法前一定要先调用cond.L.Lock()先进行上锁。Cond可以创建为其他结构体的字段,Cond在开始使用后不能被拷贝。

NewCond

func NewCond(l Locker) *Cond

使用锁l创建一个*Cond,然后就可以使用结构体中的Locker,举例:cond.L.Lock()\cond.L.Unlock()

Broadcast

func (c *Cond) Broadcast()

Broadcast唤醒所有等待c的线程(即调用了Wait()而挂起的goroutine)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

Signal

func (c *Cond) Signal()

Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

Wait

func (c *Cond) Wait()

让goroutine在这个cond上进行等待。Wait自行解锁c.L并阻塞当前线程,在之后线程恢复执行时,Wait方法会在返回前锁定c.L。和其他系统不同,Wait除非被Broadcast或者Signal唤醒,不会主动返回。

因为线程中Wait方法是第一个恢复执行的,而此时c.L未加锁。调用者不应假设Wait恢复时条件已满足,相反,调用者应在循环中等待:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

举例:

package main 
import(
    "fmt"
    "sync"
    "time"
)
var locker = new(sync.Mutex)//创建一个互斥锁
var cond = sync.NewCond(locker)

func testCond(x int){
    cond.L.Lock()//上锁,该goroutine获取锁
    cond.Wait()  //在获取锁后用于等待通知,只有调用Signal()或Broadcast()后才获得通知进行下一步操作,暂时阻塞
    x += 5
    fmt.Println(x)
    time.Sleep(time.Second * 1) //等待一秒后再去释放锁
    cond.L.Unlock() //用于释放锁
}

func main() {
    for i := 0; i < 4; i++{
        go testCond(i)
    }
    fmt.Println("start test")
    time.Sleep(time.Second * 3) //等待3秒后下发一个通知给等待通知的goroutine
    cond.Signal() //下发一个通知
    time.Sleep(time.Second * 3) //等待3秒后再下发一个通知给等待通知的goroutine
    cond.Signal() //下发一个通知
    time.Sleep(time.Second * 3) //等待3秒后直接广播给所有等待通知的goroutine
    cond.Broadcast()
    time.Sleep(time.Second * 4)
}

返回:

userdeMacBook-Pro:go-learning user$ go run test.go
start test
6
5
7
8

WaitGroup

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    state1 [3]uint32
}

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。

除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务

Add

func (wg *WaitGroup) Add(delta int)

Add方法向内部计数加上delta,delta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。

Done

func (wg *WaitGroup) Done()

Done方法减少WaitGroup计数器的值,即-1,应在线程的最后执行。

Wait

func (wg *WaitGroup) Wait()

Wait方法阻塞直到WaitGroup计数器减为0。

举例说明,当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成:

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {

    // 声明一个等待组
    var wg sync.WaitGroup

    // 准备一系列的网站地址
    var urls = []string{
        "http://www.github.com/",
        "https://www.qiniu.com/",
        "https://www.golangtc.com/",
    }

    // 遍历这些地址
    for _, url := range urls {

        // 每一个任务开始时, 将等待组增加1
        wg.Add(1)

        // 开启一个并发
        go func(url string) {

            // 使用defer, 表示函数完成时将等待组值减1
            // wg.Done() 方法等效于执行 wg.Add(-1)
            defer wg.Done() 

            // 使用http访问提供的地址
            // Get() 函数会一直阻塞直到网站响应或者超时
            _, err := http.Get(url)

            //在网站响应和超时后,打印这个网站的地址和可能发生的错误
            fmt.Println(url, err)

            // 通过参数传递url地址
        }(url)
    }

    // 等待所有的网站都响应或者超时后,任务完成,Wait 就会停止阻塞。
    wg.Wait()

    fmt.Println("over")
}

返回:

userdeMBP:go-learning user$ go run test.go
https://www.golangtc.com/ <nil>
http://www.github.com/ <nil>
https://www.qiniu.com/ Get https://www.qiniu.com/: dial tcp 124.200.113.148:443: i/o timeout
over

Pool

type Pool struct {
    // 可选参数New指定一个函数在Get方法可能返回nil时来生成一个值
    // 该参数不能在调用Get方法时被修改
    New func() interface{}
    // 包含隐藏或非导出字段
}

如果Pool为空,则调用New返回一个新创建的对象。如果没有设置New,则返回nil。

Pool是一个可以分别存取的临时对象的集合,目的是用来保存和复用临时对象,以减少内存分配,降低GC压力。

Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。所以如果事先Put进去100个对象,下次Get的时候发现Pool是空也是有可能的。

这个特性的一个好处就在于不用担心Pool会一直增长,因为Go已经帮你在Pool中做了回收机制。

这个清理过程是在每次垃圾回收GC之前做的。垃圾回收GC是固定两分钟触发一次。

而且每次清理会将Pool中的所有对象都清理掉!

Pool可以安全的被多个线程同时使用。

Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。

Pool的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。

Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。

另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

Get

func (p *Pool) Get() interface{}

Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。Get方法也可能选择无视内存池,将其当作空的。调用者不应认为Get的返回这和传递给Put的值之间有任何关系。

假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。

Put

func (p *Pool) Put(x interface{})

Put方法将x放入池中。

举一个最简单的例子:

package main 
import(
    "fmt"
    "sync"
)


func main() {
    //创建一个对象,如果pool为空,就调用该New;如果没有定义New,则返回nil
    pipe := &sync.Pool{
        New: func() interface{} {
            return "hello ,i am New"
            },
    }
    //在pool中放入字符串
    pipe.Put("put some string to Pool")
    //然后将其取出
    fmt.Println(pipe.Get())
    //如果再取就没有了,会自动调用New
    fmt.Println(pipe.Get())
}

返回:

userdeMacBook-Pro:go-learning user$ go run test.go
put some string to Pool
hello ,i am New

如果没有New且Pool为空,Get()返回<nil>:

package main 
import(
    "fmt"
    "sync"
)


func main() {
    //创建一个对象,如果pool为空,就调用该New;如果没有定义New,则返回nil
    pipe := &sync.Pool{}
    //然后将其取出,因为没有New,且pool为空,返回nil
    fmt.Println(pipe.Get()) //<nil>
}

1、缓存对象的数量和期限

1)pool创建的时候是不能指定大小的,所有sync.Pool的缓存对象数量是没有限制的(只受限于内存),因此使用sync.pool是没办法做到控制缓存对象数量的个数的。

2)而且sync.Pool缓存的期限只是两次gc之间这段时间,举例说明:

package main 
import(
    "fmt"
    "sync"
)


func main() {
    //创建一个对象,如果pool为空,就调用该New;如果没有定义New,则返回nil
    pipe := &sync.Pool{
        New: func() interface{} {
            return "hello ,New"
        },
    }
    fmt.Println(pipe.Get())//hello ,New
    pipe.Put("hello, put")
    fmt.Println(pipe.Get())//hello, put
}

但是如果中间使用了runtime.GC(),缓存会被清空,那么结果就会变成:

package main 
import(
    "fmt"
    "sync"
    "runtime"
)

func main() {
    //创建一个对象,如果pool为空,就调用该New;如果没有定义New,则返回nil
    pipe := &sync.Pool{
        New: func() interface{} {
            return "hello ,New"
        },
    }
    fmt.Println(pipe.Get())//hello ,New
    pipe.Put("hello, put")
    runtime.GC() //作用是GC执行一次垃圾回收
    fmt.Println(pipe.Get())//hello ,New
}