还不会Rxjava响应式编程框架设计,先从这篇文章入手
admin
2023-02-09 14:41:16
0

一.Rxjava是什么

Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

还不会Rxjava响应式编程框架设计,先从这篇文章入手

通俗来说,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。

举个例子:

Observable observable = Observable.create(new ObservableOnSubscribe() {
 @Override
 public void subscribe(ObservableEmitter e) throws Exception {
 e.onNext("a");
 }
 });
 observable.subscribe(new Observer() {
 @Override
 public void onSubscribe(Disposable d) {
 }
 @Override
 public void onNext(String s) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });

这是Rxjava2最简单的用法:

1.创建一个Observable,重写subscribe方法,这里主要处理被观察的事件。

2.订阅这个Observable,事件会回调observer的方法,我们可以对事件做响应的处理

二.Rxjava源码解析

2.1. 创建Observable:

创建Observable用的是Observable.create(ObservableOnSubscribe source)方法。这个方法的参数是ObservableOnSubscribe:

public interface ObservableOnSubscribe {
 /**
 * Called for each Observer that subscribes.
 * @param e the safe emitter instance, never null
 * @throws Exception on error
 */
 void subscribe(@NonNull ObservableEmitter e) throws Exception;
}

ObservableOnSubscribe是一个接口,唯一的方法是subscribe,参数是ObservableEmitter e。ObservableEmitter是一个继承了Emitter的接口,接口Emitter里定义了onNext、onError、onComplete等方法,和Observer(观察者)的方法相对应。

public interface Emitter {
 /**
 * Signal a normal value.
 * @param value the value to signal, not null
 */
 void onNext(@NonNull T value);
 /**
 * Signal a Throwable exception.
 * @param error the Throwable to signal, not null
 */
 void onError(@NonNull Throwable error);
 /**
 * Signal a completion.
 */
 void onComplete();
}

ObservableEmitter对接口Emitter进行扩展,增加了setDisposable、setCancellable等方法

基本参数了解了,现在看看create方法里面做了什么,代码如下:

public static  Observable create(ObservableOnSubscribe source) {
 return RxJavaPlugins.onAssembly(new ObservableCreate(source));
 }
调用了RxJavaPlugins的onAssembly方法。又有一个新参数ObservableCreate(source),我们看看它是什么:

final class ObservableCreate extends Observable {
 public ObservableCreate(ObservableOnSubscribe source) {
 this.source = source;
 }
}

继承了Observable,所以也是个被观察对象,在构造函数中我们看到我们new的ObservableOnSubscribe对象,被存在了ObservableCreate的source里面

那我们继续看看onAssembly方法做什么:

public static  Observable onAssembly(@NonNull Observable source) {
 Function f = onObservableAssembly;
 if (f != null) {
 return apply(f, source);
 }
 return source;
 }

一个Hook方法。onObservableAssembly是一个静态变量,我们没有设置,默认为空,所以直接返回source对象。也就是说,Observable的create方法其实就是把我们ObservableOnSubscribe对象,存储在ObservableCreate对象的source里面,然后返回ObservableCreate对象。

我们知道ObservableCreate是继承Observable的,所以创建了ObservableCreate对象,我们的Observable也就创建完了。

2.2 订阅事件(被观察者)

订阅被观察者的操作是observable.subscribe(new Observer())。这个操作符其实是个“被动”,就是事件被观察者观察。因为subscribe方法里的参数Observer才是观察者。我们也会在Observer里的各个会调方法里接收到事件相关的返回值。

我们看看subscribe方法的源码:

public final void subscribe(Observer observer) {
 try {
 subscribeActual(observer);
 } catch (NullPointerException e) { // NOPMD
 throw e;
 } catch (Throwable e) {
 RxJavaPlugins.onError(e);
 }
 }

看代码我们知道最主要调用的方法是:subscribeActual(observer);,这个方法是Observable里的抽象方法,而此时我们的Observable是一个ObservableCreate对象(前面create方法返回的对象)。所以我们去看一下ObservableCreate里面是如何重写这个方法的。代码如下:

protected void subscribeActual(Observer observer) {
 CreateEmitter parent = new CreateEmitter(observer);
 observer.onSubscribe(parent);
 try {
 source.subscribe(parent);
 } catch (Throwable ex) {
 Exceptions.throwIfFatal(ex);
 parent.onError(ex);
 }

我们一看到这个方法主要做了三件事:

①创建一个CreateEmitter对象parent。

②把parent传给source的subscribe方法。上面我们知道source就是刚才存的ObservableOnSubscribe对象,subscribe也就是我们重写的方法:

@Override
 public void subscribe(ObservableEmitter e) throws Exception {
 e.onNext("a");
 }

所以我们在这个方法里就能收到一个CreateEmmiter,通过CreateEmitter可以回调相应的方法。CreateEmitter是实现ObservableEmitter接口,我们看看它内部实现,如:onNext源码如下:

@Override
public void onNext(T t) {
 observer.onNext(t);
}

也就是说,当我们在ObservableOnSubscribe的subscribe方法里调用ObservableEmitter的onNext方法的时候,它里面会调用observer的onNext。于是通过这样的传递,我们就能在observer里响应的回调方法里收到事件的相关状态。

至此一个简单Rxjava流式传递原理已经讲完了,总结流程如下:

使用Observbable.create方法,产生一个ObservableCreate对象,对象里存着ObservableOnSubscribe对象source。

调用ObservableCreate.subscribe方法,实际调用的是subscribeActual方法,传入一个Observer对象。

subscribeActual方法中创建一个CreateEmmiter对象,调用source.subscribe方法,传入CreateEmmiter对象。

于是我们在ObservableOnSubscribe中就接收到了一个CreateEmmiter,CreateEmmiter是ObservableEmmiter的子类。我们可以在这里调用CreateEmmiter的方法进行事件回调。

调用CreateEmmiter方法,实际上会调用Observer的响应的方法。也就是CreateEmmiter把事件状态传递给观察者。

Android架构师之路很漫长,所以我们从这份学习内容和计划开始执行吧!喜欢的话别忘记点击关注和赞哦

相关内容

热门资讯

德国总理:美国正在被伊朗羞辱 德国之声4月27日报道,德国总理默茨在访问一所学校时表示,在当前的持续冲突中,伊朗领导层正试图羞辱美...
理响中国|“长”歌以行,风云激... 光阴如梭,东方潮阔。这里是中国的长三角,世界的长三角。无论过去、现在还是未来,这片土地都因时代而生,...
白宫:特朗普及其国安团队开会讨... 新华社华盛顿4月27日电 美国白宫新闻秘书莱维特27日在记者会上证实,总统特朗普及其国家安全团队当天...
人民日报刊文:日本放开杀伤性武... 日本放开杀伤性武器出口推高地缘冲突风险(国际论坛)常思纯《人民日报》(2026年04月28日 第 0...
医疗保障法草案二审:明确生育保... 满足多样化健康保障需求本报记者 彭 波4月27日,医疗保障法草案二审稿提请十四届全国人大常委会第二十...
天津一景区发生自转旋翼机事故1... 澎湃新闻记者 吕新文中国民用航空华北地区管理局4月22日公布《豪客通航“10•1”天津长芦汉盐旅游区...
卡塔尔埃米尔与美国总统特朗普通... 当地时间24日,卡塔尔埃米尔塔米姆与美国总统特朗普通电话,重点就中东地区局势以及伊朗与美国谈判问题交...
男子30年前被扣押2859克黄... 澎湃新闻记者 王鑫家住辽宁省大连市的潘永嘉近日向澎湃新闻反映称,三十年前,他在大连周水子机场被盖州市...
商务部:取消反制欧盟两家金融机... 中华人民共和国商务部令二〇二六年 第1号鉴于欧盟已取消对中国两家金融机构的制裁措施,现公布《关于取消...
过去24小时共有5艘船只通过霍... 总台记者当地时间24日获悉,过去24小时内,共有5艘船只通过霍尔木兹海峡,其中包括一艘伊朗油轮。(总...