- 基于开源的flink,对其实时sql进行扩展
- 自定义create table 语法(包括源表,输出表,维表)
- 自定义create view 语法
- 自定义create function 语法
- 实现了流与维表的join
- 支持原生FLinkSQL所有的语法
- 扩展了输入和输出的性能指标到promethus
进入项目根目录,使用maven打包:
mvn clean package -Dmaven.test.skip
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包,在lib目下存放job提交的包
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
mode
name
sql
localSqlPluginPath
remoteSqlPluginPath
addjar
confProp
flinkconf
yarnconf
savePointPath
allowNonRestoredState
flinkJarPath
queue
业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s)
数据本身的时间和进入flink的当前时间的差值.
各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData
从kafka获取的数据解析失败的视为脏数据
各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate
kafka接受的记录数(未解析前)/s
各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
kafka接受的记录数(解析后)/s
各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate
kafka接受的字节数/s
Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
当前kafka10,kafka11有采集该指标
各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate
写入的外部记录数/s
CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun;
CREATE TABLE MyTable(
name varchar,
channel varchar,
pv int,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
parallelism ='1'
);
CREATE TABLE MyResult(
channel varchar,
pv varchar
)WITH(
type ='mysql',
url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
userName ='dtstack',
password ='abc123',
tableName ='pv2',
parallelism ='1'
);
CREATE TABLE workerinfo(
cast(logtime as TIMESTAMP) AS rtime,
cast(logtime) AS rtime
)WITH(
type ='hbase',
zookeeperQuorum ='rdos1:2181',
tableName ='workerinfo',
rowKey ='ce,de',
parallelism ='1',
zookeeperParent ='/hbase'
);
CREATE TABLE sideTable(
cf:name varchar as name,
cf:info varchar as info,
PRIMARY KEY(name),
PERIOD FOR SYSTEM_TIME //维表标识
)WITH(
type ='hbase',
zookeeperQuorum ='rdos1:2181',
zookeeperParent ='/hbase',
tableName ='workerinfo',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1'
);
insert
into
MyResult
select
d.channel,
d.info
from
( select
a.*,b.info
from
MyTable a
join
sideTable b
on a.channel=b.name
where
a.channel = 'xc2'
and a.pv=10 ) as d
1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至sishu@dtstack.com。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。