在RxJava中组合Observable

2023/05/10

1. 简介

在这个快速教程中,我们将讨论在RxJava中组合Observables的不同方法。

如果你是RxJava的新手,一定要先看看这个介绍教程

现在,让我们直接进入。

2. Observables

Observable序列,或简称Observables,是异步数据流的表示形式。

这些基于观察者模式,其中称为Observer的对象订阅Observable发出的元素。

订阅是非阻塞的,因为Observer可以对Observable将来发出的任何内容做出反应。这反过来又促进了并发性。

这是RxJava中的一个简单演示:

Observable
    .from(new String[] { "John", "Doe" })
    .subscribe(name -> System.out.println("Hello " + name))

3. 组合Observables

当使用响应式框架进行编程时,组合各种Observables是一个常见的用例。

比如在一个Web应用中,我们可能需要获取两组相互独立的异步数据流。

我们可以同时调用两者并订阅合并的流,而不是等待前一个流完成后再请求下一个流

在本节中,我们将讨论在RxJava中组合多个Observable的一些不同方式,以及每种方法适用的不同用例。

3.1 merge

我们可以使用merge运算符来组合多个Observable的输出,使它们像一个一样:

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();

    Observable.merge(
        Observable.from(new String[] {"Hello", "World"}),
        Observable.from(new String[] {"I love", "RxJava"})
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

3.2 mergeDelayError

mergeDelayError方法与merge相同,它将多个Observables合并为一个,但如果在合并过程中发生错误,它允许无错误的元素在传播错误之前继续

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable.mergeDelayError(
        Observable.from(new String[] { "hello", "world" }),
        Observable.error(new RuntimeException("Some exception")),
        Observable.from(new String[] { "rxjava" })
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("hello", "world", "rxjava");
    testSubscriber.assertError(RuntimeException.class);
}

上面的示例发出所有无错误值

hello
world
rxjava

请注意,如果我们使用merge而不是mergeDelayError,则不会发出字符串“rxjava”,因为merge会在发生错误时立即停止来自Observables的数据流

3.3 zip

zip扩展方法将两个值序列作为对汇集在一起

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
    List<String> zippedStrings = new ArrayList<>();

    Observable.zip(
        Observable.from(new String[] { "Simple", "Moderate", "Complex" }), 
        Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
        (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
        
    assertThat(zippedStrings).isNotEmpty();
    assertThat(zippedStrings.size()).isEqualTo(3);
    assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

3.4 带间隔的压缩

在这个例子中,我们将压缩一个带有间隔的流,这实际上会延迟第一个流元素的发射:

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
        
    Observable
        .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
        .toBlocking().subscribe(testSubscriber);
        
    testSubscriber.assertCompleted();
    testSubscriber.assertValueCount(5);
    testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

4. 总结

在本文中,我们看到了一些将Observables与RxJava结合的方法。你可以在官方RxJava文档中了解其他方法,例如combineLatestjoingroupJoinswitchOnNext

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章