kafka手动提交偏移量怎么实现

avatar
作者
猴君
阅读量:2

Kafka 提供了两种方式来手动提交偏移量:

  1. 使用 commitSync() 方法同步提交偏移量:
import org.apache.kafka.clients.consumer.*;  Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("enable.auto.commit", "false"); // 关闭自动提交  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test-topic"));  try {     while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));         for (ConsumerRecord<String, String> record : records) {             // 处理消息         }         consumer.commitSync(); // 手动提交偏移量     } } finally {     consumer.close(); } 
  1. 使用 commitAsync() 方法异步提交偏移量:
import org.apache.kafka.clients.consumer.*;  Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("enable.auto.commit", "false"); // 关闭自动提交  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test-topic"));  try {     while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));         for (ConsumerRecord<String, String> record : records) {             // 处理消息         }         consumer.commitAsync(); // 异步提交偏移量     } } finally {     consumer.close(); } 

在这两种方式中,commitSync() 方法会一直阻塞直到偏移量提交成功或发生错误。而 commitAsync() 方法则会在提交请求发送后立即返回,不会等待确认。如果发生错误,可以在 commitAsync() 方法的回调函数中处理。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!