webflux-project

Entropy Lv4

记录首次基于 Spring Webflux 响应式框架的后端项目开发过程中涉及到的一些东西。

webflux 框架是一个异步非阻塞的响应式框架,旨在提高吞吐量和并发量。webflux 框架的使用过程中涉及到很多函数式编程与响应式编程。下面是一些学习使用过程中的整理。

对象模型 PO BO DTO VO (DO)

基本概念

  • PO (Persistent Object) 持久对象:直接映射到数据库表的一个对象。通常与数据库表结构一一对应,用于ORM(对象关系映射)框架。
  • BO (Business Object) 业务对象:包含业务逻辑的对象,一般位于业务层。BO通常从DAO(数据访问对象)获取数据,进行业务逻辑处理,并返回结果给前端。
  • DTO (Data Transfer Object) 数据传输对象:用于服务层和外部的通信,或是不同服务间的数据传输。DTO主要用于远程调用时减少单个方法调用的参数数量,或是封装方法调用的返回结果。
  • VO (Value Object) 值对象:主要用于表示展示层和客户端的数据对象。VO通常是根据界面的需求来创建的,包含界面显示所需要的数据。
  • DO (Domain Object) 领域对象:在领域驱动设计(DDD)中,领域对象包含了业务逻辑,它代表了业务领域内的一个实体或概念,这时候等同于BO。在另外的开发规范中,DO (Data Object) 作为数据对象,对象属性与数据库字段形成映射关系,这时候等同于PO

数据来源

VO

  • 数据来源:通常直接对应于前端视图展示的数据需求,可能来自于单个PO的转换,或者是多个PO、DTO的组合与转换的结果。
  • 简单场景案例:在一个CRUD应用中,创建用户的表单提交结果可能直接映射为一个UserVO,该VO包含用户的姓名、邮箱等信息,然后直接转换为PO进行数据库操作。

DTO

  • 数据来源:DTO通常用于服务层之间的数据传输,它的数据可能来源于数据库的多表查询结果,或者是BO的转换和封装。
  • 简单场景案例:在微服务架构中,OrderService调用UserService获取用户信息时,UserService可能会返回一个UserDTO,包含用户的基本信息,这个DTO可能是直接从数据库查询得到的,也可能是一个或多个PO转换而来。

BO

  • 数据来源:BO主要在业务逻辑层使用,它的数据来源可能是数据库查询得到的PO,或者是经过业务逻辑处理的DTO。
  • 简单场景案例:在处理订单支付逻辑时,PaymentBO可能包含订单详情、用户信息等多个方面的数据,这些数据可能通过查询数据库得到的PO转换而来,然后PaymentBO会封装支付处理的具体逻辑。

PO

  • 数据来源:PO就是直接对应数据库中的每一张数据表。
  • 简单场景案例:在数据库中存在userorderrecord等数据表,根据每一张表的字段定义对应的PO。

复杂场景案例

一个典型的复杂场景案例,展示了一个电商系统中订单处理的流程,涉及用户下单、订单处理、支付、以及最终的订单状态更新和通知用户。

场景描述

假设有一个电商平台,用户可以浏览商品、添加商品到购物车、下单和支付。系统需要处理订单创建、库存检查、支付处理、订单状态更新和通知用户等一系列操作。

涉及的模型和层次

  • PO(持久对象)OrderOrderItemProductUser等,直接映射数据库中的订单表、订单项表、产品表和用户表。
  • DTO(数据传输对象)OrderCreationDTOPaymentInfoDTO等,用于在服务层之间传递订单创建和支付信息。
  • BO(业务对象)OrderBO,封装订单处理的业务逻辑,如验证库存、计算价格等。
  • VO(值对象)OrderSummaryVOPaymentResultVO等,用于向用户展示订单摘要信息和支付结果。

数据流转

  1. 用户下单
    • 用户在前端页面提交订单,前端发送包含订单详情(如商品ID、数量等)的OrderCreationDTO到后端。
    • Controller层接收OrderCreationDTO,调用Service层处理订单。
  2. 订单处理
    • Service层将OrderCreationDTO转换或直接使用来创建OrderBO
    • OrderBO负责业务逻辑处理,如验证库存、计算总价等,并使用PO操作数据库创建订单和订单项。
    • 如果涉及到跨服务调用(如库存服务),可能会使用其他的DTO进行数据传输。
  3. 支付处理
    • 用户选择支付方式并提交支付信息,后端接收支付信息(如PaymentInfoDTO)并处理支付。
    • 支付服务可能是一个独立的服务,Service层将支付信息封装在DTO中调用支付服务。
    • 支付完成后,更新订单状态,并将结果封装在PaymentResultVO中返回给前端。
  4. 订单完成和通知
    • 订单支付成功后,Service层更新订单状态(使用PO操作数据库)。
    • 系统通知用户订单已完成,可能通过发送OrderSummaryVO到前端展示订单摘要,或发送邮件/短信通知(内容可能基于VO生成)。

在 SpringBoot 三层架构中的使用

Controller层
  • VO(值对象):主要用于Controller层与前端的数据交互。VO专门针对视图(View)的需求设计,封装了用户界面展示所需的数据。Controller层可能会接收前端传来的VO,也可能将数据封装成VO返回给前端。
Service层
  • DTO(数据传输对象):Service层广泛使用DTO来传输跨层次、跨服务的数据。DTO用于封装从Controller传到Service层的数据,或者从Service层传到其他服务的数据。
  • BO(业务对象):BO包含业务逻辑和业务状态信息,Service层使用BO来执行具体的业务操作。BO可以看作是Service层的核心,封装了业务规则和算法。
DAO(或Repository)层
  • PO(持久对象):DAO层使用PO与数据库表直接映射。ORM(对象关系映射)框架如Hibernate、JPA等通常操作PO,将数据库行记录映射成PO,或将PO持久化到数据库中。PO仅仅反映了数据库的结构,而没有复杂的业务逻辑。
数据流转
  1. 前端到后端:前端发送请求,Controller层接收封装成VO的数据 -> Service层将VO转换成DTO或直接使用DTO接收数据,进行业务处理,可能会涉及到BO -> DAO层使用PO与数据库交互。
  2. 后端到前端:DAO层查询数据库,将结果封装成PO -> Service层处理业务逻辑,可能会转换PO为BO或DTO进行业务逻辑的处理 -> Controller层将DTO或BO转换为VO,返回给前端。
图片

外键关联约束下 PO 的映射行为

1. 基本类型

  • 定义:使用基本数据类型或其包装类直接在PO中表示外键字段。
  • 适用场景:当你只需要知道关联表的ID,而不需要访问关联表其他字段的详细信息时。
  • 建议:这种方式简单且高效,适用于关联关系较为简单的场景。

2. 变量级组合

  • 定义:在PO中通过对象变量的形式包含关联表的PO,以此来表示外键关系。
  • 适用场景:需要访问或操作关联表的详细信息时,如展示用户信息时同时需要展示用户的订单信息。
  • 建议:采用变量级组合可以增强模型的表达力,适用于需要在业务逻辑中处理关联数据的场景。使用ORM框架时,这种关系可以通过注解(如JPA的@ManyToOne@OneToMany等)来实现。

3. 类级组合(BO)

  • 定义:通过创建新的类来封装多个表的组合数据,这些类不直接对应单一的数据库表,而是根据业务需求组合多个PO的数据(BO的来源之一)。
  • 适用场景:跨表查询结果的封装,或需要根据业务逻辑组合多个表数据的场景。
  • 建议:类级组合适用于复杂的业务逻辑处理,以及需要将多个表的数据作为一个整体进行处理的情况。这种方法可以提高代码的可读性和维护性,但需要注意避免过度使用,以免引入不必要的复杂性。

4. 数据传输对象(DTO)

  • 定义:特别地,可以创建DTO来封装从数据库查询得到的数据,特别是多表查询的结果,DTO不直接映射到数据库的表结构,而是根据展示或传输的需要定制。
  • 适用场景:在跨服务传输数据,或者在层与层之间传输经过处理的数据时。
  • 建议:当PO与视图或服务间的数据需求不一一对应时,使用DTO可以提供更大的灵活性和清晰的分层。

各种 ER 关系下的对象模型设计

使用变量级别的组合描述实体关系,简要地说明一个参考的对象模型设计方案。

一个简单的图书管理系统,其中包括Author(作者)、Book(书籍)、Review(书评)和BookDetail(书籍详情)四个实体。

实体关系

  • 多对多关系:一本书可以由多个作者共同编写,一个作者也可以写多本书(Book <-> Author)。
  • 一对多关系:一本书可以有多条书评,但每条书评只针对一本书(Book -> Review)。
  • 一对一关系:为了方便展示一对一的关系,可以引入一个新的实体:书籍详情(BookDetail),假设每本书都有一个与之对应的详细信息页(Book -> BookDetail)。

PO 的设计

对应数据表中的各个字段,分析实体关系进行设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Author {
private Long id;
private String name;
private List<Book> books; // 与Book的多对多关系

// 构造器、Getter和Setter
}

public class Book {
private Long id;
private String title;
private List<Author> authors; // 与Author的多对多关系
private List<Review> reviews; // 与Review的一对多关系
private BookDetail bookDetail; // 与BookDetail的一对一关系

// 构造器、Getter和Setter
}

public class Review {
private Long id;
private String content;
private Book book; // 指向Book的外键,表示这条书评属于哪本书

// 构造器、Getter和Setter
}

public class BookDetail {
private Long id;
private String isbn;
private String publisher;
private Book book; // 指向Book的引用,一对一关系

// 构造器、Getter和Setter
}

DTO 的设计

封装多表查询的数据结构,用于服务层和展示层的数据传输。(根据多表查询的结果、前端的请求数据结构等设计,这里是多表查询BookBookDetail的示例,用来展示书籍列表,其中包括书名、作者名字列表和ISBN号。)

1
2
3
4
5
6
7
public class BookListDTO {
private String title;
private List<String> authorsNames; // 书的所有作者的名字
private String isbn; // 书籍详情中的ISBN号

// 构造器、Getter和Setter
}

VO的设计

VO用于表示层,封装最终用户界面所需的数据。(根据前端展示要求设计,例如展示书籍的详细信息,包括书名、ISBN号、出版社、作者列表和书评列表,可以是由若干个 PO、DTO 单独或者组合转换而来。)

1
2
3
4
5
6
7
8
9
public class BookDetailVO {
private String title;
private String isbn;
private String publisher;
private List<String> authors;
private List<String> reviews; // 用户对书籍的评论

// 构造器、Getter和Setter
}

BO的设计

BO封装业务逻辑,基于PO进行操作,可能会涉及多个PO之间的逻辑关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class BookManagementBO {
private Book book;
private List<Author> authors;
private BookDetail bookDetail;
private List<Review> reviews;

public BookManagementBO(Book book) {
this.book = book;
// 假设这里填充了authors、bookDetail、reviews等信息
}

public void publishBook() {
// 实现发布书籍的业务逻辑,可能包括保存书籍信息、保存作者信息等
}

public void addReview(Review review) {
// 添加书评到书籍
}

// 更多业务逻辑方法
}

BO 的设计有些特殊,它主要负责处理业务逻辑,并可以直接访问DAO(数据访问对象)层。但是它的业务逻辑操作的类型又不同于在服务层的业务逻辑操作:

服务层(Service Layer)

服务层主要负责协调应用程序中不同部分的业务逻辑。它为表示层提供了一组可用的操作(API),这些操作代表了应用程序可以执行的业务操作。服务层的职责包括:

  • 事务管理:确保业务操作的事务性,处理应用程序中的事务边界。
  • 应用逻辑:实现应用级别的业务逻辑,如用户权限验证、工作流控制等。
  • 协调数据访问:调用下层的数据访问对象(DAO)来访问和修改数据。
  • 聚合业务操作:组合和协调多个BO的操作,以执行复杂的业务逻辑。
业务对象(BO)

业务对象通常更接近于特定的业务领域模型,封装了与之相关的数据以及对数据进行操作的业务逻辑。BO的职责包括:

  • 封装业务规则:实现特定业务领域的规则和逻辑。
  • 状态管理:管理和维护业务对象的状态。
  • 数据验证:确保业务数据的完整性和有效性。
  • 数据转换:在业务模型和数据模型之间转换数据。
如何划分
  • 粒度和复杂性:如果业务操作涉及多个实体之间的复杂交互或需要执行跨实体的事务管理,通常这些操作应该放在服务层完成。服务层可以调用一个或多个BO来实现这些复杂的业务逻辑。
  • 领域逻辑:如果操作主要涉及单一实体内部的业务规则或逻辑,这些操作可以封装在相应的BO中。BO可以提供细粒度的业务方法,服务层可以利用这些方法来完成更高级别的业务操作。
  • 数据访问:尽管BO可以直接调用DAO来进行数据操作,但更常见的做法是在服务层中调用DAO,然后通过服务层来协调数据访问和业务逻辑。这样做有助于保持业务逻辑层的一致性和事务的完整性。
总结
  • 在服务层中操作:当业务逻辑需要跨多个领域模型或需要应用级别的逻辑处理时。例如针对上面的 DTO 的数据结构查询结果。
  • 在BO中操作:当业务逻辑主要针对单一领域模型且比较独立时。例如针对书籍的发布涉及到保存和查询两个连续操作,添加书评需要对书评的内容进行敏感词过滤等。

基础环境

项目依赖环境

下面的 pom.xml 是项目涉及到的全部依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.entropy</groupId>
<artifactId>webflux-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>gradems</name>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
<version>0.12.5</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<version>0.12.5</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<version>0.12.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

数据库环境搭建

使用以下 docker-compose.yaml 配置文件即可快速搭建 postgresql、mongodb、redis 三个数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
version: '3.8'
services:
postgres:
container_name: gradms-postgres
image: 'postgres:latest'
restart: always
environment:
- 'POSTGRES_DB=gradems' # postgresql 数据库名称
- 'POSTGRES_PASSWORD=gradems' # postgresql 数据库密码
- 'POSTGRES_USER=admin' # postgresql 数据库用户名
ports:
- '5432:5432'
volumes:
- 'postgres_data:/var/lib/postgresql/data'
redis:
container_name: gradms-redis
image: 'redis:latest'
restart: always
ports:
- '6379:6379'
mongodb:
container_name: gradms-mongodb
image: 'mongo:latest'
restart: always
ports:
- '27017:27017'
volumes:
- 'mongodb_data:/data/db'

volumes:
postgres_data:
mongodb_data:

运行命令docker-compose up -d启动。

至此,基本的项目依赖环境和数据库环境准备完成。

Webflux 控制层

Webflux 的控制层有两种不同的编写风格,一种是传统控制器风格,一种是函数式端点风格。

传统控制器风格

只需要在传统的控制器上将返回值设置为MonoFlux两个特殊的类型即可。

1
2
3
4
5
6
7
@RestController
public class MyController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Hello, World!");
}
}

在 Spring Webflux 中,MonoFlux是响应式编程中的两种基本类型,它们来自于 Reactor 库:

  • Mono:代表单个值或空值的异步序列。它用于处理最多一个数据项的场景。例如,一个查询单个对象的数据库操作可以返回Mono
  • Flux:代表多个值的异步序列。它用于处理0到N个数据项的场景。例如,一个查询多个对象的数据库操作可以返回Flux

控制层返回值的改动也会影响到服务层,数据访问层的代码。

函数式端点风格

首先创建一个处理器类(handler),需要返回固定的Mono<ServerResponse>类型。

1
2
3
4
5
6
@Component
public class MyHandler {
public Mono<ServerResponse> hello(ServerRequest request) {
return ServerResponse.ok().body(Mono.just("Hello, WebFlux!"), String.class);
}
}

然后是创建路由配置类,用于定义路由节点。

1
2
3
4
5
6
7
8
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> route(MyHandler handler) {
return RouterFunctions
.route(RequestPredicates.GET("/hello"), handler::hello);
}
}

此时传统的控制器被分成了处理器和路由配置两部分。

统一响应结构

在前后端分离项目,前后端之间的数据交互通常遵循统一的固定格式。

一个典型的固定格式包含以下几个关键部分:

  1. 状态码(Code):表示请求处理的结果(如成功、错误的详细信息)。这通常是一个整数值,例如200表示成功,400表示客户端错误,500表示服务器错误。
  2. 消息(Message):提供关于请求处理结果的简短描述,尤其是在发生错误时。这有助于前端开发人员理解发生了什么问题,甚至可能向用户显示这些消息。
  3. 数据(Data):实际的响应数据,可以是任何结构,如对象、数组或简单值。这是请求的主要内容,包含了前端需要的所有数据。

下面通过自定义 API 响应类来设置响应体格式

创建一个 API 响应类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ApiResponse<T> {
private Integer code;
private String message;
private T data; // 使用泛型代替Object提高类型安全

public ApiResponse(Integer code, String message, T data) {
this.code = code;
this.message = message;
this.data = data;
}

// 省略getters和setters...
}

在处理器中使用自定义的 API 响应类封装数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class MyHandler {
public Mono<ServerResponse> hello(ServerRequest request) {
Mono<ApiResponse> resp = Mono.just(new ApiResponse(200, "OK", "Hello, WebFlux!"););
return ServerResponse.ok().body(resp, ApiResponse.class);
}
}
// 或者
@Component
public class MyHandler {
public Mono<ServerResponse> hello(ServerRequest request) {
Mono<ApiResponse> resp = Mono.fromSupplier(() -> new ApiResponse(200, "OK", "Hello, WebFlux!"));
return ServerResponse.ok().body(resp, ApiResponse.class);
}
}

这里Mono.justMono.fromSupplier的区别是:

just是立即创建并包装一个已经存在的值。当你需要包装一个已知的非异步值或非阻塞操作的结果时适合使用just

fromSupplier允许你在响应式流中延迟执行并动态地生成单个值。当你的操作是异步的或者你想要延迟执行某个操作直到订阅发生时适合使用fromSupplier

相关资料推荐:一文搞懂什么是RESTful API

CRUD 操作

Webflux 中进行 CRUD 操作的代码也和传统的 web 框架不太一样,这里大概介绍一下 Webflux 控制层中增删改查操作涉及的常用方法。

路径参数与请求体的提取

路径参数是 RESTful 风格中的概念,将 URL 中指定的值直接提取出来,不同于传统的查询参数,路径参数只包括了值本身,而“键”是在路由配置中定义的。

增删改和条件查询(单个结果)通常都需要一个路径参数指定全局唯一资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 增加(POST)
public Mono<ServerResponse> createItem(ServerRequest request) {
Mono<Item> itemMono = request.bodyToMono(Item.class);
return itemMono.flatMap(item -> service.createItem(item))
.flatMap(item -> ServerResponse.ok().bodyValue(new ApiResponse<>(200, "Item Created Successfully", item)));
}
// 删除(DELETE)
public Mono<ServerResponse> deleteItem(ServerRequest request) {
String id = request.pathVariable("id");
return service.deleteItem(id)
.flatMap(item -> ServerResponse.ok().bodyValue(new ApiResponse<>(200, "Item Deleted Successfully", item)));
}
// 更新(PUT)
public Mono<ServerResponse> updateItem(ServerRequest request) {
String id = request.pathVariable("id");
Mono<Item> itemMono = request.bodyToMono(Item.class);
return itemMono.flatMap(item -> service.updateItem(id, item))
.flatMap(item -> ServerResponse.ok().bodyValue(new ApiResponse<>(200, "Item Updated Successfully", item)));
}

这里的bodyToMono方法用于将请求体转换为相应类型的Mono,主要是针对增加和更新操作,而pathVariable方法用于提取路径变量,主要是针对查询和删除操作。如果使用传统的查询参数可以使用ServerRequest.queryParam的方法获取。

小结

在 WebFlux 的函数式端点风格中,ServerRequest提供了多种方法来获取 HTTP 请求中的不同类型的数据:

  1. 获取响应体数据: 使用request.bodyToMono(Class<T>)来获取请求体中的数据,适用于 POST 和 PUT 请求。
  2. 获取路径参数: 使用request.pathVariable("name")来获取 URL 路径中的变量,适用于任何类型的 HTTP 请求。
  3. 获取查询参数: 使用request.queryParam("name")来获取 URL 查询参数,适用于 GET 请求。

关于 flatMap

flatMap是响应式编程中的一个重要操作符,它用于处理流(如MonoFlux中的元素),并将每个元素转换成另一个流,最后将这些流“扁平化”为一个流。在Reactor库中,这个操作符常用于处理异步操作的结果,尤其是当这些操作本身返回MonoFlux时。

具体是什么?

flatMap操作符接收一个函数作为参数。这个函数对源流中的每个元素进行操作,返回一个新的PublisherMonoFlux)。然后,flatMap将这些返回的Publisher合并成一个单一的Publisher。这使得flatMap非常适合处理嵌套的异步操作。

应该在哪里使用它?

  1. 链式异步调用:当你需要执行一系列的异步操作,并且后一个操作依赖于前一个操作的结果时。例如,从数据库查询一个对象,然后根据该对象的信息去查询另一个服务。
  2. 转换和合并流:当你需要将流中的元素转换为另一种类型的流,并且希望最终结果是一个单一流时。
  3. 处理嵌套的MonoFlux:当操作返回嵌套的Mono<Mono<T>>Flux<Flux<T>>时,使用flatMap可以将其“扁平化”为Mono<T>Flux<T>

什么情况下使用它?

  • 当你想要异步地转换数据,并且每个转换本身返回一个MonoFlux时。
  • 当你需要处理一个操作的结果,这个操作异步地返回另一个可观察的流时。
  • 当你希望并发执行异步操作,并且需要合并它们的结果时。flatMap允许并发执行,但如果需要控制并发度,你可能需要使用其变体如flatMapSequential(保持原始序列顺序)或flatMap带有并发限制的重载方法。

Webflux 数据访问层

整合R2DBC

连接配置

配置 application.yaml 文件,重点关注 url、username、password 这里的配置。

  • url:使用 r2dbc 协议连接 postgresql 数据库,默认本地端口 5432,数据库名称为 gradems。
  • username:数据库用户名。在前面的 docker-compose.yaml 配置文件中已经指定。
  • password:数据库密码。在前面的 docker-compose.yaml 配置文件中已经指定。
1
2
3
4
5
6
7
8
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/gradems
username: admin
password: gradems
pool:
initial-size: 5
max-size: 20
代码实现方式

在使用 WebFlux 框架时,结合 R2DBC 进行反应式数据库操作,主要有以下几种实现方式:

  1. Repository方式:利用Spring Data R2DBC提供的Repository支持,可以通过定义接口来自动实现反应式CRUD操作。这种方式极大简化了数据访问层的代码,开发者只需定义接口并选择性地添加查询方法,Spring Data R2DBC会自动实现这些接口。
  2. DatabaseClient API:Spring Data R2DBC提供了一个高级的DatabaseClient API,适用于更灵活的数据库访问和查询构建。它支持声明式的查询操作,允许开发者以链式调用的方式构建查询并处理结果。这种方式提供了比Repository更灵活的数据访问能力,适合于复杂查询和自定义操作。
  3. R2DBC EntityTemplateR2dbcEntityTemplate提供了一个基于模板的方法来执行数据库操作,包括插入、更新、查询和删除等CRUD操作。它为操作单一实体或批量实体提供了便捷的API,并且可以与R2DBC的反应式特性很好地结合使用。
  4. 自定义R2DBC使用:除了上述封装好的高级API外,开发者还可以直接使用R2DBC的底层API来执行数据库操作。这种方式需要开发者手动管理数据库连接和查询执行,提供了最大的灵活性和控制力,适用于需要高度定制化数据库操作的场景。
  5. R2DBC QueryDSL:对于需要构建类型安全的复杂查询的应用,可以考虑使用QueryDSL与R2DBC结合的方式。这种方法允许开发者以DSL(Domain-Specific Language)的方式构建查询,提高了代码的可读性和维护性。

这里主要介绍最常用的 Repository 方式和 DatabaseClient API 方式,基本满足大部分应用场景。

首先创建以下实体类Person

1
2
3
4
5
6
7
8
9
10
@Table("people") // 如果数据表名和类名匹配,则该注解可省略,否则需要指定映射数据表名
public class Person {
@Id
private Long id;
@Column("name") // 与@Table类似的映射作用,如果名称相同可以省略
private String name;
private Integer age;

// Getters and setters...
}

@Id注解告诉Spring Data R2DBC框架哪个属性用作数据库表中的主键列。Spring Data R2DBC 依赖于该注解来识别实体的唯一标识符(ID),从而执行诸如查找、更新或删除特定实体的操作。如果缺少@Id注解,Spring Data R2DBC 可能无法正确执行这些操作,因为它无法确定哪个字段是实体的主键。

使用Repository 方式

定义PersonRepository接口,ReactiveCrudRepository具有方法名解析的功能,即可以通过方法名解析自动实现该方法的功能,方法名需要遵循特定的命名规则。

1
2
3
4
public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {
Mono<Person> findByName(String name); // 查询单个结果
Flux<Person> findByAge(int age); // 查询多个结果
}

CRUD 操作示例 (在服务层实现)

这里涉及的方法是ReactiveCrudRepository自带的接口实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 增加
Person person = new Person();
person.setName("John Doe");
person.setAge(25);
Mono<Person> savedPerson = personRepository.save(person);
// 删除
Mono<Void> deletePerson = personRepository.deleteById(1L);
// 局部更新
Mono<Person> updatedPerson = personRepository.findById(1L)
.map(p -> {
p.setName("Jane Doe");
return p;
})
.flatMap(personRepository::save);
// 更新
// 更新实际上是通过 save 方法实现的,如果对象存在则更新,否则插入
Person person = new Person();
person.setName("John Doe");
person.setAge(30);
Mono<Person> savedPerson = personRepository.save(person);
// 查询
// 查询单个结果
Mono<Person> person = personRepository.findById(1L);
// 查询多个结果
Flux<Person> people = personRepository.findAll();
使用DatabaseClient API

当使用Spring Data R2DBC的DatabaseClient进行数据库操作时,你会遇到一系列的链式调用方法,这些方法为构建和执行数据库命令提供了灵活的接口。下面是这些方法的简要说明:

.sql(String sql)

  • 含义: 该方法用于指定要执行的SQL命令。
  • 用法: .sql("SELECT * FROM my_table WHERE id = :id"),其中字符串为要执行的SQL命令。

.bind(String name, Object value)

  • 含义: 用于绑定SQL命令中的参数。这是一个防止SQL注入的安全做法,比直接在SQL命令字符串中拼接参数更安全。
  • 用法: .bind("id", 1),这会将SQL命令中的:id参数替换为1

.bind(index, value)

  • 含义: 除了按名称绑定参数外,也可以按位置绑定参数。这在使用没有命名参数的数据库或驱动时特别有用。
  • 用法: .bind(0, "value"),这会将SQL命令中的第一个参数(基于0的索引)绑定为指定的值。

.bindNull(String name, Class<?> type)

  • 含义: 用于绑定SQL命令中的参数为NULL值,同时指定该NULL值的类型。
  • 用法: .bindNull("parameterName", String.class),这会将SQL命令中的:parameterName参数绑定为NULL,类型为String

.then()

  • 含义: 在Reactor中,.then()方法用于忽略前面步骤的结果,并在上一个操作完成后继续执行。在DatabaseClient的上下文中,.then()通常用于执行完数据库操作(如插入、更新、删除)后,不需要处理任何返回结果,只需知道操作已成功完成。

  • 用法: .then()用于链式调用的最后,返回一个Mono<Void>,表示操作完成的信号。

    使用.then()方法后就不能使用.fetch()方法,因为一个不需要返回实际结果,一个需要返回结果。

.map(Function<Row, T> mappingFunction)

  • 含义: 该方法用于将结果集中的每一行(Row)映射(或转换)为另一种类型(T)。这是通过提供一个函数来完成的,该函数接受一个Row对象并返回一个新类型的对象。

  • 用法: .map(row -> new MyObject(row.get("column_name", String.class))),其中MyObject是自定义类,column_name是结果集中的列名。.map方法在查询操作中几乎是一定会使用到的方法。

    .map()在这里是对结果集中的每一项进行转换的操作,它是基于结果流Flux或单个结果Mono的Reactor操作符。当需要对查询结果进行转换时,.map()方法通常与.all(), .one(), 或.first()一起使用。

.one()

  • 含义: 用于从反应式数据库操作中获取恰好一个结果的Mono<T>。如果查询结果为空,则返回Mono.empty();如果查询结果有多于一条记录,通常会抛出异常。这适用于你期望查询返回单一结果的场景。

  • 用法: 在执行数据库查询操作时使用,适合于返回单条记录的查询。

    1
    2
    3
    4
    5
    6
    7
    8
    Mono<Person> person = databaseClient
    .sql("SELECT * FROM person WHERE id = :id")
    .bind("id", 1)
    .map(row -> new Person(
    row.get("id", Long.class),
    row.get("name", String.class),
    row.get("age", Integer.class)))
    .one();

.first()

  • 含义: 与.one()类似,但当存在多条记录时,它会返回结果集中的第一条记录包装在Mono<T>中。如果查询结果为空,则返回Mono.empty()。这适用于你只关心结果集的第一条记录,而不在乎是否还有其他记录的场景。

  • 用法: 在执行数据库查询操作时使用,当你只需要查询结果中的第一条记录。

    1
    2
    3
    4
    5
    6
    7
    Mono<Person> person = databaseClient
    .sql("SELECT * FROM person")
    .map(row -> new Person(
    row.get("id", Long.class),
    row.get("name", String.class),
    row.get("age", Integer.class)))
    .first();

.all()

  • 含义: 用于获取操作结果中的所有记录,返回一个包含所有结果的Flux<T>。这适用于你需要处理查询返回的所有记录的场景。

  • 用法: 在执行数据库查询操作时使用,特别是当你期望获取多条记录作为响应。

    1
    2
    3
    4
    5
    6
    7
    Flux<Person> people = databaseClient
    .sql("SELECT * FROM person")
    .map(row -> new Person(
    row.get("id", Long.class),
    row.get("name", String.class),
    row.get("age", Integer.class)))
    .all();

.fetch()

  • 含义: 该方法用于执行前面定义的SQL命令,并开始处理返回的结果。

  • 用法: 通常跟在.sql()和参数绑定方法(如.bind())之后使用。

    .fetch()方法用于指定如何处理SQL执行的结果,它返回一个FetchSpec,允许你使用.rowsUpdated()来获取受影响的行数(只对于增加、更新、删除等影响到实际数据的操作)。

    fetch同样可以结合onefirstall使用,但是因为fetch返回的是FetchSpec<Map<String, Object>>类型,需要手动强制转换,对于需要查询返回大量字段的操作不是很推荐,通常查询只返回单个字段或少量字段的可以使用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    Mono<Person> person = databaseClient
    .sql("SELECT * FROM person WHERE id = :id")
    .bind("id", 1)
    .fetch()
    .one()
    .map(row -> new Person(
    (Long) row.get("id"),
    (String) row.get("name"),
    (Integer) row.get("age")));

    Mono<Person> person = databaseClient
    .sql("SELECT * FROM person WHERE id = :id")
    .bind("id", 1)
    .fetch()
    .first()
    .map(row -> new Person(
    (Long) row.get("id"),
    (String) row.get("name"),
    (Integer) row.get("age")));

    Flux<Person> person = databaseClient
    .sql("SELECT * FROM person")
    .fetch()
    .all()
    .map(row -> new Person(
    (Long) row.get("id"),
    (String) row.get("name"),
    (Integer) row.get("age")));

    比较两个版本

    • fetch版本:遵循了更常见的Spring Data R2DBC使用模式,即先执行.fetch().all()获取查询结果,然后通过.map()转换。
    • map版本:利用了Spring Data R2DBC提供的一个更直接的映射策略,允许在定义SQL之后立即应用映射函数。这个方法的优势在于代码更简洁,但可能在某些情况下减少了对查询结果处理流程的可见性。

.rowsUpdated()

  • 含义: 在执行更新、插入或删除操作后,.rowsUpdated()用于返回受这些操作影响的行数。

  • 用法: 用于确认操作影响了多少行数据。这个方法只会在增删改等影响实际数据的操作中使用。

    .fetch().rowsUpdated()组合使用返回受影响的行数。

CRUD 操作示例 (在数据访问层实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 增加
Person person = new Person();
person.setName("John Doe");
person.setAge(25);
Mono<Person> savedPerson = databaseClient.sql("INSERT INTO person (name, age) VALUES (:name, :age)")
.bind("name", person.getName())
.bind("age", person.getAge())
.fetch()
.rowsUpdated();
// 删除
Mono<Long> deleteResult = databaseClient.sql("DELETE FROM person WHERE id = :id")
.bind("id", 1)
.fetch()
.rowsUpdated();
// 更新
Mono<Long> updateResult = databaseClient.sql("UPDATE person SET name = :name WHERE id = :id")
.bind("name", "Jane Doe")
.bind("id", 1)
.fetch()
.rowsUpdated();
// 查询
// 查询单个结果
Mono<Person> person = databaseClient.sql("SELECT * FROM person WHERE id = :id")
.bind("id", 1)
.map(row -> new Person(/* initialize properties from row */))
.one();
// 查询多个结果
Flux<Person> people = databaseClient.sql("SELECT * FROM person")
.map(row -> new Person(/* initialize properties from row */))
.all();

批量增删改操作示例

对于批量操作,更推荐的做法是使用请求体(Request Body)来传递数据,特别是使用POSTPUT方法时。这样可以无限制地传递大量数据,同时保持URL的简洁和安全性。

普通版本

  • 批量增加操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    Flux<Person> persons = Flux.just(new Person("John Doe", 25), new Person("Jane Doe", 30));
    Mono<Long> insertResults = persons.flatMap(person ->
    databaseClient.sql("INSERT INTO person (name, age) VALUES (:name, :age)")
    .bind("name", person.getName())
    .bind("age", person.getAge())
    .fetch()
    .rowsUpdated()
    ).reduce(0L, Long::sum); // 计数所有操作的总影响行数

    // 或者
    List<Person> persons = Arrays.asList(new Person("John Doe", 25), new Person("Jane Doe", 30));
    Mono<Long> insertResults = Flux.fromIterable(persons)
    .flatMap(person ->
    databaseClient.sql("INSERT INTO person (name, age) VALUES (:name, :age)")
    .bind("name", person.getName())
    .bind("age", person.getAge())
    .fetch()
    .rowsUpdated()
    ).reduce(0L, Long::sum);

    countreduce都是统计总数的方法,不过count统计的是流中元素的数量,是操作的总数,而reduce统计的是实际操作成功的总数。

  • 批量删除操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    Flux<Long> idsToDelete = Flux.just(1L, 2L, 3L); // 假设这是要删除的ID列表
    Mono<Long> deleteResults = idsToDelete.collectList().flatMap(ids ->
    databaseClient.sql("DELETE FROM person WHERE id IN (:ids)")
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    );

    // 或者
    List<Long> idsToDelete = Arrays.asList(1L, 2L, 3L);
    Mono<Long> deleteResults = Flux.fromIterable(idsToDelete)
    .collectList()
    .flatMap(ids ->
    databaseClient.sql("DELETE FROM person WHERE id IN (:ids)")
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    );
  • 批量更新操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 假设要将所有指定ID的人的年龄更新为30
    Flux<Long> idsToUpdate = Flux.just(1L, 2L, 3L);
    Mono<Long> updateResults = idsToUpdate.flatMap(ids ->
    databaseClient.sql("UPDATE person SET age = :age WHERE id = (:ids)")
    .bind("age", 30)
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    ).reduce(0L, Long::sum);

    // 或者
    List<Long> idsToUpdate = Arrays.asList(1L, 2L, 3L);
    Mono<Long> updateResults = Flux.fromIterable(idsToUpdate)
    .flatMap(ids ->
    databaseClient.sql("UPDATE person SET age = :age WHERE id = (:ids)")
    .bind("age", 30)
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    ).reduce(0L, Long::sum);

Flux.defer版本

  • 批量增加操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    List<Person> persons = Arrays.asList(new Person("John Doe", 25), new Person("Jane Doe", 30));
    Mono<Long> insertResults = Flux.defer(() ->
    Flux.fromIterable(persons)
    .flatMap(person ->
    databaseClient.sql("INSERT INTO person (name, age) VALUES (:name, :age)")
    .bind("name", person.getName())
    .bind("age", person.getAge())
    .fetch()
    .rowsUpdated()
    )
    ).reduce(0L, Long::sum);
  • 批量删除操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    List<Long> idsToDelete = Arrays.asList(1L, 2L, 3L);
    Mono<Long> deleteResults = Flux.defer(() ->
    Flux.fromIterable(idsToDelete)
    .collectList()
    .flatMap(ids ->
    databaseClient.sql("DELETE FROM person WHERE id IN (:ids)")
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    )
    ).reduce(0L, Long::sum);
  • 批量修改操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    List<Long> idsToUpdate = Arrays.asList(1L, 2L, 3L);
    Mono<Long> updateResults = Flux.defer(() ->
    Flux.fromIterable(idsToUpdate)
    .flatMap(ids ->
    databaseClient.sql("UPDATE person SET age = :age WHERE id = (:ids)")
    .bind("age", 30)
    .bind("ids", ids)
    .fetch()
    .rowsUpdated()
    ).reduce(0L, Long::sum);

使用Flux.defer适合于某些特定的场景,主要是当你想确保每次订阅时都执行最新的操作逻辑,特别是在处理动态数据或需要确保操作使用最新状态的场景下。Flux.defer通过延迟操作的创建直到订阅发生,可以确保数据的实时性和操作的最新性。然而,并不是所有场景都适合使用Flux.defer。下面是一些考虑因素:

适合使用Flux.defer的场景

  • 动态数据源:当数据源是动态变化的,比如依赖于时间或外部系统状态,使用Flux.defer可以确保每次订阅都获取最新数据。
  • 避免不必要的计算:如果有一些计算成本较高的操作,你只想在实际需要数据时才执行,Flux.defer可以帮助避免不必要的计算。
  • 条件性操作:当操作需要基于每次订阅时的特定条件来执行,使用Flux.defer可以确保操作符合当前的条件。

不适合使用Flux.defer的场景

  • 静态数据或一次性计算:对于不会改变的数据或只需要计算一次的操作,使用Flux.defer可能没有必要,直接使用Flux.justMono.just等操作可能更合适。
  • 共享操作结果:如果你想让所有订阅者共享同一个操作结果,而不是为每个订阅者重新执行操作,那么Flux.defer不是一个好选择。在这种情况下,可以考虑使用.cache()操作符。
  • 需要即时执行的操作:由于Flux.defer会延迟操作直到订阅发生,如果你需要在定义流的同时立即执行某些操作(例如,预加载数据),Flux.defer可能不适合。
关于响应式 defer 的理解
  1. 对于动态或实时数据:如果操作涉及获取可能随时间变化的数据,或者其结果依赖于当前的系统状态(如数据库中的数据、系统时间或外部服务的状态),那么使用defer是合适的。这确保了每次订阅都能获取到最新的数据,避免了由于早期执行而可能导致的数据过时问题。
  2. 避免抢注问题:在用户注册的场景中,使用defer可以确保在订阅(也就是实际执行注册逻辑)的时刻检查用户名是否已存在。这有助于减少因多个用户几乎同时注册相同用户名而导致的冲突。
  3. 非一成不变的数据:对于不是一成不变的数据,特别是那些可能受到外部状态影响的操作,使用defer来延迟操作的创建和执行,直到订阅发生,是一个很好的实践。这样做可以确保操作的执行反映了最新的状态,提高了程序的健壮性和数据的准确性。
  4. 有副作用的操作:如果操作具有副作用(如修改数据库、调用外部服务等),使用Mono.defer确保副作用操作在每次订阅时执行,而不是在声明阶段就执行,这有助于更好地控制副作用的发生时机。

通俗理解

  • 如果结果可能随着时间或状态的变化而变化,或者操作具有副作用,那么使用defer
  • 对于静态的、确定不变的数据或结果,直接使用Mono.justFlux.fromIterable等其他不涉及延迟执行的操作符即可。

不过,如果完全采用响应式编程,大部分情况下,都不需要显式地使用defer

defer在响应式编程中通常在以下几种情况下需要显式使用:

  1. 确保操作延迟执行:当你需要确保某个操作(如数据库查询、外部服务调用等)确实在每次订阅时执行,而不是在声明流时就执行。这对于包含时间相关的操作或依赖于每次请求可能不同的上下文信息的操作尤其重要。

  2. 使用动态或实时参数:当操作的参数在声明流的时间点可能还不确定,或者参数值可能随时间变化(例如,依赖于用户的实时输入或其他变化的系统状态)时,defer可以确保参数是在实际执行操作时才被评估和使用。

  3. 避免副作用的预先执行:如果某个操作具有副作用(比如修改全局状态或外部资源),使用defer可以防止这些副作用在不适当的时间(如流声明时)就发生,确保副作用操作在订阅时才执行。

  4. 封装非响应式操作:当需要将非响应式操作(比如传统的同步方法调用)转换为响应式流时,defer可以用来封装这些操作,确保它们在响应式上下文中以非阻塞的方式延迟执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ReactiveService {

    // 假设这是一个非响应式的同步操作
    String nonReactiveOperation() {
    // 模拟一些耗时操作,比如数据库查询或远程服务调用
    try {
    Thread.sleep(1000); // 模拟耗时
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    return "非响应式操作的结果";
    }

    // 使用Mono.defer封装非响应式操作
    Mono<String> wrapNonReactiveOperationWithDefer() {
    return Mono.defer(() -> {
    // 调用非响应式的同步操作,并将其结果封装为Mono
    String result = nonReactiveOperation();
    return Mono.just(result);
    });
    }
    }

整合 Reactive MongoDB、Reactive Redis

文档存储

文档存储是一种非关系型数据库(NoSQL)存储方式,它以文档的形式存储和管理数据。文档存储数据库将数据存储为文档,这些文档可以是JSON、XML或其他类似的格式。文档内部可以包含嵌套的数据结构,如列表或字典,从而允许存储复杂的数据结构。文档存储数据库的例子包括 MongoDB、CouchDB 和 Amazon DynamoDB 等。

文档存储与传统的关系型数据库存储(如MySQL、PostgreSQL等)相比,主要有以下区别:

  1. 数据模型:关系型数据库基于表格模型,数据以行和列的形式存储,且每个表格通常需要预定义的模式(schema)。文档存储数据库则以更灵活的文档格式存储数据,每个文档可以有不同的结构,不需要预定义模式。
  2. 灵活性:文档存储数据库因其无模式(schema-less)特性而提供更高的灵活性,便于存储结构化或半结构化数据,同时也更容易适应数据结构的变化。
  3. 查询语言:关系型数据库使用标准的SQL作为查询语言,而文档存储数据库使用基于文档的查询语言,这些语言通常更适合于处理文档型数据。
  4. 扩展性:文档存储数据库通常更容易水平扩展,支持分布式数据存储,而关系型数据库的扩展通常更复杂,尤其是在处理大规模数据时。
  5. 一致性和事务处理:传统的关系型数据库通常提供强一致性和复杂的事务处理特性。相比之下,许多文档存储数据库采用最终一致性模型,并可能提供较为有限的事务处理能力,虽然一些文档存储数据库(如最新版本的MongoDB)已开始支持更复杂的事务处理。
  6. 用例:文档存储数据库适合于需要存储大量非结构化或半结构化数据的应用场景,如内容管理系统、电商平台等。而关系型数据库则适合于需要复杂查询和事务处理的传统企业级应用。
键值对存储

键值对存储是一种简单的数据存储模型,它通过键(key)来访问和存储数据值(value)。这种模型类似于字典或哈希表,其中每个键唯一对应一个值。键值对存储系统通常用于缓存、会话存储、简单的数据记录等场景。与文档存储和关系型数据库相比,键值对存储提供了高速的查找和存储能力,但在数据结构的复杂性和查询能力上相对有限。键值对存储数据库的典型例子就是 Redis。

以下是键值对存储方式的一些主要特点和与传统数据库存储的区别:

  1. 数据模型简单:键值对存储的数据模型极其简单,每个键对应一个值,值可以是字符串、数字或者更复杂的数据结构(取决于具体的键值对存储系统)。不同于关系型数据库的行和列或文档存储的JSON文档,键值对存储不关心值的内部结构。
  2. 高性能:由于数据模型的简单,键值对存储能够提供非常高的读写性能。这使得键值对存储非常适合用作应用程序的缓存层,可以快速响应数据读取请求。
  3. 易于扩展:键值对存储通常支持水平扩展,可以通过添加更多节点来增加数据库的容量和吞吐量,而不需要复杂的数据迁移或重构。
  4. 灵活性:虽然键值对存储的数据模型相对简单,但其值的存储格式可以非常灵活。一些键值对存储系统允许存储复杂的数据结构作为值,如列表、集合或哈希表等。
  5. 有限的查询能力:与关系型数据库或文档存储相比,键值对存储的查询能力较为有限。通常,只能通过键直接访问数据,缺乏对值的复杂查询能力,如基于值的内容进行搜索或过滤。
  6. 适用场景:键值对存储特别适合于需要快速读写访问的场景,如缓存、会话存储(session storage)、实时计数器等。它不适合需要复杂查询或数据关联分析的应用。

关于 MongoDB 和 Redis 的详细介绍参考MongoDB 教程 Redis 教程

连接配置

配置 application.yaml 文件,参考如下

1
2
3
4
5
6
7
8
9
10
11
spring:
# ... r2dbc 的配置内容
data:
mongodb:
uri: mongodb://localhost:27017/your_database # MongoDB连接URI
redis:
host: localhost
port: 6379
reactive:
pool:
max-active: 8 # 根据需要调整连接池大小

这里的your_database是自定义的数据库名称,它会在保存数据时自动创建,不需要手动创建。

特别的,对于 Redis,由于其本身的技术特性和数据模型,导致其无法通过简单的声明式配置实现,通常还需要专门的编程式配置,即配置类。

下面提供了两种配置类:

注意:部分基础配置已经在前面的 application.yaml 中配置完成,下面的配置都是高度自定义的配置。

使用Jackson2JsonRedisSerializer配置

当使用Jackson2JsonRedisSerializer时,你需要为每种类型提供一个序列化器实例。这意味着如果你有多种类型的数据需要存储,你可能需要为每种类型配置不同的ReactiveRedisTemplate实例。不过,通常情况下,一个应用中使用单一类型的情况较多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class JacksonRedisConfig {

@Bean
public <T> ReactiveRedisTemplate<String, T> reactiveRedisTemplateWithJackson(ReactiveRedisConnectionFactory connectionFactory, Class<T> type) {
Jackson2JsonRedisSerializer<T> valueSerializer = new Jackson2JsonRedisSerializer<>(type);
StringRedisSerializer keySerializer = new StringRedisSerializer();

RedisSerializationContext.RedisSerializationContextBuilder<String, T> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, T> context = builder.value(valueSerializer).build();

return new ReactiveRedisTemplate<>(connectionFactory, context);
}
}

使用GenericJackson2JsonRedisSerializer配置

GenericJackson2JsonRedisSerializer的一个优点是它可以在序列化时包含类型信息,从而支持多种类型的数据而无需为每种类型提供单独的序列化器实例。这使得它在处理多种数据类型的场景下更为方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class GenericJacksonRedisConfig {

@Bean
public ReactiveRedisTemplate<String, Object> reactiveRedisTemplateWithGenericJackson(ReactiveRedisConnectionFactory connectionFactory) {
// 创建一个定制化的ObjectMapper实例
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule()); // 注册JavaTimeModule
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // 确保Java 8日期/时间类型以ISO格式序列化
mapper.findAndRegisterModules(); // 自动注册所有可用的模块(包括JavaTimeModule)

// 使用定制化的ObjectMapper实例创建GenericJackson2JsonRedisSerializer
GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer(mapper);
// 上面的配置是为了支持LocalDate、LocalTime、LocalDateTime的序列化
// GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer keySerializer = new StringRedisSerializer();

RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Object> context = builder.value(valueSerializer).build();

return new ReactiveRedisTemplate<>(connectionFactory, context);
}
}

对比

  • 如果你的应用主要处理单一类型的数据或者你需要对序列化过程有更细致的控制,Jackson2JsonRedisSerializer可能是一个好的选择。
  • 如果你需要在Redis中存储多种类型的数据,并希望简化序列化和反序列化的处理,那么GenericJackson2JsonRedisSerializer可能更适合你的需求。

通常GenericJackson2JsonRedisSerializer通用性更强,但是由于其类型自动处理的功能导致在某些情况下性能不如Jackson2JsonRedisSerializer高,而且生成的 json 数据包含了类型信息,不如Jackson2JsonRedisSerializer简洁,不方便开发者进行调试。此外,初学者学习推荐先从Jackson2JsonRedisSerializer开始再到GenericJackson2JsonRedisSerializer

不同的配置方式

在Spring Boot应用中,通过application.propertiesapplication.yml文件进行配置属于声明式配置。这种方式允许开发者以简洁明了的形式指定应用配置,而无需编写额外的代码。Spring Boot的自动配置特性会根据这些属性以及应用的依赖关系自动设置合理的默认配置。

声明式配置

声明式配置通常用于配置数据库连接、应用参数、日志级别等。例如,配置MongoDB和PostgreSQL的连接信息:

1
2
3
4
5
6
7
8
9
10
11
12
# MongoDB
spring:
data:
mongodb:
uri: mongodb://username:password@localhost:27017/databaseName

# PostgreSQL
spring:
datasource:
url: jdbc:postgresql://localhost:5432/databaseName
username: username
password: password

编程式配置

编程式配置涉及创建配置类(通常带有@Configuration注解的类),在这些类中,你可以使用Java代码定义Bean和配置应用程序的行为。这种方式提供了更高的灵活性和控制力,适用于需要根据复杂逻辑或条件动态决定配置的场景。

例如,创建Redis的配置类:

1
2
3
4
5
6
7
8
@Configuration
public class RedisConfig {

@Bean
public LettuceConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", 6379));
}
}

其他配置方式

除了声明式配置和编程式配置,Spring Boot还支持通过环境变量命令行参数外部配置文件等多种方式来覆盖或指定配置。这提供了极大的灵活性,使得同一个应用可以在不同环境下使用不同的配置。

使用配置类配置MongoDB和PostgreSQL

理论上,你也可以使用配置类去配置MongoDB和PostgreSQL,就像配置Redis那样。然而,考虑到Spring Boot提供了对这些数据库的广泛自动配置支持,通常没有必要手动创建这些配置类。通过声明式配置,你可以更加简洁地利用Spring Boot的自动配置特性,减少样板代码,同时也能够更容易地管理和修改配置。在大多数Spring Boot应用中,采用声明式配置来利用Spring Boot的自动配置功能通常更为简便和高效。这样做不仅减少了配置的复杂性,也使得配置更加集中和易于管理。只有在需要高度自定义配置或Spring Boot的自动配置不能满足需求时,才考虑使用编程式配置。

代码实现方式

MongoDB 也有类似于 R2DBC 的 ReactiveMongoRepository(继承了ReactiveCrudRepository接口),提供了类似的数据访问方式:自带的方法实现、方法名解析、自定义方法(ReactiveMongoTemplate实现)等。

Redis 则没有响应式接口支持,需要自定义实现,主要是通过ReactiveRedisTemplate实现数据库操作。

实体类注解

关于实体类,MongoDB 提供了一些相关的注解

  • @Document: 用于指定实体类对应的MongoDB集合(collection)。可以通过collection属性指定集合的名称,如果不指定,默认使用类的名称(首字母小写)。

    1
    2
    3
    4
    @Document(collection = "departments")
    public class Department {
    // ...
    }
  • @Id: 用于标记实体类中的属性作为文档的ID。MongoDB文档总是有一个_id字段作为唯一标识符,使用此注解可以映射类的属性到这个字段。

    1
    2
    @Id
    private String id;
  • @Field: 用于指定实体类属性映射到文档中的字段。可以通过它自定义字段的名称。

    1
    2
    @Field("dept_name")
    private String departmentName;
  • @Indexed: 用于标记一个字段应该被索引。这对于提高查询性能非常有用,特别是对于经常作为查询条件的字段。

    1
    2
    @Indexed(unique = true)
    private String departmentCode;
  • @Transient: 标记为@Transient的字段不会被持久化到MongoDB中。这对于临时状态或计算得出的属性很有用。

    1
    2
    @Transient
    private int memberCount;
  • @CompoundIndex: 用于类级别,指定复合索引。这可以用来在多个字段上创建索引,以优化特定类型的查询。

    1
    2
    3
    4
    5
    @Document
    @CompoundIndex(def = "{'departmentCode': 1, 'departmentName': -1}")
    public class Department {
    // ...
    }

Redis 关于实体类的注解没有那么多,主要是以下几个,部分注解是通用的。

  • @RedisHash: 用于类,标记该类的对象将被存储在Redis的哈希结构中。@RedisHash注解可以接受一个可选的参数,用于指定在Redis中存储时使用的哈希的名称。
  • @Id: 用于标记实体的id字段,这个字段的值将被用作Redis哈希结构中的key。
  • @Indexed: 可以用于字段上,表示该字段将被索引,适用于进行查询操作。
  • @TimeToLive: 可以用于字段上,指定一个实体或其部分的过期时间(生存时间)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RedisHash("people")
public class Person implements Serializable {

@Id
private String id;

@Indexed
private String name;

private Integer age;

@TimeToLive
private Long expiration = 86400L; // 生存时间设置为24小时(86400秒)

// 省略构造函数、Getter和Setter
}

注意:使用 Redis 缓存的数据都需要实现可序列化接口Serializable

CRUD 操作示例

由于 Reactive MongoDB 的ReactiveMongoRepository是继承ReactiveCrudRepository接口实现的,这里只介绍自定义方法实现,即通过ReactiveMongoTemplate实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 增加
// 单个对象
Person person = new Person("John Doe", 30);
Mono<Person> insertMono = reactiveMongoTemplate.insert(person);
// 多个对象
List<Person> persons = Arrays.asList(new Person("John Doe", 30), new Person("Jane Doe", 25));
Flux<Person> insertFlux = reactiveMongoTemplate.insertAll(persons);

// 删除
// 单个对象
Query query = new Query(Criteria.where("name").is("John Doe"));
Mono<DeleteResult> deleteMono = reactiveMongoTemplate.remove(query, Person.class);
// 多个对象
Query query = new Query(Criteria.where("age").lt(25));
Mono<DeleteResult> deleteMono = reactiveMongoTemplate.remove(query, Person.class);

// 更新
// 单个对象
Query query = new Query(Criteria.where("name").is("John Doe"));
Update update = new Update().set("age", 31);
Mono<UpdateResult> updateMono = reactiveMongoTemplate.updateFirst(query, update, Person.class);
// 多个对象
Query query = new Query(Criteria.where("age").lt(30));
Update update = new Update().set("status", "active");
Mono<UpdateResult> updateMono = reactiveMongoTemplate.updateMulti(query, update, Person.class);

// 查询
// 单个对象
Mono<Person> personMono = reactiveMongoTemplate.findById("someId", Person.class);
Query query = new Query(Criteria.where("name").is("John Doe")); // 或者使用查询对象 Query
Mono<Person> personMono = reactiveMongoTemplate.findOne(query, Person.class);
// 多个对象
Query query = new Query(Criteria.where("age").gt(20));
Flux<Person> personsFlux = reactiveMongoTemplate.find(query, Person.class);

上述示例中的UpdateResultDeleteResult是来自于com.mongodb.reactivestreams.client包,它们提供了操作的结果信息,比如影响的文档数量等。

Redis 也有类似的 CRUD 操作,但是需要考虑更多的模式和策略。

下面先给出 CRUD 操作的示例

单对象 CRUD 操作

  1. 创建或更新(单个对象)

    使用ReactiveRedisTemplateopsForValue().set()方法可以存储单个对象。如果键已存在,它会被更新。

    1
    2
    3
    4
    public Mono<Boolean> savePerson(Person person) {
    String key = "person:" + person.getId();
    return reactiveRedisTemplate.opsForValue().set(key, person);
    }
  2. 查询(单个对象)

    使用opsForValue().get()方法根据键查询单个对象。

    1
    2
    3
    4
    public Mono<Person> findPerson(String id) {
    String key = "person:" + id;
    return reactiveRedisTemplate.opsForValue().get(key);
    }
  3. 删除(单个对象)

    使用delete()方法根据键删除单个对象。

    1
    2
    3
    4
    public Mono<Boolean> deletePerson(String id) {
    String key = "person:" + id;
    return reactiveRedisTemplate.opsForValue().delete(key);
    }

多对象 CRUD 操作

  1. 创建或更新(多个对象)

    Redis 并不直接支持一次性创建或更新多个对象的操作。你需要对每个对象进行遍历并分别调用设置方法。这可以通过Flux来实现。

    1
    2
    3
    4
    public Flux<Boolean> savePersons(List<Person> persons) {
    return Flux.fromIterable(persons)
    .flatMap(person -> savePerson(person)); // savePerson 就是前面单对象操作中的实现方法
    }
  2. 查询(多个对象)

    查询多个对象通常涉及到根据多个键查询。在Redis中,可以使用Flux来遍历键并查询。

    1
    2
    3
    4
    public Flux<Person> findPersons(List<String> ids) {
    return Flux.fromIterable(ids)
    .flatMap(id -> findPerson(id));
    }
  3. 删除(多个对象)

    删除多个对象也需要对每个键进行遍历并删除。

    1
    2
    3
    4
    public Flux<Boolean> deletePersons(List<String> ids) {
    return Flux.fromIterable(ids)
    .flatMap(id -> deletePerson(id));
    }

实现策略

存储多对象数据

  1. 每个对象存储一份数据:这种方式适合于需要频繁访问或更新单个对象的场景。通过使用唯一键为每个对象存储一份数据,可以快速地访问和更新单个对象而不影响其他对象。
  2. 存储对象集合:当需要一次性加载或处理多个对象时,可以将这些对象作为一个集合存储。这种方式适用于数据一致性要求不高或同时操作整个集合的场景。例如,可以将对象序列化为JSON数组存储在单个键下。

Redis中的 CRUD 和缓存失效策略

  • 创建和更新(C和U):在Redis中创建和更新操作通常是通过设置键值对来完成的。如果数据发生变化,直接更新对应的键值对即可。对于缓存场景,如果是更新操作,通常会直接更新缓存中的数据,或者删除缓存中的旧数据让其在下次访问时重新加载。
  • 读取(R):读取操作是Redis中最常见的用例之一,特别是作为缓存时。由于Redis的读取性能非常高,它非常适合用作频繁读取的数据的缓存。
  • 删除(D):在数据变更或不再需要时,可以通过删除操作移除Redis中的数据。对于缓存,当底层数据发生变化导致缓存数据过时时,常见的做法是直接删除缓存中的数据,避免提供过时的信息。

缓存失效策略

  • 主动失效:应用程序在更新底层数据时主动删除或更新缓存中的数据。这种策略可以确保缓存数据的一致性,但需要应用程序显式地管理缓存的失效。
  • 被动失效:利用Redis的EXPIRE命令为缓存数据设置生存时间(TTL),让数据在指定时间后自动过期。这种策略适用于那些即使数据稍微过时也不会造成大问题的场景。

更新缓存 or 清空缓存?

更新缓存

适用场景

  • 数据频繁读取且更新成本相对较低的情况。
  • 需要保证数据一致性,且能够容易地计算出更新后的值。
  • 应用场景对数据实时性要求较高,必须确保缓存中的数据尽可能反映最新状态。

优点

  • 减少对后端数据源的查询压力,提高数据访问速度。
  • 保持缓存数据的实时性和一致性。

缺点

  • 实现复杂度较高,尤其是当更新逻辑复杂或与原始数据生成逻辑不一致时。
  • 在高并发场景下,更新缓存可能引入竞态条件,需要额外的同步机制。

清空缓存

适用场景

  • 数据更新频率低,或者更新操作对性能影响较大的场景。
  • 数据一致性要求不是非常严格,可以接受短时间内缓存数据与数据库数据的不一致。
  • 更新操作涉及复杂逻辑,直接重新生成缓存内容比更新现有缓存更简单或更高效。

优点

  • 实现简单,只需删除缓存项即可,无需处理复杂的更新逻辑。
  • 避免了因缓存数据更新不当而导致的数据不一致问题。

缺点

  • 频繁清空缓存可能导致对后端数据源的访问增加,影响性能。
  • 用户可能会遇到缓存失效后的延迟增加,尤其是对于计算或获取成本较高的数据。

总结

  • 如果能够容易且准确地更新缓存以反映最新数据,而且这样做对性能影响不大,则更新缓存可能是更好的选择。
  • 如果更新缓存的逻辑复杂,或者保持数据最新性的成本较高,清空(删除)缓存然后在下次访问时重新生成可能是更简单、更有效的方法。

封装到数据访问层

MongoDB 可以将自定义的接口和主接口组合起来

首先创建自定义的 repository 接口,声明自定义方法

1
2
3
public interface CustomPersonRepository {
Flux<Person> findCustomQuery(String criteria);
}

然后创建自定义接口的实现类,注入ReactiveMongoTemplate来执行实际的数据库操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CustomPersonRepositoryImpl implements CustomPersonRepository {

private final ReactiveMongoTemplate mongoTemplate;

public CustomPersonRepositoryImpl(ReactiveMongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}

@Override
public Flux<Person> findCustomQuery(String criteria) {
Query query = new Query(Criteria.where("someField").is(criteria));
return mongoTemplate.find(query, Person.class);
}
}

最后组合自定义 repository 接口到主 repository 接口,让主 repository 接口扩展自定义的接口

1
2
3
public interface PersonRepository extends ReactiveMongoRepository<Person, String>, CustomPersonRepository {
// 自带的CRUD方法和查询方法名解析方法
}

Redis 直接定义接口和实现类即可

创建接口

1
2
3
4
5
6
public interface ReactivePersonRepository {
Mono<Person> save(Person person);
Mono<Person> findById(String id);
Flux<Person> findAll();
Mono<Void> deleteById(String id);
}

创建实现类,注入ReactiveRedisTemplateReactiveValueOperations(实际上就是对ReactiveRedisTemplate.opsForValue()的引用,简化了代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Repository
public class ReactivePersonRepositoryImpl implements ReactivePersonRepository {

private final ReactiveRedisTemplate<String, Person> redisTemplate;
private final ReactiveValueOperations<String, Person> reactiveValueOps;

public ReactivePersonRepositoryImpl(ReactiveRedisTemplate<String, Person> redisTemplate) {
this.redisTemplate = redisTemplate;
this.reactiveValueOps = redisTemplate.opsForValue();
}

@Override
public Mono<Person> save(Person person) {
return reactiveValueOps.set(person.getId(), person).thenReturn(person);
}

@Override
public Mono<Person> findById(String id) {
return reactiveValueOps.get(id);
}

@Override
public Flux<Person> findAll() {
return redisTemplate.execute(connection ->
connection.keyCommands().scan(ScanOptions.scanOptions().match("*").build()))
.flatMap(cursor -> Flux.fromIterable(cursor)
.flatMap(key -> reactiveValueOps.get(new String(key)).cast(T.class)));
}

@Override
public Mono<Void> deleteById(String id) {
return reactiveValueOps.delete(id).then();
}
}

更通用的泛型写法

接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public interface ReactiveGenericRepository {

/**
* 保存一个对象到Redis。
*
* @param key Redis键
* @param value 要保存的对象
* @param <T> 对象的类型
* @return 保存后的对象
*/
<T> Mono<T> save(String key, T value);

/**
* 根据键查找一个对象。
*
* @param key Redis键
* @param <T> 对象的预期类型
* @return 找到的对象
*/
<T> Mono<T> findById(String key);

/**
* 查找所有对象。
*
* @param clazz 对象的类类型,用于类型转换
* @param <T> 对象的预期类型
* @return 找到的所有对象
*/
<T> Flux<T> findAll(Class<T> clazz);

/**
* 根据键删除一个对象。
*
* @param key Redis键
* @return 删除操作的结果
*/
Mono<Boolean> deleteById(String key);
}

实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Repository
public class ReactiveGenericRepositoryImpl implements ReactiveGenericRepository {

private final ReactiveRedisTemplate<String, Object> redisTemplate;
private final ReactiveValueOperations<String, Object> reactiveValueOps;

public ReactiveGenericRepositoryImpl(ReactiveRedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
this.reactiveValueOps = redisTemplate.opsForValue();
}

@Override
public <T> Mono<T> save(String key, T value) {
return reactiveValueOps.set(key, value).thenReturn(value);
}

@Override
public <T> Mono<T> findById(String key) {
return reactiveValueOps.get(key);
}

@Override
public <T> Flux<T> findAll(Class<T> clazz) {
return redisTemplate.scan(ScanOptions.scanOptions().match("*").build())
.flatMap(key -> redisTemplate.opsForValue().get(key));
}

@Override
public Mono<Boolean> deleteById(String key) {
return reactiveValueOps.delete(key);
}
}

关于泛型的使用

泛型可以在类、接口或方法上使用,具体取决于设计的需求:

  1. 类或接口上的泛型:当你希望定义一个类或接口时就确定其操作的数据类型,可以在类或接口上声明泛型参数。这种方式适用于整个类或接口中所有方法操作的数据类型是一致的情况。

    1
    2
    3
    4
    public interface Repository<T> {
    Mono<T> save(T entity);
    Mono<T> findById(String id);
    }
  2. 方法上的泛型:当你希望在具体的方法调用时确定数据类型,可以在方法上声明泛型参数。这种方式提供了更高的灵活性,允许在同一个类或接口中支持不同类型的操作。

    1
    2
    3
    4
    public interface GenericRepository {
    <T> Mono<T> save(String key, T value);
    <T> Mono<T> findById(String key, Class<T> clazz);
    }

Webflux 服务层

多数据库联合操作

策略

在实际业务场景中,联合多种数据库操作需要考虑诸多的策略:

1.查询策略

  • 缓存优先:对于读操作,通常采用缓存优先的策略。首先查询Redis缓存,如果缓存命中,则直接返回缓存中的数据;如果缓存未命中,则查询MongoDB或PostgreSQL,并将查询结果缓存到Redis中,以便后续请求能够直接从缓存中获取数据。

2.数据存储和一致性

  • 分离存储:MongoDB和PostgreSQL应该处理各自业务范围内的数据,避免存储重复数据。例如,MongoDB可以用于存储文档型或半结构化数据,而PostgreSQL用于存储关系数据和进行复杂查询。

  • 数据一致性:在进行增删改操作时,直接对MongoDB或PostgreSQL进行操作。操作完成后,根据业务需求和数据一致性要求决定是更新Redis缓存还是清空缓存。

3.缓存失效策略

  • 缓存更新:如果数据变更不频繁,或者实时性要求高,可以在数据变更后更新Redis缓存。这可以减少缓存不一致的窗口期,但需要额外的开销来维护缓存数据的更新。

  • 缓存清空:对于高变更频率的数据,或者当缓存更新逻辑较为复杂时,可以选择在数据变更后清空(删除)相关的Redis缓存。这简化了缓存管理,但可能导致更频繁的数据库查询。

4.事务和一致性模型

  • 事务管理:在涉及到多个数据操作的业务逻辑中,考虑使用事务(对于支持事务的数据库,如PostgreSQL)来保证操作的原子性和一致性。

  • 最终一致性:在分布式系统中,完全的一致性很难实现,可以采用最终一致性模型,通过合理的缓存策略和数据同步机制,确保数据在一定时间内达到一致状态。

5.数据备份和恢复

  • 备份策略:定期备份数据是保证数据安全的重要措施。MongoDB和PostgreSQL都提供了备份和恢复的机制,应该根据业务需求制定备份计划并定期执行。
  • 灾难恢复:设计灾难恢复计划,确保在数据丢失或系统故障的情况下能够快速恢复服务。

6.性能优化

  • 索引优化:对于MongoDB和PostgreSQL,合理使用索引可以显著提高查询效率。需要定期评估索引的使用情况和性能影响,并根据实际查询模式进行优化。
  • 缓存策略:对于Redis,合理设计缓存策略,包括选择合适的数据过期时间、使用合适的数据结构等,可以提高缓存效率和命中率。

7.安全性考虑

  • 访问控制:确保数据库的访问控制得当,只有授权的用户和服务才能访问敏感数据。
  • 数据加密:对敏感数据进行加密,包括在传输过程中的加密(如使用SSL/TLS)和存储时的加密。

8.监控和警报

  • 性能监控:监控数据库的性能指标,如响应时间、查询负载等,可以帮助及时发现和解决性能问题。
  • 系统警报:设置警报机制,当出现潜在的问题,如访问异常、资源使用率过高等时,能够及时通知到相关人员。

9.微服务架构下的数据管理

  • 服务间数据一致性:在微服务架构中,不同服务可能会操作不同的数据库实例或类型。设计合理的服务间通信和数据同步机制,以保证跨服务的数据一致性。
  • API网关和服务聚合:考虑使用API网关来聚合来自不同微服务的数据请求和响应,提供统一的数据访问入口。

命令查询责任分离 CQRS

命令查询责任分离(Command Query Responsibility Segregation,CQRS)是一种软件架构模式,它将应用的读操作(查询)和写操作(命令)明确分离开来。CQRS的核心思想是对读和写使用不同的模型,以优化性能、可伸缩性和安全性。这种模式并不一定要求使用两个物理不同的数据库,但在某些实现中,确实会使用不同的数据存储来分别处理读和写操作,以此来充分利用每种存储的优势。

CQRS的关键概念

  • 命令(Command):表示对系统的写操作(如创建、更新、删除),这些操作可以改变系统的状态但不返回任何值。
  • 查询(Query):表示对系统的读操作,这些操作返回系统的当前状态,但不改变状态。

CQRS的优势

  • 性能优化:通过分离读写操作,可以针对查询和命令操作分别优化数据模型和存储机制,从而提高应用的性能。
  • 可伸缩性:可以独立地扩展读操作和写操作的处理能力,适应不同的负载需求。
  • 安全性与复杂性管理:简化复杂的业务逻辑,因为读模型可以专门针对查询进行优化,而写模型则专注于业务规则和数据一致性。
  • 灵活的技术选择:可以针对读和写操作选择最适合的技术和数据库。

CQRS与数据一致性

在使用CQRS时,确实可能会涉及到不同的数据存储系统,这在某些情况下可能引入数据一致性的挑战。例如,当一个命令操作更新了写模型的数据后,读模型的数据可能需要一些时间才能反映这些更改,这导致了最终一致性的情况而不是即时一致性。

为了管理这种一致性问题,通常会采用以下策略之一:

  • 即时同步:在命令操作后立即更新读模型。这种方式尽可能保持读写模型的同步,但可能会影响写操作的性能。
  • 异步更新:通过事件驱动的方式,当写模型更新后异步更新读模型。这种方式可以提高写操作的性能,但读模型可能会暂时与写模型不一致,实现最终一致性。

CQRS最适合那些读写负载差异大、对数据一致性要求较为宽松的场景。在需要强一致性或读写模式没有显著差异的应用中,CQRS可能带来过多的复杂性而不是收益。

CQRS适用场景:读写负载差异大,对数据一致性要求宽松

  • 社交网络的Feed流:用户的动态更新(写操作)可能不是非常频繁,但是用户浏览Feed流(读操作)非常频繁。此外,用户通常可以接受Feed流不是实时更新的,即如果某个好友的动态延迟几秒钟出现在Feed中通常是可接受的。在这种情况下,CQRS可以用来优化Feed流的读取性能,通过将写操作和Feed流的生成/查询逻辑分离来实现。
  • 电商平台的商品推荐系统:商品的购买和评价(写操作)相对于商品推荐的阅读(读操作)来说较少。推荐系统可以基于用户的历史行为和偏好生成,且用户通常可以接受推荐信息不是实时更新的。使用CQRS可以将推荐生成(写操作)和推荐展示(读操作)分离,优化用户的阅读体验。

非CQRS适用场景:强一致性需求或读写模式没有显著差异

  • 银行系统:在处理金融交易(如转账)时,系统必须保证数据的强一致性,以确保账户余额的准确性。在这种场景下,读写操作对数据一致性的要求非常高,采用CQRS可能会引入不必要的复杂性和风险。
  • 实时协作工具:如在线文档编辑器,允许多个用户同时编辑同一个文档。这类应用需要实时同步不同用户的操作,以保持所有用户视图的一致性。由于读写操作紧密相关且对实时性要求很高,CQRS可能不是最佳选择。

针对 CQRS 逻辑读写分离而非物理读写分离的案例

由于实践过程中,简化的设计通常是按物理数据库来划分读写职责,但这可能会造成“ CQRS 就是将两个数据库分成一个主要用于读取的数据库(读取的数据仍需要通过另一个数据库同步写入)和一个只用于写入数据库”的误解(这是在十分理想化的情况下)。

设计一个更复杂的场景,其中PostgreSQL和MongoDB都承担读写操作,但各自侧重于不同类型的读写负载。这样的设计可以展示CQRS并非简单地物理划分读写操作到不同的数据库,而是根据操作的性质和数据模型的优化需求来逻辑分配任务。

场景设定

假设我们正在开发一个电商平台,其中包含产品管理和用户行为分析两大功能。

使用PostgreSQL和MongoDB的CQRS实现

  • PostgreSQL
    • 写操作:负责产品的创建、更新和删除。这些操作需要事务支持和一致性保证,适合关系数据库的特点。
    • 复杂读操作:负责执行复杂的SQL查询,如生成报表、聚合查询等。这些操作利用了PostgreSQL强大的SQL支持和优化器。
  • MongoDB
    • 写操作:负责记录用户行为数据,如页面浏览、点击事件等。这些操作对写入速度有很高要求,并且数据结构可能经常变化,适合文档数据库的特点。
    • 简单读操作:负责提供快速的查询响应,如用户行为数据的即时查询。MongoDB的灵活性和索引优化可以提供高效的查询性能。

数据同步和一致性

在这个例子中,PostgreSQL和MongoDB都承担读写操作,但针对不同的业务场景。为了保持数据一致性:

  • 数据同步:可以使用事件驱动机制同步必要的数据变更。例如,当一个新产品在PostgreSQL中被创建或更新后,相关的信息可以异步同步到MongoDB中的用户行为分析模型。
  • CQRS视角:从CQRS的视角看,我们在逻辑上区分了命令(写操作)和查询(读操作),并根据不同操作的特点选择了最合适的数据存储。即便每种数据库都执行读写操作,但这些操作服务于不同的业务需求和数据模型。

除了命令查询责任分离 CQRS 根据读写职责划分来设计多个数据库行为以外,另一种架构则是按照业务边界划分来设计多个数据库行为。

这种架构下,每一个数据库之间不需要考虑数据同步和一致性问题,因为它们负责的是从业务逻辑上分离的数据,任何一个数据库的业务操作都不会影响另一个数据库的业务操作,就像两个可以独立运行的模块。这种架构设计比较接近于微服务架构的理念:在微服务架构中,应用被分解成一系列较小、松耦合的服务,每个服务围绕着特定的业务功能构建,并可以独立部署、扩展和维护。每个服务通常管理自己的数据库(或数据存储),以保持业务边界的清晰和数据的封装性。

代码实现

使用PostgreSQL存储核心业务数据,MongoDB存储文档或日志类型的数据,而Redis作为缓存层来提高数据读取的性能。

一个博客系统,其中PostgreSQL用于存储文章信息,MongoDB用于存储用户评论,Redis用于缓存热门文章。

有以下已经提前准备好的 repository 接口,假设里面的方法都已经实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 假设的PostgreSQL文章Repository
interface ArticleRepository {
Mono<Article> saveArticle(Article article);
Mono<Article> findArticleById(String id);
Mono<Void> deleteArticleById(String id);
Flux<Article> findAllArticles();
}

// 假设的MongoDB评论Repository
interface CommentRepository {
Mono<Comment> saveComment(Comment comment);
Flux<Comment> findCommentsByArticleId(String articleId);
}

// 假设的Redis缓存操作
interface CacheService {
<T> Mono<T> findInCache(String key, Class<T> type);
<T> Mono<Boolean> updateCache(String key, T data);
}

然后基于这些方法编写服务层,协调不同数据库的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
public class BlogService {

@Autowired
private ArticleRepository articleRepository;

@Autowired
private CommentRepository commentRepository;

@Autowired
private CacheService cacheService;

public Mono<Article> createOrUpdateArticle(Article article) {
return articleRepository.saveArticle(article)
.flatMap(savedArticle -> cacheService.updateCache("article_" + savedArticle.getId(), savedArticle)
.thenReturn(savedArticle));
}

public Mono<Article> getArticleById(String id) {
return cacheService.findInCache("article_" + id, Article.class)
.switchIfEmpty(articleRepository.findArticleById(id)
.flatMap(article -> cacheService.updateCache("article_" + article.getId(), article)
.thenReturn(article)));
}

public Flux<Comment> getCommentsByArticleId(String articleId) {
// 直接从MongoDB获取评论,因为评论不适合缓存
return commentRepository.findCommentsByArticleId(articleId);
}

public Mono<Void> deleteArticleById(String id) {
return articleRepository.deleteArticleById(id)
.then(cacheService.updateCache("article_" + id, null));
}
}

逻辑说明

  • 文章操作(PostgreSQL + Redis):文章的创建、更新、查询和删除操作主要依赖PostgreSQL。Redis用作文章信息的缓存,以加速热门文章的读取性能。
  • 评论操作(MongoDB):评论数据由于其量可能很大并且更新频繁,因此存储在MongoDB中,并且通常不被缓存。
  • 缓存逻辑(Redis):在文章创建或更新后更新缓存,在文章查询时先尝试从缓存获取,缓存未命中则从数据库加载并更新缓存。

Webflux 安全层

整合Spring Security、JWT

在引入 Spring Security 之前,建议先实现 JWT 的生成和验证逻辑,这样方便后面整合 Spring Security 框架来进行认证和授权。

JWT 工具类

创建一个 JWT 工具类来处理 JWT 的创建、解析和验证

一个更完善的版本还应该包括Token的有效期、处理异常、以及更安全的密钥管理等方面的考虑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class JwtUtil {
// 生产环境中应从配置文件或环境变量中安全地获取密钥
private Key key = Keys.secretKeyFor(SignatureAlgorithm.HS256); // 使用HS256算法生成密钥
private long expiration = 3600000; // Token有效期,这里示例为1小时(单位毫秒)

public String generateToken(String username, List<String> roles) {
// Date now = new Date();
// Date expiryDate = new Date(now.getTime() + expiration); // 设置Token过期时间
// 如果不希望使用Date,可以使用以下方法实现
Instant now = Instant.now();
Instant expiryDate = now.plus(1, ChronoUnit.HOURS); // 设置Token过期时间为1小时后
return Jwts.builder()
.setSubject(username)
.setIssuedAt(Date.from(now))
.setExpiration(Date.from(expiryDate))
.claim("roles", roles) // 在这里添加角色信息,方便Spring Security认证
.signWith(key) // 使用生成的密钥签名
.compact();
}

public Claims validateToken(String token) {
return Jwts.parser()
.setSigningKey(key) // 使用密钥验证
.build()
.parseClaimsJws(token)
.getBody();
}

public String getUsernameFromToken(String token) {
Claims claims = validateToken(token);
return claims.getSubject();
}

public List<String> getRolesFromToken(String token) {
Claims claims = validateToken(token);
return claims.get("roles", List.class);
}

public boolean isTokenExpired(String token) {
Claims claims = validateToken(token);
// return claims.getExpiration().before(new Date());
return claims.getExpiration().toInstant().isBefore(Instant.now());
}
}

使用Keys.secretKeyFor生成密钥

  • 生产环境:在生产环境中使用Keys.secretKeyFor(SignatureAlgorithm.HS256)来生成密钥是推荐的做法,因为它能为所使用的签名算法生成足够强度的密钥。为了安全起见,密钥不应该硬编码在代码中,而是应该通过安全的方式(如环境变量、配置服务等)提供,并且保证其安全性(不被泄露)。
  • 测试环境:在测试环境中,使用Keys.secretKeyFor(SignatureAlgorithm.HS256)同样适用,它可以帮助确保测试环境尽可能地模拟生产环境的配置和安全性。然而,考虑到测试环境的便利性和调试需求,可能会选择使用更简单的密钥管理方式,但应确保测试用的密钥不会用于生产环境。

下面提供一个简单的测试类用于测试该工具类的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class JwtUtilTest {

private JwtUtil jwtUtil = new JwtUtil();

@Test
void testGenerateAndValidateToken() {
String username = "testUser";
List<String> roles = Arrays.asList("ROLE_USER", "ROLE_ADMIN");

// Generate token
String token = jwtUtil.generateToken(username, roles);
assertNotNull(token);

// Validate token and extract claims
Claims claims = jwtUtil.validateToken(token);
assertNotNull(claims);
assertEquals(username, claims.getSubject());
assertTrue(claims.get("roles") instanceof List);
assertEquals(roles, claims.get("roles", List.class));

// Check if token is not expired
assertFalse(jwtUtil.isTokenExpired(token));
}

@Test
void testTokenExpiration() throws InterruptedException {
// Assuming we have a very short expiry time for testing
jwtUtil.setExpiration(10); // Set token expiration to 10 milliseconds for the test

String username = "testUser";
List<String> roles = Arrays.asList("ROLE_USER", "ROLE_ADMIN");

// Generate token
String token = jwtUtil.generateToken(username, roles);
Thread.sleep(20); // Wait longer than the token expiration time

// Verify the token is expired
assertTrue(jwtUtil.isTokenExpired(token));
}

@Test
void testToken() {
String username = "user123";
List<String> roles = Arrays.asList("ROLE_USER", "ROLE_ADMIN");

// Generate token with user information
String token = jwtUtil.generateToken(username, roles);
System.out.println("Generated Token: " + token);

// Assume token is being sent to and received from a client in a real application

// Validate token and extract user information
Claims claims = jwtUtil.validateToken(token);
String extractedUsername = claims.getSubject();
List<String> extractedRoles = claims.get("roles", List.class);

System.out.println("Username from Token: " + extractedUsername);
System.out.println("Roles from Token: " + extractedRoles);
}
}

在这个测试类中,testGenerateAndValidateToken测试用例生成一个Token,并验证Token是否正确包含了用户名和角色信息。testTokenExpiration测试用例则是为了测试Token过期逻辑是否正常。这里为了方便测试过期逻辑,在JwtUtil类中定义了setExpiration方法来动态设置Token的过期时间,这主要用于测试目的,实际生产环境中不应该使用此方法。testToken测试用例是用于展示一个简单的Token各个阶段的信息内容。

JWT Token

JWT Token可以包含多种标准的声明(Claim),以下是一些常见的标准声明:

  • iss (Issuer):Token的发行者。
  • sub (Subject):Token的主题,通常用来存储用户的唯一标识。
  • aud (Audience):Token的接收方。
  • exp (Expiration Time):Token的过期时间,通常是一个时间戳,表示Token在此时间之后不再有效。
  • nbf (Not Before):Token的生效时间,表示在此时间之前,Token不可用。
  • iat (Issued At):Token的发行时间。
  • jti (JWT ID):Token的唯一标识符。

除了这些标准声明外,还可以在 Token 中添加自定义声明来存储特定于应用的信息,如用户角色、权限等。使用Claims对象,可以通过get方法以键值对的形式访问这些自定义声明,例如

1
String role = claims.get("role", String.class); // 假设我们在Token中添加了一个名为"role"的自定义声明

案例

假设有一个简单的Web应用,用于文章的阅读和评论,用户需要登录才能评论。在用户登录时,系统会生成一个JWT Token,其中包含如下标准声明:

  • iss (Issuer)"MyArticleApp"。这表明Token是由"MyArticleApp"应用发行的。
  • sub (Subject)"1234567890"。这是用户的唯一标识,通常可以是用户ID、用户名或其他唯一标识用户的信息。在这个例子中,假设它是用户ID。
  • aud (Audience)"MyArticleAppUsers"。这指示了Token的预期接收者,确保Token只被指定的受众使用。
  • exp (Expiration Time)1627499999。这是一个UNIX时间戳,表示Token的过期时间。假设这代表Token将在未来某个时间点过期。
  • nbf (Not Before)1627480000。这同样是一个UNIX时间戳,表示Token在这个时间之前不应被接受。
  • iat (Issued At)1627486400。这表示Token的发行时间。
  • jti (JWT ID)"a87ff679a2f3e71d9181a67b7542122c"。这是Token的唯一标识符,用于防止重放攻击。

如何使用这些信息安全地识别用户身份?

  1. 验证Token的签名:首先,确保Token的签名是有效的,这通过使用发行Token时用到的相同密钥来验证。这一步是必需的,以确保Token未被篡改。
  2. 检查Token的发行者(iss)和受众(aud):确认Token的issaud声明与你的应用期望的值匹配。这有助于确保Token是为你的应用发行的,且仅供指定的受众使用。
  3. 验证Token的有效期:检查当前时间是否在Token的nbf(Not Before)和exp(Expiration Time)声明指定的时间范围内。如果不在这个范围内,Token应被认为是无效的。
  4. 获取和验证用户身份:一旦Token通过了上述验证,你可以信任Token中的sub(Subject)声明。在这个例子中,sub是用户ID"1234567890"。你可以使用这个ID从数据库或其他存储系统中检索用户的详细信息,并在需要时进行进一步的授权检查。

在基于Spring Security和Spring WebFlux的应用中使用JWT进行认证时,JwtAuthenticationManagerJwtSecurityContextRepositoryJwtAuthenticationFilter可以协作以支持认证流程。这三个组件各自承担不同的职责,以下是它们如何协作的简化示例及解释:

JwtAuthenticationFilter

这个过滤器负责拦截进入的HTTP请求,从请求中提取JWT Token,并尝试对其进行验证。

  • 职责:从HTTP请求中提取JWT Token,并创建一个未经认证的Authentication对象(如UsernamePasswordAuthenticationToken),然后将其提交给AuthenticationManager进行认证。

  • 用途:负责拦截进入的请求,从中提取JWT Token,并尝试进行认证。它是连接前端请求和后端安全逻辑的桥梁。

  • 适用场景:几乎所有需要JWT认证的场景。

  • 实现:这个过滤器通常继承自WebFilter接口(对于WebFlux应用)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class JwtAuthenticationFilter implements WebFilter {

    private final JwtAuthenticationManager authenticationManager;

    public JwtAuthenticationFilter(JwtAuthenticationManager authenticationManager) {
    this.authenticationManager = authenticationManager;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpRequest request = exchange.getRequest();
    // 提取JWT Token的逻辑
    // 简单示例中,可以不实现具体逻辑,或根据需要添加
    return chain.filter(exchange);
    }
    }
  • 参考示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class JwtAuthenticationFilter implements WebFilter {

    private final JwtAuthenticationManager authenticationManager;

    public JwtAuthenticationFilter(JwtAuthenticationManager authenticationManager) {
    this.authenticationManager = authenticationManager;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    // 提取请求中的Authorization头部
    String authHeader = exchange.getRequest().getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
    if (authHeader != null && authHeader.startsWith("Bearer ")) {
    String authToken = authHeader.substring(7);
    // 创建未认证的Authentication Token
    Authentication authRequest = new UsernamePasswordAuthenticationToken(authToken, authToken);
    // 使用JwtAuthenticationManager进行认证
    return this.authenticationManager.authenticate(authRequest)
    .flatMap(auth -> chain.filter(exchange).contextWrite(ReactiveSecurityContextHolder.withAuthentication(auth)))
    .onErrorResume(e -> chain.filter(exchange)); // 处理认证失败
    }
    // 对于没有Bearer Token的请求直接放行
    return chain.filter(exchange);
    }
    }

    如果在后面实现了JwtSecurityContextRepository,则上面的代码可简化为以下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class JwtAuthenticationFilter implements WebFilter {

    private final JwtSecurityContextRepository jwtSecurityContextRepository;

    public JwtAuthenticationFilter(JwtSecurityContextRepository jwtSecurityContextRepository) {
    this.jwtSecurityContextRepository = jwtSecurityContextRepository;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return jwtSecurityContextRepository.load(exchange)
    .flatMap(context -> chain.filter(exchange).contextWrite(ReactiveSecurityContextHolder.withSecurityContext(Mono.just(context))))
    .switchIfEmpty(chain.filter(exchange));
    }
    }

    JwtAuthenticationFilter在请求到达时提取JWT Token,并使用JwtAuthenticationManager进行认证。这个过程实际上在JwtSecurityContextRepositoryload方法中已经体现。

JwtAuthenticationManager

这是一个自定义的AuthenticationManager,负责处理由JwtAuthenticationFilter提交的未经认证的Authentication对象。

  • 职责:验证Authentication对象中的JWT Token的有效性,如果Token有效,则创建一个已认证的Authentication对象,并设置用户的权限。

  • 用途:对通过JwtAuthenticationFilter提取出的Token进行验证,确定用户的身份并授权。在WebFlux中,它通常是ReactiveAuthenticationManager的实现,在Spring MVC中,则是AuthenticationManager的实现。

  • 适用场景:需要验证用户Token的场景,实现自定义的认证逻辑。

  • 实现:可以通过实现ReactiveAuthenticationManager接口来自定义认证逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class JwtAuthenticationManager implements ReactiveAuthenticationManager {

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
    // 在这里实现JWT验证逻辑
    // 简单示例中,可以直接返回已认证的Authentication,或根据需要添加逻辑
    return Mono.just(authentication);
    }
    }
  • 参考示例

    JwtAuthenticationManager验证Token并创建已认证的Authentication对象时,需要从Token中解析用户的角色或权限,并将它们转换为GrantedAuthority对象,以便Spring Security进行授权判断。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class JwtAuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtUtil jwtUtil;

    public JwtAuthenticationManager(JwtUtil jwtUtil) {
    this.jwtUtil = jwtUtil;
    }

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
    String authToken = authentication.getCredentials().toString();

    try {
    String username = jwtUtil.getUsernameFromToken(authToken);
    if (!jwtUtil.isTokenExpired(authToken)) {
    List<String> roles = jwtUtil.getRolesFromToken(authToken);
    List<SimpleGrantedAuthority> authorities =
    roles
    .stream()
    .map(role -> new SimpleGrantedAuthority("ROLE_" + role.toUpperCase()))
    .collect(Collectors.toList());
    Authentication auth = new UsernamePasswordAuthenticationToken(username, authToken, authorities);
    return Mono.just(auth);
    }
    return Mono.empty();
    } catch (Exception e) {
    return Mono.empty();
    }
    }
    }
JwtSecurityContextRepository

这个组件用于在请求处理过程中加载和保存SecurityContext。对于基于JWT的无状态认证,它通常不会保存SecurityContext,但会在每个请求中基于JWT Token加载SecurityContext

  • 职责:基于每个请求中的JWT Token动态构建SecurityContext

  • 用途:负责在每个请求中重建SecurityContext,根据JWT Token中的信息设置用户的认证状态。由于JWT认证是无状态的,它通常不用于保存状态,而是用于在请求之间重建状态。

  • 适用场景:适用于所有基于JWT进行认证的场景,尤其是在需要将JWT Token信息转化为Spring Security认识的认证信息时。

  • 实现:实现ServerSecurityContextRepository接口,通常在load方法中处理Token的解析和验证。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class JwtSecurityContextRepository implements ServerSecurityContextRepository {

    @Override
    public Mono<Void> save(ServerWebExchange exchange, SecurityContext context) {
    // JWT是无状态的,不需要实现保存
    return Mono.error(new UnsupportedOperationException("Save method not supported"));
    }

    @Override
    public Mono<SecurityContext> load(ServerWebExchange exchange) {
    // 在这里实现JWT解析和安全上下文的加载
    // 简单示例中,可以直接返回空Mono,或根据需要添加逻辑
    return Mono.empty();
    }
    }
  • 参考示例

    在Spring Security中,ServerSecurityContextRepository用于在每个请求上加载或创建SecurityContext,这是处理安全上下文的一部分。默认行为是,在请求开始时尝试加载安全上下文,以便在处理请求时可以使用当前安全状态。也就是说默认load方法总是会在每次请求的时候被执行,因此对于不需要认证的路径(如使用permitAll()配置的路径),如果没有有效的认证Token,方法应该能够快速返回,而不影响请求的处理。否则就会造成“你需要先认证才能登录,而你又需要先登录才能认证”的局面。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class JwtSecurityContextRepository implements ServerSecurityContextRepository {

    private final ReactiveAuthenticationManager authenticationManager;

    public JwtSecurityContextRepository(ReactiveAuthenticationManager authenticationManager) {
    this.authenticationManager = authenticationManager;
    }

    @Override
    public Mono<Void> save(ServerWebExchange exchange, SecurityContext context) {
    throw new UnsupportedOperationException("Save method is not supported");
    }

    @Override
    public Mono<SecurityContext> load(ServerWebExchange exchange) {
    String path = exchange.getRequest().getURI().getPath();
    // 对于不需要认证的接口,直接返回Mono.empty()
    if ("/login".equals(path) || "/register".equals(path)) {
    return Mono.empty();
    }
    String token = extractToken(exchange.getRequest());
    // 如果Token为空,也直接返回Mono.empty()
    if (token == null) {
    return Mono.empty();
    }
    Authentication auth = new UsernamePasswordAuthenticationToken(token, token);
    return this.jwtAuthenticationManager.authenticate(auth).map(SecurityContextImpl::new);
    }

    private String extractToken(ServerHttpRequest request) {
    String bearerToken = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
    if (StringUtils.hasText(bearerToken) && bearerToken.startsWith("Bearer ")) {
    return bearerToken.substring(7);
    }
    return null;
    }
    }

协作示例

当一个请求到达应用时:

  1. **JwtAuthenticationFilter**拦截请求,从请求头中提取JWT Token,并构造一个未经认证的Authentication对象,然后提交给JwtAuthenticationManager
  2. **JwtAuthenticationManager**接收到未经认证的Authentication对象,验证JWT Token的有效性。如果验证通过,它将创建一个已认证的Authentication对象,包含用户的权限等信息。
  3. 在整个请求处理流程中,**JwtSecurityContextRepository**负责根据JwtAuthenticationManager验证后的Authentication对象构建SecurityContext,并使其在当前请求上下文中可用。
  4. 根据SecurityContext中的认证信息,Spring Security框架执行后续的授权判断。

这三个组件的协作实现了一个完整的基于JWT的认证和授权流程,使得应用能够处理无状态的HTTP请求。

以上三个核心组件虽然覆盖了JWT认证流程的主要部分:提取Token、验证Token、以及根据Token重建安全上下文。不过,根据具体的应用需求和安全要求,可能还会涉及其他重要的组件或配置。以下是一些可能涉及到的额外组件或方法:

1. AuthenticationEntryPoint

用于处理认证过程中的异常,如Token无效或过期时的情况。AuthenticationEntryPoint负责在认证失败时返回适当的响应,比如一个401 Unauthorized状态码。

简单示例

1
2
3
4
5
6
7
8
9
10
public class JwtAuthenticationEntryPoint implements ServerAuthenticationEntryPoint {

@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
return Mono.fromRunnable(() -> {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
// 你可以添加更多的响应设置,如设置响应体
});
}
}

2. AccessDeniedHandler

当用户尝试访问他们没有权限访问的资源时,AccessDeniedHandler负责处理这种授权失败的情况。它可以用于返回一个403 Forbidden响应或重定向到一个错误页面。

3. CorsConfigurationSource

在前后端分离的应用中,跨源资源共享(CORS)配置变得非常重要。CorsConfigurationSource用于定义CORS策略,允许或拒绝来自不同源的请求。

4. TokenRefreshMechanism

虽然不是Spring Security的标准部分,但在许多基于Token的认证系统中,实现Token刷新机制是一个常见需求。这涉及到提供一个机制,允许客户端在当前Token快要过期时,通过一个有效的刷新Token来获取一个新的访问Token。

5. UserDetails和UserDetailsService

UserDetailsService用于在认证过程中加载用户特定的数据。它通常与UserDetails接口一起使用,后者表示一个用户的认证信息。在JWT认证过程中,一旦Token被验证,可以通过UserDetailsService加载用户的详细信息,并构建一个UserDetails对象,进一步用于构建Authentication对象。

6. Reactive counterparts for WebFlux

对于使用Spring WebFlux的应用,上述组件(如AuthenticationEntryPointAccessDeniedHandler等)有相应的反应式版本或等效方法,以支持响应式编程模型。

其他重要的方法

Token的生成和响应:在用户登录成功后,需要一个服务或控制器方法来生成JWT Token,并将其返回给客户端。这通常发生在用户认证成功后的登录接口中。这里的功能需要在三层架构中实现,下文中会提到三层架构的适配。

Spring Security 配置

接下来,配置 Spring Security 以集成 JWT 认证。这包括定义一个自定义的认证管理器和安全过滤器链,用于解析请求中的 JWT Token 并进行认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

// 定义JwtUtil Bean
@Bean
public JwtUtil jwtUtil() {
return new JwtUtil();
}

// 定义JwtAuthenticationManager Bean
@Bean
public JwtAuthenticationManager jwtAuthenticationManager(JwtUtil jwtUtil) {
return new JwtAuthenticationManager(jwtUtil);
}

// 定义JwtSecurityContextRepository Bean,依赖JwtAuthenticationManager
@Bean
public JwtSecurityContextRepository jwtSecurityContextRepository(JwtAuthenticationManager jwtAuthenticationManager) {
return new JwtSecurityContextRepository(jwtAuthenticationManager);
}

// 定义JwtAuthenticationFilter Bean,依赖JwtSecurityContextRepository
@Bean
public JwtAuthenticationFilter jwtAuthenticationFilter(JwtSecurityContextRepository jwtSecurityContextRepository) {
return new JwtAuthenticationFilter(jwtSecurityContextRepository);
}

@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http,
JwtAuthenticationManager jwtAuthenticationManager,
JwtSecurityContextRepository jwtSecurityContextRepository) {
http
.csrf(ServerHttpSecurity.CsrfSpec::disable) // 禁用CSRF保护,对于REST API是常见的做法
.authenticationManager(jwtAuthenticationManager) // 设置自定义的认证管理器
.securityContextRepository(jwtSecurityContextRepository) // 设置自定义的安全上下文仓库
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/api/public/**").permitAll() // 公开访问的路径
.anyExchange().authenticated() // 其他所有路径都需要认证
)
.addFilterAt(jwtAuthenticationFilter(jwtSecurityContextRepository), SecurityWebFiltersOrder.AUTHENTICATION); // 添加自定义JWT认证过滤器

return http.build();
}
}

三层架构的适配

设置好基于JWT的认证机制并且配置了Spring Security之后,下一步是确保三层架构(表示层、业务逻辑层、数据访问层)能够与这些安全配置无缝协作。以下是参考的适配流程

大致流程

1.定义安全模型

首先,定义应用中的安全模型,包括用户角色和权限模型。这可能涉及到在数据库中创建角色和权限表,并确定哪些API端点对应哪些角色或权限。

2.调整用户模型和数据访问层

  • 确保用户模型(通常是User实体)包含与安全相关的属性,如密码、角色列表等。
  • 更新用户的数据访问层(例如,用户的Repository),以支持查找用户的安全凭证(用户名和密码)、角色和权限。

3.实现UserDetailsService

  • 实现UserDetailsService接口,提供一种从数据库加载用户详情(包括权限)的方式。这是Spring Security调用来获取用户信息并进行认证的服务。
  • loadUserByUsername方法中,根据用户名查找用户,构建并返回一个UserDetails对象,这个对象应包含用户名、密码和权限信息。

4.配置方法级别的安全性

  • 使用@PreAuthorize@PostAuthorize@Secured等注解来指定方法级别的安全要求。例如,可以在服务层的方法上使用这些注解来限定只有特定角色的用户才能调用某个方法。

5.保护API端点

  • 根据的安全模型调整SecurityWebFilterChain中的.authorizeExchange()部分,指定哪些API端点是公开的,哪些需要认证,以及它们所需的权限或角色。
  • 例如,可以使用.pathMatchers("/api/admin/**").hasRole("ADMIN")来保护以/api/admin/开头的所有端点,使其只能由拥有ADMIN角色的用户访问。

6.处理认证和授权失败

  • 自定义认证失败和授权失败的处理逻辑,比如返回特定的HTTP状态码或错误信息给客户端。这可以通过自定义AuthenticationEntryPointAccessDeniedHandler来实现。

7.测试

  • 对整个安全配置进行测试,包括但不限于:
    • 测试未认证的请求是否被正确拒绝。
    • 测试具有不同角色的用户是否只能访问他们有权限的API端点。
    • 测试用户认证流程,包括使用JWT进行认证。

上面的流程仅供参考,实际简单场景可能不需要考虑那么多繁琐的步骤。

具体实现
数据库

首先创建好数据库中对应的数据表和初始测试数据,主键采用serial伪类型自动递增,参考如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 用户
create table users
(
id serial primary key,
username varchar(255) unique not null,
password varchar(255) not null
);
insert into users (username, password) values ('admin', 'admin');
insert into users (username, password) values ('user', 'user');
-- 角色
create table roles
(
id serial primary key,
name varchar(255) not null
);
insert into roles (name) values ('admin');
insert into roles (name) values ('user');
-- 用户角色
create table user_role
(
user_id int not null,
role_id int not null,
primary key (user_id, role_id),
constraint fk_user_id foreign key (user_id) references users (id),
constraint fk_role_id foreign key (role_id) references roles (id)
);
insert into user_role (user_id, role_id) values (1, 1);
insert into user_role (user_id, role_id) values (2, 2);

不推荐直接将密码明文存储在数据库中,这里是为了方便测试,通常应该将明文加密后再存储到数据库中。

实体类

然后创建好对应的实体类

users 数据表对应实体类 User

1
2
3
4
5
6
7
8
9
10
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("users")
public class User implements Serializable {
@Id
private Integer id;
private String username;
private String password;
}

roles 数据表对应实体类 Role

1
2
3
4
5
6
7
8
9
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("roles")
public class Role implements Serializable {
@Id
private Integer id;
private String name;
}

user_role 数据表对应实体类 UserRole

1
2
3
4
5
6
7
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserRole implements Serializable {
private Integer userId;
private Integer roleId;
}
数据访问层

这里假设已经提供了databaseClient

  • 登录的逻辑被拆分为两次查询,第一次查询是验证该用户是否存在,第二次查询是获取用户所有的角色列表。密码验证的逻辑在服务层实现。
  • 注册的逻辑比较复杂,注册提供的数据有用户名、密码和角色名称列表。首先是根据用户名查询users表判断该用户名是否已经注册,如果该用户名未被注册,则新增数据并返回新增数据的id字段(在 Postgresql 中通过returning语法实现新增时返回新增数据的id字段),然后是根据角色名称列表查询roles表得到角色id列表,最后将唯一的用户id和角色id列表批量增加到user_role表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Override
public Mono<User> findByUsername(String username) {
return databaseClient.sql("select * from users where username = :username")
.bind("username", username)
.map((row, rowMetadata) -> new User(
row.get("id", Integer.class), // 这里的id是自增的,所以不需要传入
row.get("username", String.class),
row.get("password", String.class)))
.one();
}

@Override
public Mono<UserRoleDTO> getUserRole(String username) {
return databaseClient.sql("select username, name " +
"from users u " +
"join user_role ur on u.id = ur.user_id " +
"join roles r on ur.role_id = r.id " +
"where username = :username")
.bind("username", username)
.map((row, rowMetadata) -> Tuples.of(
row.get("username", String.class),
row.get("name", String.class)))
.all()
.collectMultimap(tuple -> tuple.getT1(), tuple -> tuple.getT2())
.flatMap(map -> {
// 从map中取出username和对应的roles列表
return Mono.justOrEmpty(map.entrySet().stream()
.map(entry -> new UserRoleDTO(entry.getKey(), new ArrayList<>(entry.getValue())))
.findFirst());
});
}

@Override
public Mono<Integer> insertUser(User user) {
// 返回插入的id,这里的密码应该是加密后的
return databaseClient.sql("insert into users (username, password) values (:username, :password) returning id")
.bind("username", user.getUsername())
.bind("password", user.getPassword())
.map((row, rowMetadata) -> row.get("id", Integer.class))
.first();
}

@Override
public Mono<Long> insertUserRole(Integer userId, UserRoleDTO userRoleDTO) {
// 根据角色名查询对应的id,然后插入到user_role表中
return databaseClient.sql("select id from roles where name in (:names)")
.bind("names", userRoleDTO.getRoles())
.fetch()
.all()
.flatMap(row ->
databaseClient.sql("insert into user_role (user_id, role_id) values (:user_id, :role_id)")
.bind("user_id", userId)
.bind("role_id", (Integer) row.get("id"))
.fetch()
.rowsUpdated()
).reduce(0L, Long::sum);
}

这里提供了一个UserRoleDTO,对应前面的 Token 生成逻辑,传入usernameroles用于 Token 的生成

1
2
3
4
5
6
7
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserRoleDTO implements Serializable {
private String username;
private List<String> roles;
}
服务层
  • login:先根据username参数查询用户,然后比较password(实际的密码比较应该使用专门的类处理,下文会补充),如果通过了验证,则使用UserDTO的数据生成 Token,并将 Token 传输到控制层。

  • register:先用UserRoleDTOusername查询用户是否已注册,如果没有注册过则将usernamepassword封装成一个User用于新增用户,然后用新增用户返回的userIdUserRoleDTO一并传给新增用户角色方法,用户角色列表数据新增完成后生成 Token 并传输到控制层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public Mono<String> login(String username, String password) {
return userRepo.findByUsername(username)
.flatMap(user -> {
if (user.getPassword().equals(password)) {
return userRepo.getUserRole(username)
.flatMap(userRole -> Mono.fromSupplier(() ->
jwtUtil.generateToken(username, userRole.getRoles()))
);
}
return Mono.empty();
});
}

@Override
public Mono<String> register(UserRoleDTO userRoleDTO, String password) {
return userRepo.findByUsername(userRoleDTO.getUsername())
.flatMap(user -> Mono.just("User already exists"))
.switchIfEmpty(Mono.defer(() -> {
// id为null,由数据库自动生成
User user = new User(null, userRoleDTO.getUsername(), password);
return userRepo.insertUser(user)
.flatMap(userId -> userRepo.insertUserRole(userId, userRoleDTO)
.thenReturn(jwtUtil.generateToken(userRoleDTO.getUsername(), userRoleDTO.getRoles())));
}));
}
控制层

handler 处理器中将 Token 存入响应头,或者也可以存入封装好的响应体中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Mono<ServerResponse> login(ServerRequest request) {
return request.bodyToMono(LoginRequest.class)
.flatMap(login ->
userService.login(login.getUsername(), login.getPassword())
)
.flatMap(token ->
ServerResponse.ok()
.header(HttpHeaders.AUTHORIZATION, "Bearer " + token) // 将生成的Token放入响应头
.bodyValue(new ApiResponse<>(200, token))
)
.switchIfEmpty(ServerResponse.status(HttpStatus.UNAUTHORIZED).build());
}

public Mono<ServerResponse> register(ServerRequest request) {
return request.bodyToMono(RegisterRequest.class)
.flatMap(register -> {
String username = register.getUsername();
String password = register.getPassword();
List<String> roles = register.getRoles();
UserRoleDTO userRoleDTO = new UserRoleDTO(username, roles);
return userService.register(userRoleDTO, password)
.flatMap(token -> {
if ("User already exists".equals(token)) {
return ServerResponse.status(HttpStatus.CONFLICT)
.bodyValue(new ApiResponse<>(409, token));
}
return ServerResponse.ok()
.header(HttpHeaders.AUTHORIZATION, "Bearer " + token) // 将生成的Token放入响应头
.bodyValue(new ApiResponse<>(200, token));
}
);
})
.switchIfEmpty(ServerResponse.status(HttpStatus.UNAUTHORIZED).build());
}

在这里需要接受前端不同的请求数据,为了方便处理自定义了LoginRequstRegisterRequest,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// LoginRequest.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LoginRequest {
private String username;
private String password;
}

// RegisterRequest.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RegisterRequest implements Serializable {
private String username;
private String password;
private List<String> roles;
}

RouterConfig 路由配置中定义请求方式和 URL

1
2
3
4
5
6
@Bean
public RouterFunction<ServerResponse> userRoutes() {
return RouterFunctions
.route(RequestPredicates.POST("/login"), userHandler::login)
.andRoute(RequestPredicates.POST("/register"), userHandler::register);
}
安全层

只需要添加对应路径的角色权限即可,例如/api/admin/前缀的所有路径需要ROLE_ADMIN权限。这个权限的加载已经在前面的JwtAuthenticationManager中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
http
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.formLogin(ServerHttpSecurity.FormLoginSpec::disable)
.httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
.authenticationManager(jwtAuthenticationManager())
.securityContextRepository(jwtSecurityContextRepository())
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/login", "/register").permitAll()
.pathMatchers("/api/admin/**").hasRole("ADMIN")
.anyExchange().authenticated()
)
.addFilterAt(jwtAuthenticationFilter(), SecurityWebFiltersOrder.AUTHENTICATION);
return http.build();
}

PasswordEncoder 密码加密

前面直接比较明文密码(user.getPassword().equals(password))并不是一个安全的做法。为了增强安全性,应该使用密码哈希值进行存储和验证。Spring Security提供了PasswordEncoder接口来帮助实现密码的加密和验证。

引入PasswordEncoder

首先,需要在 Spring 配置中定义一个PasswordEncoder的Bean。自Spring Security 5起,推荐使用BCryptPasswordEncoder,它是一种基于bcrypt强哈希方法的密码编码器

1
2
3
4
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
更新用户注册逻辑

在用户注册或创建密码时,使用PasswordEncoder对密码进行加密,并存储加密后的密码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Mono<String> register(UserRoleDTO userRoleDTO, String rawPassword) {
return userRepo.findByUsername(userRoleDTO.getUsername())
.flatMap(user -> Mono.just("User already exists"))
.switchIfEmpty(Mono.defer(() -> {
String password = passwordEncoder.encode(rawPassword); // 使用encode方法加密密码
// id为null,由数据库自动生成
// 创建User对象,设置加密后的密码,然后保存User对象
User user = new User(null, userRoleDTO.getUsername(), password);
return userRepo.insertUser(user)
.flatMap(userId -> userRepo.insertUserRole(userId, userRoleDTO)
.thenReturn(jwtUtil.generateToken(userRoleDTO.getUsername(), userRoleDTO.getRoles())));
}));
}
更新登录验证逻辑

在登录逻辑中,使用PasswordEncodermatches方法来验证提交的密码与存储的加密密码是否匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Autowired
private PasswordEncoder passwordEncoder;

@Override
public Mono<String> login(String username, String rawPassword) {
return userRepo.findByUsername(username)
.flatMap(user -> {
if (passwordEncoder.matches(rawPassword, user.getPassword())) { // 使用matches方法验证密码
return userRepo.getUserRole(username)
.flatMap(userRole -> Mono.fromSupplier(() ->
jwtUtil.generateToken(username, userRole.getRoles()))
);
}
return Mono.empty(); // 密码不匹配时返回空
});
}

通过使用PasswordEncoder,可以增强系统的安全性,防止密码泄露时直接暴露用户的明文密码。同时,这也是符合安全最佳实践的做法。

总结

关于 Spring Webflux 还有很多其它可以实践的内容,例如整合OAuth2进行第三方服务鉴权认证、使用响应式 Web 客户端WebClient调用外部 HTTP 服务等。

以上就是基于 Spring Webflux 响应式框架后端项目开发过程中涉及到的主要内容。其实关于 Spring Webflux 的应用并没有传统的 Spring Web 那么广泛,而且使用 Spring Webflux 需要有一定的函数式编程与响应式编程的熟练度,对于习惯了传统 Spring Web 框架的开发者来说是具有一定的难度,但是它提供了一种不同于传统同步阻塞模型的异步非阻塞模型的视角来看待问题。

  • 标题: webflux-project
  • 作者: Entropy
  • 创建于 : 2024-02-09 16:25:56
  • 更新于 : 2024-02-09 16:25:56
  • 链接: https://www.entropy-tree.top/2024/02/09/webflux-project/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论