学习 Flink 的 coGroup 在 session 窗口函数运算时,发现老师讲的代码只是把两个 socket 流简单地转换为 pairData,然后直接通过 StringBuilder 进行拼接:同一个窗口内的数据会攒起来,每次 trigger 动作(或超过 session 的 gap 时间)会触发运算。但从输出结果来看仍有优化空间:比如增加 flatMap 函数将 socket 的一行数据按空格拆分,再用 reduce 函数在 coGroup 之前把 pairData 中相同 key 的值累加;另外,由于只有一个 StringBuilder,最后发射出来的值会一直包含所有历史数据,而我希望每个 socket 流的数据累加后只保留每个 key 的最新值,因此做了一些简单的优化。不仅是 coGroup,join 等算子的代码我也做了类似的优化。我是 Flink 初学者,路还很长,以此贴记录自己的学习历程,继续加油。
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;
/**
 * CoGroup on a processing-time session window, with join-like semantics.
 *
 * <p>CoGroup is the basis of join: unlike join it emits output even when a key has no
 * match on the other side (in Spark the grouped side is a CompactBuffer, i.e. an
 * iterator). Each socket stream is pre-aggregated with keyBy + reduce so equal keys
 * carry a running count; inside the window function only the latest (fully reduced)
 * value per side is emitted, which removes the stale partial aggregates that a single
 * shared StringBuilder used to accumulate.
 */
public class CoGroupOnAndJoinSessionWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Two socket text sources.
        DataStreamSource<String> s1 = env.socketTextStream("localhost", 6666);
        DataStreamSource<String> s2 = env.socketTextStream("localhost", 7777);

        // Light pre-processing: split each line on spaces and keep a running
        // count per word (flatMap -> keyBy -> reduce). Same pipeline for both
        // streams, so it is factored into one helper.
        SingleOutputStreamOperator<Tuple2<String, Long>> input1 = wordCounts(s1);
        SingleOutputStreamOperator<Tuple2<String, Long>> input2 = wordCounts(s2);

        // CoGroup the two streams where their keys are equal, inside a session
        // window that closes after a 10-second inactivity gap. CountTrigger.of(1)
        // fires the window on every incoming element so results show immediately.
        DataStream<String> result = input1.coGroup(input2)
                .where(pairKey())
                .equalTo(pairKey())
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first,
                                        Iterable<Tuple2<String, Long>> second,
                                        Collector<String> out) {
                        // Each iterable holds successive reduce snapshots for one
                        // key; only the last (most up-to-date) one is emitted.
                        out.collect("流1: " + latest(first) + "流2: " + latest(second));
                    }
                });

        result.print();
        env.execute();
    }

    /** Splits each incoming line into words and keeps a running count per word. */
    private static SingleOutputStreamOperator<Tuple2<String, Long>> wordCounts(
            DataStreamSource<String> source) {
        return source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        for (String word : value.split(" ")) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(pairKey())
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> v1,
                                                       Tuple2<String, Long> v2) {
                        return Tuple2.of(v1.f0, v1.f1 + v2.f1);
                    }
                });
    }

    /** Key selector extracting the word (f0) from a (word, count) pair. */
    private static KeySelector<Tuple2<String, Long>, String> pairKey() {
        return new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) {
                return value.f0;
            }
        };
    }

    /**
     * Formats only the most recent (word, count) pair of the iterable, or returns an
     * empty string when the side has no data. Replaces the original duplicated
     * if/else whose two branches were identical apart from resetting the builder.
     */
    static String latest(Iterable<Tuple2<String, Long>> values) {
        Tuple2<String, Long> last = null;
        for (Tuple2<String, Long> value : values) {
            last = value;
        }
        return last == null ? "" : last.f0 + " < -- > " + last.f1 + "\t";
    }
}
改进后的输出结果如下: