Flink 学习笔记

分享 123456789987654321 ⋅ 于 2021-10-06 14:14:28 ⋅ 443 阅读

Flink

启动

1.在yarn上启动jobManager

指定driver端找到多个依赖的jar包

flink run -m yarn-cluster -yt /home/hadoop/spark_news_jars -yjm 1024 -ytm 1024 -yn 2 -ys 3 -yqu root.hainiu -ynm hainiuFlinkStreamingWordCount \
$(ll /home/hadoop/spark_news_jars/ |awk 'NR>1{print "-C file:///home/hadoop/spark_news_jars/"$9}'|tr '\n' ' ') \
/usr/local/flink/examples/streaming/SocketWindowWordCount.jar --hostname nn1.hadoop --port 6666
#-C 命令是同时指定driver和taskManager运行的java程序的classpath。这里用这个命令只为了让driver找到jar包,taskmanager是通过yt命令找到jar包的,所以tm上有没有-C命令指定的文件是无所谓的。
#-C 命令指定的文件路径必须URI格式的,那本地文件就以file:///开头,注意不能使用文件通配符"*"
-yjm jobManager的内存
-ytm taskManager的内存
-yn tm的数量
-ys 每个tm的任务槽
-yqu yarn资源队列名称
-ynm yarn application name

wordcount代码

java

//可以设置延时时间
//taskmanager.network.netty.sendReceiveBufferSize 4M
//env.setBufferTimeout(-1); 

/**
 * @Description(描述): flink wordCount
 * @Version(版本): 1.0
 * @Author(创建者): ALIENWARE
 * @ * * * * * * * * * * * * * @
 */
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Word count over a socket text stream, showing five ways to write the same pipeline.
 *
 * <p>Variants 1-4 are kept as commented-out alternatives; variant 5 (ProcessFunction followed
 * by KeyedProcessFunction) is the one that actually runs.
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> socket = env.socketTextStream("localhost", 6666);
        // Set the default parallelism
        env.setParallelism(4);

        // 1. Lambda style
        /*
        SingleOutputStreamOperator<String> flatMap = socket.flatMap((String value, Collector<String> out) -> {
            Arrays.stream(value.split(" ")).forEach(word -> {
                out.collect(word);
            });
        }).returns(Types.STRING);

        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy(0).sum(1);
        sum.print();
        */

        // 2. Function style
        /*
        SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    out.collect(ss);
                }
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
        sum.print();
        */

        // 3. Combined function style (flatMap emits the tuples directly)
        /*
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    out.collect(Tuple2.of(ss, 1));
                }
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);

        sum.print();
        */

        // 4. Combined rich-function style; a rich function adds open() for initialisation,
        //    close() for cleanup and getRuntimeContext()
        /*
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
            private String name = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                name = "hainiu_";
            }
            @Override
            public void close() throws Exception {
                name = null;
            }
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                    out.collect(Tuple2.of(name + ss, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1);
        sum.print();
        */

        // 5. Combined process-function style
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {

            // Prefix put in front of every word; set in open() and cleared in close()
            private String name = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                name = "hainiu_";
            }

            @Override
            public void close() throws Exception {
                name = null;
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] s = value.split(" ");
                for (String ss : s) {
                    // Show which parallel subtask handled this word
                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                    out.collect(Tuple2.of(name + ss, 1));
                }
            }
        }).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            // Running total per word. One function instance serves every key routed to its
            // subtask, so a single shared counter would add up the counts of different words.
            // NOTE(review): this map is plain heap data and is not checkpointed; keyed
            // ValueState would be the fault-tolerant alternative.
            private final java.util.Map<String, Integer> counts = new java.util.HashMap<>();

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                Integer total = counts.merge(value.f0, value.f1, Integer::sum);
                out.collect(Tuple2.of(value.f0, total));
            }
        });

        // Attach the sink before printing the plan so that the plan includes it
        sum.print();

        // Print the execution plan
        System.out.println(env.getExecutionPlan());

        env.execute();
    }
}

kafka查看topic分区

`kafka-topics.sh --zookeeper nn1.hadoop:2181 --describe --topic topic_32`  查看topic分区

一、DataSource

默认输入源

数据输入源
//1.文件
//相同格式的文件
env.readTextFile("file:///path")
//不同格式的文件    
env.readFile(inputFormat, "file:///path");-->比如env.readFile(new FileInputFormat(""), "file:///path")
//2.Socket

自定义输入源

1.实现SourceFunction(非并行的)

package com.hainiuxy.flink;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Custom Flink source that watches a single file and re-emits all of its lines whenever the
 * file's checksum changes.
 *
 * <p>{@link SourceFunction} is a non-parallel source (use ParallelSourceFunction for a parallel
 * one). The file is polled every {@code interval} milliseconds. The checksum of the content
 * emitted last is kept in operator state, so a restored job does not re-emit an unchanged file.
 */
public class MyFileSource implements SourceFunction<String>, CheckpointedFunction {
    /** File watched when the no-argument constructor is used. */
    private static final String DEFAULT_PATH = "/user/yeniu/data/country_data1";

    /** Loop flag read by run() and cleared by cancel(); volatile because cancel() may run on another thread. */
    volatile boolean flag = true;
    /** Poll interval in milliseconds. */
    int interval = 2000;
    /** Checksum of the file content emitted last; empty until the first emission. */
    String md5 = "";

    /** Path of the watched file, kept as a String so the function stays serializable. */
    private final String filePath;

    private ListState<String> listState = null;

    /** Creates a source watching the default file. */
    public MyFileSource() {
        this(DEFAULT_PATH);
    }

    /**
     * Creates a source watching the given file.
     *
     * @param path path of the file to watch; must not be null
     */
    public MyFileSource(String path) {
        if (path == null) {
            throw new IllegalArgumentException("path must not be null");
        }
        this.filePath = path;
    }

    /**
     * Polls the file until cancelled and emits every line each time the checksum changes.
     *
     * @param ctx source context used to emit lines downstream
     * @throws Exception if the file system cannot be accessed or the thread is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Path path = new Path(filePath);
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        FileSystem fs = FileSystem.get(conf);
        while (flag) {
            if (!fs.exists(path)) {
                Thread.sleep(interval);
                continue;
            }
            System.out.println("md5====" + md5);
            // String.valueOf tolerates a null checksum (file systems without checksum support)
            String originMD5 = String.valueOf(fs.getFileChecksum(path));
            String[] checksumParts = originMD5.split(":");
            if (checksumParts.length < 2) {
                throw new IllegalStateException(
                        "Cannot derive a checksum for " + path + " (got: " + originMD5 + ")");
            }
            String current_md5 = checksumParts[1];
            if (!current_md5.equals(md5)) {
                // Emit the whole file and record its checksum atomically with respect to
                // checkpoints, so a snapshot never stores a checksum for half-emitted content.
                synchronized (ctx.getCheckpointLock()) {
                    try (FSDataInputStream in = fs.open(path);
                         BufferedReader bf = new BufferedReader(
                                 new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = bf.readLine()) != null) {
                            ctx.collect(line);
                        }
                    }
                    this.md5 = current_md5;
                }
            }
            Thread.sleep(interval);
        }
    }

    /** Stops the polling loop in run(). */
    @Override
    public void cancel() {
        flag = false;
    }

    /** Stores the checksum of the content emitted last in operator state. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.listState.clear();
        this.listState.add(md5);
    }

    /** Registers the operator state and, when restoring, reloads the stored checksum. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> lds = new ListStateDescriptor<>("listStateDesc", BasicTypeInfo.STRING_TYPE_INFO);
        this.listState = context.getOperatorStateStore().getListState(lds);
        if (context.isRestored()) {
            // The restored list may be empty; take the first entry only if there is one
            for (String stateMD5 : this.listState.get()) {
                this.md5 = stateMD5;
                break;
            }
        }
    }
}

/**
* 自定义数据源运行类
*/

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Runner for a custom source: adds the source to the environment and prints every record.
 */
public class FileSource {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // NOTE(review): FileCountryDictSourceFunction is not defined in these notes;
        // presumably MyFileSource is meant - confirm before running.
        final DataStreamSource<String> lines = environment.addSource(new FileCountryDictSourceFunction());
        lines.print();

        environment.execute();
    }
}

2.实现ParallelSourceFunction与RichParallelSourceFunction(并行的)

kafka反序列化
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.Properties;

/**
 * flink读取kafka数据的时候 反序列化的方式
 * <p>
 * kafka的反序列化的接口
 *  1.DeserializationSchema (Byte[] message) !! (只能反序列化value)
 *      实现类:SimpleStringSchema 转换为string字符串
 *      
 *  2.KafkaDeserializationSchema(ConsumerRecord[k,v] record) !!  (可以反序列化key)
 *      实现类:JSONKeyValueDeserializationSchema 转换为json类型
 *      实现类:KeyedDeserializationSchema 转换的数据不仅有value还有key
 */
public class FLinkKafkaDeser {
    /**
     * Reads {@code topic_32} with the custom {@code MyScheme4} schema and prints every record.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        // Optional consumer settings, currently disabled:
        //        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        //        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //        kafkaProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
        // Interval (ms) for discovering new partitions
        kafkaProps.setProperty("flink.partition-discovery.interval-millis", "3000");

        FlinkKafkaConsumer010<JSONObject> consumer =
                new FlinkKafkaConsumer010<>("topic_32", new MyScheme4(true), kafkaProps);
        // The start position chosen here overrides the offset settings above
        consumer.setStartFromLatest();
        //consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute();
    }
2.1字段反序列化 (value)
    /**
     * Value-only deserialization: turns the raw Kafka value bytes into a String.
     */
    public static class MyScheme1 implements DeserializationSchema<String> {

        /**
         * Decodes the message bytes as UTF-8.
         *
         * @param message raw value bytes; may be null (for example a record without a value)
         * @return the decoded text, or null when there is no payload
         */
        @Override
        public String deserialize(byte[] message) throws IOException {
            if (message == null) {
                // NOTE(review): a null result is expected to make the Kafka consumer skip
                // the record - confirm against the connector version in use.
                return null;
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            return new String(message, java.nio.charset.StandardCharsets.UTF_8);
        }

        /** The stream never ends on a particular element. */
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        /** Type information of the produced elements. */
        @Override
        public TypeInformation<String> getProducedType() {
            //            return TypeInformation.of(String.class);
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
2.2对象反序列化
/**
 * Simple bean with a single {@code word} property, used as the target type of the
 * object deserialization example.
 */
public static class Hainiu {
    private String word;

    /** No-argument constructor; leaves {@code word} unset. */
    public Hainiu() {
    }

    /** Creates a bean holding the given word. */
    public Hainiu(String word) {
        this.word = word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return this.word;
    }

    /** Renders the bean as {@code word:<value>}. */
    @Override
    public String toString() {
        return String.format("word:%s", this.word);
    }
}

   /**
     * Object deserialization: wraps the raw Kafka value bytes in a {@code Hainiu} bean.
     */
    public static class MyScheme2 implements DeserializationSchema<Hainiu> {

        /**
         * Decodes the message bytes as UTF-8 and wraps the text in a bean.
         *
         * @param message raw value bytes; may be null (for example a record without a value)
         * @return the bean holding the decoded text, or null when there is no payload
         */
        @Override
        public Hainiu deserialize(byte[] message) throws IOException {
            if (message == null) {
                // NOTE(review): a null result is expected to make the Kafka consumer skip
                // the record - confirm against the connector version in use.
                return null;
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            String msg = new String(message, java.nio.charset.StandardCharsets.UTF_8);
            return new Hainiu(msg);
        }

        /** The stream never ends on a particular element. */
        @Override
        public boolean isEndOfStream(Hainiu nextElement) {
            return false;
        }

        /** Type information of the produced elements. */
        @Override
        public TypeInformation<Hainiu> getProducedType() {
            return TypeInformation.of(Hainiu.class);
        }
    }
2.3key的反序列化
2.4json反序列化
public class FLinkKafkaDeser {
    /**
     * Reads {@code topic_32} with the JSON key/value deserialization schema and prints
     * every record.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        // Optional consumer settings, currently disabled:
        //        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        //        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //        kafkaProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
        // Interval (ms) for discovering new partitions
        kafkaProps.setProperty("flink.partition-discovery.interval-millis", "3000");

        // JSON deserialization
        FlinkKafkaConsumer010<ObjectNode> consumer =
                new FlinkKafkaConsumer010<>("topic_32", new JSONKeyValueDeserializationSchema(true), kafkaProps);
        // The start position chosen here overrides the offset settings above
        consumer.setStartFromLatest();
        //consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute();
    }
2.5自定义json反序列化
    /**
     * Custom JSON deserialization: builds a JSON object from a Kafka record's key and value
     * and, optionally, its topic, partition and offset.
     */
    public static class MyScheme4 implements KafkaDeserializationSchema<JSONObject> {

        /** Whether topic, partition and offset are added to every produced object. */
        private final boolean includeMetadata;

        /**
         * @param includeMetadata true to add topic, partition and offset to each object;
         *                        null is treated as false
         */
        public MyScheme4(Boolean includeMetadata) {
            // Store a primitive so a null argument cannot cause an unboxing NPE later
            this.includeMetadata = Boolean.TRUE.equals(includeMetadata);
        }

        /** The stream never ends on a particular element. */
        @Override
        public boolean isEndOfStream(JSONObject nextElement) {
            return false;
        }

        /**
         * Converts one record into a JSON object with the entries "key" and "value"
         * (each only when present) plus "partition", "offset" and "topic" when metadata
         * is enabled.
         *
         * @param record the Kafka record with raw key and value bytes
         * @return the JSON object built from the record
         */
        @Override
        public JSONObject deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            byte[] values = record.value();
            byte[] keys = record.key();
            JSONObject obj = new JSONObject();

            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            if (keys != null) {
                obj.put("key", new String(keys, java.nio.charset.StandardCharsets.UTF_8));
            }

            if (values != null) {
                obj.put("value", new String(values, java.nio.charset.StandardCharsets.UTF_8));
            }

            if (includeMetadata) {
                obj.put("partition", record.partition());
                obj.put("offset", record.offset());
                obj.put("topic", record.topic());
            }
            return obj;
        }

        /** Type information of the produced elements. */
        @Override
        public TypeInformation<JSONObject> getProducedType() {
            return TypeInformation.of(JSONObject.class);
        }
    }

kafka动态发现分区配置

properties.setProperty("flink.partition-discovery.interval-millis", "30000")

二、transformations

转换算子

connect 与 union (合并流)

//connect连接的两个流类型可以不一致,而union连接的流的类型必须一致

//connect要用coProcessFunction   union用processfunction
//1.ds2.connect(ds1).process(new CoProcessFunction<Tuple2<String, String>, String, String>() 
//2.

1.union

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Unions two String streams (custom file source and Kafka) and handles both with a single
 * ProcessFunction.
 */
public class FlinkUnion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // First stream: lines from the custom file source. No mapping to Tuple2 here,
        // because union needs both streams to carry the same element type.
        DataStreamSource<String> dictLines = env.addSource(new MyFileSource());

        // Second stream: words from Kafka
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> kafkaWords = env.addSource(consumer);

        dictLines.union(kafkaWords).process(new ProcessFunction<String, String>() {
            // Dictionary built from the tab-separated lines seen so far
            Map<String, String> lookup = new HashMap<>();

            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                String[] fields = value.split("\t");
                if (fields.length <= 1) {
                    // No tab: treat the element as a lookup key
                    String hit = lookup.get(value);
                    out.collect(hit != null ? hit : "unknow");
                    return;
                }
                // Tab-separated: remember the pair and pass the line through
                lookup.put(fields[0], fields[1]);
                out.collect(value);
            }
        }).print();

        env.execute();
    }
}

2.connect

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Connects two streams of different types (Tuple2 pairs from the file source and Strings
 * from Kafka) and handles them with a CoProcessFunction.
 */
public class FlinkConnect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //        env.setParallelism(1);

        // First stream: tab-separated lines turned into (field 0, field 1) pairs
        DataStreamSource<String> dictLines = env.addSource(new MyFileSource());
        SingleOutputStreamOperator<Tuple2<String, String>> dictPairs =
                dictLines.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] fields = value.split("\t");
                        return new Tuple2<>(fields[0], fields[1]);
                    }
                });

        // Second stream: words from Kafka
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> kafkaWords = env.addSource(consumer);

        dictPairs.connect(kafkaWords).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
            // Dictionary filled from the first stream and queried by the second
            Map<String, String> lookup = new HashMap<>();

            // Handler for the first stream: remember the pair and emit it
            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                lookup.put(value.f0, value.f1);
                out.collect(value.toString());
            }

            // Handler for the second stream: look the word up
            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                String hit = lookup.get(value);
                if (hit == null) {
                    out.collect("no match");
                } else {
                    out.collect(hit);
                }
            }
        }).print();

        env.execute();
    }
}

3.keyby

//1.需要两个流先keyby 然后再拿两个keyby进行分组
//一般不用 会数据倾斜
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Connects two streams after keying both of them, using a CoProcessFunction.
 */
public class FlinkConnectKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //        env.setParallelism(1);

        // First stream: tab-separated lines turned into pairs, keyed by field 0
        DataStreamSource<String> dictLines = env.addSource(new MyFileSource());
        SingleOutputStreamOperator<Tuple2<String, String>> dictPairs =
                dictLines.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] fields = value.split("\t");
                        return new Tuple2<>(fields[0], fields[1]);
                    }
                });
        KeyedStream<Tuple2<String, String>, String> keyedPairs = dictPairs.keyBy(pair -> pair.f0);

        // Second stream: words from Kafka, keyed by the word itself
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> kafkaWords = env.addSource(consumer);
        KeyedStream<String, String> keyedWords = kafkaWords.keyBy(word -> word);

        keyedPairs.connect(keyedWords).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
            // Dictionary filled from the first stream and queried by the second
            Map<String, String> lookup = new HashMap<>();

            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                lookup.put(value.f0, value.f1);
                // Prefix the output with the index of the subtask that handled the pair
                int subtask = getRuntimeContext().getIndexOfThisSubtask();
                out.collect(subtask + ":" + value.toString());
            }

            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                String hit = lookup.get(value);
                if (hit == null) {
                    out.collect("no match");
                } else {
                    out.collect(hit);
                }
            }
        }).print();
        env.execute();
    }
}

3.1keyby按照实体类分组

package com.hainiuxy.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

/**
 * Keying connected streams by a custom entity class.
 * 1. The entity class must override hashCode (and equals, consistently with it).
 * 2. Both streams must be keyed by the same entity class.
 */

public class FlinkConnectKeyByVO {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//        env.setParallelism(1);
        DataStreamSource<String> ds = env.addSource(new MyFileSource());
        // Tab-separated lines turned into (field 0, field 1) pairs
        SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] split = value.split("\t");
                return Tuple2.of(split[0], split[1]);
            }
        });

        // Key the pairs by an entity built from field 0
        KeyedStream<Tuple2<String, String>, HainiuVo> ds2Keyby = ds2.keyBy(new KeySelector<Tuple2<String, String>, HainiuVo>() {
            @Override
            public HainiuVo getKey(Tuple2<String, String> value) throws Exception {
                return new HainiuVo(value.f0);
            }
        });

        //----------------------------------------------------------------
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"s3.hadoop:9092,s4.hadoop:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"g_32");
        FlinkKafkaConsumer010<HainiuVo> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new MyDeserialization(), pro);
        DataStreamSource<HainiuVo> ds1 = env.addSource(kafkaSource);
        //-----------------------------------------------------------
        // A bean used as the key must provide its own hashCode method
        KeyedStream<HainiuVo, HainiuVo> ds1KeyBy = ds1.keyBy(new KeySelector<HainiuVo, HainiuVo>() {
            @Override
            public HainiuVo getKey(HainiuVo value) throws Exception {
                return value;
            }
        });

        ds2Keyby.connect(ds1KeyBy).process(new CoProcessFunction<Tuple2<String,String>, HainiuVo, String>() {
            // Dictionary filled from the first stream and queried by the second
            Map<String,String> map = new HashMap<>();
            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                map.put(value.f0,value.f1);
                out.collect(getRuntimeContext().getIndexOfThisSubtask()+":"+value.toString());
            }

            @Override
            public void processElement2(HainiuVo value, Context ctx, Collector<String> out) throws Exception {
                String s = map.get(value.getName());
                out.collect(s==null?"no match":s);
            }
        }).print();

        env.execute();
    }

    /** Deserializes the raw Kafka value bytes into a {@link HainiuVo}. */
    public static class MyDeserialization implements DeserializationSchema<HainiuVo> {

        /**
         * Decodes the message bytes as UTF-8 and wraps the text in an entity.
         *
         * @param message raw value bytes; may be null (for example a record without a value)
         * @return the entity holding the decoded text, or null when there is no payload
         */
        @Override
        public HainiuVo deserialize(byte[] message) throws IOException {
            if (message == null) {
                // NOTE(review): a null result is expected to make the Kafka consumer skip
                // the record - confirm against the connector version in use.
                return null;
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            return new HainiuVo(new String(message, java.nio.charset.StandardCharsets.UTF_8));
        }

        /** The stream never ends on a particular element. */
        @Override
        public boolean isEndOfStream(HainiuVo nextElement) {
            return false;
        }

        /** Type information of the produced elements. */
        @Override
        public TypeInformation<HainiuVo> getProducedType() {
            return TypeInformation.of(HainiuVo.class);
        }
    }

    /** Entity used as the stream key; equality and hash code are based on {@code name}. */
    public static class HainiuVo{
        private String name;

        @Override
        public int hashCode() {
            return Objects.hash(getName());
        }

        /** Two entities are equal when their names are equal, matching hashCode. */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || getClass() != other.getClass()) {
                return false;
            }
            return Objects.equals(name, ((HainiuVo) other).name);
        }

        public HainiuVo() {

        }
        public HainiuVo(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

3.2KeyedCoProcessFunction

//KeyedCoProcessFunction 能获取key的值,  CoProcessFunction更关注流信息

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

/**
 * Demonstrates {@code KeyedCoProcessFunction} on two connected streams that are both keyed
 * by a {@link HainiuVo} bean.
 *
 * <p>Inside the function, {@code ctx.getCurrentKey()} returns the key of the element that is
 * currently being processed.
 */
public class FlinkConnectKeyByVOAndKeyedProcess {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // First stream: tab-separated lines from the file source, mapped to (column 0, column 1).
        DataStreamSource<String> ds = env.addSource(new MyFileSource());
        SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] split = value.split("\t");
                return Tuple2.of(split[0], split[1]);
            }
        });

        // Key the first stream by a bean built from the first column.
        KeyedStream<Tuple2<String, String>, HainiuVo> ds2Keyby = ds2.keyBy(new KeySelector<Tuple2<String, String>, HainiuVo>() {
            @Override
            public HainiuVo getKey(Tuple2<String, String> value) throws Exception {
                return new HainiuVo(value.f0);
            }
        });

        // Second stream: Kafka records deserialized into HainiuVo.
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<HainiuVo> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new MyDeserialization(), pro);
        DataStreamSource<HainiuVo> ds1 = env.addSource(kafkaSource);

        // A bean used as a key must provide its own hashCode(); equals() is supplied as well so
        // that equal names are recognised as the same key.
        KeyedStream<HainiuVo, HainiuVo> ds1KeyBy = ds1.keyBy(new KeySelector<HainiuVo, HainiuVo>() {
            @Override
            public HainiuVo getKey(HainiuVo value) throws Exception {
                return value;
            }
        });

        ds2Keyby.connect(ds1KeyBy).process(new KeyedCoProcessFunction<HainiuVo, Tuple2<String, String>, HainiuVo, String>() {
            // Plain in-memory lookup table held by this function instance (not Flink managed state).
            Map<String, String> map = new HashMap<>();

            /** Remembers the second column under the current key's name and echoes the element. */
            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                map.put(ctx.getCurrentKey().getName(), value.f1);
                out.collect(value.toString());
            }

            /** Emits the value remembered for this name, or "unknow" when none has been seen. */
            @Override
            public void processElement2(HainiuVo value, Context ctx, Collector<String> out) throws Exception {
                String s = map.get(value.getName());
                out.collect(s == null ? "unknow" : s);
            }
        }).print();

        env.execute();
    }

    /** Deserialization schema that wraps the text of each record in a {@link HainiuVo}. */
    public static class MyDeserialization implements DeserializationSchema<HainiuVo> {

        @Override
        public HainiuVo deserialize(byte[] message) throws IOException {
            // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
            return new HainiuVo(new String(message, java.nio.charset.StandardCharsets.UTF_8));
        }

        /** Always returns {@code false}: no element is treated as an end-of-stream marker. */
        @Override
        public boolean isEndOfStream(HainiuVo nextElement) {
            return false;
        }

        @Override
        public TypeInformation<HainiuVo> getProducedType() {
            return TypeInformation.of(HainiuVo.class);
        }
    }

    /** Key bean; equality and hash are both derived from the name. */
    public static class HainiuVo {
        private String name;

        public HainiuVo() {
        }

        public HainiuVo(String name) {
            this.name = name;
        }

        /** Two beans are equal when they are of the same class and their names are equal. */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || getClass() != other.getClass()) {
                return false;
            }
            return Objects.equals(name, ((HainiuVo) other).name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(getName());
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

Redistributing streams 解决数据倾斜的方法:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates redistributing a stream between operators of different parallelism as a way
 * to counter data skew.
 *
 * <ul>
 *   <li>{@code ds.rescale()}: round-robin from a subset of upstream subtasks to a subset of
 *       downstream subtasks. NOTE(review): the original note states that this keeps the
 *       transfer inside one TaskManager; that presumably depends on the slot layout - confirm.</li>
 *   <li>{@code ds.rebalance()}: round-robin redistribution across all downstream subtasks.</li>
 * </ul>
 */
public class FlinkRedistribute {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        DataStreamSource<String> ds = env.addSource(new MyFileSource()).setParallelism(3);

        // Alternative: DataStream<String> ds1 = ds.rebalance();
        DataStream<String> ds1 = ds.rescale();

        // Identity flatMap running with a different parallelism than the source.
        SingleOutputStreamOperator<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value);
            }
        }).setParallelism(5);
        // Other options: keyBy sends records with the same key to the same subtask (hash);
        // partitionCustom lets a user-defined partitioner pick the target subtask.

        // Attach a sink and actually submit the job; without execute() the pipeline above is
        // only declared and the program exits without processing anything.
        ds2.print();
        env.execute();
    }
}

1.自定义partitioner

//使用 ds.partitionCustom 对每个流分别进行重分区处理

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.*;

/**
 * Demonstrates countering data skew with a user-defined partitioner: both input streams are
 * routed with the same {@link MyPartitioner} before being connected and joined.
 */
public class FlinkUDPartitionerTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        DataStreamSource<String> ds = env.addSource(new MyFileSource());

        // Tab-separated lines -> (column 0, column 1).
        SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] split = value.split("\t");
                return Tuple2.of(split[0], split[1]);
            }
        });

        // Apply the custom partitioner to the file stream, using the first column as the key.
        // NOTE(review): a keyBy directly after partitionCustom presumably re-partitions by key
        // hash and may make the custom partitioner ineffective - confirm the intended routing.
        DataStream<Tuple2<String, String>> ds2Partitioned = ds2.partitionCustom(new MyPartitioner(), new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        }).keyBy(new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        });

        // Kafka stream of plain strings.
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
        DataStreamSource<String> ds1 = env.addSource(kafkaSource);

        // Apply the same partitioner to the Kafka stream, using the whole record as the key.
        DataStream<String> ds1Partitioned = ds1.partitionCustom(new MyPartitioner(), new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        }).keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });

        // Join the two streams through a lookup table filled from the first stream.
        ds2Partitioned.connect(ds1Partitioned).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
            // Plain in-memory lookup table held by this function instance.
            Map<String, String> map = new HashMap<>();

            /** Stores (column 0 -> column 1), logs the subtask index and echoes the element. */
            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                map.put(value.f0, value.f1);
                System.out.println(getRuntimeContext().getIndexOfThisSubtask() + ":" + value.toString());
                out.collect(value.toString());
            }

            /** Emits the stored value for the record, or "no match" when none has been seen. */
            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                String s = map.get(value);
                out.collect(s == null ? "no match" : s);
            }
        }).print();
        env.execute();

    }

    /**
     * Custom partitioner: keys in the fixed list go to channel 0, every other key to channel 1.
     */
    public static class MyPartitioner implements Partitioner<String> {
        List<String> list = Arrays.asList("AF", "AE", "AG", "AI", "AO");

        /**
         * @param key the record key
         * @param numPartitions number of downstream channels
         * @return the target channel, always within {@code [0, numPartitions)}
         */
        @Override
        public int partition(String key, int numPartitions) {
            int target = list.contains(key) ? 0 : 1;
            // Fold into the valid range so a downstream parallelism of 1 does not yield an
            // out-of-range channel index.
            return target % numPartitions;
        }
    }

}

2.join数据倾斜问题解决

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

/**
 * Joins data from the file source with data from Kafka while spreading the load with a
 * salt prefix.
 *
 * <p>Every file record is replicated once per salt value ({@code "<salt>_<key>"}); every Kafka
 * record receives one random salt. Both streams are routed by the salt with the same
 * partitioner, so a salted Kafka record meets the matching replica on the same subtask.
 */
public class FlinkRebalancePartitioner {
    /** Number of distinct salt values used on both sides of the join. */
    private static final int SALT_BUCKETS = 24;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        DataStreamSource<String> ds = env.addSource(new MyFileSource());

        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "s3.hadoop:9092,s4.hadoop:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG, "g_32");
        FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("topic_32", new SimpleStringSchema(), pro);
        DataStreamSource<String> ds1 = env.addSource(kafkaSource);

        // Tab-separated lines -> (column 0, column 1).
        SingleOutputStreamOperator<Tuple2<String, String>> ds2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                String[] split = value.split("\t");
                return Tuple2.of(split[0], split[1]);
            }
        });

        // Replicate every file record once per salt value.
        SingleOutputStreamOperator<Tuple2<String, String>> ds3 = ds2.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, String>> out) throws Exception {
                for (int i = 0; i < SALT_BUCKETS; i++) {
                    out.collect(Tuple2.of(i + "_" + value.f0, value.f1));
                }
            }
        });

        // Route the replicas by their salt prefix.
        DataStream<Tuple2<String, String>> ds4 = ds3.partitionCustom(new SaltPrefixPartitioner(), new KeySelector<Tuple2<String, String>, String>() {
            @Override
            public String getKey(Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        });

        // Prefix every Kafka record with one random salt.
        SingleOutputStreamOperator<String> ds11 = ds1.map(new RichMapFunction<String, String>() {
            Random random = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                random = new Random();
            }

            @Override
            public String map(String value) throws Exception {
                int index = random.nextInt(SALT_BUCKETS);
                return index + "_" + value;
            }
        });

        // Route the salted Kafka records with the same partitioner.
        DataStream<String> ds111 = ds11.partitionCustom(new SaltPrefixPartitioner(), new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });

        // State flavours worth knowing: MapState, ListState, ValueState, reducing, fold,
        // aggregate, BroadcastState. This example uses a plain in-memory map instead.
        ds4.connect(ds111).process(new CoProcessFunction<Tuple2<String, String>, String, String>() {
            Map<String, String> map = new HashMap<String, String>();

            /** Logs which subtask received the replica and stores it in the lookup table. */
            @Override
            public void processElement1(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("data_assigner:" + index + "<--->" + value);
                map.put(value.f0, value.f1);
            }

            /** Emits the stored value for the salted record, or "no match" when none exists. */
            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                String s = map.get(value);
                out.collect(s == null ? "no match" : s);
            }
        }).print();
        env.execute();
    }

    /**
     * Routes a key of the form {@code "<salt>_<rest>"} by its numeric salt.
     */
    private static class SaltPrefixPartitioner implements Partitioner<String> {
        /**
         * @param key salted key; the text before the first underscore must be an integer
         * @param numPartitions number of downstream channels
         * @return the salt folded into {@code [0, numPartitions)}
         */
        @Override
        public int partition(String key, int numPartitions) {
            int salt = Integer.parseInt(key.split("_")[0]);
            // The salt ranges over the bucket count, which can exceed the downstream
            // parallelism; fold it so the channel index is always valid.
            return salt % numPartitions;
        }
    }
}

reduce与fold聚合操作

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;

/**
 * Aggregation examples on a keyed stream.
 *
 * <ul>
 *   <li>{@code reduce}: combines two elements of the stream type into one of the same type.</li>
 *   <li>{@code fold}: folds elements into an accumulator (deprecated API).</li>
 *   <li>{@code process} with a plain instance field is not suitable for per-key sums: the
 *       field belongs to the function instance of a subtask, not to a key.</li>
 * </ul>
 */
public class FlinkAggregateTest {
    public static void main(String[] args) throws Exception {
        // Each word appears twice, in the order hainiu1, hainiu1, hainiu2, hainiu2, ...
        String[] distinctWords = {"hainiu1", "hainiu2", "hainiu3", "hainiu4"};
        List<String> words = new ArrayList<String>();
        for (String word : distinctWords) {
            words.add(word);
            words.add(word);
        }

        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(2);
        DataStreamSource<String> source = env.fromCollection(words);

        // word -> (word, 1)
        MapFunction<String, Tuple2<String, Integer>> pairWithOne = new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        };
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = source.map(pairWithOne);

        // Group by the word itself.
        KeySelector<Tuple2<String, Integer>, String> byWord = new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> pair) throws Exception {
                return pair.f0;
            }
        };
        KeyedStream<Tuple2<String, Integer>, String> keyed = pairs.keyBy(byWord);

        // reduce: both inputs and the result must have the same type.
        ReduceFunction<Tuple2<String, Integer>> sumCounts = new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left, Tuple2<String, Integer> right) throws Exception {
                int total = left.f1 + right.f1;
                return Tuple2.of(left.f0, total);
            }
        };
        keyed.reduce(sumCounts); // append .print("ReduceFunction") to inspect the result

        // fold: starts from 0 and adds each element's count to the accumulator.
        FoldFunction<Tuple2<String, Integer>, Integer> addToAccumulator = new FoldFunction<Tuple2<String, Integer>, Integer>() {
            @Override
            public Integer fold(Integer accumulator, Tuple2<String, Integer> pair) throws Exception {
                return accumulator + pair.f1;
            }
        };
        keyed.fold(0, addToAccumulator); // append .print("FoldFunction") to inspect the result

        // A KeyedProcessFunction that keeps the running sum in an instance field
        // ("int sum" incremented in processElement) was tried here and left disabled:
        // the field is shared by all keys handled by the same subtask.

        env.execute();
    }
}

min,minBy(聚合)

//尽量使用minby/maxby
//min 只更新被比较的字段,其余字段保留该分组第一次出现时的值(本例按首字母分组);minBy 返回最小值所在的整条记录

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Built-in aggregations on a keyed stream: min / minBy (and likewise max / maxBy).
 * Prefer minBy / maxBy.
 */
public class FlinkAggregateMaxMinSum {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 6666);

        // "<word> <number>" -> (word, number)
        MapFunction<String, Tuple2<String, Integer>> parseLine = new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(" ");
                String word = fields[0];
                Integer number = Integer.valueOf(fields[1]);
                return Tuple2.of(word, number);
            }
        };

        // Group by the first character of the word.
        KeySelector<Tuple2<String, Integer>, String> byFirstLetter = new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> pair) throws Exception {
                String word = pair.f0;
                return word.substring(0, 1);
            }
        };

        KeyedStream<Tuple2<String, Integer>, String> keyed = lines.map(parseLine).keyBy(byFirstLetter);

        // Both aggregations compare tuple field 1; maxBy(1) can be printed the same way.
        keyed.min(1).print("min");
        keyed.minBy(1).print("minBY");
        env.execute();
    }
}

OutputTag(侧输出流)

//只能在processFunction中使用
//根据条件输出不同类型的数据

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;

/**
 * Side-output example: splits one input stream into a main output of valid values and a
 * side output of everything else.
 *
 * <p>A value is valid when it is exactly 11 characters long and its first three characters
 * are one of the accepted prefixes.
 */
public class FlinkOutPutTagTest {
    /** Tag identifying the side output that receives the rejected values. */
    private static final OutputTag<String> tag = new OutputTag<>("invalid", BasicTypeInfo.STRING_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds = env.socketTextStream("localhost", 6666);
        // Side outputs are emitted through the context of the low-level process function.
        SingleOutputStreamOperator<String> ds1 = ds.process(new ProcessFunction<String, String>() {
            // Accepted three-character prefixes, filled in open().
            List<String> list = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                list.add("138");
                list.add("136");
                list.add("152");
                list.add("139");
                list.add("110");
            }

            /**
             * Sends the value to the main output when it is valid, otherwise to the side output.
             */
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                // Check the length first: taking substring(0, 3) of a value shorter than three
                // characters would throw instead of routing the value to the side output.
                boolean valid = value.length() == 11 && list.contains(value.substring(0, 3));
                if (valid) {
                    out.collect(value);
                } else {
                    ctx.output(tag, value);
                }
            }
        });
        ds1.print("valid");
        ds1.getSideOutput(tag).print("invalid");
        env.execute();
    }
}

sink

1.socket text csv

package sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * sink输出的三种形式
 * socket
 * text
 * csv
 */
public class FlinkTextSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dss = env.socketTextStream("localhost", 6666);
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds = dss.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //输出到csv
        ds.writeAsCsv("csv");
        //输出到 text
        ds.writeAsText("text");
        //输出到socket
        ds.writeToSocket("localhost",8888,new SerializationSchema<Tuple2<String,Integer>>(){
            @Override
            public byte[] serialize(Tuple2<String, Integer> element) {
              String s = element.f0+":"+element.f1;
              return s.getBytes();
            }
        });
        ds.print();
        //打印错误 (生产不能使用)
        ds.printToErr();
        env.execute();
    }
}

2.自定义数据输出

//实现SinkFunction 或 继承RichSinkFunction
//(在没有自行改变并行度的情况下,是否并行取决其父operator)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom sink example: appends every record of a socket stream to files on HDFS.
 */
public class FlinkSinkFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dss = env.socketTextStream("localhost", 6666);
        dss.addSink(new MyHdfsSink("hdfs://ns1/user/qingniu/flink_hdfs_sink"));
        env.execute();
    }

    /**
     * Sink that writes each record into a file under a root directory on HDFS; the file is
     * created when missing and appended to when it already exists.
     */
    public static class MyHdfsSink extends RichSinkFunction<String> {
        /** Root directory under which the output files are written. */
        private String hdfsPath;
        // Created in open() rather than in the constructor: the constructor runs where the
        // job is assembled, so it must not build non-serializable objects.
        private transient FileSystem fs = null;
        private transient SimpleDateFormat df;

        /**
         * @param hdfsPath root directory for the output files
         */
        public MyHdfsSink(String hdfsPath) {
            this.hdfsPath = hdfsPath;
        }

        /** Creates the date formatter and connects to HDFS as user "qingniu". */
        @Override
        public void open(Configuration parameters) throws Exception {
            df = new SimpleDateFormat("yyyyMMddHHmm");
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            // The third argument is the HDFS user the connection acts as.
            fs = FileSystem.get(new URI("hdfs://ns1"), conf, "qingniu");
        }

        /** Closes the HDFS connection if one was opened. */
        @Override
        public void close() throws Exception {
            // open() may have failed before fs was assigned.
            if (fs != null) {
                fs.close();
            }
        }

        /**
         * Writes one record to {@code <root>/<yyyyMMddHHmm>_<subtask index>}.
         *
         * @param value the record to write
         * @param context sink context (unused)
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            String dateStr = df.format(new Date());
            String allPath = this.hdfsPath + "/" + dateStr + "_" + indexOfThisSubtask;
            Path realPath = new Path(allPath);
            // try-with-resources closes the stream even when the write fails.
            try (FSDataOutputStream fsos = fs.exists(realPath) ? fs.append(realPath) : fs.create(realPath)) {
                // NOTE(review): writeUTF emits a length-prefixed encoding without a line
                // break - confirm this is the intended file format.
                fsos.writeUTF(value);
                fsos.flush();
            }
        }
    }
}

3.FlinkKafkaProducer(输出到kafka)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * flink数据输出到kafka
 * 4种方式
 */
public class FlinkKafkaSink01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds = env.socketTextSt
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-123456789987654321,http://hainiubl.com/topics/75791
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter