Redis实现分布式定时任务

发布于:2025-04-15 ⋅ 阅读:(22) ⋅ 点赞:(0)

设计思路

  1. 任务表示:每个任务通过一个特定格式的键来表示。键名可以包含任务ID等信息,值可以是任务的具体内容或指向任务详情的引用。
  2. 过期机制:利用Redis的EXPIRE命令为任务设置过期时间,当到达设定的时间点时,Redis会自动删除该键,并触发相应的事件。
  3. 事件监听:通过Redis的键空间通知(Keyspace Notifications)监听键过期事件,并在接收到事件后执行对应的任务逻辑。

Redis配置与启用键空间通知

确保你的Redis服务器开启了键空间通知功能。可以通过修改redis.conf文件添加如下配置:

notify-keyspace-events Ex

或者在运行时动态设置:

CONFIG SET notify-keyspace-events Ex

Ex表示仅监听键过期的事件。

Spring Boot 实现详细步骤

1. Maven依赖

确保你的pom.xml中包含以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- 其他依赖 -->
</dependencies>
2. 配置Redis连接

application.propertiesapplication.yml中配置Redis连接信息:

spring.redis.host=localhost
spring.redis.port=6379
3. 创建和调度任务

创建一个服务类用于调度任务:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class TaskScheduler {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public void scheduleTask(String taskId, long delayInSeconds) {
        // 设置任务到Redis中,并指定过期时间
        redisTemplate.opsForValue().set(taskId, "taskPayload", delayInSeconds, java.util.concurrent.TimeUnit.SECONDS);
    }
}
4. 监听过期事件并处理任务

定义一个监听器来处理过期事件:

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class ExpiredKeyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = new String(message.getBody());
        System.out.println("Expired key detected: " + expiredKey);
        // 根据expiredKey解析任务信息,并执行相应的任务逻辑
    }
}

注册该监听器:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                             ExpiredKeyMessageListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listener, new PatternTopic("__keyevent@*__:expired"));
        return container;
    }
}
5. 使用自定义注解简化任务调度(可选)

如果希望使用注解的方式简化任务调度,可以定义一个自定义注解,并编写Aspect来处理它。

定义注解:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScheduledTask {
    String taskId();
    long delayInSeconds();
}

编写Aspect:

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class ScheduledTaskAspect {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Around("@annotation(scheduledTask)")
    public Object scheduleTask(ProceedingJoinPoint joinPoint, ScheduledTask scheduledTask) throws Throwable {
        // 获取方法签名作为任务负载
        String methodSignature = joinPoint.getSignature().toString();
        // 调度任务到Redis中
        redisTemplate.opsForValue().set(scheduledTask.taskId(), methodSignature, scheduledTask.delayInSeconds(), java.util.concurrent.TimeUnit.SECONDS);
        return null; // 或者根据需要返回实际结果
    }
}
示例服务使用自定义注解
import org.springframework.stereotype.Service;

@Service
public class MyTaskService {

    @ScheduledTask(taskId = "task1", delayInSeconds = 60)
    public void executeTask() {
        System.out.println("Executing task...");
    }
}

最佳实践和注意事项

  • 任务重复执行控制:考虑使用分布式锁避免同一任务被多次执行。
  • 错误处理:为任务执行逻辑添加适当的异常捕获和重试机制。
  • 性能考量:对于高并发场景,评估是否需要优化Redis连接池配置,并考虑任务队列化处理以提高吞吐量。

通过上述步骤,你可以构建一个基于Redis的分布式定时任务系统。根据具体的应用需求,可能还需要进一步调整和优化系统的设计,例如引入任务状态管理、任务失败重试策略等高级特性。


网站公告

今日签到

点亮在社区的每一天
去签到