kafka-原理和安装以及操作
1.kafka是什么
Kafka是由LinkedIn开发的一个分布式的消息队列。它是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper的协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比,KafKa能够很好的处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。
Kafka使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。
2.什么是消息队列
消息队列:是在消息的传输过程中保存消息的容器。
消息在原始的传输过程中是直接传输的,端对端的数据传递,但是有的时候我们需要将消息数据进行部分的缓冲存储,以达到方便使用的目的,中间的组件可以做消息的传输中间介质,这个组件就是消息队列。更像是一个消息的蓄水池一样的功能
类比现实中更像是高速公路的休息区
那么消息队列在什么场景使用呢
3.消息队列的好处
缓冲
解耦
异步
如果是同步服务,如上图,那么用户的体验度是非常差的,因为需要将短信发送完毕然后在返回成功通知,页面才会跳转,但是如果短信发送遇见网络等问题,没有办法直接发送,这个时候客户需要等待很长的时间
直接将需要发送的消息放入到消息队列中,然后消息服务会不停的扫描队列中需要发送的消息将数据发送出去,但是不会让客户等待,用户会大大的增加体验度
抵挡洪峰
如上图,我们需要处理流量的波峰和抖动,那么我们需要设定整个集群的处理能力达到最大的5M/s才可以,但是大多数时候这个处理能力都是浪费的,我们用不到
我们可以使用消息队列进行数据的存储,然后计算服务慢慢去消息队列中拉取数据进行消费就可以了,可以在一定程度节省成本
4.消息消费模式
通过以上的讲解,我们可以了解整个kafka的原理和架构了,消息队列其实就是一个数据传输过程中的缓冲区,能够存储数据,在流程中作为一个中间的介质,承上启下,在特定的情景下起到解耦,缓冲,异步的功能。
作为中间的介质,它的上下游需要传输和拉取数据,上游传输数据的部分称之为生产者,生产数据发送到kafka中,下游拉取数据的组件称之为消费者,自己拉取想要的数据并且进行数据的计算和处理,消费者和消费者以及kafka是三个部分,大家一定要注意,他们不是一个整体
其中生产者只是发送数据到队列中,但是消费者在消费数据的时候却有两种不同的方式
1.点对点消费
点对点的方式,在队列中的数据有且只有一个消费者可以消费数据,在消费完毕数据以后会将数据从队列中删除,这个数据有且只有一次消费
2.发布订阅模式
发布定语模式中每个人可以消费数据,这个数据会在队列中存储七天,每个订阅这个数据的人都可以消费到相应的数据,并且可以重复的进行消费数据,在大多数情况下我们都使用发布订阅模式
5.kafka的基础架构
kafka作为消息队列,主要在大数据的处理和计算过程中起到承上启下的作用,作为中间的一个缓冲中间件,那么它首先就要具备数据的稳定性和可靠性,并且存储数据量和吞吐量以及数据的检索速度一定要有所保证,那么单台机器肯定是没有办法解决这个问题的,在之前的所有的大数据组件部分我们都有所介绍,首先要保证性能一定要多台机器,分布式计算和存储才能保证性能,其次就是数据一定要做副本备份才能在多台机器的集群中保证数据的稳定性。所以首先要给大家介绍的就是kafka的集群组成结构
kafka首先我们要知道它不是一个主从集群,之前的课程中我们讲过,主从集群中节点有的天生就是主节点不能被其他的从节点替代,在非主从集群中每个节点都可以作为主节点,如果一个节点宕机那么其他的节点可以选举为主节点管理整个集群,在kafka集群中每个节点都称之为broker,其中每个节点都存在一个kafka_controller组件,但是只有一台节点的controller组件是活跃状态的,其他的都是standby状态,只有主节点宕机了,那么从节点才会选举成为主节点,但是究竟谁是主节点呢?这个时候我们需要一个外部协调管理组件zookeeper进行集群选举
针对以上的集群结构和部分原理我们先将kafka的集群搭建起来
6.kafka集群的搭建
可以基于基础镜像进行搭建kafka集群,集群的地址如下
http://cloud.hainiubl.com/#/privateImageDetail?id=2969&imageType=private
搭建准备工作,首先在海牛实验室中启动基础镜像,并且调节资源大小
调节大小如下:
启动实验,并且启动zookeeper集群
# 选在nn1机器并且启动zookeeper集群
su - hadoop
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh start
# 查看启动情况
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh status
现在开始集群的搭建
集群规划
因为演示效果每个机器都安装一个kafka的集群节点
安装包等信息全部都已经存放到 /public/software/bigdata中为大家准备好了。我们直接使用就可以
# 首先将安装包解压到每个机器中
ssh_root.sh tar -zxvf /public/software/bigdata/kafka_2.12-3.3.2.tgz -C /usr/local/
# 查看解压情况
ssh_root.sh ls /usr/local|grep kafka
# 修改权限
ssh_root.sh chown hadoop:hadoop -R /usr/local/kafka_2.12-3.3.2/
# 创建软连接
ssh_root.sh ln -s /usr/local/kafka_2.12-3.3.2/ /usr/local/kafka
查看安装包中的内容
比较重要的三个文件夹
bin 执行脚本
config 配置文件
libs 依赖包
# 配置环境变量
echo 'export KAFKA_HOME=/usr/local/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
首先连接所有的节点
选择批量命令
将上面的命令群体执行
到现在为止集群的初始化安装已经完毕,我们下面做kafka的自定义设置
# 修改server.properties配置文件
cd /usr/local/kafka/config/
vim server.properties
修改如下的配置
broker.id=0 #节点编号,每个节点的编号都不能相同
log.dirs=/data/kafka-logs # kafka存放数据的位置
zookeeper.connect=nn1:2181,nn2:2181,s1:2181 #zookeeper集群的链接地址
# 首先修改log.dirs zookeeper.connect两个参数
# 分发到不同的机器节点
scp_all.sh /usr/local/kafka/config/server.properties /usr/local/kafka/config/
# 分别在不同的节点修改broker.id的编号
# nn1 --> broker.id=0
# nn2 --> broker.id=1
# s1 --> broker.id=2
# s2 --> broker.id=3
# s3 --> broker.id=4
# 查看每个机器的编号
ssh_all.sh cat /usr/local/kafka/config/server.properties | grep broker.id
# 创建每个机器上的kafka的数据存储文件夹
ssh_root.sh mkdir /data/kafka-logs
# 修改权限
ssh_root.sh chown hadoop:hadoop -R /data/kafka-logs
# kafka集群的启动
ssh_all.sh /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 查看kafka的执行进程
ssh_all.sh jps
到此为止,集群启动已经完毕
7.kafka的组件结构
broker:每个kafka的机器节点都会运行一个进程,这个进程叫做broker,负责管理自身的topic和partition,以及数据的存储和处理,因为kafka是集群形式的,所以一个集群中会存在多个broker,但是kafka的整体又不是一个主从集群,需要选举出来一个broker节点为主节点,管理整个集群中所有的数据和操作,以及所有节点的协同工作。每个broker上面都存在一个controller组件,这个组件就是主节点管理组件,负责整个集群的管理,但是只有一个机器是active状态的,这个需要zookeeper进行协调和选举
topic:在kafka中存在一个非常重要的逻辑结构叫做topic,可以称之为主题。当我们很多业务需要使用kafka进行消息队列的消息缓存和处理的时候我们会将消息进行分类处理,不能让多种类的数据放入到一起,这样使用特别混乱,所以topic主主题进行分类,是kafka数据处理的一大特色,可以类比现实中的主播。一个主播在直播的时候都会创建一个自己的房间,每个主播都不会相互干扰。各自主播自己的内容。
partition:分区,每个topic中在使用过程中会存储很多数据,这些数据如果默认只给一个broker进行处理,那么这个broker的压力会太大,集群应该负载均衡让数据的压力在不同的机器上共同分摊,所以每个topic都会分为不同的分区,一个分区是一个topic数据真正的物理存储方式,让数据分为不同的部分,在多个节点上存储和管理。分区是kafka物理存储最小的负载均衡单位,生产者生产数据的时候指向多个分区,消费者也可以在消费数据的时候从不同的分区读取数据
每个broker节点会按照topic的名称和分区的名称组合在一起形成一个文件夹进行文件内容的存储,一个broker会管理多个topic的不同分区的数据
备份:在一个topic中存在多个分区,每个分区存储一部分这个topic的数据,但是因为存在多个机器上,不能够保证数据的稳定性,所以数据需要进行备份管理,所以分区是存在备份的,比如topicA的数据就需要存储多份在不同的机器上,这样数据损坏一份,其他的部分还可以使用
主从:数据在存储的时候需要备份多个,那么这些数据就要保证数据的一致性,所以我们不能再存放数据的时候随意的向任何副本写入,因为这样集群中一个分区的多个副本没有办法保证数据的一致性,所以我们只能写入数据到一个副本,这个副本叫做主副本,其他的副本会从主副本同步数据,从而保证数据的一致性,那么这个主从的选举是broker的主节点进行选举的和zookeeper没有关系
zookeeper:帮助选举broker为主,记录哪个是主broker,集群存在几个topic,每个topic存在几个分区,分区存在几个副本,每个分区分别在哪个机器节点上
producer: 生产者,将数据远程发送到kafka集群,一般都是flume进行数据采集,并且发送到集群,producer一般只能发送数据到一个topic中,和一个主播只能在自己的房间直播一样
consumer:消费者,消费数据并且参加计算处理,一般都是spark,flink等计算框架充当。但是一个消费者可以同时消费多个分区的数据,就如一个观众可以一起看多个小姐姐直播一样
大家一定要知道一个重要的问题就是数据不管是生产者还是消费者,都是一条一条的操作,这个才是消息队列,这也是消息队列和hdfs等存储介质不同的地方,消息队列更加偏向于流式处理,并不是整体存取
8.kafka的shell操作
首先是topic的管理命令
kafka-topics.sh
参数如下:
# 创建
kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_a --partitions 3 --replication-factor 2
# --bootstrap-server 指定集群地址,因为每个节点都存在controller所以想要获取元数据只需要指定集群中的一台机器就行了
# --create是创建命令
# --topic 指定topic的名称
# --partitions 分区数量,分区数量没有限定
# --replication-factor 副本数量,副本数量必须小于等集群的机器的个数,因为一个节点上面不能存在多个副本
# list查看所有topic
kafka-topics.sh --bootstrap-server nn1:9092 --list
# 描述 desc
kafka-topics.sh --bootstrap-server nn1:9092 --describe --topic topic_a
describe命令展示的topic信息如下:
topic名称
topic_id随机id
partition_count 分区数量
replicationFactor 副本数量
Topic: topic_a Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
内容解释:topic_a,0号分区在,leader分区在brokerid为1的broker上面,副本在brokerid为1和2两个节点上面
isr是数据的备份情况,先进broker1然后进入到broker2
# 删除
kafka-topics.sh --bootstrap-server nn1:9092 --delete --topic topic_a
# 重新创建
kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_a --partitions 3 --replication-factor 2
# alter修改topic
kafka-topics.sh --bootstrap-server nn1:9092 --alter --topic topic_a --partitions 2
# 分区数量不能减少只能增加,减少分区会使得数据丢失
kafka-topics.sh --bootstrap-server nn1:9092 --alter --topic topic_a --partitions 4
# 修改副本数据
# 在创建完毕的topic以后,我们在使用的时候可能会遇见,副本不足的情况,这个时候我们可以动态增加topic的副本数量,但是增加的副本数量要在原有的基础上进行增加
# 首先我们创建一个json文件,用于支配topic的分区副本和节点的对应关系
vim json.txt
# 以原有分区分配的策略基础之上做二次变化
{"partitions":[{"topic":"topic_a","partition":0,"replicas":[4,3,2]},{"topic":"topic_a","partition":1,"replicas":[1,0,2]},{"topic":"topic_a","partition":2,"replicas":[2,4,0]},{"topic":"topic_a","partition":3,"replicas":[2,3,1]}],"version":1}
# 执行重新分配命令
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file json.txt --execute
数据的生产消费命令
# 生产者命令
kafka-console-producer.sh --bootstrap-server nn1:9092 --topic topic_a
# 消费者命令 --from-beginning 从头消费数据 --partition 指定分区消费
kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_a
# 指定分区,并且消费历史数据
kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_a --from-beginning --partition 2