Kehaw

基于Spring boot 进行 RSocket 的开发


RSocket 是一种应用程序协议,他提供了 Reactive Streams 语义实现,他可以替代 HTTP。

在本教程中,我们将使用Spring Boot研究RSocket,尤其是它如何帮助抽象出较底层的RSocket API。

依赖

显而易见,Spring Boot已经为我们准备好了 Starter,我们可以通过 spring-boot-starter-rsocket 开始我们的第一个 Spring RSocket 程序。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

与往常一样,Starter 会将所有的依赖包加载进来,例如rsocket-corersocket-transport-netty

1. 一个简单的程序

我们继续开发我们的示例应用程序。为了突出 RSocket 提供的交互模型,我们将创建一个交易应用程序。我们的交易应用程序将包含一个客户端和一个服务器。

先编写服务端程序

由于我们具有spring-boot-starter-rsocket依赖关系,因此Spring Boot为我们自动配置了RSocket服务的大多数配置。与Spring Boot一样,我们可以通过修改特定属性的方式来更改RSocket服务器的默认配置值。

例如,通过将以下配置添加到application.yml文件中,来更改RSocket服务器的端口:

spring:
  rsocket:
    server:
      port: 7000

我们还可以更改其他属性,以根据需要进一步修改服务器,如果是正版IDEA用户可以直接点击属性名进入到源码中查看都有哪些配置项,这里就不一一介绍了,详情参考这里

再写一个客户端程序

客户端与服务端类似,但是虽然 RSocket 完成了大多数的配置工作,我们依然需要手动的注入一些 Bean,这在以后的版本中应该会有所改善,当然手动注入 Bean 的优势在于可以更清晰的了解工作原理,我怀疑 Spring 是故意这么做的。

@Configuration
public class ClientConfiguration {
 
    @Bean
    public RSocket rSocket() {
        return RSocketFactory
          .connect()
          .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
          .frameDecoder(PayloadDecoder.ZERO_COPY)
          .transport(TcpClientTransport.create(7000))
          .start()
          .block();
    }
 
    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

在这里,我们创建了一个 RSocket 客户端,并将其配置成在7000端口上监听 TCP 传输,这根我们刚才的服务端配置是一样的。

接下来我们注入了一个 RSocketRequester bean,它是对 RSocket 的一个包装,这个 bean 将作用于我们与 RSocket 服务端交互的时候。

定义了这些 Bean 之后,我们的最简版 RSocket demo 结构就算完成了。

接下来,我们将探索不同的交互模型,并了解如何基于 Spring Boot 进行实际开发。

2.实现 Request/Response

Request/Response 是最基本、最常见的交互模型,我们通常使用的 HTTP 通信就是基于这个模型去做的。

在我们的交易程序中,我们将提供一个请求并相应股票数据的例子。

服务端

其实服务端的响应Controller与 Spring MVC 中的 Controller 几乎没有什么区别,最大的不同在于我们使用 @MessageMapping 来代替了之前的 @RequestMapping(或一大堆Get/Post/Delete/Put mapping)。

@Controller
public class MarketDataRSocketController {
 
    private final MarketDataRepository marketDataRepository;
 
    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }
 
    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        return marketDataRepository.getOne(marketDataRequest.getStock());
    }
}

除此之外,我们会发现,我们的响应对象也由传统的实体类变成了 Mono<T> 对象。

客户端

接下来,我们的RSocket客户就要开始询问股票的当前价格并得到一个Response了。

要发起RSocket请求,我们应该使用RSocketRequester类:

@RestController
public class MarketDataRestController {
 
    private final RSocketRequester rSocketRequester;
 
    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }
 
    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        return rSocketRequester
          .route("currentMarketData")
          .data(new MarketDataRequest(stock))
          .retrieveMono(MarketData.class);
    }
}

我们客户端提供了了一个传统的 HTTP Controller 来便于我们通过浏览器实验。而在方法内部,我们使用了 RSocket 去与服务提供方进行交互。

在这里我们可以注意到,route 是用来与路由进行交互,告诉服务方即将调用哪个服务(与普通的 Controller 类似),通过 data 来传递数据,最后用 retrieveMono 方法来获取数据。

似曾相识对吧,跟一个 JavaScript 的 Ajax 库 Axios 很像对吧?

运行你的程序。

3. 实现甩手掌柜模式(Fire and forget)

顾名思义就是客户端发起一个请求,但是不关心服务方是否响应。

在我们的交易应用程序中,一些客户端将充当数据源,并将市场数据推送到服务器。

服务端

让我们创建另外一个 endpoint:

@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
    marketDataRepository.add(marketData);
    return Mono.empty();
}

与 Request/Response 最大的不同在于,我们这里使用 Mono<Void> 来响应,代表着我们不打算通知客户端。

客户端

在客户端,我们来看一下如何充当一个甩手掌柜:

@GetMapping(value = "/collect")
public Publisher<Void> collect() {
    return rSocketRequester
      .route("collectMarketData")
      .data(getMarketData())
      .send();
}

简单对吧,只是 send() 就Okay了,非常的简单明了。

4. 请求流(Stream)

好了,我们知道了一个不怎么常见的交互模型 —— Request Stream。

简单而言,就是客户端发起一个请求之后,服务端将会不停的发送数据流,这个就很有意思了,在实际的底层协议中,用服务器端在收到一个请求之后,会发起多次响应来描述这个交互模式最合适。

服务端

我们又新增了一个endpoint在服务端:

@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
    return marketDataRepository.getAll(marketDataRequest.getStock());
}

注意,这里与上面不同的点在于,我们使用了Flux<T> 来替代了 Mono<T>

客户端

其实客户端肯定需要相应的改变接收数据的方法:

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
    return rSocketRequester
      .route("feedMarketData")
      .data(new MarketDataRequest(stock))
      .retrieveFlux(MarketData.class);
}

我们这里就使用了 retrieveFlux() 来接受数据。同时,我们的测试接口中,用 MediaType.TEXT_EVENT_STREAM_VALUE来返回数据给浏览器,主要用作测试。

5. 异常捕获

我们通过 @MessageExceptionHandler 注解来声明式的捕获服务异常:

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
    return Mono.just(MarketData.fromException(e));
}

在这里我们举了个简单的例子,用 Mono<T> 来返回异常数据,而在其他交互模式中进行相应的变动即可。

Kehaw

👨‍💻Ke Haw 🇨🇳👨‍👩‍👧‍👦

风吹云散去,夜色好观星
Java | 前端 | 大数据

专注于 Spring Cloud 微服务架构与数据处理,研究一切与Java相关的开发技术,包括一部分前端技术。

目前的工作主要是关于B2B大宗商品在线交易领域的数据处理。如果对本站的部分内容感兴趣,请通过邮件、Twitter联系我🤝。

Fork me on Gitee
基于Spring Security + OAuth2 + JWT 的权限认证(一) Java-Stream学习第四步:数据处理 Java-Stream学习第三步:终端操作 Java-Stream学习第二步:处理流 Java-Stream学习第一步:创建流 Electron使用串口通信 Electron下调用DLL文件 国外SaaS服务供应商都是干什么的:Part1 为什么Kafka会丢失消息 Spring Boot中使用JSR380验证框架
Description lists
Kehaw's blog
Site description
人初做事,如鸡伏卵,不舍而生气渐充;如燕营巢,不息而结构渐牢;如滋培之木,不见其长,有时而大;如有本之泉,不舍昼夜,盈科而后进,放乎四海。
Copyright
© 2014 Copyright Kehaw | All rights reserved.