文章目录
前言
例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。
提示:以下是本篇文章正文内容,下面案例可供参考
一、Kafka简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
kafka核心概念
在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:
Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。
Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。
Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。
Producer: 数据的发布者,将消息发送到一个或多个 Topic。
Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。
Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。
Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。
二、安装Kafka
1.准备工作
1.1 Java
Kafka是依赖Java环境运行,所以需要在Linux系统内安装Java环境。
1.2 安装包下载
官方下载地址:http://kafka.apache.org/downloads.html
我这里下载的是:kafka_2.12-3.6.0.tgz
# 在线下载安装包 wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz
2.安装KafKa
将安装包传送到服务器并解压,这里我放到opt下面
1. 解压安装包
cd opt tar -zxvf kafka_2.12-3.6.0.tgz
2. 配置kafka
在kafka解压目录同一路径下创建
mkdir -p /opt/software/kafka mkdir -p /opt/software/kafka/zookeeper #zookeeper数据目录 mkdir -p /opt/software/kafka/log #kafka日志 mkdir -p /opt/software/kafka/zookeeper/log #zookeeper日志
3 进入配置文件目录
cd /opt/kafka_2.12-3.6.0/config/
4 修改配置文件server.properties,添加下面内容
broker.id=0 port=9092 #端口号 host.name=localhost #服务器IP地址,修改为自己的服务器IP log.dirs=/opt/software/kafka/log #日志存放路径,上面创建的目录 zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
5 配置zookeeper服务 zookeeper.properties
dataDir=/opt/software/kafka/zookeeper #zookeeper数据目录 dataLogDir=/opt/software/kafka/zookeeper/log #zookeeper日志目录 clientPort=2181 maxClientCnxns=100 tickTimes=2000 initLimit=10 syncLimit=5
6 创建启动和关闭的 kafka 执行脚本
6.1 创建启动脚本
cd /opt/kafka_2.12-3.6.0/ vi kafkaStart.sh
配置启动脚本 kafkaStart.sh
#启动zookeeper /opt/kafka_2.12-3.6.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties & sleep 3 #等3秒后执行 #启动kafka /opt/kafka_2.12-3.6.0/bin/kafka-server-start.sh /opt/kafka_2.12-3.6.0/config/server.properties &
6.2 创建关闭脚本 kafkaStop.sh
cd /opt/kafka_2.12-3.6.0/ vi kafkaStop.sh
配置关闭脚本 kafkaStop.sh
#关闭zookeeper /opt/kafka_2.12-3.6.0/bin/zookeeper-server-stop.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties & sleep 3 #等3秒后执行 #关闭kafka /opt/kafka_2.12-3.6.0/bin/kafka-server-stop.sh /opt/kafka_2.12-3.6.0/config/server.properties &
7 启动脚本,关闭脚本赋予权限
chmod 777 kafkaStart.sh chmod 777 kafkaStop.sh
启动和关闭kafka
cd /opt/kafka_2.12-3.6.0/ sh kafkaStart.sh #启动 sh kafkaStop.sh #关闭
8 创建生产者 topic 和 消费者 topic
cd /opt/kafka_2.12-3.6.0/bin/ #进入kafka目录 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test #创建生产者 test你要建立的topic名 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test #创建消费者
查看 kafka 是否启动
[root@localhost kafka_2.12-3.6.0]# jps 21324 QuorumPeerMain 15211 Jps 21215 Kafka
里面有QuorumPeerMain和kafkas说明启动成功了
查看当前的一些topic
cd /opt/kafka_2.12-3.6.0/bin/ ./kafka-topics.sh --zookeeper localhost:2181 --list ./kafka-topics.sh --list --bootstrap-server localhost:9092
9 Spring boot集成Kafka
1、pom依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
2.消费者
@Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/userGets") public Object gets() { // send 第一个参数为topic的名称,第二个参数为我们要发送的信息 kafkaTemplate.send("topic.quick.default","1231235"); return "发送成功"; } @KafkaListener(topics = {"topic1"}) public void onMessage(ConsumerRecord<?, ?> record) { System.out.println(record.value()); } @KafkaListener(topics = {"topic2"}) public void getMessage(ConsumerRecord<String, String> record) { String key = record.key(); String value = record.value(); }
- 测试
//生产者 public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); String topic = "test-topic"; for (int i = 0; i < 10; i++) { String message = "Message " + i; producer.send(new ProducerRecord<>(topic, message)); System.out.println("Sent: " + message); } producer.close(); } //消费者 public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //消息者订阅主题 consumer.subscribe(Collections.singletonList("test-topic")); //循环 while (true) { //每次拉取 1千条消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("=============> 消费kafka消息:"+ record.value()); } } }
4.配置文件
server: port: 8080 spring: kafka: bootstrap-servers: 172.16.253.21: 9093 producer: # 生产者 retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # COUNT # TIME | COUNT 有一个条件满足时提交 # COUNT_TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 # MANUAL # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE