Redigo学习笔记

最近想要深入学习一下Redis, 在了解了Redis基本使用方法之后, 通过学习一个Golang语言的Redis客户端库Redigo, 来进一步加深对Redis命令和协议的理解.

概述

Redigo是Redis的golang版客户端, 支持Redis的所有命令. 官方文档在这里.

实现

基本API

Redigo API的调用方式与redis-cli非常像:

1
n, err := conn.Do("APPEND", "key", "value")

通过Conn接口来操作Redis.

1
2
3
4
5
6
7
8
type Conn interface {
Close() error
Err() error
Do(commandName string, args ...interface{}) (reply interface{}, err error)
Send(commandName string, args ...interface{}) error
Flush() error
Receive() (reply interface{}, err error)
}

该接口由conn struct实现. 其中的Do()方法等价于Send() -> Flush() -> Receive(). 之所以额外声明这3个接口, 主要是为了满足Pipeline功能和发布订阅功能的需要.

可以通过Dial()函数或NewConn()函数创建Conn实例:

1
2
func Dial(network, address string, options ...DialOption) (Conn, error) // 一般用这个
func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn // 更底层的创建方式

其中比较关键的部分是Redis协议处理和参数类型处理. 在发送过程中, 这两个关键点封装在writeCommand()函数中. (具体请参考Redis通信协议)

1
2
3
4
5
6
7
8
9
10
11
12
func (c *conn) writeCommand(cmd string, args []interface{}) error {
c.writeLen('*', 1+len(args)) // *<参数数量> CR LF
if err := c.writeString(cmd); err != nil {
return err
}
for _, arg := range args {
if err := c.writeArg(arg, true); err != nil { // $<参数arg的字节数量> CR LF <参数arg的数据> CR LF
return err
}
}
return nil
}

在writeArg()函数中, 将传入参数转换为对应的string数据然后写入.

在接收过程中, 按照Redis协议回复的不同类型进行处理

1
2
3
4
5
6
7
8
9
10
func (c *conn) readReply() (interface{}, error) {
...
switch line[0] {
case '+': ... // 状态回复
case '-': ... // 错误回复
case ':': ... // 整数回复
case '$': ... // 批量回复
case '*': ... // 多条批量回复, 递归调用readReply()
...
}

在reply.go中提供了一些类型转换函数, 便于直接将interface{}类型的reply转换为需要的类型.

Pipeline

Pipeline主要借助Send(), Flush(), Receive()这三个函数实现. 具体用法在官方文档中也有给出, 如下:

1
2
3
4
c.Send("SET", "foo", "bar")
c.Send("GET", "foo")
c.Flush()
v, err = c.Receive() // reply from GET

事务是基于Pipeline的, 只需要在开始和结尾使用MULTI和EXEC命令.

1
2
3
4
c.Send("MULTI")
c.Send("INCR", "foo")
c.Send("INCR", "bar")
r, err := c.Do("EXEC")

连接池

Redigo提供了一个简单的连接池实现. 主要配置参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Pool struct {
Dial func() (Conn, error) // 创建连接的函数
TestOnBorrow func(c Conn, t time.Time) error // 在取用连接时进行测试的钩子函数
MaxIdle int // 最大空闲连接数
MaxActive int // 最大连接数, 如无限制则设置为0
IdleTimeout time.Duration // 空闲超时时间
Wait bool // Get()是否阻塞
MaxConnLifetime time.Duration // 关闭连接的最大等待时间
// 内部相关参数
chInitialized uint32 // ch通道初始化标识位
mu sync.Mutex // 锁
closed bool // 连接池关闭标识位
active int // 连接池中可用连接数
ch chan struct{} // 当Wait == true时, 用于实现阻塞队列的通道
idle idleList // 空闲连接列表
}

通过NewPool()工厂函数创建连接池对象.

Pool对象中持有一个idleList空闲连接列表 (双向链表) 来维护空闲连接.

通过连接池获取连接需要调用Pool的Get()方法, 该方法会返回一个Conn实例:

1
2
3
4
5
6
7
func (p *Pool) Get() Conn {
pc, err := p.get(nil)
if err != nil {
return errorConn{err}
}
return &activeConn{p: p, pc: pc}
}

这个Conn实例内部包含了指向连接池的指针和指向连接池中连接的指针, 同时包含了一个连接状态.

1
2
3
4
5
type activeConn struct {
p *Pool
pc *poolConn
state int
}

连接池的示意图如下.

Redigo_Pool_idleList

获取连接时, 需要根据Pool的Wait和MaxActive参数进行判断. 如果是阻塞式获取 (Wait == true), 且设置了最大连接数 (MaxActive > 0), 则需要阻塞等待, 直到连接数小于MaxActive时, 才能获取到连接. 获取连接时, 从空闲连接双向链表的首部获取 (popFront()方法).

当连接使用完以后, 需要调用Close()方法释放连接. 由于是从连接池中获取的连接, 因此这个操作并不是真正释放连接, 而是对连接进行清理之后, 将其重新放回连接池. activeConn中的state属性标识了连接的状态, 主要作用就是对一些命令, 在释放连接时需要做一些额外的清理工作, 比如利用连接池中的连接进行Pipeline操作, 或者订阅操作时, 在释放连接时就需要调用DISCARD, UNSUBSCRIBE等命令进行清理, 然后才能将连接放回连接池中.

发布订阅

通过Send()方法发送一个SUBSCRIBE命令, 向某个频道发起订阅, 然后循环调用Receive()方法接收事件即可.

Lua脚本

Redigo提供了一个Script结构, 封装了执行Lua脚本需要的一些方法.

1
2
3
4
5
6
7
8
9
type Script struct {
keyCount int
src string
hash string
}
func (s *Script) Load(c Conn) error // SCRIPT LOAD src
func (s *Script) Send(c Conn, keysAndArgs ...interface{}) error // EVAL src args, 并用keyCount进行参数个数校验
func (s *Script) SendHash(c Conn, keysAndArgs ...interface{}) error // EVALSHA hash args, , 并用keyCount进行参数个数校验

参考资料