# rxjs学习 **Repository Path**: y_jiping/learning-rxjs ## Basic Information - **Project Name**: rxjs学习 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-12 - **Last Updated**: 2026-03-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RxJS 常用操作符学习指南 本指南包含 RxJS 常用操作符的详细示例,以及在前端和 Node.js 生产环境中的实际应用。 ## 📁 文件说明 - `src/a001.js` - 基础 Observable 示例 - `src/a002.js` - **重点**:全面的操作符示例和生产场景案例 ## 🚀 快速开始 ```bash # 运行完整示例 node src/a002.js ``` ## 📚 操作符分类详解 ### 一、创建类操作符 (Creation Operators) #### 1. `of(...values)` 从参数序列创建 Observable ```javascript of(1, 2, 3, 'hello').subscribe(val => console.log(val)); // 输出:1, 2, 3, 'hello' ``` **使用场景**:将静态值转换为 Observable,用于组合操作 --- #### 2. `from(observableLike)` 从数组、Promise、Iterable 等创建 Observable ```javascript // 从数组 from([10, 20, 30]).subscribe(val => console.log(val)); // 从 Promise from(fetch('/api/data')).then(response => response.json()); ``` **生产场景**: - 将现有 Promise API 转换为 Observable - 处理数组数据流 --- #### 3. `interval(period)` 按时间间隔发送递增数字 ```javascript interval(1000).pipe(take(5)).subscribe(val => console.log(val)); // 每秒输出:0, 1, 2, 3, 4 ``` **生产场景**: - 轮询服务器状态 - 倒计时/计时器 - 定时刷新数据 --- #### 4. `timer(dueTime, period?)` 延迟后发送值或周期性发送 ```javascript // 2 秒后发送一个值 timer(2000).subscribe(() => console.log('触发')); // 延迟 1 秒后,每 2 秒发送一次 timer(1000, 2000).subscribe(val => console.log(val)); ``` **生产场景**: - 延迟执行任务 - 会话超时检测 - 定时任务调度 --- #### 5. `fromEvent(target, eventName)` 从事件目标创建 Observable(前端) ```javascript fromEvent(document, 'click').subscribe(event => { console.log('点击位置:', event.clientX, event.clientY); }); ``` **生产场景**: - DOM 事件监听 - WebSocket 消息 - Node.js EventEmitter --- #### 6. Subject 系列 **Subject** - 可变的热 Observable ```javascript const subject = new Subject(); subject.subscribe(val => console.log(val)); subject.next('新值'); ``` **BehaviorSubject** - 有初始值,新订阅者立即获得最新值 ```javascript const behavior$ = new BehaviorSubject('初始值'); behavior$.subscribe(val => console.log(val)); // 立即输出 behavior$.next('新值'); ``` **ReplaySubject** - 缓存并重放历史值 ```javascript const replay$ = new ReplaySubject(2); // 缓存最后 2 个值 replay$.next('1'); replay$.next('2'); replay$.next('3'); replay$.subscribe(val => console.log(val)); // 输出:2, 3 ``` **生产场景**: - 全局状态管理(类似 Vuex/Redux) - 组件间通信 - 缓存最新数据 --- ### 二、转换类操作符 (Transformation Operators) #### 1. `map(project)` 映射每个值为新值 ```javascript of(1, 2, 3).pipe( map(x => x * 10), tap(val => console.log(val)) // 10, 20, 30 ).subscribe(); ``` **生产场景**: - API 响应数据格式化 - 提取对象属性 - 数据类型转换 --- #### 2. `scan(accumulator, seed)` 累加器,每次发出累积结果 ```javascript of(1, 2, 3, 4).pipe( scan((acc, curr) => acc + curr, 0) ).subscribe(val => console.log(val)); // 1, 3, 6, 10 ``` **生产场景**: - 购物车总价计算 - 实时统计 - 进度追踪 --- #### 3. `reduce(accumulator, seed)` 只在完成时发出最终累积值 ```javascript of(1, 2, 3, 4).pipe( reduce((acc, curr) => acc + curr, 0) ).subscribe(val => console.log(val)); // 10(只在完成时) ``` **对比**:`scan` 发出中间值,`reduce` 只发最终值 --- ### 三、高阶转换操作符 (Higher-Order Mapping) ⭐⭐⭐ 这是最重要的操作符类别,用于处理异步请求链。 #### 1. `mergeMap(project)` - 并行 将每个值映射为 Observable,**并行**执行所有内部 Observable ```javascript of(1, 2, 3).pipe( mergeMap(val => timer(1000 / val).pipe(map(() => val))) ).subscribe(val => console.log(val)); // 输出顺序可能是:3, 2, 1(谁先完成谁先输出) ``` **特点**: - ✅ 并行执行,效率高 - ❌ 不保证顺序 - ❌ 不取消前一个请求 **生产场景**: - 独立的并发请求 - 文件上传(多个文件同时上传) - 不依赖顺序的数据获取 --- #### 2. `concatMap(project)` - 串行 将每个值映射为 Observable,**串行**执行 ```javascript of(1, 2, 3).pipe( concatMap(val => timer(500).pipe(map(() => val))) ).subscribe(val => console.log(val)); // 严格按顺序输出:1, 2, 3 ``` **特点**: - ✅ 严格保证顺序 - ✅ 不会并发,安全 - ❌ 效率较低 **生产场景**: - 需要顺序执行的依赖任务 - 数据库事务操作 - 防止竞态条件的更新 --- #### 3. `switchMap(project)` - 切换 将每个值映射为 Observable,**取消**前一个内部 Observable ```javascript of(1, 2, 3).pipe( switchMap(val => timer(1000).pipe(map(() => val))) ).subscribe(val => console.log(val)); // 只输出:3(前两个被取消了) ``` **特点**: - ✅ 自动取消旧请求 - ✅ 只保留最新结果 - ❌ 会丢弃中间值 **生产场景**: - 🔥 搜索框自动补全(最常用!) - 页面切换取消旧请求 - Tab 切换取消未完成的加载 - WebSocket 重连 --- #### 4. `exhaustMap(project)` - 排他 忽略前一个任务完成前的新值 ```javascript of(1, 2, 3).pipe( exhaustMap(val => timer(800).pipe(map(() => val))) ).subscribe(val => console.log(val)); // 只输出:1(后面的被忽略了) ``` **特点**: - ✅ 防止重复提交 - ✅ 等待当前任务完成 - ❌ 会丢失部分值 **生产场景**: - 🔥 按钮防重复点击 - 表单提交防止重复 - 验证码发送 --- ### 四、过滤类操作符 (Filtering Operators) #### 1. `filter(predicate)` 条件过滤 ```javascript of(1, 2, 3, 4, 5).pipe( filter(x => x % 2 === 0) ).subscribe(val => console.log(val)); // 2, 4 ``` --- #### 2. `debounceTime(duration)` - 防抖 ⭐ 等待指定时间没有新值才发出 ```javascript searchInput$.pipe( debounceTime(300) // 300ms 内没有输入才触发 ).subscribe(query => searchAPI(query)); ``` **生产场景**: - 🔥 搜索框优化 - 窗口 resize 事件 - 滚动事件优化 --- #### 3. `throttleTime(duration)` - 节流 每间隔一段时间只发出一个值 ```javascript fromEvent(button, 'click').pipe( throttleTime(2000) // 2 秒内只响应一次 ).subscribe(() => submit()); ``` **生产场景**: - 按钮点击频率限制 - 滚动加载更多 - 游戏技能冷却 --- #### 4. `distinctUntilChanged()` 去除连续重复值 ```javascript of(1, 1, 2, 2, 3, 1, 1).pipe( distinctUntilChanged() ).subscribe(val => console.log(val)); // 1, 2, 3, 1 ``` **生产场景**: - 表单验证(值不变不验证) - Redux 状态去重 - 避免重复渲染 --- #### 5. `take(count)` / `takeUntil(notifier)` 取前 N 个值 / 直到某个 Observable 发出值 ```javascript // 取前 5 次点击 fromEvent(document, 'click').pipe(take(5)); // 直到组件销毁取消订阅 const destroy$ = new Subject(); data$.pipe(takeUntil(destroy$)).subscribe(); ``` **生产场景**: - 限制重试次数 - 组件卸载取消订阅(防止内存泄漏) --- ### 五、组合类操作符 (Combination Operators) #### 1. `merge(...observables)` 合并多个 Observable,交替发出 ```javascript merge( interval(400), interval(600) ).subscribe(val => console.log(val)); ``` **生产场景**:多数据源合并(如:鼠标点击 + 定时器) --- #### 2. `concat(...observables)` 连接多个 Observable,按顺序执行 ```javascript concat( of('先执行'), of('再执行') ).subscribe(val => console.log(val)); ``` **生产场景**:任务队列(按顺序执行) --- #### 3. `forkJoin(args)` - 并发请求 ⭐ 等待所有完成,取最后一个值 ```javascript forkJoin({ user: fetch('/api/user').then(r => r.json()), posts: fetch('/api/posts').then(r => r.json()) }).subscribe(({ user, posts }) => { // 同时获取用户和帖子数据 }); ``` **生产场景**: - 🔥 多接口并发请求 - 页面初始化加载多个独立数据 - Promise.all 的 Observable 版本 --- #### 4. `combineLatest(observables)` 任一发出新值,就组合所有最新值 ```javascript combineLatest([username$, password$]).pipe( map(([user, pass]) => user.length >= 3 && pass.length >= 6) ).subscribe(valid => console.log('表单有效:', valid)); ``` **生产场景**: - 🔥 表单联动验证 - 多筛选条件组合查询 - 状态依赖计算 --- #### 5. `zip(observables)` 成对压缩,一一对应 ```javascript zip( of(1, 2, 3), of('a', 'b', 'c') ).subscribe(([num, letter]) => console.log(num + letter)); // 1a, 2b, 3c ``` **生产场景**:键值对合并、数据配对 --- ### 六、错误处理操作符 (Error Handling) #### 1. `catchError(selector)` 捕获并处理错误 ```javascript apiCall().pipe( catchError(err => { console.error('错误:', err); return of('默认值'); // 返回备用值 }) ).subscribe(); ``` **生产场景**: - API 失败降级 - 显示友好错误提示 - 重试逻辑 --- #### 2. `retry(count)` 自动重试 ```javascript fetchData().pipe( retry(3) // 失败后重试 3 次 ).subscribe(); ``` **生产场景**:网络不稳定时的自动重试 --- #### 3. `retryWhen(notifier)` 自定义重试逻辑 ```javascript fetchData().pipe( retryWhen(errors => errors.pipe( delay(1000), // 延迟 1 秒 take(3) // 最多重试 3 次 )) ).subscribe(); ``` **生产场景**:指数退避策略、条件重试 --- #### 4. `finalize(callback)` 无论成功失败都会执行 ```javascript data$.pipe( finalize(() => hideLoading()) ).subscribe(); ``` **生产场景**: - 🔥 隐藏 Loading 状态 - 清理资源 - 记录日志 --- ### 七、工具类操作符 (Utility Operators) #### 1. `tap(observer)` 侧边效果,不影响流 ```javascript data$.pipe( tap({ next: val => console.log('收到:', val), error: err => console.error('错误:', err), complete: () => console.log('完成') }) ).subscribe(); ``` **生产场景**:调试日志、副作用处理 --- #### 2. `startWith(value)` 以指定值开始 ```javascript state$.pipe( startWith(initialState) ).subscribe(state => render(state)); ``` **生产场景**:提供初始值、默认状态 --- #### 3. `share()` / `shareReplay(n)` 共享订阅 ```javascript // share: 冷 Observable 变热 const shared$ = expensiveObservable$.pipe(share()); // shareReplay: 共享并重放最新值 const cached$ = apiCall$.pipe(shareReplay(1)); ``` **生产场景**: - 🔥 避免重复请求 - 多组件共享数据 - 缓存 API 结果 --- ## 🎯 前端生产环境最佳实践 ### 1. 搜索框优化(防抖 + 取消旧请求) ```javascript import { fromEvent } from 'rxjs'; import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators'; const searchInput = document.querySelector('#search'); fromEvent(searchInput, 'input').pipe( map(e => e.target.value), debounceTime(300), // 防抖 300ms distinctUntilChanged(), // 内容不变不请求 switchMap(query => // 取消旧的请求 fetch(`/api/search?q=${query}`).then(r => r.json()) ) ).subscribe(results => renderResults(results)); ``` **为什么用 switchMap?** - 用户快速输入时,前面的请求还没返回就不需要了 - 只保留最新的搜索结果 --- ### 2. 表单状态联动 ```javascript import { BehaviorSubject, combineLatest } from 'rxjs'; import { map } from 'rxjs/operators'; const username$ = new BehaviorSubject(''); const password$ = new BehaviorSubject(''); const confirmPassword$ = new BehaviorSubject(''); combineLatest([username$, password$, confirmPassword$]).pipe( map(([user, pass, confirmPass]) => ({ usernameValid: user.length >= 3, passwordValid: pass.length >= 6, passwordsMatch: pass === confirmPass, allValid: user.length >= 3 && pass.length >= 6 && pass === confirmPass })) ).subscribe(validation => { updateUI(validation); }); ``` --- ### 3. 防止重复提交 ```javascript import { fromEvent } from 'rxjs'; import { throttleTime, switchMap, tap } from 'rxjs/operators'; const submitButton = document.querySelector('#submit'); fromEvent(submitButton, 'click').pipe( throttleTime(2000), // 2 秒内只响应一次 tap(() => showLoading()), switchMap(() => fetch('/api/submit', { method: 'POST' })), finalize(() => hideLoading()) ).subscribe(); ``` **或者用 exhaustMap:** ```javascript fromEvent(submitButton, 'click').pipe( exhaustMap(() => submitForm()) // 前一个完成前忽略新点击 ); ``` --- ### 4. 轮询请求 ```javascript import { interval } from 'rxjs'; import { switchMap, retry, catchError } from 'rxjs/operators'; const pollStatus$ = interval(5000).pipe( // 每 5 秒轮询 switchMap(() => fetch('/api/status').pipe( retry(3), // 失败重试 3 次 catchError(err => of({ status: 'error' })) ) ) ); pollStatus$.subscribe(status => updateStatusUI(status)); ``` --- ### 5. 组件卸载取消订阅(防止内存泄漏) ```javascript import { Subject } from 'rxjs'; import { takeUntil } from 'rxjs/operators'; class Component { private destroy$ = new Subject(); ngOnInit() { data$.pipe( takeUntil(this.destroy$) ).subscribe(data => this.render(data)); } ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); } } ``` --- ## 🖥️ Node.js 生产环境最佳实践 ### 1. HTTP 请求队列控制 ```javascript import { from } from 'rxjs'; import { concatMap, timeout } from 'rxjs/operators'; const urls = [ 'https://api.example.com/data1', 'https://api.example.com/data2', 'https://api.example.com/data3' ]; from(urls).pipe( concatMap(url => fetch(url).pipe(timeout(5000)) // 串行请求,避免并发过高 ) ).subscribe(); ``` --- ### 2. 并发控制(限制同时执行数) ```javascript import { mergeMap } from 'rxjs/operators'; tasks.pipe( mergeMap(task => processTask(task), 3) // 最多 3 个并发 ).subscribe(); ``` --- ### 3. 文件流处理 ```javascript import { fromEvent } from 'rxjs'; import { createReadStream } from 'fs'; const readStream = createReadStream('large-file.txt'); fromEvent(readStream, 'data').pipe( // 处理数据块 ).subscribe(); ``` --- ### 4. 日志批量处理 ```javascript import { Subject } from 'rxjs'; import { bufferTime, filter } from 'rxjs/operators'; const log$ = new Subject(); log$.pipe( bufferTime(1000), // 每秒收集一次 filter(logs => logs.length > 0), tap(logs => writeToFile(logs)) // 批量写入 ).subscribe(); // 使用 log$.next('INFO: User logged in'); ``` --- ### 5. 级联 API 调用 ```javascript import { of } from 'rxjs'; import { switchMap, map } from 'rxjs/operators'; // 先获取用户,再获取用户文章 of(userId).pipe( switchMap(id => fetchUser(id)), switchMap(user => fetchPosts(user.id)), map(posts => posts.map(post => post.title)) ).subscribe(); ``` --- ## 🎨 操作符选择指南 ### 面对异步请求,该用哪个 Map? | 需求 | 操作符 | 场景 | |------|--------|------| | 取消前一个请求 | `switchMap` | 搜索框、页面切换 | | 按顺序执行 | `concatMap` | 依赖任务、队列 | | 并发执行 | `mergeMap` | 独立请求、文件上传 | | 防止重复 | `exhaustMap` | 按钮提交、验证码 | ### 面对用户输入,该用什么优化? | 场景 | 操作符 | 延迟 | |------|--------|------| | 搜索框 | `debounceTime` | 300-500ms | | 按钮点击 | `throttleTime` | 200-1000ms | | 表单验证 | `distinctUntilChanged` | - | ### 面对多个 Observable,如何组合? | 需求 | 操作符 | |------|--------| | 全部完成后取结果 | `forkJoin` | | 任一变化就组合 | `combineLatest` | | 交替发出 | `merge` | | 按顺序连接 | `concat` | | 一一对应 | `zip` | --- ## ⚠️ 常见陷阱和注意事项 ### 1. 忘记取消订阅导致内存泄漏 ```javascript // ❌ 错误:组件销毁后仍在订阅 data$.subscribe(data => updateUI(data)); // ✅ 正确:使用 takeUntil 或 async pipe const destroy$ = new Subject(); data$.pipe(takeUntil(destroy$)).subscribe(data => updateUI(data)); // 组件销毁时 destroy$.next(); destroy$.complete(); ``` ### 2. 误用高阶 Map 导致请求混乱 ```javascript // ❌ 错误:搜索框用 mergeMap,旧请求会干扰新请求 search$.pipe(mergeMap(query => searchAPI(query))); // ✅ 正确:用 switchMap 取消旧请求 search$.pipe(switchMap(query => searchAPI(query))); ``` ### 3. 在 forkJoin 中使用永远不会完成的 Observable ```javascript // ❌ 错误:interval 永远不会完成 forkJoin(interval(1000), of(1, 2, 3)); // ✅ 正确:确保所有 Observable 都能完成 forkJoin(timer(1000), of(1, 2, 3)); ``` ### 4. 错误处理不当导致流中断 ```javascript // ❌ 错误:错误未被捕获,流中断 apiCall().subscribe(); // ✅ 正确:使用 catchError 恢复流 apiCall().pipe( catchError(err => { console.error(err); return of(defaultValue); // 返回默认值继续流 }) ).subscribe(); ``` --- ## 📖 推荐学习资源 1. **[RxJS Official Documentation](https://rxjs.dev/)** - 官方文档 2. **[RxJS Marbles](https://rxmarbles.com/)** - 可视化学习工具 3. **[Learn RxJS](https://www.learnrxjs.io/)** - 优秀教程网站 --- ## 💡 总结 RxJS 的核心价值: - ✅ 优雅的异步处理 - ✅ 强大的事件组合能力 - ✅ 声明式编程风格 - ✅ 统一的数据流管理 关键要点: 1. 掌握 `switchMap`、`mergeMap`、`concatMap`、`exhaustMap` 的区别 2. 理解冷热 Observable 的概念 3. 善用 `debounceTime`、`throttleTime` 优化用户体验 4. 永远记得取消订阅(或使用 `takeUntil`) 5. 使用 `catchError` 和 `finalize` 完善错误处理和资源清理 希望这个示例集能帮助你快速掌握 RxJS 在实际项目中的应用!🚀