RXJAVA-concat

网友投稿 220 2022-11-24

RXJAVA-concat

concat操作符可以连接俩个Observable,只有第一个Observable调用了onComplete后,才会触发第二个Observable。 比如在读取数据时,先查询缓存,缓存存在直接处理,不存在查询数据库,然后在处理。 package com.netty.demo.vertx; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscriber; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @Slf4j public class RxJavaTest { public static void main(String[] args) throws InterruptedException { Observable o1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { //查询缓存数据 Integer cacheData = Integer.MAX_VALUE; if (cacheData != null) { //缓存不为空则直接传递给观察者 emitter.onNext(cacheData); } else { //缓存为空则调用onComplete,触发第二个(o2)的执行逻辑 emitter.onComplete(); } } }); Observable o2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { //查询数据库 Integer cacheData = Integer.MAX_VALUE; //传递给观察者 emitter.onNext(cacheData); } }); Observable.concat(o1, o2).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { log.info("onSubscribe"); } @Override public void onNext(Integer o) { log.info(o.toString()); } @Override public void onError(Throwable e) { log.info("onError"); } @Override public void onComplete() { log.info("onComplete"); } }); Thread.sleep(300000000); } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:深度解析串口、COM口、TTL、RS-232、RS-485的区别及应用
下一篇:RXJAVA-filter
相关文章

 发表评论

暂时没有评论,来抢沙发吧~