博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
说说RxJava怎么走的歪路
阅读量:5778 次
发布时间:2019-06-18

本文共 7393 字,大约阅读时间需要 24 分钟。

将从Rx一个最简单的流程说起,说到map,说到SubscribeOn,说到observeOn,说到天荒地老

一个完整流程

Observable.create(new ObservableOnSubscribe() {            @Override            public void subscribe(@NonNull ObservableEmitter e) throws Exception {                System.out.println("工作空间");            }        }).subscribe(new Observer() {            @Override            public void onSubscribe(@NonNull Disposable d) {                System.out.println("已连接观察通道");            }            @Override            public void onNext(@NonNull Object o) {                System.out.println("接受工作空间发送的事件");            }            @Override            public void onError(@NonNull Throwable e) {                System.out.println("出错");            }            @Override            public void onComplete() {                System.out.println("完成");            }        });复制代码

接下来看图分析整个流程做了什么

文字描述:被观察者调用subscribe(订阅)观察者,然后被观察者通过调用ObservableEmitter发射器)发射各种事件(onNextonErroronComplete


转换事件

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).map(new Function
() { @Override public Integer apply(@NonNull String o) throws Exception { System.out.println("将string转换成int返回"); return o.length(); } }).subscribe(new Observer
() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull Integer o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码

可以看到,一开始的被观察者发射的类型是String,而观察者接受的类型是Integer类型。我要的是宝马你却给我电单车?WTF? 这个时候就需要用到map这个操作符了,可以相当于一个中转站(加工站),将被观察者发送过来的数据转换成符合观察者需要的数据,然后再将它返回给观察者。完美! But,how it work?? 接下来看图分析整个流程做了什么

注:上述图传递的不是ObservableEmitter,而是为了更直观了解流程,而实际具体流程也差不多,只是实现不太一样。 文字描述: ①:原始被观察者调用map()的时候,重新创建了一个被观察者(这里称它为Map被观察者),然后用Map被观察者订阅原始观察者。 ②:然后在订阅成功后,原始被观察者将订阅一个新的观察者(这里称它为Map观察者)。 ③:然后原始被观察者在发送(String)消息的时候,Map观察者接受到(String)消息,将(String)消息通过apply()方法将其转为(Integer)消息,并通过Map被观察者发送给原始观察者。 ④:apply()方法是我们自己实现的方法

Map简单代码如下

public final 
Observable
map(Function
mapper) { return create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) { subscribe(new Observer
() { @Override public void onNext(T var1) { e.onNext(mapper.call(var1)); } }); } }); }复制代码

转换线程(SubscribeOn)

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).subscribeOn(Schedulers.io()) .subscribe(new Observer
() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull String o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码

接下来看图分析整个流程做了什么

文字分析:和map类似的操作,但又有点不同 ①:原始被观察者调用subscribeOn()的时候,重新创建了一个被观察者(这里称它为subscribeOn被观察者),然后用subscribeOn被观察者订阅原始观察者。 ②:然后在订阅成功后,进行线程的转换。 ③:在subscribeOn被观察者中调用原始被观察者subscribe(ObservableEmitter<Object> e),其中的参数发射器e用的是subscribeOn被观察者发射器

subscribeOn()简单代码如下

public final Observable
subscribeOn(Scheduler scheduler){ return Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { // 线程的转换 scheduler.createWorker().schedule(new SubscribeTask() { @Override public void run() { // source为原始被观察者 source.call(e); } }); } }); }复制代码

转换线程(observeOn)

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).observeOn(Schedulers.io()) .subscribe(new Observer
() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull String o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码

接下来看图分析整个流程做了什么

文字分析:和map比较类似的操作,但又有点不同 ①:原始被观察者调用observeOn()的时候,重新创建了一个被观察者(这里称它为observeOn被观察者),然后用observeOn被观察者订阅原始观察者。 ②:然后订阅成功后将创建一个观察者(这里称为observeOn观察者),作为参数调用原始被观察者subscribe(@NonNull ObservableEmitter<Object> e)。 ③:之后的原始被观察者发送的onNext事件都会先经过observeOn观察者onNext事件先,在里面会进行线程的转换,再调用observeOn被观察者发射器来发送onNext事件给原始观察者

observeOn()简单代码如下

public final Observable
observeOn(Scheduler scheduler) { return Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull Observer
e) { // source为原始被观察者 source.subscribe(new Observer
() { @Override public void onNext(T var1) { // 线程的转换 scheduler.createWorker().schedule(new Runnable() { @Override public void run() { // e为observeOn被观察者 e.onNext(var1); } }); } }); } }); }复制代码

结语

其实RxJava里面的各种操作符大部分都是利用了这种思想,用各种Observable和Observer来达到目的。 observeOn和subscribeOn对线程的调度94这样,对于里面什么方法走在什么线程,应该理解上面所说的就应该清楚了。

感谢:

转载地址:http://qsuyx.baihongyu.com/

你可能感兴趣的文章
【转】【WPF】WPF中MeasureOverride ArrangeOverride 的理解
查看>>
ASP、Access、80040e14、保留关键字、INSERT INTO 语句的语法错误
查看>>
【转】二叉树的非递归遍历
查看>>
NYOJ283对称排序
查看>>
接连遇到大牛
查看>>
[Cocos2d-x For WP8]矩形碰撞检测
查看>>
自己写spring boot starter
查看>>
花钱删不完负面消息
查看>>
JBPM之JPdl小叙
查看>>
(step6.1.5)hdu 1233(还是畅通工程——最小生成树)
查看>>
Membership三步曲之进阶篇 - 深入剖析Provider Model
查看>>
huffman编码——原理与实现
查看>>
Linux移植随笔:终于解决Tslib的问题了【转】
查看>>
MyBitis(iBitis)系列随笔之四:多表(多对一查询操作)
查看>>
【leetcode】Longest Common Prefix
查看>>
前端优化及相关要点总结
查看>>
Vue 列表渲染
查看>>
struts2中form提交到action中的中文参数乱码问题解决办法(包括取中文路径)
查看>>
25 个精美的手机网站模板
查看>>
C#反射实例应用--------获取程序集信息和通过类名创建类实例
查看>>