开始使用

配置

进入项目中的configs目录,打开redis.toml,我们可以看到:

  1. [Client]
  2. name = "kratos-demo"
  3. proto = "tcp"
  4. addr = "127.0.0.1:6389"
  5. idle = 10
  6. active = 10
  7. dialTimeout = "1s"
  8. readTimeout = "1s"
  9. writeTimeout = "1s"
  10. idleTimeout = "10s"

在该配置文件中我们可以配置redis的连接方式proto、连接地址addr、连接池的闲置连接数idle、最大连接数active以及各类超时。

初始化

进入项目的internal/dao目录,打开redis.go,其中:

  1. var cfg struct {
  2. Client *memcache.Config
  3. }
  4. checkErr(paladin.Get("redis.toml").UnmarshalTOML(&rc))

使用paladin配置管理工具将上文中的redis.toml中的配置解析为我们需要使用的配置。

  1. // Dao dao.
  2. type Dao struct {
  3. redis *redis.Pool
  4. redisExpire int32
  5. }

在dao的主结构提中定义了redis的连接池对象和过期时间。

  1. d = &dao{
  2. // redis
  3. redis: redis.NewPool(rc.Demo),
  4. redisExpire: int32(time.Duration(rc.DemoExpire) / time.Second),
  5. }

使用kratos/pkg/cache/redis包的NewPool方法进行连接池对象的初始化,需要传入上文解析的配置。

Ping

  1. // Ping ping the resource.
  2. func (d *dao) Ping(ctx context.Context) (err error) {
  3. return d.pingRedis(ctx)
  4. }
  5. func (d *dao) pingRedis(ctx context.Context) (err error) {
  6. conn := d.redis.Get(ctx)
  7. defer conn.Close()
  8. if _, err = conn.Do("SET", "ping", "pong"); err != nil {
  9. log.Error("conn.Set(PING) error(%v)", err)
  10. }
  11. return
  12. }

生成的dao层模板中自带了redis相关的ping方法,用于为负载均衡服务的健康监测提供依据,详见blademaster

关闭

  1. // Close close the resource.
  2. func (d *Dao) Close() {
  3. d.redis.Close()
  4. }

在关闭dao层时,通过调用redis连接池对象的Close方法,我们可以关闭该连接池,从而释放相关资源。

常用方法

发送单个命令 Do

  1. // DemoIncrby .
  2. func (d *dao) DemoIncrby(c context.Context, pid int) (err error) {
  3. cacheKey := keyDemo(pid)
  4. conn := d.redis.Get(c)
  5. defer conn.Close()
  6. if _, err = conn.Do("INCRBY", cacheKey, 1); err != nil {
  7. log.Error("DemoIncrby conn.Do(INCRBY) key(%s) error(%v)", cacheKey, err)
  8. }
  9. return
  10. }

如上为向redis server发送单个命令的用法示意。这里需要使用redis连接池的Get方法获取一个redis连接conn,再使用conn.Do方法即可发送一条指令。 注意,在使用该连接完毕后,需要使用conn.Close方法将该连接关闭。

批量发送命令 Pipeline

kratos/pkg/cache/redis包除了支持发送单个命令,也支持批量发送命令(redis pipeline),比如:

  1. // DemoIncrbys .
  2. func (d *dao) DemoIncrbys(c context.Context, pid int) (err error) {
  3. cacheKey := keyDemo(pid)
  4. conn := d.redis.Get(c)
  5. defer conn.Close()
  6. if err = conn.Send("INCRBY", cacheKey, 1); err != nil {
  7. return
  8. }
  9. if err = conn.Send("EXPIRE", cacheKey, d.redisExpire); err != nil {
  10. return
  11. }
  12. if err = conn.Flush(); err != nil {
  13. log.Error("conn.Flush error(%v)", err)
  14. return
  15. }
  16. for i := 0; i < 2; i++ {
  17. if _, err = conn.Receive(); err != nil {
  18. log.Error("conn.Receive error(%v)", err)
  19. return
  20. }
  21. }
  22. return
  23. }

和发送单个命令类似地,这里需要使用redis连接池的Get方法获取一个redis连接conn,在使用该连接完毕后,需要使用conn.Close方法将该连接关闭。

这里使用conn.Send方法将命令写入客户端的buffer(缓冲区)中,使用conn.Flush将客户端的缓冲区内的命令打包发送到redis server。redis server按顺序返回的reply可以使用conn.Receive方法进行接收和处理。

返回值转换

kratos/pkg/cache/redis包中也提供了Scan方法将redis server的返回值转换为golang类型。

除此之外,kratos/pkg/cache/redis包提供了大量返回值转换的快捷方式:

单个查询

单个查询可以使用redis.Uint64/Int64/Float64/Int/String/Bool/Bytes进行返回值的转换,比如:

  1. // GetDemo get
  2. func (d *Dao) GetDemo(ctx context.Context, key string) (string, error) {
  3. conn := d.redis.Get(ctx)
  4. defer conn.Close()
  5. return redis.String(conn.Do("GET", key))
  6. }

批量查询

批量查询时候,可以使用redis.Int64s,Ints,Strings,ByteSlices方法转换如MGET,HMGET,ZRANGE,SMEMBERS等命令的返回值。 还可以使用StringMap, IntMap, Int64Map方法转换HGETALL命令的返回值,比如:

  1. // HGETALLDemo get
  2. func (d *Dao) HGETALLDemo(c context.Context, pid int64) (res map[string]int64, err error) {
  3. var (
  4. key = keyDemo(pid)
  5. conn = d.redis.Get(c)
  6. )
  7. defer conn.Close()
  8. if res, err = redis.Int64Map(conn.Do("HGETALL", key)); err != nil {
  9. log.Error("HGETALL %v failed error(%v)", key, err)
  10. }
  11. return
  12. }

扩展阅读

memcache模块说明