背景
基于更复杂的情况和更高的开发要求,我们可能会遇到必须同时要使用webflux和websocket的情况。
版本约定
- JDK21
- Springboot 3.2.0
- Fastjson2
- lombok
配置文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
</parent>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.54</version>
</dependency>
</dependencies>
代码
只要引入webflux,就同时引入了websocket,不需要再次引入websocket
使用webflux
@RestController
@RequestMapping("/user")
public class UserFlux {
@Autowired
private UserService userService;
@GetMapping("/get")
public Mono<Result<User>> get() {
return Mono.just(Result.httpSuccess(userService.getUser()));
}
/**
* 服务器推送
*
* @return 由服务器决定推送多少次多少数据,推送结束前不会断开连接
*
* @apiNote (SSE - > Server Send Event)
*/
@GetMapping(value = "/flux", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<String> flux() {
return Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
JSONObject obj = new JSONObject();
obj.put("data", "hello,flux" + i);
return obj.toJSONString();
}));
}
}
这里比较值得注意的是Flux返回值,这个返回值从性质上说有点像会自动close的websocket。我们看下这个/flux的返回值:
{
"data": "hello,flux1"
}{
"data": "hello,flux2"
}{
"data": "hello,flux3"
}{
"data": "hello,flux4"
}
注意这不是我拼接的,是调试结果就是这样。也就是说,/flux是分帧输出,具有流式的特性。
使用websocket
这里选择使用手动注册websocket而非Endpoint自动注解,主要是因为我想对handler做规范化
配置文件
@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
handler基类
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// 强制规定基类必须有泛型约束入参出参。强制规定必须进行参数校验
@Slf4j
public abstract class BaseSocketHandler<T, R> implements WebSocketHandler {
@Override
@NonNull
public Mono<Void> handle(WebSocketSession session) {
String sessionId = session.getId();
log.info("与sessionId:【{}】 建立连接", sessionId);
Flux<WebSocketMessage> receive = session.receive();
Flux<R> fluxHandled = receive.flatMap(webSocketMessage -> {
String payloadAsText = webSocketMessage.getPayloadAsText();
if (!JSON.isValid(payloadAsText)) {
log.error("收取参数不合法:{}", payloadAsText);
session.close();
throw new IllegalArgumentException("参数不合法");
}
TypeReference<T> reference = getTypeReference();
if (!check(payloadAsText, reference)) {
log.error("参数校验不通过:{}", payloadAsText);
session.close();
throw new IllegalArgumentException("参数校验不通过");
}
return handler(payloadAsText, reference);
}).onErrorResume(throwable -> {
log.error("连接异常,即将关闭", throwable);
session.close();
return Mono.error(throwable);
});
return session.send(
Mono.from(fluxHandled).map(payload -> session.textMessage(JSON.toJSONString(payload)))
);
}
public abstract boolean check(String payloadObject, TypeReference<T> typeReference);
public abstract Mono<R> handler(String payload, TypeReference<T> typeReference);
protected abstract TypeReference<T> getTypeReference();
}
实现类
// 这样继承基类的handler使用时非常简单不说,由于上层做了处置,还会更安全更好做日志
public class NoticeHandler extends BaseSocketHandler<User, UserInfo> {
@Override
public boolean check(String payloadObject, TypeReference<User> userTypeReference) {
User user = JSON.parseObject(payloadObject, userTypeReference);
return !Objects.isNull(user.getId()) && user.getId() > 0;
}
@Override
public Mono<UserInfo> handler(String payload, TypeReference<User> typeReference) {
User user = JSON.parseObject(payload, typeReference);
UserInfo userInfo = new UserInfo();
BeanUtils.copyProperties(user, userInfo);
return Mono.just(userInfo);
}
@Override
protected TypeReference<User> getTypeReference() {
return new TypeReference<>() {
};
}
}
注册路由
import com.xu.socket.NoticeHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import java.util.HashMap;
import java.util.Map;
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
public ReactiveWebSocketServerHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/notice", new NoticeHandler());
setUrlMap(map);
setOrder(100);
}
}