代码拉取完成,页面将自动刷新
任务1: 创建MergeTree表,并测试CRUD操作
任务2: 自己实现ClickHouse从Kafka、MySQL获取数据
任务3: Kudu API代码编写
创建MergeTree表,并测试CRUD操作
-- MergeTree CRUD walkthrough (ClickHouse).
-- Every statement is terminated with ';' so the script can be pasted or run as a whole.

-- Create the table: partitioned by month of `date`, sorted by `id`.
CREATE TABLE mt_table
(
    `date` Date,
    `id` UInt8,
    `name` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(date)
ORDER BY id;

-- Insert data (explicit column list so the statement survives schema changes).
INSERT INTO mt_table (`date`, `id`, `name`) VALUES
    ('2019-05-01', 1, 'zhangshan'),
    ('2019-06-01', 2, 'lisi'),
    ('2019-05-01', 3, 'wangwu');

-- Query data.
SELECT `date`, `id`, `name`
FROM mt_table
ORDER BY id;

-- Update data.
-- Runs before the deletes: row id = 2 lives in partition 201906, which is dropped
-- below, so updating it afterwards would match nothing.
-- ALTER ... UPDATE / DELETE are mutations and are applied asynchronously by default.
ALTER TABLE mt_table UPDATE name = 'Dean' WHERE id = 2;

-- Delete data.
-- Delete by condition (`id` is UInt8, so compare against a numeric literal).
ALTER TABLE mt_table DELETE WHERE id = 1;

-- Delete by partition (the partition value is the result of toYYYYMM(date)).
ALTER TABLE mt_table DROP PARTITION 201906;

-- Check the result once the mutations have finished.
SELECT `date`, `id`, `name`
FROM mt_table
ORDER BY id;
自己实现ClickHouse从Kafka、MySQL获取数据
-- MySQL engine table: a ClickHouse table backed by table `T1` in the remote MySQL
-- database `ebiz` on centos-master:3306.
-- NOTE(review): the MySQL user and password are embedded in the DDL below; replace
-- them with your own credentials and avoid committing real passwords.
CREATE TABLE mysql_table
(
    `id` UInt32,
    `name` String,
    `birthday` Date
)
ENGINE = MySQL('centos-master:3306', 'ebiz', 'T1', 'root', '12345678');

-- Query the remote data through the engine table.
SELECT `id`, `name`, `birthday`
FROM mysql_table;
# List the topics registered under the /myKafka ZooKeeper path.
# NOTE(review): this command targets localhost:2181 while the create command below
# targets centos-master:2181 — confirm both point at the same ZooKeeper.
kafka-topics.sh --list --zookeeper localhost:2181/myKafka
# Create the topic clickhouse_topic with 3 partitions and a replication factor of 3.
kafka-topics.sh --zookeeper centos-master:2181/myKafka --create --topic clickhouse_topic --partitions 3 --replication-factor 3
# Start a console producer that writes to clickhouse_topic on broker centos-master:9092.
kafka-console-producer.sh --broker-list centos-master:9092 --topic clickhouse_topic
-- Kafka -> ClickHouse pipeline: Kafka engine table (consumer) -> materialized view -> MergeTree table.

-- Create the Kafka engine table. It consumes CSV messages from clickhouse_topic;
-- up to 10 unparsable messages per block are skipped.
CREATE TABLE kafka_queue
(
    q_date String,
    level String,
    message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'centos-master:9092',
    kafka_topic_list = 'clickhouse_topic',
    kafka_group_name = 'group1',
    kafka_format = 'CSV',
    kafka_num_consumers = 3,
    kafka_skip_broken_messages = 10;

-- Create the storage table.
-- Uses the current MergeTree clause syntax; it is the equivalent of the deprecated
-- MergeTree(day, (day, level), 8192) form: monthly partitions on `day`,
-- sorting key (day, level), index granularity 8192.
CREATE TABLE kafka_data
(
    day Date,
    level String,
    message String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(day)
ORDER BY (day, level)
SETTINGS index_granularity = 8192;

-- Create the materialized view that moves rows from the Kafka table into the
-- storage table, converting the q_date string into a Date.
CREATE MATERIALIZED VIEW kafka_view TO kafka_data
AS
SELECT
    toDate(q_date) AS day,
    level,
    message
FROM kafka_queue;
--准备数据
2020-05-21,level2,message2
Kudu API代码编写
package dean.api;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.util.ArrayList;
/**
 * Walkthrough of the basic Kudu Java client operations against a demo table:
 * create table, insert, scan, update, delete row, drop table.
 *
 * <p>The {@link KuduClient} is owned by {@link #main(String[])}; the helper methods
 * use the client they are given but never close it, so they can be called in any
 * sequence on the same client.
 */
public class KuduApiDemo {

    /** Name of the demo table every helper operates on. */
    private static final String TABLE_NAME = "student";

    /**
     * Builds a client for the master at "centos-master" and runs the demo steps in an
     * order in which each step has something to work on; the table is dropped last.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the client.
        String masterAddresses = "centos-master";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddresses);
        kuduClientBuilder.defaultSocketReadTimeoutMs(5000);
        KuduClient client = kuduClientBuilder.build();
        try {
            // Create the table.
            createTable(client);
            // Insert a row.
            insertData(client);
            // Scan the table.
            queryData(client);
            // Update the row, then scan again to show the change.
            updateData(client);
            queryData(client);
            // Delete the row.
            deleteData(client);
            // Drop the table last so the steps above have a table to use.
            deleteTable(client);
        } finally {
            // The client is closed exactly once, by its owner.
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the demo table with columns id (INT32, key), name (STRING) and
     * age (INT32, nullable), hash-partitioned on id into 3 buckets with 1 replica.
     * Does nothing if the table already exists.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void createTable(KuduClient client) {
        // Describe every column. The key is INT32 because the other helpers write
        // and read it with addInt/getInt.
        ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).build());
        // age is nullable because insertData does not supply a value for it.
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(false).nullable(true).build());
        Schema schema = new Schema(columnSchemas);

        CreateTableOptions options = new CreateTableOptions();
        // Use a single replica.
        options.setNumReplicas(1);
        ArrayList<String> hashColumns = new ArrayList<String>();
        hashColumns.add("id");
        options.addHashPartitions(hashColumns, 3);

        try {
            if (client.tableExists(TABLE_NAME)) {
                return;
            }
            client.createTable(TABLE_NAME, schema, options);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops the demo table.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void deleteTable(KuduClient client) {
        try {
            client.deleteTable(TABLE_NAME);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Inserts the row (id=1, name="lucas") into the demo table.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void insertData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Insert insert = stuTable.newInsert();
            PartialRow row = insert.getRow();
            row.addInt("id", 1);
            row.addString("name", "lucas");
            applyAndFlush(client, insert);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the whole demo table and prints the id and name of every row.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void queryData(KuduClient client) {
        KuduScanner scanner = null;
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            scanner = client.newScannerBuilder(stuTable).build();
            while (scanner.hasMoreRows()) {
                for (RowResult result : scanner.nextRows()) {
                    int id = result.getInt("id");
                    String name = result.getString("name");
                    System.out.println("id:" + id + "...name:" + name);
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            // Close the scanner even when the scan failed part-way.
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Deletes the row with id=1 from the demo table.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void deleteData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Delete delete = stuTable.newDelete();
            delete.getRow().addInt("id", 1);
            applyAndFlush(client, delete);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sets name="xiaoming" on the row with id=1 in the demo table.
     *
     * @param client open Kudu client; not closed by this method
     */
    public static void updateData(KuduClient client) {
        try {
            KuduTable stuTable = client.openTable(TABLE_NAME);
            Update update = stuTable.newUpdate();
            PartialRow row = update.getRow();
            row.addInt("id", 1);
            row.addString("name", "xiaoming");
            applyAndFlush(client, update);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Applies one operation in a MANUAL_FLUSH session, flushes it, and prints any
     * per-row error returned by the flush.
     *
     * @param client    open Kudu client used to create the session
     * @param operation insert, update or delete to send
     * @throws KuduException if applying, flushing or closing the session fails
     */
    private static void applyAndFlush(KuduClient client, Operation operation) throws KuduException {
        KuduSession session = client.newSession();
        try {
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            // apply() must come before flush(): flush only sends what has been applied.
            session.apply(operation);
            for (OperationResponse response : session.flush()) {
                if (response.hasRowError()) {
                    System.err.println(response.getRowError());
                }
            }
        } finally {
            session.close();
        }
    }
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。