1 Star 0 Fork 0

Dean/bigdata-module-9-clickhouse-kudu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夹,创建文件夹后会生成空的 .keep 文件
Loading...
README

bigdata-module-9-clickhouse-kudu

作业

任务1: 创建MergeTree表,并测试CRUD操作

任务2: 自己实现ClickHouse从Kafka、MySQL获取数据

任务3: kuduAPI代码编写

任务1

创建MergeTree表,并测试CRUD操作

-- Create a MergeTree table partitioned by month and ordered by id.
CREATE TABLE mt_table
(
    `date` Date,
    `id` UInt8,
    `name` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(date)
ORDER BY id;
-- Insert rows (two land in partition 201905, one in 201906).
insert into mt_table values ('2019-05-01', 1, 'zhangshan');
insert into mt_table values ('2019-06-01', 2, 'lisi');
insert into mt_table values ('2019-05-01', 3, 'wangwu');
-- Query the rows.
select * from  mt_table;
-- Update a row. This runs before the deletes below: row id=2 lives in
-- partition 201906, so updating it after that partition is dropped would
-- match nothing. ALTER ... UPDATE/DELETE are mutations and are applied
-- asynchronously by ClickHouse.
alter table mt_table update name='Dean' where id=2;
-- Delete data.
-- Drop a whole partition.
alter table mt_table DROP PARTITION '201906';
-- Delete by condition.
alter table mt_table delete where id=1;

任务2

自己实现ClickHouse从Kafka、MySQL获取数据

  1. mysql
-- MySQL engine table: a ClickHouse table backed by the remote MySQL table
-- ebiz.T1 on centos-master:3306. The column list must match the columns
-- exposed by the MySQL side.
CREATE TABLE mysql_table
(
    `id` UInt32,
    `name` String,
    `birthday` Date
)
ENGINE = MySQL('centos-master:3306', 'ebiz', 'T1', 'root', '12345678');
-- Query the table; the data is read from MySQL.
select * from mysql_table;
  2. kafka
# List the topics registered under the /myKafka ZooKeeper path.
kafka-topics.sh --list --zookeeper localhost:2181/myKafka

# Create the topic consumed by ClickHouse: 3 partitions, replication factor 3.
kafka-topics.sh --zookeeper centos-master:2181/myKafka --create --topic clickhouse_topic --partitions 3 --replication-factor 3

# Start a console producer to type test messages into clickhouse_topic.
kafka-console-producer.sh --broker-list centos-master:9092 --topic clickhouse_topic

-- Kafka engine table: consumes CSV messages (q_date,level,message) from
-- clickhouse_topic as consumer group "group1".
CREATE TABLE kafka_queue (
  q_date String,
  level String,
  message String
) ENGINE = Kafka SETTINGS kafka_broker_list='centos-master:9092', 
                          kafka_topic_list='clickhouse_topic',
                          kafka_group_name='group1',
                          kafka_format='CSV',
                          kafka_num_consumers=3,
                          kafka_skip_broken_messages=10;
-- Storage table that keeps the consumed rows. Written with the current
-- MergeTree syntax; it is the equivalent of the deprecated
-- MergeTree(day, (day, level), 8192) form (monthly partitions on `day`,
-- primary key (day, level), index granularity 8192).
CREATE TABLE kafka_data (
  day Date,
  level String,
  message String
) ENGINE = MergeTree
PARTITION BY toYYYYMM(day)
ORDER BY (day, level)
SETTINGS index_granularity = 8192;
-- Materialized view that moves rows from the Kafka queue into the storage
-- table, converting the q_date string to a Date.
CREATE MATERIALIZED VIEW kafka_view TO kafka_data
  AS SELECT toDate(q_date) AS day, level, message
  FROM kafka_queue;

--准备数据
2020-05-21,level2,message2

任务3

kuduAPI代码编写

package dean.api;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;

/**
 * Small walkthrough of the Apache Kudu Java client: creates a table, inserts,
 * scans, updates and deletes one row, then drops the table.
 *
 * <p>The {@link KuduClient} is owned by {@link #main(String[])} and closed
 * exactly once when the demo finishes. The helper methods never close the
 * client they are given, so they can be called in any sequence on the same
 * client.
 */
public class KuduApiDemo {

    /** Name of the table every step of the demo operates on. */
    private static final String TABLE_NAME = "student";

    /**
     * Runs the demo against the Kudu master at {@code centos-master}.
     *
     * <p>The steps run in an order in which each one has something to work on:
     * the table is created first and dropped last, and the row is inserted
     * before it is read, updated and deleted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String masterAddresses = "centos-master";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddresses);
        kuduClientBuilder.defaultSocketReadTimeoutMs(5000);

        // try-with-resources closes the client once, after all steps are done.
        try (KuduClient client = kuduClientBuilder.build()) {
            createTable(client);
            insertData(client);
            queryData(client);
            updateData(client);
            queryData(client);
            deleteData(client);
            deleteTable(client);
        } catch (KuduException e) {
            // Only reachable from the implicit client.close().
            e.printStackTrace();
        }
    }

    /**
     * Creates the {@code student} table with columns {@code id} (INT32, key),
     * {@code name} (STRING) and {@code age} (INT32), hash-partitioned on
     * {@code id} into 3 buckets with a single replica.
     *
     * <p>{@code id} is an INT32 so that the schema agrees with the
     * {@code addInt}/{@code getInt} calls used by the data methods.
     *
     * @param client open Kudu client; it is left open
     */
    public static void createTable(KuduClient client) {
        // Describe every column of the table.
        ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(false).build());
        Schema schema = new Schema(columnSchemas);

        CreateTableOptions options = new CreateTableOptions();
        // Use a single replica.
        options.setNumReplicas(1);
        ArrayList<String> hashColumns = new ArrayList<String>();
        hashColumns.add("id");
        options.addHashPartitions(hashColumns, 3);

        try {
            client.createTable(TABLE_NAME, schema, options);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops the {@code student} table.
     *
     * @param client open Kudu client; it is left open
     */
    public static void deleteTable(KuduClient client) {
        try {
            client.deleteTable(TABLE_NAME);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Inserts the row {@code (id=1, name="lucas", age=18)}.
     *
     * <p>{@code age} is set explicitly because the column is declared without
     * {@code nullable(true)} and without a default value.
     *
     * @param client open Kudu client; it is left open
     */
    public static void insertData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Insert insert = stuTable.newInsert();
            PartialRow row = insert.getRow();
            row.addInt("id", 1);
            row.addString("name", "lucas");
            row.addInt("age", 18);
            applyAndFlush(client, insert);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the whole {@code student} table and prints {@code id} and
     * {@code name} of every row to standard output.
     *
     * @param client open Kudu client; it is left open
     */
    public static void queryData(KuduClient client) {
        KuduScanner scanner = null;
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            scanner = client.newScannerBuilder(stuTable).build();
            while (scanner.hasMoreRows()) {
                for (RowResult result : scanner.nextRows()) {
                    int id = result.getInt("id");
                    String name = result.getString("name");
                    System.out.println("id:" + id + "...name:" + name);
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            // Close the scanner even when the scan stopped early on an error.
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Deletes the row whose key is {@code id=1}.
     *
     * @param client open Kudu client; it is left open
     */
    public static void deleteData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Delete delete = stuTable.newDelete();
            delete.getRow().addInt("id", 1);
            applyAndFlush(client, delete);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sets {@code name} to {@code "xiaoming"} on the row whose key is
     * {@code id=1}.
     *
     * @param client open Kudu client; it is left open
     */
    public static void updateData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Update update = stuTable.newUpdate();
            PartialRow row = update.getRow();
            row.addInt("id", 1);
            row.addString("name", "xiaoming");
            applyAndFlush(client, update);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Applies one operation in a fresh MANUAL_FLUSH session, flushes it,
     * reports any per-row error to standard error, and closes the session.
     *
     * <p>The operation is applied before the flush; in MANUAL_FLUSH mode
     * {@code apply} only buffers the operation and {@code flush} sends it.
     */
    private static void applyAndFlush(KuduClient client, Operation operation) {
        KuduSession session = client.newSession();
        try {
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.apply(operation);
            for (OperationResponse response : session.flush()) {
                if (response.hasRowError()) {
                    System.err.println("Kudu row error: " + response.getRowError());
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                session.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}

空文件

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/penglanglang/bigdata-module-9-clickhouse-kudu.git
git@gitee.com:penglanglang/bigdata-module-9-clickhouse-kudu.git
penglanglang
bigdata-module-9-clickhouse-kudu
bigdata-module-9-clickhouse-kudu
master

搜索帮助