720 lines
20 KiB
Go
720 lines
20 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// RedisRepo Redis接口
|
|
type RedisRepo interface {
|
|
// Client 获取原生客户端
|
|
Client() *redis.Client
|
|
|
|
// String 操作
|
|
Set(ctx context.Context, key, value string, ttl time.Duration) error
|
|
Get(ctx context.Context, key string) (string, error)
|
|
GetSet(ctx context.Context, key, value string) (string, error)
|
|
SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error)
|
|
MGet(ctx context.Context, keys ...string) ([]interface{}, error)
|
|
MSet(ctx context.Context, pairs ...interface{}) error
|
|
|
|
// Key 操作
|
|
Del(ctx context.Context, keys ...string) (int64, error)
|
|
Exists(ctx context.Context, keys ...string) (int64, error)
|
|
Expire(ctx context.Context, key string, ttl time.Duration) (bool, error)
|
|
ExpireAt(ctx context.Context, key string, tm time.Time) (bool, error)
|
|
TTL(ctx context.Context, key string) (time.Duration, error)
|
|
Rename(ctx context.Context, key, newKey string) error
|
|
|
|
// 计数器操作
|
|
Incr(ctx context.Context, key string) (int64, error)
|
|
IncrBy(ctx context.Context, key string, value int64) (int64, error)
|
|
Decr(ctx context.Context, key string) (int64, error)
|
|
DecrBy(ctx context.Context, key string, value int64) (int64, error)
|
|
|
|
// Hash 操作
|
|
HSet(ctx context.Context, key string, values ...interface{}) error
|
|
HGet(ctx context.Context, key, field string) (string, error)
|
|
HGetAll(ctx context.Context, key string) (map[string]string, error)
|
|
HDel(ctx context.Context, key string, fields ...string) (int64, error)
|
|
HExists(ctx context.Context, key, field string) (bool, error)
|
|
HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error)
|
|
HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
|
|
HKeys(ctx context.Context, key string) ([]string, error)
|
|
HLen(ctx context.Context, key string) (int64, error)
|
|
|
|
// List 操作
|
|
LPush(ctx context.Context, key string, values ...interface{}) (int64, error)
|
|
RPush(ctx context.Context, key string, values ...interface{}) (int64, error)
|
|
LPop(ctx context.Context, key string) (string, error)
|
|
RPop(ctx context.Context, key string) (string, error)
|
|
BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)
|
|
BRPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)
|
|
LLen(ctx context.Context, key string) (int64, error)
|
|
LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
|
|
|
|
// Set 操作
|
|
SAdd(ctx context.Context, key string, members ...interface{}) (int64, error)
|
|
SMembers(ctx context.Context, key string) ([]string, error)
|
|
SIsMember(ctx context.Context, key string, member interface{}) (bool, error)
|
|
SRem(ctx context.Context, key string, members ...interface{}) (int64, error)
|
|
SCard(ctx context.Context, key string) (int64, error)
|
|
|
|
// ZSet 操作
|
|
ZAdd(ctx context.Context, key string, members ...redis.Z) (int64, error)
|
|
ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
|
|
ZRangeWithScores(ctx context.Context, key string, start, stop int64) ([]redis.Z, error)
|
|
ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error)
|
|
ZRem(ctx context.Context, key string, members ...interface{}) (int64, error)
|
|
ZCard(ctx context.Context, key string) (int64, error)
|
|
ZScore(ctx context.Context, key, member string) (float64, error)
|
|
|
|
// Pipeline 操作
|
|
Pipeline() redis.Pipeliner
|
|
TxPipeline() redis.Pipeliner
|
|
|
|
// Pub/Sub 操作
|
|
Publish(ctx context.Context, channel string, message interface{}) error
|
|
Subscribe(ctx context.Context, channels ...string) *redis.PubSub
|
|
|
|
// 健康检查
|
|
Ping(ctx context.Context) error
|
|
Info(ctx context.Context, section ...string) (string, error)
|
|
|
|
// 连接管理
|
|
Close() error
|
|
PoolStats() *redis.PoolStats
|
|
}
|
|
|
|
// RedisConfig Redis配置
|
|
type RedisConfig struct {
|
|
// 基础配置
|
|
Addr string `yaml:"addr" json:"addr"` // 地址,如: localhost:6379
|
|
Password string `yaml:"password" json:"password"` // 密码
|
|
DB int `yaml:"db" json:"db"` // 数据库编号
|
|
|
|
// 连接池配置
|
|
PoolSize int `yaml:"pool_size" json:"pool_size"` // 最大连接数
|
|
MinIdleConns int `yaml:"min_idle_conns" json:"min_idle_conns"` // 最小空闲连接数
|
|
|
|
// 超时配置
|
|
DialTimeout time.Duration `yaml:"dial_timeout" json:"dial_timeout"` // 连接超时
|
|
ReadTimeout time.Duration `yaml:"read_timeout" json:"read_timeout"` // 读超时
|
|
WriteTimeout time.Duration `yaml:"write_timeout" json:"write_timeout"` // 写超时
|
|
PoolTimeout time.Duration `yaml:"pool_timeout" json:"pool_timeout"` // 连接池超时
|
|
|
|
// 重试配置
|
|
MaxRetries int `yaml:"max_retries" json:"max_retries"` // 最大重试次数
|
|
MinRetryBackoff time.Duration `yaml:"min_retry_backoff" json:"min_retry_backoff"` // 最小重试间隔
|
|
MaxRetryBackoff time.Duration `yaml:"max_retry_backoff" json:"max_retry_backoff"` // 最大重试间隔
|
|
|
|
// 连接存活
|
|
ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time" json:"conn_max_idle_time"` // 最大空闲时间
|
|
ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" json:"conn_max_lifetime"` // 最大生命周期
|
|
}
|
|
|
|
// DefaultRedisConfig 默认配置
|
|
func DefaultRedisConfig() *RedisConfig {
|
|
return &RedisConfig{
|
|
Addr: "localhost:6379",
|
|
Password: "",
|
|
DB: 0,
|
|
PoolSize: 100,
|
|
MinIdleConns: 10,
|
|
DialTimeout: 5 * time.Second,
|
|
ReadTimeout: 3 * time.Second,
|
|
WriteTimeout: 3 * time.Second,
|
|
PoolTimeout: 4 * time.Second,
|
|
MaxRetries: 3,
|
|
MinRetryBackoff: 8 * time.Millisecond,
|
|
MaxRetryBackoff: 512 * time.Millisecond,
|
|
ConnMaxIdleTime: 5 * time.Minute,
|
|
ConnMaxLifetime: 30 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// redisRepo Redis实现
|
|
type redisRepo struct {
|
|
client *redis.Client
|
|
config *RedisConfig
|
|
mu sync.RWMutex
|
|
closed bool
|
|
}
|
|
|
|
// NewRedis 创建Redis实例
|
|
func NewRedis(cfg *RedisConfig) (RedisRepo, error) {
|
|
if cfg == nil {
|
|
cfg = DefaultRedisConfig()
|
|
}
|
|
|
|
// 合并默认配置
|
|
mergeDefaultConfig(cfg)
|
|
|
|
// 创建客户端
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: cfg.Addr,
|
|
Password: cfg.Password,
|
|
DB: cfg.DB,
|
|
PoolSize: cfg.PoolSize,
|
|
MinIdleConns: cfg.MinIdleConns,
|
|
DialTimeout: cfg.DialTimeout,
|
|
ReadTimeout: cfg.ReadTimeout,
|
|
WriteTimeout: cfg.WriteTimeout,
|
|
PoolTimeout: cfg.PoolTimeout,
|
|
MaxRetries: cfg.MaxRetries,
|
|
MinRetryBackoff: cfg.MinRetryBackoff,
|
|
MaxRetryBackoff: cfg.MaxRetryBackoff,
|
|
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
|
|
ConnMaxLifetime: cfg.ConnMaxLifetime,
|
|
})
|
|
|
|
// Ping 测试连接
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
return nil, fmt.Errorf("redis ping 失败: %w", err)
|
|
}
|
|
|
|
return &redisRepo{
|
|
client: client,
|
|
config: cfg,
|
|
closed: false,
|
|
}, nil
|
|
}
|
|
|
|
// Client 获取原生客户端
|
|
func (r *redisRepo) Client() *redis.Client {
|
|
return r.client
|
|
}
|
|
|
|
// ========== String 操作 ==========
|
|
|
|
// Set 设置值
|
|
func (r *redisRepo) Set(ctx context.Context, key, value string, ttl time.Duration) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Set(ctx, key, value, ttl).Err()
|
|
}
|
|
|
|
// Get 获取值
|
|
func (r *redisRepo) Get(ctx context.Context, key string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
val, err := r.client.Get(ctx, key).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// GetSet 设置新值并返回旧值
|
|
func (r *redisRepo) GetSet(ctx context.Context, key, value string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
val, err := r.client.GetSet(ctx, key, value).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// SetNX 仅当key不存在时设置
|
|
func (r *redisRepo) SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
return r.client.SetNX(ctx, key, value, ttl).Result()
|
|
}
|
|
|
|
// MGet 批量获取
|
|
func (r *redisRepo) MGet(ctx context.Context, keys ...string) ([]interface{}, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.MGet(ctx, keys...).Result()
|
|
}
|
|
|
|
// MSet 批量设置
|
|
func (r *redisRepo) MSet(ctx context.Context, pairs ...interface{}) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.MSet(ctx, pairs...).Err()
|
|
}
|
|
|
|
// ========== Key 操作 ==========
|
|
|
|
// Del 删除键
|
|
func (r *redisRepo) Del(ctx context.Context, keys ...string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.Del(ctx, keys...).Result()
|
|
}
|
|
|
|
// Exists 检查键是否存在
|
|
func (r *redisRepo) Exists(ctx context.Context, keys ...string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.Exists(ctx, keys...).Result()
|
|
}
|
|
|
|
// Expire 设置过期时间
|
|
func (r *redisRepo) Expire(ctx context.Context, key string, ttl time.Duration) (bool, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
return r.client.Expire(ctx, key, ttl).Result()
|
|
}
|
|
|
|
// ExpireAt 设置过期时间点
|
|
func (r *redisRepo) ExpireAt(ctx context.Context, key string, tm time.Time) (bool, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
return r.client.ExpireAt(ctx, key, tm).Result()
|
|
}
|
|
|
|
// TTL 获取剩余时间
|
|
func (r *redisRepo) TTL(ctx context.Context, key string) (time.Duration, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.TTL(ctx, key).Result()
|
|
}
|
|
|
|
// Rename 重命名键
|
|
func (r *redisRepo) Rename(ctx context.Context, key, newKey string) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Rename(ctx, key, newKey).Err()
|
|
}
|
|
|
|
// ========== 计数器操作 ==========
|
|
|
|
// Incr 自增1
|
|
func (r *redisRepo) Incr(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.Incr(ctx, key).Result()
|
|
}
|
|
|
|
// IncrBy 自增指定值
|
|
func (r *redisRepo) IncrBy(ctx context.Context, key string, value int64) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.IncrBy(ctx, key, value).Result()
|
|
}
|
|
|
|
// Decr 自减1
|
|
func (r *redisRepo) Decr(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.Decr(ctx, key).Result()
|
|
}
|
|
|
|
// DecrBy 自减指定值
|
|
func (r *redisRepo) DecrBy(ctx context.Context, key string, value int64) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.DecrBy(ctx, key, value).Result()
|
|
}
|
|
|
|
// ========== Hash 操作 ==========
|
|
|
|
// HSet 设置hash字段
|
|
func (r *redisRepo) HSet(ctx context.Context, key string, values ...interface{}) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.HSet(ctx, key, values...).Err()
|
|
}
|
|
|
|
// HGet 获取hash字段
|
|
func (r *redisRepo) HGet(ctx context.Context, key, field string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
val, err := r.client.HGet(ctx, key, field).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// HGetAll 获取所有hash字段
|
|
func (r *redisRepo) HGetAll(ctx context.Context, key string) (map[string]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.HGetAll(ctx, key).Result()
|
|
}
|
|
|
|
// HDel 删除hash字段
|
|
func (r *redisRepo) HDel(ctx context.Context, key string, fields ...string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.HDel(ctx, key, fields...).Result()
|
|
}
|
|
|
|
// HExists 检查hash字段是否存在
|
|
func (r *redisRepo) HExists(ctx context.Context, key, field string) (bool, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
return r.client.HExists(ctx, key, field).Result()
|
|
}
|
|
|
|
// HIncrBy hash字段自增
|
|
func (r *redisRepo) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.HIncrBy(ctx, key, field, incr).Result()
|
|
}
|
|
|
|
// HIncrByFloat hash字段浮点自增
|
|
func (r *redisRepo) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.HIncrByFloat(ctx, key, field, incr).Result()
|
|
}
|
|
|
|
// HKeys 获取所有hash字段名
|
|
func (r *redisRepo) HKeys(ctx context.Context, key string) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.HKeys(ctx, key).Result()
|
|
}
|
|
|
|
// HLen 获取hash字段数量
|
|
func (r *redisRepo) HLen(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.HLen(ctx, key).Result()
|
|
}
|
|
|
|
// ========== List 操作 ==========
|
|
|
|
// LPush 从左侧推入
|
|
func (r *redisRepo) LPush(ctx context.Context, key string, values ...interface{}) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.LPush(ctx, key, values...).Result()
|
|
}
|
|
|
|
// RPush 从右侧推入
|
|
func (r *redisRepo) RPush(ctx context.Context, key string, values ...interface{}) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.RPush(ctx, key, values...).Result()
|
|
}
|
|
|
|
// LPop 从左侧弹出
|
|
func (r *redisRepo) LPop(ctx context.Context, key string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
val, err := r.client.LPop(ctx, key).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// RPop 从右侧弹出
|
|
func (r *redisRepo) RPop(ctx context.Context, key string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
val, err := r.client.RPop(ctx, key).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
return val, err
|
|
}
|
|
|
|
// BLPop 阻塞式从左侧弹出
|
|
func (r *redisRepo) BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.BLPop(ctx, timeout, keys...).Result()
|
|
}
|
|
|
|
// BRPop 阻塞式从右侧弹出
|
|
func (r *redisRepo) BRPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.BRPop(ctx, timeout, keys...).Result()
|
|
}
|
|
|
|
// LLen 获取列表长度
|
|
func (r *redisRepo) LLen(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.LLen(ctx, key).Result()
|
|
}
|
|
|
|
// LRange 获取列表范围
|
|
func (r *redisRepo) LRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.LRange(ctx, key, start, stop).Result()
|
|
}
|
|
|
|
// ========== Set 操作 ==========
|
|
|
|
// SAdd 添加集合成员
|
|
func (r *redisRepo) SAdd(ctx context.Context, key string, members ...interface{}) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.SAdd(ctx, key, members...).Result()
|
|
}
|
|
|
|
// SMembers 获取所有集合成员
|
|
func (r *redisRepo) SMembers(ctx context.Context, key string) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.SMembers(ctx, key).Result()
|
|
}
|
|
|
|
// SIsMember 检查是否是集合成员
|
|
func (r *redisRepo) SIsMember(ctx context.Context, key string, member interface{}) (bool, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
return r.client.SIsMember(ctx, key, member).Result()
|
|
}
|
|
|
|
// SRem 删除集合成员
|
|
func (r *redisRepo) SRem(ctx context.Context, key string, members ...interface{}) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.SRem(ctx, key, members...).Result()
|
|
}
|
|
|
|
// SCard 获取集合成员数量
|
|
func (r *redisRepo) SCard(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.SCard(ctx, key).Result()
|
|
}
|
|
|
|
// ========== ZSet 操作 ==========
|
|
|
|
// ZAdd 添加有序集合成员
|
|
func (r *redisRepo) ZAdd(ctx context.Context, key string, members ...redis.Z) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.ZAdd(ctx, key, members...).Result()
|
|
}
|
|
|
|
// ZRange 获取有序集合范围
|
|
func (r *redisRepo) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.ZRange(ctx, key, start, stop).Result()
|
|
}
|
|
|
|
// ZRangeWithScores 获取有序集合范围(带分数)
|
|
func (r *redisRepo) ZRangeWithScores(ctx context.Context, key string, start, stop int64) ([]redis.Z, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.ZRangeWithScores(ctx, key, start, stop).Result()
|
|
}
|
|
|
|
// ZRevRange 倒序获取有序集合范围
|
|
func (r *redisRepo) ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.client.ZRevRange(ctx, key, start, stop).Result()
|
|
}
|
|
|
|
// ZRem 删除有序集合成员
|
|
func (r *redisRepo) ZRem(ctx context.Context, key string, members ...interface{}) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.ZRem(ctx, key, members...).Result()
|
|
}
|
|
|
|
// ZCard 获取有序集合成员数量
|
|
func (r *redisRepo) ZCard(ctx context.Context, key string) (int64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.ZCard(ctx, key).Result()
|
|
}
|
|
|
|
// ZScore 获取成员分数
|
|
func (r *redisRepo) ZScore(ctx context.Context, key, member string) (float64, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.client.ZScore(ctx, key, member).Result()
|
|
}
|
|
|
|
// ========== Pipeline 操作 ==========
|
|
|
|
// Pipeline 获取pipeline
|
|
func (r *redisRepo) Pipeline() redis.Pipeliner {
|
|
return r.client.Pipeline()
|
|
}
|
|
|
|
// TxPipeline 获取事务pipeline
|
|
func (r *redisRepo) TxPipeline() redis.Pipeliner {
|
|
return r.client.TxPipeline()
|
|
}
|
|
|
|
// ========== Pub/Sub 操作 ==========
|
|
|
|
// Publish 发布消息
|
|
func (r *redisRepo) Publish(ctx context.Context, channel string, message interface{}) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Publish(ctx, channel, message).Err()
|
|
}
|
|
|
|
// Subscribe 订阅频道
|
|
func (r *redisRepo) Subscribe(ctx context.Context, channels ...string) *redis.PubSub {
|
|
return r.client.Subscribe(ctx, channels...)
|
|
}
|
|
|
|
// ========== 健康检查 ==========
|
|
|
|
// Ping 检查连接
|
|
func (r *redisRepo) Ping(ctx context.Context) error {
|
|
if err := r.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Ping(ctx).Err()
|
|
}
|
|
|
|
// Info 获取服务器信息
|
|
func (r *redisRepo) Info(ctx context.Context, section ...string) (string, error) {
|
|
if err := r.checkClosed(); err != nil {
|
|
return "", err
|
|
}
|
|
return r.client.Info(ctx, section...).Result()
|
|
}
|
|
|
|
// ========== 连接管理 ==========
|
|
|
|
// Close 关闭连接
|
|
func (r *redisRepo) Close() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if r.closed {
|
|
return nil
|
|
}
|
|
|
|
r.closed = true
|
|
return r.client.Close()
|
|
}
|
|
|
|
// PoolStats 获取连接池统计
|
|
func (r *redisRepo) PoolStats() *redis.PoolStats {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if r.closed {
|
|
return nil
|
|
}
|
|
|
|
stats := r.client.PoolStats()
|
|
return stats
|
|
}
|
|
|
|
// ========== 内部方法 ==========
|
|
|
|
// checkClosed 检查是否已关闭
|
|
func (r *redisRepo) checkClosed() error {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if r.closed {
|
|
return errors.New("redis 连接已关闭")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeDefaultConfig 合并默认配置
|
|
func mergeDefaultConfig(cfg *RedisConfig) {
|
|
if cfg.Addr == "" {
|
|
cfg.Addr = "localhost:6379"
|
|
}
|
|
if cfg.PoolSize == 0 {
|
|
cfg.PoolSize = 100
|
|
}
|
|
if cfg.MinIdleConns == 0 {
|
|
cfg.MinIdleConns = 10
|
|
}
|
|
if cfg.DialTimeout == 0 {
|
|
cfg.DialTimeout = 5 * time.Second
|
|
}
|
|
if cfg.ReadTimeout == 0 {
|
|
cfg.ReadTimeout = 3 * time.Second
|
|
}
|
|
if cfg.WriteTimeout == 0 {
|
|
cfg.WriteTimeout = 3 * time.Second
|
|
}
|
|
if cfg.PoolTimeout == 0 {
|
|
cfg.PoolTimeout = 4 * time.Second
|
|
}
|
|
if cfg.MaxRetries == 0 {
|
|
cfg.MaxRetries = 3
|
|
}
|
|
if cfg.MinRetryBackoff == 0 {
|
|
cfg.MinRetryBackoff = 8 * time.Millisecond
|
|
}
|
|
if cfg.MaxRetryBackoff == 0 {
|
|
cfg.MaxRetryBackoff = 512 * time.Millisecond
|
|
}
|
|
if cfg.ConnMaxIdleTime == 0 {
|
|
cfg.ConnMaxIdleTime = 5 * time.Minute
|
|
}
|
|
if cfg.ConnMaxLifetime == 0 {
|
|
cfg.ConnMaxLifetime = 30 * time.Minute
|
|
}
|
|
}
|