将从Rx一个最简单的流程说起,说到map,说到SubscribeOn,说到observeOn,说到天荒地老
一个完整流程
Observable.create(new ObservableOnSubscribe
接下来看图分析整个流程做了什么
文字描述:被观察者调用subscribe
(订阅)观察者,然后被观察者通过调用ObservableEmitter
(发射器)发射各种事件(onNext
、onError
、onComplete
)
转换事件
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).map(new Function () { @Override public Integer apply(@NonNull String o) throws Exception { System.out.println("将string转换成int返回"); return o.length(); } }).subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull Integer o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码
可以看到,一开始的被观察者发射的类型是String
,而观察者接受的类型是Integer
类型。我要的是宝马你却给我电单车?WTF? 这个时候就需要用到map
这个操作符了,可以相当于一个中转站(加工站),将被观察者发送过来的数据转换成符合观察者需要的数据,然后再将它返回给观察者。完美! But,how it work?? 接下来看图分析整个流程做了什么
注:上述图传递的不是ObservableEmitter
,而是为了更直观了解流程,而实际具体流程也差不多,只是实现不太一样。 文字描述: ①:原始被观察者
调用map()
的时候,重新创建了一个被观察者(这里称它为Map被观察者
),然后用Map被观察者
订阅原始观察者
。 ②:然后在订阅成功后,原始被观察者
将订阅一个新的观察者(这里称它为Map观察者
)。 ③:然后原始被观察者
在发送(String)消息的时候,Map观察者
接受到(String)消息,将(String)消息通过apply()
方法将其转为(Integer)消息,并通过Map被观察者
发送给原始观察者
。 ④:apply()
方法是我们自己实现的方法
Map简单代码如下
public finalObservable map(Function mapper) { return create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter e) { subscribe(new Observer () { @Override public void onNext(T var1) { e.onNext(mapper.call(var1)); } }); } }); }复制代码
转换线程(SubscribeOn)
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).subscribeOn(Schedulers.io()) .subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull String o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码
接下来看图分析整个流程做了什么
文字分析:和map类似的操作,但又有点不同 ①:原始被观察者
调用subscribeOn()
的时候,重新创建了一个被观察者(这里称它为subscribeOn被观察者
),然后用subscribeOn被观察者
订阅原始观察者
。 ②:然后在订阅成功后,进行线程的转换
。 ③:在subscribeOn被观察者
中调用原始被观察者
的subscribe(ObservableEmitter<Object> e)
,其中的参数发射器e
用的是subscribeOn被观察者
的发射器
subscribeOn()简单代码如下
public final ObservablesubscribeOn(Scheduler scheduler){ return Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // 线程的转换 scheduler.createWorker().schedule(new SubscribeTask() { @Override public void run() { // source为原始被观察者 source.call(e); } }); } }); }复制代码
转换线程(observeOn)
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { System.out.println("工作空间。我发送的是string类型数据"); } }).observeOn(Schedulers.io()) .subscribe(new Observer () { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("已连接观察通道"); } @Override public void onNext(@NonNull String o) { System.out.println("接受工作空间发送的事件,接受到的是int类型"); } @Override public void onError(@NonNull Throwable e) { System.out.println("出错"); } @Override public void onComplete() { System.out.println("完成"); } });复制代码
接下来看图分析整个流程做了什么
文字分析:和map比较类似的操作,但又有点不同 ①:原始被观察者
调用observeOn()
的时候,重新创建了一个被观察者(这里称它为observeOn被观察者
),然后用observeOn被观察者
订阅原始观察者
。 ②:然后订阅成功后将创建一个观察者(这里称为observeOn观察者
),作为参数调用原始被观察者
的subscribe(@NonNull ObservableEmitter<Object> e)
。 ③:之后的原始被观察者
发送的onNext
事件都会先经过observeOn观察者
的onNext
事件先,在里面会进行线程的转换
,再调用observeOn被观察者
的发射器来
发送onNext
事件给原始观察者
。
observeOn()简单代码如下
public final ObservableobserveOn(Scheduler scheduler) { return Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(@NonNull Observer e) { // source为原始被观察者 source.subscribe(new Observer () { @Override public void onNext(T var1) { // 线程的转换 scheduler.createWorker().schedule(new Runnable() { @Override public void run() { // e为observeOn被观察者 e.onNext(var1); } }); } }); } }); }复制代码
结语
其实RxJava里面的各种操作符大部分都是利用了这种思想,用各种Observable和Observer来达到目的。 observeOn和subscribeOn对线程的调度94这样,对于里面什么方法走在什么线程,应该理解上面所说的就应该清楚了。
感谢: