spring-boot2.x整合Kafka步骤

avatar
作者
猴君
阅读量:0

1.pom依赖添加

<properties> 		<java.version>1.8</java.version> 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 		<maven.compiler.source>1.8</maven.compiler.source> 		<maven.compiler.target>1.8</maven.compiler.target> 	</properties>  	<parent> 		<groupId>org.springframework.boot</groupId> 		<artifactId>spring-boot-starter-parent</artifactId> 		<version>2.7.18</version> 		<relativePath /> <!-- lookup parent from repository --> 	</parent> 	<dependencies> 		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> 		<dependency> 			<groupId>org.springframework.boot</groupId> 			<artifactId>spring-boot-starter-web</artifactId> 			<exclusions> 				<exclusion> 					<groupId>ch.qos.logback</groupId> 					<artifactId>logback-core</artifactId> 				</exclusion> 				<exclusion> 					<groupId>ch.qos.logback</groupId> 					<artifactId>logback-classic</artifactId> 				</exclusion> 			</exclusions> 		</dependency>  		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --> 		<dependency> 			<groupId>org.springframework.boot</groupId> 			<artifactId>spring-boot</artifactId> 		</dependency> 		<dependency> 			<groupId>org.springframework</groupId> 			<artifactId>spring-context-support</artifactId> 		</dependency> 		<dependency> 			<groupId>org.slf4j</groupId> 			<artifactId>slf4j-log4j12</artifactId> 		</dependency> 		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --> 		<dependency> 			<groupId>org.springframework.boot</groupId> 			<artifactId>spring-boot-autoconfigure</artifactId> 		</dependency>  		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> 		<dependency> 			<groupId>org.springframework.kafka</groupId> 			<artifactId>spring-kafka</artifactId> 		</dependency> 	</dependencies>

2. 配置文件添加配置:

server:   port: 8080  spring:   application:     name: application-kafka   kafka:     bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的     producer:       batch-size: 16384 #批量大小       acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)       retries: 10 # 消息发送重试次数       #transaction-id-prefix: transaction       buffer-memory: 33554432       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.apache.kafka.common.serialization.StringSerializer       properties:         linger:           ms: 2000 #提交延迟         #partitioner: #指定分区器           #class: com.test.config.CustomerPartitionHandler     consumer:       group-id: testGroup,testg2 #默认的消费组ID       enable-auto-commit: true #是否自动提交offset       auto-commit-interval: 2000 #提交offset延时       # 当kafka中没有初始offset或offset超出范围时将自动重置offset       # earliest:重置为分区中最小的offset;       # latest:重置为分区中最新的offset(消费分区中新产生的数据);       # none:只要有一个分区不存在已提交的offset,就抛出异常;       auto-offset-reset: latest       max-poll-records: 500 #单次拉取消息的最大条数       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer       properties:         session:           timeout:             ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)         request:           timeout:             ms: 18000 # 消费请求的超时时间     listener:       missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错 #      type: batch   logging:   config: classpath:log4j2.xml 

3. 日志配置

<?xml version="1.0" encoding="UTF-8"?> <Configuration status="INFO">      <!--全局参数-->     <Properties>         <Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property> <!--         <Property name="logDir">/data/logs/logViewer</Property> -->         <Property name="logDir">logs</Property>     </Properties>      <Appenders>         <!-- 定义输出到控制台 -->         <Console name="console" target="SYSTEM_OUT" follow="true">             <!--控制台只输出level及以上级别的信息--> <!--             <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> -->             <PatternLayout>                 <Pattern>${pattern}</Pattern>             </PatternLayout>         </Console>         <!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 -->         <RollingFile name="rolling_file"                      fileName="${logDir}/logViewer.log"                      filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log">             <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>             <PatternLayout>                 <Pattern>${pattern}</Pattern>             </PatternLayout>             <Policies>                 <TimeBasedTriggeringPolicy interval="1"/>             </Policies>             <!-- 日志保留策略,配置只保留七天 -->             <DefaultRolloverStrategy>                 <Delete basePath="${logDir}/" maxDepth="1">                     <IfFileName glob="logViewer_*.log" />                     <IfLastModified age="7d" />                 </Delete>             </DefaultRolloverStrategy>         </RollingFile>     </Appenders>            <Loggers>         <Root level="INFO">             <AppenderRef ref="console"/>             <AppenderRef ref="rolling_file"/>         </Root>     </Loggers> </Configuration> 

4. controller入口类,其它应用通过该接口直接将数据写入kafka

@RequestMapping(value="/kafka") @Controller public class ProducerController {     @Autowired     private KafkaTemplate kafkaTemplate;   //    模拟发送消息     @RequestMapping(value = "/send",method = RequestMethod.GET)     public String sendMessage(@PathParam(value = "msg") String msg) {     	System.out.println("收到get请求。。。");         kafkaTemplate.send("test",msg);         return "成功";     }

5. kafka回调方法(需要回调通知时使用该方式):

    @GetMapping("/kafka/callbackTwo/{message}")     public void sendCallbackTwoMessage(@PathVariable("message") String message) {         kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {             @Override             public void onFailure(Throwable throwable) {                 System.out.println("发送消息失败2:"+throwable.getMessage());             }              @Override             public void onSuccess(SendResult<String, Object> result) {                 System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"                         + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());             }         });     }

6.kafka消费者注册

@Component public class KafkaMessageListener {       private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);       /**      * containerFactory      * 消息过滤器 	消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。 	配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。      * @param record      */     @KafkaListener(topics = {"test","test2"},groupId = "testGroup")     public void listenTestStatus(ConsumerRecord<?, ?> record) {     	LOGGER.info("接收到消息:开始业务处理。。。。。");     	if (null == record || null == record.value()) { 			LOGGER.info("接收到空数据,跳过"); 		}else { 			LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString()); 		}     }     @KafkaListener(topics = {"test"},groupId = "testg2")     public void listenTest2(ConsumerRecord<?, ?> record) {     	LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。");     	if (null == record || null == record.value()) {     		LOGGER.info("接收到空数据,跳过");     	}else {     		LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString());     	}     }     /**      * id:消费者ID 		groupId:消费组ID 		topics:监听的topic,可监听多个 		topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区      * @param records      */   //批量消费     @KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")     public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {         System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());         for (ConsumerRecord<String, Object> record : records) {             System.out.println(record.value());         }     }

7.非spring-boot环境下使用java原生API手写kafka生产消息:

 public static void main(String[] args) throws ExecutionException, InterruptedException {         //PART1:设置发送者相关属性         Properties props = new Properties();         // 此处配置的是kafka的端口         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");         // 配置key的序列化类         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");         // 配置value的序列化类         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");           Producer<String,String> producer = new KafkaProducer<>(props);         for(int i = 0; i < 5; i++) {             //Part2:构建消息             ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);             //Part3:发送消息             //单向发送:不关心服务端的应答。             producer.send(record);             System.out.println("message "+i+" sended");         }         //消息处理完才停止发送者。         producer.close();     }

8.非spring-boot环境下使用java原生API手写java手写kafka消费者:

 public static void main(String[] args) {         //PART1:设置发送者相关属性         Properties props = new Properties();         //kafka地址         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);         //每个消费者要指定一个group         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");         //key序列化类         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         //value序列化类         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         Consumer<String, String> consumer = new KafkaConsumer<>(props);         consumer.subscribe(Arrays.asList("test"));           while (true) {             //PART2:拉取消息             // 100毫秒超时时间             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));             //PART3:处理消息             for (ConsumerRecord<String, String> record : records) {                 System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());             }                 //提交offset,消息就不会重复推送。             consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。 //            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。         }

9.非spring-boot环境下使用java原生API手写异步发送kafka:

 public static void main(String[] args) throws ExecutionException, InterruptedException { 	        //PART1:设置发送者相关属性 	        Properties props = new Properties(); 	        // 此处配置的是kafka的端口 	        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS); 	        // 配置key的序列化类 	        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); 	        // 配置value的序列化类 	        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); 	  	        Producer<String,String> producer = new KafkaProducer<>(props); 	        CountDownLatch latch = new CountDownLatch(5); 	        for(int i = 0; i < 5; i++) { 	            //Part2:构建消息 	            ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i); 	            //Part3:发送消息 	            //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数 	            producer.send(record, new Callback() { 	                @Override 	                public void onCompletion(RecordMetadata recordMetadata, Exception e) { 	                    if(null != e){ 	                        System.out.println("消息发送失败,"+e.getMessage()); 	                        e.printStackTrace(); 	                    }else{ 	                        String topic = recordMetadata.topic(); 	                        long offset = recordMetadata.offset(); 	                        String message = recordMetadata.toString(); 	                        System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset); 	                    } 	                    latch.countDown(); 	                } 	            }); 	        } 	        //消息处理完才停止发送者。 	        latch.await(); 	        //消息处理完才停止发送者。 	        producer.close(); 	    }

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!