# RxJava **Repository Path**: HackerX9/RxJava ## Basic Information - **Project Name**: RxJava - **Description**: RxJava - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2017-11-07 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RxJava 在RxJava2.x中,Observable用于订阅Observer,不再支持背压(1.x中可以使用背压策略),而Flowable用于订阅Subscriber,是支持背压(BackPressure)的。 ## ObservableEmitter Emitter是发射器的意思,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。 ## Disposable 这个单词的字面意思是一次性用品,用完即可丢弃的.在RxJava中可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时,它就会将两根管道切断, 从而导致下游收不到事件. ## 发送事件需要满足一定的规则 1. 上游可以发送无限个onNext, 下游也可以接收无限个onNext. 2. 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送,而下游收到onComplete事件之后将不再继续接收事件. 3. 当上游发送了一个onError后, 上游onError之后的事件将继续发送,而下游收到onError事件之后将不再继续接收事件. 4. 上游可以不发送onComplete或onError. 5. 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete,也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然 * 注:关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制,如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃.比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了,但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃. ## subscribe()有多个重载的方法: 1. public final Disposable subscribe() {} 2. public final Disposable subscribe(Consumer onNext) {} 3. public final Disposable subscribe(Consumer onNext, Consumer onError) {} 4. public final Disposable subscribe(Consumer onNext, Consumer onError, Action onComplete) {} 5. public final Disposable subscribe(Consumer onNext, Consumer onError, Action onComplete, Consumer onSubscribe) {} 6. public final void subscribe(Observer observer) {} ### 基本用法 ``` //Observable(被观察者、发射器、上游事件) //Observable.create()创建Observable对象 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { Log.i("observable", "---subscribe()发送数据--->1"); // 发送数据 e.onNext(1); Log.i("observable", "---subscribe()发送数据--->2"); e.onNext(2); Log.i("observable", "---subscribe()发送数据--->3"); e.onNext(3); Log.i("observable", "---subscribe()发送数据--->4"); e.onNext(4); // onComplete()之后observer无法接收数据 //e.onComplete(); Log.i("observable", "---subscribe()发送数据--->5"); e.onNext(5); } }); //Observer(观察者、接收器、下游事件) Observer observer = new Observer() { private int i; private Disposable mDisposable @Override public void onSubscribe(Disposable d) { Log.i("observer", "---onSubscribe()--->订阅"); mDisposable = d; @Override public void onNext(Integer integer) { Log.i("observer", "---onNext()接收数据--->" + integer); Log.i("observer", "---onNext()接收数据isDisposed--->" + mDisposable.isDisposed()); i++; if (i == 8) { // 在RxJava2.x中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件 mDisposable.dispose(); Log.i("observer", "---onNext()接收数据isDisposed--->" + mDisposable.isDisposed()); } @Override public void onError(Throwable e) { Log.i("observer", "---onError()接收数据发生错误--->"); @Override public void onComplete() { Log.i("observer", "---onComplete()接收数据结束--->"); } }; //observable订阅observer observable.subscribe(observer); ``` ### zip()方法 将多个发射器发送的事件(数据)进行平行匹配,最终匹配成功的发射事件(数据)数目等于包含最少发送事件的发射器的发送事件的数目 ``` private void zip() { Observable.zip(createIntegerObservable(), createStringObservable(), createBooleanObservable(), new Function3() { @Override public String apply(Integer integer, String s, Boolean b) throws Exception { Log.i("Zip", "---apply()--->" + s + integer + b); return s + integer + b; } }).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i("Observer", "---accept()--->" + s); } }); } private Observable createBooleanObservable() { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(true); Log.i("BooleanObservable", "---subscribe()--->true"); e.onNext(true); Log.i("BooleanObservable", "---subscribe()--->true"); e.onNext(true); Log.i("BooleanObservable", "---subscribe()--->true"); e.onNext(true); Log.i("BooleanObservable", "---subscribe()--->true"); } }); } private Observable createStringObservable() { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("A"); Log.i("StringObservable", "---subscribe()--->A"); e.onNext("B"); Log.i("StringObservable", "---subscribe()--->B"); e.onNext("C"); Log.i("StringObservable", "---subscribe()--->C"); } }); } private Observable createIntegerObservable() { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); Log.i("IntegerObservable", "---subscribe()--->1"); e.onNext(2); Log.i("IntegerObservable", "---subscribe()--->2"); e.onNext(3); Log.i("IntegerObservable", "---subscribe()--->3"); e.onNext(4); Log.i("IntegerObservable", "---subscribe()--->4"); e.onNext(5); Log.i("IntegerObservable", "---subscribe()--->5"); } }); } ``` ### concat()方法 把2个发射器合并成1个发射器 ``` Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### merge()方法 把多个Observable结合起来.注意它和concat的区别在于,不用等到发射器A发送完所有的事件再进行发射器B的发送 ``` ArrayList integerArrayList = new ArrayList<>(); for (int i = 10; i < 20; i++) { integerArrayList.add(i); } Observable.merge(Observable.fromIterable(integerArrayList), Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### timer()方法 延迟发送事件,默认在新线程 ``` Log.i("MainActivity", "---timer()--->" + System.currentTimeMillis()); Observable.timer(2, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn (AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { Log.i("observer", "---accept()--->" + System.currentTimeMillis()); } }); ``` ### interval()方法 用于间隔时间执行某个操作,默认在新线程,其接受三个参数,分别是第一次发送延迟、间隔时间、时间单位 ``` private void interval() { Log.i("MainActivity", "---timer()--->" + System.currentTimeMillis()); mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn (AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { Log.i("observer", aLong + "---accept()--->" + System.currentTimeMillis()); } }); } @Override protected void onDestroy() { super.onDestroy(); if (mDisposable!=null&&!mDisposable.isDisposed()){ mDisposable.dispose(); } } ``` ### single()方法 只会接收一个参数,而SingleObserver只会调用onError()或者onSuccess() ``` Single.just(1).subscribe(new SingleObserver() { @Override public void onSubscribe(Disposable d) { Log.i("observer", "---onSubscribe()--->"); } @Override public void onSuccess(Integer integer) { Log.i("observer", "---onSuccess()--->" + integer); } @Override public void onError(Throwable e) { Log.i("observer", "---onError()--->"); } }); ``` ### defer() 观察者订阅时才创建Observable,每次订阅返回一个新的Observable ``` final Observable deferObservable = Observable.defer(new Callable>() { @Override public ObservableSource call() throws Exception { return Observable.just(1, 2, 3); } }); Log.i("MainActivity", "---defer()--->" + deferObservable); deferObservable.subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.i("Observer", "---onSubscribe()--->" + deferObservable); } @Override public void onNext(Integer integer) { Log.i("Observer", "---onNext()--->" + integer); } @Override public void onError(Throwable e) { Log.i("Observer", "---onError()--->"); } @Override public void onComplete() { Log.i("Observer", "---onComplete()--->" + deferObservable); } }); ``` ### range()方法 创建一个发射特定整数序列的Observable:第一个参数为起始值;第二个为发送的个数,如果为0则不发送,负数则抛异常. ``` Observable.range(10, 5).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ## 线程调度 subscribeOn()指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程. * 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略. * 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次. ## 在RxJava中, 已经内置了很多线程选项供我们选择, 例如有 1. Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 2. Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作 3. Schedulers.newThread() 代表一个常规的新线程 4. AndroidSchedulers.mainThread() 代表Android的主线程 ``` Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); String name = Thread.currentThread().getName(); Log.i("Observable", "---subscribe()--->" + 1 + "---Thread--->" + name); e.onNext(2); Log.i("Observable", "---subscribe()--->" + 2 + "---Thread--->" + name); e.onNext(3); Log.i("Observable", "---subscribe()--->" + 3 + "---Thread--->" + name); e.onNext(4); Log.i("Observable", "---subscribe()--->" + 4 + "---Thread--->" + name); } }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).doOnNext (new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---mainThread Accept()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); } }).observeOn(Schedulers.newThread()).doOnNext(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---newThread Accept()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); } }).observeOn(Schedulers.io()).doOnNext(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---io Accept()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); } }).observeOn(Schedulers.computation()).doOnNext(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---computation Accept()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); } }).subscribe(); ``` ### FlatMap()方法 将一个发射器Observable转换为多个Observables,然后再把这些分散的Observables装进一个单一的发射器Observable.但有个需要注意的是,flatMap并不能保证事件的顺序 ``` ArrayList arrayList = new ArrayList<>(); for (int i = 1; i < 10; i++) { arrayList.add(i); } Observable.fromIterable(arrayList).flatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { Log.i("FlatMap", "---apply()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); return Observable.just(String.valueOf(integer)).delay(10000,TimeUnit.MILLISECONDS); } }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i("observer", "---accept()--->" + s); } }); ``` ### concatMap()方法 顺序发送事件,上一个事件接收器接收后,concatMap发射器再发送下一个事件 ``` Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); Log.i("Observable", "---subscribe()--->1"); e.onNext(2); Log.i("Observable", "---subscribe()--->2"); e.onNext(3); Log.i("Observable", "---subscribe()--->3"); e.onNext(4); Log.i("Observable", "---subscribe()--->4"); e.onNext(5); Log.i("Observable", "---subscribe()--->5"); e.onNext(6); Log.i("Observable", "---subscribe()--->6"); e.onNext(7); Log.i("Observable", "---subscribe()--->7"); e.onNext(8); Log.i("Observable", "---subscribe()--->8"); e.onNext(9); Log.i("Observable", "---subscribe()--->9"); } }).concatMap(new Function>() { @Override public ObservableSource apply(Integer integer) throws Exception { Log.i("ConcatMap", "---apply()--->" + integer + "---Thread--->" + Thread .currentThread().getName()); return Observable.just(String.valueOf(integer)).delay(10000, TimeUnit.MILLISECONDS); } }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i("observer", "---accept()--->" + s); } }); ``` ### Map()方法 促使发送的每一个事件都按照指定的函数去变化 ``` Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function() { @Override public String apply(Integer integer) throws Exception { //map()方法促使发送的每一个事件都按照指定的函数去变化 //Function T为事件变化前的类型,R为事件变化后的类型 return "this is result " + integer; } }).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i("observer", "---accept()--->" + s); } }); ``` ### distinct()方法 去除重复的事件 ``` Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(1); e.onNext(2); e.onNext(2); e.onNext(3); } }).distinct().subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### debounce()方法 去除与下一个事件发送时间间隔短的项 ``` Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); Thread.sleep(505); e.onNext(2);//skip Thread.sleep(466); e.onNext(3); Thread.sleep(507); e.onNext(4);//skip Thread.sleep(400); e.onNext(5); } }).debounce(500, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).observeOn (AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### filter()方法 根据特定的条件过滤掉不符合条件的事件 ``` Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer >= 3; } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### buffer()方法 将事件打包放在List集合中发送,参数count打包的事件数目,参数skip发送事件的步长,count>skip时部分事件会被重复打包发送,count>() { private int mInt; @Override public void accept(List integers) throws Exception { Log.i("Observer", "---accept()--->times" + ++mInt); for (Integer item : integers) { Log.i("Observer", "---accept()--->" + item); } } }); ``` ### doOnNext()方法 让订阅者在接收到数据之前干点有意思的事情 ``` Observable.just(1, 2, 3).doOnNext(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("doOnNext", "---accept()--->做点事情" + integer); } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### skip()方法 代表跳过 count 个数目开始接收 ``` Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### take()方法 接受一个long型参数count,代表至多接收count个数据。 ``` Observable.just(1, 2, 3, 4, 5, 6).take(2).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### last()方法 当ObservableSource为空时发送last(defaultItem),不为空时发送可观察到的最后一个值 ``` Observable.fromIterable(new ArrayList()).last("123").subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i("Observer", "---accept()--->" + s); } }); Observable.just(1, 2, 3).last(6).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### reduce()方法 将发射器发送的事件依次进行处理,只发送最后的处理结果 ``` Observable.just(1, 2, 3).reduce(6,new BiFunction() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### scan()方法 将发射器发送的事件依次进行处理,发送每次处理后的结果 ``` Observable.just(1, 2, 3).scan(new BiFunction() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ``` ### window() ``` Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次 .take(15) // 最多接收15个 .window(3, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn (AndroidSchedulers.mainThread()).subscribe(new Consumer>() { @Override public void accept(Observable longObservable) throws Exception { Log.i("Observer", "---SubWindow--->"); longObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers .mainThread()).subscribe(new Consumer() { @Override public void accept(Long aLong) throws Exception { Log.i("Observer", "---accept()--->" + aLong); } }); } }); ``` ### repeat()方法 发射器重复发送事件 ``` Observable.just(1).repeat(3).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i("Observer", "---accept()--->" + integer); } }); ```