# ml-demo

**Repository Path**: ggtool/ml-demo

## Basic Information

- **Project Name**: ml-demo
- **Description**: A big-data learning framework that can be used directly.
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-07-09
- **Last Updated**: 2024-07-24

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# 1. A Basic Framework for Big-Data Development

Overall layout of the project:

![](./img/project.png)

| Directory         | Description                                                                                              |
|-------------------|----------------------------------------------------------------------------------------------------------|
| annotation        | Custom annotations `Runner` and `Task`.                                                                   |
| app               | Holds the individual tasks of the project; `test1` and `test2` are concrete business tasks.               |
| base              | `BaseRunner` and `BaseTask`, the two base classes.                                                        |
| enums             | Defines the alias of each task.                                                                           |
| FeatureContextApp | The main class. Its location in the package tree must not change; moving it breaks the scan for tasks and runners. |
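The entry point itself is not shown in this README. Below is a minimal sketch of what `FeatureContextApp` might look like, assuming it validates the CLI argument against `TaskNameEnum`, records the runner class in the Spark conf, and delegates to `BaseTask.around` (both shown in section 2). The task-to-class mapping here is a hard-coded stand-in for the `@Runner` annotation scan, so treat every name in it as an assumption:

```scala
package com.king.ml.app

import com.king.ml.base.BaseTask
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime

object FeatureContextApp {

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "Expected a task name, e.g. ods.ods_test1")

    // Fail fast if the argument does not match any enum value.
    val name = TaskNameEnum.getEnumType(args(0))

    // Hypothetical mapping; the real project resolves this via the @Runner scan.
    val runnerClass = name match {
      case TaskNameEnum.Test1 => "com.king.ml.app.test1.Test1TaskRunner"
      case TaskNameEnum.Test2 => "com.king.ml.app.test2.Test2TaskRunner"
    }

    val spark = SparkSession.builder()
      .config(new SparkConf().set("task.runner", runnerClass))
      .enableHiveSupport()
      .getOrCreate()

    // BaseTask.around instantiates the runner reflectively and logs timings.
    val task = new BaseTask {
      override def taskName: TaskNameEnum.Value = name
    }

    try task.around(spark, DateTime.now)
    finally spark.stop()
  }
}
```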
# 2. Class Descriptions

## 2.1 The Common Base Trait

```scala
package com.king.ml.base

import com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.springframework.util.StopWatch

import scala.util.{Failure, Success, Try}

trait BaseTask extends Logging with Serializable {

  def taskName: TaskNameEnum.Value

  def initConf(sparkConf: SparkConf = new SparkConf()): SparkConf = sparkConf

  var runtime: StopWatch = _

  // Template method: wraps the runner with start/stop logging.
  def around(implicit spark: SparkSession, currDate: DateTime = DateTime.now): Unit = {
    before
    Try {
      // Instantiate the runner class recorded in the Spark conf and execute it.
      Class.forName(spark.conf.get("task.runner"))
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[BaseRunner]
        .run
    } match {
      case Success(_) => after
      case Failure(e) =>
        logError("Task execution failed", e)
        afterThrowException
    }
  }

  private def before(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    println("Starting task ... [" + taskName + "]")
    runtime = new StopWatch(taskName)
    runtime.start(taskName)
  }

  private def after(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("Task finished ... [" + taskName + "], total time: " + runtime.getTotalTimeSeconds + "s")
  }

  private def afterThrowException(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("Task failed ... [" + taskName + "], total time: " + runtime.getTotalTimeSeconds + "s")
  }
}
```

This common trait records detailed execution logs for every task:

![](./img/tasklog.png)

## 2.2 TaskNameEnum Assigns Each Task's Name

```scala
object TaskNameEnum extends Enumeration {

  // Resolve an enum value from its string form, case-insensitively.
  def getEnumType(source: String): TaskNameEnum.Value = {
    val values = TaskNameEnum.values.toList.filter(_.toString.toUpperCase == source.toUpperCase)
    values.length match {
      case 1 => values.head
      case _ => throw new IllegalArgumentException("Task does not exist: " + source)
    }
  }

  val Test1 = Value("ods.ods_test1")
  val Test2 = Value("ods.ods_test2")
}
```

Here, `Test1` and `Test2` are the task names.

## 2.3 Business Logic Lives in the TaskRunner

```scala
package com.king.ml.app.test1

import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime

@Runner
class Test1TaskRunner extends BaseRunner {

  override def taskName: TaskNameEnum.Value = TaskNameEnum.Test1

  override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val cnt = spark.table("ods.ods_test1").count()
    println("===> Total record count:")
    println("===> " + cnt)
  }
}
```

# 3. Task Submission Script

In the submission script the main class never changes; the only thing that varies between tasks is the argument, which must be a task-name value from the enum.

```shell
spark-submit \
--name 'test-ml' \
--master yarn \
--deploy-mode client \
--conf spark.port.maxRetries=100 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.executor.memoryOverhead=5120 \
--queue root.production \
--driver-memory 2g --num-executors 2 --executor-memory 2g --executor-cores 1 \
--class com.king.ml.app.FeatureContextApp \
./ml/ml-demo.jar "ods.ods_test1"
```
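To add a new task, only three things change: a new enum value, a new annotated runner, and the argument passed to the jar. A hedged sketch, assuming a hypothetical `ods.ods_test3` table and task (all names here are illustrative):

```scala
// 1. Register the alias in TaskNameEnum:
//      val Test3 = Value("ods.ods_test3")

// 2. Add the annotated runner under the app package so the scan finds it:
package com.king.ml.app.test3

import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime

@Runner
class Test3TaskRunner extends BaseRunner {

  override def taskName: TaskNameEnum.Value = TaskNameEnum.Test3

  override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {
    // Business logic for the new task goes here.
    spark.table("ods.ods_test3").show(10)
  }
}

// 3. Submit with the new alias; the main class and jar stay the same:
//      ./ml/ml-demo.jar "ods.ods_test3"
```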