Integrating Flume with Kafka
Flume components: agent, source, channel, sink
Kafka components: producer, broker cluster, consumer
1. Pattern 1: Flume --> Kafka
Here the Flume sink acts as a Kafka producer, and Kafka receives the data.
1.1 KafkaSink
Use Flume to monitor a netcat port and deliver the data to Kafka.
(1) Create the netcat-flume-kafka.conf configuration file
[itwise@node2 ~]$ cd /opt/module/flume-1.9.0/jobs/
[itwise@node2 jobs]$ mkdir flume--kafka          # create the job directory
[itwise@node2 flume--kafka]$ vim netcat-flume-kafka.conf
The configuration file is as follows:
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node2:9092,node3:9092,node4:9092
a1.sinks.k1.kafka.topic = flumetopic
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
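The sink drains the memory channel and publishes to Kafka in batches of flumeBatchSize events; with acks = -1 the producer waits for all in-sync replicas to acknowledge. The batching step can be sketched as a small pure function (plain Java, no Flume dependency; class and method names are illustrative, not Flume internals):

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: how a sink drains pending events in fixed-size batches. */
public class BatchSketch {

    // Split the pending events into batches of at most flumeBatchSize,
    // mirroring a1.sinks.k1.kafka.flumeBatchSize = 100 above.
    public static <T> List<List<T>> batches(List<T> events, int flumeBatchSize) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i += flumeBatchSize) {
            out.add(new ArrayList<>(events.subList(i, Math.min(i + flumeBatchSize, events.size()))));
        }
        return out;
    }
}
```

With 250 pending events and a batch size of 100, the sink would issue three sends: two full batches and one of 50.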
1.2 Run:
In one node2 terminal, start a consumer that reads the flumetopic topic from Kafka:
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic flumetopic
In a second node2 terminal, run the Flume agent:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume--kafka/netcat-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
In a third terminal, send messages to the monitored port:
nc localhost 6666
1.3 Screenshots of the run (all three windows):
2. Flume --> Kafka: KafkaSink with multiple topics
Here the Flume sink again acts as a producer, and an interceptor multiplexes events from the channel to different Kafka topics.
2.1 Create the configuration file netcat-flume-kafkatopic.conf
[itwise@node2 ~]$ cd /opt/module/flume-1.9.0/jobs/flume--kafka
[itwise@node2 flume--kafka]$ vim netcat-flume-kafkatopic.conf
The configuration file is as follows:
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Interceptor (the class name must match the custom interceptor below)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.itwise.kafka.flumeinterceptor.DataValueInterceptor$MyBuilder
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node2:9092,node3:9092,node4:9092
a1.sinks.k1.kafka.topic = topicother
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
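With this setup the interceptor writes a "topic" header on each event, and KafkaSink sends the event to that topic when the header is present, falling back to the configured kafka.topic (topicother) otherwise. That selection rule can be sketched as (plain Java with illustrative names, not the Flume API):

```java
import java.util.Map;

/** Illustrative sketch of how KafkaSink picks a destination topic per event. */
public class TopicPick {

    // A "topic" header set by an interceptor overrides the sink's default topic.
    public static String effectiveTopic(Map<String, String> headers, String defaultTopic) {
        return headers.getOrDefault("topic", defaultTopic);
    }
}
```

So an event whose body starts with "java" carries the header topic=java and lands on the java topic, while an event with no header would fall back to topicother.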
(2) Write a Java interceptor that routes data to different topics:
package com.itwise.kafka.flumeinterceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class DataValueInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Route by the prefix of the event body: java / hadoop / everything else
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.startsWith("java")) {
            event.getHeaders().put("topic", "java");
        } else if (body.startsWith("hadoop")) {
            event.getHeaders().put("topic", "hadoop");
        } else {
            event.getHeaders().put("topic", "other");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Builder {
        @Override
        public Interceptor build() {
            return new DataValueInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Package the custom interceptor into a jar and upload it to Flume's lib directory:
2.2 Start command:
[itwise@node2 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume--kafka/netcat-flume-kafkatopic.conf -n a1 -Dflume.root.logger=INFO,console
Send data to the port:
nc localhost 6666
Start three consumers, one for each topic: java, hadoop, other
kafka-topics.sh --list --bootstrap-server node2:9092
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic java --from-beginning
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic hadoop --from-beginning
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic other --from-beginning
2.3 Demo:
3. Kafka --> Flume: Kafka Source
3.1 Create the configuration file kafka-flume-logger.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node2:9092,node3:9092
a1.sources.r1.kafka.topics = itwise
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.batchSize = 100
a1.sources.r1.useFlumeEventFormat = false
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
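Kafka Source wraps each consumed record into a Flume event and attaches headers describing where the record came from, which is why the logger sink later shows a headers field on every event. Building such a header map can be sketched as follows (plain Java; the exact header names are an assumption for illustration, not a guarantee about Flume internals):

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: headers a Kafka source might attach to each event. */
public class SourceHeaders {

    public static Map<String, String> headersFor(String topic, int partition, long timestamp) {
        Map<String, String> h = new HashMap<>();
        h.put("topic", topic);                          // source topic of the record
        h.put("partition", String.valueOf(partition));  // partition it was read from
        h.put("timestamp", String.valueOf(timestamp));  // record timestamp
        return h;
    }
}
```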
3.2 Start
1) Start the Flume consumer (the agent):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka--flume/kafka-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
2) Start a Kafka producer:
# list topics
kafka-topics.sh --list --bootstrap-server node2:9092
# describe the topic
kafka-topics.sh --describe --bootstrap-server node2:9092 --topic itwise
# start a console producer
kafka-console-producer.sh --topic itwise --broker-list node2:9092
3.3 Demo:
The Kafka producer sends data.
The Flume side logs each event; note that Kafka Source has added a headers field to every event.
4. KafkaChannel --> xxxSink
4.1 Create the configuration file kafkachannel-flume-logger.conf
#Named
a1.channels = c1
a1.sinks = k1
#Source (none: the KafkaChannel itself consumes from Kafka)
#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node2:9092,node3:9092,node4:9092
a1.channels.c1.kafka.topic = itwise
a1.channels.c1.kafka.consumer.group.id = flume
# start from the latest offset when the group has no committed offset
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
a1.channels.c1.parseAsFlumeEvent = false
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sinks.k1.channel = c1
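auto.offset.reset = latest only takes effect when the consumer group has no committed offset yet: the channel then starts reading at the end of the log and sees only new data. The rule can be sketched as (plain Java, illustrative, not the Kafka client API):

```java
/** Illustrative sketch of the auto.offset.reset rule for a consumer group. */
public class OffsetReset {

    // committed: the group's committed offset, or null if none exists yet.
    public static long startOffset(Long committed, long earliest, long latest, String reset) {
        if (committed != null) {
            return committed;                 // a committed offset always wins
        }
        return "earliest".equals(reset) ? earliest : latest;  // otherwise apply the policy
    }
}
```

So on the very first run of this agent the channel begins at the log end; on later runs it resumes from the group's committed offset regardless of the setting.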
4.2 Run:
Start the downstream Flume consumer:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafkachannel-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Start a Kafka producer:
kafka-console-producer.sh --topic itwise --broker-list node2:9092
4.3 Demo:
Producer:
Consumer:
5. xxxSource --> KafkaChannel
5.1 Create the producer-side configuration file netcat-flume-kafkachannel.conf
#Named
a1.sources = r1
a1.channels = c1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node2:9092,node3:9092,node4:9092
a1.channels.c1.kafka.topic = itwise
a1.channels.c1.parseAsFlumeEvent = false
#Sink (none: Kafka consumers read the topic directly)
#Bind
a1.sources.r1.channels = c1
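parseAsFlumeEvent = false means the channel writes each event body to the Kafka topic as raw bytes rather than as an Avro-serialized FlumeEvent, which is why a plain kafka-console-consumer can read the netcat text directly. The choice can be sketched as (plain Java, illustrative names; the Avro branch is deliberately left out):

```java
/** Illustrative sketch of what parseAsFlumeEvent controls when writing to Kafka. */
public class PlainBody {

    // false: publish the raw event body, readable by any plain consumer.
    // true:  the body would be wrapped in an Avro FlumeEvent (not sketched here).
    public static byte[] toKafkaValue(byte[] eventBody, boolean parseAsFlumeEvent) {
        if (!parseAsFlumeEvent) {
            return eventBody;
        }
        throw new UnsupportedOperationException("Avro FlumeEvent encoding not sketched");
    }
}
```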
5.2 Run:
Start a consumer that also reads the earlier data in the topic:
kafka-console-consumer.sh --topic itwise --bootstrap-server node2:9092 --from-beginning
Start the producer side and enter data:
nc localhost 6666
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console
5.3 Demo:
The producer enters messages on the monitored port.
Kafka handles the data.
The consumer reads the data, including the earlier messages (--from-beginning).