# flink-connector-starrocks **Repository Path**: HeLoong/flink-connector-starrocks ## Basic Information - **Project Name**: flink-connector-starrocks - **Description**: starRocks数据库 flink-connector源码 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 1 - **Created**: 2021-11-22 - **Last Updated**: 2025-12-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # starrocks-connector-for-apache-flink This is connector for [Apache Flink®](https://flink.apache.org/) ## Prerequisites ```xml com.starrocks flink-connector-starrocks x.x.x_flink-1.15 x.x.x_flink-1.14_2.11 x.x.x_flink-1.14_2.12 x.x.x_flink-1.13_2.11 x.x.x_flink-1.13_2.12 x.x.x_flink-1.12_2.11 x.x.x_flink-1.12_2.12 x.x.x_flink-1.11_2.11 x.x.x_flink-1.11_2.12 ``` Click [HERE](https://search.maven.org/search?q=g:com.starrocks) to get the latest version. ## Apache Flink® source ```java StarRocksSourceOptions options = StarRocksSourceOptions.builder() .withProperty("scan-url", "fe_ip1:8030,fe_ip2:8030,fe_ip3:8030") .withProperty("jdbc-url", "jdbc:mysql://fe_ip:9030") .withProperty("username", "root") .withProperty("password", "") .withProperty("table-name", "flink_test") .withProperty("database-name", "test") .build(); TableSchema tableSchema = TableSchema.builder() .field("date_1", DataTypes.DATE()) .field("datetime_1", DataTypes.TIMESTAMP(6)) .field("char_1", DataTypes.CHAR(20)) .field("varchar_1", DataTypes.STRING()) .field("boolean_1", DataTypes.BOOLEAN()) .field("tinyint_1", DataTypes.TINYINT()) .field("smallint_1", DataTypes.SMALLINT()) .field("int_1", DataTypes.INT()) .field("bigint_1", DataTypes.BIGINT()) .field("largeint_1", DataTypes.STRING()) .field("float_1", DataTypes.FLOAT()) .field("double_1", DataTypes.DOUBLE()) .field("decimal_1", DataTypes.DECIMAL(27, 9)) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print(); env.execute("StarRocks flink source"); ``` OR ```java // create a table with `structure` and `properties` CREATE TABLE flink_test ( date_1 DATE, datetime_1 TIMESTAMP(6), char_1 CHAR(20), varchar_1 VARCHAR, boolean_1 BOOLEAN, tinyint_1 TINYINT, smallint_1 SMALLINT, int_1 INT, bigint_1 BIGINT, largeint_1 STRING, float_1 FLOAT, double_1 DOUBLE, decimal_1 DECIMAL(27,9) ) WITH ( 'connector'='starrocks', 'scan-url'='fe_ip1:8030,fe_ip2:8030,fe_ip3:8030', 'jdbc-url'='jdbc:mysql://fe_ip:9030', 'username'='root', 'password'='', 'database-name'='flink_test', 'table-name'='flink_test' ); select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126 ``` ### Source options | Option | Required | Default | Type | Description | | :-------------------------- | :------- | :----------------- | :----- | :----------------------------------------------------------- | | connector | YES | NONE | String | starrocks | | scan-url | YES | NONE | String | Hosts of the fe nodes like: `fe_ip1:http_port,fe_ip2:http_port...`. | | jdbc-url | YES | NONE | String | Hosts of the fe nodes like: `fe_ip1:query_port,fe_ip2: query_port...`. | | username | YES | NONE | String | StarRocks user name. | | password | YES | NONE | String | StarRocks user password. | | database-name | YES | NONE | String | Database name | | table-name | YES | NONE | String | Table name | | scan.connect.timeout-ms | NO | 1000 | String | Connect timeout | | scan.params.keep-alive-min | NO | 10 | String | Max keep alive time min | | scan.params.query-timeout-s | NO | 600(5min) | String | Query timeout for a single query(The value of this parameter needs to be longer than the estimated period of the source) | | scan.params.mem-limit-byte | NO | 1024*1024*1024(1G) | String | Memory limit for a single query | | scan.max-retries | NO | 1 | String | Max request retry times. | ### Source metrics | Name | Type | Description | | :-: | :-: | :-: | | totalScannedRows | counter | successfully collected data | ### Source type mappings | StarRocks | Flink | | ---------- | --------- | | NULL | NULL | | BOOLEAN | BOOLEAN | | TINYINT | TINYINT | | SMALLINT | SMALLINT | | INT | INT | | BIGINT | BIGINT | | LARGEINT | STRING | | FLOAT | FLOAT | | DOUBLE | DOUBLE | | DATE | DATE | | DATETIME | TIMESTAMP | | DECIMAL | DECIMAL | | DECIMALV2 | DECIMAL | | DECIMAL32 | DECIMAL | | DECIMAL64 | DECIMAL | | DECIMAL128 | DECIMAL | | CHAR | CHAR | | VARCHAR | STRING | ### Source tips 1. `exactly-once` semantic cannot be guaranteed in the case of a task failure. 2. Only SQLs without aggregation like `select {*|columns|count(1)} from {table-name} where ...` are supported. ## Flink sink ```java // -------- sink with raw json string stream -------- fromElements(new String[]{ "{\"score\": \"99\", \"name\": \"stephen\"}", "{\"score\": \"100\", \"name\": \"lebron\"}" }).addSink( StarRocksSink.sink( // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://ip:port,ip:port?xxxxx") .withProperty("load-url", "ip:port;ip:port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .withProperty("sink.parallelism", "1") .build() ) ); // -------- sink with stream transformation -------- class RowData { public int score; public String name; public RowData(int score, String name) { ...... } } fromElements( new RowData[]{ new RowData(99, "stephen"), new RowData(100, "lebron") } ).addSink( StarRocksSink.sink( // the table structure TableSchema.builder() .field("score", DataTypes.INT()) .field("name", DataTypes.VARCHAR(20)) .build(), // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://ip:port,ip:port?xxxxx") .withProperty("load-url", "ip:port;ip:port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .withProperty("sink.parallelism", "1") .build(), // set the slots with streamRowData (slots, streamRowData) -> { slots[0] = streamRowData.score; slots[1] = streamRowData.name; } ) ); ``` OR ```java // create a table with `structure` and `properties` tEnv.executeSql( "CREATE TABLE USER_RESULT(" + "name VARCHAR," + "score BIGINT" + ") WITH ( " + "'connector' = 'starrocks'," + "'jdbc-url'='jdbc:mysql://ip:port,ip:port?xxxxx'," + "'load-url'='ip:port;ip:port'," + "'database-name' = 'xxx'," + "'table-name' = 'xxx'," + "'username' = 'xxx'," + "'password' = 'xxx'," + "'sink.buffer-flush.interval-ms' = '15000'," + "'sink.properties.format' = 'json'," + "'sink.properties.strip_outer_array' = 'true'," + "'sink.parallelism' = '1'," + "'sink.max-retries' = '10'," + ")" ); ``` ## Sink options | Option | Required | Default | Type | Description | | :-: | :-: | :-: | :-: | :-: | | connector | YES | NONE | String |`starrocks`| | jdbc-url | YES | NONE | String | this will be used to execute queries in starrocks. | | load-url | YES | NONE | String | `fe_ip:http_port;fe_ip:http_port` separated with `;`, which would be used to do the batch sinking. | | database-name | YES | NONE | String | starrocks database name | | table-name | YES | NONE | String | starrocks table name | | username | YES | NONE | String | starrocks connecting username | | password | YES | NONE | String | starrocks connecting password | | sink.semantic | NO | `at-least-once` | String | `at-least-once` or `exactly-once`(`flush at checkpoint only` and options like `sink.buffer-flush.*` won't work either). | | sink.buffer-flush.max-bytes | NO | 94371840(90M) | String | the max batching size of the serialized data, range: `[64MB, 10GB]`. | | sink.buffer-flush.max-rows | NO | 500000 | String | the max batching rows, range: `[64,000, 5000,000]`. | | sink.buffer-flush.interval-ms | NO | 300000 | String | the flushing time interval, range: `[1000ms, 3600000ms]`. | | sink.max-retries | NO | 3 | String | max retry times of the stream load request, range: `[0, 1000]`. | | sink.parallelism | NO | NULL | String | Specify the parallelism of the sink individually. Remove it if you want to follow the global parallelism settings. | | sink.connect.timeout-ms | NO | 1000 | String | Timeout in millisecond for connecting to the `load-url`, range: `[100, 60000]`. | | sink.label-prefix | NO | NO | String | the prefix of the stream load label, available characters are within [-_A-Za-z0-9]. | | sink.properties.* | NO | NONE | String | the stream load properties like `'sink.properties.columns' = 'k1, v1'`. | ## Sink metrics | Name | Type | Description | | :-: | :-: | :-: | | totalFlushBytes | counter | successfully flushed bytes. | | totalFlushRows | counter | successfully flushed rows. | | totalFlushSucceededTimes | counter | number of times that the data-batch been successfully flushed. | | totalFlushFailedTimes | counter | number of times that the flushing been failed. | ## Sink type mappings | Flink type | StarRocks type | | :-: | :-: | | BOOLEAN | BOOLEAN | | TINYINT | TINYINT | | SMALLINT | SMALLINT | | INTEGER | INTEGER | | BIGINT | BIGINT | | FLOAT | FLOAT | | DOUBLE | DOUBLE | | DECIMAL | DECIMAL | | BINARY | INT | | CHAR | JSON / STRING | | VARCHAR | JSON / STRING | | STRING | JSON / STRING | | DATE | DATE | | TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME | | TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME | | ARRAY\ | ARRAY\ | | MAP\ | JSON / JSON STRING | | ROW\ | JSON / JSON STRING | ### Sink tips 1. `Flush` action was triggered `at-least-once` when: `cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}` 2. `sink.buffer-flush.{max-rows|max-bytes|interval-ms}` becomes invalid when it comes with the `exactly-once` semantic.