Based on bahir-flink, the changes relative to bahir include: added support for newer Flink versions (1.12, 1.13, 1.14, etc.), the Table/SQL API, dimension (lookup) table queries, a query cache (supporting incremental and full loading), a unified expiration policy, configurable write parallelism, etc.
Because bahir uses an older version of the Flink interface, the changes were relatively large. During development, the stream-computing products of Tencent Cloud and Alibaba Cloud were used as references, combining the strengths of both and adding richer functionality, including more Redis commands and more Redis service types, such as single, sentinel, and cluster.
The Redis commands corresponding to the supported operations are:
Insert | Dimension table query |
---|---|
set | get |
hset | hget |
rpush, lpush | |
incrBy, decrBy, hincrBy, zincrby | |
sadd, zadd, pfadd(hyperloglog) | |
publish | |
zrem, decrby, srem | |
del, hdel | |
After running mvn package -DskipTests on the command line, copy the generated flink-connector-redis-1.0.11.jar into Flink's lib directory; no other settings are required.
Alternatively, reference it directly in a development project as a Maven dependency:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.11</version>
</dependency>
There is no need to map the Redis key through a primary key; the key is determined directly by the order of the fields in the DDL, for example:
-- Here username is the key and passport is the value.
create table sink_redis(username VARCHAR, passport VARCHAR) with ('command'='set')
-- Here name is the key of the hash, subject is the field, and score is the value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
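The same rule applies to the other commands; for example, with incrBy the first field supplies the key and the second the increment. A minimal sketch (column names are illustrative, not taken from the project):
-- uuid is the key and amount is the increment to apply.
create table sink_redis(uuid VARCHAR, amount BIGINT) with ('command'='incrBy')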
Field | Default | Type | Description |
---|---|---|---|
connector | (none) | String | redis |
host | (none) | String | Redis IP |
port | 6379 | Integer | Redis port |
password | null | String | null if not set |
database | 0 | Integer | db0 is used by default |
maxTotal | 2 | Integer | Maximum number of connections in the pool |
maxIdle | 2 | Integer | Maximum number of idle connections |
minIdle | 1 | Integer | Minimum number of idle connections |
timeout | 2000 | Integer | Connection timeout, in ms |
cluster-nodes | (none) | String | Cluster ip and port, not empty when redis-mode is cluster, such as:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
command | (none) | String | Corresponds to the redis command above |
redis-mode | (none) | String | Mode type: single, cluster, or sentinel |
lookup.cache.max-rows | -1 | Integer | Maximum number of rows in the query cache; reduces duplicate queries to Redis for the same key |
lookup.cache.ttl | -1 | Integer | Query cache expiration time, in seconds. The cache is enabled only when neither max-rows nor ttl is -1 |
lookup.max-retries | 1 | Integer | Number of retries on a failed query |
lookup.cache.load-all | false | Boolean | When command is hget, load all elements of the Redis hash into the cache; helps mitigate cache penetration |
sink.max-retries | 1 | Integer | Number of retries for write failures |
sink.parallelism | (none) | Integer | Number of concurrent writes |
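For illustration, a single-mode sink DDL exercising the pool and write options above; the host, password, and values are placeholders, not recommendations:
create table sink_redis(username VARCHAR, passport VARCHAR) with ('connector'='redis', 'host'='10.11.80.147', 'port'='6379', 'redis-mode'='single', 'password'='******', 'command'='set', 'maxTotal'='4', 'maxIdle'='2', 'minIdle'='1', 'timeout'='2000', 'sink.max-retries'='3', 'sink.parallelism'='2')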
Additional parameters when redis-mode is sentinel:
Field | Default | Type | Description |
---|---|---|---|
master.name | (none) | String | Master name |
sentinels.info | (none) | String | Sentinel nodes, such as: 10.11.80.147:26379,10.11.80.148:26379 |
sentinels.password | (none) | String | null if not set |
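A minimal sketch of a sentinel-mode sink DDL, assuming 'sentinel' is the accepted redis-mode value; the master name, addresses, and password are placeholders:
create table sink_redis(username VARCHAR, passport VARCHAR) with ('connector'='redis', 'redis-mode'='sentinel', 'master.name'='mymaster', 'sentinels.info'='10.11.80.147:26379,10.11.80.148:26379', 'sentinels.password'='******', 'command'='set')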
flink type | redis row converter |
---|---|
CHAR | String |
VARCHAR | String |
STRING | String |
BOOLEAN | to String: String.valueOf(boolean val); from String: Boolean.valueOf(String str) |
BINARY | to String: Base64.getEncoder().encodeToString(byte[]); from String: Base64.getDecoder().decode(String str) |
VARBINARY | to String: Base64.getEncoder().encodeToString(byte[]); from String: Base64.getDecoder().decode(String str) |
DECIMAL | to String: BigDecimal.toString(); from String: DecimalData.fromBigDecimal(new BigDecimal(String str), int precision, int scale) |
TINYINT | to String: String.valueOf(byte val); from String: Byte.valueOf(String str) |
SMALLINT | to String: String.valueOf(short val); from String: Short.valueOf(String str) |
INTEGER | to String: String.valueOf(int val); from String: Integer.valueOf(String str) |
DATE | stored as the day count from epoch (int); displayed as 2022-01-01 |
TIME | stored as milliseconds since midnight (int); displayed as 04:04:01.023 |
BIGINT | to String: String.valueOf(long val); from String: Long.valueOf(String str) |
FLOAT | to String: String.valueOf(float val); from String: Float.valueOf(String str) |
DOUBLE | to String: String.valueOf(double val); from String: Double.valueOf(String str) |
TIMESTAMP | stored as milliseconds from epoch (long); read back via TimestampData.fromEpochMillis(Long.valueOf(String str)) |
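As a sketch of the mapping above (table and column names are illustrative): a non-string column is serialized to a string before being written to Redis, and parsed back when the table is queried as a dimension table.
-- login_count (INTEGER) is written as the string produced by String.valueOf(int), i.e. equivalent to: set u1 42
create table typed_sink(uid VARCHAR, login_count INTEGER) with ('connector'='redis', 'host'='10.11.80.147', 'port'='6379', 'redis-mode'='single', 'command'='set');
insert into typed_sink select * from (values ('u1', 42));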
create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- Insert data into redis first; equivalent to the redis command: hset 3 3 100
insert into sink_redis select * from (values ('3', '3', '100'));
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
-- Generate rows with username and level values from 1 to 10 as the source
-- The row with username = 3 and level = 3 will join the record inserted above
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- The line where username is 3 will be associated with the value in redis, and the output will be: 3,3,100
Sample code path: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java
hset example, equivalent to redis command: hset tom math 150
// Configure the connector: cluster mode, HSET command
Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());

// Resolve the mapper that turns a row into an HSET call
RedisSinkMapper redisMapper = (RedisSinkMapper) RedisHandlerServices
        .findRedisHandler(RedisMapperHandler.class, configuration.toMap())
        .createRedisMapper(configuration);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Field order follows the DDL rule: key, field, value -> hset tom math 150
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "150");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);

RedisCacheOptions redisCacheOptions = new RedisCacheOptions.Builder().setCacheMaxSize(100).setCacheTTL(10L).build();
FlinkJedisConfigBase conf = getLocalRedisClusterConfig(); // test helper building the cluster config
RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, redisCacheOptions);

dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");
Sample code path: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java
set example, equivalent to redis command: set test test11
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
        "'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis-mode'='cluster','password'='******','command'='set')";
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
ide: IntelliJ IDEA
code format: google-java-format + Save Actions
code check: CheckStyle
flink 1.12/1.13/1.14
jdk 1.8
Switch to the flink-1.12 branch if you need Flink 1.12 support.