java连接kafka集群怎么配置

avatar
作者
筋斗云
阅读量:0

要连接Kafka集群,需要在Java中进行以下配置:

  1. 添加Kafka客户端依赖:在项目的pom.xml文件中添加以下依赖:
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.8.0</version> </dependency> 
  1. 创建KafkaProducer实例:通过以下代码创建KafkaProducer实例,并配置Kafka集群的地址:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig;  import java.util.Properties;  public class KafkaProducerExample {    public static void main(String[] args) {        // Kafka集群地址       String kafkaServers = "localhost:9092,localhost:9093,localhost:9094";        // KafkaProducer配置       Properties props = new Properties();       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        // 创建KafkaProducer实例       Producer<String, String> producer = new KafkaProducer<>(props);        // 发送消息       // ...        // 关闭KafkaProducer       producer.close();    } } 
  1. 创建KafkaConsumer实例:通过以下代码创建KafkaConsumer实例,并配置Kafka集群的地址:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;  import java.util.Arrays; import java.util.Properties;  public class KafkaConsumerExample {    public static void main(String[] args) {        // Kafka集群地址       String kafkaServers = "localhost:9092,localhost:9093,localhost:9094";        // KafkaConsumer配置       Properties props = new Properties();       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);       props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        // 创建KafkaConsumer实例       Consumer<String, String> consumer = new KafkaConsumer<>(props);        // 订阅topic       consumer.subscribe(Arrays.asList("my-topic"));        // 消费消息       // ...        // 关闭KafkaConsumer       consumer.close();    } } 

以上代码示例中,需要根据实际情况替换kafkaServersGROUP_ID_CONFIGsubscribe的参数值。kafkaServers是Kafka集群的地址,GROUP_ID_CONFIG是消费者组的ID,subscribe指定要订阅的topic名称。

注意:上述代码示例仅作为演示,实际应用中可能需要配置更多参数,如序列化器、消费位置等。具体配置根据需求来定。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!