1 Star 0 Fork 0

Sunny / one

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

One

1. 介绍

Just one....

2. 数仓

2.1 离线数仓

2.1.1 技术架构
image-20221008094313129
2.1.2 文档(以电商为例)
image-20221005140953852
2.1.3 开发案例
################################# 
## @ScriptName dwd_xxx_order.sh
## @TableName db_dw.dwd_xxx_order
## @Desc 订单事实表开发
## @Storage 全量分区
## @Author one
################################
hql="
set hive.execution.engine=tez;

-- 全量分区存储订单表
insert
overwrite table dw.dwd_fact_t_order partition(dd)
select *
     , current_date as dd
from ods.tbl_orders;

"
hive -e ${hql}

2.2 实时数仓

....

2.3 湖仓一体

2.3.1 技术架构
  1. 不想写太多,没意思,基础架构用到 ->采集[cdc + 非结构化采集]+ 存储[hive+hadoop]+计算[flink+paimon]
  2. 数据服务平台 ->自建 or DBAPI
  3. 开发门户使用开源 dlinky

2.3.2 一个项目(实践)
  • 项目背景

    某销售crm需要做数字化转型

  • 数仓建模

    主题划分:商户(shp)、客户(cust)、交易(trd)、合同(contract)、流量(ip)、营销(market)、公共(cmn)

  • 开发代码

    todo


  • etl优化

    网络资源优化

    1、块大小

    2、数据本地化,减少网络传输

    3、jvm重用,减少gc开销

    存储资源优化

    1、存储格式,如orc

    2、压缩,如snappy

    3、小文件合并

    4、分区

    5、分桶

    6、任务结束后,清空临时表数据或者删除临时表

    计算资源优化

    1、调整内存大小&计算并行度。如果是hive引擎,调整map+shuffle+reduce数目以及内存大小;如果是flink,调整slot数目、算子并行度、开启mini_batch等

    2、使用map join,避免shuffle,当然需要明确何为“小表”(通过参数调整record或者size)

    3、使用临时表,减少任务计算复杂度(如仅使用需要用到的列,临时聚合等)

    4、record过滤、列裁剪,节省加载内存资源

    5、选择合适的jvm,调整jvm参数,如使用G1回收器、调整堆栈大小等

    6、根据任务日志找到数据倾斜的key,解决数据倾斜(解决思路就一个:“将倾斜的单个key或者组合key数据打散,分而处理。” 常见如key打散、一个sql多次聚合、多个sql多个临时表聚合、分桶等)


3. 数仓应用

若没有让数据活起来,数据仓库数据是死的,除了支持做几个报表,和传统bi无差;数据演变应该是下面这个样子才好

Q&A:

Q A
用户画像数据来源数仓哪一层 dwd+dws
用户标签怎么存储,存储在哪里,表模型;优点和缺点 hive宽表行存储的话,标签一旦多起来,肯定是多人参与开发,但落地表只有一个或者多个,不便于开发;采用hbase列存储,可以多人开发,需要加标签就动态加列,各自维护各自标签任务

3.1 用户行为分析

这里分别使用hive和doris进行代码层面分析,常见的做法应该是有对应的产品

  • 使用hive

    create table ods.t_user_event
    (
        log_Id    string comment '日志ID',
        user_Id   string comment '用户ID',
        en_type   string comment '事件类型,click view',
        page_name string comment '页面名称',
        en_name   string comment '事件名称',
        en_json   string comment '事件属性json',
        en_date   date comment '事件日期'
    ) comment '用户行为数据'
        partitioned by (dd string comment 'yyyyMMdd')
    ;
    -- 测试数据
    INSERT INTO ods.t_user_event (log_id, user_id, en_type, page_name, en_name, en_json, en_date, dd)
    VALUES ('log_001', 'u_001', 'view', 'pg_01', '访问', null, '2023-05-01', '20230501')
         , ('log_002', 'u_001', 'click', 'pg_01', '登录', null, '2023-05-01', '20230501')
         , ('log_003', 'u_002', 'view', 'pg_01', '访问', null, '2023-05-01', '20230501')
         , ('log_004', 'u_002', 'view', 'pg_01', '访问', null, '2023-05-02', '20230502')
         , ('log_005', 'u_003', 'view', 'pg_02', '访问', null, '2023-05-04', '20230504')
         , ('log_006', 'u_004', 'click', 'pg_01', '登录', null, '2023-05-05', '20230505')
         , ('log_007', 'u_001', 'click', 'pg_01', '登录', null, '2023-05-05', '20230505')
         , ('log_008', 'u_001', 'click', 'pg_01', '登录', null, '2023-05-06', '20230506')
         , ('log_009', 'u_004', 'click', 'pg_01', '登录', null, '2023-05-07', '20230507')
         , ('log_010', 'u_005', 'click', 'pg_01', '登录', null, '2023-05-08', '20230508')
         ,('log_011', 'u_001', 'click', 'pg_01', '登录', null, '2023-05-08', '20230508')
         ,('log_012', 'u_001', 'click', 'pg_01', '登录', null, '2023-05-08', '20230508')
         ;
         
     
    --------------------------------------------------------------- 用户连续登录 -------------------------------------------------
    -- 创建登录日志临时表
    drop table if exists tmp.t_user_login;
    create table tmp.t_user_login as
    select user_id, en_date as login_date
    from t_user_event
    where en_name = '登录';
    
    drop table if exists tmp.t_continous_users;
    create table tmp.t_continous_users as 
    -- step2:计算每个用户从什么时候开始登录,连续登录了几天,到什么时间结束登录
    select user_id                  as user_id,
           date_sub(login_date, rn) as login_group,
           min(login_date)          as start_date,
           max(login_date)          as end_date,
           count(1)                 as continous_days
    from (
    -- step1: 使用dense_rank根据登录日期对用户升序排序,并去重
    select distinct user_id,
                          login_date,
                          dense_rank() over (partition by user_id order by login_date) rn
          from tmp.t_user_login) t
    group by user_id
             ,date_sub(login_date, rn);
    
    -- 统计在2023-05-05和2023-05-06连续2d登录用户数
    select * from 
     tmp.t_continous_users
     where continous_days=2
     and start_date='2023-05-05'
    
    -------------------------------------------------------------- 留存分析 -----------------------------------------------------
    -- step3: 计算7d留存
    select
        first_date
        ,sum(if(day_diff=0,1,0)) as day_0
        ,sum(if(day_diff=1,1,0)) as day_1
        ,sum(if(day_diff=2,1,0)) as day_2
        ,sum(if(day_diff=3,1,0)) as day_3
        ,sum(if(day_diff=4,1,0)) as day_4
        ,sum(if(day_diff=5,1,0)) as day_5
        ,sum(if(day_diff=6,1,0)) as day_6
        ,sum(if(day_diff=7,1,0)) as day_7
    from (
    -- step2:对每个用户,统计当前登录日期与最早登录日期差值
    select user_id,
           first_date,
           datediff(login_date, first_date) as day_diff
    from (
    -- step1: 开窗分组求每个用户最早登录时间,并去重
             select distinct user_id,
                             login_date,
                             min(login_date) over (partition by user_id order by login_date) as first_date
             from tmp.t_user_login) t
    ) tt
    group by first_date
    
    
    -------------------------------------------------------------- 漏斗分析 -----------------------------------------------------
    -- 创建页面浏览日志临时表
    create table tmp.t_pg_view as
    select distinct user_id
                  , page_name
                  , en_date as view_date
    from ods.t_user_event
    where en_type = 'view';
    
    -- step3: 计算先访问页面pg_01多少人,再访问pg_02页面多少人
    select
        sum(if(split(split(view_path,',')[0],'@')[1]='pg_01',1,0)) as pg_01
        ,sum(if(split(split(view_path,',')[1],'@')[1]='pg_02',1,0)) as pg_02
    from (
    -- step2:获取每个用户访问页面路径
    select user_id,concat_ws(',',collect_set(concat_ws('@',view_date,page_name))) as view_path
    from (
    -- step1.1: 只取每个用户访问某个页面最后一条记录
    select
        user_id,page_name,view_date
    from (
    -- step1: 对每个用户根据页面访问顺序降序排序
    select
    user_id
    ,page_name
    ,cast(view_date as string) view_date
    ,row_number() over (partition by user_id,page_name order by view_date desc) as rn
    from tmp.t_pg_view
    ) t
    where rn=1
    sort by user_id,view_date
    ) tt
    group by user_id
    ) ttt
    
  • 使用doris

    -- 创建doris-sink表
    drop table if exists t_user_event;
    CREATE TABLE t_user_event (
    dt date,
    page_name varchar(10),
    en_type varchar(10),
    en_name varchar(10),
    users bitmap bitmap_union
    )
    AGGREGATE KEY(dt, page_name,en_type,en_name)
    DISTRIBUTED BY HASH(dt) BUCKETS 2
    PROPERTIES ('replication_num' = '1')
    ;
    -- rollup含义超出“上卷”的概念,一个是可以调整前缀索引,另一个可以选择指定列进行聚合。比如下面这个,会创建一个以page维度,users作为聚合的一张表,如果有根据page聚合的查询会查询这张rollup表,加快查询
    ALTER TABLE t_user_event add ROLLUP pv (page_name, users);
    
    --------------------------------------------------------- flink etl -----------------------------------
    
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'default-database' = 'ods',
      'hive-conf-dir' = '/opt/hive-3.1.3/conf'
    );
    use catalog myhive;
    LOAD MODULE hive WITH ('hive-version' = '3.1.3');
    set sql-client.execution.result-mode=tableau;
    set execution.runtime-mode = batch;
    
    -- SELECT * FROM t_user_event;
    -- 创建doris-sink表
    set table.sql-dialect=default;
    CREATE TABLE t_user_event_sink (
    dt date,
    page_name string,
    en_type string,
    en_name string,
    user_id bigint
    )WITH (
          'connector' = 'doris',
          'fenodes' = 'one:8050',
          'table.identifier' = 'bi.t_user_event',
          'username' = 'root',
          'password' = '123456asd',
          'sink.label-prefix' = 't_user_event',
          'sink.properties.columns'='dt,page_name,en_type,en_name,user_id,users=to_bitmap(user_id)'
    );
    
    -- 写入数据到doris
    insert into t_user_event_sink
    select
    en_date        as dt
    ,page_name     as page_name
    ,en_type       as en_type
    ,en_name       as en_name
    ,hash(user_id) as user_id
    from t_user_event
    group by en_date,page_name,en_type,en_name,user_id;
    
    ---------------------------------------- 用户连续登录 ------------------------
    select intersect_count(users, dt, '2023-05-05')               as day_5,
           intersect_count(users, dt, '2023-05-06')               as day_6,
           intersect_count(users, dt, '2023-05-05', '2023-05-06') as day_56 -- 05,06连续2d登录,当然也可以统计n天登录的用户数
    from (select cast(dt as varchar) as dt
               , users               as users
               , bitmap_to_string(users)
          from t_user_event
          where en_name = '登录'
            and dt in ('2023-05-05', '2023-05-06')) t
    
    ------------------------------------- 7d留存(从5.1开始) ------------------------
    select intersect_count(users, dt, '2023-05-01')               as day_0,
           intersect_count(users, dt, '2023-05-01', '2023-05-02') as day_1,
           intersect_count(users, dt, '2023-05-01', '2023-05-03') as day_2,
           intersect_count(users, dt, '2023-05-01', '2023-05-04') as day_3,
           intersect_count(users, dt, '2023-05-01', '2023-05-05') as day_4,
           intersect_count(users, dt, '2023-05-01', '2023-05-06') as day_5,
           intersect_count(users, dt, '2023-05-01', '2023-05-07') as day_6,
           intersect_count(users, dt, '2023-05-01', '2023-05-08') as day_7
    from (select cast(dt as varchar) as dt
               , users               as users
               , bitmap_to_string(users)
          from t_user_event
          where 1=1
            and dt in ('2023-05-01', '2023-05-02','2023-05-03','2023-05-04','2023-05-05','2023-05-06','2023-05-07','2023-05-08')) t
    
    ------------------------------------- 漏斗 -------------------------
    -- 乱序漏斗
    select intersect_count(users, page_name, 'pg_01')          as pg_01,
           intersect_count(users, page_name, 'pg_02')          as pg_02,
           intersect_count(users, page_name, 'pg_01', 'pg_02') as pg_01_02
    
    from (select page_name
               , users as users
               , bitmap_to_string(users)
          from t_user_event
          where page_name in ('pg_01', 'pg_02')
        ) t 
        
     
    ------------------------------------ 人群筛选 for 用户画像 ---------------------
    

3.2 用户画像

参考书籍【用户画像-方法论与工程化解决方案】

技术架构
image-20221008095802596
标签模型

用户标签存放在hbase,采用hbase列存储特性,一个列就是一个标签、一个离线任务,具体设计如下:

Tips:也可以将标签存放在hive(可以对标签进行划分,采用多表,多标签的json形式存放),然后在同步到其他查询、分享引擎里面。如druid、starrocks里面

标签元数据

管理标签以及对应任务、服务的监控,相关表设计如下:

开发流程
用户指标体系
用户特征库

Todo

画像产品化

4.数据治理

空文件

简介

just one one one 展开 收起
Shell
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/Sunnyghd/one.git
git@gitee.com:Sunnyghd/one.git
Sunnyghd
one
one
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891