JVM
1.内存设置
1、内存模型详解
Flink 内存大小 = taskmanager.memory.process.size指定的总内存 - jvm两块内存大小
#1.JVM 特定内存:JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head
1)JVM metaspace:JVM 元空间
taskmanager.memory.jvm-metaspace.size,#默认 256mb #默认值,一般不用调整
2)JVM over-head 执行开销:#JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
taskmanager.memory.jvm-overhead.fraction,默认 0.1
taskmanager.memory.jvm-overhead.min,默认 192mb
taskmanager.memory.jvm-overhead.max,默认 1gb
#总进程内存*fraction(默认 0.1这个比例),如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
#2.框架内存:Flink 框架,即 TaskManager 本身所占用的内存,不计入 Slot 的资源中。
堆内:taskmanager.memory.framework.heap.size,默认 128MB
堆外:taskmanager.memory.framework.off-heap.size,默认 128MB
#3.Task 内存:Task 执行用户代码时所使用的内存
堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。
堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存
#4.网络内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
堆外:taskmanager.memory.network.fraction,默认 0.1
taskmanager.memory.network.min,默认 64mb
taskmanager.memory.network.max,默认 1gb
#Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小
#5.托管内存:用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。
堆外:taskmanager.memory.managed.fraction,默认 0.4
taskmanager.memory.managed.size,默认 none
#如果 size 没指定,则等于 Flink 内存*fraction
Managed Memory: 管理内存 给kv键值对数据库用的内存(RocksDB)
2.合理利用 cpu 资源
2.1 使用DominantResourceCalculator策略
#该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->#默认值
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
#配置完yarn界面会启动7个vcore jobmanager一个, 2slot x 3核 =6个(满足五个并行度)
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ #指定并行度
-Dyarn.application.queue=test \ #指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ #JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ #单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ #与容器核数 1core:1slot 或 2core:1slot
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
2.2.使用 DefaultResourceCalculator 策略
#slot只隔离内存不隔离cpu,
#强行指定cpu核数 -Dyarn.containers.vcores=3 多给一点cpu资源 ,不要超过yarn-site的最大值
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \ #每个核3个slot 3x3=9+1jobmanager 一共十个
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
3.并行度设置(全局)
开发完成后,先进行压测。
关闭chain来看到每一个算子的情况 : env.disableOperatorChaining();
任务并行度给 10 以下,`测试单个并行度的处理上限`
#1.去flinkwebUI查看 算子的 `总条数 或者总数据量/ 秒数` 获得单秒的处理条数
#2.去flink mertrics 下面查看numRecordsOutPerSecond等参数查看 单秒的峰值传输数
然后 QPS/单并行度的处理能力 = 并行度
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。
3.1 Source 端并行度的配置
#classloader.check-leaked-classloader : false 如果报类加载器错误配置这个
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。
3.2Transform 端并行度的配置
➢ Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。
➢ Keyby 之后的算子
如果并发较大,建议`设置并行度为 2 的整数次幂`,例如:128、256、512;
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
#设置并行度为 2 的整数次幂 源码中 取得是两次hash 获得键组id 键组id x 下游并行度 / 最大并行度128(2*7)
#所以Transform 端并行度设置为2 的整数次幂
3.3Sink 端并行度的配置
如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数
2.状态及 Checkpoint 调优
1 .RocksDB 大状态调优
调试内存性能的问题主要是通过调整配置项
//taskmanager.memory.managed.size 或 者
//taskmanager.memory.managed.fraction 以增加 Flink 的托管内存(即堆外的托管内存)。
进一步可以调整一些参数进行高级性能调优,这些参数也可以在应用程序中通过
//RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)
指定
2.开启 State 访问性能监控
#可通过 启动命令 -D 一下参数启动状态的监控, 在mertires界面查看
#一般开启第一个参数即可
state.backend.latency-track.keyed-state-enabled:true #启用访问状态的性能监控
state.backend.latency-track.sample-interval: 100 #采样间隔
state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确
state.backend.latency-track.state-name-as-variable: true #将状态名作为变量
3.开启增量检查点和本地恢复
#1)开启增量检查点
state.backend.incremental: true #默认 false,改为 true。
#或代码中指定
new EmbeddedRocksDBStateBackend(true)
#2)开启本地恢复 `本地恢复目前仅涵盖键控类型的状态后端(RocksDB)`,MemoryStateBackend不支持
state.backend.local-recovery: true
#3)设置多目录
#如果有多块磁盘,也可以考虑指定本地多目录
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
`注意:`
#不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dstate.backend.incremental=true \ #
-Dstate.backend.local-recovery=true \ #
-Dstate.backend.latency-track.keyed-state-enabled=true \ #
-c com.atguigu.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
4. 调整预定义选项
#当 前 支 持 的 预 定 义 选 项 有 集体参数建源码类 ProdefinedOptions.java
DEFAULT
SPINNING_DISK_OPTIMIZED
SPINNING_DISK_OPTIMIZED_HIGH_MEM
FLASH_SSD_OPTIMIZED
FLASH_SSD_OPTIMIZED #SSD
##具体使用,命令使用 设置为机械硬盘+内存模式
-Dstate.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
#具体使用,代码设置
5.增大 block 缓存
整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,
默认大小为 8 MB,建议设置到 64 ~ 256 MB。
#一共三部 1.激活mem table 2.只读 3.刷数据到磁盘
-Dstate.backend.rocksdb.block.cache-size: 64m #默认 8m
6.增大 write buffer 和 level 阈值大小
RocksDB 中,每个 State 使用一个 Column Family,
每个 Column Family 使用独占的 write buffer,默认 64MB,建议调大。
调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m。
该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,合并困难。
建议设为 target_file_size_base(默认 64MB)的倍数,且不能太小,例如 5~10倍,即 `320~640MB`
-Dstate.backend.rocksdb.writebuffer.size: 128m
-Dstate.backend.rocksdb.compaction.level.max-size-level-base: 320m
7.增大 write buffer 数量
每个 Column Family 对应的 writebuffer 最大数量,这实际上是内存中“只读内存表“的最大数量,默认值是 2。
对于机械磁盘来说,如果内存足够大,可以调大到 5 左右
-Dstate.backend.rocksdb.writebuffer.count: 5
8.增大后台线程数和 write buffer 合并数
#1)增大线程数用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值
-Dstate.backend.rocksdb.thread.num: 4
#2)增大 writebuffer 最小合并数将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 最小数量,默认值为 1,可以调成 3。
-Dstate.backend.rocksdb.writebuffer.number-to-merge: 3
9.开启分区索引功能
Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。
线上测试中,`相对于内存比较小的场景中`,性能提升 10 倍左右。
`如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点`
-Dstate.backend.rocksdb.memory.partitioned-index-filters:true #默认 false
3.Checkpoint 设置
// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 3 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 4 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//checkPoint 设置具体大小一句如下 最大 最小 值
4.反压
#Metris 描述 利用监控查看
outPoolUsage #发送端 Buffer 的使用率
inPoolUsage #接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上) #接收端 Floating Buffer 的使用率
exclusiveBuffersUsage(1.9 以上) #接收端 Exclusive Buffer 的使用率
#火焰图
rest.flamegraph.enabled: true #默认 false
反压的原因及处理
//反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。
分析 GC 情况
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
维表join
https://flink-learning.org.cn/article/detail/b8df32fbc6542257a5b449114e137cc3
https://www.jianshu.com/p/a62fa483ff54
5.keyBy
keyBy 后的聚合操作存在数据倾斜
#SQL指定参数,开启 miniBatch 和 LocalGlobal
keyBy 之前发生数据倾斜
//让 Flink 任务强制进行 shuffle。
//使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。
keyBy 后的窗口聚合操作存在数据倾斜
#➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合 到一起)
#➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合
6.Job 优化
使用 DataGen 造数据 官方造数工具
import com.atguigu.flink.tuning.bean.OrderInfo;
import com.atguigu.flink.tuning.bean.UserInfo;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
public class DataStreamDataGenDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
env.disableOperatorChaining();
SingleOutputStreamOperator<OrderInfo> orderInfoDS = env
.addSource(new DataGeneratorSource<>(new RandomGenerator<OrderInfo>()
{
@Override
public OrderInfo next() {
return new OrderInfo(
random.nextInt(1, 100000),
random.nextLong(1, 1000000),
random.nextUniform(1, 1000),
System.currentTimeMillis());
}
}))
.returns(Types.POJO(OrderInfo.class));
SingleOutputStreamOperator<UserInfo> userInfoDS = env
.addSource(new DataGeneratorSource<UserInfo>(
new SequenceGenerator<UserInfo>(1, 1000000) {
RandomDataGenerator random = new RandomDataGenerator();
@Override
public UserInfo next() {
return new UserInfo(
valuesToEmit.peek().intValue(),
valuesToEmit.poll().longValue(),
random.nextInt(1, 100),
random.nextInt(0, 1));
}
}
))
.returns(Types.POJO(UserInfo.class));
orderInfoDS.print("order>>");
userInfoDS.print("user>>");
env.execute();
} }
算子指定 UUID
//对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户 ID(UUID)
链路延迟测量(延迟)
# Flink提供了开箱即用的 LatencyMarker 机制来测量链路延迟。
# 开启如下参数:
metrics.latency.interval: 30000 #默认 0,表示禁用,单位毫秒 30秒
#➢ single: 每个算子单独统计延迟;
#➢ operator (默认值):每个下游算子都统计自己与 Source 算子之间的延迟;
#➢ subtask: 每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。
metrics.latency.granularity: operator #默认 operator
#LatencyMarker 不会参与到数据流的用户逻辑中的,而是直接被各算子转发并统计。
#为了让它尽量精确,有两点特别需要注意:
➢ 保证 Flink 集群内所有节点的时区、时间是同步的:
ProcessingTimeService 产生时间戳最终是靠 `System.currentTimeMillis()`方法,可以用 ntp 等工具来配置。
➢ metrics.latency.interval 的时间间隔宜大不宜小:一般配置成` 30000(30 秒)`左右。
一是因为延迟监控的频率可以不用太频繁,二是因为 LatencyMarker 的处理也要消耗一定性能。
#可以通过下面的 metric 查看结果:
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
监控集成普罗米修斯
开启对象重用
当调用了 enableObjectReuse 方法后,Flink 会把`中间深拷贝`的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。
但需要特别注意的是,
`这个方法不能随便调用,
`必须要确保下游 Function 只有一种,
`或者下游的Function 均不会改变对象内部的值。
#会有线程安全的问题
#API
env.getConfig().enableObjectReuse();
#提交参数配置
-Dpipeline.object-reuse=true
7.细粒度滑动窗口优化
#重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降
#一般使用滚动窗口+在线存储+读时聚合按照步长去做滑动窗口解决此地问题
-----------------------------------------------------------------------------------------------
#(1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);
#(2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase);
#(3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示
`Flink 1.13 对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf
8.FlinkSQL 调优
#FlinkSQL 官网配置参数:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html
设置空闲状态保留时间
#忘记设置空闲状态保留时间导致状态爆炸
#Flink SQL 可以指定空闲状态(即未更新的状态)被保留的最小时间,当状态中某个 key对应的状态未更新的时间达到阈值时,该条状态被自动清理:
#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");
开启 MiniBatch
#MiniBatch 是微批处理,是缓存一定的数据后再触发处理,以减少对 State 的访问,从而提升吞吐并减少数据的输出量。
MiniBatch 主要依靠在每个 Task 上`注册的 Timer线程`来触发微批,需要消耗一定的线程调度性能。
#MiniBatch 默认关闭,开启方式如下:
#初始化 table environment
TableEnvironment tEnv = ...
#获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
#设置参数:
#开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
#批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
#防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
➢ 适用场景
微批处理`通过增加延迟换取高吞吐`,如果有超低延迟的要求,不建议开启微批处理。
通常`对于聚合的场景,微批处理可以显著的提升系统性能`,建议开启
#1.12 之前的版本有 bug,开启 miniBatch,不会清理过期状态,也就是说如果设置状态的 TTL,无法清理过期状态。1.12 版本才修复这个问题。
开启 LocalGlobal(优化SUM、COUNT、MAX、MIN 和 AVG)
LocalGlobal 优 化 将 原 先 的 Aggregate 分 成 `Local+Global` 两 阶 段 聚 合 , 即MapReduce 模型中的 Combine+Reduce 处理模式。 //先进行本地聚合,再进行全局聚合
➢ LocalGlobal 开启方式:
1)LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数。
2)table.optimizer.agg-phase-strategy: 聚合策略。
默认 AUTO,支持参数 AUTO、TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
//➢ 注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法。
开启 Split Distinct(优化distinct)
LocalGlobal 优化针对普通聚合(例如 `SUM、COUNT、MAX、MIN 和 AVG`)有较好的效果,
对于 DISTINCT 的聚合(如 COUNT DISTINCT)收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。
之前,为了解决 COUNT DISTINCT 的热点问题,通常需要手动改写为两层聚合(增加按 Distinct Key 取模的打散层)。
`从 Flink1.9.0 版 本 开 始 , 提 供 了 COUNT DISTINCT 自 动 打 散 功 能 `,
通过 `HASH_CODE(distinct_key) % BUCKET_NUM` 打散,不需要手动重写。
--案例
--Distinct 举例:
SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a
--手动打散举例:
SELECT a, SUM(cnt)
FROM (
SELECT a, COUNT(DISTINCT b) as cnt
FROM T
GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a
--➢ Split Distinct 开启方式
默认不开启,使用参数显式开启:
table.optimizer.distinct-agg.split.enabled: true --默认 false。
table.optimizer.distinct-agg.split.bucket-num: Split --Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
//➢ 注意事项:
(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Flink1.9.0 版本及以上版本才支持。
多维 DISTINCT 使用 Filter
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a
如果上面sql结果都走的是b,那么可以改写为以下的sql
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a
设置参数总结
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");
9.常见故障排除
IllegalConfigurationException
//存在无效的配置值(例如负内存大小、大于 1 的分数等)或配置冲突。请重新配置内存参数。
OutOfMemoryError: Java heap space
表示 JVM Heap 太小。
可以尝试通过增加总内存来增加 JVM 堆大小。
也可以直接为 TaskManager 增加任务堆内存或为 JobManager 增加 JVM 堆内存。
还可以为 TaskManagers 增加框架堆内存,但只有在确定 Flink 框架本身需要更多内存时才应该更改此选项。
OutOfMemoryError: Direct buffer memory
JVM 直接内存限制太小 或 存在直接内存泄漏
//检查用户代码或其他外部依赖项是否使用了 JVM 直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。
//可以参考如何为 TaskManagers、 JobManagers 和 Flink 设置的 JVM 参数配置堆外内存。
OutOfMemoryError: Metaspace
//JVM 元空间限制配置得太小。您可以尝试加大 JVM 元空间 TaskManagers 或 JobManagers 选项。
IOException: Insufficient number of network buffers
//配置的网络内存大小不够大。您可以尝试增加网络内存。
超出容器内存异常
JVM OverHead 内存调大
Checkpoint 失败
Checkpoint 失败大致分为两种情况:Checkpoint Decline 和 Checkpoint Expire。