Flink SQL Redis Connector: A New Version Arrives

Author: 猴君

1. Features and Performance of the New Version

1.1 Unified Stream and Batch Processing

The new version is built on Flink's latest Source and SinkWriter interfaces, so a single code path handles both streaming reads and batch reads.
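For example, the same Redis table can be queried in either runtime mode via Flink's standard execution.runtime-mode setting. A minimal sketch in the Flink SQL client, assuming the orders table defined in section 2 already exists (whether a bounded scan terminates as expected depends on the connector's Source implementation):

-- Bounded (batch) read: scan the Redis key once and finish.
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM orders;

-- Streaming read: the same table and query under the default runtime mode.
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM orders;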

1.2 High Throughput

The new version uses Jedis Pipeline and JedisClusterPipeline for writes and reads, reaching tens of millions of records written or read per minute, with modest hardware requirements.
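Because writes go through a pipeline, throughput can be tuned further by batching more rows per flush with the sink.buffer-flush.* options listed in section 3. A hedged sketch (the table, host, key, and chosen values are all illustrative, not recommendations):

CREATE TABLE redis_throughput_sink (
  `id`    STRING,
  `value` STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',       -- placeholder host
  'single.port' = '6379',
  'command' = 'hmset',
  'key' = 'throughput_test',              -- placeholder key
  'sink.buffer-flush.max-rows' = '5000',  -- rows buffered per pipelined flush (default 1000)
  'sink.buffer-flush.interval' = '1s'     -- flush interval (default 1s)
);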

1.3 Compatible with All Flink Versions

Although the new version is rewritten against the new interfaces, it works not only with older Flink versions but also with the latest ones.

2. Usage

Usage is the same as in previous versions, but some new connection parameters have been added.

Usage examples:

1. Reading data

CREATE TABLE orders (
  `order_id` STRING,
  `price` STRING,
  `order_time` STRING,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders'
);

select * from orders;

Cluster mode:

create table redis_sink (
  site_id STRING,
  inverter_id STRING,
  start_time STRING,
  PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'cluster',
  'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
  'password' = '123123',
  'command' = 'hgetall',
  'key' = 'site_inverter'
);

cluster.nodes defines the cluster hosts and ports, e.g. host1:p1,host2:p2,host3:p3.

Note: a Redis table must define a primary key, which can be either a single key or a composite key.

The query parses the Redis data directly into the tabular form we need.

2. Writing data

-- 1. generate source data
CREATE TABLE order_source (
  `order_number` BIGINT,
  `price` DECIMAL(32,2),
  `order_time` TIMESTAMP(3),
  PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '5',
  'fields.order_number.min' = '1',
  'fields.order_number.max' = '20',
  'fields.price.min' = '1001',
  'fields.price.max' = '1100'
);

-- 2. define redis sink table
CREATE TABLE redis_sink (
  `order_number` STRING,
  `price` STRING,
  `order_time` STRING,
  PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders'
);

-- 3. insert data into the redis sink table (cast data types to STRING)
insert into redis_sink
  select
    cast(order_number as STRING) order_number,
    cast(price as STRING) price,
    cast(order_time as STRING) order_time
  from order_source;
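The read examples above use hgetall to fetch a whole hash. The parameter table in section 3 also lists a field option for reading a single hash field with hget; the sketch below is hypothetical, and in particular the two-column schema (key column plus one value column) is an assumption about what the connector expects rather than something documented here:

CREATE TABLE order_price (
  `order_id` STRING,
  `price` STRING,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hget',
  'key' = 'orders',     -- same hash key as the read example above
  'field' = 'price'     -- hash field to fetch (assumed to map to the value column)
);

select * from order_price;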

3. New Connection Parameters

Option | Required | Default | Type | Description
connector | required | none | String | connector name
mode | required | none | String | redis mode (single or cluster)
single.host | optional | none | String | redis host in single mode
single.port | optional | none | Int | redis port in single mode
password | optional | none | String | redis database password
command | required | none | String | redis command used to write or read data
key | required | none | String | redis key
expire | optional | none | Int | key TTL
field | optional | none | String | field to read when using the hget command
cursor | optional | none | Int | cursor when using the hscan command (e.g. 1, 2)
start | optional | 0 | Int | start index when using the lrange command
end | optional | 10 | Int | end index when using the lrange command
connection.max.wait-mills | optional | none | Int | connection pool: max wait time in milliseconds
connection.timeout-ms | optional | none | Int | connection timeout in milliseconds
connection.max-total | optional | none | Int | connection pool: max total connections
connection.max-idle | optional | none | Int | connection pool: max idle connections
connection.test-on-borrow | optional | none | Boolean | connection pool: test connections on borrow
connection.test-on-return | optional | none | Boolean | connection pool: test connections on return
connection.test-while-idle | optional | none | Boolean | connection pool: test connections while idle
so.timeout-ms | optional | none | Int | socket timeout in milliseconds
max.attempts | optional | none | Int | maximum number of retry attempts
sink.parallelism | optional | 1 | Int | sink parallelism
sink.delivery-guarantee | optional | AT_LEAST_ONCE | Enum | AT_LEAST_ONCE or EXACTLY_ONCE
sink.buffer-flush.max-rows | optional | 1000 | Int | max number of rows buffered before a flush
sink.buffer-flush.interval | optional | 1s | Duration | batch flush interval
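Several of the new options can be combined in a single sink definition. The sketch below is illustrative only: the pool options are assumed to map to the usual Jedis/commons-pool settings, and the unit of expire (seconds) is an assumption, not stated in the table above:

CREATE TABLE redis_orders_sink (
  `order_number` STRING,
  `price` STRING,
  PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',            -- placeholder host
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders',
  'expire' = '86400',                          -- key TTL (assumed to be seconds)
  'connection.timeout-ms' = '5000',            -- connection timeout
  'connection.max-total' = '16',               -- pool size
  'connection.max-idle' = '8',                 -- idle connections kept in the pool
  'max.attempts' = '3',                        -- retry attempts
  'sink.delivery-guarantee' = 'AT_LEAST_ONCE', -- or EXACTLY_ONCE
  'sink.parallelism' = '2'
);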

4. Code Repository

Github: https://github.com/niuhu3/flink_sql_redis_connector/tree/0.1.0

The connector has already been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

Please help by forking and starring the repo. I will keep updating this connector, and you are welcome to try it out; if you run into any problems, feel free to report them to me or in the community, and reach out if you have ideas for improvements.

In later posts I will walk through how this connector was written, and I will also try to build new connectors.
