go使用go-redis操作redis 连接类型, pipeline, 发布订阅

内容:

一 . 客户端Client(普通模式,主从模式,哨兵模式)

二. conn连接(连接, pipeline, 发布订阅等)

三. 示例程序(连接, pipeline, 发布订阅等)

客户端

Client 普通模式的客户端

go redis依据用途提供了多种客户端创建的函数, 如下:

func NewClient(opt *Options) *Client

func NewFailoverClient(failoverOpt *FailoverOptions) *Client

func (c *Client) Context() context.Context

func (c *Client) Do(args ...interface{}) *Cmd

func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd

func (c *Client) Options() *Options

func (c *Client) PSubscribe(channels ...string) *PubSub

func (c *Client) Pipeline() Pipeliner

func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)

func (c *Client) PoolStats() *PoolStats

func (c *Client) Process(cmd Cmder) error

func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error

func (c *Client) SetLimiter(l Limiter) *Client

func (c *Client) Subscribe(channels ...string) *PubSub

func (c *Client) TxPipeline() Pipeliner

func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)

func (c *Client) Watch(fn func(*Tx) error, keys ...string) error

func (c *Client) WithContext(ctx context.Context) *Client

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

NewClient 创建一个普通连接

NewFailoverClient 具有故障检测以及故障转移的client

PSubscribe / Subscribe 发布订阅模式的client

Pipeline 启用pipeline管道模式的client

PoolStats 连接池状态

Close 关闭连接

集群模式的ClusterClient

func NewClusterClient(opt *ClusterOptions) *ClusterClient

func (c *ClusterClient) Close() error

func (c *ClusterClient) Context() context.Context

func (c *ClusterClient) DBSize() *IntCmd

func (c *ClusterClient) Do(args ...interface{}) *Cmd

func (c *ClusterClient) DoContext(ctx context.Context, args ...interface{}) *Cmd

func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error

func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error

func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error

func (c *ClusterClient) Options() *ClusterOptions

func (c *ClusterClient) PSubscribe(channels ...string) *PubSub

func (c *ClusterClient) Pipeline() Pipeliner

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error)

func (c *ClusterClient) PoolStats() *PoolStats

func (c *ClusterClient) Process(cmd Cmder) error

func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error

func (c *ClusterClient) ReloadState() error

func (c *ClusterClient) Subscribe(channels ...string) *PubSub

func (c *ClusterClient) TxPipeline() Pipeliner

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error)

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

方法与client大致相同

哨兵SentinelClient

func NewSentinelClient(opt *Options) *SentinelClient

func (c *SentinelClient) CkQuorum(name string) *StringCmd

func (c SentinelClient) Close() error

func (c *SentinelClient) Context() context.Context

func (c *SentinelClient) Failover(name string) *StatusCmd

func (c *SentinelClient) FlushConfig() *StatusCmd

func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd

func (c *SentinelClient) Master(name string) *StringStringMapCmd

func (c *SentinelClient) Masters() *SliceCmd

func (c *SentinelClient) Monitor(name, ip, port, quorum string) *StringCmd

func (c *SentinelClient) PSubscribe(channels ...string) *PubSub

func (c *SentinelClient) Ping() *StringCmd

func (c *SentinelClient) Process(cmd Cmder) error

func (c *SentinelClient) ProcessContext(ctx context.Context, cmd Cmder) error

func (c *SentinelClient) Remove(name string) *StringCmd

func (c *SentinelClient) Reset(pattern string) *IntCmd

func (c *SentinelClient) Sentinels(name string) *SliceCmd

func (c *SentinelClient) Set(name, option, value string) *StringCmd

func (c *SentinelClient) Slaves(name string) *SliceCmd

func (c SentinelClient) String() string

func (c *SentinelClient) Subscribe(channels ...string) *PubSub

func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

连接:

创建了客户端之后,需要与redis建立连接才能够发送请求进行使用

在连接层, 提供了很多封装后的函数

普通连接Conn

例如:

涉及很多函数, 函数名称基本可以通过redis的命令进行类比找到,不一一列举

func (c Conn) Append(key, value string) *IntCmd

func (c Conn) Auth(password string) *StatusCmd

func (c Conn) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd

func (c Conn) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd

func (c Conn) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd

func (c Conn) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd

func (c Conn) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd

func (c Conn) BgRewriteAOF() *StatusCmd

func (c Conn) BgSave() *StatusCmd

func (c Conn) BitCount(key string, bitCount *BitCount) *IntCmd

func (c Conn) BitField(key string, args ...interface{}) *IntSliceCmd

func (c Conn) BitOpAnd(destKey string, keys ...string) *IntCmd

....

func (c Conn) HKeys(key string) *StringSliceCmd

func (c Conn) HLen(key string) *IntCmd

func (c Conn) HMGet(key string, fields ...string) *SliceCmd

func (c Conn) HMSet(key string, fields map[string]interface{}) *StatusCmd

func (c Conn) HScan(key string, cursor uint64, match string, count int64) *ScanCmd

func (c Conn) HSet(key, field string, value interface{}) *BoolCmd

func (c Conn) HSetNX(key, field string, value interface{}) *BoolCmd

func (c Conn) HVals(key string) *StringSliceCmd

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

Pipeline

pipeline 管道批量执行命令: 多条命令打包后只需一次客户端与服务器之间的网络往返, 可以减少网络开销、节约带宽

提供的方法与conn基本一致

func (c Pipeline) Append(key, value string) *IntCmd

func (c Pipeline) Auth(password string) *StatusCmd

func (c Pipeline) BLPop(timeout time.Duration, keys ...string) *StringSliceCmd

func (c Pipeline) BRPop(timeout time.Duration, keys ...string) *StringSliceCmd

func (c Pipeline) BRPopLPush(source, destination string, timeout time.Duration) *StringCmd

func (c Pipeline) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd

func (c Pipeline) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd

func (c Pipeline) BgRewriteAOF() *StatusCmd

.....

1

2

3

4

5

6

7

8

9

10

11

12

PubSub

发布订阅模式

func (c *PubSub) Channel() <-chan *Message

func (c *PubSub) ChannelSize(size int) <-chan *Message

func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{}

func (c *PubSub) Close() error

func (c *PubSub) PSubscribe(patterns ...string) error

func (c *PubSub) PUnsubscribe(patterns ...string) error

func (c *PubSub) Ping(payload ...string) error

func (c *PubSub) Receive() (interface{}, error)

func (c *PubSub) ReceiveMessage() (*Message, error)

func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error)

func (c *PubSub) String() string

func (c *PubSub) Subscribe(channels ...string) error

func (c *PubSub) Unsubscribe(channels ...string) error

1

2

3

4

5

6

7

8

9

10

11

12

13

可以建立channel Channel()

设置超时时间ReceiveTimeout

Subscribe订阅

Unsubscribe取消订阅

PSubscribe 按模式(pattern)订阅频道, PUnsubscribe 取消模式订阅; 发布消息使用客户端的 Publish 方法

排他 TX

func (c Tx) Get(key string) *StringCmd

func (c Tx) GetBit(key string, offset int64) *IntCmd

func (c Tx) GetRange(key string, start, end int64) *StringCmd

func (c Tx) GetSet(key string, value interface{}) *StringCmd

func (c Tx) HDel(key string, fields ...string) *IntCmd

func (c Tx) HExists(key, field string) *BoolCmd

func (c Tx) HGet(key, field string) *StringCmd

func (c Tx) HGetAll(key string) *StringStringMapCmd

func (c Tx) HIncrBy(key, field string, incr int64) *IntCmd

func (c Tx) HIncrByFloat(key, field string, incr float64) *FloatCmd

func (c Tx) HKeys(key string) *StringSliceCmd

1

2

3

4

5

6

7

8

9

10

11

在conn的基础上加入排他性(事务)功能: 通过 Client.Watch 监视 key, 在回调中得到 Tx, 命令放在 MULTI/EXEC 中执行

提供的函数及方法与conn基本相同, 可以参考conn进行使用

示例程序

示例:

// ExampleClient connects to a Redis server, stores "key", reads it back, and
// then looks up a key that was never written to show how a missing key is
// reported (the redis.Nil error).
//
// Any error other than redis.Nil aborts the example with a panic.
func ExampleClient() {
	redisdb := redis.NewClient(&redis.Options{
		Addr:     "192.168.137.18:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	// The client is created locally, so release its connections on return.
	// The close error is ignored on purpose: the client is not used afterwards.
	defer func() { _ = redisdb.Close() }()

	// 0 means the key is stored without an expiration.
	if err := redisdb.Set("key", "value", 0).Err(); err != nil {
		panic(err)
	}

	val, err := redisdb.Get("key").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("key", val)

	// A missing key is not a failure: it is signalled by the redis.Nil error
	// and must be told apart from real errors.
	val2, err := redisdb.Get("missing_key").Result()
	switch {
	case err == redis.Nil:
		fmt.Println("missing_key does not exist")
	case err != nil:
		panic(err)
	default:
		fmt.Println("missing_key", val2)
	}
}

// main is the program entry point; it only runs the example above.
func main() { ExampleClient() }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

输出:

key value

missing_key does not exist

1

2

NewClient创建连接:

// Create a client for a single Redis server from explicit options.
redisdb := redis.NewClient(&redis.Options{

Addr: "localhost:6379", // use default Addr

Password: "", // no password set

DB: 0, // use default DB

})

// Send PING and print the reply together with any error.
pong, err := redisdb.Ping().Result()

fmt.Println(pong, err)

1

2

3

4

5

6

7

8

如果使用tls/ssl 则在Options参数 TLSConfig *tls.Config 中指定

主从故障转移NewFailoverClient

// Create a failover client from the master name and the list of sentinel
// addresses (":26379" has no host part).
redisdb := redis.NewFailoverClient(&redis.FailoverOptions{

MasterName: "master",

SentinelAddrs: []string{":26379"},

})

// Send PING; the returned command (and therefore its error) is discarded here.
redisdb.Ping()

1

2

3

4

5

集群NewClusterClient

// Create a cluster client from a list of node addresses (host part omitted).
redisdb := redis.NewClusterClient(&redis.ClusterOptions{

Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

})

// Send PING; the returned command (and therefore its error) is discarded here.
redisdb.Ping()

1

2

3

4

url样式建立连接 ParseURL

// Parse a redis:// URL into an options value; a malformed URL is reported
// through err.
opt, err := redis.ParseURL("redis://:qwerty@localhost:6379/1")

if err != nil {

panic(err)

}

// Print the address, database and password fields filled in by ParseURL.
fmt.Println("addr is", opt.Addr)

fmt.Println("db is", opt.DB)

fmt.Println("password is", opt.Password)

// Create the client as usual; it is discarded here because the snippet only
// demonstrates URL parsing.

_ = redis.NewClient(opt)

1

2

3

4

5

6

7

8

9

10

pipeline

// Queue two commands on a pipeline; nothing is sent until Exec is called.
pipe := redisdb.Pipeline()

// incr is kept so the counter value can be read after Exec.
incr := pipe.Incr("pipeline_counter")

pipe.Expire("pipeline_counter", time.Hour)

// Execute
//
//     INCR pipeline_counter
//     EXPIRE pipeline_counter 3600
//
// using one client-server roundtrip.

_, err := pipe.Exec()

fmt.Println(incr.Val(), err)

1

2

3

4

5

6

7

8

9

10

11

12

13

Pipelined

// incr is declared outside the callback so that its value can still be read
// after Pipelined has returned.
var incr *redis.IntCmd

// Queue the commands inside the callback and return nil; the results and any
// error come back from Pipelined itself.
_, err := redisdb.Pipelined(func(pipe redis.Pipeliner) error {

incr = pipe.Incr("pipelined_counter")

pipe.Expire("pipelined_counter", time.Hour)

return nil

})

fmt.Println(incr.Val(), err)

1

2

3

4

5

6

7

排他TxPipeline

// Queue two commands on a transactional pipeline; nothing is sent until Exec
// is called.
pipe := redisdb.TxPipeline()

// incr is kept so the counter value can be read after Exec.
incr := pipe.Incr("tx_pipeline_counter")

pipe.Expire("tx_pipeline_counter", time.Hour)

// Execute
//
//     MULTI
//     INCR tx_pipeline_counter
//     EXPIRE tx_pipeline_counter 3600
//     EXEC
//
// using one client-server roundtrip.

_, err := pipe.Exec()

fmt.Println(incr.Val(), err)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// Create a client for the server on port 6379 (host part omitted).
rdb := redis.NewClient(&redis.Options{

Addr: ":6379",

})

// Register a hook on the client.
// NOTE(review): redisHook is not defined in this snippet; presumably a
// user-defined type implementing the library's hook interface — confirm.
rdb.AddHook(redisHook{})

// Send two PINGs through one pipeline; the results and the error returned by
// Pipelined are discarded here.
rdb.Pipelined(func(pipe redis.Pipeliner) error {

pipe.Ping()

pipe.Ping()

return nil

})

1

2

3

4

5

6

7

8

9

10

发布订阅

// Subscribe and inspect the first value received on the subscription.
// NOTE(review): client and queryResp are not defined in this snippet;
// presumably an existing client and a channel name — confirm.
sub := client.Subscribe(queryResp)

iface, err := sub.Receive()

if err != nil {

// handle error

}

// Should be *Subscription, but others are possible if other actions have been

// taken on sub since it was created.

// NOTE(review): the type names below are unqualified; outside package redis
// they presumably need the redis. prefix (e.g. *redis.Subscription) — confirm.
switch iface.(type) {

case *Subscription:

// subscribe succeeded

case *Message:

// received first message

case *Pong:

// pong received

default:

// handle error

}

// Go channel that delivers the messages received from here on.
ch := sub.Channel()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

示例程序

package main

import (

"github.com/go-redis/redis"

"time"

"fmt"

)

// ExampleClient demonstrates publish/subscribe on a single channel: it
// subscribes to "mychannel1", publishes one message to it, and prints every
// message delivered until the subscription is closed one second later.
//
// Errors while subscribing or publishing abort the example with a panic.
func ExampleClient() {
	redisdb := redis.NewClient(&redis.Options{
		Addr:     "192.168.137.18:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	// The client is created locally, so release its connections on return.
	// The close error is ignored on purpose: the client is not used afterwards.
	defer func() { _ = redisdb.Close() }()

	pubsub := redisdb.Subscribe("mychannel1")

	// Wait for confirmation that subscription is created before publishing anything.
	if _, err := pubsub.Receive(); err != nil {
		panic(err)
	}

	// Go channel which receives messages.
	ch := pubsub.Channel()

	// Publish a message.
	if err := redisdb.Publish("mychannel1", "hello").Err(); err != nil {
		panic(err)
	}

	// Closing the subscription is what ends the range loop below:
	// when pubsub is closed, the channel is closed too.
	time.AfterFunc(time.Second, func() {
		_ = pubsub.Close()
	})

	// Consume messages.
	for msg := range ch {
		fmt.Println(msg.Channel, msg.Payload)
	}
}

// main is the program entry point; it only runs the publish/subscribe example.
func main() { ExampleClient() }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

输出:

mychannel1 hello

1

示例2

// ExampleClient2 demonstrates the lower-level receive API: it subscribes to
// "mychannel2" and reads at most two values from the subscription, each with
// a one-second timeout. The subscription confirmation triggers a publish to
// the same channel, and the resulting message is printed on the next read.
//
// A failed publish or an unexpected value type aborts the example with a panic.
func ExampleClient2() {
	redisdb := redis.NewClient(&redis.Options{
		Addr:     "192.168.137.18:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	// The client is created locally, so release its connections on return.
	// Deferred first so that it runs after the subscription has been closed.
	defer func() { _ = redisdb.Close() }()

	pubsub := redisdb.Subscribe("mychannel2")
	defer pubsub.Close()

	for i := 0; i < 2; i++ {
		// ReceiveTimeout is a low level API. Use ReceiveMessage instead.
		msgi, err := pubsub.ReceiveTimeout(time.Second)
		if err != nil {
			// Best effort: a timeout or any other receive error just ends the loop.
			break
		}

		switch msg := msgi.(type) {
		case *redis.Subscription:
			fmt.Println("subscribed to", msg.Channel)

			// Publish only once the subscription is confirmed, so the
			// message is delivered to this subscriber.
			if err := redisdb.Publish("mychannel2", "hello").Err(); err != nil {
				panic(err)
			}
		case *redis.Message:
			fmt.Println("received", msg.Payload, "from", msg.Channel)
		default:
			panic("unreached")
		}
	}
}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

输出:

subscribed to mychannel2

received hello from mychannel2

————————————————

版权声明:本文为CSDN博主「comprel」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/comprel/article/details/96716708