设计思路
- 任务表示:每个任务通过一个特定格式的键来表示。键名可以包含任务ID等信息,值可以是任务的具体内容或指向任务详情的引用。
- 过期机制:利用Redis的
EXPIRE
命令为任务设置过期时间,当到达设定的时间点时,Redis会自动删除该键,并触发相应的事件。 - 事件监听:通过Redis的键空间通知(Keyspace Notifications)监听键过期事件,并在接收到事件后执行对应的任务逻辑。
Redis配置与启用键空间通知
确保你的Redis服务器开启了键空间通知功能。可以通过修改redis.conf
文件添加如下配置:
notify-keyspace-events Ex
或者在运行时动态设置:
CONFIG SET notify-keyspace-events Ex
Ex
表示仅监听键过期的事件。
Spring Boot 实现详细步骤
1. Maven依赖
确保你的pom.xml
中包含以下依赖项:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 其他依赖 -->
</dependencies>
2. 配置Redis连接
在application.properties
或application.yml
中配置Redis连接信息:
spring.redis.host=localhost
spring.redis.port=6379
3. 创建和调度任务
创建一个服务类用于调度任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class TaskScheduler {
@Autowired
private StringRedisTemplate redisTemplate;
public void scheduleTask(String taskId, long delayInSeconds) {
// 设置任务到Redis中,并指定过期时间
redisTemplate.opsForValue().set(taskId, "taskPayload", delayInSeconds, java.util.concurrent.TimeUnit.SECONDS);
}
}
4. 监听过期事件并处理任务
定义一个监听器来处理过期事件:
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class ExpiredKeyMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = new String(message.getBody());
System.out.println("Expired key detected: " + expiredKey);
// 根据expiredKey解析任务信息,并执行相应的任务逻辑
}
}
注册该监听器:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
ExpiredKeyMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, new PatternTopic("__keyevent@*__:expired"));
return container;
}
}
5. 使用自定义注解简化任务调度(可选)
如果希望使用注解的方式简化任务调度,可以定义一个自定义注解,并编写Aspect来处理它。
定义注解:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScheduledTask {
String taskId();
long delayInSeconds();
}
编写Aspect:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class ScheduledTaskAspect {
@Autowired
private StringRedisTemplate redisTemplate;
@Around("@annotation(scheduledTask)")
public Object scheduleTask(ProceedingJoinPoint joinPoint, ScheduledTask scheduledTask) throws Throwable {
// 获取方法签名作为任务负载
String methodSignature = joinPoint.getSignature().toString();
// 调度任务到Redis中
redisTemplate.opsForValue().set(scheduledTask.taskId(), methodSignature, scheduledTask.delayInSeconds(), java.util.concurrent.TimeUnit.SECONDS);
return null; // 或者根据需要返回实际结果
}
}
示例服务使用自定义注解
import org.springframework.stereotype.Service;
@Service
public class MyTaskService {
@ScheduledTask(taskId = "task1", delayInSeconds = 60)
public void executeTask() {
System.out.println("Executing task...");
}
}
最佳实践和注意事项
- 任务重复执行控制:考虑使用分布式锁避免同一任务被多次执行。
- 错误处理:为任务执行逻辑添加适当的异常捕获和重试机制。
- 性能考量:对于高并发场景,评估是否需要优化Redis连接池配置,并考虑任务队列化处理以提高吞吐量。
通过上述步骤,你可以构建一个基于Redis的分布式定时任务系统。根据具体的应用需求,可能还需要进一步调整和优化系统的设计,例如引入任务状态管理、任务失败重试策略等高级特性。