阅读量:4
在Java中连接Kafka并创建topic,可以使用KafkaAdminClient类的createTopics()方法。
以下是一个示例代码:
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import java.util.Arrays; import java.util.Properties; public class KafkaTopicCreator { public static void main(String[] args) { // Kafka broker地址 String bootstrapServers = "localhost:9092"; // 创建AdminClient的配置 Properties adminClientConfig = new Properties(); adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 创建AdminClient实例 try (AdminClient adminClient = AdminClient.create(adminClientConfig)) { // 创建一个NewTopic对象 NewTopic newTopic = new NewTopic("my-topic", 3, (short) 1); // 使用AdminClient创建topic adminClient.createTopics(Arrays.asList(newTopic)).all().get(); System.out.println("Topic created successfully"); } catch (Exception e) { e.printStackTrace(); } } }
在上述代码中,我们首先创建一个AdminClient实例,然后使用NewTopic类创建一个新的topic对象。接下来,我们使用AdminClient的createTopics()方法,并将新的topic对象作为参数传递给它。最后,通过调用all().get()方法等待创建topic的完成。
注意:为了能够成功创建topic,您需要运行Kafka服务,并且您的Java应用程序需要能够访问Kafka broker。