Flink 调优 (学习)

分享 123456789987654321 ⋅ 于 2022-08-29 19:25:14 ⋅ 257 阅读

file
file

JVM

1.内存设置

1、内存模型详解

Flink 内存大小 =  taskmanager.memory.process.size指定的总内存 - jvm两块内存大小

#1.JVM 特定内存:JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head
1)JVM metaspace:JVM 元空间
    taskmanager.memory.jvm-metaspace.size,#默认 256mb  #默认值,一般不用调整
2)JVM over-head 执行开销:#JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
    taskmanager.memory.jvm-overhead.fraction,默认 0.1
    taskmanager.memory.jvm-overhead.min,默认 192mb
    taskmanager.memory.jvm-overhead.max,默认 1gb
    #总进程内存*fraction(默认 0.1这个比例),如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小

#2.框架内存:Flink 框架,即 TaskManager 本身所占用的内存,不计入 Slot 的资源中。
    堆内:taskmanager.memory.framework.heap.size,默认 128MB
    堆外:taskmanager.memory.framework.off-heap.size,默认 128MB

#3.Task 内存:Task 执行用户代码时所使用的内存
    堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。
    堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存

#4.网络内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
堆外:taskmanager.memory.network.fraction,默认 0.1
     taskmanager.memory.network.min,默认 64mb
     taskmanager.memory.network.max,默认 1gb
     #Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小

#5.托管内存:用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。
堆外:taskmanager.memory.managed.fraction,默认 0.4
     taskmanager.memory.managed.size,默认 none
     #如果 size 没指定,则等于 Flink 内存*fraction
Managed Memory: 管理内存  给kv键值对数据库用的内存(RocksDB)

2.合理利用 cpu 资源

2.1 使用DominantResourceCalculator策略

#该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->#默认值
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
#配置完yarn界面会启动7个vcore    jobmanager一个, 2slot x 3核 =6个(满足五个并行度)
bin/flink run \ 
-t yarn-per-job \ 
-d \ 
-p 5 \ #指定并行度
-Dyarn.application.queue=test \ #指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ #JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ #单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ #与容器核数 1core:1slot 或 2core:1slot
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2.2.使用 DefaultResourceCalculator 策略

#slot只隔离内存不隔离cpu, 
#强行指定cpu核数  -Dyarn.containers.vcores=3  多给一点cpu资源 ,不要超过yarn-site的最大值
bin/flink run \ 
-t yarn-per-job \ 
-d \ 
-p 5 \ 
-Drest.flamegraph.enabled=true \ 
-Dyarn.application.queue=test \ 
-Dyarn.containers.vcores=3 \  #每个核3个slot  3x3=9+1jobmanager 一共十个
-Djobmanager.memory.process.size=1024mb \ 
-Dtaskmanager.memory.process.size=4096mb \ 
-Dtaskmanager.numberOfTaskSlots=2 \ 
-c com.atguigu.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

3.并行度设置(全局)

开发完成后,先进行压测。
关闭chain来看到每一个算子的情况  : env.disableOperatorChaining();
任务并行度给 10 以下,`测试单个并行度的处理上限`
 #1.去flinkwebUI查看 算子的 `总条数 或者总数据量/ 秒数` 获得单秒的处理条数
 #2.去flink mertrics 下面查看numRecordsOutPerSecond等参数查看 单秒的峰值传输数
然后  QPS/单并行度的处理能力 = 并行度
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。

3.1 Source 端并行度的配置

#classloader.check-leaked-classloader : false 如果报类加载器错误配置这个
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。

3.2Transform 端并行度的配置

➢ Keyby 之前的算子
  一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。
➢ Keyby 之后的算子
  如果并发较大,建议`设置并行度为 2 的整数次幂`,例如:128、256、512;
  小并发任务的并行度不一定需要设置成 2 的整数次幂;
  大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
  #设置并行度为 2 的整数次幂 源码中 取得是两次hash 获得键组id  键组id x 下游并行度  /  最大并行度128(2*7)
  #所以Transform 端并行度设置为2 的整数次幂 

3.3Sink 端并行度的配置

如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数

2.状态及 Checkpoint 调优

1 .RocksDB 大状态调优

调试内存性能的问题主要是通过调整配置项 
//taskmanager.memory.managed.size 或 者 
//taskmanager.memory.managed.fraction 以增加 Flink 的托管内存(即堆外的托管内存)。
进一步可以调整一些参数进行高级性能调优,这些参数也可以在应用程序中通过
//RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)
指定

2.开启 State 访问性能监控

#可通过 启动命令 -D 一下参数启动状态的监控, 在mertires界面查看 

#一般开启第一个参数即可

state.backend.latency-track.keyed-state-enabled:true #启用访问状态的性能监控
state.backend.latency-track.sample-interval: 100 #采样间隔
state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确
state.backend.latency-track.state-name-as-variable: true #将状态名作为变量

3.开启增量检查点和本地恢复

#1)开启增量检查点
state.backend.incremental: true #默认 false,改为 true。
#或代码中指定
new EmbeddedRocksDBStateBackend(true)

#2)开启本地恢复  `本地恢复目前仅涵盖键控类型的状态后端(RocksDB)`,MemoryStateBackend不支持
state.backend.local-recovery: true

#3)设置多目录
#如果有多块磁盘,也可以考虑指定本地多目录
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
`注意:` 
#不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。
bin/flink run \ 
-t yarn-per-job \ 
-d \ 
-p 5 \ 
-Drest.flamegraph.enabled=true \ 
-Dyarn.application.queue=test \ 
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \ 
-Dtaskmanager.numberOfTaskSlots=2 \ 
-Dstate.backend.incremental=true \ #
-Dstate.backend.local-recovery=true \ #
-Dstate.backend.latency-track.keyed-state-enabled=true \ #
-c com.atguigu.flink.tuning.RocksdbTuning \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

4. 调整预定义选项

#当 前 支 持 的 预 定 义 选 项 有  集体参数建源码类 ProdefinedOptions.java
DEFAULT
SPINNING_DISK_OPTIMIZED
SPINNING_DISK_OPTIMIZED_HIGH_MEM
FLASH_SSD_OPTIMIZED
FLASH_SSD_OPTIMIZED #SSD

##具体使用,命令使用  设置为机械硬盘+内存模式
-Dstate.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
#具体使用,代码设置

1661764989458

5.增大 block 缓存

整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,
默认大小为 8 MB,建议设置到 64 ~ 256 MB。
#一共三部 1.激活mem table 2.只读  3.刷数据到磁盘 
-Dstate.backend.rocksdb.block.cache-size: 64m #默认 8m

6.增大 write buffer 和 level 阈值大小

RocksDB 中,每个 State 使用一个 Column Family,
每个 Column Family 使用独占的 write buffer,默认 64MB,建议调大。
调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m。
该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,合并困难。
建议设为 target_file_size_base(默认 64MB)的倍数,且不能太小,例如 5~10倍,即 `320~640MB`

-Dstate.backend.rocksdb.writebuffer.size: 128m
-Dstate.backend.rocksdb.compaction.level.max-size-level-base: 320m

7.增大 write buffer 数量

每个 Column Family 对应的 writebuffer 最大数量,这实际上是内存中“只读内存表“的最大数量,默认值是 2。
对于机械磁盘来说,如果内存足够大,可以调大到 5 左右

-Dstate.backend.rocksdb.writebuffer.count: 5

8.增大后台线程数和 write buffer 合并数

#1)增大线程数用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值
-Dstate.backend.rocksdb.thread.num: 4
#2)增大 writebuffer 最小合并数将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 最小数量,默认值为 1,可以调成 3。
-Dstate.backend.rocksdb.writebuffer.number-to-merge: 3

9.开启分区索引功能

Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。
线上测试中,`相对于内存比较小的场景中`,性能提升 10 倍左右。
`如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点`

-Dstate.backend.rocksdb.memory.partitioned-index-filters:true #默认 false

3.Checkpoint 设置

// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 3 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 4 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//checkPoint 设置具体大小一句如下 最大 最小 值

1661766102030

4.反压

#Metris 描述 利用监控查看
outPoolUsage                     #发送端 Buffer 的使用率
inPoolUsage                      #接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上)  #接收端 Floating Buffer 的使用率
exclusiveBuffersUsage(1.9 以上) #接收端 Exclusive Buffer 的使用率

#火焰图
rest.flamegraph.enabled: true #默认 false

反压的原因及处理

//反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。

分析 GC 情况

-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"

维表join

https://flink-learning.org.cn/article/detail/b8df32fbc6542257a5b449114e137cc3
https://www.jianshu.com/p/a62fa483ff54

5.keyBy

keyBy 后的聚合操作存在数据倾斜

 #SQL指定参数,开启 miniBatch 和 LocalGlobal 

keyBy 之前发生数据倾斜

//让 Flink 任务强制进行 shuffle。
//使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。

keyBy 后的窗口聚合操作存在数据倾斜

#➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
 注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合  到一起)
#➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

6.Job 优化

使用 DataGen 造数据 官方造数工具

import com.atguigu.flink.tuning.bean.OrderInfo;
import com.atguigu.flink.tuning.bean.UserInfo;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
public class DataStreamDataGenDemo {
 public static void main(String[] args) throws Exception {
 Configuration conf = new Configuration();
 conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
 env.setParallelism(1);
 env.disableOperatorChaining();
 SingleOutputStreamOperator<OrderInfo> orderInfoDS = env
 .addSource(new DataGeneratorSource<>(new RandomGenerator<OrderInfo>() 
{
 @Override
 public OrderInfo next() {
 return new OrderInfo(
 random.nextInt(1, 100000),
 random.nextLong(1, 1000000),
 random.nextUniform(1, 1000),
 System.currentTimeMillis());
 }
 }))
 .returns(Types.POJO(OrderInfo.class));
 SingleOutputStreamOperator<UserInfo> userInfoDS = env
 .addSource(new DataGeneratorSource<UserInfo>(
 new SequenceGenerator<UserInfo>(1, 1000000) {
 RandomDataGenerator random = new RandomDataGenerator();
 @Override
 public UserInfo next() {
 return new UserInfo(
 valuesToEmit.peek().intValue(),
 valuesToEmit.poll().longValue(),
 random.nextInt(1, 100),
 random.nextInt(0, 1));
 }
 }
 ))
 .returns(Types.POJO(UserInfo.class));
 orderInfoDS.print("order>>");
 userInfoDS.print("user>>");
 env.execute();
 } }

算子指定 UUID

//对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户 ID(UUID)

链路延迟测量(延迟)

# Flink提供了开箱即用的 LatencyMarker 机制来测量链路延迟。
# 开启如下参数:
metrics.latency.interval: 30000 #默认 0,表示禁用,单位毫秒  30秒

#➢ single:  每个算子单独统计延迟;
#➢ operator (默认值):每个下游算子都统计自己与 Source 算子之间的延迟;
#➢ subtask: 每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。
metrics.latency.granularity: operator #默认 operator

#LatencyMarker 不会参与到数据流的用户逻辑中的,而是直接被各算子转发并统计。
#为了让它尽量精确,有两点特别需要注意:
➢ 保证 Flink 集群内所有节点的时区、时间是同步的:
ProcessingTimeService 产生时间戳最终是靠 `System.currentTimeMillis()`方法,可以用 ntp 等工具来配置。

➢ metrics.latency.interval 的时间间隔宜大不宜小:一般配置成` 30000(30 秒)`左右。
一是因为延迟监控的频率可以不用太频繁,二是因为 LatencyMarker 的处理也要消耗一定性能。

#可以通过下面的 metric 查看结果:
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency

监控集成普罗米修斯

1661769090467

开启对象重用

当调用了 enableObjectReuse 方法后,Flink 会把`中间深拷贝`的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。
但需要特别注意的是,
`这个方法不能随便调用,
`必须要确保下游 Function 只有一种,
`或者下游的Function 均不会改变对象内部的值。
#会有线程安全的问题

#API
env.getConfig().enableObjectReuse();

#提交参数配置
-Dpipeline.object-reuse=true

7.细粒度滑动窗口优化

#重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降

#一般使用滚动窗口+在线存储+读时聚合按照步长去做滑动窗口解决此地问题
-----------------------------------------------------------------------------------------------
#(1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度); 
#(2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase);
#(3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示

`Flink 1.13 对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf

1661770096336

8.FlinkSQL 调优

#FlinkSQL 官网配置参数:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html

设置空闲状态保留时间

#忘记设置空闲状态保留时间导致状态爆炸
#Flink SQL 可以指定空闲状态(即未更新的状态)被保留的最小时间,当状态中某个 key对应的状态未更新的时间达到阈值时,该条状态被自动清理:

#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");

开启 MiniBatch

#MiniBatch 是微批处理,是缓存一定的数据后再触发处理,以减少对 State 的访问,从而提升吞吐并减少数据的输出量。
MiniBatch 主要依靠在每个 Task 上`注册的 Timer线程`来触发微批,需要消耗一定的线程调度性能。

#MiniBatch 默认关闭,开启方式如下:
    #初始化 table environment
    TableEnvironment tEnv = ...
    #获取 tableEnv 的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    #设置参数:
    #开启 miniBatch
    configuration.setString("table.exec.mini-batch.enabled", "true");
    #批量输出的间隔时间
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    #防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
    configuration.setString("table.exec.mini-batch.size", "20000");

➢ 适用场景
微批处理`通过增加延迟换取高吞吐`,如果有超低延迟的要求,不建议开启微批处理。
通常`对于聚合的场景,微批处理可以显著的提升系统性能`,建议开启

#1.12 之前的版本有 bug,开启 miniBatch,不会清理过期状态,也就是说如果设置状态的 TTL,无法清理过期状态。1.12 版本才修复这个问题。

开启 LocalGlobal(优化SUM、COUNT、MAX、MIN 和 AVG)

LocalGlobal 优 化 将 原 先 的 Aggregate 分 成 `Local+Global` 两 阶 段 聚 合 , 即MapReduce 模型中的 Combine+Reduce 处理模式。 //先进行本地聚合,再进行全局聚合

➢ LocalGlobal 开启方式:
1)LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数。
2)table.optimizer.agg-phase-strategy: 聚合策略。
默认 AUTO,支持参数 AUTO、TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

//➢ 注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法。

开启 Split Distinct(优化distinct)

LocalGlobal 优化针对普通聚合(例如 `SUM、COUNT、MAX、MIN 和 AVG`)有较好的效果,
对于 DISTINCT 的聚合(如 COUNT DISTINCT)收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。

之前,为了解决 COUNT DISTINCT 的热点问题,通常需要手动改写为两层聚合(增加按 Distinct Key 取模的打散层)。

`从 Flink1.9.0 版 本 开 始 , 提 供 了 COUNT DISTINCT 自 动 打 散 功 能 `, 
通过 `HASH_CODE(distinct_key) % BUCKET_NUM` 打散,不需要手动重写。
--案例
--Distinct 举例:
SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a

--手动打散举例:
SELECT a, SUM(cnt)
FROM (
 SELECT a, COUNT(DISTINCT b) as cnt
 FROM T
 GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a

--➢ Split Distinct 开启方式
默认不开启,使用参数显式开启: 
table.optimizer.distinct-agg.split.enabled: true     --默认 false。 
table.optimizer.distinct-agg.split.bucket-num: Split --Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

//➢ 注意事项:
(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Flink1.9.0 版本及以上版本才支持。

多维 DISTINCT 使用 Filter

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a

如果上面sql结果都走的是b,那么可以改写为以下的sql

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a

设置参数总结

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");

9.常见故障排除

IllegalConfigurationException

//存在无效的配置值(例如负内存大小、大于 1 的分数等)或配置冲突。请重新配置内存参数。

OutOfMemoryError: Java heap space

表示 JVM Heap 太小。
可以尝试通过增加总内存来增加 JVM 堆大小。
也可以直接为 TaskManager 增加任务堆内存或为 JobManager 增加 JVM 堆内存。
还可以为 TaskManagers 增加框架堆内存,但只有在确定 Flink 框架本身需要更多内存时才应该更改此选项。

OutOfMemoryError: Direct buffer memory

JVM 直接内存限制太小 或  存在直接内存泄漏
//检查用户代码或其他外部依赖项是否使用了 JVM 直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。
//可以参考如何为 TaskManagers、 JobManagers 和 Flink 设置的 JVM 参数配置堆外内存。

OutOfMemoryError: Metaspace

//JVM 元空间限制配置得太小。您可以尝试加大 JVM 元空间 TaskManagers 或 JobManagers 选项。

IOException: Insufficient number of network buffers

//配置的网络内存大小不够大。您可以尝试增加网络内存。

超出容器内存异常

JVM OverHead 内存调大

Checkpoint 失败

Checkpoint 失败大致分为两种情况:Checkpoint Decline 和 Checkpoint Expire。
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-123456789987654321,http://hainiubl.com/topics/75902
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter