Kafka是开源的分布式、可拓展高可用日志服务,常用于分布式队列服务。基于Kafka的producer/consumer模型,用户可以按topic发布消息,通过partition保留多份数据保证即使有节点故障也不会丢数据,更重要的是即使消息量巨大也可以通过添加服务器水平拓展。
Kafka的源代码可以在Github获得,搭建分布式集群的步骤也比较简单,如果希望快速体验的话可以使用Docker容器技术。
快速启动Kafka
由于Kafka服务依赖ZooKeeper,我们可以起两个容器来搭建服务,也可以起一个all in one的容器,这会同时暴露ZooKeeper和Kafka的端口。
docker run -d –net=host spotify/kafka |
我们可以docker exec进入容器执行kafka命令,也可以下载编译好的kafka命令来测试。
./bin/kafka-console-producer.sh –broker-list 127.0.0.1:9092 –topic test
./bin/kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic test |
这是Spotify提供的Kafka容器镜像,详细信息可参考 https://hub.docker.com/r/spotify/kafka/
准备分布式环境
如果想搭建分布式集群,我们需要多台物理机或虚拟机环境,为了简化搭建过程,我们将使用UnitedStack提供的公有云服务。
首先是创建Linux虚拟机,建议使用Ubuntu 15.04系统。
由于“已知原因”安装Java和Kafka需要从国外下载文件,配置时间很长,我们建议搭建好kafka-master后通过虚拟机快照的方式简化安装步骤,创建快照也很简单。
搭建Kafka服务
Kafka是Scala实现的分布式服务,Scala需要运行在JVM环境,因此需要提前安装Java,建议使用Oracle Java7,安装步骤通过以下命令即可完成。
apt-get purge -y openjdk* add-apt-repository -y ppa:webupd8team/java apt-get update -y apt-get install -y oracle-java7-installer
export JAVA_HOME=/usr/lib/jvm/java-7-oracle/ export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin |
安装好Java后,可以安装编译好的ZooKeeper,如果通过源码编译还需要安装Maven等环境。
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz tar xzvf zookeeper-3.4.7.tar.gz mv zookeeper-3.4.7 /usr/lib/zookeeper |
我们可以通过下载的Jar包启动ZooKeeper,也可以直接使用Kafka压缩包里自带的ZooKeeper工具,安装Kafka的过程类似。
wget http://mirrors.noc.im/apache/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz tar xzvf kafka_2.10-0.8.2.0.tgz mv kafka_2.10-0.8.2.0 /usr/lib/kafka |
接着启动ZooKeeper和Kafka服务。
cd /usr/lib/kafka ./bin/zookeeper-server-start.sh ./config/zookeeper.properties |
启动单机版Kafka服务。
cd /usr/lib/kafka bin/kafka-server-start.sh config/server.properties |
添加分布式Broker
为了实现消息多备份分布在不同节点,或者在单机存在瓶颈时我们可以轻易地拓展Broker。
基于OpenStack强大的快照功能,我们可以先对kafka-master打快照,然后创建基于次快照的虚拟机,这样就不需要重新安装Java、Kafka了,只需修改Kafka的server.properties配置文件。
broker.id=1 zookeeper.connect=192.168.0.2:2181 |
这里需要修改zookeeper.connect为ZooKeeper的启动地址,也就是kafka-master的IP地址,无论消息量有多大也可以通过添加broker来解决。
注意如果部署期间Java进程找不到主机名,可以修改/etc/hosts添加主机名对应的IP为127.0.0.1。
测试集群
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test bin/kafka-topics.sh –list –zookeeper localhost:2181
# producer bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
# consumer bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning |
在两个终端启动producer和consumer,就可以实现类似聊天室的功能,更多高级用法可参考官方文档 http://kafka.apache.org/documentation.html#gettingStarted 。
总结
Kafka是业界非常重要的分布式消息队列服务,目前已经大规模在LinkedIn、Yahoo、Netflix、Twitter等生产环境,基于此文大家也可以在云平台快速搭建自己的消息队列服务。