# elastic-job-starter **Repository Path**: kancy666/elastic-job-starter ## Basic Information - **Project Name**: elastic-job-starter - **Description**: elastic-job的自动配置 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2019-06-03 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## elastic-job-starter使用说明 ### 环境 elastic-job: 2.1.5 elastic-job-lite-console: 2.1.4 elastic-job-starter: 1.2-RELEASE zookeeper: 3.4.14 mysql: 5.6.26 ### 使用 1.引入maven依赖 ```xml com.kancy.framework.job.starter elastic-job-starter 1.2-RELEASE ``` 2.写一个ElasticJob处理类 SimpleJob: ```java package com.example.demojob.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.kancy.framework.job.starter.annotation.ElasticJobConfig; /** * Created by kancy on 2018/12/20. */ // name: Job的名称,默认为类名 // eventTraceRdbDataSource: 指定一个dataSource可以记录Job运行记录日志(自动创建表),不指定则不记录 @ElasticJobConfig(name = AbstractSimpleJob, eventTraceRdbDataSource = "dataSource") public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem())); /** * 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了 * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem */ } } ``` DataflowJob: ```java package com.example.demojob.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.dataflow.DataflowJob; import com.example.demojob.domain.User; import com.kancy.framework.job.starter.annotation.ElasticJobConfig; import java.util.List; /** * Created by kancy on 2018/12/23. * Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。 * * 可通过DataflowJobConfiguration配置是否流式处理。 * 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; * 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。 */ @ElasticJobConfig(name = "MyDataFlowJob", streamingProcess = true) public class MyDataFlowJob implements DataflowJob { /** * status * 0:待处理 * 1:已处理 */ @Override public List fetchData(ShardingContext shardingContext) { List users = null; /** * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30 */ return users; } @Override public void processData(ShardingContext shardingContext, List data) { for (User user: data) { System.out.println(String.format("用户 %s 开始计息", user.getUserId())); user.setStatus(1); /** * update user */ } } } ``` 3. 配置job ```yaml spring: application: name: demo-job profiles: active: dev datasource: url: jdbc:mysql://localhost:3306/job?useUnicode=true&serverTimezone=GMT%2B8&useSSL=false&autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8 username: root password: root driverClassName: com.mysql.cj.jdbc.Driver #分布式定时任务的相关配置 elastic.job: zk: enabled: true #elastic-job 启动的总开关 name: zk-${spring.application.name}-${spring.profiles.active} serverLists: localhost:2181 #zookeeper列表,多个地址用逗号分隔 namespace: ${spring.application.name}-${spring.profiles.active} #命名空间,可以理解为对应下面job的权限控制用户 MySimpleJob: cron: 0/5 * * * * ? #任务执行时间点 overwrite: true #每次应用启动的时候是否更新zk里面对应的相关配置,如果开启这新的配置全部生效,否则不生效 shardingTotalCount: 2 #任务分片总数 shardingItemParameters: 0=0,1=1 #分片参数,最好是和上面的总数一直 failover: true #是否开启任务失败转移。true表示开启,默认为false misfire: true #是否补偿执行丢失的任务。true 表示开启,默认为false description: "MySimpleJob相关的描述" monitorExecution: false #是否开启业务状态监控。true为开启,默认为true listener: com.example.demojob.listener.MessageElasticJobListener #任务监听器(ElasticJobListener),在任务开始和结束的时候会触发 jobExceptionHandler: com.example.demojob.handler.CustomJobExceptionHandler #自定义任务执行时抛出的异常处理类(JobExceptionHandler) disabled: false MyDataFlowJob: cron: 0/10 * * * * ? #任务执行时间点 overwrite: true #每次应用启动的时候是否更新zk里面对应的相关配置,如果开启这新的配置全部生效,否则不生效 shardingTotalCount: 2 #任务分片总数 shardingItemParameters: 0=0,1=1 #分片参数,最好是和上面的总数一直 failover: true #是否开启任务失败转移。true表示开启,默认为false misfire: true #是否补偿执行丢失的任务。true 表示开启,默认为false description: "MySimpleJob相关的描述" monitorExecution: false #是否开启业务状态监控。true为开启,默认为true listener: com.example.demojob.listener.MessageElasticJobListener #任务监听器(ElasticJobListener),在任务开始和结束的时候会触发 jobExceptionHandler: com.example.demojob.handler.CustomJobExceptionHandler #自定义任务执行时抛出的异常处理类(JobExceptionHandler) disabled: false ``` 4.自动配置ElasticJob 使用注解@EnableElasticJob自动配置 ```java package com.example.demojob; import com.kancy.framework.job.starter.annotation.EnableElasticJob; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableElasticJob public class Application { public static void main(String[] args) { SpringApplication.run(DemoJobApplication.class, args); } } ``` 5.启动项目和相关服务 6.定时任务运维控制台 https://gitee.com/kancy666/elastic-job-console ## Elatic Job https://gitee.com/elasticjob/elastic-job 其主要的功能如下: - (1)定时任务:基于成熟的定时任务作业框架Quartz cron表达式执行定时任务; - (2)作业注册中心:基于Zookeeper和其客户端Curator实现的全局作业注册控制中心;作业注册中心仅用于作业任务注册和监控信息的暂存; - (3)定时任务分片:可以将原本一个较大任务分片成为多小的子任务项分别在多个服务器上同时执行,提高总任务的执行处理效率; - (4)弹性扩容缩容:运行中定时任务所在的服务器崩溃,或新增加n台作业服务器,作业框架将在下次任务执行前重新进行任务调度分发,不影响当前任务的处理与执行; - (5)支持多种任务模式:分别支持Simple、Dataflow和Script类型的定时任务。具体的任务模式后面会详细介绍; - (6)失效转移:运行中的定时任务所在的服务器崩溃不会导致重新分片,会在下次定时任务启动时重新分发和调度; - (7)运行时定时任务状态收集:监控任务运行时的状态,统计最近一段时间任务处理成功和失败的数量,记录作业上次运行开始时间,结束时间和下次运行时间; - (8)支持配置定时任务停止、恢复和禁用:用于操作定时任务的启停,并可以禁止某任务的执行; - (9)Spring支持:Elastic-Job-Lite项目完美支持spring的容器,自定义命名空间,支持占位符 - (10)运维平台:提供运维界面,方便开发和运维人员管理生产环境上已经发布的定时任务和注册中心;