目录
2.3 修改 zookeeper.properties 配置文件
1. kafka 下载
本片是小白入门篇,所以我们不以Linux操作系统为例,选择大多数小白都用的windows。
ksfka下载链接如下,点击链接进入官网即可下载
温馨提示:JDK版本至少需要1.8,高版本也可兼容;
Apache Kafkahttps://kafka.apache.org/downloads本篇中以kafka_2.1.3-3.6.1版本为例,直接点击对应的版本下载即可,tgz包就类似于我们常见的zip,下载完成之后解压即可。
下载完毕,我们就可以解压得到 kafka 了
解压之后就可以得到 kafka 文件了
2. 修改配置文件
2.1 文件夹内容
打开文件夹后可以发现内部含有bin文件夹,config配置夹,libs依赖夹等,和JDK,maven 问价夹的格式如出一辙;
2.2 创建一个 data 空文件夹
后续需要用来存放日志文件,只要创建完成就可以了,kafka启动后会自动生成日志文件;
2.3 修改 zookeeper.properties 配置文件
我们点击进入config文件夹,找到 zookeeper.properties 配置文件,双击进行修改,
然后,我们找到 dataDir ,将它的值修改为我们刚才创建的 data 文件的路径,还要注意一点,在后面还要多加一个 "/zk",因为一会还要配置 server.properties ,所以要用将她们两个区分开,
2.4 修改 server.properties 配置文件
和刚才一样,我们双击修改 "server.properties" 配置文件
我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径,在路径末尾再添加上 "/kafka" ,用来和刚才的zk做区分,kafka 文件夹用来存放kafka的日志文件,zk 文件夹用来存放zoopeeper的日志文件;
2.5 创建 "zk.cmd" windows脚本文件
以记事本的方式打开,然后加入下面这句话,
这句话的含义就是启动 Zookeeper ,并且启动文件为 "zookeeper.properties" ;
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
2.6 创建 "kfk.cmd" windows脚本文件
仍然以记事本的方式打开,然后加入下面这句话,
这句话的含义就是启动 kafka ,并且启动文件为 "server.properties" ;
call bin/windows/kafka-server-start.bat config/server.properties
此时,我们的 kafka 文件夹中就多了我们刚刚创建的 data 文件夹,kafka.cmd 脚本文件,zk.cmd 脚本文件;
3. 启动 kafka
经过第二部的配置,现在一切都已经准备就绪,我们只需要双击 zk.cmd 和 kafka.cmd 脚本文件启动kafka;
这里需要注意一点,必须先启动双击 zk.cmd 启动 zookeeper,
再双击 kafka.cmd 启动 kafka,关闭的时候,需要先关闭 kafka,再关闭 zookeeper ;
4. 创建主题+生产者消费者演示
4.1 创建 topic 主题
我们来到 bin 文件夹下的 windows 文件夹,打开 cmd 命令窗口,运行下方命令
# --bootstrap-server localhost:9092 配置服务器连接,此处为本机,9092为kafka默认端口号 # --topic test 创建topic主题,主题名称为 test # --create 创建topic主题命令 kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
4.2 命令行创建生产者
仍然是在 windows 文件夹下新建一个命令窗口,刚才我们已经创建出了名为 "topic" 的主题,现在运行如下命令启动脚本文件创建生产者连接上我们的 topic 主题
# 运行 kafka-console-producer.bat 脚本创建生产者连接本机9092端口名为 test 的主题 kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
运行如下所示,会出现一个小箭头,就说明我们链接主题成功,我们生产者发布的主题都会发送到 topic 主题中供消费者去消费使用;
4.3 命令行创建消费者
# 运行 kafka-console-consumer.bat 脚本创建消费者连接本机9092端口名为 test 的主题 windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
运行成功不会有任何显示,
4.4 生产者发送消息供消费者消费
如下图所示,我在生产者命令窗口输入 "hello kafka",点击回车,我们就可以在消费者中命令窗口中看到发送过来的 "hello kafka" 消息
5. demo 书写
5.1 创建项目
5.2 引入依赖
在 maven 项目的 pom.xml 文件中加入下方依赖,
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.1</version> </dependency> </dependencies>
5.3 生产者测试类代码书写
我们随便创建一个类即可,名字随意取,代码逻辑备有注释;
public class KafkaProducerTest { public static void main(String[] args) { // TODO 创建配置对象 // 创建生产者对象又分为两步 // 1. 创建配置对象集合 Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); // 2. 配置数据 Key-Value 的序列化方式 configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // TODO 创建生产者对象 // 将配置对象集合作为参数传入 // 返回值就是生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configMap); // TODO 创建数据 // 自己定义一个数据,传入三个参数,第一个参数为主题,第二个参数为数据的key,第三个参数为数据的value ProducerRecord<String,String> record = new ProducerRecord<>("test","first","hello kafka"); // TODO 发送数据 kafkaProducer.send(record); // TODO 关闭生产者对象 kafkaProducer.close(); } }
5.4 消费者测试类代码书写
public class KafkaConsumerTest { public static void main(String[] args) { // TODO 创建消费者配置对象 // 创建消费者配置对象集合 HashMap<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // TODO 创建消费者对象 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(consumerConfig); // TODO 订阅消费主体,主题名称为 test consumer.subscribe(Collections.singletonList("test")); // TODO 从主题中获取数据消费 final ConsumerRecords<String, String> datas = consumer.poll(100); for (ConsumerRecord<String,String> data : datas){ System.out.println(data); } // TODO 关闭消费者对象 consumer.close(); } }