阅读量:0
在C++中使用librdkafka库可以很方便地发送JSON数据到Kafka。下面是一个简单的示例代码:
#include #include #include int main() { std::string brokers = "localhost:9092"; std::string topic = "test_topic"; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); conf->set("metadata.broker.list", brokers, errstr); RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; return 1; } RdKafka::Topic *kafka_topic = RdKafka::Topic::create(producer, topic, tconf, errstr); if (!kafka_topic) { std::cerr << "Failed to create topic: " << errstr << std::endl; return 1; } std::string json_data = "{"key": "value"}"; RdKafka::ErrorCode resp = producer->produce(kafka_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(json_data.c_str()), json_data.size(), NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) { std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl; return 1; } producer->flush(1000); delete kafka_topic; delete producer; return 0; }
在这个示例中,我们首先创建一个生产者和一个主题对象,然后使用produce
方法发送一个JSON数据到Kafka主题。在实际使用中,你可能需要根据你的需求调整代码,并添加错误处理逻辑。