一、简介
RSocket 是一种二进制协议,可用于 TCP、WebSockets 和 Aeron 等字节流传输的应用协议,具有以下交互模型:
1、Request-Response: 发送一条信息,接收一条信息。
2、Request-Stream: 发送一条消息并接收返回的消息流。
3、Channel: 双向发送消息流。
4、Fire-and-Forget: 发送单向消息。
二、服务端代码
1、安装依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2、配置文件添加如下:
spring:
rsocket:
server:
port: 9898
transport: tcp
3、服务端测试代码
package com.example.rsocketservice.controller;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Random;
@RestController
public class SendController {
//Request-Response模式
@MessageMapping("message")
public Mono<String> handleMessage(Mono<String> message) {
return message.doOnNext(msg -> {
System.out.printf("接收到消息:%s%n", msg) ;
}).map(msg -> "服务器成功收到了你的消息!!!") ;
}
//Request-Stream模式
// 必须返回Flux
@MessageMapping("stream")
public Flux<String> handleStream() {
return Flux
.interval(Duration.ofSeconds(2))
// 随机生成
.map(i -> String.valueOf(new Random().nextInt(10000000)))
// 只在此通道中获取10个值
.take(10)
.doOnComplete(() -> {
System.out.println("completed...") ;
}) ;
}
//Channel模式
@MessageMapping("channel")
public Flux<String> handleChannel(Flux<String> datas) {
return datas.doOnNext(ret -> {
System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
}).map(ret -> {
return ret + " - " + new Random().nextInt(1000) ;
}) ;
}
//Fire-and-Forget模式
@MessageMapping("faf")
public Mono<Void> handleFireAndForget(Mono<String> data) {
return data.doOnNext(ret -> {
System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
}).then() ;
}
}
三、客户端测试代码
1、安装依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
2、新建配置类ClientConfiguration
package com.example.rsocketclient.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
@Configuration
public class ClientConfiguration {
@Bean
RSocketRequester rSocketRequester(/*RSocketStrategies rSocketStrategies*/) {
RSocketStrategies strategies = RSocketStrategies.builder()
// .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
// .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
.decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 9898);
return requester;
}
}
3、测试代码
package com.example.rsocketclient.controller;
import jakarta.annotation.Resource;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Random;
@RestController
public class TestController {
@Resource
private RSocketRequester rsocketRequester;
//Request-Response模式
@GetMapping("/message/{body}")
// Request-Response 发送一条信息,接收一条信息。
public void sendMessage(@PathVariable("body") String body) {
this.rsocketRequester
.route("message")
.data(body)
.retrieveMono(String.class)
.subscribe(System.out::println) ;
}
//Request-Stream模式
@GetMapping("stream")
public void sendStream() {
this.rsocketRequester
.route("stream")
.retrieveFlux(String.class)
.subscribe(ret -> {
System.out.printf("%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
}) ;
}
@GetMapping("channel")
// Channel 双向发送消息流。
public void sendChannel() {
this.rsocketRequester
.route("channel")
.data(Flux.just("1", "2", "3", "4", "5", "6").delayElements(Duration.ofSeconds(1)))
.retrieveFlux(String.class)
.subscribe(ret -> {
System.out.printf("【client】%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
}) ;
}
@GetMapping("sendFireAndForget")
// Fire-and-Forget 发送单向消息。
public void sendFireAndForget() {
this.rsocketRequester
.route("faf")
.data(Mono.just(String.valueOf(new Random().nextInt(1000))))
.send()
.subscribe() ;
}
}