Flink 学习心得

分享 leeston9 ⋅ 于 2020-08-05 14:19:40 ⋅ 最后回复由 leeston9 2020-08-05 20:32:41 ⋅ 302 阅读

学习flink的cogroup在session窗口函数运算的时候,发现老师讲的代码只是将两个socket流简单地转为pairData,然后就直接通过StringBuilder进行了拼接。如果是在同一个窗口内的数据会攒起来,每个trigger动作(或超过session的gap时间)会触发运算,但是从输出结果来看仍需要优化:比如增加flatMap函数将socket一行数据按空格拆分处理,用reduce函数将pairData中相同key的值进行累加后再cogroup。因为只有一个StringBuilder,所以最后发射出来的值一直包含所有的历史数据,我希望每个socket流的数据累加后只保留key的最新值,因此做了一点简单的优化。不仅仅是cogroup,还有比如join等我都对代码做了些优化。我是flink初学者,路还很长,以此贴来记录自己的学习历程,继续加油。

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;

/**
 * Demonstrates {@code coGroup} of two socket word streams on a processing-time
 * session window (10s gap) with a per-element {@link CountTrigger}.
 *
 * <p>Unlike a join, coGroup emits even when one side has no matching elements.
 * Both sides are pre-aggregated with a keyed {@code reduce} (a running word
 * count), so each incoming element carries the latest total for its key; the
 * window function therefore keeps only the <em>last</em> element of each side
 * instead of concatenating the whole (historical) buffer.
 */
public class CoGroupOnAndJoinSessionWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Two raw text sources; each line is a whitespace-separated word list.
        DataStreamSource<String> s1 = env.socketTextStream("localhost", 6666);
        DataStreamSource<String> s2 = env.socketTextStream("localhost", 7777);

        // Identical pre-processing for both streams: tokenize -> key by word -> running count.
        SingleOutputStreamOperator<Tuple2<String, Long>> input1 = runningWordCount(s1);
        SingleOutputStreamOperator<Tuple2<String, Long>> input2 = runningWordCount(s2);

        // CoGroup on equal words, evaluated in a processing-time session window
        // (10s inactivity gap); the CountTrigger fires on every element so each
        // arrival produces output immediately.
        DataStream<String> result = input1.coGroup(input2)
                .where(wordKey())
                .equalTo(wordKey())
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first,
                                        Iterable<Tuple2<String, Long>> second,
                                        Collector<String> out) throws Exception {
                        // Emit only the newest count per side; the reduce upstream
                        // guarantees the last element holds the up-to-date total.
                        out.collect("流1: " + latest(first) + "流2:  " + latest(second));
                    }
                });

        result.print();
        env.execute();
    }

    /**
     * Splits each line on single spaces, keys by word, and keeps a running
     * count per word via {@code reduce}.
     *
     * @param source raw text-line stream
     * @return keyed, reduced (word, count) stream
     */
    private static SingleOutputStreamOperator<Tuple2<String, Long>> runningWordCount(
            DataStreamSource<String> source) {
        return source.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(wordKey()).reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                               Tuple2<String, Long> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });
    }

    /** Key selector used by both the per-stream keyBy and the coGroup where/equalTo. */
    private static KeySelector<Tuple2<String, Long>, String> wordKey() {
        return new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        };
    }

    /**
     * Formats only the last element of the window buffer (empty string when the
     * side contributed nothing). Replaces the original duplicated if/else that
     * rebuilt a StringBuilder on every iteration just to drop older entries.
     */
    private static String latest(Iterable<Tuple2<String, Long>> elements) {
        String formatted = "";
        for (Tuple2<String, Long> next : elements) {
            formatted = next.f0 + "  < -- >  " + next.f1 + "\t";
        }
        return formatted;
    }
}

改进后输出结果如下:(附图见原帖)

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-leeston9,http://hainiubl.com/topics/75272
回复数量: 3
  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-08-05 14:23:19

    前面的注释有点小错误未改正,请忽略:flushed:

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2020-08-05 19:37:24

    @leeston9 我是故意使用一个StringBuilder的,为了引出keyedStream能解决这个问题并且能容错。

  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-08-05 20:32:41

    @青牛 早知道我就整最新值了:innocent:

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter