Fetch the repository succeeded.
package com.stay4it.rxjava;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.stay4it.rxjava.bean.Course;
import com.stay4it.rxjava.bean.Entity;
import com.stay4it.rxjava.bean.Student;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
/**
* RxJava转换操作符
*/
public class RxJava04 {
static List<Student> studentList = new ArrayList<Student>(){
{
add(new Student("Stay", 28));
add(new Student("谷歌小弟", 23));
add(new Student("Star", 25));
}
};
static Map<String, Course> couseMap = new HashMap<>();
static {
couseMap.put("Stay", new Course("语文", 2001));
couseMap.put("谷歌小弟", new Course("数学", 2005));
couseMap.put("Star", new Course("美术", 2009));
}
/**
* Map
*
* 通过使用map中的方法对Observable中发射出来的所有数据进行变换
*
* test1()方法是得到多个Student对象中的name,保存到nameList中
* 注意:接口Func1包装的是有返回值的方法。
*
* 了解更多:http://blog.csdn.net/jdsjlzx/article/details/51493772
*/
private static void test1(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.name;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted nameList.size() = " + nameList.size());
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
nameList.add(value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* Map操作符连续使用
*/
private static void test2(){
Observable.from(studentList)
.map(new Func1<Student, Integer>() {
@Override
public Integer call(Student student) {
return student.age;
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer t) {
return String.valueOf(t+10);
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* Flatmap操作符
* FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
*
* FlatMap是一个用处很大的操作符,可以将要数据根据你想要的规则进行转化后再发射出去。
* 其原理就是将这个Observable转化为多个以原Observable发射的数据作为源数据的Observable,
* 然后再将这多个Observable发射的数据整合发射出来,需要注意的是最后的顺序可能会交错地发射出来,
* 如果对顺序有严格的要求的话可以使用concatmap操作符。
* FlatMapIterable和FlatMap基本相同,不同之处为其转化的多个Observable是使用Iterable作为源数据的。
* 参考:http://blog.csdn.net/jdsjlzx/article/details/51493552
*/
private static void test3(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student t) {
Course course = couseMap.get(t.name);
return Observable.just(course);
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(Course course) {
System.out.println("onSuccess course = " + course);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* FlatMap操作符
* FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
*/
private static void test4(){
List<String> nameList = new ArrayList<>();
Observable.from(studentList)
.flatMap(new Func1<Student, Observable<Entity>>() {
@Override
public Observable<Entity> call(Student student) {
Course course = couseMap.get(student.name);
Entity entity = new Entity(course, student);
return Observable.just(entity);
}
})
.subscribe(new Subscriber<Entity>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Entity entity) {
System.out.println("onSuccess entity = " + entity);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* ConcatMap操作符
* 类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
*
*/
private static void test5(){
Observable.from(studentList)
.concatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student t) {
Course course = couseMap.get(t.name);
return Observable.just(course);
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Course course) {
System.out.println("onSuccess course = " + course);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* flatMap与ConcatMap操作符比较
* 区别:
* 无序:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
* 有序:ConcatMap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。
*
* 说明:在同步线程中,FlatMap和ConcactMap的执行结果是一样的(结果是有序的),
* 只有在异步线程中,FlatMap结果可能是无序的,而ConcactMap始终能保持有序的结果。
*
* concatMap与flatMap操作符的比较 参见:http://blog.csdn.net/jdsjlzx/article/details/51508852
*/
private static void test6(){
List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.from(numbers)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("flatMap onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
System.out.println("----------------------------");
Observable.from(numbers)
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("concatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
}
/**
* switchMap
* 解释:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
* 用法与FlatMap几乎一样,区别是SwitchMap操作符只会发射[emit]最近的Observables。
*
* 当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
*
* 应用场景:http://blog.csdn.net/jdsjlzx/article/details/51730162
*
* 逻辑推演:
* A --> 取消空的,没有可以取消的
* B--> A1被取消
* C--> B1被取消
* D--> C1被取消
* E--> D1被取消
* 最终输出E1
*/
private static void test7(){
Observable.just("A", "B", "C", "D", "E")
.switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s+"1").subscribeOn(Schedulers.newThread()); //并发
//return Observable.just(s+"1");
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("switchMap onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("switchMap onError :" + e);
}
@Override
public void onNext(String s) {
System.out.println("switchMap Next :" + s);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* switchMap与flatmap的区别
*
* 说明:在同步线程中,switchMap发射[emit]所有的Observables,
* 在异步线程中,switchMap只会发射[emit]最近的Observables。
*
*/
private static void test8(){
ExecutorService service = Executors.newFixedThreadPool(10);
List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.from(numbers)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("flatMap onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
System.out.println("----------------------------------");
Observable.from(numbers)
.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t) {
return Observable.just(t).subscribeOn(Schedulers.from(service));
//return Observable.just(t);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Integer value) {
System.out.println("switchMap2 onNext value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
service.shutdown();
}
public static void main(String[] args) throws InterruptedException {
test8();
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。