阅读量:0
在 Kafka 中,生产者发送消息后,可以通过以下几种方式判断消息是否发送成功:
- 同步发送方式:生产者调用
send()
方法后,会等待消息的确认返回,如果发送成功,send()
方法会返回一个RecordMetadata
对象,其中包含了消息的元数据信息;如果发送失败,则可能抛出异常。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value"); try { RecordMetadata metadata = producer.send(record).get(); System.out.println("消息发送成功,offset:" + metadata.offset()); } catch (Exception e) { System.out.println("消息发送失败:" + e.getMessage()); }
- 异步发送方式:生产者调用
send()
方法后,可以传递一个回调函数,在消息发送完成后,会调用该回调函数,通过回调函数可以获取到发送结果。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println("消息发送失败:" + e.getMessage()); } else { System.out.println("消息发送成功,offset:" + metadata.offset()); } } });
无论是同步发送还是异步发送,如果发送失败,可以根据异常信息进行错误处理。