Skip to content

介绍

Project Reactor 是一个用于构建响应式应用程序的框架,主要基于 Java 语言。它是响应式编程范式的一个实现,专注于处理异步流和背压(backpressure)。Project Reactor 是由 Pivotal(现为 VMware)开发的,并且是 Spring 生态系统的一部分,因此在 Spring WebFlux 等项目中得到了广泛应用。

核心概念

  • 响应式编程:

响应式编程是一种声明式编程范式,旨在处理异步数据流和事件驱动的系统。它的目标是提高系统的可扩展性和弹性。 响应式系统通常具备四个特征:响应性、弹性、弹性和消息驱动。

  • 背压(Backpressure):

背压是响应式流中的一个关键概念,指的是消费者可以控制生产者的速度,以防止被压垮。 Reactor 提供了精细化的背压控制,确保在高负载情况下系统仍能稳定运行。

核心组件

  • 异步数据处理:

Reactor 非常适合处理需要异步和并发的数据流操作,如网络请求、文件 IO、数据库访问等。

  • 微服务架构:

在微服务架构中,Reactor 能够通过非阻塞 IO 和背压机制,提高服务的响应速度和资源利用效率。

  • 实时数据流:

对于需要实时处理数据流的应用,如股票交易、传感器数据处理等,Reactor 提供了强大的工具来构建高效的流处理管道。

优势

高性能:

通过非阻塞 IO 和事件驱动模型,Reactor 可以显著提高应用的吞吐量和响应时间。

良好的 Spring 集成:

作为 Spring 框架的一部分,Reactor 与 Spring WebFlux 无缝集成,使得构建响应式 Web 应用变得更加简单和自然。

丰富的操作符:

Reactor 提供了大量的操作符用于流的转换、合并、过滤和聚合,极大地简化了复杂数据流的处理。

组件介绍

Mono<T>

用于表示 0 或 1 个元素的异步序列。

常用方法:

  • Mono.just(T data): 创建一个包含单个元素的 Mono。
  • Mono.empty(): 创建一个空的 Mono。
  • Mono.fromCallable(Callable<T> callable): 从一个 Callable 创建 Mono,用于异步计算。

Flux<T>

于表示 0 到 N 个元素的异步序列。

常用方法:

  • Flux.just(T... data): 创建一个包含多个元素的 Flux。
  • Flux.fromIterable(Iterable<T> iterable): 从一个 Iterable 创建 Flux。
  • Flux.range(int start, int count): 生成一个从 start 开始的整数序列。

方法介绍

转换操作符:

  • map(Function<T, R> mapper): 对每个元素应用函数转换。
  • flatMap(Function<T, Publisher<R>> mapper): 将每个元素映射为一个异步流,并将这些流合并。

过滤操作符:

  • filter(Predicate<T> predicate): 过滤出满足条件的元素。
  • distinct(): 过滤掉重复元素。

组合操作符:

  • mergeWith(Publisher<T> other): 合并两个流。
  • zipWith(Publisher<T> other): 将两个流的元素逐对组合。

错误处理操作符:

  • onErrorReturn(T fallbackValue): 在发生错误时返回一个默认值。
  • onErrorResume(Function<Throwable, ? extends Publisher<? extends T>> fallback): 在发生错误时切换到备用流。

终止操作符:

  • subscribe(Consumer<? super T> consumer): 订阅流并处理每个元素。
  • block(): 阻塞当前线程直到流结束,返回流的最终结果(适用于 Mono)。

并发控制

Scheduler 和并发

Scheduler:

  • Schedulers.immediate(): 在当前线程执行。
  • Schedulers.single(): 在单一的可复用线程中执行。
  • Schedulers.parallel(): 在并行线程池中执行。
  • Schedulers.boundedElastic(): 在弹性线程池中执行,适合 IO 密集型任务。

并发操作符:

  • publishOn(Scheduler scheduler): 指定在哪个 Scheduler 上执行后续操作。
  • subscribeOn(Scheduler scheduler): 指定在哪个 Scheduler 上订阅源。

背压策略

  • onBackpressureBuffer(): 在背压情况下缓存元素。
  • onBackpressureDrop(): 在背压情况下丢弃元素。
  • onBackpressureLatest(): 在背压情况下只保留最新元素。

代码示例

首先maven中引入响应的依赖

xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.7.0</version>
</dependency>

Flux 示例

场景:新闻推送

想象你订阅了一家新闻网站的实时新闻推送服务。每天,这个服务会向你推送多篇新闻文章。这种情况下,Flux 就非常适合,因为它能够处理一个包含 0 到 N 个元素的异步序列。

在代码中,你可以使用 Flux 来表示这个新闻流:

java
Flux<String> newsFlux = Flux.just("News 1", "News 2", "News 3", ...);
newsFlux.subscribe(news -> System.out.println("Received: " + news));

在这个例子中,newsFlux 可以表示每天收到的多篇新闻文章。你可以订阅这个 Flux 来处理每一篇新闻。

Mono 示例

场景:在线订单确认

假设你在网上购买了一件商品,完成支付后,你会收到一个订单确认信息。在这种情况下,Mono 就是一个很好的选择,因为它表示一个包含 0 或 1 个元素的异步序列。

在代码中,你可以使用 Mono 来表示这个订单确认:

java
Mono<String> orderConfirmationMono = Mono.just("Order Confirmation #12345");
orderConfirmationMono.subscribe(confirmation -> System.out.println("Received: " + confirmation));

在这个例子中,orderConfirmationMono 表示可能收到的订单确认信息。你可以订阅这个 Mono 来处理确认信息。

对比

Flux 适用于需要处理多个元素的场景,比如实时数据流、消息队列等。 Mono 适用于需要处理单个结果的场景,比如数据库查询的单个结果、HTTP 请求的单个响应等。

疑问

是不是可以用mono处理的都可以用flux处理?

答:

是的,理论上你可以用 Flux 来处理任何 Mono 能处理的情况,因为 Flux 是一个更通用的工具。Flux 可以表示 0 到 N 个元素的异步序列,而 Mono 是 Flux 的一个特例,专注于处理 0 或 1 个元素的情况。

为什么使用 Mono 而不是 Flux 虽然 Flux 可以处理 Mono 的所有情况,但在实际开发中,使用 Mono 有几个好处:

语义清晰:使用 Mono 可以更清晰地表达代码意图,表明预期的结果是单一的。对于其他开发者或维护者来说,看到 Mono 就知道这个操作只会产生一个结果或没有结果。

API 简洁:Mono 提供了一些特定于单个元素的操作符,这些操作符使得处理单个结果的代码更简洁和直观。

性能优化:在某些实现中,Mono 可能会进行优化,因为它不需要处理多个元素的开销。

复杂的示例

java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.Random;


public class FluxMain2 {

    public static void main(String[] args) throws InterruptedException {
        // 模拟传感器数据生成
        Flux<Integer> sensorDataFlux = Flux.interval(Duration.ofMillis(10))
            .map(i -> new Random().nextInt(100)) // 生成0到99之间的随机数
//            .take(50) // 取前50个数据
            .doOnNext(data -> System.out.println("Generated Data: " + data));

        // 数据处理流水线
        Flux<String> processedDataFlux = sensorDataFlux
//            .filter(data -> data >= 50) // 过滤掉小于50的值
            .map(data -> "Processed Data: " + (data * 2)) // 将数据翻倍并转换为字符串
            .flatMap(data -> simulateDatabaseSave(data)) // 模拟保存到数据库
            .onErrorResume(e -> {
                // 错误处理,发生错误时返回一个默认值
                System.err.println("Error occurred: " + e.getMessage());
                return Mono.just("Default Data");
            })
            .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池上执行
            .doOnComplete(() -> System.out.println("Processing Complete"));

        // 订阅并输出结果,使用背压机制控制消费速率
        processedDataFlux
            .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE,
                dropped -> System.out.println("Dropped: " + dropped)) // 缓冲并处理溢出
            .publishOn(Schedulers.parallel(), 1) // 每次请求1个元素
            .subscribe(result -> {
                try {
                    // 模拟慢速消费者
                    Thread.sleep(800);
                    System.out.println("Result: " + result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

        // 保持主线程运行,以便观察输出
        Thread.sleep(20000);
    }

    // 模拟数据库保存操作
    private static Mono<String> simulateDatabaseSave(String data) {
        return Mono.just(data)
            .delayElement(Duration.ofMillis(500)) // 模拟数据库延迟
            .doOnNext(d -> System.out.println("Saved to DB: " + d));
    }

}

生活、学习、工作