# flinkKafkaDoris

**Repository Path**: suntyu_admin/flink-kafka-doris

## Basic Information

- **Project Name**: flinkKafkaDoris
- **Description**: flinkKafkaDoris demo
- **Primary Language**: Java
- **License**: GPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 8
- **Forks**: 7
- **Created**: 2021-04-16
- **Last Updated**: 2024-11-16

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

## Flink + Kafka + Doris in Practice

### Environment

- Flink 1.12
- Doris 0.12
- Kafka 1.0.1 / Kafka 3.1.1

### 1. Building Doris

Refer to the official Docker-based build guide: https://github.com/apache/incubator-doris/wiki/Doris-Install

#### 1.1 Notes

In `fe/pom.xml`, change the download repository URLs:

- `cloudera-thirdparty` (https://repository.cloudera.com/content/repositories/third-party/) → `cloudera-public` (https://repository.cloudera.com/artifactory/public/)
- `cloudera-plugins` (https://repository.cloudera.com/content/groups/public/) → `cloudera-public` (https://repository.cloudera.com/artifactory/public/)

#### 1.2 Build

Run the standard build directly; no other changes are needed.

#### 1.3 Prebuilt packages

If you prefer not to build from source, prebuilt packages (versions 0.12 and 0.11) are available here:
https://download.csdn.net/download/leng91060404/16661347

### 2. Deploying Doris

Reference: https://github.com/apache/incubator-doris/wiki/Doris-Install#3-%E9%83%A8%E7%BD%B2

#### 2.1 Notes

FE:

- Create a `doris-meta` directory under the `fe` directory.
- Adjust the `priority_networks = ip` setting as needed.

BE:

- Set `storage_root_path = /home/disk1/doris;/home/disk2/doris` and create those directories.
- Adjust the `priority_networks = ip` setting as needed.

#### 2.2 Start the processes

#### 2.3 Check cluster status

> A BE node must first be registered through the FE before it joins the cluster. Use mysql-client (from a MySQL 5.7 download) to connect to the FE:
>
> `./mysql-client -h host -P port -uroot`
>
> where `host` is the IP of the FE node and `port` is the `query_port` from `fe/conf/fe.conf`; by default the `root` account logs in with no password.

Checking FE status:

> Log in to the Master FE with a mysql client and run:
>
> `SHOW PROC '/frontends';`
>
> to see the current FE nodes.
>
> You can also check the FE nodes on the web UI: http://fe_hostname:fe_http_port/frontend or http://fe_hostname:fe_http_port/system?path=//frontends

Checking BE status:

> Log in to the Leader FE with mysql-client and run:
>
> `SHOW PROC '/backends';`
>
> to see the current BE nodes.
>
> You can also check the BE nodes on the web UI: http://fe_hostname:fe_http_port/backend or http://fe_hostname:fe_http_port/system?path=//backends

### 3. Loading from Kafka into Doris (Flink has already sunk the data to Kafka)

#### 3.1 Create the database

```sql
CREATE DATABASE example_db;
```

#### 3.2 Create the table

```sql
CREATE TABLE item (
    id INTEGER,
    name VARCHAR(256) DEFAULT '',
    ts DATETIME,
    counts BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(id, name, ts)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
```

#### 3.3 Create the Kafka-to-Doris routine load job

```sql
CREATE ROUTINE LOAD example_db.task1 ON item
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, counts, ts)
PROPERTIES (
    "desired_concurrent_number"="1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "xxxxxxx:9092",
    "kafka_topic" = "sinkTopic",
    "property.group.id" = "1234",
    "property.client.id" = "12345",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
```

#### 3.4 Check the job

```sql
SHOW ROUTINE LOAD FOR example_db.task1;
SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
```

The job status (shown here with `\G` vertical output, since the row is very wide):

```
mysql> SHOW ROUTINE LOAD FOR flink_kafka_db.task1\G
*************************** 1. row ***************************
                  Id: 10037
                Name: task1
          CreateTime: 2021-04-16 11:27:39
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:flink_kafka_db
           TableName: item
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"id,name,counts,ts","maxBatchIntervalS":"20","whereExpr":"*","maxBatchSizeBytes":"209715200","columnSeparator":"','","maxErrorNum":"0","currentTaskConcurrentNum":"1","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"flink_sql_table_test_sink","currentKafkaPartitions":"0,1,2","brokerList":"172.16.2.148:9092"}
    CustomProperties: {"group.id":"1234","client.id":"12345"}
           Statistic: {"receivedBytes":726452,"errorRows":0,"committedTaskNum":159,"loadedRows":20118,"loadRowsRate":0,"abortedTaskNum":629,"totalRows":20118,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3247070}
            Progress: {"0":"6059","1":"6915","2":"7141"}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
1 row in set (0.00 sec)
```

The current load task:

```
mysql> SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| TaskId                            | TxnId | TxnStatus | JobId | CreateTime          | ExecuteStartTime    | Timeout | BeId  | DataSourceProperties         |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| 56543bac022d4fdb-bb5944617d6480ac | 788   | UNKNOWN   | 10037 | 2021-04-16 15:56:50 | 2021-04-16 15:56:50 | 40      | 10002 | {"0":6060,"1":6916,"2":7142} |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
```

#### 3.5 Query the data through mysql

```
./mysql-client -h host -P port -uroot
```

### 4. Sample data

Data sunk to Kafka:

```
41219,HAT9,1,2021-04-16 11:32:08.524
41219,HAT9,1,2021-04-16 11:33:30.846
41219,HAT9,1,2021-04-16 11:40:36.387
41219,HAT9,1,2021-04-14 19:21:46.738
5046,SHOE9,1,2021-04-16 11:39:48.19
5046,SHOE9,1,2021-04-16 11:39:50.193
5046,SHOE9,1,2021-04-16 11:33:34.858
5046,SHOE9,1,2021-04-16 11:37:47.804
5046,SHOE9,1,2021-04-16 11:36:09.437
18889,SHOE9,1,2021-04-16 11:37:33.75
18889,SHOE9,1,2021-04-14 19:21:40.712
18889,SHOE9,1,2021-04-16 11:33:26.829
18889,SHOE9,1,2021-04-16 11:34:47.115
18889,SHOE9,1,2021-04-16 11:41:32.58
18889,SHOE9,1,2021-04-16 11:39:48.19
18889,SHOE9,1,2021-04-16 11:39:50.193
18889,SHOE9,1,2021-04-16 11:39:08.057
18889,SHOE9,1,2021-04-16 11:41:16.518
18889,SHOE9,1,2021-04-16 11:34:31.049
91190,HAT2,1,2021-04-16 11:31:38.406
```

Data loaded from Kafka into Doris, queried via mysql:

```
mysql> select * from item order by id desc limit 10;
+-------+-------+---------------------+--------+
| id    | name  | ts                  | counts |
+-------+-------+---------------------+--------+
| 99995 | SHOE2 | 2021-04-16 11:34:45 |      3 |
| 99995 | SHOE2 | 2021-04-15 19:05:59 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:59 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:35:05 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:06 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:57 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:45 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:11 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:24 |      4 |
| 99995 | SHOE2 | 2021-04-14 19:21:12 |      4 |
+-------+-------+---------------------+--------+
10 rows in set (0.01 sec)
```
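For reference, the line format consumed by the routine load job in section 3.3 (`COLUMNS TERMINATED BY ","`, `COLUMNS(id, name, counts, ts)`) can be sketched in plain Python. The helper name `to_csv_row` is illustrative and not part of this repo; millisecond precision matches the sample Kafka data above:

```python
from datetime import datetime

def to_csv_row(item_id: int, name: str, counts: int, ts: datetime) -> str:
    """Format one record as the comma-separated line the routine load expects:
    COLUMNS(id, name, counts, ts), separator ','."""
    # strftime('%f') yields microseconds; keep the first three digits (millis)
    ts_str = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return f"{item_id},{name},{counts},{ts_str}"

row = to_csv_row(41219, "HAT9", 1, datetime(2021, 4, 16, 11, 32, 8, 524000))
print(row)  # 41219,HAT9,1,2021-04-16 11:32:08.524
```

Any producer (the Flink job here, or a test script) that emits lines in this shape to the configured topic will be picked up by the routine load.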
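The `counts` values greater than 1 in the query output come from the table definition in section 3.2: with `AGGREGATE KEY(id, name, ts)`, Doris merges rows that share the same key and applies `SUM` to the `counts` value column (note also that `DATETIME` stores second precision, so the millisecond timestamps from Kafka can collide). A minimal sketch of that semantics in plain Python (not Doris; the sample rows are hypothetical):

```python
from collections import defaultdict

# Incoming rows as (id, name, ts-truncated-to-seconds, counts); each Kafka
# record carries counts=1, as in the sample data above.
rows = [
    (41219, "HAT9", "2021-04-16 11:32:08", 1),
    (41219, "HAT9", "2021-04-16 11:32:08", 1),  # same second -> same key
    (5046, "SHOE9", "2021-04-16 11:39:48", 1),
]

table = defaultdict(int)
for item_id, name, ts, counts in rows:
    # SUM aggregation on the value column for identical AGGREGATE KEYs
    table[(item_id, name, ts)] += counts

print(table[(41219, "HAT9", "2021-04-16 11:32:08")])  # 2
print(table[(5046, "SHOE9", "2021-04-16 11:39:48")])  # 1
```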
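One way to sanity-check a routine load job: the `Progress` column in the section 3.4 output appears to report the last consumed offset per Kafka partition (the `SHOW ROUTINE LOAD TASK` output asks for the next offsets, 6060/6916/7142). Assuming zero-based offsets as configured by `"kafka_offsets" = "0,0,0"`, the per-partition counts can be cross-checked against `totalRows` in the `Statistic` column:

```python
import json

# Progress value copied from the SHOW ROUTINE LOAD output in section 3.4
progress = json.loads('{"0":"6059","1":"6915","2":"7141"}')

# Assumption: each value is the last consumed offset, so a partition that
# started at offset 0 has consumed (offset + 1) rows.
rows_consumed = sum(int(offset) + 1 for offset in progress.values())
print(rows_consumed)  # 20118, matching totalRows/loadedRows in Statistic
```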