阅读量:0
1.pom依赖添加
<properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <relativePath /> <!-- lookup parent from repository --> </parent> <dependencies> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> </exclusion> <exclusion> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
2. 配置文件添加配置:
server: port: 8080 spring: application: name: application-kafka kafka: bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #这个是kafka的地址,对应你server.properties中配置的 producer: batch-size: 16384 #批量大小 acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) retries: 10 # 消息发送重试次数 #transaction-id-prefix: transaction buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger: ms: 2000 #提交延迟 #partitioner: #指定分区器 #class: com.test.config.CustomerPartitionHandler consumer: group-id: testGroup,testg2 #默认的消费组ID enable-auto-commit: true #是否自动提交offset auto-commit-interval: 2000 #提交offset延时 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest max-poll-records: 500 #单次拉取消息的最大条数 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session: timeout: ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作) request: timeout: ms: 18000 # 消费请求的超时时间 listener: missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错 # type: batch logging: config: classpath:log4j2.xml
3. 日志配置
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="INFO"> <!--全局参数--> <Properties> <Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property> <!-- <Property name="logDir">/data/logs/logViewer</Property> --> <Property name="logDir">logs</Property> </Properties> <Appenders> <!-- 定义输出到控制台 --> <Console name="console" target="SYSTEM_OUT" follow="true"> <!--控制台只输出level及以上级别的信息--> <!-- <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> --> <PatternLayout> <Pattern>${pattern}</Pattern> </PatternLayout> </Console> <!-- 同一来源的Appender可以定义多个RollingFile,定义按天存储日志 --> <RollingFile name="rolling_file" fileName="${logDir}/logViewer.log" filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log"> <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout> <Pattern>${pattern}</Pattern> </PatternLayout> <Policies> <TimeBasedTriggeringPolicy interval="1"/> </Policies> <!-- 日志保留策略,配置只保留七天 --> <DefaultRolloverStrategy> <Delete basePath="${logDir}/" maxDepth="1"> <IfFileName glob="logViewer_*.log" /> <IfLastModified age="7d" /> </Delete> </DefaultRolloverStrategy> </RollingFile> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="console"/> <AppenderRef ref="rolling_file"/> </Root> </Loggers> </Configuration>
4. controller入口类,其它应用通过该接口直接将数据写入kafka
@RequestMapping(value="/kafka") @Controller public class ProducerController { @Autowired private KafkaTemplate kafkaTemplate; // 模拟发送消息 @RequestMapping(value = "/send",method = RequestMethod.GET) public String sendMessage(@PathParam(value = "msg") String msg) { System.out.println("收到get请求。。。"); kafkaTemplate.send("test",msg); return "成功"; }
5. kafka回调方法(需要回调通知时使用该方式):
@GetMapping("/kafka/callbackTwo/{message}") public void sendCallbackTwoMessage(@PathVariable("message") String message) { kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { System.out.println("发送消息失败2:"+throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-" + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset()); } }); }
6.kafka消费者注册
@Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); /** * containerFactory * 消息过滤器 消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。 配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。 * @param record */ @KafkaListener(topics = {"test","test2"},groupId = "testGroup") public void listenTestStatus(ConsumerRecord<?, ?> record) { LOGGER.info("接收到消息:开始业务处理。。。。。"); if (null == record || null == record.value()) { LOGGER.info("接收到空数据,跳过"); }else { LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString()); } } @KafkaListener(topics = {"test"},groupId = "testg2") public void listenTest2(ConsumerRecord<?, ?> record) { LOGGER.info("###listenTest2接收到消息:开始业务处理。。。。。"); if (null == record || null == record.value()) { LOGGER.info("接收到空数据,跳过"); }else { LOGGER.info("test-topics -->kafka监听到的值为: {}", record.value().toString()); } } /** * id:消费者ID groupId:消费组ID topics:监听的topic,可监听多个 topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区 * @param records */ //批量消费 @KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler") public void onBatchMessage(List<ConsumerRecord<String, Object>> records) { System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size()); for (ConsumerRecord<String, Object> record : records) { System.out.println(record.value()); } }
7.非spring-boot环境下使用java原生API手写kafka生产消息:
public static void main(String[] args) throws ExecutionException, InterruptedException { //PART1:设置发送者相关属性 Properties props = new Properties(); // 此处配置的是kafka的端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor"); // 配置key的序列化类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 配置value的序列化类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0; i < 5; i++) { //Part2:构建消息 ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i); //Part3:发送消息 //单向发送:不关心服务端的应答。 producer.send(record); System.out.println("message "+i+" sended"); } //消息处理完才停止发送者。 producer.close(); }
8.非spring-boot环境下使用java原生API手写java手写kafka消费者:
public static void main(String[] args) { //PART1:设置发送者相关属性 Properties props = new Properties(); //kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS); //每个消费者要指定一个group props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); //key序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //value序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { //PART2:拉取消息 // 100毫秒超时时间 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); //PART3:处理消息 for (ConsumerRecord<String, String> record : records) { System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value()); } //提交offset,消息就不会重复推送。 consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。 // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。 }
9.非spring-boot环境下使用java原生API手写异步发送kafka:
public static void main(String[] args) throws ExecutionException, InterruptedException { //PART1:设置发送者相关属性 Properties props = new Properties(); // 此处配置的是kafka的端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS); // 配置key的序列化类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 配置value的序列化类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<>(props); CountDownLatch latch = new CountDownLatch(5); for(int i = 0; i < 5; i++) { //Part2:构建消息 ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i); //Part3:发送消息 //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(null != e){ System.out.println("消息发送失败,"+e.getMessage()); e.printStackTrace(); }else{ String topic = recordMetadata.topic(); long offset = recordMetadata.offset(); String message = recordMetadata.toString(); System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset); } latch.countDown(); } }); } //消息处理完才停止发送者。 latch.await(); //消息处理完才停止发送者。 producer.close(); }