281 Star 2 Fork 14

有心课堂/RxJavaTutorial

Create your Gitee Account
Explore and code with more than 13.5 million developers,Free private repositories !:)
Sign up
文件
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
RxJava04.java 11.21 KB
Copy Edit Raw Blame History
jdsjlzx authored 8 years ago . first commit
package com.stay4it.rxjava;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.stay4it.rxjava.bean.Course;
import com.stay4it.rxjava.bean.Entity;
import com.stay4it.rxjava.bean.Student;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
/**
* RxJava转换操作符
*/
public class RxJava04 {
static List<Student> studentList = new ArrayList<Student>(){
{
add(new Student("Stay", 28));
add(new Student("谷歌小弟", 23));
add(new Student("Star", 25));
}
};
static Map<String, Course> couseMap = new HashMap<>();
static {
couseMap.put("Stay", new Course("语文", 2001));
couseMap.put("谷歌小弟", new Course("数学", 2005));
couseMap.put("Star", new Course("美术", 2009));
}
/**
* Map
*
* 通过使用map中的方法对Observable中发射出来的所有数据进行变换
*
* test1()方法是得到多个Student对象中的name,保存到nameList中
* 注意:接口Func1包装的是有返回值的方法。
*
* 了解更多:http://blog.csdn.net/jdsjlzx/article/details/51493772
*/
private static void test1(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.name;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted nameList.size() = " + nameList.size());
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
nameList.add(value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* Map操作符连续使用
*/
private static void test2(){
Observable.from(studentList)
.map(new Func1<Student, Integer>() {
@Override
public Integer call(Student student) {
return student.age;
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer t) {
return String.valueOf(t+10);
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* Flatmap操作符
* FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
*
* FlatMap是一个用处很大的操作符,可以将要数据根据你想要的规则进行转化后再发射出去。
* 其原理就是将这个Observable转化为多个以原Observable发射的数据作为源数据的Observable,
* 然后再将这多个Observable发射的数据整合发射出来,需要注意的是最后的顺序可能会交错地发射出来,
* 如果对顺序有严格的要求的话可以使用concatmap操作符。
* FlatMapIterable和FlatMap基本相同,不同之处为其转化的多个Observable是使用Iterable作为源数据的。
* 参考:http://blog.csdn.net/jdsjlzx/article/details/51493552
*/
private static void test3(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student t) {
Course course = couseMap.get(t.name);
return Observable.just(course);
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(Course course) {
System.out.println("onSuccess course = " + course);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* FlatMap操作符
* FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
*/
private static void test4(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Entity>>() {
@Override
public Observable<Entity> call(Student student) {
Course course = couseMap.get(student.name);
Entity entity = new Entity(course, student);
return Observable.just(entity);
}
})
.subscribe(new Subscriber<Entity>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Entity entity) {
System.out.println("onSuccess entity = " + entity);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* ConcatMap操作符
* 类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
*
*/
private static void test5(){
Observable.from(studentList)
.concatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student t) {
Course course = couseMap.get(t.name);
return Observable.just(course);
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Course course) {
System.out.println("onSuccess course = " + course);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* flatMap与ConcatMap操作符比较
* 区别:
* 无序:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
* 有序:ConcatMap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。
*
* 说明:在同步线程中,FlatMap和ConcactMap的执行结果是一样的(结果是有序的),
* 只有在异步线程中,FlatMap结果可能是无序的,而ConcactMap始终能保持有序的结果。
*
* concatMap与flatMap操作符的比较 参见:http://blog.csdn.net/jdsjlzx/article/details/51508852
*/
private static void test6(){
List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.from(numbers)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("flatMap onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
System.out.println("----------------------------");
Observable.from(numbers)
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("concatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* switchMap
* 解释:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
* 用法与FlatMap几乎一样,区别是SwitchMap操作符只会发射[emit]最近的Observables。
*
* 当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
*
* 应用场景:http://blog.csdn.net/jdsjlzx/article/details/51730162
*
* 逻辑推演:
* A --> 取消空的,没有可以取消的
* B--> A1被取消
* C--> B1被取消
* D--> C1被取消
* E--> D1被取消
* 最终输出E1
*/
private static void test7(){
Observable.just("A", "B", "C", "D", "E")
.switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s+"1").subscribeOn(Schedulers.newThread()); //并发
//return Observable.just(s+"1");
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("switchMap onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("switchMap onError :" + e);
}
@Override
public void onNext(String s) {
System.out.println("switchMap Next :" + s);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* switchMap与flatmap的区别
*
* 说明:在同步线程中,switchMap发射[emit]所有的Observables,
* 在异步线程中,switchMap只会发射[emit]最近的Observables。
*
*/
private static void test8(){
ExecutorService service = Executors.newFixedThreadPool(10);
List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.from(numbers)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("flatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
System.out.println("----------------------------------");
Observable.from(numbers)
.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("switchMap2 onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
service.shutdown();
}
public static void main(String[] args) throws InterruptedException {
test8();
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/stay4repo/RxJavaTutorial.git
git@gitee.com:stay4repo/RxJavaTutorial.git
stay4repo
RxJavaTutorial
RxJavaTutorial
master

Search