Kafka:Kafka详解

avatar
作者
猴君
阅读量:1

Kafka

消息中间件

区别于rabbitmq,kafka更适用于量级较大的数据(100w级),主要在大数据领域使用

Kafka介绍

一个分布式流媒体平台,类似于消息队列或企业消息传递系统

Kafak的结构如下

在这里插入图片描述

producer:发布消息的对象
topic:Kafak将消息分门别类,每类的消息称为一个主题(Topic)
consumer:订阅主题并处理发布的消息的对象称为主题消费者
broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker)

消费者可以订阅一个或者多个的主题,并从broker中拉取数据,从而消费这些已发布的消息.

Kafka对zookeeper强依赖,需要使用zk进行分区的负载均衡以及节点的注册
docker安装zk和kafka
docker pull zookeeper:3.4.14 docker run -d --name zookeeper --restart=always -p 2181:2181 zookeeper:3.4.14 docker pull wurstmeister/kafka:2.12-2.3.1 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --restart=always \ --net=host wurstmeister/kafka:2.12-2.3.1 

Kafka入门

kafka的生产者消费者模型分为单播和多播

单播:同组内存在多个消费者,一个主题只能发送消息到同一个组内的一个消费者
多播:不同组的多个消费者,一个主题可以发送消息到不同组的多个不同消费者
<dependency>     	<groupId>org.apache.kafka</groupId>     	<artifactId>kafka-clients</artifactId> </dependency> 

Kafka的高可用设计(集群模式)

多个broker组成一个cluster集群

集群中的某一条机器宕机,其他机器上的broker依然能对外提供服务

Kafka的备份机制(republication)

消息的备份称为副本(replica)

分为两种副本

领导者副本(leader replica)
追随者副本(follower replica) 分为ISR 和 普通

同步方式:

领导者副本直接接收发布者的消息

随后领导者副本会同步将消息复制保存到ISR follower副本,异步将消息复制保存到普通follower副本

如果leader失效,需要选举出新的leader

依据以下原则选举:

选举优先从ISR中选举,因为ISR副本是同步的

如果ISR中没有生效的副本,就从普通中选举一个
在这里插入图片描述

如果所有副本都失效

可以等待ISR副本活过来保证数据完整性,也可以选举第一个活过来的保证数据可用性

Kafka的生产者详解

消息发送:

分为同步发送和异步发送
同步发送:
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka"); RecordMetadata recordMetadata = producer.send(ProducerRecord).get();//获取发送的结果 
异步发送:

传入一个callback对象处理回调结果

//异步消息发送 producer.send(ProducerRecord, new Callback() {     @Override     public void onCompletion(RecordMetadata recordMetadata, Exception e) {         if(e != null){             System.out.println("记录异常信息到日志表中");         }         System.out.println(recordMetadata.offset());     } }); 

生产者配置

消息确认配置acks

//ack配置  消息确认机制 prop.put(ProducerConfig.ACKS_CONFIG,"all"); acks = 0 //生产者不会等待任何来自服务器的响应 acks = 1 //集群leader收到消息后,生产者会接受一个来自服务器的响应 acks = -1/all(默认) //参与复制的所有生产者全部收到消息后,生产者才会收到响应 

重试次数retries

//重试次数 prop.put(ProducerConfig.RETRIES_CONFIG,10); 

消息压缩方式

消息默认不会被压缩

//数据压缩 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4"); | snappy | 占用较少的  CPU,  却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 | | lz4    | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观        | | gzip   | 占用较多的  CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 | 

Kafka消费者详解

消费者组:一个或者多个消费者组成的群体

topic会分发消息给每个消费者组中的一个消费者

消息有序性

跨分区的消息无法决定先后顺序

如果需要保证消息的有序性需要让多个消费者去监听同一个分区

提交和偏移量

消费者会在消费前或者消费后向__consumer_offset的特殊topic中发送偏移量,记录每个消息的处理进度

如果消费者崩溃或者新的消费者加入就会触发负载均衡

如果使用默认方式自动提交偏移量,就会在消费前直接自动提交偏移量,可能会出现重复消费或者漏消费的情况

所以我们需要使用手动提交的方式提交偏移量

手动提交(同步,会自动重试):

consumer.commitSync() 

异步提交:

consumer.commitAsync(new OffsetCommitCallback() {         @Override         public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {             if(e!=null){                 System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);             }         }     }); 

Spring继承Kafka

引入依赖
<!-- kafkfa -->     <dependency>         <groupId>org.springframework.kafka</groupId>         <artifactId>spring-kafka</artifactId>         <exclusions>             <exclusion>                 <groupId>org.apache.kafka</groupId>                 <artifactId>kafka-clients</artifactId>             </exclusion>         </exclusions>     </dependency>     <dependency>         <groupId>org.apache.kafka</groupId>         <artifactId>kafka-clients</artifactId>     </dependency> 
Topic的创建和删改(可省略)
public static void createTopic(String topicName, int partitions, short replicas) throws Exception {         NewTopic newTopic = new NewTopic(topicName, partitions, replicas);         CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));         topics.all().get();         log.info("【{}】topic创建成功", topicName);     }      /**      * @Title deleteTopic      * @Description 删除topic      * @param topicName  topic名称      * @return void      */     public static void deleteTopic(String topicName) throws Exception {         DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));         deleteTopicsResult.all().get();         log.info("【{}】topic删除成功", topicName);      }      /**      * @Title updateTopicRetention      * @Description 修改topic的过期时间      * @param topicName  topic名称      * @param ms  过期时间(毫秒值)      * @return void      */     public static void updateTopicRetention(String topicName, String ms) throws Exception {         ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);         ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);         Config config = new Config(Collections.singleton(configEntry));         // 创建AlterConfigsOptions         AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000);         // 执行修改操作         adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();         log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);     } 
生产者发送消息
@Resource private KafkaTemplate<String,String> kafkaTemplate;  kafkaTemplate.send("topic","test"); 
消费者消费消息
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;  @Component public class Listener {      @KafkaListener(topics = "topic")     public void onMessage(String message){         if(!StringUtils.isEmpty(message)){             System.out.println(message);         }      } } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!