背景
提及Rxjava之前,了解下响应式编程,它的英文名,Reactive Programming,字面意思就是针对响应的。那什么是叫响应呢?(eg:门铃响了,你会起身去开门,这就是响应)
响应式它是依赖于事件的,响应式的代码它的运行不是按代码的顺序,而是跟多个按时间发生的事件有关。依赖事件?不就是“回调”?,but在响应式编程里,这些按时间排列的事件,被称为“流”,stream。没有流stream,就没有响应式编程。一句概括的话,响应式编程就是通过异步和数据流来构建事物关系的编程模型,就是编程处理异步数据流。目前的情况,Android 的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。
RxJava 是什么
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
它的本质可以压缩为异步。归根结底,它就是一个实现异步操作的库,加粗的部分都是修饰Rxjava。
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
它扩展了观察者模式支持的数据/事件序列,使用操作符来将这些数据/事件进行调整序列,不用担忧无关的抽象底层线程,同步,线程安全和并发数据结问题。
RxJava 优势
- 不用担忧无关的抽象底层线程,同步,线程安全和并发数据结问题。
- 简洁
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android sdk里面的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。(code从上到下的链式调用,没有任何嵌套,逻辑清晰,可阅读性强);
代码示例:
1
2
3
4
5
6
7Observable obserInitSDK=Observable.create((context)->{initDevUtils(context);}).subscribeOn(Schedulers.newThread());
Observable obserInitDB=Observable.create((context)->{initDatabase(context);}).subscribeOn(Schedulers.newThread());
Observable observable = Observable.merge(obserInitSDK,obserInitDB);
observable.subscribe(()->{JumpToMain();});
可以解决的问题:让复杂的程序逻辑回归简单、清晰。
API 简析
上面实际已经提到 RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
RxJava的被观察者为观察者模式添加两个功能。
- 当不再产生数据时,生产者会通知消费者。(onComplete())
- 当发生错误时,生产者会通知消费者。(onError())
除此之外,RxJava的两点在于几行代码就可以变换,聚合,过滤被观察者发送的数据流,极大的减少需要维护的状态变量。
简单来说Rxjava原理就是使用”观察者模式+迭代器模式+函数式编程”,它扩展了观察者模式,通过使用可观察的对象序列流来表述一系列事件,订阅者进行占点观察并对序列流做出反应(或持久化或输出显示等等);借鉴迭代器模式,对多个对象序列进行迭代输出,订阅者可以依次处理不同的对象序列;使用函数式编程思想,极大简化问题解决的步骤。
RxJava 有四个基本概念:
- Observable/Flowable (可观察者,即被观察者)
- Observer/Subscriber (观察者)
- subscribe (订阅),被观察者(Observable/Flowable)
- 观察者(Observer/Subscriber) 通过 subscribe() 方法实现订阅关系
这样被观察者(Observable/Flowable) 可以在需要的时候发出事件来通知 观察者(Observer/Subscriber)。
所以使用RxJava 的三个步骤:
第一步:初始化 bservable/Flowable (可观察者,即被观察者)
第二部步:初始化 Observer/Subscriber (观察者)
- 第三步:建立订阅关系
代码示例:
创建观察者
1 | Observer<String> observer = new Observer<String>() { |
创建被观察者有很多种方法 具体参考 Rxjava中文翻译文档 Rxjava文档
1 | //被观察者 |
将观察者连接到被观察者(订阅):
被观察者是惰性的,在没有订阅者监听之前它不会做任何事情。
1 | observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer); |
操作符
Rxjava有很多实用的操作符,详细文档见中文翻译文档,也可以直接看官方文档。下面结合场景使用比较常用的几个操作符。
Rxjava官方wiki
改变流:
Filter:Filter运算符会过滤被观察者,被观察者发射的数据中只有通过你在谓词函数中指定的测试后才能继续往下流动。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private ArrayList<String> originData = new ArrayList<String>() {{
add("11");
add("1111");
}};
Observable.fromIterable(originData)
.filter(new Predicate<String>() {
@Override
public boolean test(String item) throws Exception {
return item.length() > 2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String dst) throws Exception {
//过滤出符合自己要求的数据
}
});上面的代码就是过滤出集合里字符串长度大于2的字符串。
应用场景,实际应用开发过滤数据还是很多的。转换流:
map: 可以将一个 Observable 对象通过某种关系转换为另一个Observable 对象。1
2
3
4
5
6
7
8
9
10
11
12
13
14Flowable.just("xxxxxxx")
//这个第一个泛型为接收参数的数据类型,第二个泛型为转换后要发射的数据类型
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s+"__uuuuuuu";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("consumer", s);
}
});
例子中map()将一个字符串对象,转换为另一个字符串对象返回,当然也可以将其转换为与之不同的对象,对应的返回的Flowable对象参数也会变为转换后的对象。另外Function的泛型第一个为接收参数的数据类型,第二个为转换后要发射的数据类型。
应用场景,map 操作符进行网络数据解析,数据库读取数据解析,改变数据对象属性 等等。
线程调度
线程调度器,方便更好的处理异步操作,在合适的场景选择合适的线程.
RxJava 中,已经内置了很多线程选项:
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread(): Android 还专用的,它指定的操作将在 Android 主线程运行。
通过这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
- subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。
- observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
Flowabale的使用,以及RxJava 2.x中的backpressure的处理策略。
Flowable的产生
在RxJava中会经常遇到一种情况就是被观察者发送消息十分迅速以至于观察者不能及时的响应这些消息。
被观察者是事件的生产者,观察者是事件的消费者,比如生产者无限生成事件,而消费者每2秒才能消费一个事件,这会造成事件无限堆积,最后造成OOM。
因此问题来了,怎么处理这些慢慢堆积起来的消息呢?
Flowable就是由此产生,专门用来处理这类问题。
关于上述的问题,有个专有的名词来形容上述现象,即:Backpressure(背压)。所谓背压,即生产者的速度大于消费者的速度带来的问题。
Flowable是为了应对Backpressure而产生的。Flowable是一个被观察者,与Subscriber(观察者)配合使用,解决Backpressure问题。
注意理解:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
处理Backpressure的策略
什么情况下才会产生Backpressure问题?
- 如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件。所以在这种情况下,根本不存在Backpressure问题。即同步情况下,Backpressure问题不存在。
- 如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生Backpressure问题。即异步情况下,Backpressure问题才会存在。
ERROR策略
这种方式会产生背压的时候直接throw一个异常MissingBackpressureException.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(20); //可以消费的事件数量
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 示例代码加入了s.request(20); 该方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,要求多少,给你多少。
注意:如果不显示调用request就表示消费能力为0。
虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。
所以ERROR策略就避免了这种情况的出现。
在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。
BUFFER策略
所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
总之BUFFER要慎用。
DROP策略
当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
LATEST策略
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
Flowable对象的获取通过create()方式,使用BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。示例代码:1
2
3
4
5
6
7
8
9Flowable.just(1).onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});