为什么 flink wordcount 程序跑了 10s?

问答 liuye ⋅ 于 2019-08-14 16:57:35 ⋅ 最后回复由 青牛 2019-08-15 09:06:02 ⋅ 1507 阅读

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

public class WordCount {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
long st = System.currentTimeMillis();
DataSet text = env.fromElements(
"Using the Hello World guide",
" you’ll start a branch",
" write comments",
"and open a pull request. ");
System.out.println("init" + (System.currentTimeMillis() - st));

long st2 = System.currentTimeMillis();
DataSet<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String value, Collector<Tuple2<String, Integer>> collectors) {
String[] tokens = value.toLowerCase().split("\W+");
for (String token : tokens) {
if (token.length() > 0) {
collectors.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).groupBy(0).sum(1);
System.out.println("handle" + (System.currentTimeMillis() - st2));

long st3 = System.currentTimeMillis();
List<Tuple2<String, Integer>> collect = sum.collect();
sum.print();
System.out.println("print" + (System.currentTimeMillis() - st3));

}

}

成为第一个点赞的人吧 :bowtie:
回复数量: 1
  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2019-08-15 09:06:02

    啥错误?

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter