使用rabbitmq发送消息和caffeineCache保存本地

发布于:2024-09-19 ⋅ 阅读:(9) ⋅ 点赞:(0)

目录

使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息

 1.导入依赖

2.编写yml文件,配置连接rabbitmq的信息

3.编写mq的配置类,生成交换机,消息队列,并将他们绑定

4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息

5.查看消息队列是否存在消息

在stock_backend工程中定义消息监听类,并配置本地缓存

1.导入mq依赖和caffeine

 2.编写yml文件

3.编写caffeine的配置类,和mq的配置类

4.编写消息监听类

5. stockService.getInnerMarket()方法

6.debug启动SpringBoot引导类

自动跳到消息监听类

清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新保存到本地缓存中

​编辑 查看caffeineCache成员变量的值

成功缓存key为innerMarketInfosKey

的数据


当我们在查询最新的股票大盘数据时,我们会频繁的向mysql查询数据,会给mysql造成很大的压力,所以我们可以使用caffeineCache本地缓存。

大致思路:
我们先使用stock_job工程采集到国内大盘的最新交易时间的信息时并将数据插入数据库,使用rabbitmq发送消息(消息为当前时间)

在stock_backend工程定义消息队列监听类,如果接收到的时间和发送消息的时间相差一分钟时,就报错,不超过一分钟,就清除之前的本地缓存,再通过mapper向数据库查询数据,然后再重新把最新的国内大盘数据存入本地缓存中。

这样一来,如果stock_job工程没有向数据库中插入最新交易时间的国内大盘数据信息,在stock_backend中查询最新交易时间的国内大盘数据信息时,就会直接从本地缓存中获取数据信息。

使用stock_job工程采集到国内大盘的最新交易时间的信息并插入数据库,使用rabbitmq发送消息

 1.导入依赖

<!--        导入mq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.编写yml文件,配置连接rabbitmq的信息

spring:
  rabbitmq:
    host: 192.168.230.100 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: hhh
    password: 1234
    virtual-host: /

3.编写mq的配置类,生成交换机,消息队列,并将他们绑定

@Configuration
public class MqConfig {
    /**
     * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 国内大盘信息队列
     */
    @Bean
    public Queue innerMarketQueue() {
        return new Queue("innerMarketQueue", true);
    }

    /**
     * 定义路由股票信息的交换机
     */
    @Bean
    public TopicExchange innerMarketTopicExchange() {
        return new TopicExchange("stockExchange", true, false);
    }

    /**
     * 绑定队列到指定交换机
     */
    @Bean
    public Binding bindingInnerMarketExchange() {
        return BindingBuilder.bind(innerMarketQueue()).to(innerMarketTopicExchange())
                .with("inner.market");//设置routingKey
    }
}

4.采集最新到最新的国内大盘数据信息,并插入到数据时时,发送消息

@Override
    public void getInnerMarketInfo() {
		//......
        //解析的数据批量插入数据库
        int count= stockMarketIndexInfoMapper.insertBatch(entities);
        log.info("当前插入了:{}行数据",count);
		//通知后台终端刷新本地缓存,发送的日期数据是告知对方当前更新的股票数据所在时间点
        rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());
    }

5.查看消息队列是否存在消息

在stock_backend工程中定义消息监听类,并配置本地缓存

1.导入mq依赖和caffeine

 <dependencies>
        <!--        导入mq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
<!--        本地缓存依赖-->
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
        </dependency>

 2.编写yml文件

spring:
  rabbitmq:
    host: 192.168.230.130 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: hhh
    password: 1234
    virtual-host: /

3.编写caffeine的配置类,和mq的配置类

 /**
     * 构建缓存bean
     * @return
     */
    @Bean
    public Cache<String,Object> caffeineCache(){
        Cache<String, Object> cache = Caffeine
                .newBuilder()
                .maximumSize(200)//设置缓存数量上限
//                .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
//                .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除
                .initialCapacity(100)// 初始的缓存空间大小
                .recordStats()//开启统计
                .build();
        return cache;
    }

这里不用定义交换机和消息队列 

@Configuration
public class MqConfig {
    /**
     * 重新定义消息序列化的方式,改为基于json格式序列化和反序列化
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

4.编写消息监听类

@Component
@Slf4j
public class MqListener {
    @Autowired
    private Cache<String,Object> caffeineCache;
    @Autowired
    private StockService stockService;
    @RabbitListener(queues = "innerMarketQueue")
    public void acceptInnerMarketInfo(Date date){//消息队列里的数据类型是Date,所以接收的参数类型也是Date
        long differTime = DateTime.now().getMillis() - new DateTime(date).getMillis();
        if(differTime>60000L){
            log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),differTime);
        }
        //发送信息和接收信息在一分钟以内
        //删除key为innerMarketInfosKey的缓存
        caffeineCache.invalidate("innerMarketInfosKey");
        //重新获取数据
        //调用服务更新缓存
        stockService.getInnerMarket();
    }
}

5. stockService.getInnerMarket()方法

    /**
     * 获取国内大盘最新的数据
     * @return
     */
    @Override
    public R<List<InnerMarketDomain>> getInnerMarket() {
        //获取key为innerMarketInfosKey的本地缓存数据,如果不存在,就去数据库中查询数据,并存入本地缓存中
        //本地缓存默认一分钟消失
        R<List<InnerMarketDomain>> result= (R<List<InnerMarketDomain>>) caffeineCache.get("innerMarketInfosKey", key->{
            //1.获取当前时间的最新交易点(精确到分钟,秒和毫秒置为0)
            Date curDate = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();
            //Date curDate = MyDateTimeUtil.getLateDate4Stock(DateTime.now()).toDate();

            //mock data 等后续股票采集job工程完成,再将此代码删除
            curDate=DateTime.parse("2021-12-28 09:31:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();
            //log.info("curDate:{}",curDate);
            //2.获取国内大盘的编码集合
            List<String> mcodes = stockInfoConfig.getInner();
            //3.调用mapper进行查询
            List<InnerMarketDomain> data=stockMarketIndexInfoMapper.getInnerMarket(curDate,mcodes);
            //4.返回数据
           return R.ok(data);
        });
       return result;
    }

6.debug启动SpringBoot引导类

自动跳到消息监听类

清空缓存,然后调用mapper方法重新向数据库查询最新的国内大盘数据,再重新保存到本地缓存中

 查看caffeineCache成员变量的值

成功缓存key为innerMarketInfosKey

的数据