阅读量:2
配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
public Map<String,Object> producerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); return props; } public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory<>(producerConfigs()); } // 覆盖spring-kafka中的配置 @Bean public KafkaTemplate<String,Object> kafkaTemplate(){ return new KafkaTemplate<String,Object>(producerFactory()); }
自定义消息拦截器
public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> { // 发送消息时,对消息拦截。 @Override public ProducerRecord<String,Object> onSend(ProducerRecord producerRecord) { System.out.println("拦截消息" + producerRecord.toString()); return null; } // 服务器是否收到了当前这条消息 @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if(recordMetadata != null){ System.out.println("服务器收到消息" + recordMetadata.offset()); }else{ // 没有收到消息发送失败 System.out.println("消息发送失败!!!"); } } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }