Files
base-golang/pkg/cache/redis.go
2026-02-07 15:44:37 +08:00

720 lines
20 KiB
Go

package cache
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// RedisRepo Redis接口
type RedisRepo interface {
// Client 获取原生客户端
Client() *redis.Client
// String 操作
Set(ctx context.Context, key, value string, ttl time.Duration) error
Get(ctx context.Context, key string) (string, error)
GetSet(ctx context.Context, key, value string) (string, error)
SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error)
MGet(ctx context.Context, keys ...string) ([]interface{}, error)
MSet(ctx context.Context, pairs ...interface{}) error
// Key 操作
Del(ctx context.Context, keys ...string) (int64, error)
Exists(ctx context.Context, keys ...string) (int64, error)
Expire(ctx context.Context, key string, ttl time.Duration) (bool, error)
ExpireAt(ctx context.Context, key string, tm time.Time) (bool, error)
TTL(ctx context.Context, key string) (time.Duration, error)
Rename(ctx context.Context, key, newKey string) error
// 计数器操作
Incr(ctx context.Context, key string) (int64, error)
IncrBy(ctx context.Context, key string, value int64) (int64, error)
Decr(ctx context.Context, key string) (int64, error)
DecrBy(ctx context.Context, key string, value int64) (int64, error)
// Hash 操作
HSet(ctx context.Context, key string, values ...interface{}) error
HGet(ctx context.Context, key, field string) (string, error)
HGetAll(ctx context.Context, key string) (map[string]string, error)
HDel(ctx context.Context, key string, fields ...string) (int64, error)
HExists(ctx context.Context, key, field string) (bool, error)
HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error)
HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
HKeys(ctx context.Context, key string) ([]string, error)
HLen(ctx context.Context, key string) (int64, error)
// List 操作
LPush(ctx context.Context, key string, values ...interface{}) (int64, error)
RPush(ctx context.Context, key string, values ...interface{}) (int64, error)
LPop(ctx context.Context, key string) (string, error)
RPop(ctx context.Context, key string) (string, error)
BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)
BRPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error)
LLen(ctx context.Context, key string) (int64, error)
LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
// Set 操作
SAdd(ctx context.Context, key string, members ...interface{}) (int64, error)
SMembers(ctx context.Context, key string) ([]string, error)
SIsMember(ctx context.Context, key string, member interface{}) (bool, error)
SRem(ctx context.Context, key string, members ...interface{}) (int64, error)
SCard(ctx context.Context, key string) (int64, error)
// ZSet 操作
ZAdd(ctx context.Context, key string, members ...redis.Z) (int64, error)
ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
ZRangeWithScores(ctx context.Context, key string, start, stop int64) ([]redis.Z, error)
ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error)
ZRem(ctx context.Context, key string, members ...interface{}) (int64, error)
ZCard(ctx context.Context, key string) (int64, error)
ZScore(ctx context.Context, key, member string) (float64, error)
// Pipeline 操作
Pipeline() redis.Pipeliner
TxPipeline() redis.Pipeliner
// Pub/Sub 操作
Publish(ctx context.Context, channel string, message interface{}) error
Subscribe(ctx context.Context, channels ...string) *redis.PubSub
// 健康检查
Ping(ctx context.Context) error
Info(ctx context.Context, section ...string) (string, error)
// 连接管理
Close() error
PoolStats() *redis.PoolStats
}
// RedisConfig Redis配置
type RedisConfig struct {
// 基础配置
Addr string `yaml:"addr" json:"addr"` // 地址,如: localhost:6379
Password string `yaml:"password" json:"password"` // 密码
DB int `yaml:"db" json:"db"` // 数据库编号
// 连接池配置
PoolSize int `yaml:"pool_size" json:"pool_size"` // 最大连接数
MinIdleConns int `yaml:"min_idle_conns" json:"min_idle_conns"` // 最小空闲连接数
// 超时配置
DialTimeout time.Duration `yaml:"dial_timeout" json:"dial_timeout"` // 连接超时
ReadTimeout time.Duration `yaml:"read_timeout" json:"read_timeout"` // 读超时
WriteTimeout time.Duration `yaml:"write_timeout" json:"write_timeout"` // 写超时
PoolTimeout time.Duration `yaml:"pool_timeout" json:"pool_timeout"` // 连接池超时
// 重试配置
MaxRetries int `yaml:"max_retries" json:"max_retries"` // 最大重试次数
MinRetryBackoff time.Duration `yaml:"min_retry_backoff" json:"min_retry_backoff"` // 最小重试间隔
MaxRetryBackoff time.Duration `yaml:"max_retry_backoff" json:"max_retry_backoff"` // 最大重试间隔
// 连接存活
ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time" json:"conn_max_idle_time"` // 最大空闲时间
ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime" json:"conn_max_lifetime"` // 最大生命周期
}
// DefaultRedisConfig 默认配置
func DefaultRedisConfig() *RedisConfig {
return &RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 100,
MinIdleConns: 10,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolTimeout: 4 * time.Second,
MaxRetries: 3,
MinRetryBackoff: 8 * time.Millisecond,
MaxRetryBackoff: 512 * time.Millisecond,
ConnMaxIdleTime: 5 * time.Minute,
ConnMaxLifetime: 30 * time.Minute,
}
}
// redisRepo Redis实现
type redisRepo struct {
client *redis.Client
config *RedisConfig
mu sync.RWMutex
closed bool
}
// NewRedis 创建Redis实例
func NewRedis(cfg *RedisConfig) (RedisRepo, error) {
if cfg == nil {
cfg = DefaultRedisConfig()
}
// 合并默认配置
mergeDefaultConfig(cfg)
// 创建客户端
client := redis.NewClient(&redis.Options{
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
PoolSize: cfg.PoolSize,
MinIdleConns: cfg.MinIdleConns,
DialTimeout: cfg.DialTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
PoolTimeout: cfg.PoolTimeout,
MaxRetries: cfg.MaxRetries,
MinRetryBackoff: cfg.MinRetryBackoff,
MaxRetryBackoff: cfg.MaxRetryBackoff,
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
ConnMaxLifetime: cfg.ConnMaxLifetime,
})
// Ping 测试连接
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("redis ping 失败: %w", err)
}
return &redisRepo{
client: client,
config: cfg,
closed: false,
}, nil
}
// Client 获取原生客户端
func (r *redisRepo) Client() *redis.Client {
return r.client
}
// ========== String 操作 ==========
// Set 设置值
func (r *redisRepo) Set(ctx context.Context, key, value string, ttl time.Duration) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.Set(ctx, key, value, ttl).Err()
}
// Get 获取值
func (r *redisRepo) Get(ctx context.Context, key string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
val, err := r.client.Get(ctx, key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
// GetSet 设置新值并返回旧值
func (r *redisRepo) GetSet(ctx context.Context, key, value string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
val, err := r.client.GetSet(ctx, key, value).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
// SetNX 仅当key不存在时设置
func (r *redisRepo) SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error) {
if err := r.checkClosed(); err != nil {
return false, err
}
return r.client.SetNX(ctx, key, value, ttl).Result()
}
// MGet 批量获取
func (r *redisRepo) MGet(ctx context.Context, keys ...string) ([]interface{}, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.MGet(ctx, keys...).Result()
}
// MSet 批量设置
func (r *redisRepo) MSet(ctx context.Context, pairs ...interface{}) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.MSet(ctx, pairs...).Err()
}
// ========== Key 操作 ==========
// Del 删除键
func (r *redisRepo) Del(ctx context.Context, keys ...string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.Del(ctx, keys...).Result()
}
// Exists 检查键是否存在
func (r *redisRepo) Exists(ctx context.Context, keys ...string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.Exists(ctx, keys...).Result()
}
// Expire 设置过期时间
func (r *redisRepo) Expire(ctx context.Context, key string, ttl time.Duration) (bool, error) {
if err := r.checkClosed(); err != nil {
return false, err
}
return r.client.Expire(ctx, key, ttl).Result()
}
// ExpireAt 设置过期时间点
func (r *redisRepo) ExpireAt(ctx context.Context, key string, tm time.Time) (bool, error) {
if err := r.checkClosed(); err != nil {
return false, err
}
return r.client.ExpireAt(ctx, key, tm).Result()
}
// TTL 获取剩余时间
func (r *redisRepo) TTL(ctx context.Context, key string) (time.Duration, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.TTL(ctx, key).Result()
}
// Rename 重命名键
func (r *redisRepo) Rename(ctx context.Context, key, newKey string) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.Rename(ctx, key, newKey).Err()
}
// ========== 计数器操作 ==========
// Incr 自增1
func (r *redisRepo) Incr(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.Incr(ctx, key).Result()
}
// IncrBy 自增指定值
func (r *redisRepo) IncrBy(ctx context.Context, key string, value int64) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.IncrBy(ctx, key, value).Result()
}
// Decr 自减1
func (r *redisRepo) Decr(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.Decr(ctx, key).Result()
}
// DecrBy 自减指定值
func (r *redisRepo) DecrBy(ctx context.Context, key string, value int64) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.DecrBy(ctx, key, value).Result()
}
// ========== Hash 操作 ==========
// HSet 设置hash字段
func (r *redisRepo) HSet(ctx context.Context, key string, values ...interface{}) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.HSet(ctx, key, values...).Err()
}
// HGet 获取hash字段
func (r *redisRepo) HGet(ctx context.Context, key, field string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
val, err := r.client.HGet(ctx, key, field).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
// HGetAll 获取所有hash字段
func (r *redisRepo) HGetAll(ctx context.Context, key string) (map[string]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.HGetAll(ctx, key).Result()
}
// HDel 删除hash字段
func (r *redisRepo) HDel(ctx context.Context, key string, fields ...string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.HDel(ctx, key, fields...).Result()
}
// HExists 检查hash字段是否存在
func (r *redisRepo) HExists(ctx context.Context, key, field string) (bool, error) {
if err := r.checkClosed(); err != nil {
return false, err
}
return r.client.HExists(ctx, key, field).Result()
}
// HIncrBy hash字段自增
func (r *redisRepo) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.HIncrBy(ctx, key, field, incr).Result()
}
// HIncrByFloat hash字段浮点自增
func (r *redisRepo) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.HIncrByFloat(ctx, key, field, incr).Result()
}
// HKeys 获取所有hash字段名
func (r *redisRepo) HKeys(ctx context.Context, key string) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.HKeys(ctx, key).Result()
}
// HLen 获取hash字段数量
func (r *redisRepo) HLen(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.HLen(ctx, key).Result()
}
// ========== List 操作 ==========
// LPush 从左侧推入
func (r *redisRepo) LPush(ctx context.Context, key string, values ...interface{}) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.LPush(ctx, key, values...).Result()
}
// RPush 从右侧推入
func (r *redisRepo) RPush(ctx context.Context, key string, values ...interface{}) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.RPush(ctx, key, values...).Result()
}
// LPop 从左侧弹出
func (r *redisRepo) LPop(ctx context.Context, key string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
val, err := r.client.LPop(ctx, key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
// RPop 从右侧弹出
func (r *redisRepo) RPop(ctx context.Context, key string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
val, err := r.client.RPop(ctx, key).Result()
if err == redis.Nil {
return "", nil
}
return val, err
}
// BLPop 阻塞式从左侧弹出
func (r *redisRepo) BLPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.BLPop(ctx, timeout, keys...).Result()
}
// BRPop 阻塞式从右侧弹出
func (r *redisRepo) BRPop(ctx context.Context, timeout time.Duration, keys ...string) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.BRPop(ctx, timeout, keys...).Result()
}
// LLen 获取列表长度
func (r *redisRepo) LLen(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.LLen(ctx, key).Result()
}
// LRange 获取列表范围
func (r *redisRepo) LRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.LRange(ctx, key, start, stop).Result()
}
// ========== Set 操作 ==========
// SAdd 添加集合成员
func (r *redisRepo) SAdd(ctx context.Context, key string, members ...interface{}) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.SAdd(ctx, key, members...).Result()
}
// SMembers 获取所有集合成员
func (r *redisRepo) SMembers(ctx context.Context, key string) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.SMembers(ctx, key).Result()
}
// SIsMember 检查是否是集合成员
func (r *redisRepo) SIsMember(ctx context.Context, key string, member interface{}) (bool, error) {
if err := r.checkClosed(); err != nil {
return false, err
}
return r.client.SIsMember(ctx, key, member).Result()
}
// SRem 删除集合成员
func (r *redisRepo) SRem(ctx context.Context, key string, members ...interface{}) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.SRem(ctx, key, members...).Result()
}
// SCard 获取集合成员数量
func (r *redisRepo) SCard(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.SCard(ctx, key).Result()
}
// ========== ZSet 操作 ==========
// ZAdd 添加有序集合成员
func (r *redisRepo) ZAdd(ctx context.Context, key string, members ...redis.Z) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.ZAdd(ctx, key, members...).Result()
}
// ZRange 获取有序集合范围
func (r *redisRepo) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.ZRange(ctx, key, start, stop).Result()
}
// ZRangeWithScores 获取有序集合范围(带分数)
func (r *redisRepo) ZRangeWithScores(ctx context.Context, key string, start, stop int64) ([]redis.Z, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.ZRangeWithScores(ctx, key, start, stop).Result()
}
// ZRevRange 倒序获取有序集合范围
func (r *redisRepo) ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error) {
if err := r.checkClosed(); err != nil {
return nil, err
}
return r.client.ZRevRange(ctx, key, start, stop).Result()
}
// ZRem 删除有序集合成员
func (r *redisRepo) ZRem(ctx context.Context, key string, members ...interface{}) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.ZRem(ctx, key, members...).Result()
}
// ZCard 获取有序集合成员数量
func (r *redisRepo) ZCard(ctx context.Context, key string) (int64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.ZCard(ctx, key).Result()
}
// ZScore 获取成员分数
func (r *redisRepo) ZScore(ctx context.Context, key, member string) (float64, error) {
if err := r.checkClosed(); err != nil {
return 0, err
}
return r.client.ZScore(ctx, key, member).Result()
}
// ========== Pipeline 操作 ==========
// Pipeline 获取pipeline
func (r *redisRepo) Pipeline() redis.Pipeliner {
return r.client.Pipeline()
}
// TxPipeline 获取事务pipeline
func (r *redisRepo) TxPipeline() redis.Pipeliner {
return r.client.TxPipeline()
}
// ========== Pub/Sub 操作 ==========
// Publish 发布消息
func (r *redisRepo) Publish(ctx context.Context, channel string, message interface{}) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.Publish(ctx, channel, message).Err()
}
// Subscribe 订阅频道
func (r *redisRepo) Subscribe(ctx context.Context, channels ...string) *redis.PubSub {
return r.client.Subscribe(ctx, channels...)
}
// ========== 健康检查 ==========
// Ping 检查连接
func (r *redisRepo) Ping(ctx context.Context) error {
if err := r.checkClosed(); err != nil {
return err
}
return r.client.Ping(ctx).Err()
}
// Info 获取服务器信息
func (r *redisRepo) Info(ctx context.Context, section ...string) (string, error) {
if err := r.checkClosed(); err != nil {
return "", err
}
return r.client.Info(ctx, section...).Result()
}
// ========== 连接管理 ==========
// Close 关闭连接
func (r *redisRepo) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.closed {
return nil
}
r.closed = true
return r.client.Close()
}
// PoolStats 获取连接池统计
func (r *redisRepo) PoolStats() *redis.PoolStats {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return nil
}
stats := r.client.PoolStats()
return stats
}
// ========== 内部方法 ==========
// checkClosed 检查是否已关闭
func (r *redisRepo) checkClosed() error {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
return errors.New("redis 连接已关闭")
}
return nil
}
// mergeDefaultConfig 合并默认配置
func mergeDefaultConfig(cfg *RedisConfig) {
if cfg.Addr == "" {
cfg.Addr = "localhost:6379"
}
if cfg.PoolSize == 0 {
cfg.PoolSize = 100
}
if cfg.MinIdleConns == 0 {
cfg.MinIdleConns = 10
}
if cfg.DialTimeout == 0 {
cfg.DialTimeout = 5 * time.Second
}
if cfg.ReadTimeout == 0 {
cfg.ReadTimeout = 3 * time.Second
}
if cfg.WriteTimeout == 0 {
cfg.WriteTimeout = 3 * time.Second
}
if cfg.PoolTimeout == 0 {
cfg.PoolTimeout = 4 * time.Second
}
if cfg.MaxRetries == 0 {
cfg.MaxRetries = 3
}
if cfg.MinRetryBackoff == 0 {
cfg.MinRetryBackoff = 8 * time.Millisecond
}
if cfg.MaxRetryBackoff == 0 {
cfg.MaxRetryBackoff = 512 * time.Millisecond
}
if cfg.ConnMaxIdleTime == 0 {
cfg.ConnMaxIdleTime = 5 * time.Minute
}
if cfg.ConnMaxLifetime == 0 {
cfg.ConnMaxLifetime = 30 * time.Minute
}
}