使用RxJava 2将同步和异步API转换为Observable

2023/05/10

1. 概述

在本教程中,我们将学习如何使用RxJava 2运算符将传统的同步和异步API转换为Observables

我们将创建一些简单的函数来帮助我们详细讨论这些运算符。

2. Maven依赖

首先,我们必须添加RxJava2RxJava 2 Extensions作为Maven依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>com.github.akarnokd</groupId>
    <artifactId>rxjava2-extensions</artifactId>
    <version>0.20.4</version>
</dependency>

3. 运算符

RxJava 2为响应式编程的各种用例定义了大量运算符

但我们只会讨论一些常用于根据其性质将同步或异步方法转换为Observable的运算符。这些运算符将函数作为参数并发出从该函数返回的值

除了普通的运算符,RxJava 2还定义了一些用于扩展功能的运算符。

让我们探讨如何利用这些运算符来转换同步和异步方法。

4. 同步方法转换

4.1 使用fromCallable()

该运算符返回一个Observable,当订阅者订阅它时,它会调用作为参数传递的函数,然后发出从该函数返回的值。让我们创建一个返回整数并转换它的函数:

AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();

现在,让我们将它转换成一个Observable并通过订阅它来测试它:

Observable<Integer> source = Observable.fromCallable(callable);

for (int i = 1; i < 5; i++) {
    source.test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(i);
    assertEquals(i, counter.get());
}

每次订阅包装的Observable时,fromCallable()运算符都会延迟执行指定的函数。为了测试这种行为,我们使用循环创建了多个订阅者。

由于响应式流在默认情况下是异步的,因此订阅者将立即返回。在大多数实际场景中,Callable函数都会有某种延迟才能完成其执行。因此,我们在测试Callable函数的结果之前添加了最长5秒的等待时间

另请注意,我们使用了Observable的test()方法。此方法在测试Observables时很方便。它创建一个TestObserver并订阅了我们的Observable。

4.2 使用start()

start()运算符是RxJava 2 Extension模块的一部分。它将异步调用指定的函数并返回一个发出结果的Observable:

Observable<Integer> source = AsyncObservable.start(callable);

for (int i = 1; i < 5; i++) {
    source.test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
    assertEquals(1, counter.get());
}

该函数会立即调用,而不是在订阅者订阅生成的Observable时调用。对此Observable对象的多个订阅会观察到相同的返回值

5. 异步方法转换

5.1 使用fromFuture()

众所周知,在Java中创建异步方法的最常见方法是使用Future实现,fromFuture方法将Future作为其参数并发出从Future.get()方法获得的值。

首先,让我们将之前创建的函数设为异步函数:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);

接下来我们通过转换它来进行测试:

Observable<Integer> source = Observable.fromFuture(future);

for (int i = 1; i < 5; i++) {
    source.test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
    assertEquals(1, counter.get());
}
executor.shutdown();

再次请注意,每个订阅都观察到相同的返回值

现在,Observable的dispose()方法在防止内存泄漏方面非常有用。但在这种情况下,由于Future.get()的阻塞性质,它不会取消Future。

因此,我们可以通过结合源Observable对象的doOnDispose()函数和future的cancel方法来确保取消future:

source.doOnDispose(() -> future.cancel(true));

5.2 使用startFuture()

顾名思义,此运算符将立即启动指定的Future并在订阅者订阅时发出返回值。与缓存结果以供下次使用的fromFuture运算符不同,此运算符将在每次被订阅时执行异步方法

ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));

for (int i = 1; i < 5; i++) {
    source.test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(i);
    assertEquals(i, counter.get());
}
executor.shutdown();

5.3 使用deferFuture()

此运算符聚合从Future方法返回的多个Observable,并返回从每个Observable获得的返回值流。每当新订阅者订阅时,这将启动传递的异步工厂函数。

因此,让我们首先创建异步工厂函数:

List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), 
    counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);

然后我们可以做一个快速测试:

Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
    source.test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1,2,3);
}
exec.shutdown();

6. 总结

在本教程中,我们学习了如何将同步和异步方法转换为RxJava 2 Observable对象。

当然,我们在这里展示的示例是基本实现。但是我们可以将RxJava 2用于更复杂的应用程序,例如视频流和需要分段发送大量数据的应用程序。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章