Android Rxjava3 使用场景详解

网友投稿 321 2022-10-13

Android Rxjava3 使用场景详解

目录一、Rxjava使用场景1、多任务嵌套回调2、多任务合并处理3、轮询4、其他小场景1)倒计时2)打字机效果二、结合Rxbinding的使用场景1、点击事件防抖2、输入搜索优化3、联合判断三、防泄漏1、Observable.unsubscribeOn2、disposable.dispose3、CompositeDisposable参考了以下文章,表示感谢:

一、Rxjava使用场景

为了模拟实际场景,从wanandroid网站找了二个接口,如下:(对Wanandroid表示感谢!)

public interface ApiServer {

/**

* 接口一

* 获取文章列表

* @return

*/

@GET("article/list/1/json")

Observable> getArticleList();

/**

* 接口二

* 获取热词

* @return

*/

@GET("hotkey/json")

Observable>> getHotKey();

}

1、多任务嵌套回调

场景:比如调用接口一有回调后才能调用接口二,如果接口一调用失败不再调用接口二。下面是二种写法。

写法一,代码如下:

//为了看清楚代码,没有使用lambda简化

//接口一

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

//接口二

Observable>> hotKey = ApiManager.getInstance().getApiService().getHotKey();

Observable.just(articleList)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.map(new Function>, Observable>>>() {

@Override

public Observable>> apply(Observable> baseResponseObservable) throws Throwable {

//处理第一个请求返回的数据

if(baseResponseObservable!=null) mTv.setText(baseResponseObservable.blockingSingle().toString());

return hotKey; //发起第二次网络请求

}

}).subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer>>>() {

@Override

public void accept(Observable>> baseResponseObservable) throws Throwable {

//处理第二次网络请求的结果

if(baseResponseObservable!=null) mTvTwo.setText(baseResponseObservable.blockingSingle().toString());

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Throwable {

//异常的处理:比如Dialog的Dismiss,缺省页展示等

http:// //注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,如果多个请求同理

//但是请求成功的还是能正常处理

LogUtil.e(throwable.toString());

}

});

写法二,代码如下:

//为了看清楚代码,没有使用lambda简化

//接口一

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

//接口二

Observable>> hotKey = ApiManager.getInstance().getApiService().getHotKey();

//请求第一个

articleList.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.doOnNext(new Consumer>() {

@Override

public void accept(BaseResponse articleListRespBaseResponse) throws Throwable {

//处理第一个网络请求的结果

if(articleListRespBaseResponse!=null) mTv.setText(articleListRespBaseResponse.toString());

}

}).observeOn(Schedulers.io())

.flatMap(new Function, ObservableSource>>>() {

@Override

public ObservableSource>> apply(BaseResponse articleListRespBaseResponse) throws Throwable {

return hotKey; //将第一个网络请求转换为第二个网络请求

}

}).observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer>>() {

@Override

public void accept(BaseResponse> listBaseResponse) throws Throwable {

//处理第二次网络请求的结果

if(listBaseResponse!=null) mTvTwo.setText(listBaseResponse.toString());

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Throwable {

//注意:如果第一个网络请求异常,整个事件会中断,不会执行第二个网络请求,多个请求同理

//但是在异常前面已经成功的网络请求还是能正常处理

//异常的处理:比如Dialog的Dismiss,缺省页展示等

LogUtil.e(throwable.toString());

}

});

注意异常处理和线程切换,其他细节代码和注释比较详细。

2、多任务合并处理

场景:接口一和接口二返回数据后一起处理。代码如下:

private void zipRequest() {

//为了看清楚代码,没有使用lambda简化

//接口一

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

//接口二

Observable>> hotKey = ApiManager.getInstance().getApiService().getHotKey();

Observable.zip(articleList, hotKey, this::combiNotification) //传入方法定义合并规则

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

}

@Override

public void onNext(@NonNull String msg) {

if(!TextUtils.isEmpty(msg)){

mTv.setText(msg);

}

}

@Override

public void onError(@NonNull Throwable e) {

}

@Override

public void onComplete() {

}

});

}

//合并的规则,以及定义合并的返回值

public String combiNotification(BaseResponse articleListRespBaseResponse, BaseResponse> hotkeyResponse) {

//比如这里取二个接口数据toString返回

if (articleListRespBaseResponse != null && hotkeyResponse != null) {

return articleListRespBaseResponse.toString() + hotkeyResponse.toString();

}

return null;

}

3、轮询

场景一:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,每次轮询必须等上一次轮询有结果后才能开始下一次轮询。

特别注意repeatWhen操作符,只有在repeatWhen的Function方法中发射onNext事件,重复(repeat)才能触发,发射onError或者onComplite都会结束重复(repeat),基于这一点,通过flatMap操作符将事件转化为延迟一定时间的onNext事件,就达到了延时轮询的目的。至于onNext事件发射的什么不重要。

延伸:retryWhen的Function方法发射onError事件才会重试(retry)。

takeUntil操作符可以定义一定的条件,当达到条件时自动结束整个事件的目的,事件结束时会回调subscribe。

代码如下:

/**

* 轮询

* @param pollingTimes 轮询的次数

*/

private void timedPolling(int pollingTimes) {

AtomicInteger times = new AtomicInteger();

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

articleList.repeatWhen(new Function, ObservableSource>>() {

@Override

public ObservableSource> apply(Observable objectObservable) throws Throwable {

return objectObservable.flatMap(new Function>() { //转换事件

@Override

public ObservableSource> apply(Object o) throws Throwable {

//这里发射延时的onNext事件,触发repeat动作,发射的0不会回调到下面的subscribe

return Observable.just(0).delay(2, TimeUnit.SECONDS);

}

});

}

}).subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

//takeUntil定义了二个结束条件:前面是达到了轮询的次数,后面是网络请求返回了成功,当然也可以写成代码块做其他的返回判断

.takeUntil(response -> times.incrementAndGet() >= pollingTimes || response.getErrorCode() == 0)

.subscribe(new Observer>() {

@Override

public void onSubscribe(@NonNull Disposable d) {

}

@Override

public void onNext(@NonNull BaseResponse articleListRespBaseResponse) {

}

@Override

public void onError(@NonNull Throwable e) {

}

@Override

public void onComplete() {

}

});

}

如果想改成不限制次数的也比较简单。

场景二:轮询固定的次数(间隔一定的时间),可以提前退出轮询,也可以等轮询到指定次数后自动退出,这里的轮询不关心上次请求的结果。代码如下:

/**

* 轮询一定的次数

* @param pollTimes 轮询次数

*/

private void timedPolling(int pollTimes) {

//网络请求

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

//返回值用于取消轮询

mSubscribe = Observable.intervalRange(0, pollTimes, 0, 2000, TimeUnit.MILLISECONDS)

.flatMap(new Function>>() {

@Override

public ObservableSource> apply(Long aLong) throws Throwable {

return articleList; //转换事件

}

}).subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer>() {

@Override

public void accept(BaseResponse listBaseResponse) throws Throwable {

//如果满足了退出轮询的条件,可以调用下面的方法退出轮询

//mSubscribe.dispose();

}

});

}

思路是定时发射事件,然后将事件转化为网络请求。同理可以写出不限次数的轮询。

场景三:不限次数轮询(间隔一定的时间),不关心上次请求的结果。

假如接口返回的code为0时需要取消轮询,代码如下:

Observable> articleList = ApiManager.getInstance().getApiService().getArticleList();

//返回值用于取消轮询

mSubscribe = Observable.interval(0, 2000, TimeUnit.MILLISECONDS)

.flatMap(new Function>>() {

@Override

public ObservableSource> apply(Long aLong) throws Throwable {

return articleList;

}

})

.takeUntil(response -> response.getErrorCode() == 0) //使用takeUntil自动取消发射

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer>() {

@Override

public void accept(BaseResponse articleListRespBaseResponse) throws Throwable {

//处理回调

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Throwable {

//处理异常

}

});

如果是其他取消条件,也可以写在代码块里:

.takeUntil(response -> {

//处理接口数据,然后判断是返回true还是false,true:停止发射,false:继续发射

return false;

}) //使用takeUntil自动取消发射

不管何种轮询,注意在OnDestroy中取消。

4、其他小场景

1)倒计时

验证码的倒计时功能,代码如下:

/**

* 倒计时

* @param countDownSeconds 倒计时的秒数

*/

private void countDown(int countDownSeconds) {

Observable.intervalRange(0, countDownSeconds, 0, 1000, TimeUnit.MILLISECONDS)

.map(new Function() {

@Override

public String apply(Long aLong) throws Throwable {

return (countDownSeconds - aLong) + "s后重新获取";

}

}).observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

mTv.setEnabled(false);

}

@Override

public void onNext(@NonNull String s) {

mTv.setText(s);

}

@Override

public void onError(@NonNull Throwable e) {

mTv.setEnabled(true);

mTv.setText("获取验证码");

}

@Override

public void onComplete() {

mTv.setText("获取验证码");

mTv.setEnabled(true);

}

});

}

效果

2)打字机效果

几行代码实现打字机效果:

@RequiresApi(api = Build.VERSION_CODES.M) //6.0

public class DaziView extends View {

private TextPaint mTextPaint;

private StaticLayout mStaticLayout;

public DaziView(Context context) {

super(context,null);

}

public DaziView(Context context, @Nullable AttributeSet attrs) {

super(context, attrs);

initTextPaint();

}

/**

* 初始化画笔

*/

private void initTextPaint() {

mTextPaint = new TextPaint(Paint.ANTI_ALIAS_FLAG);

mTextPaint.setTextSize(48);

mTextPaint.setColor(Color.parseColor("#000000"));

}

/**

* 绘制

* @param content

*/

public void drawText(String content){

if(!TextUtils.isEmpty(content)){

Observable.intervalRange(0,content.length()+1,0,150, TimeUnit.MILLISECONDS)

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Throwable {

//动态改变文本长度

mStaticLayout = StaticLayout.Builder.obtain(content, 0, aLong.intValue(), mTextPaint, getWidth())

.build();

invalidate();

}

});

}

}

@Override

protected void onDraw(Canvas canvas) {

super.onDraw(canvas);

//绘制文本

mStaticLayout.draw(canvas);

}

}

文本

\u3000\u3000你好,这是一个打字机,这是一个打字机这是一个打字机这是一个打字机。\n\u3000\u3000换行空格继续打印。

二、结合Rxbinding的使用场景

RxBinding 提供的绑定能够将任何 Android View 事件转换为 Observable。

一旦将 View 事件转换为 Observable ,将发射数据流形式的 UI 事件,我们就可以订阅这个数据流,这与订阅其他 Observable 方式相同。

引入下面的库:

implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'

1、点击事件防抖

点击事件的写法:

RxView.clicks(button) //button为控件

.subscribe(new Consumer() {

@Override

public void accept(Unit unit) throws Throwable {

//点击事件

}

});

长点击事件的写法:

RxView.longClicks(button)

.subscribe(new Consumer() {

@Override

public void accept(Unit unit) throws Throwable {

//长点击自动响应,不需要等放开手指

}

});

点击防抖事件的写法:

RxView.clicks(button)

.throttleFirst(1000, TimeUnit.MILLISECONDS) //一秒以内第一次点击事件有效

.subscribe(new Consumer() {

@Override

public void accept(Unit unit) throws Throwable {

//点击事件

}

});

2、输入搜索优化

RxTextView.textChanges(editText) //传入EditText控件

.debounce(1000,TimeUnit.MILLISECONDS) //一秒内没有新的事件时,取最后一次事件发射

.skip(1) //跳过第一次EditText的空内容

.subscribeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(CharSequence charSequence) throws Throwable {

//EditText的内容

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Throwable {

}

});

3、联合判断

combineLatest 操作符将多个 Observable 发射的事件组装起来,然后再发射组装后的新事件。

Observable observableEdittext = RxTextView.textChanges(editText).skip(1);

Observable observableEdittextTwo = RxTextView.textChanges(editText_two).skip(1);

Observable.combineLatest(observableEdittext, observableEdittextTwo, new BiFunction() {

@Override

public Boolean apply(CharSequence charSequence, CharSequence charSequence2) throws Throwable {

if(!TextUtils.isEmpty(charSequence)&&!TextUtils.isEmpty(charSequence2)){

return true;

}

return false;

}

}).subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Throwable {

//TODO 其他处理

}

});

三、防泄漏

1、Observable.unsubscribeOn

Observable just = Observable.just(0);

just.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()); //取消事件,防止泄漏

2、disposable.dispose

这个比较常用。

3、CompositeDisposable

对订阅事件统一管理

CompositeDisposable compositeDisposable = new CompositeDisposable();

compositeDisposable.add(disposableOne);

compositeDisposable.add(disposableTwo);

compositeDisposable.clear();

参考了以下文章,表示感谢:

最适合使用 RxJava 处理的四种场景

Android RxJava应用:网络请求轮询(有条件)

Rxjava3文档级教程三: 实战演练

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:史上最详细的原理+实验——Linux Centos7 网络基础设置
下一篇:实操教程丨如何将一个k3s集群集成到Gitlab项目中
相关文章

 发表评论

暂时没有评论,来抢沙发吧~