Spring Reactive
基本概念与介绍
相应式编程特性
底层基于数据缓冲队列、消息驱动模型、异步会掉机制。在编码上体现为链式调用,占用更少的资源,支持更大的吞吐量
- 实时性:系统尽可能及时相应
 - 回弹性:系统出现失败时仍保持实时性、可用性(通过隔离、遏制等机制实现)
 - 弹性:在系统负载发生变化后仍然保持实时性。(即能够自动应对伸缩容)
 - 消息驱动:异步线程通过消息交互
 
与传统 SpringMVC 的不同
SpringMVC 基于阻塞式 IO 架构,简单说就是一个请求一个线程,使用大量的线程进行丹尼尔等待,通常需要额外的架构设计来支持高并发。
Sping WebFlux 基于非阻塞的网络框架,通过缓冲、消息、调度机制来发挥(压榨)多核优势,不再有线程数限制,让少量的线程一直忙,天生支持高并发。
Reactive-Stream Api
基本特性
Reactive-Stream(响应式流)是 JVM 面向 Stream 操作的规范,提供本地化的消息通信机制,具备如下特性:
- 允许无限数据
 - 数据有序
 - 节点组件在流上异步进行传递,即下一个节点的处理不依赖上一个节点在流内的处理状态,流水线操作
 - 强制非阻塞,背压模式(Backpressure 即通过队列交互,使消费者的压力得到转移)
 
发布订阅模型
JDK9 引入了类 java.util.concurrent.Flow,为相应式编程的基础,其中包含 4 个关键函数式接口:
Publisher:发布者Subscriber:订阅者Subscription:订阅关系Processor:处理器(订阅一个消息,处理并发布另一个消息,即订阅者+发布者)

基本用法如下:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class Main {
    public static void main(String[] args) throws Exception {
        // SubmissionPublisher 是发布者的一个简单默认实现,直接提交数据
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 订阅前发布的数据,因没有订阅者而丢失
        for (int i = 0; i < 5; i++) {
            publisher.submit("item " + i);
        }
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 订阅时
                System.out.println(Thread.currentThread() + " : Subscribed : " + subscription.hashCode());
                this.subscription = subscription;
                // 背压模式:必须请求一次,否则永远不会有数据
                this.subscription.request(1);
            }
            @Override
            public void onNext(Integer item) {
                // 下一个元素到达时、接受到新数据时
                System.out.println(Thread.currentThread() + " : Received : " + item);
                if (item == 8) {
                    this.subscription.cancel();
                    return;
                }
                // 背压模式:请求才有下一个元素,不请求则不会有
                this.subscription.request(1);
            }
            @Override
            public void onError(Throwable throwable) {
                // 接受到错误时
                System.out.println(Thread.currentThread() + " : Error : " + throwable);
            }
            @Override
            public void onComplete() {
                // 在等待数据期间,接受到完成信号时
                System.out.println(Thread.currentThread() + " : Completed");
            }
        };
        // 订阅关系链
        Flow.Processor<String, Integer> processor = new ItemNumberExtractor();
        publisher.subscribe(processor);
        processor.subscribe(subscriber);
        // 订阅者只能拿到订阅后发布的数据
        for (int i = 5; i < 10; i++) {
            publisher.submit("item " + i);
        }
        publisher.close();
        // 错误关闭,流本身不会产生,直接订阅者会接受到错误信号,所有数据均不会处理,即使逻辑上在发生错误前取消订阅
        // publisher.closeExceptionally(new Exception());
        // 阻塞主线程,否则主线程结束,子线程也会结束
        Thread.sleep(20000L);
    }
    
    // 一个处理器同时实现发布者、订阅者接口
    private static class ItemNumberExtractor extends SubmissionPublisher<Integer> implements Flow.Processor<String, Integer>, Flow.Subscriber<String> {
        private Flow.Subscription subscription;
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(Thread.currentThread() + " : Processor.Subscribed : " + subscription.hashCode());
            this.subscription = subscription;
            this.subscription.request(1);
        }
        @Override
        public void onNext(String item) {
            System.out.println(Thread.currentThread() + " : Processor.Received : " + item);
            // 提取编号,向后传递
            this.submit(Integer.parseInt(item.substring(5)));
            this.subscription.request(1);
        }
        @Override
        public void onError(Throwable throwable) {
            System.out.println(Thread.currentThread() + " : Processor.Error : " + throwable);
        }
        @Override
        public void onComplete() {
            System.out.println(Thread.currentThread() + " : Processor.Completed");
        }
    }
}
上面的实例在执行后会得到如下输出:

可以说明:
- 有序:数据有序到达一个节点内
 - 全异步:在流上的组件,前后节点均为异步执行,下一个节点不依赖上一个节点处理完全部数据
 - 非阻塞:发布者(主线程)、处理器(worker-2)、订阅者(worder-1)均为不同线程,由 JVM 底层提供支持(可以追踪源码验证)
 - 背压:订阅者主动请求数据后,流内的数据才发挥作用
 
参考: reactive-stream
Reactor Api
Reactor 被设计用来实现高效的响应式系统
- 响应式核心:提供全异步的运算,直接受 Java Api 支持
 - 提供0、1、N的数据序列:
Mono提供 0、1个数据,Flux提供多个数据,二者均属于Publisher - 全面使用非阻塞 IO,包括网络 IO
 
参考:projectreactor.io - Reactor 官方文档
Flux 与 Mono 基本使用
二者均位于 reactor.core.publisher 类包下,属于 Publisher,可以被订阅者订阅,允许使用 .just(...) 等方法简单创建一个流,使用时需要导入相关依赖:
Maven 方式:
<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.11</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>
Gradle 方式:
dependencies {
     implementation platform('io.projectreactor:reactor-bom:2023.0.11')
     implementation 'io.projectreactor:reactor-core' 
}
针对流的链式调用,每一次都会产生一个新流,比如调用一次 .map(...) 方法,就会有一个新流出现。
事件感知 API
doOnXXX 方法,在发生什么情况时进行回调,如 doOnComplete 方法,会在流收到结束信号时调用对应回调方法(Hook,钩子函数)。有以下钩子函数可用:
doOnComplete: 流完成时doOnCancel: 流被取消时,手动取消或全部订阅者取消订阅doOnDiscard: 元素被忽略时doOnEach: 每一个元素、信号到达时调用(订阅、请求、取消等都有信号)doOnError: 流出错时,流出错前的元素仍会正常处理doOnNext: 得到新元素时,与request(num)的num参数无关,每个元素到达都会触发doOnRequest: 被请求新元素时doOnSubscribe: 流被订阅时doOnTerminate: 流被异常、取消而中断时
同样的,订阅者也有相应的钩子函数,如 hookOnSubscribe, hookOnNext 等,订阅者会提供 hookOnFinally 方法,无论在成功或异常时都会调用
onXxx 方法,发生某事件时调用,与仅进行通知、回调的 doOnXxx 不同,onXxx 方法允许修改元素、信号。比如 onErrorComplete 方法,会将流的 Error 信号转为 Complete 信号(流异常处理)。
此外,Reactor 为 Api 提供了详细的文档和弹珠图图示,可以直接在 IDE 中查看:

也可以通过官方文档查看。善用翻译辅助
流订阅
与前文一样,没有订阅的流审什么也不会做,比如 Flux.range(0,10).log() 不会有任何日志输出,添加订阅者,如 Flux.range(0,10).log().subscribe(...) 后才会有日志输出。允许使用空订阅者,同时接受 errorConsumer, completeConsumer。
buffer
buffer(size) 方法,可以为流指定一定尺寸的缓冲区,经缓冲区处理后,元素会在填满缓冲区后一并发送给消费者。比如如下代码:
Flux.range(1, 5) // Flux<Integer>
    .buffer(3)  // Flux<List<Integer>>
    .subscribe(
        item -> System.out.println("Received : " + item)
    );
// 控制台输出:
// Received : [1, 2, 3]
// Received : [4, 5]
需要注意,reqest(n) 方法表示的是请求几次数据,而不是请求几个数据,在使用 buffer 的情况下,request(n) 表示请求 n * 缓冲区尺寸 个数据
limitRate
limitRate(size) 方法,为流指定预取策略,指定后,第一次会预取指定个元素,元素被消费 75% 后,会另取指定尺寸 75% 个元素,比如如下代码:
Flux.range(1, 10)
    .log()
    .limitRate(4)
    .subscribe();
// 控制台输出:
// [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
// [ INFO] (main) | request(4)
// [ INFO] (main) | onNext(1)
// [ INFO] (main) | onNext(2)
// [ INFO] (main) | onNext(3)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(4)
// [ INFO] (main) | onNext(5)
// [ INFO] (main) | onNext(6)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(7)
// [ INFO] (main) | onNext(8)
// [ INFO] (main) | onNext(9)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(10)
// [ INFO] (main) | onComplete()
generate:同步环境创建流
generate 方法允许通过编程自定义生成一个流,通过状态变化、值与信号的传递来声明一个流,比如如下代码,将生成一个包含 0~4 的流:
    private static void generateDemo() {
        // 通过编程方式创建数据流
        Flux.generate(
                        // 初始化状态,生成一个 AtomicInteger,也可以使用 `()->0`
                        AtomicInteger::new,
                        // 生成数据
                        (state, sink) -> {
                            if (state.get() < 0) {
                                // 传递错误信号
                                sink.error(new RuntimeException("State < 0"));
                            } else if (state.get() < 5) {
                                // 传递数据,允许抛出 RuntimeException
                                // 但在 generator 中,只允许调用 next 一次
                                sink.next(state);
                            } else {
                                // 传递 complete 信号,结束数据流
                                sink.complete();
                            }
                            // 返回新的状态
                            state.addAndGet(1);
                            return state;
                        }
                )
                .log()
                .subscribe();
    }
create:异步多线程环境创建流
create 方法的 emitter 参数提供一个  FluxSink 变量,在多线程环境下,可以使用该变量传递数据。
handle:允许使用自定义处理器
handle 方法可以用于自定义流的处理逻辑,与 map 方法重新映射、生成新的流不同,handle 方法用于消费元素,可以重新定义流,例如:
        Flux.range(1, 10)
                .map(i -> "a" + i * 2)  // 映射为一个新的流
                .handle((val, sink) -> {
                    if (val.length() < 3) {
                        sink.next(val);  //  传递元素
                    } else {
                        sink.complete();  // 提前终止流
                    }
                })
                .log()
                .subscribe();
线程和调度
响应式编程天生为异步,默认使用当前线程完成流操作;在发布者没有指定线程时,默认使用订阅者的线程(背压)
但也支持自定义线程的调度方式,允许通过 publishOn, subscribeOn 方法指定发布、订阅的调度线程,调度器有如下方式:
Schedulers.immediate()默认方式,无执行上下文,当前线程执行Schedulers.single()使用固定的单线程执行Schedulers.boundedElastic有界的、弹性调度,默认有 10*CPU核心数的线程池,keepAliveTime 为 60sSchedulers.newBoundedElastic创建一个自定义参数的线程池Schedulers.fromExecutor使用传入的线程池
常见整流操作
- 
flatMap允许将流内的一个元素拆分为多个,比如:.flatMap(a -> Flux.just(a + "1", a + "2")) - 
concatMap允许将流内的一个元素拆分为多个,类型无限制 - 
concat直接连接流,Flux的静态方法,比如:Flux.concat(Flux.just("A", "B", "C"), Flux.just("D", "E", "F")) - 
concatWith连接另一个流,成员方法,比如:Flux.just("A", "B").concatWith(Flux.just("C", "D")) - 
merge,mergeWith: 按照流内元素的先后顺序合并两个流,与concat不同,merge不会把后一个流拼到前一个流后面 - 
Flux.mergeSequential: 静态方法,按照流第一个元素到达的顺序排队进行concat - 
defaultIfEmpty方法,在流无值的情况下返回一个默认值,如:Mono.empty().defaultIfEmpty(8) - 
switchIfEmpty方法,在流无值的情况下执行一个调用,如Mono.empty().switchIfEmpty(Mono.just(8)) - 
zip: 将两个流整合为一个元素元组(Tuple)的流,每个元组从两个流(也允许使用多个流,最多8个流一起压缩)各自取一个元素,余下的孤立元素会被忽略,如:Flux.zip(Flux.just("A", "B", "C"), Flux.just(1, 2, 3), (a, b) -> a + b) - 
transform类似map操作,但每次订阅依赖的上下文中的变量不会共享,也称无状态转换(方法的回调函数始终执行一次) - 
transformDefered与transform相反,依赖的上下文中的变量会共享,属于有状态转换(方法的回调函数每次订阅都会执行) 
        AtomicInteger outer = new AtomicInteger(0);
        Flux<String> fluxForSubscribe = Flux.just("A", "B", "C")
                .transformDeferred(flux -> {
                    AtomicInteger inner = new AtomicInteger(0);
                    System.out.println("Outer : " + outer.get());
                    System.out.println("Inner : " + inner.get());
                    return flux.map(item -> String.format("%s,%s,%s", item, inner.incrementAndGet(), outer.incrementAndGet()));
                });
        fluxForSubscribe.subscribe(v -> System.out.println("① Received : " + v));
        fluxForSubscribe.subscribe(v -> System.out.println("② Received : " + v));
        // 上述代码,直接执行会得到如下输出:
        // Outer : 0
        // Inner : 0
        // ① Received : A,1,1
        // ① Received : B,2,2
        // ① Received : C,3,3
        // Outer : 3
        // Inner : 0
        // ② Received : A,1,4
        // ② Received : B,2,5
        // ② Received : C,3,6
        // 上述代码,transformDeferred 替换为 transform 后得到如下输出
        // Outer : 0
        // Inner : 0
        // ① Received : A,1,1
        // ① Received : B,2,2
        // ① Received : C,3,3
        // ② Received : A,4,4
        // ② Received : B,5,5
        // ② Received : C,6,6
错误处理
与传统命令式编程类似,响应式编程在异常处理上有着相似的处理办法,但流仍然会被结束,不会触发错误中断(onError 回调不会执行)
onErrorReturn(Exception.class, val): 异常时返回一个默认值onErrorResume(Function<Exception, Producer>): 异常时执行回调函数,支持如下处理:- 将异常处理为一个新的流,替代错误信号
 - 根据异常内容动态计算、返回新的值
 - 捕获并重新抛出新的异常(推荐使用 
onErrorMap方法完成这个处理) 
doFinally: 可以在流上进行调用,无论如何都执行的方法onErrorContinue(Exception, val): 在发生异常时继续处理,在回调方法中传递具体的异常和导致异常的上游元素onErrorComplete: 把错误信号替换为结束信号,直接吞掉异常onErrorStop: 错误后停止流,从源头终止,其他订阅者也会收到结束信号,比如流是通过internal产生的情况,将被终止
补充
重试
支持针对流处理调用 retry 方法,在发生异常时取消并从头开始重新请求流。
Sinks 类
Sinks 在响应式编程中属于接受器、数据管道,所有数据都会经过 Sinks 向下传递
- 单播:
Sinks.many().unicast(),只能绑定单个订阅者,有多个订阅者尝试绑定时会报错 - 多播:
Sinks.many().multicast(),允许绑定多个订阅者 - 重放:
Sinks.many().reply(),该管道支持重放元素,即是否支持给中途加入的订阅者发送完整的数据(从头消费),支持指定重返的元素个数。不适用重放时,订阅者默认从开始订阅的偏移量开始 
缓存
允许对流进行缓存 cache(cnt),支持指定缓存队列的大小,与 replay 不同的是,cache 只关心最终的状态
阻塞式 API
针对流调用 block() 方法即可调用阻塞,通过阻塞式方式得到实际值
并发流
支持使用多线程并发进行流的处理,比如:Flux.range(1, 100).parallel(10).runOn(Schedulers.newParallel("Multi", 8)) 表示将流分配到10个并行任务上处理,交给具有8线程的线程池执行
Context Api
ThreadLocal 在流的处理机制下会实效(非阻塞模型,线程不再固定),因此需要一个新的机制来保留上下文。
一个反直觉的事情是,Context 的传递机制下,上游能看到下游的 Context,比如:
        Flux.just("A", "B", "C")
                .transformDeferredContextual((flux, context) -> {
                    // 可以访问到下游设置的 value
                    System.out.println("Context : " + context.get("key"));
                    // 返回一个新流
                    return Flux.just(1, 2, 3);
                })
                .contextWrite(context -> context.put("key", "value"))
为什么会这样?与传统命令式编程一样,命令式编程的编程习惯是从请求写到数据(controller ▶️ service ▶️ dao),而响应式编程是从数据写到请求(dao(数据发布者) ▶️ service ▶️ controller)
WebFlux
WebFlux: 底层基于 netty + reactor + springweb 的全异步非阻塞式 web 框架
与传统 Servlet 方式的对照:
| API 功能 | Servlet | WebFlux | 
|---|---|---|
| 前端处理器 | DispatcherServlet | DispatcherHandler | 
| 处理器 | Controller | WebHandler/Controller | 
| 请求、相应 | ServletRequest, ServletResponse | ServerWebExchange | 
| 过滤器 | Filter(HttpFilter) | WebFilter | 
| 异常处理器 | HandlerExceptionReslover | DispatchExceptionHandler | 
| web 配置 | @EnableWebMvc | @EnableWebFLux | 
| 自定义配置 | WebMvcConfigurer | WebFluxConfigurer | 
| 返回结果 | 任意 | Mono, Flux, 任意 | 
| 发送 REST 请求 | RestTemplate | WebClient | 

引入&使用
可以在 Spring 官网 找到对应的文档说明,如 3.2.11版本文档
参考 Spring 文档对应部分 引入相关依赖
参考 Reactive Web 开始 WebFlux 服务的编写
官方文档中,有如下的示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/users")
public class MyRestController {
    private final UserRepository userRepository;
    private final CustomerRepository customerRepository;
    public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
        this.userRepository = userRepository;
        this.customerRepository = customerRepository;
    }
    @GetMapping("/{userId}")
    public Mono<User> getUser(@PathVariable Long userId) {
        return this.userRepository.findById(userId);
    }
    @GetMapping("/{userId}/customers")
    public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
        return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
    }
    @DeleteMapping("/{userId}")
    public Mono<Void> deleteUser(@PathVariable Long userId) {
        return this.userRepository.deleteById(userId);
    }
}
可见,与传统 Servlet 方式相比,除返回值外,没有太大区别。
Reactor Core
通过 Reactor 核心 API,可以自定义实现一个 Web 服务器,比如:
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
public class FluxMainApplication {
    public static void main(String[] args) throws Exception {
        // 一个简易的 Http server
        // 1. 搞一个 HttpHandler,函数接口,接口要求返回一个 Mono<Void> 代表响应结束
        HttpHandler handler = (ServerHttpRequest req, ServerHttpResponse res) -> {
            System.out.println("Received Request : " + req.getURI());
            // 创建响应数据的 BufferFactory
            DataBufferFactory factory = res.bufferFactory();
            // 直接生成一个 DataBuffer
            DataBuffer wrap = factory.wrap("<h1>Hello, World!</h1>".getBytes());
            // 写入响应,返回一个 Mono<Void>,直接返回代表结束
            return res.writeWith(Mono.just(wrap));
        };
        // 2. 创建一个 HttpServer,监听端口,接受数据,发给 Handler
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
        // 启动一个 netty 服务器
        HttpServer.create().host("localhost").port(9876).handle(adapter).bindNow();
        // 3. 保持进程
        int ignore = System.in.read();
    }
}
启动后,访问 http://localhost:9876/ 将会得到 Hello World 的页面。
WebFlux 向下兼容 SpringMvc 的大部分注解和 API。在 WebFlux 下,可以按照如下的方式定义一个 Controller:
@RestController
public class DemoController {
    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("<h1>Hello, World!</h1>");
    }
}
在启动 SpringBootApplication 后,访问 http://localhost:8080/ 将会得到 Hello World 页面。
在 WebFlux 下,可以完成服务端事件推送(ServerSendEvent),比如在 controller 中定义了如下路由:
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sse() {
        return Flux
                .just("Hello", "World", "!")
                .delayElements(Duration.ofMillis(1000));
    }
访问对应路由得到如图响应(每秒返回一个数据)

DispatcherHandler
DispatcherHandler 对应 SpringMVC 中,DispatcherServlet 的概念。
- HandlerMapping: 请求映射处理器,保存每个请求由哪个方法处理
 - HandlerAdapter: 处理适配器,反射执行目标方法
 - HandlerResultHandler: 处理器结果 的 处理器
 
SpringMVC 中,通过 DispatcherServlet 中的 doDispatch 方法处理全部请求
WebFlux 中,通过 DispatcherHandler 中的 handle 方法来处理全部请求,handler 方法中,有以下核心处理逻辑:
Flux.fromIterable(this.handlerMappings)  // 拿 handlerMappings
    .concatMap(mapping -> mapping.getHandler(exchange))  // 寻找能处理请求(exchange)的 Handler
    .next()  // 取一个
    .switchIfEmpty(createNotFoundError())  // 取不到的处理
    .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
    .flatMap(handler -> handleRequestWith(exchange, handler));  // 取到后执行处理
全局异常处理
与 SpringMVC 方式一样,比如:
@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler
    public String handleException(NullPointerException e) {
        System.out.println("NPE occurred: " + e.getMessage());
        return "NullPointerException occurred: " + e.getMessage();
    }
}
方法参数
SpringMVC 方法可用的方式基本仍然可用,但仍存在不同
方法传参
| Controller 方法参数 | 描述 | 
|---|---|
ServerWebExchange | 封装了请求(ServerHttpRequest)和响应(ServerHttpResponse),可用于自定义处理请求响应 | 
WebSession | 用于访问 session | 
HttpMethod | org.springframework.http.HttpMethod,用于获取请求方式 | 
Locale | 罕见,国际化 | 
TimeZone + ZoneId | 罕见,时区 | 
@PathVariable | 路径变量 | 
@MatrixVariable | 矩阵变量 | 
@RequestParam | 请求变量 | 
@RequestHeader | 请求头 | 
@CookieValue | Cookie | 
@RequestBody | 请求体 | 
HttpEntity<B> | 封装后的整个请求实体,可以获取请求体、请求头等数据 | 
@RequestPart | 获取文件请求数据(multipart/form-data) | 
Map + Model + @ModelAttribute | 与 Model 交互传递数据使用的方式 | 
Errors + BindingResult | 错误处理使用 | 
@RequestAttribute | 请求域数据 | 
| 其他基本类型 | 与标注了 @RequestParam 效果相同 | 
| 其他对象类型 | 与标注了 @ModelAttribute 效果相同 | 
返回值
| Controller 方法返回值 | 描述 | 
|---|---|
@ResponseBody | 将数据直接传递出去,如果为对象则转为 json | 
HttpEntity<B>, ResponseEntity<B> | 仍允许高度自定义响应实体 | 
HttpHeaders | 仅返回响应头 | 
ErrorResponse | 构建错误响应 | 
ProblemDetail | Springboot3 使用 | 
String | 与之前用法、规则一样,可以被视图解析器识别 | 
View | 直接返回视图对象 | 
Map, Model, @ModelAttribute | 与之前用法、规则一样,传递视图使用的数据 | 
Rendering | 一种新式的视图对象 | 
void | 仅返回响应完成信号 | 
Flux<ServerSentEvent>, Observable<ServerSentEvent>, reactive type | 使用 text/event-stream 完成 SSE 请求 | 
| 其他 | 当作给视图的数据 | 
文件上传
与传统的 MultipartFile 对应,在 Reactive 中提供了 Filepart 用于接受文件上传,基于零拷贝提供非阻塞的文件处理。
自定义 WebFlux 配置
当使用 @EnableWebFlux 时,将会使用全自定义的 WebFlux,而不会使默认的 WebFluxAutoConfiguration 生效。
可以向容器注入 WebFluxConfigurer 类型的组件以实现自定义配置。比如在配置类中进行如下配置,允许 localhost 的跨域请求:
@Configuration
public class GlobalConfigurer {
    @Bean
    public WebFluxConfigurer corsConfigurer() {
        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(@NonNull CorsRegistry registry) {
                registry.addMapping("/**")
                        .allowedOrigins("http://localhost");
            }
        };
    }
}
Filter
WebFlux 模式下,过滤器必须实现 WebFilter 接口,与 SpringMvc 方式的 Filter 写法类似:
@Component
public class GlobalFilter implements WebFilter {
    @Override
    @NonNull
    public Mono<Void> filter(@NonNull ServerWebExchange exchange, WebFilterChain chain) {
        System.out.println("进入 Filter");  // 1. 进入执行
        Mono<Void> filter = chain.filter(exchange)
                .doFinally((signalType) -> System.out.println("离开 Filter"));  // 3. 实际方法返回后执行
        System.out.println("Filter 绑定结束");  // 2. filter 方法结束执行
        return filter;
    }
}
R2DBC
R2DBC: 用于进行响应式数据库操作的驱动,以替代 JDBC,主要区别为使用了响应式,基本用法与 JDBC 相同:引入驱动、获取连接、处理结果。
基本使用
比如如下例子,演示了如何直接从数据库读取数据(根据官网示例修改):
    private static void simpleQueryTest() {
        // 获取连接工厂
        MariadbConnectionConfiguration configuration = MariadbConnectionConfiguration.builder()
                .host("123.0.0.0")
                .port(6603)
                .username("root")
                .password("*******")
                .database("key3").build();
        ConnectionFactory connectionFactory = MariadbConnectionFactory.from(configuration);
        // create 方法返回一个连接的发布者,通过 Mono 包装为一个流,以提取发布者中的数据,用于流式处理
        Flux<String> realNameFlux = Mono.from(connectionFactory.create())
                // flatMany 方法将 Connection 处理为多个数据(查询到多条记录),形成一个新的流(Flux)
                .flatMapMany(connection -> connection
                        // SQL 变量替换
                        .createStatement("SELECT * FROM users WHERE id = ?")
                        .bind(0, 6)
                        .execute())
                .flatMap(result -> result
                        // 提取结果、转换
                        .map((row, rowMetadata) -> row.get("real_name", String.class)));
        // 订阅流,触发流的执行
        realNameFlux.subscribe(System.out::println);
    }
Springboot 中使用
首先添加相关依赖:
// R2DBC
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
// R2DBC-MariaDB,数据库连接驱动,视实际 DB 使用情况调整
implementation("org.mariadb:r2dbc-mariadb:1.3.0")
Springboot 将通过下列配置类完成自动配置:
R2dbcAutoConfiguration: 基础连接配置,包括连接参数、连接池等R2dbcDataAutoConfiguration: 为容器注入数据操作的必要组件:R2DbcEntityTemplate: 封装数据 CRUD 操作的模板,可以直接使用该组件执行基于Criteria方式的查询,更复杂的情况,可以使用更底层的DatabaseClient组件执行,但是需要手动对结果进行映射- 数据类型的映射器、转换器
 
R2dbcRepositoryAutoConfiguration: 用于支持 spring data 声明式接口方式的 CRUD,允许在没有任何实现的情况下提供 CRUD 功能R2dbcTransactionManagerAutoConfiguration: 提供事务管理
使用 R2dbcTemprate 功能特性时,需要在配置类上声明 @EnableR2dbcRepositories 以开启相应功能,完成后,按照下列方式声明实体对应的 Repository(过去使用 Spring data jpa 的方式),即可在业务中使用基本的 CRUD 方法:
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import plus.mori.reactive.model.Users;
@Repository
public interface UsersRepository extends R2dbcRepository<Users, Long> {
    // 支持纯声明式的简单查询,不需要实现方法
    Flux<Users> findByRealNameLikeAndIdLessThan(String realName, Long id);
    // 支持指定 SQL
    @Query("SELECT * FROM users WHERE real_name =:aname OR wx_name =:aname")
    Flux<Users> findByAllName(@Param("aname") String allName);
}
需要联表查询的情况,可以通过自定义转换器实现或者手动查询关联,这里不做展开
spring security
与常规 web 开发一样,WebFlux 下,仍然遵循相同的 security 鉴权思想。在引入 security 并正确配置后,会通过自动配置完成下列内容(实现所有请求都需要登录才能访问):
SecurityAutoConfiguration:为容器注入了 SecurityFilterChain,默认实现所有请求都必须登录才能请求,可以通过自定义 SecurityWebFilterChain 组件来实现自定义规则