# flink-extend

**Repository Path**: SoulBGM/flink-extend

## Basic Information

- **Project Name**: flink-extend
- **Description**: Flink extensions
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-09-19
- **Last Updated**: 2024-09-19

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Flink Extensions

## Introduction

### UDF

#### Snowflake ID generation

##### Usage example

```sql
-- Register the UDF
CREATE TEMPORARY SYSTEM FUNCTION snowflake_id AS 'git.soulbgm.udf.SnowflakeUDF';

-- Create the source table
CREATE TABLE source (
    name string
) WITH (
    'connector' = 'datagen'
);

-- Create the sink table
CREATE TABLE sink (
    id bigint,
    name string
) WITH (
    'connector' = 'print'
);

-- Insert the source table's rows into the sink table
INSERT INTO sink
SELECT snowflake_id(name) as id, name
FROM source;
```

### Format

#### Protostuff format (for parsing data received from Kafka)

##### Usage example

```sql
-- Read data from Kafka
CREATE TABLE source (
    sourceCode int,
    dataType int,
    seqNum bigint,
    receiveTime TIMESTAMP(3),
    WATERMARK FOR receiveTime AS receiveTime - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'packet_receive_sequence',
    'properties.bootstrap.servers' = 'node1:9092',
    'properties.group.id' = 'flink-test',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'protobuf'
);

-- Create the sink table
CREATE TABLE sink (
    source_code int,
    data_type int,
    seq_num bigint,
    receive_time TIMESTAMP(3)
) WITH (
    'connector' = 'print'
);

-- Insert the source table's rows into the sink table
INSERT INTO sink
SELECT sourceCode as source_code,
       dataType as data_type,
       seqNum as seq_num,
       receiveTime as receive_time
FROM source;
```
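
This README does not show how `git.soulbgm.udf.SnowflakeUDF` produces the `bigint` values returned by `snowflake_id`. As a rough illustration of the general technique only, the following is a minimal sketch of a snowflake-style generator in plain Java. The class name `SnowflakeIdSketch`, the custom epoch, and the 41-bit timestamp / 10-bit worker ID / 12-bit sequence layout are all assumptions chosen for the example, not details taken from this repository.

```java
/**
 * Hypothetical sketch of a snowflake-style ID generator (not this repository's code).
 * Assumed bit layout: 41-bit timestamp | 10-bit worker ID | 12-bit sequence.
 */
final class SnowflakeIdSketch {
    // Assumed custom epoch: 2024-01-01T00:00:00Z in milliseconds.
    private static final long EPOCH_MILLIS = 1704067200000L;
    private static final int WORKER_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    private final long workerId;
    private long lastMillis = -1L;
    private long sequence = 0L;

    SnowflakeIdSketch(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId must be in [0, " + MAX_WORKER_ID + "]");
        }
        this.workerId = workerId;
    }

    /** Returns an ID that is strictly greater than any ID previously returned by this instance. */
    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastMillis) {
            // A real implementation might wait or fall back; the sketch simply fails.
            throw new IllegalStateException("Clock moved backwards");
        }
        if (now == lastMillis) {
            // Same millisecond: bump the sequence, wrapping at 4096.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence exhausted for this millisecond: spin until the clock advances.
                while (now <= lastMillis) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            // New millisecond: restart the sequence.
            sequence = 0L;
        }
        lastMillis = now;
        return ((now - EPOCH_MILLIS) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }

    public static void main(String[] args) {
        SnowflakeIdSketch generator = new SnowflakeIdSketch(1L);
        System.out.println(generator.nextId());
        System.out.println(generator.nextId());
    }
}
```

A Flink `ScalarFunction` could wrap a generator like this by calling `nextId()` from its `eval` method. Whether `SnowflakeUDF` is built this way, and how it assigns worker IDs across parallel subtasks, is not stated in this README.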