流水表延迟处理方案设计

发布于:2025-06-20 ⋅ 阅读:(16) ⋅ 点赞:(0)

**### 流水表延迟处理方案设计

针对你的场景(流水表插入记录,当type=4时需要1秒后处理),结合现有技术栈(JDK8、MySQL、定时任务、Redis、RabbitMQ),我推荐以下几种可行方案:

一、方案对比与推荐

方案 优势 劣势 推荐指数
定时任务轮询 实现简单,无需额外组件 1秒间隔轮询对数据库压力大,实时性依赖轮询频率 ★★
Redis过期事件 基于事件触发,轻量级,无需额外消息队列 需要配置Redis,可靠性中等,可能存在事件丢失 ★★★
RabbitMQ死信队列 成熟的消息队列解决方案,可靠性高,支持精确延迟 需要维护MQ,配置稍复杂 ★★★★
MySQL触发器+队列 数据库层面自动触发,无需应用层修改插入逻辑 依赖数据库特性,扩展性较差,可能影响写入性能 ★★★

二、推荐方案:RabbitMQ死信队列实现

RabbitMQ 可以提供精确到毫秒级的延迟,非常适合你的场景。以下是具体实现:

1. 数据库表设计
CREATE TABLE `transaction_flow` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `type` tinyint(4) NOT NULL COMMENT '交易类型',
  `amount` decimal(10,2) NOT NULL COMMENT '金额',
  `status` tinyint(4) DEFAULT 0 COMMENT '状态',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
);
2. RabbitMQ配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 延迟队列(接收type=4的消息)
    public static final String DELAY_QUEUE = "transaction.delay.queue";
    // 死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "transaction.dlx.exchange";
    // 死信队列(最终处理队列)
    public static final String DEAD_LETTER_QUEUE = "transaction.dlx.queue";
    // 路由键
    public static final String ROUTING_KEY = "transaction.dlx.key";

    // 声明死信交换机
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 声明死信队列
    @Bean
    Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE, true);
    }

    // 绑定死信队列到死信交换机
    @Bean
    Binding bindingDeadLetter() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(ROUTING_KEY);
    }

    // 声明延迟队列,配置死信交换机和1秒的TTL
    @Bean
    Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE)
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", ROUTING_KEY)
                .withArgument("x-message-ttl", 1000) // 1秒延迟
                .build();
    }
}
3. 插入流水表并发送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionService {

    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void insertTransaction(Transaction transaction) {
        // 插入流水表
        String sql = "INSERT INTO transaction_flow (type, amount, status) VALUES (?, ?, ?)";
        jdbcTemplate.update(sql, transaction.getType(), transaction.getAmount(), transaction.getStatus());
        
        // 如果type=4,发送到延迟队列
        if (transaction.getType() == 4) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, transaction.getId());
        }
    }
}
4. 消费延迟消息并处理
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TransactionProcessor {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void processDelayedTransaction(Long transactionId) {
        try {
            // 查询流水记录
            String sql = "SELECT * FROM transaction_flow WHERE id = ?";
            Transaction transaction = jdbcTemplate.queryForObject(sql, 
                    new BeanPropertyRowMapper<>(Transaction.class), transactionId);
            
            // 处理type=4的特殊逻辑
            if (transaction != null && transaction.getType() == 4) {
                handleType4Transaction(transaction);
            }
        } catch (Exception e) {
            // 错误处理(可重试或记录到死信队列)
            e.printStackTrace();
        }
    }

    private void handleType4Transaction(Transaction transaction) {
        // 处理type=4的业务逻辑
        System.out.println("处理type=4的交易:" + transaction.getId());
        // 更新状态等操作
    }
}

三、其他可选方案

1. Redis过期事件方案
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import org.springframework.stereotype.Service;

@Service
public class RedisDelayedProcessor {

    private Jedis jedis;
    
    public RedisDelayedProcessor() {
        this.jedis = new Jedis("localhost");
    }

    // 插入流水表后调用
    public void handleTransaction(Long transactionId, int type) {
        if (type == 4) {
            // 设置1秒过期的key
            jedis.setex("transaction_delay:" + transactionId, 1, "pending");
        }
    }

    // 监听Redis过期事件
    public void startListening() {
        Thread subscriberThread = new Thread(() -> {
            Jedis subscriberJedis = new Jedis("localhost");
            subscriberJedis.psubscribe(new JedisPubSub() {
                @Override
                public void onPMessage(String pattern, String channel, String message) {
                    if (message.startsWith("transaction_delay:")) {
                        String transactionIdStr = message.substring("transaction_delay:".length());
                        Long transactionId = Long.valueOf(transactionIdStr);
                        // 处理延迟任务
                        processDelayedTransaction(transactionId);
                    }
                }
            }, "__keyevent@0__:expired");
        });
        
        subscriberThread.setDaemon(true);
        subscriberThread.start();
    }

    private void processDelayedTransaction(Long transactionId) {
        // 查询并处理流水记录
        // ...
    }
}
2. 定时任务轮询方案
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class TransactionPollingService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // 每1秒执行一次
    @Scheduled(fixedRate = 1000)
    public void processPendingTransactions() {
        String sql = "SELECT * FROM transaction_flow " +
                     "WHERE type = 4 AND status = 0 " +
                     "AND create_time <= DATE_SUB(NOW(), INTERVAL 1 SECOND)";
        
        List<Transaction> transactions = jdbcTemplate.query(sql, 
                new BeanPropertyRowMapper<>(Transaction.class));
        
        for (Transaction transaction : transactions) {
            // 处理延迟任务
            processTransaction(transaction);
        }
    }

    private void processTransaction(Transaction transaction) {
        // 更新状态为处理中
        String updateSql = "UPDATE transaction_flow SET status = 1 WHERE id = ?";
        jdbcTemplate.update(updateSql, transaction.getId());
        
        // 处理业务逻辑
        // ...
    }
}

四、最终建议

考虑到 1秒的精确延迟需求高并发场景下的性能要求,我强烈推荐使用 RabbitMQ死信队列方案

  1. 优点

    • 精确控制延迟时间(毫秒级)
    • 消息持久化,可靠性高
    • 不影响主业务流程(异步处理)
    • 可扩展(支持更多延迟规则)
  2. 实施步骤

    • 配置RabbitMQ死信队列和交换机
    • 修改插入流水表的代码,当type=4时发送消息到延迟队列
    • 编写消费者处理延迟后的任务
  3. 注意事项

    • 确保消息幂等性(防止重复消费)
    • 添加失败重试机制
    • 监控队列长度和处理性能

五、性能对比

方案 吞吐量(TPS) 延迟(ms) 实现复杂度 资源消耗
RabbitMQ死信队列 5000+ 1000±5 中等 中等
Redis过期事件 10000+ 1000±50
定时任务轮询 500-1000 1000±1000

根据你的业务量和性能要求,选择合适的方案。对于大多数场景,RabbitMQ方案是最可靠的选择。**


网站公告

今日签到

点亮在社区的每一天
去签到