# learn-rx-j **Repository Path**: hsupu/learn-rx-j ## Basic Information - **Project Name**: learn-rx-j - **Description**: Learn Reactive in Java. - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-10-09 - **Last Updated**: 2021-10-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 学习响应式编程 ## 基础 RxJava 是使用可观测的序列来组成异步的、基于事件的程序流程。 Observable 对象是核心。具有 `Observable#subscribe` 接口来被订阅。 序列有三个后续,`onNext(args)` `onCompleted()` `onError(ex)`。 如果仍有事件,那么触发 onNext;如果没有事件,那么触发 onCompleted;如果抛出异常,那么触发 onError。 onCompleted 和 onError 是互斥的只会调用其一,且其后序列终止。 ## 运行 ```java Mono.just(o) // 处理最终的值 .subscribe(e -> process(e)); ``` ```java Mono.just(o) // 处理此时的值 .doOnNext(e -> process(e)) .subscribe(); ``` ## 调度 Reactor 的负载交由调度器实际执行。 对于被迫兼容传统(同步)或是逻辑复杂的情况,如果调度器的线程数固定,那么会很容易影响其他负载。 `publishOn` `subscribeOn` 提供了策略选择的能力。 ```java Mono fluxToBlockingRepository(Flux flux, BlockingRepository repository) { return flux .publishOn(Schedulers.elastic()) // 影响其后操作(位置有关) .doOnNext(repository::save) .then(); } ``` ```java Flux blockingRepositoryToFlux(BlockingRepository repository) { return Flux.defer(() -> Flux.fromIterable(repository.findAll())) .subscribeOn(Schedulers.elastic()); // 影响整个过程(位置无关) } ```