pring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕。
WebFlux 使用的场景是异步非阻塞的,使用 Webflux 作为系统解决方案,在大多数场景下可以提高系统吞吐量。Spring Boot 2.0 是基于 Spring5 构建而成,因此 Spring Boot 2.X 将自动继承了 Webflux 组件,本篇给大家介绍如何在 Spring Boot 中使用 Webflux 。
为了方便大家理解,我们先来了解几个概念。
响应式编程
在计算机中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
例如,在命令式编程环境中,a=b+c 表示将表达式的结果赋给 a,而之后改变 b 或 c 的值不会影响 a 。但在响应式编程中,a 的值会随着 b 或 c 的更新而更新。
响应式编程是基于异步和事件驱动的非阻塞程序,只需要在程序内启动少量线程扩展,而不是水平通过集群扩展。
用大白话讲,我们以前编写的大部分都是阻塞类的程序,当一个请求过来时任务会被阻塞,直到这个任务完成后再返回给前端;响应式编程接到请求后只是提交了一个请求给后端,后端会再安排另外的线程去执行任务,当任务执行完成后再异步通知到前端。
Reactor
Java 领域的响应式编程库中,最有名的算是 Reactor 了。Reactor 也是 Spring 5 中反应式编程的基础,Webflux 依赖 Reactor 而构建。
Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每秒钟可处理 1500 万事件。
简单说,Reactor 是一个轻量级 JVM 基础库,帮助你的服务或应用高效,异步地传递消息。Reactor 中有两个非常重要的概念 Flux 和 Mono 。
Flux 和 Mono
Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。
Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
WebFlux 是什么?
WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。
非阻塞的关键预期好处是能够以小的固定数量的线程和较少的内存进行扩展。在服务器端 WebFlux 支持2种不同的编程模型:
WebFlux 模块从上到下依次是 Router Functions、WebFlux、Reactive Streams 三个新组件。
默认情况下,Spring Boot 2 使用 Netty WebFlux,因为 Netty 在异步非阻塞空间中被广泛使用,异步非阻塞连接可以节省更多的资源,提供更高的响应度。通过比较 Servlet 3.1 非阻塞 I / O 没有太多的使用,因为使用它的成本比较高,Spring WebFlux 打开了一条实用的通路。
值得注意的是:支持 reactive 编程的数据库只有 MongoDB, redis, Cassandra, Couchbase
Spring Webflux
Spring Boot 2.0 包括一个新的 spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。
Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是基于 Reactor 的响应式方式。
快速上手
添加 webflux 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
通过 IEDA 的依赖关系图我们可以返现spring-boot-starter-webflux依赖于spring-webflux、Reactor 和 Netty 相关依赖包。
创建 Controller
@RestController public class HelloController { @GetMapping("/hello") public Mono<String> hello() { return Mono.just("Welcome to reactive world ~"); } }
通过上面的示例可以发现,开发模式和之前 Spring Mvc 的模式差别不是很大,只是在方法的返回值上有所区别。
测试类
@RunWith(SpringRunner.class) @WebFluxTest(controllers=HelloController.class) public class HelloTests { @Autowired WebTestClient client; @Test public void getHello() { client.get().uri("/hello").exchange().expectStatus().isOk(); } }
运行测试类,测试用例通过表示服务正常。启动项目后,访问地址:http://localhost:8080/hello,页面返回信息:
Welcome to reactive world ~
证明 Webflux 集成成功。
以上便是 Spring Boot 集成 Webflux 最简单的 Demo ,后续我们继续研究 Webflux 的使用。
关注我:私信回复“555”获取往期Java高级架构资料、源码、笔记、视频Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术往期架构视频截图
链路追踪是可观测性软件系统的一个非常好的工具。它使开发人员能够了解应用程序中和应用程序之间不同交互发生的时间、地点和方式。同时让观测复杂的软件系统变得更加容易。
从Spring Boot 3开始,Spring Boot 中用于链路追踪的旧 Spring Cloud Sleuth 解决方案将替换为新的 Micrometer Tracing 库。
您可能已经了解 Micrometer,因为它以前被用作公开独立于平台的指标和监控基于 JVM 的微服务(例如 Prometheus )的默认解决方案。最新产品通过独立于平台的链路追踪解决方案扩展了 Micrometer 生态系统。这使得开发人员能够使用一个通用 API 来检测其应用程序,并以不同的格式将其导出到 Jaeger、Zipkin 或 OpenTelemetry 等链路追踪收集器。
本文将介绍在响应式编程 Kotlin 中,如何在 Spring Boot 3 WebFlux 利用 Micrometer 进行链路追踪。
接下来,我们将创建一个简单的 Spring Boot 微服务,它提供一个响应式 REST 端点,该端点在内部查询另一个第三方服务以获取一些信息。目标是导出两个操作的 trace。
我们将从以下 Spring Boot Initializr 项目开始,您可以在此处找到该项目。它包括带有Kotlin Gradle DSL的Spring Boot 3.0.1、Spring Web Reactive (WebFlux)和带有Prometheus的Spring Actuator。以下代码主要使用Kotlin,但如果使用 Java 也是可以的,大多数方法都是相同的。
Spring 初始化模板(https://start.spring.io/) 带有 Webflux、Spring Actuator 和 Prometheus 的 Spring Boot 3 Kotlin 模板
我们将首先添加一个带有测试 endpoint 的简单 REST 控制器类,该测试 endpoint 使用 Spring WebClient 调用外部 API 。我们正在使用 suspend 关键字来使用Kotlin的协程。这使我们能够在利用 Spring WebFlux 的响应式流的同时编写命令式代码。
在以下示例中,我们使用 Spring WebClient 调用外部 TODO-API,该 API 以 JSON 字符串形式返回 TODO 项。我们还将创建一条日志消息,其中稍后应包含一些链路追踪信息。
@RestController
class Controller {
val log=LoggerFactory.getLogger(javaClass)
val webClient=WebClient.builder()
.baseUrl("https://jsonplaceholder.typicode.com")
.build()
@GetMapping("/test")
suspend fun test(): String {
// simulate some complex calculation
delay(1.seconds)
log.info("test log with tracing info")
// make web client call to external API
val externalTodos=webClient.get()
.uri("/todos/1")
.retrieve()
.bodyToMono(String::class.java)
.awaitSingle()
return externalTodos
}
}
在下一步中,我们将把 Micrometer tracing 依赖项添加到我们的build.gradle.kts文件中。由于 Micrometer 支持不同的链路追踪格式和供应商,因此依赖项被分开,我们只导入我们需要的内容。为了保持所有依赖项同步,我们使用 Micrometer Tracing BOM(bom 清单)。此外,我们添加了核心依赖项和桥接器,以将 Micrometer Tracing 转换为 OpenTelemetry 格式(其他格式也可用)。
implementation(platform("io.micrometer:micrometer-tracing-bom:1.0.0"))
implementation("io.micrometer:micrometer-tracing")
implementation("io.micrometer:micrometer-tracing-bridge-otel")
我们还需要添加导出器依赖项来导出创建的 trace。在此示例中,我们将使用由 OpenTelemetry 维护并由 Micrometer Tracing 支持的 Zipkin 导出器。
implementation("io.opentelemetry:opentelemetry-exporter-zipkin")
配置是设置链路追踪必不可少的一步,配置文件application.yaml位于src/main/resources目录下。
management:
tracing:
enabled: true
sampling.probability: 1.0
zipkin.tracing.endpoint: http://localhost:9411/api/v2/spans
logging.pattern.level: "trace_id=%mdc{traceId} span_id=%mdc{spanId} trace_flags=%mdc{traceFlags} %p"
现在我们已经完成了服务设置,我们可以运行它了。如果启动应用程序,默认情况下,服务器应在端口下启动 8080。然后,可以通过打开浏览器来调用我们创建的端点http://localhost:8080/test。以下是请求响应内容:
{ "userId" : 1 , "id" : 1 , "title" : "delectus aut autem" , "已完成" : false }
要查看调用端点时创建的实际链路追踪,我们需要收集并查看它们。在本教程中,我们将使用zipkin导出器将数据导出到 观测云。当然也可以使用其他系统,例如 Zipkin、Grafana Loki 或 Datadog。
现在您可以再次调用我们的 Spring Boot 服务的端点。之后,当您在 观测云 中搜索任何 tracing 时,您应该能够找到端点请求的链路追踪信息。
乍一看,一切似乎都运行良好。然而,我们有两个问题。
解决了部分 issue 问题,这些问题可以在 Micrometer Tracing 文档中找到,参考链接:https://micrometer.io/docs/observation#instrumentation_of_reactive_libraries_after_reactor_3_5_3。
如果我们查看应用程序日志,可以发现调用端点时发出的日志消息。
trace_id=span_id=trace_flags=INFO 43636 --- [DefaultExecutor] com.example.tracing.Controller : test log with tracing info
正如你所看到的,trace_id 和 span_id没有设置。这是因为Micrometer Tracing还无法轻松处理响应式流中的链路追踪上下文。此外,响应式流的Kotlin协程包装器隐藏了链路追踪上下文。因此,我们必须推迟当前响应式流的上下文来获取链路追踪信息。实际上,这看起来如下所示:
Mono.deferContextual { contextView ->
ContextSnapshot.setThreadLocalsFrom(
contextView,
ObservationThreadLocalAccessor.KEY
).use {
log.info("test log with tracing info")
Mono.empty<String>()
}
}.awaitSingleOrNull()
为了更符合应用性,我们可以将示例代码提取到一个单独的函数中。
@GetMapping("/test")
suspend fun test(): String {
// ...
observeCtx { log.info("test log with tracing info") }
// ...
}
suspend inline fun observeCtx(crossinline f: () -> Unit) {
Mono.deferContextual { contextView ->
ContextSnapshot.setThreadLocalsFrom(
contextView,
ObservationThreadLocalAccessor.KEY
).use {
f()
Mono.empty<Unit>()
}
}.awaitSingleOrNull()
}
如果我们现在启动应用程序并调用我们的端点,我们应该能够trace_id在日志中看到。
trace_id=6c0053eba01199f194f5f76ff8d61917 span_id=967d591266756905 trace_flags=INFO 45139 --- [DefaultExecutor] com.example.tracing.Controller : test log with tracing info
第二个问题可以通过查看观测云中的 trace 来发现。它仅显示端点的父链路追踪,但不显示调用的子范围 WebClient。理论上,Spring WebClient 以及 RestTemplate 都是由 Micrometer 自动检测的。但是如果我们查看代码,就会发现我们正在使用静态构建器方法 WebClient。为了从 WebClient 获取自动链路追踪,我们需要使用 Spring 框架提供的构建器 bean。它可以通过我们类的构造函数注入Controller。
@RestController
class Controller(
webClientBuilder: WebClient.Builder
) {
val webClient=webClientBuilder // use injected builder
.baseUrl("https://jsonplaceholder.typicode.com")
.build()
// ...
}
通过上面的代码调整后重新调用 endpoint,我们在观测云中可以看到WebClient的跨度。Micrometer Tracing 还将自动为包含trace_id. 例如,如果我们调用另一个带有链路追踪功能的微服务,它可以获取 ID 并向观测云发送附加信息。
Micrometer Tracing 在 Spring 中自动为我们做了很多事情。但是,有时我们可能希望向链路追踪范围添加特定信息或观察应用程序中非传入或传出调用的特定部分。
我们可以定义自定义标签并将其添加到当前观察中以增强链路追踪数据。要检索当前链路追踪,我们可以使用ObservationRegistry类的 bean 。与日志记录问题类似,我们必须使用包装函数来获取正确的上下文。
@GetMapping("/test")
suspend fun test(): String {
observeCtx {
val currentObservation=observationRegistry.currentObservation
currentObservation?.highCardinalityKeyValue("test_key", "test sample value")
}
// ...
}
添加此代码后,我们可以在观测云中看到我们的自定义标签及其值。
使用 Micrometer API 创建自定义可观测(跨度)通常很容易。但是,在使用响应式流和协程时,我们需要帮助上下文链路追踪。如果我们在端点处理程序中创建一个新的观测,它将被视为一个单独的链路追踪。为了使代码可重用,我们可以编写一个简单的包装函数来创建新的观测点。它的工作原理与我们之前创建的用于使用 trace_id 。
suspend fun runObserved(
name: String,
observationRegistry: ObservationRegistry,
f: suspend () -> Unit
) {
Mono.deferContextual { contextView ->
ContextSnapshot.setThreadLocalsFrom(
contextView,
ObservationThreadLocalAccessor.KEY
).use {
val observation=Observation.start(name, observationRegistry)
Mono.just(observation).flatMap {
mono { f() }
}.doOnError {
observation.error(it)
observation.stop()
}.doOnSuccess {
observation.stop()
}
}
}.awaitSingleOrNull()
}
该函数可以将任何挂起函数包装在新的观察周围。一旦执行了给定的函数,它将自动停止观测。此外,我们将追踪可能发生的任何错误并将其附加到链路追踪中。
我们现在可以应用这个函数来观察任何代码,例如函数的执行delay。
@GetMapping("/test")
suspend fun test(): String {
runObserved("delay", observationRegistry) {
delay(1.seconds)
}
// ....
}
将此代码添加到端点处理程序后,观测云将向我们显示该操作的自定义范围。
典型的 Spring Boot 应用程序通常会连接到实际应用程序中的数据库。要利用响应式技术栈,建议使用 R2DBC(https://r2dbc.io/) API 而不是 JDBC 。
由于Micrometer Tracing是一项相当新的技术,目前还没有可用的自动追踪。然而,Spring 团队正在研究创建自动配置。实验存储库参考链接:https://github.com/spring-projects-experimental/r2dbc-micrometer-spring-boot。
当前项目,需将添加以下依赖项到build.gradle.kts. 为了方便测试,我们不会使用真实的数据库,而是使用 H2 内存数据库(https://www.h2database.com/html/main.html)。
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
runtimeOnly("com.h2database:h2")
runtimeOnly("io.r2dbc:r2dbc-h2")
// R2DBC micrometer auto tracing
implementation("org.springframework.experimental:r2dbc-micrometer-spring-boot:1.0.2")
在Kotlin代码中,添加了一个带有协程支持的简单 CRUD 存储库。如下所示:
@Table("todo")
data class ToDo(
@Id
val id: Long=0,
val title: String,
)
interface ToDoRepository : CoroutineCrudRepository<ToDo, Long>
@RestController
class Controller(
val todoRepo: ToDoRepository,
// ...
) {
@GetMapping("/test")
suspend fun test(): String {
// ...
// save
val entry=ToDo(0,"Springboot3 + WebFlux + Kotlin ")
todoRepo.save(entry)
// Sample traced DB call
val dbtodos=todoRepo.findAll().toList()
// ...
return "${dbtodos.size} $externalTodos"
}
}
调用我们的 endpoint 将会再添加一个跨度。新的跨度名为query,包含多个标签,包括Spring Data R2DBC(https://spring.io/projects/spring-data-r2dbc/) 执行的 SQL 查询。
Micrometer 和新的链路追踪扩展统一了Spring Boot 3及以上版本的可观测性技术栈。为不同公司及其技术栈使用的不同链路追踪解决方案提供了很好的抽象。因此,它简化了我们开发人员的工作。
在 Spring WebFlux 的响应式编程方面,仍然有一些改进的潜力,尤其是 Kotlin。Micrometer 团队正在与Project Reactor (https://projectreactor.io/),Spring WebFlux 使用的响应式库背后的团队进行积极会谈,以简化响应式技术栈的 Micrometer Tracing 的使用。
开发Web应用程序时,经常需要进行一些全局性的配置,例如添加拦截器、设置消息转换器、配置跨域资源共享(CORS)等。在Spring WebFlux中,我们可以通过实现WebFluxConfigurer接口来进行这些全局配置。本文将详细介绍如何使用WebFluxConfigurer进行全局配置,以及常见的配置项和实现方式。
WebFluxConfigurer是Spring WebFlux框架中用于配置WebFlux全局特性的接口,它提供了多个方法用于配置不同的功能。
public interface WebFluxConfigurer {
default void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {}
default void addFormatters(FormatterRegistry registry) {}
default void configureViewResolvers(ViewResolverRegistry registry) {}
default void addArgumentResolvers(ArgumentResolverConfigurer configurer) {}
default void addReturnValueHandlers(ReturnValueHandlerConfigurer configurer) {}
default void configureHandlerExceptionResolvers(HandlerExceptionResolverConfigurer configurer) {}
default void addInterceptors(InterceptorRegistry registry) {}
default void addCorsMappings(CorsRegistry registry) {}
default void configurePathMatching(PathMatchConfigurer configurer) {}
default void configureWebSocketTransport(WebSocketTransportConfigurer configurer) {}
default void configureWebSocketHandlerMapping(WebSocketHandlerMappingConfigurer configurer) {}
default void configureHttpMessageReader(ServerHttpMessageReaderConfigurer configurer) {}
default void configureHttpMessageWriter(ServerHttpMessageWriterConfigurer configurer) {}
default void configureClientHttpRequestFactory(ClientHttpConnectorConfigurer configurer) {}
default void addResourceHandlers(ResourceHandlerRegistry registry) {}
default void configureServerSentEvent(ServerSentEventHttpMessageWriterConfigurer configurer) {}
}
在上述接口中,每个方法对应一个特定的全局配置功能。接下来,我们将详细介绍每个方法的作用和实现方式。
在WebFlux中,消息编解码器(Message Codec)负责将请求和响应的数据转换为对象,并进行序列化和反序列化。可以通过configureHttpMessageCodecs方法来配置消息编解码器。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.customCodecs().encoder(new MyEncoder());
configurer.customCodecs().decoder(new MyDecoder());
}
}
在上述示例中,我们通过configureHttpMessageCodecs方法添加了自定义的编码器和解码器,实现了对特定数据格式的处理。
格式化器(Formatter)用于将字符串类型的数据转换为特定类型的对象,例如将字符串转换为日期对象。可以通过addFormatters方法来配置格式化器。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void addFormatters(FormatterRegistry registry) {
registry.addFormatter(new DateFormatter("yyyy-MM-dd"));
}
}
在上述示例中,我们通过addFormatters方法添加了日期格式化器,将日期字符串转换为yyyy-MM-dd格式的日期对象。
视图解析器(View Resolver)用于将逻辑视图名称解析为实际的视图对象,例如将Thymeleaf模板文件解析为HTML视图。可以通过configureViewResolvers方法来配置视图解析器。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void configureViewResolvers(ViewResolverRegistry registry) {
registry.viewResolver(new ThymeleafViewResolver());
}
}
在上述示例中,我们通过configureViewResolvers方法添加了Thymeleaf视图解析器,用于解析Thymeleaf模板文件。
在WebFlux中,参数解析器(Argument Resolver)用于将请求中的参数解析为控制器方法的参数,而返回值处理器(Return Value Handler)用于将控制器方法的返回值处理为响应数据。可以通过addArgumentResolvers和addReturnValueHandlers方法来配置参数解析器和返回值处理器。
配置参数解析器
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void addArgumentResolvers(ArgumentResolverConfigurer configurer) {
configurer.addCustomResolver(new MyArgumentResolver());
}
}
在上述示例中,我们通过addArgumentResolvers方法添加了自定义的参数解析器MyArgumentResolver。
配置返回值处理器
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void addReturnValueHandlers(ReturnValueHandlerConfigurer configurer) {
configurer.addCustomHandler(new MyReturnValueHandler());
}
}
在上述示例中,我们通过addReturnValueHandlers方法添加了自定义的返回值处理器MyReturnValueHandler。
在Web应用程序开发中,异常处理器(Exception Resolver)用于处理控制器方法中抛出的异常,并返回适当的响应。可以通过configureHandlerExceptionResolvers方法来配置异常处理器。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void configureHandlerExceptionResolvers(HandlerExceptionResolverConfigurer configurer) {
configurer.addExceptionHandler(new MyExceptionHandler());
}
}
在上述示例中,我们通过configureHandlerExceptionResolvers方法添加了自定义的异常处理器MyExceptionHandler。
拦截器(Interceptor)用于在处理请求之前或之后执行一些额外的逻辑,例如记录请求日志、权限验证等。可以通过addInterceptors方法来配置拦截器。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new MyInterceptor()).addPathPatterns("/api/**");
}
}
在上述示例中,我们通过addInterceptors方法添加了自定义的拦截器MyInterceptor,并指定了拦截路径为/api/**。
跨域资源共享(Cross-Origin Resource Sharing,CORS)是一种用于解决跨域访问问题的机制,可以通过addCorsMappings方法来配置CORS。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/api/**")
.allowedOrigins("*")
.allowedMethods("GET", "POST", "PUT", "DELETE")
.allowCredentials(true)
.maxAge(3600);
}
}
在上述示例中,我们通过addCorsMappings方法配置了跨域访问的规则,允许所有来源的请求访问/api/**路径,并指定了允许的方法、是否允许携带凭证以及最大缓存时间。
路径匹配规则(Path Matching)用于指定URL路径与请求处理器的映射关系。可以通过configurePathMatching方法来配置路径匹配规则。
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Override
public void configurePathMatching(PathMatchConfigurer configurer) {
configurer.setUseSuffixPatternMatch(false)
.setUseTrailingSlashMatch(false);
}
}
在上述示例中,我们通过configurePathMatching方法配置了路径匹配规则,禁用了后缀模式匹配和尾部斜杠匹配。
通过本文的详细介绍,读者应该对如何使用WebFluxConfigurer进行全局配置有了更深入的了解。合理地进行全局配置可以提高开发效率,并确保应用程序的稳定性和可维护性。希望本文能够帮助读者更好地理解和应用Spring WebFlux中的全局配置功能,为开发高效、灵活的Web应用程序提供参考和指导。
公众号:九极客
*请认真填写需求信息,我们会在24小时内与您取得联系。