1. New Features and Performance Overview
1.1 Unified Streaming and Batch
The new version is built on Flink's latest Source and SinkWriter interfaces, so a single code path handles both streaming reads and batch reads.
1.2 High Throughput
The new version writes and reads data through JedisPipeline and JedisClusterPipeline, reaching tens of millions of records per minute for writes or reads while keeping hardware requirements modest.
1.3 Compatible with All Flink Versions
Because the connector was rewritten against the new interfaces, it works with both older and newer versions of Flink.
2. Usage
Usage is the same as in previous versions, with some new connection parameters added.
Usage examples:

1) Reading data

```sql
-- single mode
CREATE TABLE orders (
  `order_id` STRING,
  `price` STRING,
  `order_time` STRING,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders'
);

select * from orders;
```

```sql
-- cluster mode
create table redis_sink (
  site_id STRING,
  inverter_id STRING,
  start_time STRING,
  PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'cluster',
  'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
  'password' = '123123',
  'command' = 'hgetall',
  'key' = 'site_inverter'
);
```

`cluster.nodes` lists the cluster hosts and ports, e.g. `host1:p1,host2:p2,host3:p3`.

Note: a Redis table must define a primary key, either a single key or a composite key. The SQL read above parses the Redis data directly into the tabular form we need.

2) Writing data

```sql
-- 1. generate source data
CREATE TABLE order_source (
  `order_number` BIGINT,
  `price` DECIMAL(32,2),
  `order_time` TIMESTAMP(3),
  PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '5',
  'fields.order_number.min' = '1',
  'fields.order_number.max' = '20',
  'fields.price.min' = '1001',
  'fields.price.max' = '1100'
);

-- 2. define redis sink table
CREATE TABLE redis_sink (
  `order_number` STRING,
  `price` STRING,
  `order_time` STRING,
  PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders'
);

-- 3. insert data into the redis sink table (cast values to STRING)
insert into redis_sink
select
  cast(order_number as STRING) order_number,
  cast(price as STRING) price,
  cast(order_time as STRING) order_time
from order_source;
```
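The examples above read hashes with `hgetall`. As a further hedged sketch, the `start`/`end` options in the parameter table can be used to read a slice of a Redis list with `lrange`; the table name, key, and single-column layout here are hypothetical, assuming the connector maps each list element to one row:

```sql
-- hypothetical: read elements 0..10 of the Redis list 'event_log' into a one-column table
CREATE TABLE event_log (
  `event` STRING,
  PRIMARY KEY(event) NOT ENFORCED  -- the connector requires a primary key on every table
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'lrange',
  'key' = 'event_log',
  'start' = '0',   -- first list index to read (default 0)
  'end' = '10'     -- last list index to read (default 10)
);

select * from event_log;
```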
3. New Connection Parameters
| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | none | String | connector name (`redis`) |
| mode | required | none | String | Redis deployment mode: `single` or `cluster` |
| single.host | optional | none | String | Redis host in single mode |
| single.port | optional | none | Int | Redis port in single mode |
| password | optional | none | String | Redis password |
| command | required | none | String | Redis command used to read or write data |
| key | required | none | String | Redis key |
| expire | optional | none | Int | TTL to set on the key |
| field | optional | none | String | field to fetch when using the `hget` command |
| cursor | optional | none | Int | cursor for the `hscan` command (e.g. 1, 2) |
| start | optional | 0 | Int | start index when reading with the `lrange` command |
| end | optional | 10 | Int | end index when reading with the `lrange` command |
| connection.max.wait-mills | optional | none | Int | connection pool parameter: maximum wait time (ms) |
| connection.timeout-ms | optional | none | Int | connection timeout (ms) |
| connection.max-total | optional | none | Int | connection pool parameter: maximum total connections |
| connection.max-idle | optional | none | Int | connection pool parameter: maximum idle connections |
| connection.test-on-borrow | optional | none | Boolean | validate connections when borrowed from the pool |
| connection.test-on-return | optional | none | Boolean | validate connections when returned to the pool |
| connection.test-while-idle | optional | none | Boolean | validate idle connections in the pool |
| so.timeout-ms | optional | none | Int | socket timeout (ms) |
| max.attempts | optional | none | Int | maximum retry attempts |
| sink.parallelism | optional | 1 | Int | parallelism of the sink |
| sink.delivery-guarantee | optional | AT_LEAST_ONCE | Enum | delivery guarantee: AT_LEAST_ONCE or EXACTLY_ONCE |
| sink.buffer-flush.max-rows | optional | 1000 | Int | maximum buffered rows before a flush |
| sink.buffer-flush.interval | optional | 1s | Duration | batch flush interval |
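To show several of the new parameters working together, here is a hedged sketch of a sink definition with a key TTL, connection pool tuning, and batch flushing; the table name, key, and all numeric values are illustrative assumptions, not recommendations:

```sql
-- illustrative values only; see the parameter table above for each option's meaning
CREATE TABLE redis_sink_tuned (
  `k` STRING,
  `v` STRING,
  PRIMARY KEY(k) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'my_hash',                      -- hypothetical key name
  'expire' = '3600',                      -- TTL set on the key
  'connection.max-total' = '16',          -- pool: max total connections
  'connection.max-idle' = '8',            -- pool: max idle connections
  'connection.timeout-ms' = '3000',       -- connection timeout
  'sink.delivery-guarantee' = 'AT_LEAST_ONCE',
  'sink.buffer-flush.max-rows' = '500',   -- flush after 500 buffered rows
  'sink.buffer-flush.interval' = '1s'     -- or at least once per second
);
```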
4. Source Code
GitHub: https://github.com/niuhu3/flink_sql_redis_connector/tree/0.1.0
The connector has also been submitted to the Flink project; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).
Please consider starring or forking the repository. I will keep updating this connector, so feel free to try it out; if you run into any problems, report them to me or in the community, and any ideas or suggestions are welcome too.
Later I will also share the design notes behind this connector, and try to build new connectors.