C/C++ 如何发送与接收Kafka消息

avatar
作者
筋斗云
阅读量:0

一、背景

        在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。

        系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(Java)依赖与算法工程(C++) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。

        算法工程为C/C++工程,本文将介绍如何在C/C++中如何发送与接收Kakfa消息(包含:Kafka的SASL认证方式),并提供了详细的源码和讲解。(至于Java中如何发送与接收Kakfa消息如有需要,可留言或私聊!)

二、环境依赖安装

# 下载librdkafka git clone https://github.com/edenhill/librdkafka.git  # 编译 cd librdkafka ./configure --prefix=/usr/local  # 安装 sudo make install  # 验证:查看/usr/local/lib目录下是否有librdkafka文件 ls /usr/local/lib | grep kafka

三、编写kakfa生产者消费者

3.1 生产者

#include <rdkafka.h>     // 包含C API头文件 #include <iostream> #include <cstring> #include <cerrno>    int main() {     const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址     const char *topic_name = "kafka_msg_topic_test";     const char *payload = "Hello, Kafka from librdkafka!";     size_t len = strlen(payload);        // 创建配置对象     rd_kafka_conf_t *conf = rd_kafka_conf_new();     if (!conf) {         std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         return 1;     }        // 设置broker地址     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {         std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }        // 创建生产者实例     rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);     if (!rk) {         std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }        // 创建topic句柄(可选,但推荐)     rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);     if (!rkt) {         std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_destroy(rk); //        rd_kafka_conf_destroy(conf);         return 1;     }        // 发送消息     int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区     int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);     if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {         std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;     } else {         std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;     }        // 等待所有消息发送完成(可选,但推荐)     // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认     int msgs_sent = 0;     while (rd_kafka_outq_len(rk) > 0) {         rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去         msgs_sent += rd_kafka_outq_len(rk);     }        // 销毁topic句柄     rd_kafka_topic_destroy(rkt);        // 销毁生产者实例     rd_kafka_destroy(rk);        // 销毁配置对象 //    rd_kafka_conf_destroy(conf);        return 0; }

3.2 消费者

#include <rdkafka.h> #include <iostream> #include <cerrno> #include <cstring> #include <cstdlib>    void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {     // 错误处理回调     std::cerr << "Kafka error: " << err << ": " << reason << std::endl; }    int main() {     std::cerr << "start " << std::endl;     const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址     const char *group_id = "kafka_msg_topic_test"; // 消费者组ID     const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名称          // 创建配置对象     rd_kafka_conf_t *conf = rd_kafka_conf_new();     if (!conf) {         std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         return 1;     }          // 设置broker地址     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {         std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }          // 设置消费者组ID     if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) {         std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }          // 设置错误处理回调(可选)     rd_kafka_conf_set_error_cb(conf, error_cb);          // 创建消费者实例     rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);     if (!rk) {         std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         return 1;     }               // 创建一个topic分区列表     rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);     if (!topics) {         std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_destroy(rk);         return 1;     }          // 添加topic到分区列表     if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) {         std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_topic_partition_list_destroy(topics);         rd_kafka_destroy(rk);         return 1;     }     // 订阅topic     rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);     if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {         std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;         rd_kafka_topic_partition_list_destroy(topics);         rd_kafka_destroy(rk);         return 1;     }          // 销毁分区列表(订阅后不再需要)     rd_kafka_topic_partition_list_destroy(topics);               // 轮询消息     while (true) {         rd_kafka_message_t *rkmessage;         rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息                  if (rkmessage == NULL) {             // 没有消息或者超时             continue;         }                  if (rkmessage->err) {             // 处理错误             if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {                 // 消息流的末尾                 std::cout << "End of partition event" << std::endl;             } else {                 // 打印错误并退出                 std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl;                 break;             }         } else {             // 处理消息             std::cout << "Received message at offset " << rkmessage->offset             << " from partition " << rkmessage->partition             << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len             << " value :" <<(char *)rkmessage->payload             << std::endl;                          // 如果需要,可以在这里处理消息内容             // 例如,使用rkmessage->payload()获取消息内容                          // 释放消息             rd_kafka_message_destroy(rkmessage);         }     }          // 清理     rd_kafka_destroy(rk);          return 0; }

3.3 编译运行

3.3.1 编译生产者消费者

g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread

3.3.2 运行验证

执行时,若出现错误: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory

则需要执行下面环境变量配置:

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

生产者:发送消息

消费者:接收消息

3.4 SASL认证kakfa

下面是,支持sasl认证的kakka生产者完整代码

#include <rdkafka.h> #include <iostream> #include <cstring> #include <cerrno>  int main(int argc, char *argv[]) {     const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址     const char *username = "xxx";     const char *password = "xxx";     const char *topic_name = "kafka_msg_test_sasl";     const char *payload = "Hello, Kafka from librdkafka! sasl";     size_t len = strlen(payload);      // 初始化配置     rd_kafka_conf_t *conf = rd_kafka_conf_new();     if (!conf)     {         std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         return 1;     }     char errstr[512]; // 声明一个足够大的字符数组来存储错误信息      // 设置SASL相关的配置     if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)     {         std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }     if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)     {         std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }     if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)     {         std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }     if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)     {         std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }      if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)     {         std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_conf_destroy(conf);         return 1;     }     // 检查配置是否设置成功     if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {         std::cerr << "Failed to set configuration: " << errstr << std::endl;         return 1;     }      // 创建producer实例     rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));     if (!rk) {         std::cerr << "Failed to create new producer: " << errstr << std::endl;         return 1;     }      // 创建topic句柄(可选,但推荐)     rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);     if (!rkt)     {         std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;         rd_kafka_destroy(rk);         //        rd_kafka_conf_destroy(conf);         return 1;     }      // 发送消息     int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区     int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);     if (err != RD_KAFKA_RESP_ERR_NO_ERROR)     {         std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;     }     else     {         std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;     }      // 等待所有消息发送完成(可选,但推荐)     // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认     int msgs_sent = 0;     while (rd_kafka_outq_len(rk) > 0)     {         rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去         msgs_sent += rd_kafka_outq_len(rk);     }      // 销毁topic句柄     rd_kafka_topic_destroy(rkt);      // 清理资源     rd_kafka_destroy(rk);     return 0; }

在kafka map 管理界面中查看发送效果如下:

3.5 结束语

本文详细描述了kakfa依赖安装、消息生产者、消费者、kafka sasl认证等相关完整代码,完整工程见github地址。如果有疑问,可留言/私聊!

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!