董可伦/spark-hudi

SparkHudiDemo.scala 5.94 KB
dongkelun committed on 2022-05-13 15:32 · Keep the path name consistent with the table name
package com.dkl.blog.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.hudi.command.UuidKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * Created by dongkelun on 2022/5/10 19:49
 */
object SparkHudiDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkHudiDemo").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      // Extend Spark SQL so that Spark SQL supports Hudi
      config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
      // Enable Hive support; keep it commented out for local testing
      // enableHiveSupport().
      getOrCreate()

    import spark.implicits._

    val df = Seq((1, "a1", 10, 1000, "2022-05-12")).toDF("id", "name", "value", "ts", "dt")

    val databaseName = "default"
    val tableName1 = "test_hudi_table_1"
    val primaryKey = "id"
    val preCombineField = "ts"
    val partitionField = "dt"
    val tablePath1 = "/tmp/test_hudi_table_1"

    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      UPSERT_OPERATION_OPT_VAL, tablePath1, Overwrite)
    spark.read.format("hudi").load(tablePath1).show(false)

    // Delete the records written above
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      DELETE_OPERATION_OPT_VAL, tablePath1, Append)
    spark.read.format("hudi").load(tablePath1).show(false)

    val tableName2 = "test_hudi_table_2"
    val tablePath2 = "/tmp/test_hudi_table_2"
    save2HudiWithNoPrimaryKey(df, tableName2, tablePath2)
    spark.read.format("hudi").load(tablePath2).show(false)

    val tableName3 = "test_hudi_table_3"
    save2HudiWithSaveAsTable(df, databaseName, tableName3, primaryKey)
    spark.table(tableName3).show()

    spark.stop()
  }
  /**
   * Write to Hudi and sync to Hive: the table has a primary key and is partitioned by the dt field
   */
  def save2HudiSyncHiveWithPrimaryKey(df: DataFrame, databaseName: String, tableName: String, primaryKey: String, preCombineField: String,
                                      partitionField: String, operation: String, tablePath: String, mode: SaveMode): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key, primaryKey). // primary key field
      option(PRECOMBINE_FIELD.key, preCombineField). // pre-combine field
      option(PARTITIONPATH_FIELD.key, partitionField).
      option(TBL_NAME.key, tableName).
      option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName).
      option(OPERATION.key(), operation).
      // The options below are for syncing metadata to Hive and querying the table from Hive
      option(META_SYNC_ENABLED.key, true).
      option(HIVE_USE_JDBC.key, false).
      option(HIVE_DATABASE.key, databaseName).
      option(HIVE_AUTO_CREATE_DATABASE.key, true).
      // Managed (internal) table. Not strictly required here, but required when using saveAsTable,
      // because 0.9.0 has a bug that creates an external table by default
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      option(HIVE_TABLE.key, tableName).
      option(HIVE_STYLE_PARTITIONING.key, true).
      option(HIVE_PARTITION_FIELDS.key, partitionField).
      option(HIVE_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName).
      // Needed so the table can be updated from Spark SQL. Version 0.9.0 has a bug that requires
      // this option; it has been fixed in later versions, where the option can be omitted.
      // Details in PR: https://github.com/apache/hudi/pull/3745
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      mode(mode)
      .save(tablePath)
  }
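  /**
   * A minimal sketch, not part of the original demo: assuming the table has been synced to Hive with the
   * primaryKey serde property above and HoodieSparkSessionExtension is enabled, it can also be updated
   * through Spark SQL. The column names (value, id) follow the demo DataFrame and are illustrative.
   */
  def updateViaSparkSqlSketch(spark: SparkSession, databaseName: String, tableName: String): Unit = {
    // Issue a Hudi UPDATE through the Spark SQL DML support added by the session extension
    spark.sql(s"UPDATE $databaseName.$tableName SET value = 20 WHERE id = 1")
  }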
  /**
   * Table without a primary key and without partitions
   */
  def save2HudiWithNoPrimaryKey(df: DataFrame, tableName: String, tablePath: String): Unit = {
    df.
      write.format("hudi").
      option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
      option(RECORDKEY_FIELD.key, "").
      option(PARTITIONPATH_FIELD.key, "").
      option(TBL_NAME.key, tableName).
      option(OPERATION.key(), INSERT_OPERATION_OPT_VAL).
      mode(Overwrite)
      .save(tablePath)
  }
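  /**
   * A minimal sketch, not part of the original demo: read the non-primary-key table back and inspect the
   * record keys generated by UuidKeyGenerator through Hudi's _hoodie_record_key meta column.
   */
  def showGeneratedRecordKeys(spark: SparkSession, tablePath: String): Unit = {
    spark.read.format("hudi").load(tablePath)
      .select("_hoodie_record_key", "id", "name", "value")
      .show(false)
  }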
  /**
   * Write Hudi and sync Hive via saveAsTable; this ultimately goes through Spark SQL CTAS (CreateHoodieTableAsSelectCommand).
   * CTAS first runs the insert into (InsertIntoHoodieTableCommand) and then creates the table.
   */
  def save2HudiWithSaveAsTable(df: DataFrame, databaseName: String, tableName: String, primaryKey: String): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key(), primaryKey).
      // No pre-combine is needed, so set it to primaryKey.
      // For insert/bulk_insert operations with the related options turned off, it would not need to be set.
      // If the pre-combine field is not configured explicitly, Spark SQL defaults it to the last field of the schema:
      // `PRECOMBINE_FIELD.key -> tableSchema.fields.last.name`
      // With that default a null exception may be thrown, so set it to the primary key instead.
      // Related issue: https://github.com/apache/hudi/issues/4131
      option(PRECOMBINE_FIELD.key(), primaryKey).
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      option(TBL_NAME.key(), tableName).
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      // Disable combine-before-insert. The default is false, but Spark SQL in 0.9.0 sets it to true when a primary key is present.
      option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key, false).
      // Use bulk_insert
      option(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, true).
      // The mode is Overwrite, but Hudi CTAS requires the target directory to be empty; otherwise a validation error is thrown.
      mode(Overwrite).
      saveAsTable(s"$databaseName.$tableName")
  }
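  /**
   * A minimal sketch, not part of the original demo: the Spark SQL CTAS form that the saveAsTable call above
   * maps to. The source view name is illustrative; register it first, e.g. df.createOrReplaceTempView(sourceView).
   */
  def createTableAsSelectSketch(spark: SparkSession, databaseName: String, tableName: String,
                                primaryKey: String, sourceView: String): Unit = {
    spark.sql(
      s"""
         |create table $databaseName.$tableName using hudi
         |tblproperties (primaryKey = '$primaryKey')
         |as select * from $sourceView
         |""".stripMargin)
  }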
}