Kafka-02 @KafkaListener学习

avatar
作者
猴君
阅读量:0

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId>     <version>2.8.11</version> </dependency> 

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ---------------------------- protected void finishRefresh() {    clearResourceCaches();     initLifecycleProcessor();     // 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessor    getLifecycleProcessor().onRefresh();     publishEvent(new ContextRefreshedEvent(this));     if (!NativeDetector.inNativeImage()) {       LiveBeansView.registerApplicationContext(this);    } }    // ------------------------------ DefaultLifecycleProcessor ---------------------------- public void onRefresh() {     startBeans(true);     this.running = true; }    // ------------------------------ DefaultLifecycleProcessor ---------------------------- private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) { 	Lifecycle bean = lifecycleBeans.remove(beanName); 	if (bean != null && bean != this) { 		String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); 		for (String dependency : dependenciesForBean) { 			doStart(lifecycleBeans, dependency, autoStartupOnly); 		} 		if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) ||               ((SmartLifecycle) bean).isAutoStartup())) { 			try {                 // 获取容器中的 LifeCycle bean 对象,调用它的 start()                 // SpringKafka 中对应的是 KafkaListenerEndpointRegistry                 // 我们重点看一下 KafkaListenerEndpointRegistry.start() 				bean.start(); 			} 			catch (Throwable ex) { 				throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); 			} 		} 	} } 

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry --------------------------- public void start() {     // 轮询所有的 ConcurrentMessageListenerContainer 对象     // 执行 ConcurrentMessageListenerContainer.start()     for (MessageListenerContainer listenerContainer : getListenerContainers()) {         startIfNecessary(listenerContainer);     }     this.running = true; }    // ---------------------------- KafkaListenerEndpointRegistry --------------------------- private void startIfNecessary(MessageListenerContainer listenerContainer) {     if ((this.contextRefreshed && this.alwaysStartAfterRefresh)          || listenerContainer.isAutoStartup()) {         // 执行 ConcurrentMessageListenerContainer.start()         listenerContainer.start();     } }    // ---------------------------- AbstractMessageListenerContainer --------------------------- public final void start() {     checkGroupId();     synchronized (this.lifecycleMonitor) {         if (!isRunning()) {             // 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()             doStart();         }     } } 

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer --------------------------- protected void doStart() {     if (!isRunning()) {         checkTopics();         ContainerProperties containerProperties = getContainerProperties();         TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();         if (topicPartitions != null && this.concurrency > topicPartitions.length) {             this.concurrency = topicPartitions.length;         }         setRunning(true);          // 1. 根据 @KafkaListener 中配置的 concurrency 轮询         for (int i = 0; i < this.concurrency; i++) {             // 2. 创建 KafkaMessageListenerContainer             KafkaMessageListenerContainer<K, V> container =                 constructContainer(containerProperties, topicPartitions, i);                          // 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置             configureChildContainer(i, container);             if (isPaused()) {                 container.pause();             }                          // 4. 启动 KafkaMessageListenerContainer             container.start();                          // 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中             this.containers.add(container);         }     } } 

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {     String beanName = getBeanName();     beanName = (beanName == null ? "consumer" : beanName) + "-" + index;     container.setBeanName(beanName);     ApplicationContext applicationContext = getApplicationContext();     if (applicationContext != null) {         container.setApplicationContext(applicationContext);     }     ApplicationEventPublisher publisher = getApplicationEventPublisher();     if (publisher != null) {         container.setApplicationEventPublisher(publisher);     }      // 设置 clinetIdSuffix,clientId 前缀     container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");     container.setGenericErrorHandler(getGenericErrorHandler());     container.setCommonErrorHandler(getCommonErrorHandler());     container.setAfterRollbackProcessor(getAfterRollbackProcessor());     container.setRecordInterceptor(getRecordInterceptor());     container.setBatchInterceptor(getBatchInterceptor());     container.setInterceptBeforeTx(isInterceptBeforeTx());     container.setListenerInfo(getListenerInfo());     AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();     if (exec == null) {         // 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors         exec = new SimpleAsyncTaskExecutor(beanName + "-C-");         this.executors.add(exec);                  // 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer         container.getContainerProperties().setConsumerTaskExecutor(exec);     } } 

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {     if (isRunning()) {         return;     }     ContainerProperties containerProperties = getContainerProperties();     checkAckMode(containerProperties);      Object messageListener = containerProperties.getMessageListener();     AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();     if (consumerExecutor == null) {         consumerExecutor = new SimpleAsyncTaskExecutor(             (getBeanName() == null ? "" : getBeanName()) + "-C-");         containerProperties.setConsumerTaskExecutor(consumerExecutor);     }     GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;     ListenerType listenerType = determineListenerType(listener);          // 1. 创建 ListenerConsumer     // ListenerConsumer 是一个 Runnable 对象     // new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员     // 它的 run() 比较重要     this.listenerConsumer = new ListenerConsumer(listener, listenerType);     setRunning(true);     this.startLatch = new CountDownLatch(1);          // 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用     this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer); } 

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {     ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());     publishConsumerStartingEvent();     this.consumerThread = Thread.currentThread();     setupSeeks();     KafkaUtils.setConsumerGroupId(this.consumerGroupId);     this.count = 0;     this.last = System.currentTimeMillis();     initAssignedPartitions();     publishConsumerStartedEvent();     Throwable exitThrowable = null;          // 开启自旋     while (isRunning()) {         // 通过 KafkaConsumer 向 kafka-server 发起 poll 请求         pollAndInvoke();     }     wrapUp(exitThrowable); } 

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {    if (this.invokerHandlerMethod != null) {        // 最终的执行入口        // 最后会通过反射调用我们的 @KafkaListener 声明的方法       return this.invokerHandlerMethod.invoke(message, providedArgs);    } else if (this.delegatingHandler.hasDefaultHandler()) {       Object[] args = new Object[providedArgs.length + 1];       args[0] = message.getPayload();       System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);       return this.delegatingHandler.invoke(message, args);    } else {       return this.delegatingHandler.invoke(message, providedArgs);    } } 

至此,SpringKafka 分析完毕;

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!