Reactor
关于
本文基本是对官方文档的翻译
官方文档: https://projectreactor.io/docs/core/release/reference/
Reactor 介绍
Reactor 是 JVM 的完全非阻塞响应式编程基础, 有着高效的需求管理 (背压的形式). 它直接整合 Java8 的函数式 API, 尤其是 CompletableFuture
, Stream
, 还有 Duration
. 提供了可组合的异步化序列 API --- Flux
(对于 [N] 个元素) 和 Mono
(对于 [0|1] 元素) --- 并广泛实现 响应式Stream 规范。
响应式编程介绍
Reactor 是响应式编程范式的一种实现, 概括如下:
响应式编程是一种涉及数据流和变化传播的异步编程范式. 这意味着可以通过编程语言轻松地表示静态 (如数组) 或动态 (如事件发射器) 数据流
在解决 JVM 上异步方法的缺点的同时具有以下特点
- 组合性(Composability) 和 可读性(Readability)
- 使用丰富的 操作符 来处理形如 流 的数据
- 在 订阅(subscribe) 之前什么都不会发生
- 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
- 与 并发无关 的高水平但高价值的抽象
Reactor 既是传送带也是转配工人, Publisher 负责生产商品放上传送带, 而操作符则像是传送带上的工人或是加工机械对 Publisher 进行处理, 如果中间发生异常则通过背压的机制向上游发送信号限制加工
组合性和可读性
"组合性" 是指能够编排多个异步任务, 我们使用先前任务的结果将输入提供给后续任务. 另外, 可以我们可以以 fork-join 的形式运行多个任务. 此外, 我们能够复用异步任务的零散组件到更高层次的系统中
Reactor 提供了丰富的组合选项, 其中代码反映了抽象过程的组织, 并且所有内容通长保持在同一级别 (减少嵌套)
流水线类比
你可以将响应式应用程序处理的数据视为经过流水线. Reactor 既是传送带又是工作站. 原材料从源 (最上游的 Publisher
) 中流出, 并且最终作为成品准备推送给消费者 (Subscriber
)
源材料可以经过多种传变和一些中间步骤, 或者成为将中间件聚合在一起的更大装配线的一部分. 如果某一点出现故障或堵塞 (也许是装箱产品需要花费不成比例的长时间), 受影响的工作站可以向上游发出信号, 以限制原材料流动
操作符
在 Reactor 中, 操作符类似工作站. 每个操作符添加行为到 Publisher
中, 并将上一步的 Publisher
包装到新的实例中. 因此, 整个链被链接在一起, 数据源于第一个 Publisher
沿着链向下移动, 并通过每个链接进行转换。最终, Subscriber
完成处理. 记住, 正如我们很快会看到的, 在 Subscriber
订阅 Publisher
之前, 什么都不会发生
订阅之前什么都不会发生
通过 subscribe()
, 可以将 Publisher
与 Subscriber
进行绑定, 从而触发整个链中的流数据. 这是在内部实现的, 通过单个 request
信号从 Subscriber
传播到上游, 一直传回到 Publisher
背压
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上, 某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样
在响应式流规范中实际定义的机制同刚才的类比非常接近: 订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据, 也可以通过使用 request
机制来告知源头它一次最多能够处理 n
个元素
中间环节的操作也可以影响 request
. 想象一个能够将每 10 个元素分批打包的缓存(buffer
) 操作。 如果订阅者请求一个 buffer,那么对于源头来说可以生成 10 个元素. 一些操作符还实现了 "提前获取" 的策略, 可以避免 request(1)
往返, 并且如果在需要之前生产这些元素的成本不是太高则是有益的
这样能够将 "推送" 模式转换为 "推拉混合" 的模式, 如果下游准备好了, 可以从上游拉取 n 个元素; 但是如果上游元素还没有准备好, 下游还是要等待上游的推送
热 (Hot) vs 冷 (Cold)
在 Rx 家族的响应式库中, 响应式流分为“热”和“冷”两种类型, 区别主要在于响应式流如何 对订阅者进行响应:
- 一个 "冷" 的序列, 指对于每一个
Subscriber
都会收到从头开始所有的数据, 包括数据源. 例如, 如果源头包装了一个 HTTP 调用, 对于每一个订阅都会发起一个新的 HTTP 请求 - 一个 "热" 的序列, 指对于一个
Subscriber
只能获取从它开始 订阅 之后 发出的数据. 不过注意, 有些 "热" 的响应式流可以缓存部分或全部历史数据. 通常意义上来说, 一个 "热" 的响应式流, 甚至在即使没有订阅者接收数据的情况下, 也可以 发出数据 (这一点同Subscribe()
之前什么都不会发生”的规则有冲突)
Singnal
在 Reactor 中传递的信号, 分为
- onSubscribe
- onNext
- onError
- onComplete
- onAfterTerminate
- onRequest
- onCancel
核心特性
Flux
包含 0 - n 个元素的异步序列, 返回 0 - n 个元素
Flux<T>
是一个能够发出 0 - n 个元素的标准的 Publisher<T>
, 它会被 "complete" 和 "error" 信号终止. 因此返回结果是一个 value, completion, error
所有的信号事件都是可选的, 如果没有 onNext
事件那么发出的就是一个 空的 无限序列, 如果没有 onCompelte
那么就是一个 无限的 空序列. 就像 Flux.interval(Duration)
生成的 Flux<Long>
, 就是一个无限地周期性发出规律 tick 的时钟序列
Mono
异步的 0 | 1 结果
Mono<T>
是一个特殊的 Publisher<T>
, 它最多发出一个元素, 然后终止于一个 onComplete
信号或一个 onError
信号
注意: 可以使用Mono<Void
> 创建一个可以用于表示 "空的" 只有完成概念的异步处理 (Runnable
) 的 Mono
简单创建和订阅 Flux 和 Mono
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); // 第一个参数是开始的数字, 第二个是元素个数
订阅的 API
方法 | 描述 |
---|---|
subscribe() | 订阅并触发序列 |
subscribe(Consumer<? super T> consumer) | 对每个生产的元素进行处理 |
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) | 处理每个数据, 也对异常做出应对 |
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) | 处理每个数据和异常同时在成功完成时执行代码 |
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer) | 处理每个数据 & 异常 & 完成时回调的同时处理调用 subscribe 产生的 Subscription |
以上方法都会返回一个 Subscription
, 如果不需要更多的元素你可以用过此对象来取消订阅. 一旦取消订阅, 源会停止生产元素并且清楚它创建的所有资源. 这种 cancel-and-clean-up 行为定义在通用 Disposable
接口在中
subscribe 方法示例
产生 1 - 3, 3 个整型值
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe();
产生三个值并打印
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe(i -> System.out.println(i));
处理错误
Flux<Integer> ints = Flux.range(1, 4)
.map(i -> {
if (i <= 3) return i;
throw new RuntimeException("Got to 4");
});
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error: " + error));
添加完成后处理
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> {System.out.println("Done");});
包含一个自定义的 Subscriber
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
public static class SampleSubscriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
@Override
protected void hookOnNext(T value) {
System.out.println("Next: " + value);
request(1);
}
}
request(n)
能够在任何 hook 中,通过 subscription 向上游传递 背压请求。这里我们在开始这个流的时候请求 1 个元素值, 并每次请求一个元素
自定义的 Subscriber
推荐继承 BaseSubsriber
来实现. 这个类提供了一个些 hook 方法, 可以重写它们来调整 subscriber 的行为. 默认情况下, 它会触发一个无限个数的请求, 但是当你想自定义请求元素的个数的时候, 扩展 BaseSubscriber
可以方便地实现背压
扩展时通常至少要覆盖 hookOnSubscribe(Subscription subscription)
和hookOnNext(T value)
两个方法. 建议同时重写 hookOnError
, hookOnCancel
, 以及hookOnComplete
方法. 最好也重写 hookFinally
方法
取消订阅
使用 subscribe()
方法及其变体返回的 Dispoable
类型来进行取消
对于 Flux
或 Mono
, 取消是一个源停止生产元素的信号. 然而, 他并不保证是立刻的: 一些源可能生产元素非常快以至于他们甚至可以在取消操作之前完成
一些围绕 Disposable
的工具, 可以通过 Disposables
类使用. 在这些之中, Disposables.swap()
创建一个 Disposable
包装器, 让你可以自动取消和替换具体的 Disposable
. Disposables.composite(...)
可以让你收集一些 Disposable
实例, 与服务调用的多个正在执行的请求并稍后立即处理这些请求
可编程式创建序列
以编程的方式定义其关联的事件 (onNext
, onError
, onComplete
)
同步 Generate
这是一种 同步地, 逐个地(一对一) 产生值的方法, 意味着 sink 是一个 SychronousSink
而其 next()
方法在每次回调的时候最多只能被调用一次. 可以调用 error(Throwable)
和 complete()
, 也可不调用
可以保留一个状态, 可以在 sink 的使用中参考这个状态来决定下一步发出什么
生成 1- 10 的序列
Flux.generate(() -> 1, (state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
}).subscribe(System.out::println);
如果状态对象需要清理资源, 使用 generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
带 stateConsumer
的方法签名, 这个 Consumer
会在序列终止是被调用
Flux.generate(() -> 1, (state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
}, System.out::println).subscribe(System.out::println);
如果 state 使用了数据库连接等其他需要清理的资源, 这个 Consumer
就可以用来最后关闭连接清理资源等
异步多线程 Create
这是一个更高级的创建 Flux
的方式, create
既可以是同步的也可以是异步的, 并且还可以每次发出多个元素
与 generate
不同的是, create
不需要状态值, 另一方面, 它可以在回调中出发多个事件 (即使是在未来某个时间)
注意:
create
不能使代码并行化, 也不能使其异步化, 即使它可以与异步的 API 一起使用. 如果create
lambda 中阻塞了, 则会使自己陷入死锁和类似副作用中. 即使使用了subscribeOn
, 也许要谨慎, 即长时间的阻塞的create
lambda (比如无限循环调用sink.next(t)
) 会锁住管道: 这些请求永远不会执行, 因为循环会耗尽运行它们的线程使用
subscribeOn(Scheduler, false)
的形式:requestOnSeaprateThread = false
将使用Scheduler
线程来进行create
. 并且仍然可以通过原始的线程中执行request
来让数据流动
create
有个好处就是可以将现有的 API 转为响应式, 比如监听器的异步方法
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
以上的代码桥接了 MyEventListener
overflowStrategy
INGORE
: 完全忽略下游的背压请求, 这可能在下游队列积满时导致IllegalStateException
ERROR
: 当下游跟不上节奏的时候发出一个IllegalStateException
的错误信号DROP
: 当下游没有准备好接受新的元素的时候抛弃这个元素LATEST
: 让下游只得到上游最新的元素BUFFER
: 默认策略, 缓存所有下游没有来得及处理的元素 (这是一个不限大小的缓存, 可能导致OutOfMemoryError
)
相应的, 在
Mono
中也有一个create
, 但不能生成多个元素, 因此会抛弃第一个元素后的所有元素
异步单线程 push
push
介于 generate
和 create
之间, 适合处理来自单个生产者的事件. 与 create
类似, push
也可以是异步的, 并能够使用各种溢出策略 (overflow strategies) 管理背压. 但是, 每次只能有一个生产线程可以调用 next
, complete
或 error
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
public void processError(Throwable e) {
sink.error(e);
}
});
});
混合式推/拉模型
大多数响应式操作符都遵循混合的 推/拉 模式. 尽管大多数处理都是异步的 (建议使用 push 方法), 但也有一个小的 pull 组件: request
从源来看, 消费者从源 pull 数据, 直到第一次请求之前, 它不会发出任何东西. 只要有可用的数据时, 源回向消费者 pull 数据, 但会在请求范围内
push()
和 create()
都允许设置 onRequest
Consumer, 以便于管理请求量, 并确保只有当有待处理请求时, 才将数据推送到 sink 中
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
// 传递异步到达的消息
sink.next(s);
}
}
});
sink.onRequest(n -> {
// 请求时轮询消息
List<String> messages = myMessageProcessor.getHistory(n);
for(String s : message) {
sink.next(s);
}
});
});
push()
和 create()
中的 sink
, 支持设置 onDispose
和 onCancel
, 在取消或终止时执行清理
onDispose
可以用于 Flux
完成, 错误或取消时执行的清理
onCancel
可以用于在使用 onDispose
执行清理之前, 执行任何特定于取消的操作
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
Handle
handle
方法是一个实例方法, 所以必须使用在一个现有的源
这个方法类似于 generate
, 使用 SynchronousSink
并只允许逐个的发出 . 但 handle
与其不同的是 handle
可以从每个源元素中生成任意一个值, 可能会跳过一些元素, 于是便可作为 map
和 filter
的组合
有一个例子. 响应式规范不允许序列中的值为 null
, 但是你想使用一个预先存在的方法作为 map 函数来执行 map
, 而这个方法有时候返回 null
该怎么办?
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
可以使用 handle
来删除任何空值
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
线程和调度器
Reactor 是 可并发的(concurrency-agnostic)
获得 Flux
和 Mono
不一定意味着它要在特定的线程运行. 相反, 大多数操作符继续在上一个操作符执行的 Thread
中工作. 除非指定, 否则最上面的操作符 (源) 本身运行在调用了 subscribe()
的 Thread
上
下面的示例在一个新的线程上运行 Mono
:
public static void main(String[] args) throws InterruptedException {
final Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
)
t.start();
t.join();
}
以上的 Mono
在 main 线程中组装, 但是它在 Thread-0
中完成订阅, 因此操作符的回调实际上是在 Thread-0
中运行
在 Reactor 中, 执行模型和执行的位置由使用的 Scheduler
决定. Scheduler 具有类似于 ExecutorService
的调度职责, 但有一个专门的抽象使其可以做更多的事情, 尤其是作为一个时钟和更广的范围实现 (虚拟时间测试, 波动或立即调度等)
Schedulers
类又可以访问执行上下文的静态方法:
Schedulers.immediate()
无执行上下文: 在处理时, 提交的Runnable
将直接执行, 有效地在当前的Thread
上运行它们 (可以视为 "空对象" 或无操作的Scheduler
)Schedulers.single()
一个单一可重用的线程: 此方法对所有调用者会重用相同的线程, 直到调度器销毁掉. 如果希望每次调用都有一个特定的线程, 使用Schedulers.newSingle()
Schedulers.elastic()
一个无边界的弹性线程池: 此方法已不再是首选, 使用Schedulers.boundedElastic()
代替Schdulers.boundedElastic()
一个有边界的弹性线程池: 这是一个提交阻塞处理自身线程方法, 这样他就不会占用其他资源. 在阻塞 I/O 中这是一个更好的选择. 从 3.6.0 开始, 这个方法可以根据设置提供两种不同的实现:- 基于
ExecutorService
. 这个实现类似前身elastic()
, 会根据需要创建新的和重用的线程池. 线程池闲置太久会被销毁 (默认 60s). 与elastic()
不同, 它可以创建的后台线程数量有限 (默认为 CPU 核心数 *10). 在到达上限后提交的 最多100 000 个任务将被排队, 并在线程变得可用时重新被调度 (如果调度有延迟, 这个延迟会开始在线程变得可用时) - 基于
Thread-per-task
(一个任务一个线程), 设计来在虚拟线程上运行. 要使用这个实现应用程序需要运行在 Java 21+ 环境下并且将reactor.schedulers.defaultBoundedElasticOnVirtualThreads
系统属性设置为true
. 一旦设置,Schedulers.boundedElastic()
返回一个BoundedElasticScheduler
的特定实现, 专门用于在虚拟线程上运行每个任务
- 基于
Schedulers.parallel()
固定线程池: 创建与 CPU 核心数相同的线程
此外还可以用现有的 ExecutorService
通过 Schedulers.fromExecutorService(ExecutorService)
来创建一个 Scheduler
(可以但不建议使用 Executor
创建)
也可以通过使用 newXXX
方法创建各种调度器类型的新实例. 例如 Schedulers.newParaleel(name)
注意: 虽然
boundedElastic
是为了在无法避免的情况下帮助处理遗留的阻塞代码, 但single
和parellel
不是. 因此, 在 Reactor 阻塞 API (在默认的单调度器和并行调度器内block()
,blockFirst
,𰻞ockLast()
以及迭代toIterable()
或toStream()
) 会导致IllegalStateException
通过创建
NonBlocking
标记接口的Thread
实例, 自定义的Scheduler
也可以被标记为NonBlocking
一旦操作符使用 Schedulers
中特定的调度器. 例如 Flux.interval(Duration.ofMillis(300))
工厂方法会每隔 300 ms 产生一个 Flux<Long>
. 默认情况下, 这是由 Schedulers.parallel()
启用的
通过下面的代码指定 Schduler
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Reactor 提供了两种方式来切换响应式链中的执行上下文 (或者调度器) : publishOn
and subscribeOn
. 两种方式都会获取一个 Scheduler
并且将执行上下文切换到这个调度器, 但是 publishOn
在链中的放置位置很重要而 subscribeOn
则可随意
当使用链接操作符时, 可以根据需要在内部封装尽可能多的 Flux
和 Mono
实现. 一旦订阅, 一个 Subscriber
对象的链就会被创建出来, 向后 (沿着链向上) 到第一个生产者
publishOn 方法
publishOn
应用在订阅链的中间位置. 他接收来自上游的信号, 并在下游重放, 同时在相关的 Scheduler
中的某个 worker 执行回调. 因此他影响到后续操作符的执行 (直到链中的另一个 publishOn
)
- 将执行上下文改为由
Scheduler
选择的一个Thread
- 根据规范,
onNext
依次调用, 因此这会占用单个线程 - 除非他们工作在特殊的
Scheduler
上,publishOn
之后的操作符继续在相同的线程上执行
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.publishOn(s)
.map(i -> "value " + i);
new Thread(() -> flux.subscribe(System.out::println));
subscribeOn 方法
当向下的链被构造时, subscribeOn
适用与在订阅上处理. 通常建议放在源数据之后, 因为中间的操作可能会影响执行上下文
这个方法不会影响在随后调用的 pushlishOn
- 改变整个链的所订阅的
Thread
- 从
Scheduler
中选择一个线程
只有最接近下游的
subscribeOn
调用才会有效的将订阅和请求信号调度到可以拦截他们的源或操作符 (doFirst
,doOnRequest
). 使用多个subscibeOn
是不必要的
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);
new Thread(() -> flux.subscribe(System.out::println));
错误处理
在响应式流中, 错误是终结事件. 一旦错误发生, 他就会定制序列并沿着运算符链传播到最后一步, 即定义的订阅者及其 onError
方法
此类错误需要在应用程序层面上被处理. 对于实例, 你可以在 UI 上显示一个错误通知或者在 REST endpoint 发送一个有意义的错误载荷. 因此, 订阅者的 onError
方法总是需要被定义
如果没有定义,
onError
会抛出一个UnsupportedOperationException
. 你可以在Exceptions.isErrorCallbackNotImplemented
方法进一步检测和分类它
Reactor 还提供了处理链中间处理错误的替代方法, 即错误操作符:
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
错误处理操作符
静态备用值
onErrorReturn
等效于 "捕获并返回一个默认静态值"
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
你还可以选择在异常上应用一个 Predicate
来决定是否恢复
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
备用方法
如果想有多个默认值, 并且有其他的 (更安全的) 方式处理数据, 可以使用 `onErrorResume
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
动态备用值
如果没有其他的数据处理方式并且要从异常中计算出一个备用值
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
捕获并重新抛出异常
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
可以有一个更直接的方法:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
延迟处理 (Log or React on the Side)
如果想让错误继续传播, 但仍想在不修改序列的情况下对错误做出响应 (例如记录日志), 可以使用 doOnError
操作符
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
doOnError
操作符以及所有以 doOn
为前缀的操作符, 有时被称为 side-effect
. 它们可以让你在不修改序列时间的情况下窥视到序列内部的事件
下面的例子仍会传递错误, 但至少可以确保日志记录
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
使用 Resources 和 Final 块
使用 using
和 doFinally
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1); // 在发射一个元素后取消
重试
使用 retry
它通过重新订阅上游的 Flux
来工作. 实际上是一个不同的序列, 原始序列仍然是终止的
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed() // 将每个值与前一个值发出后的持续时间关联起来
.subscribe(System.out::println, System.err::println);
Thread.sleep(2100);
使用 retryWhen
使用一个 "伴生" Flux
去判断特定 error 是否应该重试. 这个伴生 Flux
被操作符创造但是被用户装饰, 目的是自定义重试条件
他接收一个 Retry
策略/函数. Retry
类是一个抽象类, 其中提供了各种工厂方法, 例如 Retry.from(Function)
重试周期如下:
- 每次发生错误时 (提供重试的可能性),
RetrySinal
都会被发送到伴生的Flux
, 而这个Flux
已经被函数装饰过了. 这里的Flux
可以看到至今所有的尝试.RetrySinal
可以访问错误及其周围的元数据 - 如果伴生
Flux
发射一个值, 就会发生一次重试 - 如果伴生
Flux
完成, 则错误被吞掉, 重试循环停止, 结果序列也完成 - 如果伴生的
Flux
产生一个 error (e), 重试周期停止并产生带有 error (e) 的序列
前两中情况区分很重要. 只需要完成伴生对象就可以有效的消除错误. 以下使用 retryWhen
模拟 retry(3)
的方法
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException())
.doOnError(System.out::println)
.retryWhen(Retry.from(companion ->
companion.take(3))); // 只重试 3 个 error
实际上, 前面的示例会生成一个空 Flux
, 但它成功完成. 由于同一个 Flux
上的 retry(3)
会以最近的错误而终止, 因此 retryWhen
示例于 retry(3)
并不完全相同
以下例子使行为完全相同
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(Retry.from(companion ->
companion.map(rs -> {
if (rs.totalRetries() < 3) return rs.totalRetries();
else throw Exceptions.propagate(rs.failure());
})
));
以上也可以使用 errorFlux.retryWhen(Retry.max(3))
代替
core 提供的 Retry
辅助工具, RetrySepc
和 RetryBackoffSpec
, 都允许进行高级定制, 如:
- 为可以出发重试的异常设置
filter(Predicate)
- 通过
modifyErrorFilter(Function)
修改这样一个先前设置的过滤器 - 触发重试触发器 (即延时前后的回退) 等 side-effect, 前提是重试有效 (
doBeforeRetry
和doAfterRetry
是附加的) - 在重试触发器周围触发一个异步的
Mono<Void>
, 这允许在基本延迟上添加异步行为, 从而进一步延迟触发器 (doBeforeRetryAsync
和doAfterRetryAsync
是附加的) - 在达到最大尝试次数的情况下, 通过
onRetryExhaustedThrow(BiFunction)
自定义异常. 默认情况下, 使用Exceptions.retryExhausted(...)
, 可以通过Exceptions.isRetryExhausted(Throwable)
来区分 - 激活暂时性错误处理
暂时性错误重试
一些长期存在的源可能会出现偶尔的错误爆发, 然后在一段较长的时间里一切正常. 这就是暂时性错误 (transient errors)
RetrySingal
接口代表 retryWhen
的状态, 他有有一个 totalRetriesInARow()
值可以用于此处. 不同于通常单调递增的 totalRetries()
索引, 每次重试恢复错误时 (即, 当重试尝试导致传入 onNext
而不是再次出现 onError
时), 这个副索引都会重置为 0
当在 RetrySpec
或 RetryBackoffSpec
中设置 transientErrors(boolean)
配置为 true
时, 结果策略会利用 totalRetriesInARow()
, 有效的处理暂时性错误. 这些规范根据索引计算重试模式, 因此实际上规范的所有其他配置参数都独立地应用于每个错误突发时
final AtomicInteger errorCount = new AtomicInteger();
final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
Flux.generate(sink -> {
int i = transientHelper.getAndIncrement();
if (i == 10) {
sink.next(i);
sink.complete();
} else if (i % 3 == 0) {
sink.next(i);
} else {
sink.error(new IllegalStateException("Transient error at " + i));
}
});
Flux<Integer> transientFlux = httpRequest.get()
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
.blockLast();
assertThat(errorCount).hasValue(6);
如果没有 transientErrors(true)
, 在第二次突发错误时, 将达到最大重试次数 2, 并且在发出 onNext(3)
后, 序列将失败
处理操作符或函数中的异常
如果调用了一些声明了 throws
异常的方法 (例如大部分 IO API 和线程 API), 依然需要在 try-catch
中处理这些异常. 有几种选择:
- 捕获异常并从中恢复. 序列正常进行
- 捕获异常, 将其等装成一个 unchecked 异常, 并抛出 (中断序列).
Exceptions
工具类可以解决这个问题 - 如果需要返回一个
Flux
(例如在flatMap
中), 那么就用一个产生错误的Flux
来封装异常, 例如return Flux.error(checkedException)
. (这个序列也会终止)
Exceptions
工具类可以用来确保只有当异常为 checked
异常时才会被包装:
- 如果有必要, 使用
Exceptions.propagate
方法来包装异常. 它会先调用throwIfFatal
并且不会包装RuntimeException
- 使用
Exceptions.unwrap
方法可以获取原始的未包装的异常 (回到响应式特定异常的层次结构的根源)
比如以下方法, 可能会抛出 IOException
异常:
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
假设在 map
中调用此方法, 必须显示的捕获该异常, 且 map
函数不能重新抛出. 可以将其作为 RuntimeException
异常传播到 map
的 onError
方法中, 如下:
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
此后, 当订阅前面的 Flux
并对错误做出响应时, 如果你想对 IO 异常做一些特殊处理, 可以将其还原到原始状态:
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
Sinks
在 Reactor 中, sink 是一个类, 它允许以独立的方式安全地手动触发信号, 创建一个类似 Publisher
的结构, 能够处理多个 Subscriber
(unicast()
(单播) 风格除外)
多线程下使用 Sinks.One 和 Sinks.Many 安全地生产
reactor-core 公开的默认 Sinks
课确保检测到多线程使用, 并且从下游订阅者的角度来看, 不会导致规范违规或未定义的行为. 当使用 tryEmit*
API 时, 并行调用很快就会失败. 当使用 emit*
API 时, 提供的 EmissionFailureHandler
可以允许重试争用 (例如忙等待), 否则 sink 会因错误终止
Sinks
构造器为主要支持的生产者类型提供了向导 API
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
多个生产者线程可以同时在 sink 上生成数据:
//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);
//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));
//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);
当使用
busyLooping
时, 要注意返回的EmitFailureHandler
实例不能被重用, 例如, 每个emitNext
应该调用一次busyLooping
. 此外, 建议使用 100 ms 以上的超时, 因为较小的值没有实际意义
Sink.Many
可以作为 Flux
呈现给下游消费者:
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
.takeWhile(i -> i < 10)
.log()
.blockLast();
类似地, Sinks.Empty
和 Sinks.One
可以通过 asMono
方法视为 Mono
Sinks
的种类:
many().multicast()
: 仅将新推送的数据传输给其订阅者, 尊重他们的背压 (新推送的数据: "订阅者订阅后")many().unicast()
: 跟上面一样, 但在第一个订阅者注册之前推送的数据会被缓存many().replay()
: 它将向新的订阅者们重播指定历史大小的推送数据, 然后继续实时推送新数据one()
: 向订阅者们播放一个元素的 sinkempty()
: 向订阅者们播放一个终结信号的 sink, 但是仍然可以被视为Mono<T>
可用的 Sinks 概述
Sinks.many().unicast().onBackpressureBuffer(args?)
单播 Sinks.Many
, 可以通过使用内部的缓冲区来处理背压. 只能有一个订阅者
一个基础的单播 sink 可以通过 Sinks.many().unicast().onBackpressureBuffer()
创建. 但是 Sinks.many().unicast()
中还有一些额外的 unicast
静态工厂方法, 允许进行更精细的调整
例如, 默认情况下, 它是无界的: 如果通过它推送任意数量的数据, 而其订阅者尚未请求数据, 他会缓存所有数据. 你可以通过在 Sinks.many().unicat().onBackPressureBuffer(Queue)
工厂方法中为内部缓存提供自定义队列实现来更改此设置. 如果该队列有界, 则当缓存已满且没有收到来自下游的足够请求时, sink 会拒绝推送值
Sinks.many().multicast().onBackpressureBuffer(args?)
多播 Sinks.Many
, 可以向多个订阅者发出消息, 同时为多个订阅者提供背压. 订阅者们在订阅后仅接受通过 sink 套送的信号
默认情况下, 如果所有订阅者被取消, 它会清楚自身内部的缓存并且停止接受新的订阅者. 你可以使用 Sinks.many().multicast()
下的多播静态工厂方法中的 autoCancel
参数来调整
Sinks.many().multicast().directAllOrNothing()
多播 Sinks.many
, 具有简单的背压处理: 如果任何订阅者太慢 (需求为 0), 则所有订阅者的 onNext
都会被丢弃
然而, 慢速订阅者不会被终止, 一旦慢速订阅者再次开始请求, 所有订阅者都将恢复接收从那里推送的元素
一旦 Sinks.Many
终止 (通常是通过调用它的 emitError(Throwable
或 emitComplete()
方法), 他就会让更多订阅者订阅, 但会立即向他们重放终止信号
Sinks.many().multicast().directBestEffort()
多播 Sinks.Many
, 具有尽力的背压处理: 如果一个订阅者太慢 (需求为 0), 则仅针对这个慢速订阅者丢弃 onNext
然而, 慢速订阅者不会被终止并且一旦他们重新开始请求, 他们将恢复接收新推送的元素
一旦 Sinks.Many
终止 (通常是通过调用它的 emitError(Throwable
或 emitComplete()
方法), 他就会让更多订阅者订阅, 但会立即向他们重放终止信号
Sinks.many().replay()
重放 Sinks.Many
, 缓存发出的元素并重放它们给新的订阅者
有多种方式创建:
- 有限历史大小的缓存
Sinks.many().replay().limit(int)
, 或者无限Sinks.many().replay().all()
- 基于时间的重播窗口
Sinks.many().replay().limit(Duration)
- 组合历史大小和时间窗口的缓存
Sinks.many().replay().limit(int, Duration)
还有更多包括缓存单个元素的变体 latest()
和 latestOrDefault(T)
Sinks.unsafe().many()
高级用户考虑使用, 提供相同的 Sinks.Manay
工厂, 而没有生产者线程安全性. 每个 sink 的开销会更少, 因为线程安全的 sink 必须检测多线程访问
库开发人员不应该用不安全的 sink, 但可以在受控的调用环境中内部使用它们, 在该环境中, 他们可以根据反应流规范确保导致 onNext
, onComplete
和 onError
信号的调用的外部同步
Sinks.one()
这个方法直接构造一个简单的 Sinks.One<T>
实例
可以作为 Mono
来查看, 并且具有稍微不同的发射方式, 以更好地传达这种类似 Mono
的语义:
emitValue(T value)
生成一个onNext(value)
信号并且在大多数实现中还出发一个隐式onComplete()
emitEmpty()
生成一个 独立的onComplete()
信号, 旨在生成一个空Mono
的等价物emitError(Throwable t)
生成一个onError(t)
信号
Sinks.one()
接受对这些方法中任何一个的一次调用, 有效地生成一个 Mono
, 该 Mono
要么完成一个值, 要么完成为空, 要么失败
Sinks.empty()
这个方法直接构造一个简单的 Sinks.Empty<T>
实例. 类似 Sinks.One<T>
, 只不过它不提供 emitValue
方法
结果只能生成一个完成为空或者失败的 Mono