Spring Reactive
基本概念与介绍
相应式编程特性
底层基于数据缓冲队列、消息驱动模型、异步会掉机制。在编码上体现为链式调用,占用更少的资源,支持更大的吞吐量
- 实时性:系统尽可能及时相应
- 回弹性:系统出现失败时仍保持实时性、可用性(通过隔离、遏制等机制实现)
- 弹性:在系统负载发生变化后仍然保持实时性。(即能够自动应对伸缩容)
- 消息驱动:异步线程通过消息交互
与传统 SpringMVC 的不同
SpringMVC 基于阻塞式 IO 架构,简单说就是一个请求一个线程,使用大量的线程进行丹尼尔等待,通常需要额外的架构设计来支持高并发。
Sping WebFlux 基于非阻塞的网络框架,通过缓冲、消息、调度机制来发挥(压榨)多核优势,不再有线程数限制,让少量的线程一直忙,天生支持高并发。
Reactive-Stream Api
基本特性
Reactive-Stream(响应式流)是 JVM 面向 Stream 操作的规范,提供本地化的消息通信机制,具备如下特性:
- 允许无限数据
- 数据有序
- 节点组件在流上异步进行传递,即下一个节点的处理不依赖上一个节点在流内的处理状态,流水线操作
- 强制非阻塞,背压模式(Backpressure 即通过队列交互,使消费者的压力得到转移)
发布订阅模型
JDK9 引入了类 java.util.concurrent.Flow
,为相应式编程的基础,其中包含 4 个关键函数式接口:
Publisher
:发布者Subscriber
:订阅者Subscription
:订阅关系Processor
:处理器(订阅一个消息,处理并发布另一个消息,即订阅者+发布者)
基本用法如下:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class Main {
public static void main(String[] args) throws Exception {
// SubmissionPublisher 是发布者的一个简单默认实现,直接提交数据
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 订阅前发布的数据,因没有订阅者而丢失
for (int i = 0; i < 5; i++) {
publisher.submit("item " + i);
}
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 订阅时
System.out.println(Thread.currentThread() + " : Subscribed : " + subscription.hashCode());
this.subscription = subscription;
// 背压模式:必须请求一次,否则永远不会有数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 下一个元素到达时、接受到新数据时
System.out.println(Thread.currentThread() + " : Received : " + item);
if (item == 8) {
this.subscription.cancel();
return;
}
// 背压模式:请求才有下一个元素,不请求则不会有
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
// 接受到错误时
System.out.println(Thread.currentThread() + " : Error : " + throwable);
}
@Override
public void onComplete() {
// 在等待数据期间,接受到完成信号时
System.out.println(Thread.currentThread() + " : Completed");
}
};
// 订阅关系链
Flow.Processor<String, Integer> processor = new ItemNumberExtractor();
publisher.subscribe(processor);
processor.subscribe(subscriber);
// 订阅者只能拿到订阅后发布的数据
for (int i = 5; i < 10; i++) {
publisher.submit("item " + i);
}
publisher.close();
// 错误关闭,流本身不会产生,直接订阅者会接受到错误信号,所有数据均不会处理,即使逻辑上在发生错误前取消订阅
// publisher.closeExceptionally(new Exception());
// 阻塞主线程,否则主线程结束,子线程也会结束
Thread.sleep(20000L);
}
// 一个处理器同时实现发布者、订阅者接口
private static class ItemNumberExtractor extends SubmissionPublisher<Integer> implements Flow.Processor<String, Integer>, Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread() + " : Processor.Subscribed : " + subscription.hashCode());
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println(Thread.currentThread() + " : Processor.Received : " + item);
// 提取编号,向后传递
this.submit(Integer.parseInt(item.substring(5)));
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread() + " : Processor.Error : " + throwable);
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread() + " : Processor.Completed");
}
}
}
上面的实例在执行后会得到如下输出:
可以说明:
- 有序:数据有序到达一个节点内
- 全异步:在流上的组件,前后节点均为异步执行,下一个节点不依赖上一个节点处理完全部数据
- 非阻塞:发布者(主线程)、处理器(worker-2)、订阅者(worder-1)均为不同线程,由 JVM 底层提供支持(可以追踪源码验证)
- 背压:订阅者主动请求数据后,流内的数据才发挥作用
参考: reactive-stream
Reactor Api
Reactor 被设计用来实现高效的响应式系统
- 响应式核心:提供全异步的运算,直接受 Java Api 支持
- 提供0、1、N的数据序列:
Mono
提供 0、1个数据,Flux
提供多个数据,二者均属于Publisher
- 全面使用非阻塞 IO,包括网络 IO
参考:projectreactor.io - Reactor 官方文档
Flux
与 Mono
基本使用
二者均位于 reactor.core.publisher
类包下,属于 Publisher
,可以被订阅者订阅,允许使用 .just(...)
等方法简单创建一个流,使用时需要导入相关依赖:
Maven 方式:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.11</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Gradle 方式:
dependencies {
implementation platform('io.projectreactor:reactor-bom:2023.0.11')
implementation 'io.projectreactor:reactor-core'
}
针对流的链式调用,每一次都会产生一个新流,比如调用一次 .map(...)
方法,就会有一个新流出现。
事件感知 API
doOnXXX 方法,在发生什么情况时进行回调,如 doOnComplete
方法,会在流收到结束信号时调用对应回调方法(Hook,钩子函数)。有以下钩子函数可用:
doOnComplete
: 流完成时doOnCancel
: 流被取消时,手动取消或全部订阅者取消订阅doOnDiscard
: 元素被忽略时doOnEach
: 每一个元素、信号到达时调用(订阅、请求、取消等都有信号)doOnError
: 流出错时,流出错前的元素仍会正常处理doOnNext
: 得到新元素时,与request(num)
的num
参数无关,每个元素到达都会触发doOnRequest
: 被请求新元素时doOnSubscribe
: 流被订阅时doOnTerminate
: 流被异常、取消而中断时
同样的,订阅者也有相应的钩子函数,如 hookOnSubscribe
, hookOnNext
等,订阅者会提供 hookOnFinally
方法,无论在成功或异常时都会调用
onXxx 方法,发生某事件时调用,与仅进行通知、回调的 doOnXxx 不同,onXxx 方法允许修改元素、信号。比如 onErrorComplete
方法,会将流的 Error 信号转为 Complete 信号(流异常处理)。
此外,Reactor 为 Api 提供了详细的文档和弹珠图图示,可以直接在 IDE 中查看:
也可以通过官方文档查看。善用翻译辅助
流订阅
与前文一样,没有订阅的流审什么也不会做,比如 Flux.range(0,10).log()
不会有任何日志输出,添加订阅者,如 Flux.range(0,10).log().subscribe(...)
后才会有日志输出。允许使用空订阅者,同时接受 errorConsumer
, completeConsumer
。
buffer
buffer(size)
方法,可以为流指定一定尺寸的缓冲区,经缓冲区处理后,元素会在填满缓冲区后一并发送给消费者。比如如下代码:
Flux.range(1, 5) // Flux<Integer>
.buffer(3) // Flux<List<Integer>>
.subscribe(
item -> System.out.println("Received : " + item)
);
// 控制台输出:
// Received : [1, 2, 3]
// Received : [4, 5]
需要注意,reqest(n)
方法表示的是请求几次数据,而不是请求几个数据,在使用 buffer 的情况下,request(n)
表示请求 n * 缓冲区尺寸
个数据
limitRate
limitRate(size)
方法,为流指定预取策略,指定后,第一次会预取指定个元素,元素被消费 75% 后,会另取指定尺寸 75% 个元素,比如如下代码:
Flux.range(1, 10)
.log()
.limitRate(4)
.subscribe();
// 控制台输出:
// [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
// [ INFO] (main) | request(4)
// [ INFO] (main) | onNext(1)
// [ INFO] (main) | onNext(2)
// [ INFO] (main) | onNext(3)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(4)
// [ INFO] (main) | onNext(5)
// [ INFO] (main) | onNext(6)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(7)
// [ INFO] (main) | onNext(8)
// [ INFO] (main) | onNext(9)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(10)
// [ INFO] (main) | onComplete()
generate:同步环境创建流
generate
方法允许通过编程自定义生成一个流,通过状态变化、值与信号的传递来声明一个流,比如如下代码,将生成一个包含 0~4 的流:
private static void generateDemo() {
// 通过编程方式创建数据流
Flux.generate(
// 初始化状态,生成一个 AtomicInteger,也可以使用 `()->0`
AtomicInteger::new,
// 生成数据
(state, sink) -> {
if (state.get() < 0) {
// 传递错误信号
sink.error(new RuntimeException("State < 0"));
} else if (state.get() < 5) {
// 传递数据,允许抛出 RuntimeException
// 但在 generator 中,只允许调用 next 一次
sink.next(state);
} else {
// 传递 complete 信号,结束数据流
sink.complete();
}
// 返回新的状态
state.addAndGet(1);
return state;
}
)
.log()
.subscribe();
}
create:异步多线程环境创建流
create
方法的 emitter
参数提供一个 FluxSink
变量,在多线程环境下,可以使用该变量传递数据。
handle:允许使用自定义处理器
handle
方法可以用于自定义流的处理逻辑,与 map
方法重新映射、生成新的流不同,handle
方法用于消费元素,可以重新定义流,例如:
Flux.range(1, 10)
.map(i -> "a" + i * 2) // 映射为一个新的流
.handle((val, sink) -> {
if (val.length() < 3) {
sink.next(val); // 传递元素
} else {
sink.complete(); // 提前终止流
}
})
.log()
.subscribe();
线程和调度
响应式编程天生为异步,默认使用当前线程完成流操作;在发布者没有指定线程时,默认使用订阅者的线程(背压)
但也支持自定义线程的调度方式,允许通过 publishOn
, subscribeOn
方法指定发布、订阅的调度线程,调度器有如下方式:
Schedulers.immediate()
默认方式,无执行上下文,当前线程执行Schedulers.single()
使用固定的单线程执行Schedulers.boundedElastic
有界的、弹性调度,默认有 10*CPU核心数的线程池,keepAliveTime 为 60sSchedulers.newBoundedElastic
创建一个自定义参数的线程池Schedulers.fromExecutor
使用传入的线程池
常见整流操作
-
flatMap
允许将流内的一个元素拆分为多个,比如:.flatMap(a -> Flux.just(a + "1", a + "2"))
-
concatMap
允许将流内的一个元素拆分为多个,类型无限制 -
concat
直接连接流,Flux
的静态方法,比如:Flux.concat(Flux.just("A", "B", "C"), Flux.just("D", "E", "F"))
-
concatWith
连接另一个流,成员方法,比如:Flux.just("A", "B").concatWith(Flux.just("C", "D"))
-
merge
,mergeWith
: 按照流内元素的先后顺序合并两个流,与concat
不同,merge
不会把后一个流拼到前一个流后面 -
Flux.mergeSequential
: 静态方法,按照流第一个元素到达的顺序排队进行concat
-
defaultIfEmpty
方法,在流无值的情况下返回一个默认值,如:Mono.empty().defaultIfEmpty(8)
-
switchIfEmpty
方法,在流无值的情况下执行一个调用,如Mono.empty().switchIfEmpty(Mono.just(8))
-
zip
: 将两个流整合为一个元素元组(Tuple
)的流,每个元组从两个流(也允许使用多个流,最多8个流一起压缩)各自取一个元素,余下的孤立元素会被忽略,如:Flux.zip(Flux.just("A", "B", "C"), Flux.just(1, 2, 3), (a, b) -> a + b)
-
transform
类似map
操作,但每次订阅依赖的上下文中的变量不会共享,也称无状态转换(方法的回调函数始终执行一次) -
transformDefered
与transform
相反,依赖的上下文中的变量会共享,属于有状态转换(方法的回调函数每次订阅都会执行)
AtomicInteger outer = new AtomicInteger(0);
Flux<String> fluxForSubscribe = Flux.just("A", "B", "C")
.transformDeferred(flux -> {
AtomicInteger inner = new AtomicInteger(0);
System.out.println("Outer : " + outer.get());
System.out.println("Inner : " + inner.get());
return flux.map(item -> String.format("%s,%s,%s", item, inner.incrementAndGet(), outer.incrementAndGet()));
});
fluxForSubscribe.subscribe(v -> System.out.println("① Received : " + v));
fluxForSubscribe.subscribe(v -> System.out.println("② Received : " + v));
// 上述代码,直接执行会得到如下输出:
// Outer : 0
// Inner : 0
// ① Received : A,1,1
// ① Received : B,2,2
// ① Received : C,3,3
// Outer : 3
// Inner : 0
// ② Received : A,1,4
// ② Received : B,2,5
// ② Received : C,3,6
// 上述代码,transformDeferred 替换为 transform 后得到如下输出
// Outer : 0
// Inner : 0
// ① Received : A,1,1
// ① Received : B,2,2
// ① Received : C,3,3
// ② Received : A,4,4
// ② Received : B,5,5
// ② Received : C,6,6
错误处理
与传统命令式编程类似,响应式编程在异常处理上有着相似的处理办法,但流仍然会被结束,不会触发错误中断(onError
回调不会执行)
onErrorReturn(Exception.class, val)
: 异常时返回一个默认值onErrorResume(Function<Exception, Producer>)
: 异常时执行回调函数,支持如下处理:- 将异常处理为一个新的流,替代错误信号
- 根据异常内容动态计算、返回新的值
- 捕获并重新抛出新的异常(推荐使用
onErrorMap
方法完成这个处理)
doFinally
: 可以在流上进行调用,无论如何都执行的方法onErrorContinue(Exception, val)
: 在发生异常时继续处理,在回调方法中传递具体的异常和导致异常的上游元素onErrorComplete
: 把错误信号替换为结束信号,直接吞掉异常onErrorStop
: 错误后停止流,从源头终止,其他订阅者也会收到结束信号,比如流是通过internal
产生的情况,将被终止
补充
重试
支持针对流处理调用 retry
方法,在发生异常时取消并从头开始重新请求流。
Sinks 类
Sinks 在响应式编程中属于接受器、数据管道,所有数据都会经过 Sinks 向下传递
- 单播:
Sinks.many().unicast()
,只能绑定单个订阅者,有多个订阅者尝试绑定时会报错 - 多播:
Sinks.many().multicast()
,允许绑定多个订阅者 - 重放:
Sinks.many().reply()
,该管道支持重放元素,即是否支持给中途加入的订阅者发送完整的数据(从头消费),支持指定重返的元素个数。不适用重放时,订阅者默认从开始订阅的偏移量开始
缓存
允许对流进行缓存 cache(cnt)
,支持指定缓存队列的大小,与 replay
不同的是,cache 只关心最终的状态
阻塞式 API
针对流调用 block()
方法即可调用阻塞,通过阻塞式方式得到实际值
并发流
支持使用多线程并发进行流的处理,比如:Flux.range(1, 100).parallel(10).runOn(Schedulers.newParallel("Multi", 8))
表示将流分配到10个并行任务上处理,交给具有8线程的线程池执行
Context Api
ThreadLocal 在流的处理机制下会实效(非阻塞模型,线程不再固定),因此需要一个新的机制来保留上下文。
一个反直觉的事情是,Context 的传递机制下,上游能看到下游的 Context,比如:
Flux.just("A", "B", "C")
.transformDeferredContextual((flux, context) -> {
// 可以访问到下游设置的 value
System.out.println("Context : " + context.get("key"));
// 返回一个新流
return Flux.just(1, 2, 3);
})
.contextWrite(context -> context.put("key", "value"))
为什么会这样?与传统命令式编程一样,命令式编程的编程习惯是从请求写到数据(controller ▶️ service ▶️ dao),而响应式编程是从数据写到请求(dao(数据发布者) ▶️ service ▶️ controller)
WebFlux
WebFlux: 底层基于 netty + reactor + springweb 的全异步非阻塞式 web 框架
与传统 Servlet 方式的对照:
API 功能 | Servlet | WebFlux |
---|---|---|
前端处理器 | DispatcherServlet | DispatcherHandler |
处理器 | Controller | WebHandler/Controller |
请求、相应 | ServletRequest, ServletResponse | ServerWebExchange |
过滤器 | Filter(HttpFilter) | WebFilter |
异常处理器 | HandlerExceptionReslover | DispatchExceptionHandler |
web 配置 | @EnableWebMvc | @EnableWebFLux |
自定义配置 | WebMvcConfigurer | WebFluxConfigurer |
返回结果 | 任意 | Mono, Flux, 任意 |
发送 REST 请求 | RestTemplate | WebClient |
引入&使用
可以在 Spring 官网 找到对应的文档说明,如 3.2.11版本文档
参考 Spring 文档对应部分 引入相关依赖
参考 Reactive Web 开始 WebFlux 服务的编写
官方文档中,有如下的示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/users")
public class MyRestController {
private final UserRepository userRepository;
private final CustomerRepository customerRepository;
public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
this.userRepository = userRepository;
this.customerRepository = customerRepository;
}
@GetMapping("/{userId}")
public Mono<User> getUser(@PathVariable Long userId) {
return this.userRepository.findById(userId);
}
@GetMapping("/{userId}/customers")
public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
}
@DeleteMapping("/{userId}")
public Mono<Void> deleteUser(@PathVariable Long userId) {
return this.userRepository.deleteById(userId);
}
}
可见,与传统 Servlet 方式相比,除返回值外,没有太大区别。
Reactor Core
通过 Reactor 核心 API,可以自定义实现一个 Web 服务器,比如:
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
public class FluxMainApplication {
public static void main(String[] args) throws Exception {
// 一个简易的 Http server
// 1. 搞一个 HttpHandler,函数接口,接口要求返回一个 Mono<Void> 代表响应结束
HttpHandler handler = (ServerHttpRequest req, ServerHttpResponse res) -> {
System.out.println("Received Request : " + req.getURI());
// 创建响应数据的 BufferFactory
DataBufferFactory factory = res.bufferFactory();
// 直接生成一个 DataBuffer
DataBuffer wrap = factory.wrap("<h1>Hello, World!</h1>".getBytes());
// 写入响应,返回一个 Mono<Void>,直接返回代表结束
return res.writeWith(Mono.just(wrap));
};
// 2. 创建一个 HttpServer,监听端口,接受数据,发给 Handler
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
// 启动一个 netty 服务器
HttpServer.create().host("localhost").port(9876).handle(adapter).bindNow();
// 3. 保持进程
int ignore = System.in.read();
}
}
启动后,访问 http://localhost:9876/
将会得到 Hello World 的页面。
WebFlux 向下兼容 SpringMvc 的大部分注解和 API。在 WebFlux 下,可以按照如下的方式定义一个 Controller:
@RestController
public class DemoController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("<h1>Hello, World!</h1>");
}
}
在启动 SpringBootApplication 后,访问 http://localhost:8080/
将会得到 Hello World 页面。
在 WebFlux 下,可以完成服务端事件推送(ServerSendEvent),比如在 controller 中定义了如下路由:
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sse() {
return Flux
.just("Hello", "World", "!")
.delayElements(Duration.ofMillis(1000));
}
访问对应路由得到如图响应(每秒返回一个数据)
DispatcherHandler
DispatcherHandler 对应 SpringMVC 中,DispatcherServlet 的概念。
- HandlerMapping: 请求映射处理器,保存每个请求由哪个方法处理
- HandlerAdapter: 处理适配器,反射执行目标方法
- HandlerResultHandler: 处理器结果 的 处理器
SpringMVC 中,通过 DispatcherServlet
中的 doDispatch
方法处理全部请求
WebFlux 中,通过 DispatcherHandler
中的 handle
方法来处理全部请求,handler
方法中,有以下核心处理逻辑:
Flux.fromIterable(this.handlerMappings) // 拿 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange)) // 寻找能处理请求(exchange)的 Handler
.next() // 取一个
.switchIfEmpty(createNotFoundError()) // 取不到的处理
.onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
.flatMap(handler -> handleRequestWith(exchange, handler)); // 取到后执行处理
全局异常处理
与 SpringMVC 方式一样,比如:
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler
public String handleException(NullPointerException e) {
System.out.println("NPE occurred: " + e.getMessage());
return "NullPointerException occurred: " + e.getMessage();
}
}
方法参数
SpringMVC 方法可用的方式基本仍然可用,但仍存在不同
方法传参
Controller 方法参数 | 描述 |
---|---|
ServerWebExchange | 封装了请求(ServerHttpRequest )和响应(ServerHttpResponse ),可用于自定义处理请求响应 |
WebSession | 用于访问 session |
HttpMethod | org.springframework.http.HttpMethod,用于获取请求方式 |
Locale | 罕见,国际化 |
TimeZone + ZoneId | 罕见,时区 |
@PathVariable | 路径变量 |
@MatrixVariable | 矩阵变量 |
@RequestParam | 请求变量 |
@RequestHeader | 请求头 |
@CookieValue | Cookie |
@RequestBody | 请求体 |
HttpEntity<B> | 封装后的整个请求实体,可以获取请求体、请求头等数据 |
@RequestPart | 获取文件请求数据(multipart/form-data ) |
Map + Model + @ModelAttribute | 与 Model 交互传递数据使用的方式 |
Errors + BindingResult | 错误处理使用 |
@RequestAttribute | 请求域数据 |
其他基本类型 | 与标注了 @RequestParam 效果相同 |
其他对象类型 | 与标注了 @ModelAttribute 效果相同 |
返回值
Controller 方法返回值 | 描述 |
---|---|
@ResponseBody | 将数据直接传递出去,如果为对象则转为 json |
HttpEntity<B> , ResponseEntity<B> | 仍允许高度自定义响应实体 |
HttpHeaders | 仅返回响应头 |
ErrorResponse | 构建错误响应 |
ProblemDetail | Springboot3 使用 |
String | 与之前用法、规则一样,可以被视图解析器识别 |
View | 直接返回视图对象 |
Map , Model , @ModelAttribute | 与之前用法、规则一样,传递视图使用的数据 |
Rendering | 一种新式的视图对象 |
void | 仅返回响应完成信号 |
Flux<ServerSentEvent> , Observable<ServerSentEvent> , reactive type | 使用 text/event-stream 完成 SSE 请求 |
其他 | 当作给视图的数据 |
文件上传
与传统的 MultipartFile
对应,在 Reactive 中提供了 Filepart
用于接受文件上传,基于零拷贝提供非阻塞的文件处理。
自定义 WebFlux 配置
当使用 @EnableWebFlux
时,将会使用全自定义的 WebFlux,而不会使默认的 WebFluxAutoConfiguration
生效。
可以向容器注入 WebFluxConfigurer
类型的组件以实现自定义配置。比如在配置类中进行如下配置,允许 localhost
的跨域请求:
@Configuration
public class GlobalConfigurer {
@Bean
public WebFluxConfigurer corsConfigurer() {
return new WebFluxConfigurer() {
@Override
public void addCorsMappings(@NonNull CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("http://localhost");
}
};
}
}
Filter
WebFlux 模式下,过滤器必须实现 WebFilter
接口,与 SpringMvc 方式的 Filter 写法类似:
@Component
public class GlobalFilter implements WebFilter {
@Override
@NonNull
public Mono<Void> filter(@NonNull ServerWebExchange exchange, WebFilterChain chain) {
System.out.println("进入 Filter"); // 1. 进入执行
Mono<Void> filter = chain.filter(exchange)
.doFinally((signalType) -> System.out.println("离开 Filter")); // 3. 实际方法返回后执行
System.out.println("Filter 绑定结束"); // 2. filter 方法结束执行
return filter;
}
}
R2DBC
R2DBC: 用于进行响应式数据库操作的驱动,以替代 JDBC,主要区别为使用了响应式,基本用法与 JDBC 相同:引入驱动、获取连接、处理结果。
基本使用
比如如下例子,演示了如何直接从数据库读取数据(根据官网示例修改):
private static void simpleQueryTest() {
// 获取连接工厂
MariadbConnectionConfiguration configuration = MariadbConnectionConfiguration.builder()
.host("123.0.0.0")
.port(6603)
.username("root")
.password("*******")
.database("key3").build();
ConnectionFactory connectionFactory = MariadbConnectionFactory.from(configuration);
// create 方法返回一个连接的发布者,通过 Mono 包装为一个流,以提取发布者中的数据,用于流式处理
Flux<String> realNameFlux = Mono.from(connectionFactory.create())
// flatMany 方法将 Connection 处理为多个数据(查询到多条记录),形成一个新的流(Flux)
.flatMapMany(connection -> connection
// SQL 变量替换
.createStatement("SELECT * FROM users WHERE id = ?")
.bind(0, 6)
.execute())
.flatMap(result -> result
// 提取结果、转换
.map((row, rowMetadata) -> row.get("real_name", String.class)));
// 订阅流,触发流的执行
realNameFlux.subscribe(System.out::println);
}
Springboot 中使用
首先添加相关依赖:
// R2DBC
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
// R2DBC-MariaDB,数据库连接驱动,视实际 DB 使用情况调整
implementation("org.mariadb:r2dbc-mariadb:1.3.0")
Springboot 将通过下列配置类完成自动配置:
R2dbcAutoConfiguration
: 基础连接配置,包括连接参数、连接池等R2dbcDataAutoConfiguration
: 为容器注入数据操作的必要组件:R2DbcEntityTemplate
: 封装数据 CRUD 操作的模板,可以直接使用该组件执行基于Criteria
方式的查询,更复杂的情况,可以使用更底层的DatabaseClient
组件执行,但是需要手动对结果进行映射- 数据类型的映射器、转换器
R2dbcRepositoryAutoConfiguration
: 用于支持 spring data 声明式接口方式的 CRUD,允许在没有任何实现的情况下提供 CRUD 功能R2dbcTransactionManagerAutoConfiguration
: 提供事务管理
使用 R2dbcTemprate 功能特性时,需要在配置类上声明 @EnableR2dbcRepositories
以开启相应功能,完成后,按照下列方式声明实体对应的 Repository(过去使用 Spring data jpa 的方式),即可在业务中使用基本的 CRUD 方法:
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import plus.mori.reactive.model.Users;
@Repository
public interface UsersRepository extends R2dbcRepository<Users, Long> {
// 支持纯声明式的简单查询,不需要实现方法
Flux<Users> findByRealNameLikeAndIdLessThan(String realName, Long id);
// 支持指定 SQL
@Query("SELECT * FROM users WHERE real_name =:aname OR wx_name =:aname")
Flux<Users> findByAllName(@Param("aname") String allName);
}
需要联表查询的情况,可以通过自定义转换器实现或者手动查询关联,这里不做展开
spring security
与常规 web 开发一样,WebFlux 下,仍然遵循相同的 security 鉴权思想。在引入 security 并正确配置后,会通过自动配置完成下列内容(实现所有请求都需要登录才能访问):
SecurityAutoConfiguration
:为容器注入了 SecurityFilterChain,默认实现所有请求都必须登录才能请求,可以通过自定义 SecurityWebFilterChain 组件来实现自定义规则