本文共 9340 字,大约阅读时间需要 31 分钟。
由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息系统.
操作系统:centos6.5
kafka:1.0.1 zookeeper:3.4.6为了照顾对MQ不是很了解的同学,先讲一下MQ的原理.一般MQ都是在服务端存储一个队列.生产者把消息丢到MQ server,消费者从MQ server消费.这样一来解决了生产者和消费者的高耦合问题,同时也解决了生产速度和消费速度差异导致的消费者跟不上生产者的生产速度而导致的消费者压力过大问题.
在kafka中的topic就是一系列队列的总称,称为一个主题.当然ActiveMQ和RabbitMQ中都有这个概念.一类消息都会丢到一个topic中去.
讲完topic我们讲一下partition(分区),这个东西是kafka独有的东西,也是kafka实现横向扩展和高并发的一个重要设计.我们试想一下,如果每个topic只有一个队列,随着业务增加topic里消息越来越多.多到一台server装不下了怎么办.为了解决这个问题,我们引入了partition这个概念.一个partition(分区)代表了一个物理上存在的队列.topic只是一组partition(分区)的总称,也就是说topic仅是逻辑上的概念.这样一来当topic上的消息越来越多.我们就可以将新增的partition(分区)放在其他server上.也就是说topic里边的partition(分区)可以分属于不同的机器.实际生产中,也基本都是这样玩的.
这里说一个特殊情况,有时我们创建了一个topic没有指定partition(分区)数量或者指定了partition(分区)数量为1,这时实际也是有一个默认的partition(分区)的,名字我忘记了.
从Producer(生产者)角度,一个消息丢到topic中任务就完成了.至于具体丢到了topic中的哪个partition(分区),Producer(生产者)不需要关注.这里kafka自动帮助我们做了负载均衡.当然如果我们指定某个partition(分区)也是可以的.这个大家官方文档和百度.
接下里我们讲Consumer Group(消费组),Consumer Group(消费组)顾名思义就是一组Consumer(消费者)的总称.那有了组的概念以后能起到什么作用.如果只有一组内且组内只有一个Consumer,那这个就是传统的点对点模式,如果有多组,每组内都有一个Consumer,那这个就是发布-订阅(pub-sub)模式.每组都会收到同样的消息.
最后讲最难理解也是大家讨论最多的地方,partition(分区)和Consumer(消费者)的关系.首先,一个Consumer(消费者)的一个线程在某个时刻只能接收一个partition(分区)的数据,一个partition(分区)某个时刻也只会把消息发给一个Consumer(消费者).我们设计出来几种场景:
场景一: topic-1 下有partition-1和partition-2
group-1 下有consumer-1和consumer-2和consumer-3 所有consumer只有一个线程,且都消费topic-1的消息. 消费情况 : consumer-1只消费partition-1的数据 consumer-2只消费partition-2的数据 consumer-3不会消费到任何数据 原因 : 只能接受一个partition(分区)的数据场景二: topic-1 下有partition-1和partition-2
group-1 下有consumer-1 consumer只有一个线程,且消费topic-1的消息. 消费情况 : consumer-1先消费partition-1的数据 consumer-1消费完partition-1数据后开始消费partition-2的数据 原因 : 这里是kafka检测到当前consumer-1消费完partition-1处于空闲状态,自动帮我做了负载.所以大家看到这里在看一下上边那句话的”某个时刻” 特例: consumer在消费消息时必须指定topic,可以不指定partition,场景二的情况就是发生在不指定partition的情况下,如果consumer-1指定了partition-1,那么consumer-1消费完partition-1后哪怕处于空闲状态了也是不会消费partition-2的消息的.进而我们总结出了一条经验,同组内的消费者(单线程消费)数量不应多于topic下的partition(分区)数量,不然就会出有消费者空闲的状态,此时并发线程数=partition(分区)数量.反之消费者数量少于topic下的partition(分区)数量也是不理想的,原因是此时并发线程数=消费者数量,并不能完全发挥kafka并发效率.
最后我们看下上边的图,Consumer Group A的两个机器分别开启两个线程消费P0 P1 P2 P3的消息Consumer Group B的四台机器单线程消费P0 P1 P2 P3的消息就可以了.此时效率最高.
下载地址:
这里我们下载到/usr/local
目录下 解压 : cd /usr/local && tar -xzvf kafka_2.11-1.0.1.tgz
创建log目录 : cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs
配置:vi /usr/local/kafka_2.11-1.0.1/config/server.properties
需改下边五个地方
#broker的id,集群中的每台机器id唯一,其他两台分别1和2broker.id=0 #是Kafka绑定的interface,这里需要写本机内网ip地址,不然bind端口失败#其他两台分别是192.168.1.5和192.168.1.9host.name=192.168.1.3 #向zookeeper注册的对外暴露的ip和port,118.212.149.51是192.168.1.3的外网ip地址#如果不配置kafka部署在外网服务器的话本地是访问不到的.advertised.listeners=PLAINTEXT://118.212.149.51:9092 #zk集群的ip和port,zk集群教程:zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181#log目录,刚刚上边建好的.log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs
启动集群(分别在三台broker执行):进入bin目录cd /usr/local/kafka_2.11-1.0.1/bin/
执行启动脚本并指定配置文件./kafka-server-start.sh -daemon ../config/server.properties
验证集群是否启动成功:
[root@template ~]# cd /usr/local/zookeeper-3.4.6/bin/[root@template bin]# ./zkCli.sh -server 127.0.0.1:2181...[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids[0, 1, 2] #这里的012分别是三个broker的id
查看某个broker信息:注意endpoints
信息的ip:port,这个就是我们对外服务暴露的地址,我这里是外网访问,所以暴露的是外网ip和端口
[zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0{ "listener_security_protocol_map":{ "PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://118.212.149.51:9092"],"jmx_port":-1,"host":"118.212.149.51","timestamp":"1521010377533","port":9092,"version":4}cZxid = 0x700000626ctime = Wed Mar 14 14:52:57 CST 2018mZxid = 0x700000626mtime = Wed Mar 14 14:52:57 CST 2018pZxid = 0x700000626cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x3621e366ae20014dataLength = 198numChildren = 0
创建topic :
#--replication-factor 创建的副本数,这个使用来备份的.副本数不能大于broker数#--partitions 1 创建的分区数.根据实际情况创建./kafka-topics.sh --create --zookeeper 192.168.1.3:2181 --replication-factor 1 --partitions 1 --topic milo
查看topic :
./kafka-topics.sh --list --zookeeper 192.168.1.3:2181
查看topic详细信息 :
./kafka-topics.sh --describe --zookeeper 192.168.1.3:2181
结果如下:
第一行topic信息摘要:分别是topic名字(Topic),partition数量(PartitionCount),副本数量(ReplicationFactor),配置(Config) 第二行~第四行分别列出了名为milo的topic的所有partition.依次为topic名字(Topic),partition号(Partition),此partition所在的borker(Leader),副本所在的broker(Replicas),Isr列表(Isr) ps:同步状态的副本的集合(a set of in-sync replicas),简称ISR,通俗理解就是替补队员,不是每个broker都可以作为替补队员.首先这个broker得存有副本,其次副本还得满足条件.就像我们大学足球队,有的人是替补,有的人连大名单都没进去,原因是他不会踢球. ^ ^生产消息 :
./kafka-console-producer.sh --broker-list 118.212.149.51:9092 --topic test\>hello world
消费消息 :
./kafka-console-consumer.sh --zookeeper 118.212.149.51:2181 --topic milo --from-beginninghello world
pom.xml
org.apache.kafka kafka_2.11 1.0.1 org.apache.kafka kafka-clients 1.0.1
Producer.java
package cn.milo.kafka;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.log4j.Logger;import java.util.Properties;/****************************************************** ****** @ClassName : Producer.java ****** @author : milo ^ ^ ****** @date : 2018 03 14 11:34 ****** @version : v1.0.x *******************************************************/public class Producer { static Logger log = Logger.getLogger(Producer.class); private static final String TOPIC = "milo2"; private static final String BROKER_LIST = "118.212.149.51:9092"; private static KafkaProducerproducer = null; /* 初始化生产者 */ static { Properties configs = initConfig(); producer = new KafkaProducer (configs); } /* 初始化配置 */ private static Properties initConfig(){ Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); return properties; } public static void main(String[] args) throws InterruptedException { //消息实体 ProducerRecord record = null; for (int i = 0; i < 1000; i++) { record = new ProducerRecord (TOPIC, "value"+(int)(10*(Math.random()))); //发送消息 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (null != e){ log.info("send error" + e.getMessage()); }else { System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition())); } } }); } producer.close(); }}
Consumer :
package cn.milo.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.log4j.Logger;import java.util.Properties;/****************************************************** ****** @ClassName : Consumer.java ****** @author : milo ^ ^ ****** @date : 2018 03 14 15:50 ****** @version : v1.0.x *******************************************************/public class Consumer { static Logger log = Logger.getLogger(Producer.class); private static final String TOPIC = "milo2"; private static final String BROKER_LIST = "118.212.149.51:9092"; private static KafkaConsumerconsumer = null; static { Properties configs = initConfig(); consumer = new KafkaConsumer (configs); } private static Properties initConfig(){ Properties properties = new Properties(); properties.put("bootstrap.servers",BROKER_LIST); properties.put("group.id","0"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.offset.reset", "earliest"); return properties; } public static void main(String[] args) { while (true) { ConsumerRecords records = consumer.poll(10); for (ConsumerRecord record : records) { log.info(record); } } }}
[1].kafka 学习 非常详细的经典教程 :
[2].Kafka入门与实践.牟大恩