# waterdrop-example
**Repository Path**: mirrors_InterestingLab/waterdrop-example
## Basic Information
- **Project Name**: waterdrop-example
- **Description**: seatunnel plugin developing examples.
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-09-25
- **Last Updated**: 2026-01-24
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
本文面向所有 seatunnel 插件开发人员,尽可能清晰得阐述开发一个 seatunnel 插件所经历的过程,希望能消除开发者的困惑。
## seatunnel 为什么需要插件机制
1. 原生提供插件可能仅能满足80%的需求,还有20%需要你自己来开发
2. 有了插件机制,开发仅需关注特定插件的处理逻辑,数据处理的共性问题,由框架来统一处理。
3. 通用插件大大提升了代码的复用率。
## seatunnel 插件体系介绍
seatunnel 插件分为3部分,`Input` 、`filter` 和 `output`,这里贴一个简版类图
### Input
`Input` 插件主要负责从外部读取数据并且封装为 `Spark DataSet[Row]` 数据集。插件有三种类型,分别是:
1. 读取静态离线数据的 `BaseStaticInput`
2. 实时数据流处理 `BaseStreamingInput`,并且我们在此接口基础上开发了方便 Java 使用的 `BaseJavaStreamingInput`
3. 状态的实时流处理 `BaseStructuredStreamingInput`
### Filter
`Filter` 插件主要负责处理类型为 `Spark DataSet[Row]` 的输入,并且返回同样为 `Spark DataSet[Row]` 类型的数据集。
### Output
`Output` 插件则负责将 `Spark DataSet[Row]` 数据集输出到外部数据源或者将结果打印到终端。
## 如何开发一个插件
### 第一步,创建项目
创建项目或者直接拉取本项目代码
> git clone https://github.com/InterestingLab/seatunnel-example.git
当你看到这里的,相比这一步不用过多阐述
### 第二步,配置seatunnel-api依赖
**sbt**
```
libraryDependencies += "io.github.interestinglab.waterdrop" %% "waterdrop-apis" % "1.5.0"
```
**maven**
```
io.github.interestinglab.waterdrop
waterdrop-apis_2.11
1.5.0
```
### 第三步,继承并实现接口
#### 1. Input
新建一个类,并继承 **waterdrop-apis** 提供的父类并实现相关接口完成 `Input` 插件开发。
##### 1.1 StreamingInput
`BaseStreamingInput` 用于实现一个流式处理 `Input` 插件,它支持泛型,用户可以根据实际数据情况指定类型。
需要注意,seatunnel 中的流式计算插件,类名必须以 **Stream** 结尾,如 `hdfsStream`。
`BaseStreamingInput` 接口定义如下:
```scala
abstract class BaseStreamingInput[T] extends Plugin {
/**
* Things to do after filter and before output
* */
def beforeOutput: Unit = {}
/**
* Things to do after output, such as update offset
* */
def afterOutput: Unit = {}
/**
* This must be implemented to convert RDD[T] to Dataset[Row] for later processing
* */
def rdd2dataset(spark: SparkSession, rdd: RDD[T]): Dataset[Row]
/**
* start should be invoked in when data is ready.
* */
def start(spark: SparkSession, ssc: StreamingContext, handler: Dataset[Row] => Unit): Unit = {
getDStream(ssc).foreachRDD(rdd => {
val dataset = rdd2dataset(spark, rdd)
handler(dataset)
})
}
/**
* Create spark dstream from data source, you can specify type parameter.
* */
def getDStream(ssc: StreamingContext): DStream[T]
```
`BaseStreamingInput` 接口功能如下:
* `beforeOutput`: 定义调用 Output 之前的逻辑。
* `afterOutput`: 定义调用 Output 之后的逻辑,常用于维护 Offset 实现 *at-least-once* 语义
* `rdd2dataset`: 定义 **RDD[T]** 转换为 **DataSet[Row]** 的处理逻辑
* `start`: 内部操作逻辑,可以无需关心
* `getDStream`: 定义获取外部数据源数据的逻辑,需要生成 **DStream[T]**
总的来说,我们需要定义外部数据源转换为 **DataSet[Row]** 的逻辑。
##### 1.2 BaseStaticInput
`BaseStaticInput` 用于实现一个离线/静态数据源读取插件,接口定义如下
```
abstract class BaseStaticInput extends Plugin {
/**
* Get Dataset from this Static Input.
* */
def getDataset(spark: SparkSession): Dataset[Row]
}
```
接口功能如下:
* `getDataset`: 将外部数据源转换为 **DataSet[Row]**
##### 1.3 BaseStructuredStreamingInput
`BaseStructuredStreamingInput` 接口定义与 `BaseStaticFilter` 定义类似,此处不再做过多陈述。
#### 2. Filter
新建一个类,并继承 **waterdrop-apis** 提供的父类并实现相关接口完成 `Filter` 插件开发。
##### BaseFilter
Spark 较好实现了批流一体,因此我们的 `Filter` 插件仅有一套API,且大部分 `Filter` 插件都能同时支持批处理和流处理。
`BaseFilter` 接口定义如下:
```
abstract class BaseFilter extends Plugin {
def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row]
/**
* Allow to register user defined UDFs
* @return empty list if there is no UDFs to be registered
* */
def getUdfList(): List[(String, UserDefinedFunction)] = List.empty
/**
* Allow to register user defined UDAFs
* @return empty list if there is no UDAFs to be registered
* */
def getUdafList(): List[(String, UserDefinedAggregateFunction)] = List.empty
}
```
`BaseFilter` 接口功能如下:
`process`: 定义 `Filter` 插件的具体操作逻辑,方法输入和输出都是 **DataSet[Row]**
`getUdfList`: 定义需要被注册的 UDF 列表
`getUdafList`: 定义需要被注册的 UDAF 列表
大部分场景我们仅需要实现 `process` 方法定义数据处理逻辑即可。
#### 3. Output
新建一个类,并继承 **waterdrop-apis** 提供的父类并实现相关接口完成 `Output` 插件开发。
##### 3.1 BaseOutput
`BaseOutput` 插件支持批处理和 Spark Streaming,不支持 Spark Structured Streaming
`BaseOutput` 接口定义如下:
```scala
abstract class BaseOutput extends Plugin {
def process(df: Dataset[Row])
}
```
`BaseOutput` 接口功能如下:
`process`: 定义 **Dataset[Row]** 数据输出到外部数据源的方法,需要注意,这里需要触发一个 `action` 操作
##### 3.2 BaseStructuredStreamingOutputIntra
`BaseStructuredStreamingOutputIntra` 则是为了提供 Spark Structured Streaming 对外输出的插件。
`BaseStructuredStreamingOutputIntra` 接口定义
```scala
trait BaseStructuredStreamingOutputIntra extends Plugin {
def process(df: Dataset[Row]): DataStreamWriter[Row]
}
```
`BaseStructuredStreamingOutputIntra` 接口功能:
`process`: 与 `BaseOutput` 不同的是,这里返回的是 **DataStreamWriter[Row]**。
##### 3.3 BaseStructuredStreamingOutput
`BaseStructuredStreamingOutputIntra` 主要为了提供 Spark 现在的 Output 方法支持,而 `BaseStructuredStreamingOutput` 则是为了自定义输出数据源。
`BaseStructuredStreamingOutput` 接口定义如下:
```scala
trait BaseStructuredStreamingOutput extends ForeachWriter[Row] with BaseStructuredStreamingOutputIntra {
/**
* Things to do before process.
* */
def open(partitionId: Long, epochId: Long): Boolean
/**
* Things to do with each Row.
* */
def process(row: Row): Unit
/**
* Things to do after process.
* */
def close(errorOrNull: Throwable): Unit
/**
* seatunnel Structured Streaming process.
* */
def process(df: Dataset[Row]): DataStreamWriter[Row]
}
```
`BaseStructuredStreamingOutput` 接口功能如下:
`open`: 定义处理之前的逻辑
`process`: 定义每条数据的处理逻辑
`close`: 定义处理之后的逻辑
`process`: seatunnel 内部的处理逻辑,需要返回 **DataStreamWriter[Row]**
### 第四步,打包使用
1. 编译打包
2. 将打包后的 Jar 包放到 seatunnel 指定目录下,以便 seatunnel 在启动的时候可以加载到。
```shell
cd seatunnel
mkdir -p plugins/my_plugins/lib
cd plugins/my_plugins/lib
```
seatunnel需要将第三方Jar包放到,必须新建**lib**文件夹
> plugins/your_plugin_name/lib/your_jar_name
其他文件放到
> plugins/your_plugin_name/files/your_file_name
3. 在配置文件中使用插件
第三方插件在使用时,**必须使用插件的完整包路径**,例如
> org.interestinglab.seatunnel.output.JavaOutput
```
output {
org.interestinglab.seatunnel.output.JavaStdout {
limit = 2
}
}
```
### 第五步, 启动
至此,我们就完成了一个插件的开发,并且在 seatunnel 中使用这个插件。