kafka高级部分
1.kafka的文件存储原理
在以上部分的讲解中我们知道了,kafka的使用场景就是在流式处理过程中,充当一个中间缓冲介质的作用,主要功能是将数据先放入到kafka中,计算框架会自己拉取要消费和计算的数据过来,采用poll的方式完全适配自身消费速率。那么kafka的存储和hdfs的存储的区别非常清楚的就可以提现出来,hdfs更加适用于整体的存储和取出,kafka主要做的是流,数据都是按照条进行流转的,主打的是灵活和效率,那么效率提现在哪些方面上呢???
首先topic是按照分区进行划分的,因为多个分区可以将存储的数据放入到不同的机器节点上,这样起到负载均衡的作用,所以每个broker机器节点上面存储的数据都是多个topic的不同的分区的数据,这样分布式处理可以增加kafka的计算和处理能力
所以broker上面会管理很多topic的不同partition的数据,存储的结构就是以topic-partition方式进行命名的文件夹存储数据,但是随着数据的增加,单个分区的数据也会随之增多,这样管理和检索都在一个文件中也是非常低效率的,解决办法就是单个分区的数据也会切段进行存储,每个段称之为segment
每一个段称之为一个segement
在官网中形容的是单个日志文件的最大值,默认是1G
这样不管找寻什么样的数据都会直接找寻相应的segment段落就行了,不管数据多大,其检索范围也不会超过1G
但是一个G的文件检索还是比较大的,所以kafka在存储数据的时候,首先存储数据在内存中,然后将数据刷写到磁盘上,这个刷写的大小是以4K为主的
在这个插入过程中会追加的形式存储到log文件中,并且在index和timeindex中存在稀疏的索引数据
这个时候查询的时候就可以直接去根据文件条数命名的对应segment中查询数据。能够轻易的跳过1G的部分
在具体查询数据的时候可以根据index去log中查询数据,速度更快,效率更高
检索过程为先跳过整体segment部分,然后在segment部分找到index,根据index找到相对应偏移量位置,然后找寻log日志中的数据
以如此方式进行数据检索,这样的存储格式让检索效果更佳明显
以上只是kafka的存储方式之一,主要是为了让数据存储更加方便管理和检索
2.kafka的快速读写
根据以上的部分我们发现存储的方式比较有规划是对于后续查询非常便捷的,但是这样存储是不是会更加消耗存储性能呢?
其实kafka的数据存储是追加形式的,也就是数据在存储到文件中的时候是以追加方式拼接到文件末尾的,这样就非常快速的跳过了文件的检索步骤
机械磁盘的文件检索需要使用到磁头进行不断扫描数据,如果存储大量的小文件或者存储位置不同的时候需要不停的扫描检索文件的位置,这个过程是非常浪费时间的,但是kafka的数据完全以追加的方式存储到磁盘中的,那么这个时候就完全省去了这样的一个过程,使得机械磁盘的性能和固态的性能相差无几
我们可以看到经过测试,机械磁盘的存储性能可以达到600M/s 但是随机读写就比较慢100k/s
所以追加写造就了kafka的高写入性能
写入的速度非常快,那么读取的性能是如何保证的呢
首先kafka的数据就是以分区作为单位进行分布式管理的,所以多个机器共同管理,效果更加明显
前文中说过kafka的存储是按照segment切分的,并且存储的数据是带有index索引的,这个速度可以几乎直接定位到相应的检索文件
并且kafka还实现了零拷贝技术
首先我们可以看到普通的存储在磁盘上的文件要想发送出去的话,需要走以上的步骤
通过内核和用户空间的加载,反反复复经过4次加载和拷贝过程,这个过程是非常消耗性能和io的
其实直白来说,如果数据加载的过程中不走用户缓冲区的话直接以内核加载一次的方式进行传输效率是更加高效的
所以使用到零拷贝技术,方式就是只将数据从磁盘加载到内存中一次,然后直接从内核空间将数据发送到网卡从而直接传输给消费者端
零拷贝技术的本质就是怎么减少数据的复制过程,并不是没有数据的复制
这个实现方式就是使用到sendFile的系统函数,它可以直接实现系统内存的映射
3.文件清除原理
kafka数据并不是为了做大量存储使用的,主要的功能是在流式计算中进行数据的流转,所以kafka中的数据并不做长期存储,默认存储时间为7天
那么问题来了,kafka中的数据是如何进行删除的呢?
在Kafka中,存在数据过期的机制,称为data expire。如何处理过期数据是根据指定的policy(策略)决定的,而处理过期数据的行为,即为log cleanup。
在Kafka中有以下几种处理过期数据的策略:
-
log.cleanup.policy=delete(Kafka中所有用户创建的topics,默认均为此策略)
-
根据数据已保存的时间,进行删除(默认为1周)
-
根据log的max size,进行删除(默认为-1,也就是无限制)
-
log.cleanup.policy=compact(topic __consumer_offsets 默认为此策略)
-
根据messages中的key,进行删除操作
-
在active segment 被commit 后,会删除掉old duplicate keys
- 无限制的时间与空间的日志保留
自动清理Kafka中的数据可以控制磁盘上数据的大小、删除不需要的数据,同时也减少了对Kafka集群的维护成本。
那Log cleanup 在什么时候发生呢?
-
首先值得注意的是:log cleanup 在partition segment 上发生
-
更小/更多的segment,也就意味着log cleanup 发生的频率会上升
-
Log cleanup 不应该频繁发生=> 因为它会消耗CPU与内存资源
- Cleaner的检查会在每15秒进行一次,由log.cleaner.backoff.ms 控制
log.cleanup.policy=delete
log.cleanup.policy=delete 的策略,根据数据保留的时间、以及log的max size,对数据进行cleanup。控制数据保留时间以及log max size的参数分别为:
-
log.retention.hours:指定数据保留的时常(默认为一周,168)
-
将参数调整到更高的值,也就意味着会占据更多的磁盘空间
-
更小值意味着保存的数据量会更少(假如consumer 宕机超过一周,则数据便会再未处理前即丢失)
-
log.retention.bytes:每个partition中保存的最大数据量大小(默认为-1,也就是无限大)
- 再控制log的大小不超过一个阈值时,会比较有用
在到达log cleanup 的条件后,cleaner会自动根据时间或是空间的规则进行删除,新数据仍写入active segment:
针对于这个参数,一般有以下两种使用场景,分别为:
-
log保留周期为一周,根据log保留期进行log cleanup:
-
log.retention.hours=168 以及 log.retention.bytes=-1
-
log保留期为无限制,根据log大小进行进行log cleanup:
- log.retention.hours=17520以及 log.retention.bytes=524288000
其中第一个场景会更常见。
Log Compaction
Log compaction用于确保:在一个partition中,对任意一个key,它所对应的value都是最新的。
这里举个例子:我们有个topic名为employee-salary,我们希望维护每个employee当前最新的工资情况。
左边的是compaction前,segments中的数据,右边为compaction 后,segments中的数据,其中有部分key对应的value有更新:
可以看到在log compaction后,相对于更新后的key-value message,旧的message被删除。
Log Compaction 有如下特点:
-
messages的顺序仍然是保留的,log compaction 仅移除一些messages,但不会重新对它们进行排序
-
一条message的offset是无法改变的(immutable),如果一条message缺失,则offset会直接被跳过
- 被删除的records在一段时间内仍然可以被consumers访问到,这段时间由参数delete.retention.ms(默认为24小时)控制
需要注意的是:Kafka 本身是不会组织用户发送duplicate data的。这些重复数据也仅会在一个segment在被commit 的时候做重复数据删除,所以consumer仍会读取到这部分重复数据(如果客户端有发的话)。
Log Compaction也会有时失败,compaction thread 可能会crash,所以需要确保给Kafka server 足够的内存用于做这些操作。如果log compaction异常,则需要重启Kafka(此为一个已知的bug)。
Log Compaction也无法通过API手动触发(至少到现在为止是这样),只能server端自动触发。
下面是一个 Log Compaction过程的示意图:
正在写入的records仍会被写入Active Segment,已经committed segments会自动做compaction。此过程会遍历所有segments中的records,并移除掉所有需要被移除的messages。
Log compaction由上文提到的log.cleanup.policy=compact进行配置,其中:
-
Segment.ms(默认为7天):在关闭一个active segment前,所需等待的最长时间
-
Segment.bytes(默认为1G):一个segment的最大大小
-
Min.compaction .lag.ms(默认为0):在一个message可以被compact前,所需等待的时间
-
Delete.retention.ms(默认为24小时):在一条message被加上删除标记后,在实际删除前等待的时间
- Min.Cleanable.dirty.ratio(默认为0.5):若是设置的更高,则会有更高效的清理,但是更少的清理操作触发。若是设置的更低,则清理的效率稍低,但是会有更多的清理操作被触发
4.kafka的监控
在操作kafka的时候我们都使用命令进行操作,但是操作过程中没有办法直观的查看信息,或者节点的运行情况,这个时候我们需要一个专业的监控软件用于视图话管理和监控数据
kafka-console-ui 非常轻量级的一个监控软件,安装之后就可以使用
这个软件已经帮大家准备好了,在/public软件目录中可以直接查看
# 首先解压安装包到/usr/local中
unzip /public/software/bigdata/kafka-console-ui.zip -d /usr/local/
十分轻量级,解压完毕直接可以使用,不需要任何的配置
# 启动
cd /usr/local/kafka-console-ui
bin/start.sh
监控端口号为7766,启动后稍等初始化过程
直接访问就可以进入监控页面,但是刚进来的时候会报错,这个是因为没有kafka集群信息所致
这个时候点击运维-->集群切换 --> 新增集群
添加集群连接信息
点击提交以后选择切换按钮,切换集群
这个时候我们发现页面右上角的集群名称发生了变化
点击相应的按钮就可以对集群中的topic做切换配置使用了
5.kafka动态增加节点
在生产环境中数据量存储可能随着公司业务的发展而增加,但是集群的存储性能和处理能力没有办法满足日常的使用需要,这个时候我们需要增加kafka的节点数量,增加机器减少各个节点的压力,这个时候我们需要增加kafka的broker数量,并且实现数据的转移
首先在实验室中选择一个空的linux操作系统
并且提交资源的大小1core 1.5G
然后启动节点后开始配置
-
host映射
-
主机名
-
创建hadoop用户,修改密码
-
和其他机器的免密
-
java环境 安装配置
- kafka环境 安装配置
首先自己的名称映射改为s4
在运行中的实验界面点击复制网络按钮,复制原生kafka集群中的所有机器映射
那么这台机器映射就改完了,但是所有机器都应该识别这个机器,所以这个配置要每个机器都一样
在群体分发命令中执行
cat > /etc/hosts <<EOF
127.0.0.1 localhost
11.147.251.92 s4
11.18.17.14 nn1
11.106.67.9 nn2
11.138.24.88 s1
11.138.24.85 s2
11.87.38.3 s3
EOF
使用输入重定向的方式将配置信息放入到原kafka集群的所有机器节点
# 修改新增节点的主机名为 s4
vim /etc/hostname
# 输入s4
# 创建hadoop用户
useradd hadoop
passwd hadoop
# 密码为123456
创建完毕用户以后将原kafka集群的nn1节点的hadoop用户的ssh文件夹发送过来,就可以免密相互通信了
# nn1节点执行
scp -r /home/hadoop/.ssh hadoop@s4:/home/hadoop
测试免密登录
安装java环境
rpm -ivh /public/software/java/jdk-8u144-linux-x64.rpm
安装kafka
# 解压kafka
tar -zxvf /public/software/bigdata/kafka_2.12-3.3.2.tgz -C /usr/local/
# 创建软连接
ln -s /usr/local/kafka_2.12-3.3.2/ /usr/local/kafka
# 修改权限
chown hadoop:hadoop -R /usr/local/kafka_2.12-3.3.2/
# 创建kafka存储目录
mkdir /data/kafka-logs
# 修改文件夹权限
chown hadoop:hadoop /data/kafka-logs
修改环境变量
echo 'export JAVA_HOME=/usr/java/jdk1.8.0_144/' >> /etc/profile
echo 'export PATH=$PATH:$JAVA_HOME/bin' >> /etc/profile
echo 'export KAFKA_HOME=/usr/local/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
配置kafka的参数
broker.id=5
log.dirs=/data/kafka-logs
zookeeper.connect=nn1:2181
启动kafka
su - hadoop
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
在监控页面中可以看到多了一台机器
6.节点压力均衡
首先我们查看节点topic的原生分布情况
kafka-topics.sh --bootstrap-server nn1:9092 --list
kafka-topics.sh --bootstrap-server nn1:9092 --describe --topic topic_a
这个时候我们需要使用新的机器节点来分摊压力,这个时候要生成分配的计划
使用集群重新分配命令
kafka-reassign-partitions.sh # 重分命令
--bootstrap-server #集群地址
--broker-list # 将要分布在那几个broker上面的列表
--topics-to-move-json-file # 哪个topic需要重新均衡,这个配置需要一个json文件,结构如下图
--generate # 生成计划
# 首先创建一个topic.json 输入如下内容
{"topics":[{"topic":"topic_a"}],"version":1}
# 整体代码命令如下
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --broker-list 0,1,2,3,4,5 --topics-to-move-json-file topic.json --generate
结果如下:
# 当前分配情况
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[4,3,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[2,4,0],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}
# 目标分配情况
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[2,3,4],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[3,4,5],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[4,5,0],"log_dirs":["any","any","any"]}]}
将目标计划放入到一个文件中
# 创建 new.json 输入如下内容
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[2,3,4],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[3,4,5],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[4,5,0],"log_dirs":["any","any","any"]}]}
在命令中增加如下参数,进行重新分配
# 重新分配命令如下
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file new.json --execute
# 验证
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file new.json --verify
# 查看最新topic的分布情况
kafka-topics.sh --bootstrap-server nn1:9092 --topic topic_a --describe
已经重新分布了,这次比较均匀
7.动态删除broker
在生产环境中,有的broker可能性能不足或者损坏等问题,我们需要动态的将这个节点从kafka集群中删除,保证集群正常执行,防止损坏带来问题。
这个过程如上面的增加节点相同,需要将即将删除的broker节点上所管理的topic的分区分摊给其他节点,然后将这个节点删除掉就可以了
在重新的列表中将broker5 删除掉
# 首先创建一个topic.json 输入如下内容
{"topics":[{"topic":"topic_a"}],"version":1}
# 整体代码命令如下
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --broker-list 0,1,2,3,4 --topics-to-move-json-file topic.json --generate
# 列表中只有0 1 2 3 4
产生结果如下:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[2,3,4],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[3,4,5],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[4,5,0],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[3,4,0],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[4,0,1],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
保留计划
# 创建 new1.json 输入如下内容
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[3,4,0],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[4,0,1],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
执行计划
# 重新分配命令如下
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file new1.json --execute
验证是否完毕
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file new1.json --verify
关闭broker5
kafka-server-stop.sh
查看监控页面,broker5已经从列表中删除了
8.kafka中的选举机制
controller的选举
首先第一个选举就是借助于zookeeper的controller的选举
第一个就是controller的选举,这个选举是借助于zookeeper的独享锁实现的,先启动的broker会在zookeeper的/contoller节点上面增加一个broker信息,谁创建成功了谁就是主节点,其他的broker会启动watch监视器进行监听其中的数据变化,如果宕机了其他的节点会抢占这个节点选举为controller节点
partition leader选举
每个topic都存在多个分区,每个分区又存在多个副本,其中有一个副本是主节点其他都是从节点,负责和主分区同步数据,并且生产者和消费者都是通过主节点进行操作kafka数据的
那么他们的选举是通过什么进行选择的呢?
controller节点会按照分区的注册顺序,优先选择前面的节点进行选择
如果主节点宕机后是如何选择的呢?
首先我们使用命令来查看其中一个topic的详细信息
kafka-topics.sh --bootstrap-server nn1:9092 --topic topic_a --describe
其中我们会看到最后一列内容是ISR,这个叫做动态副本集,它的作用使用强大,在kafka中存储数据的时候首先存储数据到主分区中,然后主分区中的数据会同步到不同的副本分区中,做数据的同步备份,尤其是在producer端设定ack=-1的时候,要在所有副本都同步完毕消息以后才会返回ack,producer才会发送下一条数据过来,但是如果因为其中一个副本的网络卡顿或者是自己宕机那么会出现一直同步不成功的情况,从而producer不能继续发送数据,所以kafka动态维护了一个副本集,这个副本集中都是可以正常同步数据的,也就是说他们都是正常的,如果不正常的不能保持和主节点同步的副本就会从这个里面删除掉
为什么说这个ISR呢?
因为一旦主分区宕机了,那么broker【controller】节点就会检测到,那么就会在ISR中按照顺序选择一个好的节点成为主分区。这就是主分区宕机后的选举实现,但是选择的节点一般都是数据比较新的,会选择落后太多的副本
但是有时候ISR中的副本如果都为空就没有办法选择新的leader分区了,这个时候为了集群的稳定性,可以设定
# 将这个开关打开,默认是false关闭的,它允许实现ISR以外的节点成为主节点
unclean.leader.election.enable
这个会出现数据丢失问题,但是可以在一定程度上保证程序的稳定性
我们可以做一个实验,先查看一下topic_a的分区详细信息,可以发现partition0的主分区在broker3上面
我们可以在s2节点关闭broker,来模拟集群单节点损坏的情况
kafka-server-stop.sh
再次查看可以看到partition0的主分区在broker4上面
9.数据同步
通过上图我们发现每个分区的数据都不一样,但是三个分区对外的数据却是一致的
这个时候如果第二个副本宕机了
但是如果是leader副本宕机了会发生什么呢?
10.数据均衡
在线上程序运行的时候,有的时候因为上面副本的损坏,从而系统会自动选举出来一个新的leader并且分配到不同的节点上,有的时候这个leader的节点分布的并不是特别均匀,这个时候就需要进行均衡一下,使得每个broker的节点压力均衡
这个时候需要以下三个参数进行控制
参数 | 解释 |
---|---|
auto.leader.rebalance.enable | 系统每隔300s会自动检查系统的leader分布是否均匀,如果不均匀会自动进行leader的切换 |
leader.imbalance.per.broker.percentage | broker上的leader比例超过10%认为不均衡 |
leader.imbalance.check.interval.seconds | 检查间隔300s默认值 |
auto.leader.rebalance.enable 这个开关开启会自动选举或者切换leader节点,并且分布在不同的节点上,但是有的时候这个开关开启会影响系统性能,因为线上环境切换leader是比较繁琐的
但是不开的话可能会出现启动kafka而没有leader分区的情况
一般我们会关闭这个开关并且选择手动切换均衡
kafka-leader-election.sh --bootstrap-server nn1:9092 --topic topic_a --partition 1 --election-type preferred
优先在ISR中选举出来新的leader进行负载
并且我们也可以自己进行副本的位置进行设定
# 首先创建一个topic.json 输入如下内容
{"topics":[{"topic":"topic_a"}],"version":1}
# 整体代码命令如下
kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --broker-list 0,1,2,3,4 --topics-to-move-json-file topic.json --generate
使用这个均衡优化命令生成优化计划
{"version":1,"partitions":[{"topic":"topic_a","partition":0,"replicas":[3,4,0],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":1,"replicas":[4,0,1],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_a","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
修改其中副本的位置
并且设定ISR的优先顺序
11.kraft集群
首先我们分析一下zookeeper在kafka中的作用
zookeeper可以实现controller的选举,并且记录topic和partition的元数据信息,帮助多个broker同步数据信息
在新版本中的kraft模式中可以这个管理和选举可以用kafka自己完成,而不再依赖zookeeper
搭建kraft集群
首先要设定process.roles,可以人为指定每个人的角色,如果指定broker那么它永远是broker
如果指定controller那么它一定是controller,但是这样的话其他人就没有办法作为主节点了
所以一般选择broker,controller可以自动进行适配和选举
一般一个集群中会设定大部分是broker,少量的几个是controller+broker方便选举
设定哪几个机器作为协调的机器出现,帮助kafka自身选举
还需要设定对外端口和每个节点的编号
最后还要设定数据存储的位置
log.dirs=/data/kafka-logs
12.集群的搭建和启动
首先选择我们之前搭建kafka时候的初始化镜像
http://cloud.hainiubl.com/#/privateImageDetail?id=2969&imageType=private
搭建准备工作,首先在海牛实验室中启动基础镜像,并且调节资源大小
调节大小如下:
因为演示效果每个机器都安装一个kafka的集群节点
安装包等信息全部都已经存放到 /public/software/bigdata中为大家准备好了。我们直接使用就可以
# 首先将安装包解压到每个机器中
ssh_root.sh tar -zxvf /public/software/bigdata/kafka_2.12-3.3.2.tgz -C /usr/local/
# 查看解压情况
ssh_root.sh ls /usr/local|grep kafka
# 修改权限
ssh_root.sh chown hadoop:hadoop -R /usr/local/kafka_2.12-3.3.2/
# 创建软连接
ssh_root.sh ln -s /usr/local/kafka_2.12-3.3.2/ /usr/local/kafka
查看安装包中的内容
比较重要的三个文件夹
bin 执行脚本
config 配置文件
libs 依赖包
# 配置环境变量
echo 'export KAFKA_HOME=/usr/local/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
首先连接所有的节点
选择批量命令
将上面的命令群体执行
到现在为止集群的初始化安装已经完毕,我们下面做kafka的自定义设置
# 修改server.properties配置文件
cd /usr/local/kafka/config/kraft
vim server.properties
修改如下的配置
log.dirs=/data/kafka-logs # kafka存放数据的位置
#角色
process.roles=broker, controller
# 节点id
node.id=0
# 选举节点
controller.quorum.voters=0@nn1:9093,1@nn2:9093,2@s1:9093
# broker暴露端口
advertised.Listeners=PLAINTEXT://nn1:9092
# 首先修改log.dirs zookeeper.connect两个参数
# 分发到不同的机器节点
scp_all.sh /usr/local/kafka/config/kraft/server.properties /usr/local/kafka/config/kraft/
分发配置文件以后一定要记得将node.id和ip修改,并且只有前三个的角色是controller后面的角色不能存在controller,因为controller角色必须在controller.quorum.voters中包含
listeners=PLAINTEXT://:9092,CONTROLLER://:9093 前三个机器可以有CONTROLLER后面两个不能有,因为他们是broker节点不参与选举
# 分别在不同的节点修改 node.id的编号
# nn1 --> node.id=0
# nn2 --> node.id=1
# s1 --> node.id=2
# s2 --> node.id=3
# s3 --> node.id=4
# 查看每个机器的编号
ssh_all.sh cat /usr/local/kafka/config/kraft/server.properties | grep node.id
# 创建每个机器上的kafka的数据存储文件夹
ssh_root.sh mkdir /data/kafka-logs
# 修改权限
ssh_root.sh chown hadoop:hadoop -R /data/kafka-logs
kafka-storage.sh random-uuid 生成集群id
kafka-storage.sh format 格式化存储空间
# -c 指定配置文件 -t指定集群id
kafka-storage.sh format -t bzSjRpblTiOfeQE6KJ2WjQ -c /usr/local/kafka/config/kraft/server.properties
每个机器都要执行
启动集群
# 五个机器
ssh_all.sh /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/kraft/server.properties
# 测试集群
kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_1 --partitions 3 --replication-factor 2
kafka-topics.sh --bootstrap-server nn1:9092 --list
安装kafka监控
# 切换用户解压
su - root
unzip /public/software/bigdata/kafka-console-ui.zip -d /usr/local/
# 启动监控
/usr/local/kafka-console-ui/bin/start.sh
集群运行很正常
13.kafka的优化参数整理
参数 | 解释 |
---|---|
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m |
batch.size | 默认 16k,sender线程拉取数据大小 |
linger.ms | sender线程拉取数据等待时长 |
acks | 确认应答 0 1 -1 |
max.in.flight.requests.per.connection | 没有ack返回时候可以发送几次数据 |
retries | producer失败重试次数 |
enable.idempotence | 启幂等性,默认 true |
compression.type | 生产者发送的所有数据的压缩方式 |
auto.leader.rebalance.enable | leader是否自动切换 |
leader.imbalance.per.broker.percentage | leader均衡比10% |
leader.imbalance.check.interval.seconds | leader均衡检测时间五分钟 |
log.segment.bytes | segment大小 |
log.index.interval.bytes | 每4k生成一个索引数据,写入一次文件 |
log.cleanup.policy | 日志删除方式 |
log.retention.hours | 数据保存时长 |
enable.auto.commit | 自动提交 |
auto.commit.interval.ms | 提交间隔 |
auto.offset.reset | 初始化消费位置 |
offsets.topic.num.partitions | __consumer_offsets分区数量 |
session.timeout.ms | 消费者断开超时时间 |
max.poll.records | 消费者拉取条数 |
fetch.max.bytes | 消费者拉取大小 |
partition.assignment.strategy | 消费者分区分配策略 |
14.数据吞吐量和数据重复问题
数据在消费的时候可能会遇见数据堆积,无法及时消费计算的问题
这个时候可以适当的调节broker的数量和partition的数量,让多个机器帮助进行处理可提高吞吐量,并且分区越多消费者就可以适当增多,让消费速度得到很大的提升
适当增加每次拉取的大小也会增加消费速度
max.poll.records 消费者拉取条数
fetch.max.bytes 消费者拉取大小
kafka数据稳定性保证
首先从producer出发
ack = 0 or ack = 1 会出现数据丢失问题
ack = -1 会出现数据重复问题
开始幂等性可以进行单分区去重
保证一批次数据稳定性可以开启事物
消费者部分如果是自动提交偏移量会出现重复消费问题
手动保存偏移量就不会出现这个问题
15.flume接入kakfa数据
flume是采集数据的组件,能够监控日志数据的变化,能够实时采集数据,这个以后再工作过程中会充当我们的kafka数据的producer生产者,将采集到的数据直接推送到kafka中
# 首先安装flume
# flume的安装非常简单,只需要解压完毕就可以直接使用
tar -zxvf /public/software/bigdata/apache-flume-1.10.1-bin.tar.gz -C /usr/local/
# 创建软连接
ln -s /usr/local/apache-flume-1.10.1-bin/ /usr/local/flume
# 修改权限
chown hadoop:hadoop -R /usr/local/apache-flume-1.10.1-bin/
# 切换用户
su - hadoop
# 创建topic
kafka-topics.sh --bootstrap-server nn1:9092 --create --topic topic_flume --partitions 3 --replication-factor 2
flume分为三个部分 source channel sink
我们可以使用sink端将数据发送到kafka,也可以将数据通过channel发送到kafka中
首先使用sink端发送数据到kafka
# 创建配置文件
vim /usr/local/flume/conf/sink.conf
# 输入如下内容
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /home/hadoop/a.txt
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_flume
a1.sinks.k1.kafka.bootstrap.servers = nn1:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
# 创建文件
touch /home/hadoop/a.txt
# 启动消费者
kafka-console-consumer.sh --bootstrap-server nn1:9092 --topic topic_flume
# 启动 flume
/usr/local/flume/bin/flume-ng agent --name a1 --conf /usr/local/flume/conf/ --conf-file /usr/local/flume/conf/sink.conf -Dflume.root.logger=info,console
# 追加数据到 /home/hadoop/a.txt
echo 1 >> /home/hadoop/a.txt
echo 1 >> /home/hadoop/a.txt
echo 1 >> /home/hadoop/a.txt
消费者中打印出来数据
第二种kafka channel
直接通过channel将数据放入到kafka中
# 创建配置文件
vim /usr/local/flume/conf/channel.conf
# 输入如下内容
a1.sources = r1
a1.channels = c1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /home/hadoop/a.txt
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = nn1:9092
a1.channels.c1.kafka.topic = topic_flume
a1.sources.r1.channels=c1
输入数据后发现
数据在消费者中打印出来了
16.镜像位置
http://cloud.hainiubl.com/#/privateImageDetail?id=3033&imageType=private
kafka完整版镜像
http://cloud.hainiubl.com/#/privateImageDetail?id=3033&imageType=private
kraft镜像带有flume的