# gmall_Flink_realtime **Repository Path**: eliyson/gmall_Flink_realtime ## Basic Information - **Project Name**: gmall_Flink_realtime - **Description**: 电商实时数仓demon - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-09-23 - **Last Updated**: 2022-11-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README > Flink实时数仓学习笔记 > 代码仓库地址:[https://gitee.com/eliyson/gmall_Flink_realtime.git](https://gitee.com/eliyson/gmall_Flink_realtime.git) # 原始数据层ODS > ODS层要点:实时数仓采集至Kafka的日志主题topic_log和业务数据主题topic_db即为ODS层的数据 > ODS层作用:对数据做原样展示及备份 > ODS层构建:日志主题数据的采集和离线数仓对日志记录的采集一致,业务主题数据的采集可以选择和离线数仓对业务增量数据采集的方案,也可以使用更高效的Flink CDC方案 ## 日志主题数据采集 ![日志数据采集](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1656221771870-aff2aa30-af02-46e9-8200-9106e590316e.svg "日志数据采集") ```shell myzookeeper.sh start # 开启zookeeper mykafka.sh start # 开启Kafka f1.sh start # 开启hadoop102/hadoop103的flume,用于将日志数据写入Kafka lg.sh # 脚本执行hadoop102/hadoop103的jar包,生成日志[持续生成,需手动关闭kill] # 启动消费者测试 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log ``` ## 业务主题数据采集/传统方式 ![业务数据采集](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1656241388415-aef46a38-3a11-4e4a-bbd6-b03c1daced54.svg "业务数据采集") > Notice:在Maxwell的配置文件config.properties中需指定Kafka Producer按照表分区`producer_partition_by=table`,原因是Maxwell往主题的多个分区写数据默认是轮询方式,会造成数据乱序,指定按数据所在的表来决定数据写入哪个固定的分区,可以保证数据的有序性。但这样容易导致数据倾斜,可以在添加Source时将并行度调整为Kafka主题分区数,如后续处理对顺序没有要求,再使用rebalance重分区 ```shell mymaxwell.sh start # 开启Maxwell mysql_to_kafka_realtime_init.sh all # 首次全量同步 cd /opt/module/dblog java -jar gmall2020-mock-db-2021-11-14.jar # 运行模拟业务数据的jar包 # 启动消费者测试 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db ``` ```shell #!/bin/bash # 该脚本的作用是初始化所有的增量表[实时数仓采集的表],只需执行一次 MAXWELL_HOME=/opt/module/maxwell-1.29.2 import_data() { $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties } case $1 in "activity_info") import_data activity_info ;; "activity_rule") import_data activity_rule ;; "activity_sku") import_data activity_sku ;; "base_category1") import_data base_category1 ;; "base_category2") import_data base_category2 ;; "base_category3") import_data base_category3 ;; "base_province") import_data base_province ;; "base_region") import_data base_region ;; "base_trademark") import_data base_trademark ;; "coupon_info") import_data coupon_info ;; "coupon_range") import_data coupon_range ;; "financial_sku_cost") import_data financial_sku_cost ;; "sku_info") import_data sku_info ;; "spu_info") import_data spu_info ;; "user_info") import_data user_info ;; "all") import_data activity_info import_data activity_rule import_data activity_sku import_data base_category1 import_data base_category2 import_data base_category3 import_data base_province import_data base_region import_data base_trademark import_data coupon_info import_data coupon_range import_data financial_sku_cost import_data sku_info import_data spu_info import_data user_info ;; esac ``` ## *业务主题数据采集/Flink CDC方式 Flink CDC 的介绍: Flink CDC 的使用文档: CDC 全称是 Change Data Capture ,它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC。 ![CDC.png](https://cdn.nlark.com/yuque/0/2022/png/23152645/1663469722677-1e5146a9-ab50-4f8d-81d0-7a684078d6b6.png) 一个典型的传统ETL架构模式可能是这样的: 通过 Debezium 订阅业务库 MySQL 的 Binlog 传输至 Kafka ,Flink 通过创建 Kafka 表指定 format 格式为 debezium-json ,然后通过 Flink 
进行计算后或者直接插入到其他外部数据存储系统。这种架构最大的缺点是采集端组件过多导致维护繁杂。 ![ea195acd250d491ca9e55e67be428006.png](https://cdn.nlark.com/yuque/0/2022/png/23152645/1663470097153-32195b14-db84-4c1a-9c56-8bfcc0639f6a.png "传统ETL架构模式") flink-cdc-connectors 可以用来替换 Debezium+Kafka 的数据采集模块,从而实现 Flink SQL 采集+计算+传输(ETL)一体化。 ![6231a71c5b78493783bc0f3c024d9e65.png](https://cdn.nlark.com/yuque/0/2022/png/23152645/1663470327007-ff9b00a1-9dce-4a88-a0a0-ee61f29a4bbe.png "CDC解决方案") # 公共维度层DIM > DIM层要点:DIM层表用于维度关联,要通过主键关联获取相应的维度信息,一般选择Hbase存储维度数据(使用Redis存储对内存要求较高);维度信息从业务数据主题提取,从Kafka读取业务数据主题的数据流,通过在MySQL构建配置表,使用Flink CDC将配置表信息也读取至程序中,连接后写入HBase的不同维度表中,实现动态分流 > DIM层作用:存储维度模型的维度表 > DIM层构建:确定维度➡️构建配置表➡️使用Flink CDC读取配置表创建广播流➡️连接流并分流处理➡️维度数据写入Phoenix > Notice:作用范围为provided的依赖在本地调试不被引入项目,需在Idea中的_Run/ Edit Configurations_配置_Add dependencies with “provided” scope to classpath_ [table_process.xls](https://www.yuque.com/attachments/yuque/0/2022/xls/23152645/1664724895166-8cb4f4ef-406d-45dd-a133-3ce7debac85a.xls?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Fxls%2F23152645%2F1664724895166-8cb4f4ef-406d-45dd-a133-3ce7debac85a.xls%22%2C%22name%22%3A%22table_process.xls%22%2C%22size%22%3A29696%2C%22type%22%3A%22application%2Fvnd.ms-excel%22%2C%22ext%22%3A%22xls%22%2C%22source%22%3A%22%22%2C%22status%22%3A%22done%22%2C%22mode%22%3A%22title%22%2C%22download%22%3Atrue%2C%22taskId%22%3A%22u6c5cf502-6218-4cd1-be5a-c487c16fdcb%22%2C%22taskType%22%3A%22upload%22%2C%22__spacing%22%3A%22both%22%2C%22id%22%3A%22udf610c6e%22%2C%22margin%22%3A%7B%22top%22%3Atrue%2C%22bottom%22%3Atrue%7D%2C%22card%22%3A%22file%22%7D) ![Snipaste_2022-10-02_23-33-56.png](https://cdn.nlark.com/yuque/0/2022/png/23152645/1664724899028-f461d56e-71e8-4a70-a19d-54c3cae26c9c.png "实时数仓配置表信息") ## DIM层数据流图 ![DIM层流程.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1663559199862-ce2d928b-820b-4027-9d05-e25fa44f663f.svg) ## DIM层主体实现代码 ```java public abstract class BaseApp { public void process(int port, int parallelism, String jobname, String topic) { //参数: //port:应用端口,parallelism:应用并行度,jobname:检查点保存位置 //groupid:消费者组ID/与jobname保持一致,topic:主题 String groupid = jobname; //environment System.setProperty("HADOOP_USER_NAME", "eliyson"); Configuration conf = new Configuration(); conf.setInteger("rest.port", port); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(parallelism); //设置checkpoint|statebackend env.enableCheckpointing(5000); env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall/" + jobname); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointTimeout(60000); //kafka Source DataStreamSource ds = env.addSource(FlinkSourceUtil.getKafkaSource(groupid, topic)); //Transform handle(env,ds); //execute try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } //抽象方法:流转换逻辑 protected abstract void handle(StreamExecutionEnvironment env,DataStreamSource ds); } ``` ```java public class DimApp extends BaseApp { public static void main(String[] args) { new DimApp().process(2001, 2, "DimApp", Constant.TOPIC_ODS_DB); } //重写流处理的方法 @Override protected void handle(StreamExecutionEnvironment env, 
DataStreamSource ds) { //ETL:数据清洗 DataStream dataStream = etl1(ds); //读取存储在MySQL的配置信息:CDC方式 DataStream tableProcessDataStream = readTableProcess(env); //创建phoenix中的表 createDimTable(tableProcessDataStream); //数据流和配置流的connect,配置流转换为广播流,并处理数据 SingleOutputStreamOperator> connectds = connect(dataStream, tableProcessDataStream); //过滤不需要的字段 SingleOutputStreamOperator> res = etl2(connectds); //写入phoenix相应的维度表 writeToPhoenix(res); } private void writeToPhoenix(SingleOutputStreamOperator> res) { res.addSink(new PhoenixSink()); } private SingleOutputStreamOperator> etl2(SingleOutputStreamOperator> connectds) { //配置表中声明每个维度表所需要的字段,可以将不指定的字段剔除 SingleOutputStreamOperator> res = connectds.map(tuple -> { List columnlist = Arrays.asList(tuple.f1.getSinkColumns().split(",")); JSONObject data = tuple.f0; //保留配置表指定的字段和op字段 data.keySet().removeIf(key -> !columnlist.contains(key) && !"op".equals(key)); return tuple; }).returns(new TypeHint>() { }); return res; } private SingleOutputStreamOperator> connect(DataStream dataStream, DataStream tableProcessDataStream) { MapStateDescriptor tableProcessMapStateDescriptor = new MapStateDescriptor<>("tpstate", String.class, TableProcess.class); BroadcastStream bds = tableProcessDataStream.broadcast(tableProcessMapStateDescriptor); SingleOutputStreamOperator> res = dataStream.connect(bds).process(new BroadcastProcessFunction>() { @Override public void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext, Collector> collector) throws Exception { ReadOnlyBroadcastState broadcastState = readOnlyContext.getBroadcastState(tableProcessMapStateDescriptor); TableProcess tp = broadcastState.get(jsonObject.getString("table")); if (tp != null) { JSONObject data = jsonObject.getJSONObject("data"); data.put("op", jsonObject.getString("type")); collector.collect(Tuple2.of(data, tp)); } } @Override public void processBroadcastElement(TableProcess tp, Context context, Collector> collector) throws Exception { BroadcastState broadcastState = context.getBroadcastState(tableProcessMapStateDescriptor); String key = tp.getSourceTable(); broadcastState.put(key, tp); } }); return res; } private void createDimTable(DataStream tableProcessDataStream) { //根据TableProcess对象中的OP属性确定对表的操作 //op=d: 删除表 //op=r|c: 新建表 //op=u: 删表后再建表 tableProcessDataStream.map(new RichMapFunction() { private Connection conn; @Override public void open(Configuration parameters) throws Exception { //获取Phoenix连接 conn = JDBCUtil.getPhoenixConnection(); } @Override public void close() throws Exception { //关闭Phoenix连接 JDBCUtil.closeConnection(conn); } @Override public String map(TableProcess tp) throws Exception { if (!"dim".equals(tp.getSinkType())) { return "非DIM配置信息,不进行操作"; } String str = null; switch (tp.getOp()) { case "d": dropTable(tp); str = "删除Phoniex的维度表:" + tp.getSinkTable(); break; case "r": case "c": createTable(tp); str = "创建Phoniex的维度表:" + tp.getSinkTable(); break; case "u": dropTable(tp); createTable(tp); str = "更新Phoniex的维度表:" + tp.getSinkTable(); break; default: str = "未知的OP操作类型!"; } System.out.println(str); //打印日志 return str; } private void dropTable(TableProcess tp) throws Exception { //防止长连接被关闭 if (conn.isClosed()) { conn = JDBCUtil.getPhoenixConnection(); } String sql = "drop table if exists " + tp.getSinkTable(); //注意在exists后面加空格! 
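//下面用PreparedStatement执行这条DDL:执行后显式commit(Phoenix连接默认不自动提交),并及时关闭Statement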
PreparedStatement ps = conn.prepareStatement(sql); ps.execute(); conn.commit(); ps.close(); } private void createTable(TableProcess tp) throws Exception { //防止长连接被关闭 if (conn.isClosed()) { conn = JDBCUtil.getPhoenixConnection(); } //Phoenix建表: create table if not exist T(id varchar,value varchar,constraint pk primary key(id)) salt_buckets=4; //salt_buckets是在hbase中预分区,避免region的自动分裂影响hbase的运行性能 StringBuilder sql = new StringBuilder(); //拼接字段较多,使用stringbuilder提高效率 sql.append("create table if not exists ").append(tp.getSinkTable()).append("(") .append(tp.getSinkColumns().replaceAll("[^,]+", "$0 varchar")) .append(", constraint pk primary key(").append(tp.getSinkPk() == null ? "id" : tp.getSinkPk()).append("))") .append(tp.getSinkExtend() == null ? "" : tp.getSinkExtend()); PreparedStatement ps = conn.prepareStatement(sql.toString()); ps.execute(); conn.commit(); ps.close(); } }); } private DataStream readTableProcess(StreamExecutionEnvironment env) { //使用Flink CDC实时读取配置表信息【动态读取配置】,并处理 //mysql-cdc connector只能设置一个并行度 //1.创建连接 Properties properties = new Properties(); properties.setProperty("useSSL", "false"); MySqlSource sqlSource = MySqlSource.builder() .hostname("hadoop102").port(3306).databaseList("gmall_config") .tableList("gmall_config.table_process") .username("root").password("mysql") .jdbcProperties(properties) //可以用properties传入额外的参数 .deserializer(new JsonDebeziumDeserializationSchema()) .build(); //Flink CDC读出的数据是json字符串,数据有以下4种情形: //首次读取: before=null,after=现在的数据,op=r //新增数据: before=null,after=新增数据,op=c //删除数据: before=删除之前的数据,after=null,op=d //更新数据: 更新主键数据-->先删除后新增-->可归为新增|删除数据的情况 //更新数据: 更新非主键数据-->before=更新前数据,after=更新后数据,op=u //2.处理:封装 return env.fromSource(sqlSource, WatermarkStrategy.noWatermarks(), "sqlSource") .map(str -> { JSONObject jsonObject = JSON.parseObject(str); TableProcess tableProcess = null; if ("d".equals(jsonObject.getString("op"))) { tableProcess = jsonObject.getObject("before", TableProcess.class); } else { //r|c|u tableProcess = jsonObject.getObject("after", TableProcess.class); } tableProcess.setOp(jsonObject.getString("op")); return tableProcess; }); } private DataStream etl1(DataStreamSource ds) { //对Maxwell同步过来的数据进行清洗: //1.过滤不是目标数据库的数据 //2.掐头去尾,去掉Maxwell首次同步的头尾标志 //3.将首次同步标识bootstrap-insert改为insert return ds.map(JSON::parseObject) .filter(jsonObject -> { String type = jsonObject.getString("type"); String database = jsonObject.getString("database"); boolean tag = !"gmall".equals(database) || "bootstrap-start".equals(type) || "bootstrap-complete".equals(type); return !tag; }) .map(jsonObject -> { String type = jsonObject.getString("type"); if ("bootstrap-insert".equals(type)) jsonObject.put("type", "insert"); return jsonObject; }); } } ``` ## DIM层程序测试 ```shell # 开启各组件 myzookeeper.sh start myhadoop.sh start mykafka.sh start mymaxwell.sh start start-hbase.sh # 启动DIM程序,测试可以依次在流处理各环节添加print打印输出,查看流中的数据 # 手动同步user_info配置表 bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties # 在Phoenix的shell操作界面查看数据写入情况,可修改配置表信息查看Phoenix中的表是否变化 cd /opt/module/phoenix-5.0.0-HBase-2.0/bin/ ./sqlline.py # ! tables # select * from DIM_USER_INFO; # ! describe DIM_USER_INFO ``` # 明细数据层DWD > DWD层要点:如果为多张业务表连接形成的事实表,则单独编写APP程序处理;由单张业务表简单生成的事实表可以使用一个APP进行分流。DWD层的事实表可以来源业务表操作和日志文件 > DWD层作用:存储维度模型的事实表 > DWD层构建:根据业务总线矩阵确定事实➡️来源于日志文件的事实表,使用分流处理生成➡️来源于业务表变更操作的事实表,按连接的复杂程度选择单独APP或分流APP处理(需构建配置表)生成➡️写入Kafka的业务主题表 ## DWD层数据流图 1. **_流量域事务事实表[日志分流]_** > 1. 流量域页面浏览事务事实表 > 2. 流量域启动事务事实表 > 3. 流量域动作事务事实表 > 4. 流量域曝光事务事实表 > 5. 
流量域错误事务事实表 ![DWD层流量域事务事实表流程.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1663991977072-8b2b410e-f8b4-40dd-8417-1daa6744e668.svg) 2. **_事实表动态分流[单张业务表简单生成]_** > 1. 工具域优惠券领取事务事实表 > 2. 工具域优惠券使用(下单)事务事实表 > 3. 工具域优惠券使用(支付)事务事实表 > 4. 互动域收藏商品事务事实表 > 5. 互动域评价事务事实表 > 6. 用户域用户注册事务事实表 ![事实表动态分流.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1663992411817-8cace144-a22c-4a7a-87ce-ac4ee6069217.svg) 3. _**交易域下单事务事实表[多张业务表连接生成]**_ > 下单事务事实表:关联订单明细表、订单表、订单明细活动关联表、订单明细优惠券关联表 ![交易域下单事务事实表.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1663993765907-27533c71-6323-4864-a03e-5549322f519f.svg) ## DWD层主体实现代码 **_流量域事务事实表&交易域下单事务事实表,其它详见文首仓库_** ```java public class DwdBaseLogApp extends BaseApp { private final String START = "start"; private final String DISPLAY = "display"; private final String ACTION = "action"; private final String PAGE = "page"; private final String ERR = "err"; public static void main(String[] args) { new DwdBaseLogApp().process(3001, 2, "DwdBaseLogApp", Constant.TOPIC_ODS_LOG); } @Override protected void handle(StreamExecutionEnvironment env, DataStreamSource ds) { //数据清洗:json格式的文本在传输过程中丢失 DataStream ds1 = etl1(ds); //前端埋点采集的is_new新客户标识可能存在错误:将老客户标识为新客户 SingleOutputStreamOperator ds2 = etl2(ds1); //分流:使用侧输出流【启动|曝光|活动|页面|错误】 Map> map = splitStream(ds2); //写入Kafka不同的主题 writeToKafka(map); } private void writeToKafka(Map> map) { map.get(START).map(JSONAware::toJSONString).addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_START)); map.get(DISPLAY).map(JSONAware::toJSONString).addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_DISPLAY)); map.get(ACTION).map(JSONAware::toJSONString).addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ACTION)); map.get(PAGE).map(JSONAware::toJSONString).addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_PAGE)); map.get(ERR).map(JSONAware::toJSONString).addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ERR)); } private Map> splitStream(DataStream ds) { //定义侧输出流标签 OutputTag displayTag = new OutputTag("display") { }; //曝光 OutputTag actionTag = new OutputTag("action") { }; //活动 OutputTag pageTag = new OutputTag("err") { }; //页面 OutputTag errTag = new OutputTag("page") { }; //日志 SingleOutputStreamOperator res = ds.process(new ProcessFunction() { //启动 -> 主流输出 @Override public void processElement(JSONObject jsonObject, Context context, Collector collector) throws Exception { JSONObject err = jsonObject.getJSONObject("err"); if (err != null) { context.output(errTag, err); //将数据写入err侧输出流 jsonObject.remove("err"); //同时处理原数据,删除err部分内容 } JSONObject start = jsonObject.getJSONObject("start"); if (start != null) { //如果是启动日志,则直接写出 collector.collect(jsonObject); return; } JSONArray displays = jsonObject.getJSONArray("displays"); if (displays != null) { //分出曝光数据 displays.toJavaList(JSONObject.class).forEach(json -> context.output(displayTag, json)); jsonObject.remove("displays"); } JSONArray actions = jsonObject.getJSONArray("actions"); if (actions != null) { //分出活动数据 actions.toJavaList(JSONObject.class).forEach(json -> context.output(actionTag, json)); jsonObject.remove("actions"); } context.output(pageTag, jsonObject); //剩下的部分作为页面数据写出 } }); //返回各个输出流构成的map HashMap> map = new HashMap<>(); map.put(START, res); map.put(ERR, res.getSideOutput(errTag)); map.put(DISPLAY, res.getSideOutput(displayTag)); map.put(ACTION, res.getSideOutput(actionTag)); map.put(PAGE, res.getSideOutput(pageTag)); return map; } private SingleOutputStreamOperator etl2(DataStream ds) { //使用状态保存用户首次登录的日期,供后续比较 return ds.keyBy(json -> 
json.getJSONObject("common").getString("mid")) .map(new RichMapFunction() { private ValueState vs; //keyby 之后的状态针对每一个key有效 @Override public void open(Configuration parameters) throws Exception { vs = getRuntimeContext().getState(new ValueStateDescriptor("vs", String.class)); } @Override public JSONObject map(JSONObject jsonObject) throws Exception { String isNew = jsonObject.getJSONObject("common").getString("is_new"); Long ts = jsonObject.getLong("ts"); String date = TimeUtil.toDate(ts); String recordDate = vs.value(); //前端埋点只会出现一种错误:因重启或清除缓存导致将老用户标识为新用户 //TODO Flink应用启动之前的老用户如果被标识为新用户,则利用状态无法纠正 if (recordDate == null) { if ("1".equals(isNew)) { vs.update(TimeUtil.toDate(ts)); //新用户,保存状态 } else { //老用户,因为Flink应用启动时间比用户注册时间晚,没有保存 vs.update("1970-01-01"); //判断为老用户,直接将时间置为历史时间即可 } } else { if ("1".equals(isNew) && !recordDate.equals(TimeUtil.toDate(ts))) { jsonObject.getJSONObject("common").put("is_new", "0"); //修正 } } return jsonObject; } }); } private DataStream etl1(DataStreamSource ds) { //清除不符合json格式的数据,转为jsonobject格式 return ds.filter(str -> { try { JSON.parseObject(str); } catch (Exception e) { e.printStackTrace(); return false; } return true; }).map(JSON::parseObject); } } ``` ```java public class DwdTradeOrderDetail extends BaseSQLApp { //下单事务事实表:关联订单明细表、订单表、订单明细活动关联表、订单明细优惠券关联表 public static void main(String[] args) { new DwdTradeOrderDetail().process(3003, 2, "DwdTradeOrderDetail"); } @Override protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tenv) { //必须设置TTL,避免下面regular join的各表状态一直保留在内存中,运行时间长会导致OOM tenv.getConfig().setIdleStateRetention(Duration.ofSeconds(15)); //Flink SQL从topic_db中读取数据:表名为topic_db readOdsDb(tenv, "DwdTradeOrderDetail"); //读取字典表 readdic(tenv); //base_dic //业务表均只取insert的数据 //订单明细表 Table orderDetail = tenv.sqlQuery("select " + "data['id'] id, " + "data['order_id'] order_id, " + "data['sku_id'] sku_id, " + "data['sku_name'] sku_name, " + "data['create_time'] create_time, " + "data['source_id'] source_id, " + "data['source_type'] source_type, " + "data['sku_num'] sku_num, " + "cast(cast(data['sku_num'] as decimal(16,2)) * cast(data['order_price'] as decimal(16,2)) as String) split_original_amount, " + "data['split_total_amount'] split_total_amount, " + "data['split_activity_amount'] split_activity_amount, " + "data['split_coupon_amount'] split_coupon_amount, " + "ts, " + "pt " + "from topic_db where `database`='gmall' and `table`='order_detail' and `type`='insert'"); tenv.createTemporaryView("order_detail", orderDetail); //订单表 Table orderInfo = tenv.sqlQuery("select " + "data['id'] id," + "data['user_id'] user_id," + "data['province_id'] province_id " + "from topic_db where `database`='gmall' and `table`='order_info' and `type`='insert'"); tenv.createTemporaryView("order_info", orderInfo); //订单明细活动表 Table orderDetailActivity = tenv.sqlQuery("select " + "data['order_detail_id'] order_detail_id, " + "data['activity_id'] activity_id, " + "data['activity_rule_id'] activity_rule_id " + "from topic_db where `database`='gmall' and `table`='order_detail_activity' and `type`='insert'"); tenv.createTemporaryView("order_detail_activity", orderDetailActivity); //订单明细优惠劵表 Table orderDetailCoupon = tenv.sqlQuery("select " + "data['order_detail_id'] order_detail_id, " + "data['coupon_id'] coupon_id " + "from topic_db where `database`='gmall' and `table`='order_detail_coupon' and `type`='insert'"); tenv.createTemporaryView("order_detail_coupon", orderDetailCoupon); //join Table res = tenv.sqlQuery("select " + "od.id, " + "od.order_id, " + "oi.user_id, " 
+ "od.sku_id, " + "od.sku_name, " + "oi.province_id, " + "act.activity_id, " + "act.activity_rule_id, " + "cou.coupon_id, " + "date_format(od.create_time, 'yyyy-MM-dd') date_id, " + "od.create_time, " + "od.source_id, " + "od.source_type source_type_code, " + "dic.dic_name source_type_name, " + "od.sku_num, " + "od.split_original_amount, " + "od.split_activity_amount, " + "od.split_coupon_amount, " + "od.split_total_amount, " + "od.ts, " + "current_row_timestamp() row_op_ts " + "from order_detail od left join order_info oi on od.order_id = oi.id " + "left join order_detail_activity act on od.id = act.order_detail_id " + "left join order_detail_coupon cou on od.id = act.order_detail_id " + "join base_dic for system_time as of od.pt as dic on od.source_type = dic.dic_code"); //写出至Kafka:upsert-kafka tenv.executeSql("create table dwd_trade_order_detail(" + "id string, " + "order_id string, " + "user_id string, " + "sku_id string, " + "sku_name string, " + "province_id string, " + "activity_id string, " + "activity_rule_id string, " + "coupon_id string, " + "date_id string, " + "create_time string, " + "source_id string, " + "source_type_code string, " + "source_type_name string, " + "sku_num string, " + "split_original_amount string, " + "split_activity_amount string, " + "split_coupon_amount string, " + "split_total_amount string, " + "ts bigint, " + "row_op_ts timestamp_ltz(3), " + "primary key(id) NOT ENFORCED" + ")" + SQLUtil.kafkaConnectorTemplate(Constant.TOPIC_DWD_TRADE_ORDER_DETAIL, true)); res.executeInsert("dwd_trade_order_detail"); } } ``` ```java public abstract class BaseSQLApp { public void process(int port, int parallelism, String jobname) { //参数: //port:应用端口,parallelism:应用并行度,jobname:检查点保存位置 //groupid:消费者组ID/与jobname保持一致,topic:主题 String groupid = jobname; //environment System.setProperty("HADOOP_USER_NAME", "eliyson"); Configuration conf = new Configuration(); conf.setInteger("rest.port", port); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); env.setParallelism(parallelism); //设置checkpoint|statebackend env.enableCheckpointing(5000); env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall/" + jobname); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointTimeout(60000); //Source由实现类创建 //Transform handle(env, tenv); } //抽象方法:流转换逻辑 protected abstract void handle(StreamExecutionEnvironment env, StreamTableEnvironment tenv); //读取业务数据:增加了处理时间[用于lookup join] public void readOdsDb(StreamTableEnvironment tenv, String groupid) { tenv.executeSql("create table topic_db(`database` string,`table` string,`type` string," + "`ts` bigint,`data` map,`old` map," + "pt as proctime()) " + SQLUtil.kafkaConnectorTemplate(Constant.TOPIC_ODS_DB, groupid)); } //读取字典表 public void readdic(StreamTableEnvironment tenv) { tenv.executeSql("create temporary table base_dic (dic_code string,dic_name string,parent_code string," + "create_time timestamp,operate_time timestamp,primary key(dic_code) NOT ENFORCED) " + SQLUtil.mysqlConnectorTemplate("base_dic")); } } ``` ## DWD层程序测试 ```shell # 开启各组件 myzookeeper.sh start myhadoop.sh 
start mykafka.sh start # [与日志相关的App测试] f1.sh start # 开启读取日志数据写入Kafka的Flume程序 java -jar gmall2020-mock-log-2021-01-22.jar # 进入模拟日志数据jar包目录,模拟生成日志数据 # 查看程序流输出&是否正确写入Kafka主题 # [与DB相关的APP测试] mymaxwell.sh start java -jar gmall2020-mock-db-2021-11-14.jar # 进入模拟业务数据jar包目录,模拟生成业务数据 # 查看程序流输出&是否正确写入Kafka主题 ``` # 汇总数据层DWS > DWS层要点:对数据进行轻度汇总,方便ADS层的灵活查取 > DWS层作用:数据的轻度聚合 > DWS层构建:参考指标体系设计DWS层➡️简单的聚合基本都是过滤(保留要统计的记录或是去重)/封装(构建包含统计指标信息的POJO类)/聚合(小窗口轻度聚合)/写出(clickhouse);复杂的汇总可能包含连接多个维度的需求,主要流程是过滤/封装/聚合/补充维度信息/写出,其中补充维度信息可以采取优化措施,常用的优化是利用Redis作为旁路缓存,使用异步IO查询➡️写入Clickhouse的DWS层汇总表(需创建Clickhouse的汇总表) ## DWS层数据流图 1. **_流量域来源关键词粒度页面浏览各窗口汇总表【Flink SQL实现】_** ![流量域来源关键词粒度页面浏览各窗口汇总表.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1664759946550-ef33edfe-6785-4a0f-bd67-329bbd0531ae.svg) 2. **_流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表_** ![DWS层流量域版本-渠道-地区-用户类别粒度页面浏览各窗口汇总表.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1664761986120-1c4b4482-6aa3-4404-a07c-13169db299cf.svg) 3. **_交易域SKU粒度下单各窗口汇总表_** ![DWS层交易域SKU粒度下单各窗口汇总表.svg](https://cdn.nlark.com/yuque/0/2022/svg/23152645/1664762560263-ae446d3d-9f33-40b9-b794-3f8aa9ed8e67.svg) ## DWS层主体实现代码 1. **_流量域来源关键词粒度页面浏览各窗口汇总表_** ```java public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp { //流量域来源关键词粒度页面浏览各窗口汇总表:统计各窗口各关键词出现频次 public static void main(String[] args) { new DwsTrafficSourceKeywordPageViewWindow().process(4001, 2, "DwsTrafficSourceKeywordPageViewWindow"); } @Override protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tenv) { //读取页面日志 tenv.executeSql("create table dwd_page(" + "page map," + "ts bigint," + "et as to_timestamp_ltz(ts,3)," + "watermark for et as et - interval '3' second " + ")" + SQLUtil.kafkaConnectorTemplate(Constant.TOPIC_DWD_TRAFFIC_PAGE, "DwsTrafficSourceKeywordPageViewWindow")); //过滤搜索行为 Table t1 = tenv.sqlQuery("select page['item'] keyword,et " + "from dwd_page where page['last_page_id']='search' and page['item_type']='keyword' and page['item'] is not null"); tenv.createTemporaryView("t1", t1); //分词 tenv.createTemporaryFunction("ikanalyzer", WordSplit.class); Table t2 = tenv.sqlQuery("select kw,et from t1,lateral table(ikanalyzer(keyword))"); tenv.createTemporaryView("t2", t2); //开窗聚合:每隔5秒进行一次聚合,小批量聚合,给后续ADS层的查取提供灵活性 //调用 unix_timestamp() 函数获取以秒为单位的当前系统时间戳,转为毫秒(*1000),作为ClickHouse表的版本字段,用于数据去重 Table res = tenv.sqlQuery("select " + "date_format(window_start, 'yyyy-MM-dd HH:mm:ss') stt," + "date_format(window_end,'yyyy-MM-dd HH:mm:ss') edt," + "'search' source," + "kw keyword,count(*) keyword_count," + "unix_timestamp()*1000 ts " + "from table(tumble(table t2,descriptor(et),interval '5' second)) " + "group by kw,window_start,window_end"); //写出至clickhouse //res.execute().print(); //测试:打印结果 tenv.toRetractStream(res, KeywordBean.class) .filter(tuple -> tuple.f0) //只保留聚合的最新数据 .map(tuple -> tuple.f1) .addSink(ClickhouseSink.getClickhouseSink("dws_traffic_source_keyword_page_view_window", KeywordBean.class)); try { env.execute(); //因使用了流,要启动流执行 } catch (Exception e) { e.printStackTrace(); } } } ``` ```java @FunctionHint(output = @DataTypeHint("row")) public class WordSplit extends TableFunction { public void eval(String str) { if (str == null) { return; } List kwlist = ikSplit(str); kwlist.forEach(kw -> collect(Row.of(kw))); } private List ikSplit(String str) { //将字符串转为内存流 StringReader reader = new StringReader(str); ArrayList list = new ArrayList<>(); //智能分词 IKSegmenter segmenter = new IKSegmenter(reader, true); try { Lexeme lexeme = segmenter.next(); while (lexeme != null) { list.add(lexeme.getLexemeText()); 
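//取下一个词元;next()在分词结束时返回null,循环随之退出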
lexeme = segmenter.next(); } } catch (IOException e) { e.printStackTrace(); } return new ArrayList<>(new HashSet<>(list)); //去重 } } ``` ```java public class ClickhouseSink { public static SinkFunction getClickhouseSink(String table, Class clazz) { String driver = ClickHouseDriver.class.getName(); String url = "jdbc:clickhouse://hadoop102:8123/gmall"; String user = "default"; String password = "clickhouse"; StringBuilder sql = new StringBuilder(); //insert into table(f1,f2,f3) values(?,?,?) List fieldnamelist = getClassFieldsName(clazz); sql.append("insert into ").append(table).append("(") //insert into 后注意添加空格! .append(String.join(",", fieldnamelist)).append(") values(") .append(String.join(",", Collections.nCopies(fieldnamelist.size(), "?"))).append(")"); return JdbcSink.sink(sql.toString(), new JdbcStatementBuilder() { @Override public void accept(PreparedStatement ps, T t) throws SQLException { //占位符赋值 Class clazz = t.getClass(); Field[] declaredFields = clazz.getDeclaredFields(); for (int i = 0, j = 1; i < declaredFields.length; i++) { //TODO POJO类的属性顺序和clickhouse中建表的字段顺序保持一致,需两边同时维护 Field field = declaredFields[i]; if (field.getAnnotation(NoNeedSink.class) != null) { //带NoNeedSink注解的属性不写出至clickhouse continue; } field.setAccessible(true); try { ps.setObject(j++, field.get(t)); } catch (IllegalAccessException e) { e.printStackTrace(); } } } }, new JdbcExecutionOptions.Builder() .withBatchIntervalMs(3000) .withBatchSize(1024) .withMaxRetries(3) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUsername(user) .withPassword(password) .withUrl(url) .withDriverName(driver) .build()); } private static List getClassFieldsName(Class clazz) { Field[] declaredFields = clazz.getDeclaredFields(); //获取的POJO类可能是驼峰命名方式的属性,也可能是下划线形式命名的属性,需统一转为下划线形式,才能写入clickhouse //过滤带有NoNeedSink注解的属性 return Arrays.stream(declaredFields) .filter(field -> field.getAnnotation(NoNeedSink.class) == null) .map(Field::getName) .map(fieldname -> CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, fieldname)) .collect(Collectors.toList()); } } ``` ```sql create database if not exists gmall; use gmall; drop table if exists dws_traffic_source_keyword_page_view_window; create table if not exists dws_traffic_source_keyword_page_view_window ( stt DateTime COMMENT '窗口起始时间', edt DateTime COMMENT '窗口结束时间', source String COMMENT '关键词来源', keyword String COMMENT '关键词', keyword_count UInt64 COMMENT '关键词出现频次', ts UInt64 COMMENT '时间戳' ) engine = ReplacingMergeTree(ts) partition by toYYYYMMDD(stt) order by (stt, edt, source, keyword); ``` 2. 
**_流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表_** ```java public class DwsTrafficVcChArIsNewPageViewWindow extends BaseApp { //流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表:会话数、页面浏览数、浏览总时长、独立访客数 public static void main(String[] args) { new DwsTrafficVcChArIsNewPageViewWindow().process(4002, 2, "DwsTrafficVcChArIsNewPageViewWindow", Constant.TOPIC_DWD_TRAFFIC_PAGE); } @Override protected void handle(StreamExecutionEnvironment env, DataStreamSource ds) { //封装数据 SingleOutputStreamOperator ds1 = parseBean(ds); //开窗聚合 SingleOutputStreamOperator res = agg(ds1); //写出至clickhouse writeToClickhouse(res); } private void writeToClickhouse(DataStream ds) { //ds.print(); //测试 ds.addSink(ClickhouseSink.getClickhouseSink("dws_traffic_vc_ch_ar_is_new_page_view_window",TrafficPageViewBean.class)); } private SingleOutputStreamOperator agg(DataStream ds) { return ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((bean, l) -> bean.getTs())) .keyBy(bean -> bean.getVc() + "_" + bean.getCh() + "_" + bean.getAr() + "_" + bean.getIsNew()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction() { @Override public TrafficPageViewBean reduce(TrafficPageViewBean t1, TrafficPageViewBean t2) throws Exception { t1.setUvCt(t1.getUvCt() + t2.getUvCt()); t1.setPvCt(t1.getPvCt() + t2.getPvCt()); t1.setSvCt(t1.getSvCt() + t2.getSvCt()); t1.setDurSum(t1.getDurSum() + t2.getDurSum()); return t1; } }, new ProcessWindowFunction() { @Override public void process(String key, Context context, Iterable iterable, Collector collector) throws Exception { TrafficPageViewBean bean = iterable.iterator().next(); bean.setStt(TimeUtil.toDateTime(context.window().getStart())); bean.setEdt(TimeUtil.toDateTime(context.window().getEnd())); bean.setTs(System.currentTimeMillis()); collector.collect(bean); } }); } private SingleOutputStreamOperator parseBean(DataStreamSource ds) { return ds.keyBy(str -> JSON.parseObject(str).getJSONObject("common").getString("uid")) .map(new RichMapFunction() { private ValueState uidState; //状态存储uid当日第一次访问记录 private ValueState dateState; //状态存储uid保存的日期 @Override public void open(Configuration parameters) throws Exception { uidState = getRuntimeContext().getState(new ValueStateDescriptor<>("uidState", String.class)); dateState = getRuntimeContext().getState(new ValueStateDescriptor<>("dateState", String.class)); } @Override public TrafficPageViewBean map(String str) throws Exception { JSONObject jsonObject = JSON.parseObject(str); JSONObject common = jsonObject.getJSONObject("common"); String vc = common.getString("vc"); String ch = common.getString("ch"); String ar = common.getString("ar"); String isNew = common.getString("is_new"); JSONObject page = jsonObject.getJSONObject("page"); long pvCt = 1L; //每条日志记录代表一次PV long durSum = page.getLong("during_time"); //通过是否有上个页面来判断新会话开启 long svCt = page.getString("last_page_id") == null ? 
0L : 1L; //通过状态判断UV long uvCt = 0L; //首先判断状态是否过期,即当天为新的一天 String date = TimeUtil.toDate(jsonObject.getLong("ts")); if (!date.equals(dateState.value())) { //不等表示过期 uidState.clear(); dateState.update(date); } if (uidState.value() == null) { uvCt = 1L; uidState.update(common.getString("uid")); } return new TrafficPageViewBean("", "", vc, ch, ar, isNew, uvCt, svCt, pvCt, durSum, jsonObject.getLong("ts")); } }); } } ``` ```sql create database if not exists gmall; use gmall; drop table if exists dws_traffic_vc_ch_ar_is_new_page_view_window; create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window ( stt DateTime COMMENT '窗口起始时间', edt DateTime COMMENT '窗口结束时间', vc String COMMENT 'app版本号', ch String COMMENT '渠道', ar String COMMENT '地区', is_new String COMMENT '新老访客状态标记', uv_ct UInt64 COMMENT '独立访客数', sv_ct UInt64 COMMENT '会话数', pv_ct UInt64 COMMENT '页面浏览数', dur_sum UInt64 COMMENT '累计访问时长', ts UInt64 COMMENT '时间戳' ) engine = ReplacingMergeTree(ts) partition by toYYYYMMDD(stt) order by (stt, edt, vc, ch, ar, is_new); ``` 3. **_交易域SKU粒度下单各窗口汇总表【Redis旁路缓存 + 异步IO优化】_** ```java public class DwsTradeSkuOrderWindow extends BaseApp { //添加维度的模式 public static final String MODE_DIRECT_FROM_PHONEIX = "directFromPhoneix"; public static final String MODE_REDIS_CACHE = "redisCache"; public static final String MODE_REDIS_CACHE_AND_ASYNCIO = "redisCacheAndAsyncio"; //交易域SKU粒度下单各窗口汇总表 public static void main(String[] args) { new DwsTradeSkuOrderWindow().process(4009, 2, "DwsTradeSkuOrderWindow", Constant.TOPIC_DWD_TRADE_ORDER_DETAIL); } @Override protected void handle(StreamExecutionEnvironment env, DataStreamSource ds) { //处理:来源的DWD数据是使用retractstream写入的,存在回撤,因此取同一条记录会存在重复,需要去重 //去重方式:1.按记录keyby后使用session窗口,gay设置为最大延迟时间 //去重方式:2.使用定时器,定时时间设置为最大延迟时间,取ts时间戳最大的记录 //去重方式:3.仅需要左表数据,则直接取第一条即可[此处不适用,如果仅需左表数据,直接取左表也行] SingleOutputStreamOperator ds2 = etl(ds); //封装 SingleOutputStreamOperator ds3 = parsebean(ds2); //开窗聚合 SingleOutputStreamOperator ds4 = agg(ds3); //补充维度信息:旁路缓存|异步IO //直接使用phoenix连接取得维度数据,不考虑优化的方式 //SingleOutputStreamOperator res = addDim1(ds4); //SingleOutputStreamOperator res = addDim(ds4,MODE_DIRECT_FROM_PHONEIX); //使用redis作为旁路缓存,优化查找的效率 //SingleOutputStreamOperator res = addDim2(ds4); //SingleOutputStreamOperator res = addDim(ds4, MODE_REDIS_CACHE); //使用redis+异步IO的方式 //SingleOutputStreamOperator res = addDim3(ds4); SingleOutputStreamOperator res = addDim(ds4, MODE_REDIS_CACHE_AND_ASYNCIO); //写出至clickhouse //res.print(); //测试 writeToClickhouse(res); } private void writeToClickhouse(DataStream res) { res.addSink(ClickhouseSink.getClickhouseSink("dws_trade_sku_order_window", TradeSkuOrderBean.class)); } private SingleOutputStreamOperator addDim(DataStream ds, String mode) { switch (mode) { case MODE_DIRECT_FROM_PHONEIX: return addDim1(ds); case MODE_REDIS_CACHE: return addDim2(ds); case MODE_REDIS_CACHE_AND_ASYNCIO: return addDim3(ds); default: throw new RuntimeException("未知的运行模式!"); } } private SingleOutputStreamOperator addDim3(DataStream ds) { //对流进行异步处理 //补充skuInfo信息 SingleOutputStreamOperator skuInfoStream = AsyncDataStream.unorderedWait(ds, new DimAsyncFunction() { @Override protected String getTable() { return "dim_sku_info"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getSkuId(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject skuInfo) { bean.setSkuName(skuInfo.getString("SKU_NAME")); bean.setSpuId(skuInfo.getString("SPU_ID")); bean.setTrademarkId(skuInfo.getString("TM_ID")); bean.setCategory3Id(skuInfo.getString("CATEGORY3_ID")); } }, 60, 
TimeUnit.SECONDS); //补充spuInfo信息 SingleOutputStreamOperator spuInfoStream = AsyncDataStream.unorderedWait(skuInfoStream, new DimAsyncFunction() { @Override protected String getTable() { return "dim_spu_info"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getSpuId(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject spuInfo) { bean.setSpuName(spuInfo.getString("SPU_NAME")); } }, 60, TimeUnit.SECONDS); //timeout:等待数据获取的超时时间,超过该时间则不予处理 //补充base_trademark SingleOutputStreamOperator baseTrademarkStream = AsyncDataStream.unorderedWait(spuInfoStream, new DimAsyncFunction() { @Override protected String getTable() { return "dim_base_trademark"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getTrademarkId(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject baseTrademark) { bean.setTrademarkName(baseTrademark.getString("TM_NAME")); } }, 60, TimeUnit.SECONDS); //补充三级品类信息 SingleOutputStreamOperator category3Stream = AsyncDataStream.unorderedWait(baseTrademarkStream, new DimAsyncFunction() { @Override protected String getTable() { return "dim_base_category3"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getCategory3Id(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject category3) { bean.setCategory3Name(category3.getString("NAME")); bean.setCategory2Id(category3.getString("CATEGORY2_ID")); } }, 60, TimeUnit.SECONDS); //补充二级品类信息 SingleOutputStreamOperator category2Stream = AsyncDataStream.unorderedWait(category3Stream, new DimAsyncFunction() { @Override protected String getTable() { return "dim_base_category2"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getCategory2Id(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject category2) { bean.setCategory2Name(category2.getString("NAME")); bean.setCategory1Id(category2.getString("CATEGORY1_ID")); } }, 60, TimeUnit.SECONDS); //补充一级品类信息 return AsyncDataStream.unorderedWait(category2Stream, new DimAsyncFunction() { @Override protected String getTable() { return "dim_base_category1"; } @Override protected String getId(TradeSkuOrderBean bean) { return bean.getCategory1Id(); } @Override protected void addDim(TradeSkuOrderBean bean, JSONObject category1) { bean.setCategory1Name(category1.getString("NAME")); } }, 60, TimeUnit.SECONDS); } private SingleOutputStreamOperator addDim2(DataStream ds) { //从phoenix中将维度数据读出并补充,使用redis作为旁路缓存优化 return ds.map(new RichMapFunction() { private Connection conn; //在open方法创建连接,避免频繁与数据库连接 private Jedis redisClient; @Override public void open(Configuration parameters) throws Exception { conn = DruidDSUtil.getDataSource().getConnection(); redisClient = RedisUtil.getRedisClient(); } @Override public void close() throws Exception { if (conn != null) { conn.close(); } if (redisClient != null) { redisClient.close(); } } @Override public TradeSkuOrderBean map(TradeSkuOrderBean bean) throws Exception { //select * from T where id = ? 
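//旁路缓存的读取顺序:先按 key(表名:id)查Redis,命中则直接返回;
//未命中再查Phoenix,并把结果以随机TTL写回Redis,具体逻辑见DimUtil.readDimFromRedis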
//补充sku_info表的信息 JSONObject skuInfo = DimUtil.readDimFromRedis(redisClient, conn, "dim_sku_info", bean.getSkuId()); bean.setSkuName(skuInfo.getString("SKU_NAME")); bean.setSpuId(skuInfo.getString("SPU_ID")); bean.setTrademarkId(skuInfo.getString("TM_ID")); bean.setCategory3Id(skuInfo.getString("CATEGORY3_ID")); //补充base_trademark表的信息 JSONObject baseTrademark = DimUtil.readDimFromRedis(redisClient, conn, "dim_base_trademark", bean.getTrademarkId()); bean.setTrademarkName(baseTrademark.getString("TM_NAME")); //补充spu_info表的信息 JSONObject spuInfo = DimUtil.readDimFromRedis(redisClient, conn, "dim_spu_info", bean.getSpuId()); bean.setSpuName(spuInfo.getString("SPU_NAME")); //补充三级品类信息 JSONObject category3 = DimUtil.readDimFromRedis(redisClient, conn, "dim_base_category3", bean.getCategory3Id()); bean.setCategory3Name(category3.getString("NAME")); bean.setCategory2Id(category3.getString("CATEGORY2_ID")); //补充二级品类信息 JSONObject category2 = DimUtil.readDimFromRedis(redisClient, conn, "dim_base_category2", bean.getCategory2Id()); bean.setCategory2Name(category2.getString("NAME")); bean.setCategory1Id(category2.getString("CATEGORY1_ID")); //补充一级品类信息 JSONObject category1 = DimUtil.readDimFromRedis(redisClient, conn, "dim_base_category1", bean.getCategory1Id()); bean.setCategory1Name(category1.getString("NAME")); return bean; } }); } private SingleOutputStreamOperator addDim1(DataStream ds) { //从phoenix中将维度数据读出并补充 return ds.map(new RichMapFunction() { private Connection conn; //在open方法创建连接,避免频繁与数据库连接 @Override public void open(Configuration parameters) throws Exception { conn = DruidDSUtil.getDataSource().getConnection(); } @Override public void close() throws Exception { if (conn != null) { conn.close(); //归还连接 } } @Override public TradeSkuOrderBean map(TradeSkuOrderBean bean) throws Exception { //select * from T where id = ? 
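//未做优化的实现:每条汇总记录都要对每张维度表同步执行一次Phoenix点查,
//维度表较多时查询延迟明显,这也是后面引入Redis旁路缓存和异步IO两种优化方式的原因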
//补充sku_info表的信息 JSONObject skuInfo = DimUtil.readDimFromPhoenix(conn, "dim_sku_info", bean.getSkuId()); bean.setSkuName(skuInfo.getString("SKU_NAME")); bean.setSpuId(skuInfo.getString("SPU_ID")); bean.setTrademarkId(skuInfo.getString("TM_ID")); bean.setCategory3Id(skuInfo.getString("CATEGORY3_ID")); //补充base_trademark表的信息 JSONObject baseTrademark = DimUtil.readDimFromPhoenix(conn, "dim_base_trademark", bean.getTrademarkId()); bean.setTrademarkName(baseTrademark.getString("TM_NAME")); //补充spu_info表的信息 JSONObject spuInfo = DimUtil.readDimFromPhoenix(conn, "dim_spu_info", bean.getSpuId()); bean.setSpuName(spuInfo.getString("SPU_NAME")); //补充三级品类信息 JSONObject category3 = DimUtil.readDimFromPhoenix(conn, "dim_base_category3", bean.getCategory3Id()); bean.setCategory3Name(category3.getString("NAME")); bean.setCategory2Id(category3.getString("CATEGORY2_ID")); //补充二级品类信息 JSONObject category2 = DimUtil.readDimFromPhoenix(conn, "dim_base_category2", bean.getCategory2Id()); bean.setCategory2Name(category2.getString("NAME")); bean.setCategory1Id(category2.getString("CATEGORY1_ID")); //补充一级品类信息 JSONObject category1 = DimUtil.readDimFromPhoenix(conn, "dim_base_category1", bean.getCategory1Id()); bean.setCategory1Name(category1.getString("NAME")); return bean; } }); } private SingleOutputStreamOperator agg(DataStream ds) { return ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((bean, l) -> bean.getTs())) .keyBy(TradeSkuOrderBean::getSkuId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction() { //模拟数据的事件时间集中在某一时刻,窗口可能不会关闭导致没数据输出,可以进行2次数据模拟 @Override public TradeSkuOrderBean reduce(TradeSkuOrderBean value1, TradeSkuOrderBean value2) throws Exception { //将同个sku的各条记录汇总 value1.setOriginalAmount(value1.getOriginalAmount().add(value2.getOriginalAmount())); value1.setActivityAmount(value1.getActivityAmount().add(value2.getActivityAmount())); value1.setCouponAmount(value1.getCouponAmount().add(value2.getCouponAmount())); value1.getOrderIdSet().addAll(value2.getOrderIdSet()); return value1; } }, new ProcessWindowFunction() { @Override public void process(String skuId, Context context, Iterable iterable, Collector collector) throws Exception { //补充窗口相关的信息和指标 TradeSkuOrderBean bean = iterable.iterator().next(); bean.setStt(TimeUtil.toDateTime(context.window().getStart())); bean.setEdt(TimeUtil.toDateTime(context.window().getEnd())); bean.setTs(System.currentTimeMillis()); bean.setOrderCount((long) bean.getOrderIdSet().size()); collector.collect(bean); } }); } private SingleOutputStreamOperator parsebean(DataStream ds) { //将POJO类设计为建造者模式,使用链式编程方便属性赋值 //商品不一定参加活动或有优惠劵,所以这两个字段可能获取到null值,使用Optional类处理 //此处不补充维度信息,等聚合后数据量减少再补充维度信息 return ds.map(json -> TradeSkuOrderBean.builder() .skuId(json.getString("sku_id")) .orderIdSet(new HashSet<>(Collections.singleton(json.getString("order_id")))) .originalAmount(json.getBigDecimal("split_original_amount")) .activityAmount(Optional.ofNullable(json.getBigDecimal("split_activity_amount")).orElse(new BigDecimal(0))) .couponAmount(Optional.ofNullable(json.getBigDecimal("split_coupon_amount")).orElse(new BigDecimal(0))) .orderAmount(json.getBigDecimal("split_total_amount")) .ts(json.getLong("ts") * 1000) .build()); } private SingleOutputStreamOperator etl(DataStreamSource ds) { return ds.map(JSON::parseObject).keyBy(json -> json.getString("id")) .process(new KeyedProcessFunction() { private ValueState vs; @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception { out.collect(vs.value()); } @Override public void open(Configuration parameters) throws Exception { vs = getRuntimeContext().getState(new ValueStateDescriptor("vs", JSONObject.class)); } @Override public void processElement(JSONObject jsonObject, Context context, Collector collector) throws Exception { //考虑数据到来的乱序影响,要比较到来的数据和状态中保留的数据的时间,取时间最大的记录 if (vs.value() == null) { //记录首次到来,则直接保存进状态,并注册定时器:定义在首条记录到达的事件时间5s后处理 vs.update(jsonObject); context.timerService().registerProcessingTimeTimer(jsonObject.getLong("ts") * 1000 + 5000); } else { //非首次到来的记录:比较时间戳,写入kafka的时间戳会带有Z的标记,要特殊处理 String vs_ts = vs.value().getString("row_op_ts"); String row_ts = jsonObject.getString("row_op_ts"); if (TimeUtil.compareTime(row_ts, vs_ts) >= 0) { vs.update(jsonObject); } } } }); } } ``` ```java public abstract class DimAsyncFunction extends RichAsyncFunction { private DruidDataSource dataSource; private ThreadPoolExecutor threadPool; protected abstract String getTable(); //抽象方法:获取表名 protected abstract String getId(T t); //抽象方法:获取id protected abstract void addDim(T t, JSONObject dim); //抽象方法:获取维度信息 @Override public void open(Configuration parameters) throws Exception { dataSource = DruidDSUtil.getDataSource(); threadPool = ThreadPoolUtil.getThreadPool(); } @Override public void asyncInvoke(T t, ResultFuture resultFuture) throws Exception { threadPool.submit(new Runnable() { @Override public void run() { try { //获取连接 DruidPooledConnection conn = dataSource.getConnection(); Jedis redisClient = RedisUtil.getRedisClient(); //读取对应的维度信息 JSONObject jsonObject = DimUtil.readDimFromRedis(redisClient, conn, getTable(), getId(t)); //将维度信息补充至记录 addDim(t, jsonObject); //ResultFuture用于写出数据 resultFuture.complete(Collections.singleton(t)); //关闭连接 conn.close(); redisClient.close(); } catch (Exception e) { throw new RuntimeException(e); } } }); } } ``` ```java public class DimUtil { public static JSONObject readDimFromPhoenix(Connection conn, String table, String id) throws Exception { //select * from T where id = ? 
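//表名不能作为JDBC占位符,只能拼接;id使用?占位符,由JDBCUtil负责赋值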
String sql = "select * from " + table + " where id = ?"; ResultSet rs = JDBCUtil.query(conn, sql, new Object[]{id}); return JDBCUtil.parseone(JSONObject.class, rs); } public static JSONObject readDimFromRedis(Jedis redisClient, Connection conn, String table, String id) throws Exception { //Redis查找 -> get table:id -> 再转为JSONObject //Redis无缓存 -> Phoenix查找 -> 缓存至Redis //当维度更新时需删除缓存中对应的维度数据,在DimApp中实现 String key = table + ":" + id; String str = redisClient.get(key); if (str != null) { //System.out.println("缓存读取..."); //测试 return JSON.parseObject(str); //从缓存中读取 } else { //缓存没有数据,只能从Phoenix读取 //System.out.println("Phoenix读取..."); //测试 JSONObject jsonObject = readDimFromPhoenix(conn, table, id); //将Phoenix读取的数据写入Redis缓存 redisClient.setex(key, Constant.TTL_OF_REDIS + new Random().nextInt(300), jsonObject.toJSONString()); return jsonObject; } } } ``` ```sql create database if not exists gmall; use gmall; drop table if exists dws_trade_sku_order_window; create table if not exists dws_trade_sku_order_window ( stt DateTime COMMENT '窗口起始时间', edt DateTime COMMENT '窗口结束时间', trademark_id String COMMENT '品牌id', trademark_name String COMMENT '品牌名称', category1_id String COMMENT '一级品类id', category1_name String COMMENT '一级品类名称', category2_id String COMMENT '二级品类id', category2_name String COMMENT '二级品类名称', category3_id String COMMENT '三级品类id', category3_name String COMMENT '三级品类名称', sku_id String COMMENT 'sku_id', sku_name String COMMENT 'sku名称', spu_id String COMMENT 'spu_id', spu_name String COMMENT 'spu名称', original_amount Decimal(38, 20) COMMENT '原始金额', activity_amount Decimal(38, 20) COMMENT '活动减免金额', coupon_amount Decimal(38, 20) COMMENT '优惠券减免金额', order_amount Decimal(38, 20) COMMENT '下单金额', order_count UInt64 COMMENT '订单个数', ts UInt64 COMMENT '时间戳' ) engine = ReplacingMergeTree(ts) partition by toYYYYMMDD(stt) order by (stt, edt, sku_id, sku_name); ``` ## DWS层程序测试 ```shell # 开启集群Maxwell/Kafka/Clickhouse/Hadoop/Zookeeper/Redis # 流量域来源关键词粒度页面浏览各窗口汇总表 # 开启程序: DWD层:DwdBaseLogApp | DWS层:DwsTrafficSourceKeywordPageViewWindow f1.sh start # flume日志采集 java -jar gmall2020-mock-log-2021-01-22.jar # 模拟生成日志 clickhouse-client --host=hadoop102 -m --password # 打开clickhouse客户端查看数据写入情况 #> select * from gmall.dws_traffic_source_keyword_page_view_window limit 10; # 流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表 # 开启程序: DWD层:DwdBaseLogApp | DWS层:DwsTrafficVcChArIsNewPageViewWindow f1.sh start # flume日志采集 java -jar gmall2020-mock-log-2021-01-22.jar # 模拟生成日志 clickhouse-client --host=hadoop102 -m --password # 打开clickhouse客户端查看数据写入情况 #> select * from gmall.dws_traffic_vc_ch_ar_is_new_page_view_window limit 10; # 交易域SKU粒度下单各窗口汇总表【Redis旁路缓存 + 异步IO优化】 # 开启程序: DIM层:DimApp | DWD层:DwdTradeOrderDetail | DWS层:DwsTradeSkuOrderWindow mymaxwell.sh start # 业务数据采集 java -jar gmall2020-mock-db-2021-11-14.jar # 模拟生成业务数据 clickhouse-client --host=hadoop102 -m --password # 打开clickhouse客户端查看数据写入情况 #> select * from gmall.dws_trade_sku_order_window limit 10; ``` # 应用数据层ADS > DWS层要点:根据选择的可视化工具所要求的输入格式(一般是json字符串形式),开发SpringBoot接口 > DWS层作用:呈现统计结果,为BI提供分析数据源 > DWS层构建:选择可视化工具➡️按指标体系设计可视化展示的图表,或按BI数据需求明确需要的数据,进而确定从DWS层汇聚的SQL➡️一般交互的数据形式为json字符串,开发SpringBoot接口➡️接口调试 ## ADS层主体实现代码 使用SpringBoot从Clickhouse查询数据,对外提供接口,详见仓库。 ## ADS层程序测试 开启SpringBoot后台,网页访问相应的url查看数据是否符合要求。