本文最后更新于 2 年前,文中所描述的信息可能已发生改变。
java响应式编程库SmallRye Mutiny学习
Uni.createFrom().item("hello")
.onItem().transform(item -> item + " world mutiny")
.onItem().transform(String::toUpperCase)
.subscribe().with(item -> System.out.println("item = " + item));
- 与上面等价,但是不推荐,推荐链式调用
Uni<String> uni1 = Uni.createFrom().item("hello");
Uni<String> uni2 = uni1.onItem().transform(item -> item + " mutiny");
Uni<String> uni3 = uni2.onItem().transform(String::toUpperCase);
uni3.subscribe().with(item -> System.out.println(">> " + item));
订阅的不是变换后的结果
Uni<String> uni = Uni.createFrom().item("hello");
uni.onItem().transform(item -> item + " world mutiny");
uni.subscribe().with(item -> System.out.println("item = " + item));
创建item
1.Creating Unis from items
1、根据已知的值创建Uni
Uni<Integer> uni4 = Uni.createFrom().item(1);
2、根据Supplier
创建Uni
AtomicInteger atomicInteger = new AtomicInteger();
Uni<Integer> uni6 = Uni.createFrom().item(atomicInteger::incrementAndGet);
uni4.subscribe().with(item -> System.out.println("item = " + item), failure -> System.out.println("failure = " + failure));
Creating failing Unis
3、根据已知的异常创建Uni
Uni<Integer> failure = Uni.createFrom().failure(new Exception("boom"));
Uni<Integer> failure1 = Uni.createFrom().failure(() -> new Exception("boom"));
Creating Uni<Void>
When the represented operation to not produce a result, you still need a way to indicate the operation’s completion. For this, you need to emit a null item:
Uni<Void> voidUni = Uni.createFrom().voidItem();
Uni<Object> nullUni = Uni.createFrom().nullItem();
creating Unis using an emitter
可以使用emitter创建。此方法在集成基于回调的API时非常有用: emitter也可以发送故障。它还可以收到取消通知,例如停止正在进行的工作。
Uni<Object> emitterUni = Uni.createFrom().emitter(emitter -> emitter.complete("hello"));
Creating Unis from CompletionStage
Uni可以从CompletionStage创建 You can also Uni objects from CompletionStage / CompletableFuture. This is useful when integrating with APIs that are based on these types:
Uni<String> uni7 = Uni.createFrom().completionStage(stage);
// 可以使用下面的创建CompletionStage
CompletableFuture<String> completionStage = uni.subscribe().asCompletionStage();
Uni.createFrom().completionStage(completionStage);
创建items
Creating Multi pipelines
Multi表示数据流。流可以发出0、1、n或无限多个项。
- emits 0…n items
- emits a failure event
- emits a completion event 为有界流发出完成事件
Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().transform(i -> i * 2)
.select().first(3)
.onFailure().recoverWithItem(0)
.subscribe().with(System.out::println);
// 订阅时,可以传递一个项回调(在发出项时调用),
// 或传递两个回调,一个接收项,一个接收失败,
// 或传递三个回调,分别处理项、失败和完成事件。
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4)
.onItem().transform(i -> i + 2)
.select().first(2);
// Note the returned Cancellable: this object allows canceling the stream if need be.
// 通过Cancellable对象可以取消流
Cancellable cancellable = multi
.subscribe().with(
System.out::println,
failure -> System.out.println("Failed with " + failure),
() -> System.out.println("Completed"));
Multi<Integer> multiFromItems = Multi.createFrom().items(1, 2, 3, 4);
Multi<Integer> multiFromIterable = Multi.createFrom().iterable(Arrays.asList(1, 2, 3, 4, 5));
// You can also use Suppliers
AtomicInteger counter = new AtomicInteger();
Multi<Integer> integerMulti = Multi.createFrom()
.items(() ->
IntStream.range(counter.getAndIncrement(), counter.get() * 2).boxed());
// You can create ranges using
Multi<Integer> multiFromRange = Multi.createFrom().range(1, 10);
Creating failing Multis
Multi<Integer> multiFromFailure = Multi.createFrom().failure(new Exception("boom"));
Multi<Integer> multiFromFailure1 = Multi.createFrom().failure(() -> new Exception("boom"));
Unlike Uni
, Multi
streams don’t send null items (this is forbidden in reactive streams). 相反,流发送完成事件,指示没有更多的项要消费。当然,即使没有项,也会发生完成事件,从而创建一个空流。 您可以使用以下命令创建这样的流:
Multi<Object> multiFromEmpty = Multi.createFrom().empty();
Creating Multis using an emitter(advanced)
You can create a Multi
using an emitter. This approach is useful when integrating callback-based APIs: 发射器也可以发送故障。它还可以收到取消通知,例如停止正在进行的工作。
Multi.createFrom().emitter(emitter -> {
emitter.emit("hello");
emitter.emit("world");
emitter.complete();
});
Creating Multis
from ticks(advanced)
You can create a stream that emit a ticks periodically: 您可以创建定期发出的流:
Multi.createFrom().ticks().every(Duration.ofSeconds(1));
Creating Multis
from a generator (advanced)
You can create a stream from some initial state, and a generator function: 您可以从一些初始状态和生成器函数创建流:
Multi.createFrom().generator(() -> 1, (n, emitter) -> {
int next = n + (n / 2) + 1;
if (n < 50)
emitter.emit(next);
else
emitter.complete();
return next;
});
/**
* The initial state is given through a supplier (here () -> 1). The generator function accepts 2 arguments:
* 初始状态通过供应商(此处)给出。generator函数接受两个参数:
* 1.the current state, 当前状态,
* 2.an emitter that can emit a new item, emit a failure, or emit a completion.
* 可以发出新项、发出失败或发出完成的发射器。
* The generator function return value is the next current state. Running the previous example gives the following number suite: {2, 4, 7, 11, 17, 26, 40, 61}.
* 生成器函数返回值是下一个当前状态。运行上一个示例会得到以下数字组: {2, 4, 7, 11, 17, 26, 40, 61} .
*/
The invoke method
The invoke method is synchronous and the passed callback does not return anything. Mutiny
invokes the configured callback when the observed stream dispatches the event: invoke方法是同步的,传递的回调不返回任何内容。当观察到的流分派事件时,Mutiny
会调用配置的回调:
multi.onItem()
.invoke(n -> System.out.println("响应式流:" + n))
.subscribe().with(item -> System.out.println("item = " + item), failure -> System.out.println("failure = " + failure), () -> System.out.println("完成"));
如上所述,是同步的。Mutiny
调用回调,并在回调返回时将事件向下游传播。它阻止了调度。 Of course, we highly recommend you not to block.
The following snippets show how you can log the different types of events. 以下代码片段显示了如何记录不同类型的事件。
Multi<Integer> invoke = multi
.onSubscription()
.invoke(() -> System.out.println("⬇️ Subscribed"))
.onItem()
.invoke(i -> System.out.println("⬇️ Received item: " + i))
.onFailure()
.invoke(f -> System.out.println("⬇️ Failed with " + f))
.onCompletion()
.invoke(() -> System.out.println("⬇️ Completed"))
.onCancellation()
.invoke(() -> System.out.println("⬆️ Cancelled"))
.onRequest()
.invoke(l -> System.out.println("⬆️ Requested: " + l));
When observing the failure event, if the callback throws an exception, Mutiny
propagates a CompositeException aggregating the original failure and the callback failure. 当观察到失败事件时,如果回调抛出异常,则Mutiny
会传播一个CompositeException,聚合原始失败和回调失败。
invoke.subscribe().with(
item -> System.out.println("item = " + item),
failure -> System.out.println("failure = " + failure),
() -> System.out.println("完成")
);
The call method
Unlike invoke
, call is asynchronous, and the callback returns a Uni<?> object
. 与invoke
不同,call
是异步的,回调返回一个Uni<?>
对象。 call is often used when you need to implement asynchronous side-effects, such as closing resources. 当您需要实现异步副作用(例如关闭资源)时,通常会使用call
。 Mutiny does not dispatch the original event downstream until the Uni returned by the callback emits an item: 在回调返回的Uni
发出项之前,Mutiny
不会将原始事件分派到下游: call() 的正常返回值不被使用(只有异常会传播到外部流中),只是用来表示异步操作完成,因此适用于异步的副作用操作,比如关闭资源。
multi.onItem().call(i -> Uni.createFrom().voidItem()
.onItem().delayIt().by(Duration.ofMillis(1000))
);
As shown in the previous snippet, you can use this approach to delay items. But, the primary use case is about completing asynchronous actions such as calling an asynchronous close method on a resource: 如前面的代码片段所示,您可以使用此方法延迟项。但是,主要用例是完成异步 操作,例如在资源上调用异步关闭方法:
multi.onCompletion().call(() -> resource.close());
Under the hood, Mutiny
gets the Uni
(by invoking the callback) and subscribes to it. It observes the item or failure event from that Uni
. It discards the item value as only the emission matters in this case. 在幕后,Mutiny
获取Uni
(通过调用回调)并对其进行订阅。它观察来自该Uni
的项或失败事件。它丢弃项值,因为在这种情况下只有发射才重要。 If the callback throws an exception or the produced Uni
produces a failure, Mutiny
propagates that failure (or a CompositeException) downstream, replacing the original event. 如果回调抛出异常或生成的Uni
产生失败,则Mutiny会将该失败(或CompositeException
)传播到下游,替换原始事件。
Summary
The invoke
and call methods are handy when you need to observe a Uni or a Multi without changing the transiting events. 当您需要观察Uni或Multi而不更改传输事件时,invoke
和call方法非常方便。 The invoke
method is synchronous, and the callback does not return anything. invoke
方法是同步的,回调不返回任何内容。 Use invoke
for implementing synchronous side-effects or logging events. 用于实现同步副作用或记录事件。 The asynchronous nature of call makes it perfect for implementing asynchronous side-effects, such as closing resources, flushing data, delay items, and so on. call
的异步特性使其非常适合实现异步副作用,例如关闭资源、刷新数据、延迟项等。
What if the transformation failed If the transformation throws an exception, that exception is caught and passed to the downstream subscriber as a failure event. It also means that the subscriber won’t get further item after that failure.。 如果转换引发异常,则该异常将被捕获并作为失败事件传递给下游订阅者。这也意味着订阅者在此失败之后不会获得更多的项。
Uni.createFrom().item("hello")
.onItem().transform(String::toUpperCase)
.onItem().transform(i -> i + "!");
Multi
- Transforming an item into a Uni
Imagine that you have a Uni<String>
, and you want to call a remote service. 假设您有一个,并且您想调用一个远程服务。 Calling a remote service is an asynchronous action represented by a Uni
, as in: 调用远程服务是一个异步操作,由表示,如下所示:
public static Uni<String> invokeRemoteGreetingService(String name) {
return Uni.createFrom().item("hello " + name);
}
Uni<String> invokeRemoteGreetingService(String name);
To call this service, you need to transform the item received from the first Uni into the Uni returned by the service: 要调用此服务,您需要将从第一个接收到的项转换为服务返回的项:
Uni.createFrom().item("hello")
.onItem().transformToUni(name -> invokeRemoteGreetingService(name))
.subscribe().with(
System.out::println,
failure -> System.out.println("failure = " + failure));
Uni
- Transforming an item into a Multi
将Uni转换为Multi
Uni<String> uni = Uni.createFrom().item("hello");
uni.onItem().transformToMulti(name -> Multi.createFrom().items(name.split("")));
Transforming items from Multi
- the merge vs concatenate dilemma
转换多个项-合并与连接的两难境地 合并或者连接,取决于你的需求,其中合并是无序的,连接是串行(有序)的。
- Merging – it does not preserve the order and emits the items from the produced streams as they come,
- 合并-它不保留顺序,并在生成的流中发出items
- Concatenating – it maintains and concatenates the streams produced for each item
- 连接-它维护并连接为每个项生成的流
Multi
- Transforming an item into a Uni
Multi
转换为Uni
To implement the scenario from the last section, you will use onItem().transformToUniAndMerge ()
or onItem().transformToUniAndConcatenate()
depending on your ordering choice: 根据排序选择:
Multi<String> multi1 = Multi.createFrom().items("1", "2", "3", "4", "5");
multi1.onItem().transformToUniAndMerge(name->invokeRemoteGreetingService(name));
multi1.onItem().transformToUniAndConcatenate(name->invokeRemoteGreetingService(name));
Multi - Transforming an item into a Multi
Multi转换为Multi
multi1.onItem().transformToMultiAndMerge(name->Multi.createFrom().items(name.split("")));
multi1.onItem().transformToMultiAndConcatenate(name->Multi.createFrom().items(name.split("")));