Reactor

321

关于

本文基本是对官方文档的翻译

官方文档: https://projectreactor.io/docs/core/release/reference/

Reactor 介绍

Reactor 是 JVM 的完全非阻塞响应式编程基础, 有着高效的需求管理 (背压的形式). 它直接整合 Java8 的函数式 API, 尤其是 CompletableFuture, Stream, 还有 Duration . 提供了可组合的异步化序列 API --- Flux (对于 [N] 个元素) 和 Mono (对于 [0|1] 元素) --- 并广泛实现 响应式Stream 规范。

响应式编程介绍

Reactor 是响应式编程范式的一种实现, 概括如下:

响应式编程是一种涉及数据流和变化传播的异步编程范式. 这意味着可以通过编程语言轻松地表示静态 (如数组) 或动态 (如事件发射器) 数据流

https://en.wikipedia.org/wiki/Reactive_programming

在解决 JVM 上异步方法的缺点的同时具有以下特点

  • 组合性(Composability)可读性(Readability)
  • 使用丰富的 操作符 来处理形如 的数据
  • 订阅(subscribe) 之前什么都不会发生
  • 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
  • 并发无关 的高水平但高价值的抽象

Reactor 既是传送带也是转配工人, Publisher 负责生产商品放上传送带, 而操作符则像是传送带上的工人或是加工机械对 Publisher 进行处理, 如果中间发生异常则通过背压的机制向上游发送信号限制加工

组合性和可读性

"组合性" 是指能够编排多个异步任务, 我们使用先前任务的结果将输入提供给后续任务. 另外, 可以我们可以以 fork-join 的形式运行多个任务. 此外, 我们能够复用异步任务的零散组件到更高层次的系统中

Reactor 提供了丰富的组合选项, 其中代码反映了抽象过程的组织, 并且所有内容通长保持在同一级别 (减少嵌套)

流水线类比

你可以将响应式应用程序处理的数据视为经过流水线. Reactor 既是传送带又是工作站. 原材料从源 (最上游的 Publisher) 中流出, 并且最终作为成品准备推送给消费者 (Subscriber)

源材料可以经过多种传变和一些中间步骤, 或者成为将中间件聚合在一起的更大装配线的一部分. 如果某一点出现故障或堵塞 (也许是装箱产品需要花费不成比例的长时间), 受影响的工作站可以向上游发出信号, 以限制原材料流动

操作符

在 Reactor 中, 操作符类似工作站. 每个操作符添加行为到 Publisher 中, 并将上一步的 Publisher 包装到新的实例中. 因此, 整个链被链接在一起, 数据源于第一个 Publisher 沿着链向下移动, 并通过每个链接进行转换。最终, Subscriber 完成处理. 记住, 正如我们很快会看到的, 在 Subscriber 订阅 Publisher 之前, 什么都不会发生

订阅之前什么都不会发生

通过 subscribe(), 可以将 PublisherSubscriber 进行绑定, 从而触发整个链中的流数据. 这是在内部实现的, 通过单个 request 信号从 Subscriber 传播到上游, 一直传回到 Publisher

背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上, 某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样

在响应式流规范中实际定义的机制同刚才的类比非常接近: 订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据, 也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素

中间环节的操作也可以影响 request. 想象一个能够将每 10 个元素分批打包的缓存(buffer) 操作。 如果订阅者请求一个 buffer,那么对于源头来说可以生成 10 个元素. 一些操作符还实现了 "提前获取" 的策略, 可以避免 request(1) 往返, 并且如果在需要之前生产这些元素的成本不是太高则是有益的

这样能够将 "推送" 模式转换为 "推拉混合" 的模式, 如果下游准备好了, 可以从上游拉取 n 个元素; 但是如果上游元素还没有准备好, 下游还是要等待上游的推送

热 (Hot) vs 冷 (Cold)

在 Rx 家族的响应式库中, 响应式流分为“热”和“冷”两种类型, 区别主要在于响应式流如何 对订阅者进行响应:

  • 一个 "冷" 的序列, 指对于每一个 Subscriber 都会收到从头开始所有的数据, 包括数据源. 例如, 如果源头包装了一个 HTTP 调用, 对于每一个订阅都会发起一个新的 HTTP 请求
  • 一个 "热" 的序列, 指对于一个 Subscriber 只能获取从它开始 订阅 之后 发出的数据. 不过注意, 有些 "热" 的响应式流可以缓存部分或全部历史数据. 通常意义上来说, 一个 "热" 的响应式流, 甚至在即使没有订阅者接收数据的情况下, 也可以 发出数据 (这一点同 Subscribe() 之前什么都不会发生”的规则有冲突)

Singnal

在 Reactor 中传递的信号, 分为

  • onSubscribe
  • onNext
  • onError
  • onComplete
  • onAfterTerminate
  • onRequest
  • onCancel

核心特性

Flux

包含 0 - n 个元素的异步序列, 返回 0 - n 个元素

Flux<T> 是一个能够发出 0 - n 个元素的标准的 Publisher<T>, 它会被 "complete" 和 "error" 信号终止. 因此返回结果是一个 value, completion, error

所有的信号事件都是可选的, 如果没有 onNext 事件那么发出的就是一个 空的 无限序列, 如果没有 onCompelte 那么就是一个 无限的 空序列. 就像 Flux.interval(Duration) 生成的 Flux<Long>, 就是一个无限地周期性发出规律 tick 的时钟序列

Mono

异步的 0 | 1 结果

mono

Mono<T> 是一个特殊的 Publisher<T>, 它最多发出一个元素, 然后终止于一个 onComplete 信号或一个 onError 信号

注意: 可以使用Mono<Void> 创建一个可以用于表示 "空的" 只有完成概念的异步处理 (Runnable) 的 Mono

简单创建和订阅 Flux 和 Mono

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");

Flux<String> seq2 = Flux.fromIterable(iterable);

Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); // 第一个参数是开始的数字, 第二个是元素个数

订阅的 API

方法描述
subscribe()订阅并触发序列
subscribe(Consumer<? super T> consumer)对每个生产的元素进行处理
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)处理每个数据, 也对异常做出应对
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer)处理每个数据和异常同时在成功完成时执行代码
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)处理每个数据 & 异常 & 完成时回调的同时处理调用 subscribe 产生的 Subscription

以上方法都会返回一个 Subscription, 如果不需要更多的元素你可以用过此对象来取消订阅. 一旦取消订阅, 源会停止生产元素并且清楚它创建的所有资源. 这种 cancel-and-clean-up 行为定义在通用 Disposable 接口在中

subscribe 方法示例

产生 1 - 3, 3 个整型值

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(); 

产生三个值并打印

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(i -> System.out.println(i)); 

处理错误

Flux<Integer> ints = Flux.range(1, 4) 
      .map(i -> { 
        if (i <= 3) return i; 
        throw new RuntimeException("Got to 4"); 
      });
ints.subscribe(i -> System.out.println(i), 
      error -> System.err.println("Error: " + error));

添加完成后处理

Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");}); 

包含一个自定义的 Subscriber

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

 public static class SampleSubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscribed");
            request(1);
        }

        @Override
        protected void hookOnNext(T value) {
            System.out.println("Next: " + value);
            request(1);
        }
}

request(n) 能够在任何 hook 中,通过 subscription 向上游传递 背压请求。这里我们在开始这个流的时候请求 1 个元素值, 并每次请求一个元素

自定义的 Subscriber 推荐继承 BaseSubsriber 来实现. 这个类提供了一个些 hook 方法, 可以重写它们来调整 subscriber 的行为. 默认情况下, 它会触发一个无限个数的请求, 但是当你想自定义请求元素的个数的时候, 扩展 BaseSubscriber 可以方便地实现背压

扩展时通常至少要覆盖 hookOnSubscribe(Subscription subscription)hookOnNext(T value) 两个方法. 建议同时重写 hookOnError, hookOnCancel, 以及hookOnComplete 方法. 最好也重写 hookFinally 方法

取消订阅

使用 subscribe() 方法及其变体返回的 Dispoable 类型来进行取消

对于 FluxMono, 取消是一个源停止生产元素的信号. 然而, 他并不保证是立刻的: 一些源可能生产元素非常快以至于他们甚至可以在取消操作之前完成

一些围绕 Disposable 的工具, 可以通过 Disposables 类使用. 在这些之中, Disposables.swap() 创建一个 Disposable 包装器, 让你可以自动取消和替换具体的 Disposable. Disposables.composite(...) 可以让你收集一些 Disposable 实例, 与服务调用的多个正在执行的请求并稍后立即处理这些请求

可编程式创建序列

以编程的方式定义其关联的事件 (onNext, onError, onComplete)

同步 Generate

这是一种 同步地, 逐个地(一对一) 产生值的方法, 意味着 sink 是一个 SychronousSink 而其 next() 方法在每次回调的时候最多只能被调用一次. 可以调用 error(Throwable)complete(), 也可不调用

可以保留一个状态, 可以在 sink 的使用中参考这个状态来决定下一步发出什么

生成 1- 10 的序列

Flux.generate(() -> 1, (state, sink) -> {
      sink.next(state);
      if (state == 10) sink.complete();
      return state + 1;
  }).subscribe(System.out::println);

如果状态对象需要清理资源, 使用 generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)stateConsumer 的方法签名, 这个 Consumer 会在序列终止是被调用

Flux.generate(() -> 1, (state, sink) -> {
    sink.next(state);
    if (state == 10) sink.complete();
    return state + 1;
}, System.out::println).subscribe(System.out::println);

如果 state 使用了数据库连接等其他需要清理的资源, 这个 Consumer 就可以用来最后关闭连接清理资源等

异步多线程 Create

这是一个更高级的创建 Flux 的方式, create 既可以是同步的也可以是异步的, 并且还可以每次发出多个元素

generate 不同的是, create 不需要状态值, 另一方面, 它可以在回调中出发多个事件 (即使是在未来某个时间)

注意:

create 不能使代码并行化, 也不能使其异步化, 即使它可以与异步的 API 一起使用. 如果 create lambda 中阻塞了, 则会使自己陷入死锁和类似副作用中. 即使使用了 subscribeOn, 也许要谨慎, 即长时间的阻塞的 create lambda (比如无限循环调用 sink.next(t)) 会锁住管道: 这些请求永远不会执行, 因为循环会耗尽运行它们的线程

使用 subscribeOn(Scheduler, false) 的形式: requestOnSeaprateThread = false 将使用 Scheduler 线程来进行 create. 并且仍然可以通过原始的线程中执行 request 来让数据流动

create 有个好处就是可以将现有的 API 转为响应式, 比如监听器的异步方法

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

以上的代码桥接了 MyEventListener

overflowStrategy

  • INGORE: 完全忽略下游的背压请求, 这可能在下游队列积满时导致 IllegalStateException
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号
  • DROP: 当下游没有准备好接受新的元素的时候抛弃这个元素
  • LATEST: 让下游只得到上游最新的元素
  • BUFFER: 默认策略, 缓存所有下游没有来得及处理的元素 (这是一个不限大小的缓存, 可能导致 OutOfMemoryError)

相应的, 在 Mono 中也有一个 create, 但不能生成多个元素, 因此会抛弃第一个元素后的所有元素

异步单线程 push

push 介于 generatecreate 之间, 适合处理来自单个生产者的事件. 与 create 类似, push 也可以是异步的, 并能够使用各种溢出策略 (overflow strategies) 管理背压. 但是, 每次只能有一个生产线程可以调用 next, completeerror

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }

        public void processError(Throwable e) {
            sink.error(e); 
        }
    });
});

混合式推/拉模型

大多数响应式操作符都遵循混合的 推/拉 模式. 尽管大多数处理都是异步的 (建议使用 push 方法), 但也有一个小的 pull 组件: request

从源来看, 消费者从源 pull 数据, 直到第一次请求之前, 它不会发出任何东西. 只要有可用的数据时, 源回向消费者 pull 数据, 但会在请求范围内

push()create() 都允许设置 onRequest Consumer, 以便于管理请求量, 并确保只有当有待处理请求时, 才将数据推送到 sink 中

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            // 传递异步到达的消息
            sink.next(s); 
          }
        }
    });
    sink.onRequest(n -> {
        // 请求时轮询消息
        List<String> messages = myMessageProcessor.getHistory(n); 
        for(String s : message) {
           sink.next(s); 
        }
    });
});

push()create() 中的 sink, 支持设置 onDisposeonCancel, 在取消或终止时执行清理

onDispose 可以用于 Flux 完成, 错误或取消时执行的清理

onCancel 可以用于在使用 onDispose 执行清理之前, 执行任何特定于取消的操作

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });

Handle

handle 方法是一个实例方法, 所以必须使用在一个现有的源

这个方法类似于 generate, 使用 SynchronousSink 并只允许逐个的发出 . 但 handle 与其不同的是 handle 可以从每个源元素中生成任意一个值, 可能会跳过一些元素, 于是便可作为 mapfilter 的组合

有一个例子. 响应式规范不允许序列中的值为 null, 但是你想使用一个预先存在的方法作为 map 函数来执行 map, 而这个方法有时候返回 null 该怎么办?

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

可以使用 handle 来删除任何空值

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); 
        if (letter != null) 
            sink.next(letter); 
    });

alphabet.subscribe(System.out::println);

线程和调度器

Reactor 是 可并发的(concurrency-agnostic)

获得 FluxMono 不一定意味着它要在特定的线程运行. 相反, 大多数操作符继续在上一个操作符执行的 Thread 中工作. 除非指定, 否则最上面的操作符 (源) 本身运行在调用了 subscribe()Thread

下面的示例在一个新的线程上运行 Mono:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); 

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> 
          System.out.println(v + Thread.currentThread().getName()) 
      )
  )
  t.start();
  t.join();
}

以上的 Mono 在 main 线程中组装, 但是它在 Thread-0 中完成订阅, 因此操作符的回调实际上是在 Thread-0 中运行

在 Reactor 中, 执行模型和执行的位置由使用的 Scheduler 决定. Scheduler 具有类似于 ExecutorService 的调度职责, 但有一个专门的抽象使其可以做更多的事情, 尤其是作为一个时钟和更广的范围实现 (虚拟时间测试, 波动或立即调度等)

Schedulers 类又可以访问执行上下文的静态方法:

  • Schedulers.immediate() 无执行上下文: 在处理时, 提交的 Runnable 将直接执行, 有效地在当前的 Thread 上运行它们 (可以视为 "空对象" 或无操作的 Scheduler)
  • Schedulers.single() 一个单一可重用的线程: 此方法对所有调用者会重用相同的线程, 直到调度器销毁掉. 如果希望每次调用都有一个特定的线程, 使用Schedulers.newSingle()
  • Schedulers.elastic() 一个无边界的弹性线程池: 此方法已不再是首选, 使用 Schedulers.boundedElastic() 代替
  • Schdulers.boundedElastic() 一个有边界的弹性线程池: 这是一个提交阻塞处理自身线程方法, 这样他就不会占用其他资源. 在阻塞 I/O 中这是一个更好的选择. 从 3.6.0 开始, 这个方法可以根据设置提供两种不同的实现:
    • 基于 ExecutorService. 这个实现类似前身 elastic(), 会根据需要创建新的和重用的线程池. 线程池闲置太久会被销毁 (默认 60s). 与 elastic() 不同, 它可以创建的后台线程数量有限 (默认为 CPU 核心数 *10). 在到达上限后提交的 最多100 000 个任务将被排队, 并在线程变得可用时重新被调度 (如果调度有延迟, 这个延迟会开始在线程变得可用时)
    • 基于 Thread-per-task (一个任务一个线程), 设计来在虚拟线程上运行. 要使用这个实现应用程序需要运行在 Java 21+ 环境下并且将 reactor.schedulers.defaultBoundedElasticOnVirtualThreads 系统属性设置为 true. 一旦设置, Schedulers.boundedElastic() 返回一个 BoundedElasticScheduler 的特定实现, 专门用于在虚拟线程上运行每个任务
  • Schedulers.parallel() 固定线程池: 创建与 CPU 核心数相同的线程

此外还可以用现有的 ExecutorService 通过 Schedulers.fromExecutorService(ExecutorService) 来创建一个 Scheduler (可以但不建议使用 Executor 创建)

也可以通过使用 newXXX 方法创建各种调度器类型的新实例. 例如 Schedulers.newParaleel(name)

注意: 虽然 boundedElastic 是为了在无法避免的情况下帮助处理遗留的阻塞代码, 但 singleparellel 不是. 因此, 在 Reactor 阻塞 API (在默认的单调度器和并行调度器内 block(), blockFirst, 𰻞ockLast() 以及迭代 toIterable()toStream()) 会导致 IllegalStateException

通过创建 NonBlocking 标记接口的 Thread 实例, 自定义的 Scheduler 也可以被标记为 NonBlocking

一旦操作符使用 Schedulers 中特定的调度器. 例如 Flux.interval(Duration.ofMillis(300)) 工厂方法会每隔 300 ms 产生一个 Flux<Long>. 默认情况下, 这是由 Schedulers.parallel() 启用的

通过下面的代码指定 Schduler

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor 提供了两种方式来切换响应式链中的执行上下文 (或者调度器) : publishOn and subscribeOn. 两种方式都会获取一个 Scheduler 并且将执行上下文切换到这个调度器, 但是 publishOn 在链中的放置位置很重要而 subscribeOn 则可随意

当使用链接操作符时, 可以根据需要在内部封装尽可能多的 FluxMono 实现. 一旦订阅, 一个 Subscriber 对象的链就会被创建出来, 向后 (沿着链向上) 到第一个生产者

publishOn 方法

publishOn 应用在订阅链的中间位置. 他接收来自上游的信号, 并在下游重放, 同时在相关的 Scheduler 中的某个 worker 执行回调. 因此他影响到后续操作符的执行 (直到链中的另一个 publishOn)

  • 将执行上下文改为由 Scheduler 选择的一个 Thread
  • 根据规范, onNext 依次调用, 因此这会占用单个线程
  • 除非他们工作在特殊的 Scheduler 上, publishOn 之后的操作符继续在相同的线程上执行
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  
    .publishOn(s)  
    .map(i -> "value " + i);  

new Thread(() -> flux.subscribe(System.out::println)); 

subscribeOn 方法

当向下的链被构造时, subscribeOn 适用与在订阅上处理. 通常建议放在源数据之后, 因为中间的操作可能会影响执行上下文

这个方法不会影响在随后调用的 pushlishOn

  • 改变整个链的所订阅的 Thread
  • Scheduler 中选择一个线程

只有最接近下游的 subscribeOn 调用才会有效的将订阅和请求信号调度到可以拦截他们的源或操作符 (doFirst, doOnRequest). 使用多个 subscibeOn 是不必要的

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  
    .subscribeOn(s)  
    .map(i -> "value " + i);  

new Thread(() -> flux.subscribe(System.out::println)); 

错误处理

在响应式流中, 错误是终结事件. 一旦错误发生, 他就会定制序列并沿着运算符链传播到最后一步, 即定义的订阅者及其 onError 方法

此类错误需要在应用程序层面上被处理. 对于实例, 你可以在 UI 上显示一个错误通知或者在 REST endpoint 发送一个有意义的错误载荷. 因此, 订阅者的 onError 方法总是需要被定义

如果没有定义, onError 会抛出一个 UnsupportedOperationException. 你可以在 Exceptions.isErrorCallbackNotImplemented 方法进一步检测和分类它

Reactor 还提供了处理链中间处理错误的替代方法, 即错误操作符:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example

错误处理操作符

静态备用值

onErrorReturn 等效于 "捕获并返回一个默认静态值"

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}
Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

你还可以选择在异常上应用一个 Predicate 来决定是否恢复

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
备用方法

如果想有多个默认值, 并且有其他的 (更安全的) 方式处理数据, 可以使用 `onErrorResume

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}
Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) 
        .onErrorResume(e -> getFromCache(k)) 
    );
动态备用值

如果没有其他的数据处理方式并且要从异常中计算出一个备用值

erroringFlux.onErrorResume(error -> Mono.just( 
        MyWrapper.fromError(error) 
));

捕获并重新抛出异常

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

可以有一个更直接的方法:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
延迟处理 (Log or React on the Side)

如果想让错误继续传播, 但仍想在不修改序列的情况下对错误做出响应 (例如记录日志), 可以使用 doOnError 操作符

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

doOnError 操作符以及所有以 doOn 为前缀的操作符, 有时被称为 side-effect. 它们可以让你在不修改序列时间的情况下窥视到序列内部的事件

下面的例子仍会传递错误, 但至少可以确保日志记录

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) 
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); 
        })
        
    );
使用 Resources 和 Final 块

使用 usingdoFinally

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); 
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Flux<String> flux =
Flux.using(
        () -> disposableInstance, 
        disposable -> Flux.just(disposable.toString()), 
        Disposable::dispose 
);
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { 
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL) 
          statsCancel.increment();
    })
    .take(1);  // 在发射一个元素后取消
重试

使用 retry

它通过重新订阅上游的 Flux 来工作. 实际上是一个不同的序列, 原始序列仍然是终止的

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() // 将每个值与前一个值发出后的持续时间关联起来
    .subscribe(System.out::println, System.err::println); 

Thread.sleep(2100);

使用 retryWhen

使用一个 "伴生" Flux 去判断特定 error 是否应该重试. 这个伴生 Flux 被操作符创造但是被用户装饰, 目的是自定义重试条件

他接收一个 Retry 策略/函数. Retry 类是一个抽象类, 其中提供了各种工厂方法, 例如 Retry.from(Function)

重试周期如下:

  1. 每次发生错误时 (提供重试的可能性), RetrySinal 都会被发送到伴生的 Flux, 而这个 Flux 已经被函数装饰过了. 这里的 Flux 可以看到至今所有的尝试. RetrySinal 可以访问错误及其周围的元数据
  2. 如果伴生 Flux 发射一个值, 就会发生一次重试
  3. 如果伴生 Flux 完成, 则错误被吞掉, 重试循环停止, 结果序列也完成
  4. 如果伴生的 Flux 产生一个 error (e), 重试周期停止并产生带有 error (e) 的序列

前两中情况区分很重要. 只需要完成伴生对象就可以有效的消除错误. 以下使用 retryWhen 模拟 retry(3) 的方法

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) 
    .doOnError(System.out::println) 
    .retryWhen(Retry.from(companion -> 
        companion.take(3))); // 只重试 3 个 error

实际上, 前面的示例会生成一个空 Flux, 但它成功完成. 由于同一个 Flux 上的 retry(3) 会以最近的错误而终止, 因此 retryWhen 示例于 retry(3) 并不完全相同

以下例子使行为完全相同

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
    .doOnError(e -> errorCount.incrementAndGet())
    .retryWhen(Retry.from(companion -> 
                          companion.map(rs -> { 
                              if (rs.totalRetries() < 3) return rs.totalRetries(); 
                              else throw Exceptions.propagate(rs.failure()); 
                          })
                         ));

以上也可以使用 errorFlux.retryWhen(Retry.max(3)) 代替

core 提供的 Retry 辅助工具, RetrySepcRetryBackoffSpec, 都允许进行高级定制, 如:

  • 为可以出发重试的异常设置 filter(Predicate)
  • 通过 modifyErrorFilter(Function) 修改这样一个先前设置的过滤器
  • 触发重试触发器 (即延时前后的回退) 等 side-effect, 前提是重试有效 (doBeforeRetrydoAfterRetry 是附加的)
  • 在重试触发器周围触发一个异步的 Mono<Void>, 这允许在基本延迟上添加异步行为, 从而进一步延迟触发器 (doBeforeRetryAsyncdoAfterRetryAsync 是附加的)
  • 在达到最大尝试次数的情况下, 通过 onRetryExhaustedThrow(BiFunction) 自定义异常. 默认情况下, 使用Exceptions.retryExhausted(...), 可以通过 Exceptions.isRetryExhausted(Throwable) 来区分
  • 激活暂时性错误处理
暂时性错误重试

一些长期存在的源可能会出现偶尔的错误爆发, 然后在一段较长的时间里一切正常. 这就是暂时性错误 (transient errors)

RetrySingal 接口代表 retryWhen 的状态, 他有有一个 totalRetriesInARow() 值可以用于此处. 不同于通常单调递增的 totalRetries() 索引, 每次重试恢复错误时 (即, 当重试尝试导致传入 onNext 而不是再次出现 onError 时), 这个副索引都会重置为 0

当在 RetrySpecRetryBackoffSpec 中设置 transientErrors(boolean) 配置为 true 时, 结果策略会利用 totalRetriesInARow(), 有效的处理暂时性错误. 这些规范根据索引计算重试模式, 因此实际上规范的所有其他配置参数都独立地应用于每个错误突发时

final AtomicInteger errorCount = new AtomicInteger(); 
final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
    Flux.generate(sink -> {
        int i = transientHelper.getAndIncrement();
        if (i == 10) {
            sink.next(i);
            sink.complete();
        } else if (i % 3 == 0) {
            sink.next(i);
        } else {
            sink.error(new IllegalStateException("Transient error at " + i));
        }
    });
Flux<Integer> transientFlux = httpRequest.get() 
    .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))  
    .blockLast();
assertThat(errorCount).hasValue(6);

如果没有 transientErrors(true), 在第二次突发错误时, 将达到最大重试次数 2, 并且在发出 onNext(3) 后, 序列将失败

处理操作符或函数中的异常

如果调用了一些声明了 throws 异常的方法 (例如大部分 IO API 和线程 API), 依然需要在 try-catch 中处理这些异常. 有几种选择:

  1. 捕获异常并从中恢复. 序列正常进行
  2. 捕获异常, 将其等装成一个 unchecked 异常, 并抛出 (中断序列). Exceptions 工具类可以解决这个问题
  3. 如果需要返回一个 Flux (例如在 flatMap 中), 那么就用一个产生错误的 Flux 来封装异常, 例如 return Flux.error(checkedException). (这个序列也会终止)

Exceptions 工具类可以用来确保只有当异常为 checked 异常时才会被包装:

  • 如果有必要, 使用 Exceptions.propagate 方法来包装异常. 它会先调用 throwIfFatal 并且不会包装 RuntimeException
  • 使用 Exceptions.unwrap 方法可以获取原始的未包装的异常 (回到响应式特定异常的层次结构的根源)

比如以下方法, 可能会抛出 IOException 异常:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

假设在 map 中调用此方法, 必须显示的捕获该异常, 且 map 函数不能重新抛出. 可以将其作为 RuntimeException 异常传播到 maponError 方法中, 如下:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

此后, 当订阅前面的 Flux 并对错误做出响应时, 如果你想对 IO 异常做一些特殊处理, 可以将其还原到原始状态:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

Sinks

在 Reactor 中, sink 是一个类, 它允许以独立的方式安全地手动触发信号, 创建一个类似 Publisher 的结构, 能够处理多个 Subscriber (unicast() (单播) 风格除外)

多线程下使用 Sinks.One 和 Sinks.Many 安全地生产

reactor-core 公开的默认 Sinks 课确保检测到多线程使用, 并且从下游订阅者的角度来看, 不会导致规范违规或未定义的行为. 当使用 tryEmit* API 时, 并行调用很快就会失败. 当使用 emit* API 时, 提供的 EmissionFailureHandler 可以允许重试争用 (例如忙等待), 否则 sink 会因错误终止

Sinks 构造器为主要支持的生产者类型提供了向导 API

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();

多个生产者线程可以同时在 sink 上生成数据:

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);

当使用 busyLooping 时, 要注意返回的 EmitFailureHandler 实例不能被重用, 例如, 每个 emitNext 应该调用一次 busyLooping. 此外, 建议使用 100 ms 以上的超时, 因为较小的值没有实际意义

Sink.Many 可以作为 Flux 呈现给下游消费者:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();

类似地, Sinks.EmptySinks.One 可以通过 asMono 方法视为 Mono

Sinks 的种类:

  1. many().multicast(): 仅将新推送的数据传输给其订阅者, 尊重他们的背压 (新推送的数据: "订阅者订阅后")
  2. many().unicast(): 跟上面一样, 但在第一个订阅者注册之前推送的数据会被缓存
  3. many().replay(): 它将向新的订阅者们重播指定历史大小的推送数据, 然后继续实时推送新数据
  4. one(): 向订阅者们播放一个元素的 sink
  5. empty(): 向订阅者们播放一个终结信号的 sink, 但是仍然可以被视为 Mono<T>

可用的 Sinks 概述

Sinks.many().unicast().onBackpressureBuffer(args?)

单播 Sinks.Many, 可以通过使用内部的缓冲区来处理背压. 只能有一个订阅者

一个基础的单播 sink 可以通过 Sinks.many().unicast().onBackpressureBuffer() 创建. 但是 Sinks.many().unicast() 中还有一些额外的 unicast 静态工厂方法, 允许进行更精细的调整

例如, 默认情况下, 它是无界的: 如果通过它推送任意数量的数据, 而其订阅者尚未请求数据, 他会缓存所有数据. 你可以通过在 Sinks.many().unicat().onBackPressureBuffer(Queue) 工厂方法中为内部缓存提供自定义队列实现来更改此设置. 如果该队列有界, 则当缓存已满且没有收到来自下游的足够请求时, sink 会拒绝推送值

Sinks.many().multicast().onBackpressureBuffer(args?)

多播 Sinks.Many, 可以向多个订阅者发出消息, 同时为多个订阅者提供背压. 订阅者们在订阅后仅接受通过 sink 套送的信号

默认情况下, 如果所有订阅者被取消, 它会清楚自身内部的缓存并且停止接受新的订阅者. 你可以使用 Sinks.many().multicast() 下的多播静态工厂方法中的 autoCancel 参数来调整

Sinks.many().multicast().directAllOrNothing()

多播 Sinks.many, 具有简单的背压处理: 如果任何订阅者太慢 (需求为 0), 则所有订阅者的 onNext 都会被丢弃

然而, 慢速订阅者不会被终止, 一旦慢速订阅者再次开始请求, 所有订阅者都将恢复接收从那里推送的元素

一旦 Sinks.Many 终止 (通常是通过调用它的 emitError(ThrowableemitComplete() 方法), 他就会让更多订阅者订阅, 但会立即向他们重放终止信号

Sinks.many().multicast().directBestEffort()

多播 Sinks.Many, 具有尽力的背压处理: 如果一个订阅者太慢 (需求为 0), 则仅针对这个慢速订阅者丢弃 onNext

然而, 慢速订阅者不会被终止并且一旦他们重新开始请求, 他们将恢复接收新推送的元素

一旦 Sinks.Many 终止 (通常是通过调用它的 emitError(ThrowableemitComplete() 方法), 他就会让更多订阅者订阅, 但会立即向他们重放终止信号

Sinks.many().replay()

重放 Sinks.Many, 缓存发出的元素并重放它们给新的订阅者

有多种方式创建:

  • 有限历史大小的缓存 Sinks.many().replay().limit(int) , 或者无限 Sinks.many().replay().all()
  • 基于时间的重播窗口 Sinks.many().replay().limit(Duration)
  • 组合历史大小和时间窗口的缓存 Sinks.many().replay().limit(int, Duration)

还有更多包括缓存单个元素的变体 latest()latestOrDefault(T)

Sinks.unsafe().many()

高级用户考虑使用, 提供相同的 Sinks.Manay 工厂, 而没有生产者线程安全性. 每个 sink 的开销会更少, 因为线程安全的 sink 必须检测多线程访问

库开发人员不应该用不安全的 sink, 但可以在受控的调用环境中内部使用它们, 在该环境中, 他们可以根据反应流规范确保导致 onNext, onCompleteonError 信号的调用的外部同步

Sinks.one()

这个方法直接构造一个简单的 Sinks.One<T> 实例

可以作为 Mono 来查看, 并且具有稍微不同的发射方式, 以更好地传达这种类似 Mono 的语义:

  • emitValue(T value) 生成一个 onNext(value) 信号并且在大多数实现中还出发一个隐式 onComplete()
  • emitEmpty() 生成一个 独立的 onComplete() 信号, 旨在生成一个空 Mono 的等价物
  • emitError(Throwable t) 生成一个 onError(t) 信号

Sinks.one() 接受对这些方法中任何一个的一次调用, 有效地生成一个 Mono, 该 Mono 要么完成一个值, 要么完成为空, 要么失败

Sinks.empty()

这个方法直接构造一个简单的 Sinks.Empty<T> 实例. 类似 Sinks.One<T>, 只不过它不提供 emitValue 方法

结果只能生成一个完成为空或者失败的 Mono

高级特性和概念

互用操作符使用

使用 transform 操作符

使用 transformDeferred 操作符

热 vs 冷

使用 ConnecttableFlux 广播给多个订阅者

三种批处理

使用 Flux<GroupedFlux<T>> 进行分组

使用 Flux<Flux<T>> 进行窗口化

使用 Flux<List<T>> 进行缓存

替换默认的 Schedulers

使用全局 Hooks

丢弃 Hooks

内部错误 Hook

组装 Hook

Hook 预设

向一个响应式序列添加 Context

Context API

关联 Context 与 Flux

用过 ContextView 读取 Context

简单的 Context 例子

完整例子

Context-Propagation 支持