flink:keyedprocessfuction 添加 state 后,报 Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy/failureRateRestart?

问答 张浩 ⋅ 于 2021-09-16 19:37:38 ⋅ 最后回复由 青牛 2021-09-16 21:11:12 ⋅ 197 阅读
public class JobCodeKeyProcessFunction extends KeyedProcessFunction<String,ScanWrapInfo, JobInfo> {
    private static Jedis jedis = null;
    private static ValueState<Double> weight = null;
    private static ValueState<Double> volume = null;
    private static ValueState<Double> pcs = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = JedisUtil.getJedis();
        //keyState的TTL策略
        StateTtlConfig ttlConfig = StateTtlConfig
                //keyState的超时时间为1分钟
                .newBuilder(Time.seconds(60))
                //当创建和更新时,重新计时超时时间
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                //失败时不返回keyState的值
                //.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                //失败时返回keyState的值
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();
        //从runtimeContext中获得ck时保存的状态
#         ValueStateDescriptor<Double> weightState = new ValueStateDescriptor<>("weight", Double.class);
#         ValueStateDescriptor<Double> volumeState = new ValueStateDescriptor<>("volume", Double.class);
#         ValueStateDescriptor<Double> pcsState = new ValueStateDescriptor<>("pcs", Double.class);

        weightState.enableTimeToLive(ttlConfig);
        volumeState.enableTimeToLive(ttlConfig);
        pcsState.enableTimeToLive(ttlConfig);

        ValueState<Double> weight = getRuntimeContext().getState(weightState);
        ValueState<Double> volume = getRuntimeContext().getState(volumeState);
        ValueState<Double> pcs = getRuntimeContext().getState(pcsState);
    }
}

设置状态的代码如上:
1.运行后异常:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=0)
Caused by: java.lang.NullPointerException
at operator.JobCodeKeyProcessFunction.processElement(JobCodeKeyProcessFunction.java:146)
at operator.JobCodeKeyProcessFunction.processElement(JobCodeKeyProcessFunction.java:20)
其中: (JobCodeKeyProcessFunction.java:146)代码为:
Double curWeight = weight.value();weight为valueState;
2.debug异常
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=0)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/taskmanager_0#-2127188183]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalRpcInvocation]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.

请问是什么导致以上异常的呢

点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 3
  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2021-09-16 20:12:24

    你也没给类中定义的变量复值呀

  • 张浩
    2021-09-16 20:47:57

    @青牛 我理解的是用托管状态代替原始状态,所以类中直接用托管状态了。报了空指针后,我填加了判断托管状态为空时赋值为0的条件,但不能取到我上传的数据值,而是0。当我上传数据后,不是应该输出我上传的数据吗。如果我要给类中变量赋值的话,我需要怎么赋值呢?

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2021-09-16 21:11:12

    @张浩 你改啥都没用,把这个去掉,我服了

    file

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter