# WpxBatch **Repository Path**: weipx/wpx-batch ## Basic Information - **Project Name**: WpxBatch - **Description**: WpxBatch 轻量、快速的批处理器,日志、重试、统计等批处理特性,最适合配合任务调度的处理、日常维护中的批处理 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-07-19 - **Last Updated**: 2023-08-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ![logo](doc/imgs/wpx-b.png) ## WpxBatch 简单批处理器 ##### 简介 WpxBatch 是一个简单的批处理器,是本人在平时的开发运维过程中做批量处理数据时的产物, 在做大量数据处理时单线程的for吃力,加上 多线程代码多而乱,统计、日志、异常处理 各环节堆积杂乱无章,没法复用。WpxBatch把这个过程抽象出来,使用者只需 关注处理逻辑。无其他依赖。 --- ##### 功能特性 - 读、处、写 分离 - 统计:读统计、处理统计、异常统计、写统计【未完善】 - 日志:贯穿上下文的日志记录器 - 重试:指定需要重试的异常 及 重试次数 - 存储:执行状态存储(默认控制台报表输出) - 多次执行: 一个实例多次执行,但不能同时 - 参数化:执行时 传入参数,可在每个节点取到 --- ##### 架构图 ![架构图](doc/imgs/WpxBatch架构图-流程图.jpg) --- ##### 示列 ###### 综合示列 ``` WpxBatchJob.builder(Integer.class, Integer.class) //声明处理前后的类型 .logLevel(ExecuteLogger.Level.info) //日志等级 默认info .logHandels(new ConsoleLogHandler()) //日志处理器(多个) 设置才有 .maxWaitTask(36) // 等待处理任务大于该值时暂停读取(避免数据堆积), 针对循环读,默认值8888 .reader(reader) // 读取数据,读取模式 下文介绍 .processorPoolSize(10) // 处理线程池大小 默认3 .processor(new itemPro()) // 数据处理,20条批处理batchProcessor(20, new itemPro()) .retryExceptions(RuntimeException.class) // 指定重试的异常(多个) .retry(2) //重试的次数 默认3次 .writerPoolSize(5) // 写出线程池的大小 默认3 .writer((executeContext, item) -> {}) //写出,20条批写出batchWriter(20, writer) ,默认空实现 .build() //设置完后 构建 .asyncExecute(new ExecuteParam("aa", 8888)); //传入参数开始执行, 也有无参asyncExecute() //asyncExecute() 同步执行, execute() 异步执行 ``` ###### 简单示列 ``` WpxBatchJob.builder(Integer.class, Integer.class) .logHandels(new ConsoleLogHandler()) // .reader(reader) .processor(new itemPro()) .writer((executeContext, item) -> {}) .build() .asyncExecute(); ``` --- ##### 注意 处理节点、写节点 lambda表达式实现 无法明确处理数据的类型,要么自行强转 要么单独写个实现类(建议) --- ##### 读节点 实现接口 `ItemReader` - 只读一次:`readerOne(reader);` 数据一次就读完 的场景 - 读n次:`reader(n, reader);` 指定调用读的次数 - 循环读:`reader(reader);` 直到读回来的数据集合size=0时 结束, 所有中间还有数据 不要返回空集合 - 有界数据:一批有限的数据,如数据库,读完就结束 退出 - 分页读:中间没有空的情况 - 标识读:读后标识已读 - 无界数据:持续读,等待数据 等场景, 永不退出 - 设计成阻塞读,没有数据就一直阻塞,直到取到数据才返回,不能返回空数据 - AbstractBlockItemReader 封装了阻塞逻辑, 只需实现readDatas方法,readDatas中只关心读的逻辑即可 - ReceptorItemReader 接收数据读,调receptor(E e) 推送数据 (如:web容器中http接收到数据 然后推给 读取器) --- ##### 处理节点 批处理或单处理 都是实现接口 `ItemProcessor` ###### 单处理 泛型写具体类即可`ItemProcessor` ###### 批处理 泛型写成 集合形式 `ItemProcessor, List>` `batchProcessor(88, processor)` 处理88条一批 - 注:输入list长度要和返回的list的长度一致(否则统计不一致无法结束job),这只是批处理,不做聚合处理 --- ##### 写节点 批写或单写 都是实现接口 `ItemWriter` ###### 单写 泛型写具体类即可 `ItemWriter` ###### 批写 泛型写成 集合形式 `ItemWriter>` `batchWriter(88, writer)` 写88条一批 ##### 更新日志 - 2023-08-14——v1.0.2 - 停止操作 - 运行状态检查 - 2023-08-14——v1.0.1 - 空闲时间批量缓冲区刷出 - 同步/异步执行 - 2023-07-21——v1.0.0 发行版本 - 增加阻塞读、接收读的实现 - 2023-07-19 - 首次提交 --- ##### 结束语 简单而不简单...