基于redis的定时状态更新
下面是一个简单的示例,展示如何使用redis实现状态更新,从而满足在某些场景下,既需要频繁更新状态,
又需要保证状态的实时性。以及定时更新状态的需求。
示例说明
- 假设有一个剧目演出计划,确定了剧目演出的开始和结束时间
- 演出状态分为:未开始、进行中、已结束(分别对应0、1、2三个枚举)
- 需要定时更新状态,根据当前时间判断状态是否需要更新
- 定期删除已经结束的剧目演出计划
实现
// 该做法也可以用于订单超时自动取消等场景,通过设置过期时间和处理过期事件,实现订单状态的自动更新。
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"sync"
"time"
)
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "123456", // 确保 Redis 密码正确
DB: 6, // 与订阅的数据库一致
})
func initPerformanceStatus(ctx context.Context, performanceID string) error {
key := fmt.Sprintf("plan:%s", performanceID) // 初始键名为 "plan:01"
return rdb.Set(ctx, key, "0", 0).Err()
}
func startPerformance(ctx context.Context, performanceID string, duration time.Duration) error {
key := fmt.Sprintf("plan:%s", performanceID) // 过期键名仍为 "plan:01"
return rdb.SetEx(ctx, key, "1", duration).Err()
}
func handleExpiredKey(ctx context.Context, msg *redis.Message) {
// 直接使用过期键名(如 "plan:01",与设置时一致)
expiredKey := string(msg.Payload)
fmt.Printf("处理过期键:%s\n", expiredKey) // 新增调试日志
// 更新状态为 2(直接操作原键,无需重新拼接前缀)
err := rdb.Set(ctx, expiredKey, "2", 0).Err()
if err != nil {
fmt.Printf("更新失败:%v\n", err)
} else {
fmt.Printf("状态更新为 2:%s\n", expiredKey)
}
}
func main() {
fmt.Println("主线程开始运行...")
wg := &sync.WaitGroup{}
wg.Add(1)
go hook(wg)
fmt.Println("主线程继续运行, 进行其他工作...")
wg.Wait()
}
func hook(wg *sync.WaitGroup) {
defer wg.Done()
ctx := context.Background()
// 每隔 1 分钟检查一次过期键(演出状态为"2")
CheckExpiredKeys(1)
// 1. 确保键空间通知配置正确(先获取当前配置,避免重复设置)
config, err := rdb.ConfigGet(ctx, "notify-keyspace-events").Result()
if err != nil || config["1"] != "Ex" {
if err := rdb.ConfigSet(ctx, "notify-keyspace-events", "Ex").Err(); err != nil {
fmt.Printf("配置通知失败:%v\n", err)
return
}
}
// 2. 初始化并启动演出
performanceID := "01"
if err := initPerformanceStatus(ctx, performanceID); err != nil {
fmt.Printf("初始化失败:%v\n", err)
return
}
fmt.Println("初始化完成,等待演出开始...")
fuTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2025-05-08 13:17:50", time.Local)
if err != nil {
fmt.Printf("解析时间失败:%v\n", err)
return
}
timestamp := fuTime.Unix() - time.Now().Unix()
fmt.Printf("距离演出开始还有 %d 秒\n", timestamp)
fuTime2, err := time.ParseInLocation("2006-01-02 15:04:05", "2025-05-08 13:18:00", time.Local)
if err != nil {
fmt.Printf("解析时间失败:%v\n", err)
return
}
timestamp2 := fuTime2.Unix() - time.Now().Unix()
fmt.Printf("演出前距离演出结束还有 %d 秒\n", timestamp2)
sub := timestamp2 - timestamp
fmt.Printf("演出后距离演出结束还有 %d 秒\n", sub)
time.AfterFunc(time.Duration(timestamp)*time.Second, func() {
if err := startPerformance(ctx, performanceID, time.Duration(sub)*time.Second); err != nil { // 缩短过期时间便于测试
fmt.Printf("启动演出失败:%v\n", err)
return
}
fmt.Println("演出已开始,等待结束...")
})
// 3. 订阅当前数据库(6)的过期事件
pubSub := rdb.Subscribe(ctx, fmt.Sprintf("__keyevent@%d__:expired", 6)) // 动态生成数据库编号
defer func(pubSub *redis.PubSub) {
err := pubSub.Close()
if err != nil {
fmt.Printf("关闭订阅失败:%v\n", err)
}
}(pubSub)
// 4. 阻塞接收事件(移除协程,简化逻辑便于调试)
for {
msg, err := pubSub.ReceiveMessage(ctx)
if err != nil {
fmt.Printf("接收消息失败:%v\n", err)
break
}
handleExpiredKey(ctx, msg) // 直接处理,避免协程导致的并发问题
}
}
// 定时任务,用于检查过期键并更新状态
func CheckExpiredKeys(minute int64) {
// 1. 初始化定时任务(每隔 minute 分钟执行一次)
interval := time.Duration(minute) * time.Minute // 可通过配置修改
ticker := time.NewTicker(interval)
defer ticker.Stop() // 程序退出时停止 Ticker
// 2. 在 goroutine 中执行定时任务
go func() {
for range ticker.C { // 阻塞直到间隔到达
fmt.Printf("开始执行定时任务(当前时间:%s)\n", time.Now().Format("2006-01-02 15:04:05"))
ctx := context.Background()
// 遍历所有演出键,检查是否为过期键
keys, err := rdb.Keys(ctx, "plan:*").Result()
if err != nil {
panic(err)
}
for _, key := range keys {
v, err := rdb.Get(ctx, key).Result()
if err != nil {
panic(err)
}
if v != "2" {
continue
}
// 过期删除
rdb.Del(ctx, key)
}
}
}()
}
适用场景
- 状态更新频繁,且需要实时性的场景
- 定时更新状态的需求,如订单超时自动取消等场景