1 Star 10 Fork 16

栾昊/Flink_1.13

Create your Gitee Account
Explore and code with more than 13.5 million developers. Free private repositories! :)
Sign up
文件
This repository doesn't specify a license. Please pay attention to the specific project description and its upstream code dependencies when using it.
Clone or Download
SinkToMySQL.java 1.73 KB
Copy Edit Raw Blame History
dahuanhuan authored 2022-04-15 16:43 +08:00 . Flink 输出到 MySQL
package com.hao.chapter05;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Example Flink job that sends a fixed, in-memory set of {@code Event} records to a MySQL
 * table through the JDBC sink.
 *
 * <p>Only the {@code user} and {@code url} fields of each event are written; they are bound
 * to the two placeholders of the insert statement.
 */
public class SinkToMySQL {

    /**
     * Builds the pipeline (in-memory source, then JDBC sink) and executes it.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the job-wide parallelism to 1.
        env.setParallelism(1);

        // Source: a fixed list of events held in memory.
        // NOTE(review): the third constructor argument looks like a timestamp — confirm in Event.
        DataStreamSource<Event> clicks = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L),
            new Event("Alice", "./prod?id=100", 3000L),
            new Event("Bob", "./prod?id=1", 3300L),
            new Event("Bob", "./home", 3500L),
            new Event("Alice", "./prod?id=200", 3200L),
            new Event("Bob", "./prod?id=2", 3800L),
            new Event("Bob", "./prod?id=3", 4200L)
        );

        // Parameterized insert; placeholder 1 is the user, placeholder 2 is the url.
        String insertSql = "INSERT INTO clicks (user,url) VALUE (?,?)";

        // Connection settings for the target MySQL database.
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder connectionBuilder =
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
        connectionBuilder.withDriverName("com.mysql.jdbc.Driver");
        connectionBuilder.withUrl("jdbc:mysql://hadoop102:3306/flink");
        connectionBuilder.withUsername("root");
        connectionBuilder.withPassword("000000");
        JdbcConnectionOptions mysqlConnection = connectionBuilder.build();

        // Write every event to MySQL: the lambda fills in the prepared statement per record.
        clicks.addSink(JdbcSink.sink(
            insertSql,
            (preparedStatement, click) -> {
                preparedStatement.setString(1, click.user);
                preparedStatement.setString(2, click.url);
            },
            mysqlConnection
        ));

        env.execute();
    }
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/luan_hao/Flink_1.13.git
git@gitee.com:luan_hao/Flink_1.13.git
luan_hao
Flink_1.13
Flink_1.13
master

Search