dongkelun/spark-hudi

TestHuDiPreCombinedFiled.scala
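A test of Hudi's precombine field (preCombineField), exercised three ways: through the DataFrame writer API, through Spark SQL MERGE INTO on a table created without a precombine field, and through MERGE INTO on a table created with one.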
package com.dkl.blog.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SparkSession

/**
 * Created by dongkelun on 2021/7/10 16:39
 *
 * Tests Hudi's precombine (pre-merge) field behavior.
 */
object TestHuDiPreCombinedFiled {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("TestHuDiPreCombinedFiled")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Hudi requires the Kryo serializer
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") // enables Hudi SQL, e.g. MERGE INTO
      .getOrCreate()
    testProgram(spark)
    testSQLNoPreCombineFiled(spark)
    testSQLWithPreCombineFiled(spark)
    testSQLAndProgram(spark)

    spark.stop()
  }
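  // testProgram writes three rows that all share the record key id = 7 through the
  // DataFrame API. With preCombineField = ts, Hudi combines records with the same
  // key within the batch, so only one of the three rows survives the upsert.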
  def testProgram(spark: SparkSession): Unit = {
    val tableName = "test_hudi_table1"
    val data = Array(
      (7, "name12", 1.21, 108L, "2021-05-06"),
      (7, "name2", 2.22, 108L, "2021-05-06"),
      (7, "name3", 3.45, 108L, "2021-05-06")
    )
    val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD.key(), "ts"). // use ts as the precombine field
      option(RECORDKEY_FIELD.key(), "id").
      option(PARTITIONPATH_FIELD.key(), "dt").
      option(HIVE_STYLE_PARTITIONING.key(), true). // Hive-style partition paths: <partition field>=<value>
      option("hoodie.table.name", tableName).
      // option("hoodie.datasource.write.insert.drop.duplicates", true). // drop duplicate inserts instead of updating
      // option(OPERATION.key(), "INSERT").
      option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
      mode("append").
      save(s"/tmp/${tableName}")

    val read_df = spark.
      read.
      format("hudi").
      load(s"/tmp/${tableName}" + "/*")
    read_df.show()
  }
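  // Note: the trailing "/*" glob matches the single partition level (dt=...);
  // recent Hudi releases can also load the table directly from the base path.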
  def testSQLNoPreCombineFiled(spark: SparkSession): Unit = {
    val tableName = "test_hudi_table2"
    spark.sql(
      s"""
         | create table ${tableName} (
         |  id int,
         |  name string,
         |  price double,
         |  ts long,
         |  dt string
         |) using hudi
         | partitioned by (dt)
         | options (
         |  primaryKey = 'id',
         |  type = 'cow'
         | )
         | location '/tmp/${tableName}'
         |""".stripMargin)

    spark.sql(s"show create table ${tableName}").show(false)

    spark.sql(
      s"""
         |merge into ${tableName} as t0
         |using (
         |  select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt, 'INSERT' as opt_type union
         |  select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt, 'UPDATE' as opt_type union
         |  select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt, 'UPDATE' as opt_type union
         |  select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt, 'DELETE' as opt_type
         | ) as s0
         |on t0.id = s0.id
         |when matched and opt_type != 'DELETE' then update set *
         |when matched and opt_type = 'DELETE' then delete
         |when not matched and opt_type != 'DELETE' then insert *
         |""".stripMargin)

    spark.table(tableName).show()
  }
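  // Without a preCombineField there is no tie-breaker: the source of the MERGE
  // contains two UPDATE rows for id = 1 (price 98 and 99), and no precombine
  // ordering defines which of them the table ends up with.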
  def testSQLWithPreCombineFiled(spark: SparkSession): Unit = {
    val tableName = "test_hudi_table3"
    spark.sql(
      s"""
         | create table ${tableName} (
         |  id int,
         |  name string,
         |  price double,
         |  ts long,
         |  dt string
         |) using hudi
         | partitioned by (dt)
         | options (
         |  primaryKey = 'id',
         |  preCombineField = 'ts',
         |  type = 'cow'
         | )
         | location '/tmp/${tableName}'
         |""".stripMargin)

    spark.sql(s"show create table ${tableName}").show(false)

    spark.sql(
      s"""
         |merge into ${tableName} as t0
         |using (
         |  select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt, 'INSERT' as opt_type union
         |  select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt, 'UPDATE' as opt_type union
         |  select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt, 'UPDATE' as opt_type union
         |  select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt, 'DELETE' as opt_type
         | ) as s0
         |on t0.id = s0.id
         |when matched and opt_type != 'DELETE' then update set *
         |when matched and opt_type = 'DELETE' then delete
         |when not matched and opt_type != 'DELETE' then insert *
         |""".stripMargin)

    spark.table(tableName).show()
  }
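  // With preCombineField = 'ts' the duplicate rows for id = 1 are combined by the
  // largest ts before the write; since both UPDATE rows carry ts = 99 here, either
  // may win. Giving one of them a larger ts would make the outcome deterministic.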
  def testSQLAndProgram(spark: SparkSession): Unit = {
    val tableName = "test_hudi_table4"
    spark.sql(
      s"""
         | create table ${tableName} (
         |  id int,
         |  name string,
         |  price double,
         |  dt string
         |) using hudi
         | partitioned by (dt)
         | options (
         |  primaryKey = 'id',
         |  type = 'cow'
         | )
         | location '/tmp/${tableName}'
         |""".stripMargin)

    val data = Array(
      (7, "name12", 1.21, 106L, "2021-05-06"),
      (7, "name2", 2.22, 108L, "2021-05-06"),
      (7, "name3", 3.45, 107L, "2021-05-06")
    )
    // the DataFrame carries a ts column that the SQL schema above did not declare
    val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")
    df.show()

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD.key(), "ts").
      option(RECORDKEY_FIELD.key(), "id").
      option(PARTITIONPATH_FIELD.key(), "dt").
      option(HIVE_STYLE_PARTITIONING.key(), true). // Hive-style partition paths: <partition field>=<value>
      option("hoodie.table.name", tableName).
      // option("hoodie.datasource.write.insert.drop.duplicates", true). // drop duplicate inserts instead of updating
      // option(OPERATION.key(), "INSERT").
      option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
      mode("append"). // the location already exists (created by the CREATE TABLE above)
      save(s"/tmp/${tableName}")

    val read_df = spark.
      read.
      format("hudi").
      load(s"/tmp/${tableName}" + "/*")
    read_df.show()

    spark.table(tableName).show()
  }
}
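For reference, a minimal sketch of the tie-breaking rule (not part of the original file; it reuses the imports and `spark` session above, and the table name and path are hypothetical): two rows share the record key but carry different ts values, and with preCombineField = ts the row with the larger ts should be the one that survives.

// Hypothetical demo; assumes the imports and SparkSession defined above.
val rows = Array(
  (1, "old", 1.0, 100L, "2021-05-05"),
  (1, "new", 2.0, 200L, "2021-05-05")
)
spark.createDataFrame(rows).toDF("id", "name", "price", "ts", "dt")
  .write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD.key(), "ts") // the larger ts wins within a batch
  .option(RECORDKEY_FIELD.key(), "id")
  .option(PARTITIONPATH_FIELD.key(), "dt")
  .option("hoodie.table.name", "test_hudi_precombine_demo")
  .mode("append")
  .save("/tmp/test_hudi_precombine_demo")

// Expected: a single row for id = 1 with name = "new" (ts = 200).
spark.read.format("hudi").load("/tmp/test_hudi_precombine_demo/*").show()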