阅读量:0
要连接Kafka集群,需要在Java中进行以下配置:
- 添加Kafka客户端依赖:在项目的pom.xml文件中添加以下依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
- 创建KafkaProducer实例:通过以下代码创建KafkaProducer实例,并配置Kafka集群的地址:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // Kafka集群地址 String kafkaServers = "localhost:9092,localhost:9093,localhost:9094"; // KafkaProducer配置 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 创建KafkaProducer实例 Producer<String, String> producer = new KafkaProducer<>(props); // 发送消息 // ... // 关闭KafkaProducer producer.close(); } }
- 创建KafkaConsumer实例:通过以下代码创建KafkaConsumer实例,并配置Kafka集群的地址:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // Kafka集群地址 String kafkaServers = "localhost:9092,localhost:9093,localhost:9094"; // KafkaConsumer配置 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 创建KafkaConsumer实例 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅topic consumer.subscribe(Arrays.asList("my-topic")); // 消费消息 // ... // 关闭KafkaConsumer consumer.close(); } }
以上代码示例中,需要根据实际情况替换kafkaServers
、GROUP_ID_CONFIG
和subscribe
的参数值。kafkaServers
是Kafka集群的地址,GROUP_ID_CONFIG
是消费者组的ID,subscribe
指定要订阅的topic名称。
注意:上述代码示例仅作为演示,实际应用中可能需要配置更多参数,如序列化器、消费位置等。具体配置根据需求来定。