java中关于异步转同步的一些解决方案的对比与思考。【spring mvc堵塞式】

发布于:2025-06-23 ⋅ 阅读:(18) ⋅ 点赞:(0)

1、Spring MVC堵塞式编程中的技术方案

a) 最简单的方案,使用 DeferredResult 代码如下,

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@RestController()
@RequestMapping("/v1/defered")
@RequiredArgsConstructor
public class DeferredResultController {
    // 延迟执行服务
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    @PostMapping("/test")
    public DeferredResult<String> simpleTest() {
        // 1. 创建DeferredResult对象
        DeferredResult<String> deferredResult = new DeferredResult<>(1000L);
        // 2. 设置超时回调,当DeferredResult超时后,会执行这个回调函数
        deferredResult.onTimeout(() -> {
            log.info("DeferredResult超时了");
            deferredResult.setResult("延迟!");
        });

        // 延迟2秒钟执行
        executorService.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.info("执行结果:{}", deferredResult.getResult());
            deferredResult.setResult("hello world1");
        });
        return deferredResult;
    }

}
代码解读:
  1. 创建DeferredResult对象,该对象默认设置了1s的超时时间,
  2. 设置超时回调,如果在1秒钟内未回调结果,那么则执行 onTimeout的回调函数。
  3. 模拟执行一个异步线程,2秒之后再执行结果。
最终控制台输出如下。

DeferredResult超时了
执行结果:延迟!

会发现,最后超时之后,仍然会执行成功,不过deferredResult里面的结果被填充。

用户收到的结果

延迟!

b) 上点难度,使用redis监听事件,根据事件的不同返回不同的数据值

1. 首先配置data.redis框架中的RedisTemplate模板以及RedisMessageListenerContainer(监听容器)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class MyRedisConfig {

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        return container;
    }
}
2. 编写异步转同步方法。
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@RestController()
@RequestMapping("/v1/redis/defered")
@RequiredArgsConstructor
public class DeferredResultRedisController {
    @Autowired
    private RedisTemplate<String, String> redisTemplate2;

    @Autowired
    private RedisMessageListenerContainer container;

    private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();
    private final Map<String, MessageListener> listeners = new ConcurrentHashMap<>();

    @GetMapping("/async/subscribe/{channel}")
    public DeferredResult<String> subscribe(@PathVariable String channel) {
        DeferredResult<String> result = new DeferredResult<>(20000l, "Timeout6666");
        String channelName = "channel:" + channel;
        deferredResults.put(channelName, result);

        // 创建消息监听器
        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String receivedChannel = new String(message.getChannel());
                String msg;
                try {
                    // 模拟可能抛出异常的处理
                    msg = parseMessage(message.getBody());
                } catch (Exception e) {
                    // 手动设置错误结果
                    result.setErrorResult("Message parsing failed: " + e.getMessage());
                    return;
                }
                if (receivedChannel.equals(channelName)) {
                    DeferredResult<String> deferred = deferredResults.remove(channelName);
                    if (deferred != null) {
                        deferred.setResult(msg);
                    }
                    container.removeMessageListener(this);
                    listeners.remove(channelName);
                }
            }

            private String parseMessage(byte[] body) throws Exception {
                // 模拟解析错误
                if (body == null) {
                    throw new IllegalArgumentException("Message body is null");
                }
                return new String(body);
            }
        };

        // 添加监听器
        try {
            container.addMessageListener(listener, new ChannelTopic(channelName));
            listeners.put(channelName, listener);
        } catch (Exception e) {
            // Redis 连接异常
            result.setErrorResult("Failed to subscribe: " + e.getMessage());
        }

        // 处理异步异常
        result.onError(throwable -> {
            System.err.println("Async error for channel " + channelName + ": " + throwable.getMessage());
            releaseResource(channelName);
            // 可选:设置默认错误响应
            result.setErrorResult("Error: " + throwable.getMessage());
        });

        // 清理订阅
        result.onCompletion(() -> {
            log.info("Async complete for channel " + channelName);
            releaseResource(channelName);
        });
        result.onTimeout(() -> {
            log.info("Async timed out for channel " + channelName);
            releaseResource(channelName);
        });

        return result;
    }

    private void releaseResource(String channelName) {
        deferredResults.remove(channelName);
        MessageListener l = listeners.remove(channelName);
        if (l != null) {
            container.removeMessageListener(l);
        }
    }

    @PostMapping("/publish/{channel}")
    public String publish(@PathVariable String channel, @RequestBody String message) {
        redisTemplate2.convertAndSend("channel:" + channel, message);
        return "Published to channel: " + channel;
    }

}

步骤流程

c) 单节点简单方案。使用spring event

只涉及核心代码

@RestController
public class AsyncEventController {

    private final ApplicationEventPublisher eventPublisher;
    private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();

    public AsyncEventController(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    // 异步请求
    @GetMapping("/async/event/{id}")
    public DeferredResult<String> asyncRequest(@PathVariable String id) {
        DeferredResult<String> result = new DeferredResult<>(30000L, "Timeout");
        deferredResults.put(id, result);

        // 模拟异步任务,发布事件
        eventPublisher.publishEvent(new CustomEvent(id, "Processed data for " + id));

        result.onCompletion(() -> deferredResults.remove(id));
        result.onTimeout(() -> deferredResults.remove(id));
        result.onError(throwable -> {
            System.err.println("Error for " + id + ": " + throwable.getMessage());
            deferredResults.remove(id);
        });

        return result;
    }

    // 事件监听器
    @Async
    @EventListener
    public void handleEvent(CustomEvent event) {
        DeferredResult<String> result = deferredResults.remove(event.getId());
        if (result != null) {
            result.setResult(event.getData());
        }
    }
}

// 自定义事件类
class CustomEvent extends ApplicationEvent {
    private final String id;
    private final String data;

    public CustomEvent(String id, String data) {
        super(id);
        this.id = id;
        this.data = data;
    }

    public String getId() {
        return id;
    }

    public String getData() {
        return data;
    }
}

c.1 ) 使用kafka、rockmq等消息队列监听实现。实现类似于spring event,此时代码略。

这里有个非常重要的问题,如果使用的是kafka监听事件,但是节点是多节点,如何保证接收到的事件所在的节点是发起ReferredResult回调的节点。

我们知道kafka监听事件的时候如果是非广播模式的话,那么消费的信息可能在任一节点中,如何保证消费的数据是在一个节点上的呢?可以使用如下的技术方方案,通过 nodeID与lisenterId进行关联,消费到的时候不属于当前节点,那么转发请求到指定节点。

@KafkaListener(topics = "response-topic", groupId = "my-group")
    public void receiveMessage(String message, String key) {
        // 解析消息,格式为 nodeId:message
        String[] parts = message.split(":", 2);
        String targetNodeId = parts[0];
        String data = parts[1];

        if (targetNodeId.equals(nodeId)) {
            // 当前节点处理
            DeferredResult<String> result = deferredResults.remove(key);
            if (result != null) {
                result.setResult(data);
            }
            requestNodeMap.remove(key);
        } else {
            // 转发到目标节点(假设通过 HTTP)
            forwardToNode(targetNodeId, key, data);
        }
    }

    private void forwardToNode(String targetNodeId, String requestId, String data) {
        // 假设通过 HTTP 转发到目标节点
        // 示例:RestTemplate 调用目标节点的 /internal/callback/{requestId}
        // 实际实现需要节点地址映射(如通过服务发现)
        // restTemplate.postForObject("http://" + targetNodeId + "/internal/callback/" + requestId, data, String.class);
        System.out.println("Forwarding to node: " + targetNodeId + ", requestId: " + requestId + ", data: " + data);
    }

节点标识 + 转发:通过 HTTP 或 Kafka 转发消息,灵活但有额外开销。

那么在多节点环境中,使用 Kafka 监听事件并触发 DeferredResult 延迟结果 回调,需要解决消息到正确节点的路由问题,还有其他的两种方案:
Kafka 分区分配:通过分区键将消息路由到目标节点,效率最高。
共享存储:使用 Redis 或数据库存储请求和节点映射,简单可靠。

方案可能还有更多,比如使用zookeep去做,那么方案很多,适合自己的才是最好的。


网站公告

今日签到

点亮在社区的每一天
去签到