Spring Reactive

基本概念与介绍

相应式编程特性

底层基于数据缓冲队列、消息驱动模型、异步会掉机制。在编码上体现为链式调用,占用更少的资源,支持更大的吞吐量

  • 实时性:系统尽可能及时相应
  • 回弹性:系统出现失败时仍保持实时性、可用性(通过隔离、遏制等机制实现)
  • 弹性:在系统负载发生变化后仍然保持实时性。(即能够自动应对伸缩容)
  • 消息驱动:异步线程通过消息交互

与传统 SpringMVC 的不同

SpringMVC 基于阻塞式 IO 架构,简单说就是一个请求一个线程,使用大量的线程进行丹尼尔等待,通常需要额外的架构设计来支持高并发。

Sping WebFlux 基于非阻塞的网络框架,通过缓冲、消息、调度机制来发挥(压榨)多核优势,不再有线程数限制,让少量的线程一直忙,天生支持高并发。

参考: 官方介绍 - 相应式宣言

官网对比图

Reactive-Stream Api

基本特性

Reactive-Stream(响应式流)是 JVM 面向 Stream 操作的规范,提供本地化的消息通信机制,具备如下特性:

  • 允许无限数据
  • 数据有序
  • 节点组件在流上异步进行传递,即下一个节点的处理不依赖上一个节点在流内的处理状态,流水线操作
  • 强制非阻塞,背压模式(Backpressure 即通过队列交互,使消费者的压力得到转移)

发布订阅模型

JDK9 引入了类 java.util.concurrent.Flow,为相应式编程的基础,其中包含 4 个关键函数式接口:

  • Publisher:发布者
  • Subscriber:订阅者
  • Subscription:订阅关系
  • Processor:处理器(订阅一个消息,处理并发布另一个消息,即订阅者+发布者)

各角色关系

基本用法如下:

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class Main {

    public static void main(String[] args) throws Exception {

        // SubmissionPublisher 是发布者的一个简单默认实现,直接提交数据
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // 订阅前发布的数据,因没有订阅者而丢失
        for (int i = 0; i < 5; i++) {
            publisher.submit("item " + i);
        }

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 订阅时
                System.out.println(Thread.currentThread() + " : Subscribed : " + subscription.hashCode());
                this.subscription = subscription;
                // 背压模式:必须请求一次,否则永远不会有数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 下一个元素到达时、接受到新数据时
                System.out.println(Thread.currentThread() + " : Received : " + item);
                if (item == 8) {
                    this.subscription.cancel();
                    return;
                }
                // 背压模式:请求才有下一个元素,不请求则不会有
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // 接受到错误时
                System.out.println(Thread.currentThread() + " : Error : " + throwable);
            }

            @Override
            public void onComplete() {
                // 在等待数据期间,接受到完成信号时
                System.out.println(Thread.currentThread() + " : Completed");
            }
        };

        // 订阅关系链
        Flow.Processor<String, Integer> processor = new ItemNumberExtractor();
        publisher.subscribe(processor);
        processor.subscribe(subscriber);
        // 订阅者只能拿到订阅后发布的数据
        for (int i = 5; i < 10; i++) {
            publisher.submit("item " + i);
        }
        publisher.close();
        // 错误关闭,流本身不会产生,直接订阅者会接受到错误信号,所有数据均不会处理,即使逻辑上在发生错误前取消订阅
        // publisher.closeExceptionally(new Exception());

        // 阻塞主线程,否则主线程结束,子线程也会结束
        Thread.sleep(20000L);
    }
    
    // 一个处理器同时实现发布者、订阅者接口
    private static class ItemNumberExtractor extends SubmissionPublisher<Integer> implements Flow.Processor<String, Integer>, Flow.Subscriber<String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(Thread.currentThread() + " : Processor.Subscribed : " + subscription.hashCode());
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            System.out.println(Thread.currentThread() + " : Processor.Received : " + item);
            // 提取编号,向后传递
            this.submit(Integer.parseInt(item.substring(5)));
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(Thread.currentThread() + " : Processor.Error : " + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println(Thread.currentThread() + " : Processor.Completed");
        }
    }
}

上面的实例在执行后会得到如下输出:

Flow_api.webp

可以说明:

  • 有序:数据有序到达一个节点内
  • 全异步:在流上的组件,前后节点均为异步执行,下一个节点不依赖上一个节点处理完全部数据
  • 非阻塞:发布者(主线程)、处理器(worker-2)、订阅者(worder-1)均为不同线程,由 JVM 底层提供支持(可以追踪源码验证)
  • 背压:订阅者主动请求数据后,流内的数据才发挥作用

参考: reactive-stream

Reactor Api

Reactor 被设计用来实现高效的响应式系统

  • 响应式核心:提供全异步的运算,直接受 Java Api 支持
  • 提供0、1、N的数据序列:Mono 提供 0、1个数据,Flux 提供多个数据,二者均属于 Publisher
  • 全面使用非阻塞 IO,包括网络 IO

参考:projectreactor.io - Reactor 官方文档

FluxMono 基本使用

二者均位于 reactor.core.publisher 类包下,属于 Publisher,可以被订阅者订阅,允许使用 .just(...) 等方法简单创建一个流,使用时需要导入相关依赖:

Maven 方式:

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.11</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

Gradle 方式:

dependencies {
     implementation platform('io.projectreactor:reactor-bom:2023.0.11')
     implementation 'io.projectreactor:reactor-core' 
}

针对流的链式调用,每一次都会产生一个新流,比如调用一次 .map(...) 方法,就会有一个新流出现。

事件感知 API

doOnXXX 方法,在发生什么情况时进行回调,如 doOnComplete 方法,会在流收到结束信号时调用对应回调方法(Hook,钩子函数)。有以下钩子函数可用:

  • doOnComplete: 流完成时
  • doOnCancel: 流被取消时,手动取消或全部订阅者取消订阅
  • doOnDiscard: 元素被忽略时
  • doOnEach: 每一个元素、信号到达时调用(订阅、请求、取消等都有信号)
  • doOnError: 流出错时,流出错前的元素仍会正常处理
  • doOnNext: 得到新元素时,与 request(num)num 参数无关,每个元素到达都会触发
  • doOnRequest: 被请求新元素时
  • doOnSubscribe: 流被订阅时
  • doOnTerminate: 流被异常、取消而中断时

同样的,订阅者也有相应的钩子函数,如 hookOnSubscribe, hookOnNext 等,订阅者会提供 hookOnFinally 方法,无论在成功或异常时都会调用

onXxx 方法,发生某事件时调用,与仅进行通知、回调的 doOnXxx 不同,onXxx 方法允许修改元素、信号。比如 onErrorComplete 方法,会将流的 Error 信号转为 Complete 信号(流异常处理)。

此外,Reactor 为 Api 提供了详细的文档和弹珠图图示,可以直接在 IDE 中查看:

reactor_doc.webp

也可以通过官方文档查看。善用翻译辅助

流订阅

与前文一样,没有订阅的流审什么也不会做,比如 Flux.range(0,10).log() 不会有任何日志输出,添加订阅者,如 Flux.range(0,10).log().subscribe(...) 后才会有日志输出。允许使用空订阅者,同时接受 errorConsumer, completeConsumer

buffer

buffer(size) 方法,可以为流指定一定尺寸的缓冲区,经缓冲区处理后,元素会在填满缓冲区后一并发送给消费者。比如如下代码:

Flux.range(1, 5) // Flux<Integer>
    .buffer(3)  // Flux<List<Integer>>
    .subscribe(
        item -> System.out.println("Received : " + item)
    );
// 控制台输出:
// Received : [1, 2, 3]
// Received : [4, 5]

需要注意,reqest(n) 方法表示的是请求几次数据,而不是请求几个数据,在使用 buffer 的情况下,request(n) 表示请求 n * 缓冲区尺寸 个数据

limitRate

limitRate(size) 方法,为流指定预取策略,指定后,第一次会预取指定个元素,元素被消费 75% 后,会另取指定尺寸 75% 个元素,比如如下代码:

Flux.range(1, 10)
    .log()
    .limitRate(4)
    .subscribe();
// 控制台输出:
// [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
// [ INFO] (main) | request(4)
// [ INFO] (main) | onNext(1)
// [ INFO] (main) | onNext(2)
// [ INFO] (main) | onNext(3)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(4)
// [ INFO] (main) | onNext(5)
// [ INFO] (main) | onNext(6)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(7)
// [ INFO] (main) | onNext(8)
// [ INFO] (main) | onNext(9)
// [ INFO] (main) | request(3)
// [ INFO] (main) | onNext(10)
// [ INFO] (main) | onComplete()

generate:同步环境创建流

generate 方法允许通过编程自定义生成一个流,通过状态变化、值与信号的传递来声明一个流,比如如下代码,将生成一个包含 0~4 的流:

    private static void generateDemo() {
        // 通过编程方式创建数据流
        Flux.generate(
                        // 初始化状态,生成一个 AtomicInteger,也可以使用 `()->0`
                        AtomicInteger::new,
                        // 生成数据
                        (state, sink) -> {
                            if (state.get() < 0) {
                                // 传递错误信号
                                sink.error(new RuntimeException("State < 0"));
                            } else if (state.get() < 5) {
                                // 传递数据,允许抛出 RuntimeException
                                // 但在 generator 中,只允许调用 next 一次
                                sink.next(state);
                            } else {
                                // 传递 complete 信号,结束数据流
                                sink.complete();
                            }
                            // 返回新的状态
                            state.addAndGet(1);
                            return state;
                        }
                )
                .log()
                .subscribe();
    }

create:异步多线程环境创建流

create 方法的 emitter 参数提供一个 FluxSink 变量,在多线程环境下,可以使用该变量传递数据。

handle:允许使用自定义处理器

handle 方法可以用于自定义流的处理逻辑,与 map 方法重新映射、生成新的流不同,handle 方法用于消费元素,可以重新定义流,例如:

        Flux.range(1, 10)
                .map(i -> "a" + i * 2)  // 映射为一个新的流
                .handle((val, sink) -> {
                    if (val.length() < 3) {
                        sink.next(val);  //  传递元素
                    } else {
                        sink.complete();  // 提前终止流
                    }
                })
                .log()
                .subscribe();

线程和调度

响应式编程天生为异步,默认使用当前线程完成流操作;在发布者没有指定线程时,默认使用订阅者的线程(背压)

但也支持自定义线程的调度方式,允许通过 publishOn, subscribeOn 方法指定发布、订阅的调度线程,调度器有如下方式:

  • Schedulers.immediate() 默认方式,无执行上下文,当前线程执行
  • Schedulers.single() 使用固定的单线程执行
  • Schedulers.boundedElastic 有界的、弹性调度,默认有 10*CPU核心数的线程池,keepAliveTime 为 60s
  • Schedulers.newBoundedElastic 创建一个自定义参数的线程池
  • Schedulers.fromExecutor 使用传入的线程池

常见整流操作

  • flatMap 允许将流内的一个元素拆分为多个,比如:.flatMap(a -> Flux.just(a + "1", a + "2"))

  • concatMap 允许将流内的一个元素拆分为多个,类型无限制

  • concat 直接连接流,Flux 的静态方法,比如:Flux.concat(Flux.just("A", "B", "C"), Flux.just("D", "E", "F"))

  • concatWith 连接另一个流,成员方法,比如:Flux.just("A", "B").concatWith(Flux.just("C", "D"))

  • merge, mergeWith: 按照流内元素的先后顺序合并两个流,与 concat 不同,merge 不会把后一个流拼到前一个流后面

  • Flux.mergeSequential: 静态方法,按照流第一个元素到达的顺序排队进行 concat

  • defaultIfEmpty 方法,在流无值的情况下返回一个默认值,如:Mono.empty().defaultIfEmpty(8)

  • switchIfEmpty 方法,在流无值的情况下执行一个调用,如 Mono.empty().switchIfEmpty(Mono.just(8))

  • zip: 将两个流整合为一个元素元组(Tuple)的流,每个元组从两个流(也允许使用多个流,最多8个流一起压缩)各自取一个元素,余下的孤立元素会被忽略,如:Flux.zip(Flux.just("A", "B", "C"), Flux.just(1, 2, 3), (a, b) -> a + b)

  • transform 类似 map 操作,但每次订阅依赖的上下文中的变量不会共享,也称无状态转换(方法的回调函数始终执行一次)

  • transformDeferedtransform 相反,依赖的上下文中的变量会共享,属于有状态转换(方法的回调函数每次订阅都会执行)

        AtomicInteger outer = new AtomicInteger(0);
        Flux<String> fluxForSubscribe = Flux.just("A", "B", "C")
                .transformDeferred(flux -> {
                    AtomicInteger inner = new AtomicInteger(0);
                    System.out.println("Outer : " + outer.get());
                    System.out.println("Inner : " + inner.get());
                    return flux.map(item -> String.format("%s,%s,%s", item, inner.incrementAndGet(), outer.incrementAndGet()));
                });
        fluxForSubscribe.subscribe(v -> System.out.println("① Received : " + v));
        fluxForSubscribe.subscribe(v -> System.out.println("② Received : " + v));
        // 上述代码,直接执行会得到如下输出:
        // Outer : 0
        // Inner : 0
        // ① Received : A,1,1
        // ① Received : B,2,2
        // ① Received : C,3,3
        // Outer : 3
        // Inner : 0
        // ② Received : A,1,4
        // ② Received : B,2,5
        // ② Received : C,3,6
        // 上述代码,transformDeferred 替换为 transform 后得到如下输出
        // Outer : 0
        // Inner : 0
        // ① Received : A,1,1
        // ① Received : B,2,2
        // ① Received : C,3,3
        // ② Received : A,4,4
        // ② Received : B,5,5
        // ② Received : C,6,6

错误处理

与传统命令式编程类似,响应式编程在异常处理上有着相似的处理办法,但流仍然会被结束,不会触发错误中断(onError 回调不会执行)

  1. onErrorReturn(Exception.class, val): 异常时返回一个默认值
  2. onErrorResume(Function<Exception, Producer>): 异常时执行回调函数,支持如下处理:
    • 将异常处理为一个新的流,替代错误信号
    • 根据异常内容动态计算、返回新的值
    • 捕获并重新抛出新的异常(推荐使用 onErrorMap 方法完成这个处理)
  3. doFinally: 可以在流上进行调用,无论如何都执行的方法
  4. onErrorContinue(Exception, val): 在发生异常时继续处理,在回调方法中传递具体的异常和导致异常的上游元素
  5. onErrorComplete: 把错误信号替换为结束信号,直接吞掉异常
  6. onErrorStop: 错误后停止流,从源头终止,其他订阅者也会收到结束信号,比如流是通过 internal 产生的情况,将被终止

补充

重试

支持针对流处理调用 retry 方法,在发生异常时取消并从头开始重新请求流。

Sinks 类

Sinks 在响应式编程中属于接受器、数据管道,所有数据都会经过 Sinks 向下传递

  • 单播:Sinks.many().unicast(),只能绑定单个订阅者,有多个订阅者尝试绑定时会报错
  • 多播:Sinks.many().multicast(),允许绑定多个订阅者
  • 重放:Sinks.many().reply(),该管道支持重放元素,即是否支持给中途加入的订阅者发送完整的数据(从头消费),支持指定重返的元素个数。不适用重放时,订阅者默认从开始订阅的偏移量开始

缓存

允许对流进行缓存 cache(cnt),支持指定缓存队列的大小,与 replay 不同的是,cache 只关心最终的状态

阻塞式 API

针对流调用 block() 方法即可调用阻塞,通过阻塞式方式得到实际值

并发流

支持使用多线程并发进行流的处理,比如:Flux.range(1, 100).parallel(10).runOn(Schedulers.newParallel("Multi", 8)) 表示将流分配到10个并行任务上处理,交给具有8线程的线程池执行

Context Api

ThreadLocal 在流的处理机制下会实效(非阻塞模型,线程不再固定),因此需要一个新的机制来保留上下文。

一个反直觉的事情是,Context 的传递机制下,上游能看到下游的 Context,比如:

        Flux.just("A", "B", "C")
                .transformDeferredContextual((flux, context) -> {
                    // 可以访问到下游设置的 value
                    System.out.println("Context : " + context.get("key"));
                    // 返回一个新流
                    return Flux.just(1, 2, 3);
                })
                .contextWrite(context -> context.put("key", "value"))

为什么会这样?与传统命令式编程一样,命令式编程的编程习惯是从请求写到数据(controller ▶️ service ▶️ dao),而响应式编程是从数据写到请求(dao(数据发布者) ▶️ service ▶️ controller)

WebFlux

WebFlux: 底层基于 netty + reactor + springweb 的全异步非阻塞式 web 框架

与传统 Servlet 方式的对照:

API 功能ServletWebFlux
前端处理器DispatcherServletDispatcherHandler
处理器ControllerWebHandler/Controller
请求、相应ServletRequest, ServletResponseServerWebExchange
过滤器Filter(HttpFilter)WebFilter
异常处理器HandlerExceptionResloverDispatchExceptionHandler
web 配置@EnableWebMvc@EnableWebFLux
自定义配置WebMvcConfigurerWebFluxConfigurer
返回结果任意Mono, Flux, 任意
发送 REST 请求RestTemplateWebClient

异同图示

引入&使用

可以在 Spring 官网 找到对应的文档说明,如 3.2.11版本文档

参考 Spring 文档对应部分 引入相关依赖

参考 Reactive Web 开始 WebFlux 服务的编写

官方文档中,有如下的示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/users")
public class MyRestController {

    private final UserRepository userRepository;

    private final CustomerRepository customerRepository;

    public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
        this.userRepository = userRepository;
        this.customerRepository = customerRepository;
    }

    @GetMapping("/{userId}")
    public Mono<User> getUser(@PathVariable Long userId) {
        return this.userRepository.findById(userId);
    }

    @GetMapping("/{userId}/customers")
    public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
        return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
    }

    @DeleteMapping("/{userId}")
    public Mono<Void> deleteUser(@PathVariable Long userId) {
        return this.userRepository.deleteById(userId);
    }

}

可见,与传统 Servlet 方式相比,除返回值外,没有太大区别。

Reactor Core

Spring 文档

通过 Reactor 核心 API,可以自定义实现一个 Web 服务器,比如:

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;

public class FluxMainApplication {

    public static void main(String[] args) throws Exception {
        // 一个简易的 Http server

        // 1. 搞一个 HttpHandler,函数接口,接口要求返回一个 Mono<Void> 代表响应结束
        HttpHandler handler = (ServerHttpRequest req, ServerHttpResponse res) -> {
            System.out.println("Received Request : " + req.getURI());
            // 创建响应数据的 BufferFactory
            DataBufferFactory factory = res.bufferFactory();
            // 直接生成一个 DataBuffer
            DataBuffer wrap = factory.wrap("<h1>Hello, World!</h1>".getBytes());
            // 写入响应,返回一个 Mono<Void>,直接返回代表结束
            return res.writeWith(Mono.just(wrap));
        };

        // 2. 创建一个 HttpServer,监听端口,接受数据,发给 Handler
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
        // 启动一个 netty 服务器
        HttpServer.create().host("localhost").port(9876).handle(adapter).bindNow();

        // 3. 保持进程
        int ignore = System.in.read();
    }
}

启动后,访问 http://localhost:9876/ 将会得到 Hello World 的页面。

WebFlux 向下兼容 SpringMvc 的大部分注解和 API。在 WebFlux 下,可以按照如下的方式定义一个 Controller:

@RestController
public class DemoController {
    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("<h1>Hello, World!</h1>");
    }
}

在启动 SpringBootApplication 后,访问 http://localhost:8080/ 将会得到 Hello World 页面。

在 WebFlux 下,可以完成服务端事件推送(ServerSendEvent),比如在 controller 中定义了如下路由:

    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sse() {
        return Flux
                .just("Hello", "World", "!")
                .delayElements(Duration.ofMillis(1000));
    }

访问对应路由得到如图响应(每秒返回一个数据)

webflux_sse.webp

DispatcherHandler

DispatcherHandler 对应 SpringMVC 中,DispatcherServlet 的概念。

  • HandlerMapping: 请求映射处理器,保存每个请求由哪个方法处理
  • HandlerAdapter: 处理适配器,反射执行目标方法
  • HandlerResultHandler: 处理器结果 的 处理器

SpringMVC 中,通过 DispatcherServlet 中的 doDispatch 方法处理全部请求 WebFlux 中,通过 DispatcherHandler 中的 handle 方法来处理全部请求,handler 方法中,有以下核心处理逻辑:

Flux.fromIterable(this.handlerMappings)  // 拿 handlerMappings
    .concatMap(mapping -> mapping.getHandler(exchange))  // 寻找能处理请求(exchange)的 Handler
    .next()  // 取一个
    .switchIfEmpty(createNotFoundError())  // 取不到的处理
    .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
    .flatMap(handler -> handleRequestWith(exchange, handler));  // 取到后执行处理

全局异常处理

与 SpringMVC 方式一样,比如:

@RestControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler
    public String handleException(NullPointerException e) {
        System.out.println("NPE occurred: " + e.getMessage());
        return "NullPointerException occurred: " + e.getMessage();
    }
}

方法参数

SpringMVC 方法可用的方式基本仍然可用,但仍存在不同

方法传参

Controller 方法参数描述
ServerWebExchange封装了请求(ServerHttpRequest)和响应(ServerHttpResponse),可用于自定义处理请求响应
WebSession用于访问 session
HttpMethodorg.springframework.http.HttpMethod,用于获取请求方式
Locale罕见,国际化
TimeZone + ZoneId罕见,时区
@PathVariable路径变量
@MatrixVariable矩阵变量
@RequestParam请求变量
@RequestHeader请求头
@CookieValueCookie
@RequestBody请求体
HttpEntity<B>封装后的整个请求实体,可以获取请求体、请求头等数据
@RequestPart获取文件请求数据(multipart/form-data
Map + Model + @ModelAttributeModel 交互传递数据使用的方式
Errors + BindingResult错误处理使用
@RequestAttribute请求域数据
其他基本类型与标注了 @RequestParam 效果相同
其他对象类型与标注了 @ModelAttribute 效果相同

返回值

Controller 方法返回值描述
@ResponseBody将数据直接传递出去,如果为对象则转为 json
HttpEntity<B>, ResponseEntity<B>仍允许高度自定义响应实体
HttpHeaders仅返回响应头
ErrorResponse构建错误响应
ProblemDetailSpringboot3 使用
String与之前用法、规则一样,可以被视图解析器识别
View直接返回视图对象
Map, Model, @ModelAttribute与之前用法、规则一样,传递视图使用的数据
Rendering一种新式的视图对象
void仅返回响应完成信号
Flux<ServerSentEvent>, Observable<ServerSentEvent>, reactive type使用 text/event-stream 完成 SSE 请求
其他当作给视图的数据

文件上传

与传统的 MultipartFile 对应,在 Reactive 中提供了 Filepart 用于接受文件上传,基于零拷贝提供非阻塞的文件处理。

自定义 WebFlux 配置

当使用 @EnableWebFlux 时,将会使用全自定义的 WebFlux,而不会使默认的 WebFluxAutoConfiguration 生效。

可以向容器注入 WebFluxConfigurer 类型的组件以实现自定义配置。比如在配置类中进行如下配置,允许 localhost 的跨域请求:

@Configuration
public class GlobalConfigurer {

    @Bean
    public WebFluxConfigurer corsConfigurer() {
        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(@NonNull CorsRegistry registry) {
                registry.addMapping("/**")
                        .allowedOrigins("http://localhost");
            }
        };
    }
}

Filter

WebFlux 模式下,过滤器必须实现 WebFilter 接口,与 SpringMvc 方式的 Filter 写法类似:

@Component
public class GlobalFilter implements WebFilter {
    @Override
    @NonNull
    public Mono<Void> filter(@NonNull ServerWebExchange exchange, WebFilterChain chain) {
        System.out.println("进入 Filter");  // 1. 进入执行
        Mono<Void> filter = chain.filter(exchange)
                .doFinally((signalType) -> System.out.println("离开 Filter"));  // 3. 实际方法返回后执行
        System.out.println("Filter 绑定结束");  // 2. filter 方法结束执行
        return filter;
    }
}

R2DBC

R2DBC: 用于进行响应式数据库操作的驱动,以替代 JDBC,主要区别为使用了响应式,基本用法与 JDBC 相同:引入驱动、获取连接、处理结果。

基本使用

比如如下例子,演示了如何直接从数据库读取数据(根据官网示例修改):

    private static void simpleQueryTest() {
        // 获取连接工厂
        MariadbConnectionConfiguration configuration = MariadbConnectionConfiguration.builder()
                .host("123.0.0.0")
                .port(6603)
                .username("root")
                .password("*******")
                .database("key3").build();
        ConnectionFactory connectionFactory = MariadbConnectionFactory.from(configuration);
        // create 方法返回一个连接的发布者,通过 Mono 包装为一个流,以提取发布者中的数据,用于流式处理
        Flux<String> realNameFlux = Mono.from(connectionFactory.create())
                // flatMany 方法将 Connection 处理为多个数据(查询到多条记录),形成一个新的流(Flux)
                .flatMapMany(connection -> connection
                        // SQL 变量替换
                        .createStatement("SELECT * FROM users WHERE id = ?")
                        .bind(0, 6)
                        .execute())
                .flatMap(result -> result
                        // 提取结果、转换
                        .map((row, rowMetadata) -> row.get("real_name", String.class)));
        // 订阅流,触发流的执行
        realNameFlux.subscribe(System.out::println);
    }

Springboot 中使用

首先添加相关依赖:

// R2DBC
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
// R2DBC-MariaDB,数据库连接驱动,视实际 DB 使用情况调整
implementation("org.mariadb:r2dbc-mariadb:1.3.0")

Springboot 将通过下列配置类完成自动配置:

  1. R2dbcAutoConfiguration: 基础连接配置,包括连接参数、连接池等
  2. R2dbcDataAutoConfiguration: 为容器注入数据操作的必要组件:
    • R2DbcEntityTemplate: 封装数据 CRUD 操作的模板,可以直接使用该组件执行基于 Criteria 方式的查询,更复杂的情况,可以使用更底层的 DatabaseClient 组件执行,但是需要手动对结果进行映射
    • 数据类型的映射器、转换器
  3. R2dbcRepositoryAutoConfiguration: 用于支持 spring data 声明式接口方式的 CRUD,允许在没有任何实现的情况下提供 CRUD 功能
  4. R2dbcTransactionManagerAutoConfiguration: 提供事务管理

使用 R2dbcTemprate 功能特性时,需要在配置类上声明 @EnableR2dbcRepositories 以开启相应功能,完成后,按照下列方式声明实体对应的 Repository(过去使用 Spring data jpa 的方式),即可在业务中使用基本的 CRUD 方法:

import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import plus.mori.reactive.model.Users;

@Repository
public interface UsersRepository extends R2dbcRepository<Users, Long> {

    // 支持纯声明式的简单查询,不需要实现方法
    Flux<Users> findByRealNameLikeAndIdLessThan(String realName, Long id);

    // 支持指定 SQL
    @Query("SELECT * FROM users WHERE real_name =:aname OR wx_name =:aname")
    Flux<Users> findByAllName(@Param("aname") String allName);
}

需要联表查询的情况,可以通过自定义转换器实现或者手动查询关联,这里不做展开

spring security

官方文档

与常规 web 开发一样,WebFlux 下,仍然遵循相同的 security 鉴权思想。在引入 security 并正确配置后,会通过自动配置完成下列内容(实现所有请求都需要登录才能访问):

  • SecurityAutoConfiguration:为容器注入了 SecurityFilterChain,默认实现所有请求都必须登录才能请求,可以通过自定义 SecurityWebFilterChain 组件来实现自定义规则