kafka发送消息流程

avatar
作者
筋斗云
阅读量:2

在这里插入图片描述
配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

public Map<String,Object> producerConfigs(){     Map<String,Object> props = new HashMap<>();     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);     return props; }  public ProducerFactory producerFactory(){     return new DefaultKafkaProducerFactory<>(producerConfigs()); }  // 覆盖spring-kafka中的配置 @Bean public KafkaTemplate<String,Object> kafkaTemplate(){     return new KafkaTemplate<String,Object>(producerFactory()); } 

自定义消息拦截器

public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {      // 发送消息时,对消息拦截。     @Override     public ProducerRecord<String,Object> onSend(ProducerRecord producerRecord) {         System.out.println("拦截消息" + producerRecord.toString());         return null;     }      // 服务器是否收到了当前这条消息     @Override     public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {         if(recordMetadata != null){             System.out.println("服务器收到消息" + recordMetadata.offset());         }else{             // 没有收到消息发送失败             System.out.println("消息发送失败!!!");         }     }      @Override     public void close() {      }      @Override     public void configure(Map<String, ?> map) {      } } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!