代码拉取完成,页面将自动刷新
--下面是Mysql中的语句
--mysql
CREATE DATABASE db_inventory_cdc;
use db_inventory_cdc;
CREATE TABLE tb_products_cdc(
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(64),
description VARCHAR(128)
);
--mysql
INSERT INTO tb_products_cdc
VALUES
(DEFAULT, 'zhangsan', 'aaa'),
(DEFAULT, 'lisi', 'bbb'),
(DEFAULT, 'wangwu', 'ccc');
--下面是Flnk SQL Client中的语句
--source
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'appleyuchi',
'password' = 'appleyuchi',
'database-name' = 'db_inventory_cdc',
'table-name' = 'tb_products_cdc'
);
--sink
CREATE TABLE tb_sink (
name STRING,
countSum BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'print'
);
--提交任务
INSERT INTO tb_sink SELECT name, COUNT(1) FROM mysql_binlog GROUP BY name;
--查看當前mysql中的數據
use db_inventory_cdc;
select * from tb_products_cdc;
--修改source端的数据,然後再次查看Flink的Task Manager的Std Out中的記錄
INSERT INTO tb_products_cdc VALUE(DEFAULT, 'lisi', 'ddd');
DELETE FROM tb_products_cdc WHERE id=4;
UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。