一. 引入依赖
SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.11</version> </dependency>
二. 核心结构
先来看一下 spring-kafka 核心图;
当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;
ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;
ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;
三. 核心流程
先来看一下核心方法的调用流程图,略去了部分非核心流程;
执行流程如下:
- Spring 启动;
- Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
- 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
- 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
- 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
- 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
- ListenerConsumer 的 run() 被调用;
- run 中开启自旋;
- 不断调用 kafka-client 提供的 poll() 拉取新的消息;
- 收到新的消息就执行,执行完了就继续自旋;
- 收不新消息,重启下一轮自旋;
四. 分析
1. 启动入口
入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();
这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;
// ------------------------------ AbstractApplicationContext ---------------------------- protected void finishRefresh() { clearResourceCaches(); initLifecycleProcessor(); // 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessor getLifecycleProcessor().onRefresh(); publishEvent(new ContextRefreshedEvent(this)); if (!NativeDetector.inNativeImage()) { LiveBeansView.registerApplicationContext(this); } } // ------------------------------ DefaultLifecycleProcessor ---------------------------- public void onRefresh() { startBeans(true); this.running = true; } // ------------------------------ DefaultLifecycleProcessor ---------------------------- private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) { Lifecycle bean = lifecycleBeans.remove(beanName); if (bean != null && bean != this) { String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); for (String dependency : dependenciesForBean) { doStart(lifecycleBeans, dependency, autoStartupOnly); } if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) { try { // 获取容器中的 LifeCycle bean 对象,调用它的 start() // SpringKafka 中对应的是 KafkaListenerEndpointRegistry // 我们重点看一下 KafkaListenerEndpointRegistry.start() bean.start(); } catch (Throwable ex) { throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); } } } }
2. KafkaListenerEndpointRegistry
KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;
我们先看它的 start();
// ---------------------------- KafkaListenerEndpointRegistry --------------------------- public void start() { // 轮询所有的 ConcurrentMessageListenerContainer 对象 // 执行 ConcurrentMessageListenerContainer.start() for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } this.running = true; } // ---------------------------- KafkaListenerEndpointRegistry --------------------------- private void startIfNecessary(MessageListenerContainer listenerContainer) { if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) { // 执行 ConcurrentMessageListenerContainer.start() listenerContainer.start(); } } // ---------------------------- AbstractMessageListenerContainer --------------------------- public final void start() { checkGroupId(); synchronized (this.lifecycleMonitor) { if (!isRunning()) { // 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart() doStart(); } } }
我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;
3. ConcurrentMessageListenerContainer
我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;
// ---------------------------- ConcurrentMessageListenerContainer --------------------------- protected void doStart() { if (!isRunning()) { checkTopics(); ContainerProperties containerProperties = getContainerProperties(); TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null && this.concurrency > topicPartitions.length) { this.concurrency = topicPartitions.length; } setRunning(true); // 1. 根据 @KafkaListener 中配置的 concurrency 轮询 for (int i = 0; i < this.concurrency; i++) { // 2. 创建 KafkaMessageListenerContainer KafkaMessageListenerContainer<K, V> container = constructContainer(containerProperties, topicPartitions, i); // 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置 configureChildContainer(i, container); if (isPaused()) { container.pause(); } // 4. 启动 KafkaMessageListenerContainer container.start(); // 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中 this.containers.add(container); } } }
关键流程是第 3 步和第 4 步,我们分开来看;
3.1 configureChildContainer()
对刚创建出的 KafkaMessageListenerContainer 做一些配置;
这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) { String beanName = getBeanName(); beanName = (beanName == null ? "consumer" : beanName) + "-" + index; container.setBeanName(beanName); ApplicationContext applicationContext = getApplicationContext(); if (applicationContext != null) { container.setApplicationContext(applicationContext); } ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { container.setApplicationEventPublisher(publisher); } // 设置 clinetIdSuffix,clientId 前缀 container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : ""); container.setGenericErrorHandler(getGenericErrorHandler()); container.setCommonErrorHandler(getCommonErrorHandler()); container.setAfterRollbackProcessor(getAfterRollbackProcessor()); container.setRecordInterceptor(getRecordInterceptor()); container.setBatchInterceptor(getBatchInterceptor()); container.setInterceptBeforeTx(isInterceptBeforeTx()); container.setListenerInfo(getListenerInfo()); AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor(); if (exec == null) { // 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors exec = new SimpleAsyncTaskExecutor(beanName + "-C-"); this.executors.add(exec); // 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer container.getContainerProperties().setConsumerTaskExecutor(exec); } }
3.2 container.start()
调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();
protected void doStart() { if (isRunning()) { return; } ContainerProperties containerProperties = getContainerProperties(); checkAckMode(containerProperties); Object messageListener = containerProperties.getMessageListener(); AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor(); if (consumerExecutor == null) { consumerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-C-"); containerProperties.setConsumerTaskExecutor(consumerExecutor); } GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener; ListenerType listenerType = determineListenerType(listener); // 1. 创建 ListenerConsumer // ListenerConsumer 是一个 Runnable 对象 // new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员 // 它的 run() 比较重要 this.listenerConsumer = new ListenerConsumer(listener, listenerType); setRunning(true); this.startLatch = new CountDownLatch(1); // 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用 this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer); }
ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();
4. ListenerConsumer.run()
我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;
public void run() { ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata()); publishConsumerStartingEvent(); this.consumerThread = Thread.currentThread(); setupSeeks(); KafkaUtils.setConsumerGroupId(this.consumerGroupId); this.count = 0; this.last = System.currentTimeMillis(); initAssignedPartitions(); publishConsumerStartedEvent(); Throwable exitThrowable = null; // 开启自旋 while (isRunning()) { // 通过 KafkaConsumer 向 kafka-server 发起 poll 请求 pollAndInvoke(); } wrapUp(exitThrowable); }
ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;
我们简单看下最终调我们 @KafkaListener 声明方法的地方;
4.1 HandlerAdapter.invoke()
调用到 RecordMessagingMessageListenerAdapter.invoke();
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { if (this.invokerHandlerMethod != null) { // 最终的执行入口 // 最后会通过反射调用我们的 @KafkaListener 声明的方法 return this.invokerHandlerMethod.invoke(message, providedArgs); } else if (this.delegatingHandler.hasDefaultHandler()) { Object[] args = new Object[providedArgs.length + 1]; args[0] = message.getPayload(); System.arraycopy(providedArgs, 0, args, 1, providedArgs.length); return this.delegatingHandler.invoke(message, args); } else { return this.delegatingHandler.invoke(message, providedArgs); } }
至此,SpringKafka 分析完毕;