想要运行kafka。发现windows系统运行kafka经常报问题怎么办?linux系统环境太复杂怎么办?
本文针对kafka服务器的启动,介绍如何利用docker在容器内运行kakfa的server。
1. 环境说明
docker提供了相对独立简洁的运行环境,能够让kafka的服务端良好独立地运行,遇到问题也方便进行排查。
经常使用的kafka镜像有wurminster/kafka, bitnami/kafka这两种,都可以使用。本文使用bitnami/kafka,并结合zookeeper使用(kafka的运行基础)。
docker: 24.0.6
镜像bitnami/kafka: 2.8.1
镜像bitnami/zookeeper: latest
镜像dushixiang/kafka-map:latest(kafka的管理界面,可同时启动使用)
不同版本基本不会对运行产生影响。
2. 方法介绍
为了让kafka有健全的运行体系,并且方便我们一键式地开启或关闭kafka的server,利用docker compose工具来建立一个由多容器构成的整体服务。
2.1. 文件准备
创建一个kafka文件夹,文件夹中创建2个文本文件,重命名为.env和compose.yaml
2.1.1. .env文件
.env文件的作用为设置环境变量,方便我们修改相关的参数,如端口、版本等。
ZOOKEEPER_IMAGE_VERSION='latest' KAFKA_IMAGE_VERSION='2.8.1' KAFKA_MAP_IMAGE_VERSION='latest' ZOOKEEPER_PORT=2181 KAFKA_PORT=9092 KAFKA_MAP_PORT=9001
2.1.2. compose.yaml文件
docker compose进行启动服务的执行文件,根据文件内容运行对应镜像。
将生成zookeeper,kafka,kafka-map三个容器。
services: zookeeper: image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION} container_name: zookeeper ports: - ${ZOOKEEPER_PORT}:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes networks: - app-tier kafka: image: bitnami/kafka:${KAFKA_IMAGE_VERSION} container_name: kafka ports: - ${KAFKA_PORT}:9092 environment: - KAFKA_CFG_NODE_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 # - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 不用advertised会导致client无法连接 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:${KAFKA_PORT} networks: - app-tier kafka-map: image: dushixiang/kafka-map:${KAFKA_MAP_IMAGE_VERSION} container_name: kafka-map restart: always ports: - ${KAFKA_MAP_PORT}:8080 environment: - DEFAULT_USERNAME=admin - DEFAULT_PASSWORD=admin volumes: - /opt/kafka-map/data/:/usr/local/kafka-map/data networks: - app-tier networks: app-tier:
2.2. 运行命令
打开cmd并cd到kafka文件夹的路径。
输入下方命令
docker compose up -d
成功运行将显示
此时即说明kafka的server已运行成功。
3. 后续操作
3.1. 运行client客户端(consumer和producer)
我的本机是windows,可以通过在windows安装kafka来生成producer和consumer。
也可以利用python代码,来生成producer和consumer。
client客户端的运行将依赖于docker生成的kafka server端,如果server停止运行,client会出问题。
3.1.1. 通过windows安装的kafka运行
首先在cmd中cd到kafka的执行路径。
cd "G:\kafka_2.13-2.8.1\bin\windows"
利用以下代码产生consumer或producer,注意一个窗口只能开一个,两个都开要两个窗口。
# 启动consumer kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning # 启动producer kafka-console-producer.bat --broker-list localhost:9092 --topic test
3.1.2. python代码生成
首先python环境中需安装kafka-python
producer: 每秒钟由producer发送时间
from kafka import KafkaProducer import time if __name__ == "__main__": producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(2, 8, 1)) print('Producer is ready to send messages') while True: time.sleep(1) msg = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) print(msg) msg = msg.encode('utf-8') producer.send(topic='python', value=msg)
consumer
from kafka import KafkaConsumer import os if __name__ == "__main__": consumer = KafkaConsumer('python', bootstrap_servers=['localhost:9092'], group_id = str(os.getpid()), api_version=(2, 8, 1), auto_commit_interval_ms=5000, ) for msg in consumer: value = msg.value.decode('utf-8') recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, value) print(recv)
3.2. kafka相关信息查看
这里以通过zookeeper查看当前已有的broker(即kafka的server端)为例。
在cmd中运行
docker exec -it zookeeper bin/bash zkCli.sh
进入该容器后,运行
# 查看当前已有的broker的id ls /brokers/ids # 输出: [1, 2, 3] # 获取该broker的信息 get /brokers/ids/<某id>
参考资料