# realtime-2024 **Repository Path**: chen_shuai_jun/realtime-2024 ## Basic Information - **Project Name**: realtime-2024 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-03-10 - **Last Updated**: 2024-04-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 本文档记录的是在构建实时数仓中问题的解决方案 ## dim层数的处理 ### 维度层和维度表的概念 1. 什么是维度表 维度建模中将复杂的业务过程抽象为事实和维度的概念,事实指代的是不可分割的业务过程,维度指代的是业务过程发生时所处的环境。 2. 维度表的分类 维度表主要分成主维表和相关维表,维度表的粒度适合主维表一样的。 3. 实时数仓中维度表的设计 在实时数仓中,为了保证数据的时效性,维度数据是不整合的。不整合的原因是,如果整合维度表中的某个表的某个字段发生变化的话,需要找到维度表中相关的数据,然后替换掉这一条是数据,这个解决方案想着挺简单,但是实际做起来的非常复杂的。下面给出一个例子: **商品维度表**:这个表主要是由下面八张表组合而成: sku_info,spu_info,base_trademark,base_category1_info,base_category2_info,base_category3_info,sku_attr_value,sku_sale_attr_value 最终形成的表如下: ```sql CREATE EXTERNAL TABLE dim_sku_full ( `id` STRING COMMENT 'SKU_ID', `price` DECIMAL(16, 2) COMMENT '商品价格', `sku_name` STRING COMMENT '商品名称', `sku_desc` STRING COMMENT '商品描述', `weight` DECIMAL(16, 2) COMMENT '重量', `is_sale` BOOLEAN COMMENT '是否在售', `spu_id` STRING COMMENT 'SPU编号', `spu_name` STRING COMMENT 'SPU名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `category2_id` STRING COMMENT '二级品类id', `category2_name` STRING COMMENT '二级品类名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `sku_attr_values` ARRAY> COMMENT '平台属性', `sku_sale_attr_values` ARRAY> COMMENT '销售属性', `create_time` STRING COMMENT '创建时间' ) COMMENT '商品维度表' ``` 如果这张表中的category3_name字段的值发生了变更,我们应该如何改变这张维度表的数据。 - 我们需要查询出使用到category3_name这个字段的数据,然后对这个数据进行修改。 - 如果我们对数据进行修改,那么其他七张表的数据应该如何获取到,注意当前是在流式的环境中,数据已经流过去了。 所以在实时的环境在,我们不应该将维度数据进行整合。 4. 维度数据存储框架的选择 本研究主要将维度数据存储在Hbase中,主要的原因如下: - Hbase能够存储海量的数据 - Hbase在使用rowKey进行查询的时候,能够达到ms级别的响应 - 后续可以使用redis做热缓存进行优化(这个不是选择Hbase的原因) 5. 维度表动态更新的问题 在实时环境下我们需要解决,如何在程序启动的情况下,实现维度数据的增、删、改 [这是flink官网关于广播流应用的案例](https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/datastream/fault-tolerance/broadcast_state/) ![输入图片说明](image/广播状态解释.png) 因此我们可以发现flink的广播流天生适合做这件事情 6. 维度数据处理的流程 ![输入图片说明](image/dim数据处理流程.png) 需要注意的问题: - 广播数据迟到的问题 解决的办法是,在connect之后的process的open方法中,预先把配置数据读取出来,然后存储到map中,~~**注意这个map不能选择HashMap**,可能会出现并发修改异常,我们应该选择concurrentHashMap~~(这里明显是一个认知上的错误,普通的集合和算子状态是一样的,他们是subtask隔离的,只不过算子状态可以在错误中恢复,普通的集合是不可以在错误中恢复的,每个subtask的数据是串行处理的,因此并不会出现并发修改异常的问题,下面的代码给出了subtask隔离的证据)等到配置流的数据到来时,我们应该删除map中的数据,减少空间的占用。 ```java import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import java.util.HashMap; import java.util.Map; /** * @Date 2024/3/14 14:29 * @Created by chenshuaijun */ public class TestOperatorState { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 10010); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(4); DataStream streamSource = env.socketTextStream("hadoop102", 9999) .rebalance(); streamSource.process(new ProcessFunction() { private HashMap map; @Override public void open(Configuration parameters) throws Exception { if (map != null){ System.out.println("这个map已经被初始化化过了"); } else { System.out.println("我在初始化map"); map = new HashMap<>(); } } @Override public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception { map.put(value,value); for (Map.Entry entry : map.entrySet()) { System.out.println(entry); } System.out.println("-------------"); } }).print(); env.execute(); } } 运行结果为: ---------------------------------------- 输入为: 1 2 3 4 5 6 7 8 ---------------------------------------- 输出为: 我在初始化map 我在初始化map 我在初始化map 我在初始化map 1=1 ------------- 2=2 ------------- 3=3 ------------- 4=4 ------------- 1=1 5=5 ------------- 2=2 6=6 ------------- 3=3 7=7 ------------- 4=4 8=8 ------------- --------------------------------------- ```