flinksql 完结版

分享 123456789987654321 ⋅ 于 2021-12-13 21:55:40 ⋅ 278 阅读

FLINK

1.tableAPI

package com.practice.apitest.tableapi;

import com.practice.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @ClassName: TableTest1_Example
 * @Description:
 * @Version: 1.0
 */
public class TableTest1_Example {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 读取数据
        DataStreamSource<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // 2. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 3. 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 4. 基于流创建一张表
        Table dataTable = tableEnv.fromDataStream(dataStream);

        // 5. 调用table API进行转换操作
        Table resultTable = dataTable.select("id, temperature")
                .where("id = 'sensor_1'");

        // 6. 执行SQL
        tableEnv.createTemporaryView("sensor", dataTable);
        String sql = "select id, temperature from sensor where id = 'sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery(sql);

        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }
}

2.Blink解析器

package com.practice.apitest.tableapi;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * @ClassName: TableTest2_CommonApi
 * @Description:
 * @Version: 1.0
 */
public class TableTest2_CommonApi {
    public static void main(String[] args) throws Exception{
        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 基于老版本planner的流处理
        EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);

        // 1.2 基于老版本planner的批处理
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);

        // 1.3 基于Blink的流处理
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

        // 1.4 基于Blink的批处理
        EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);

        // 2. 表的创建:连接外部系统,读取数据
        // 2.1 读取文件
        String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
        tableEnv.connect( new FileSystem().path(filePath))
                .withFormat( new Csv())
                .withSchema( new Schema()
                .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("inputTable");

        Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();

        // 3. 查询转换
        // 3.1 Table API
        // 简单转换
        Table resultTable = inputTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // 聚合统计
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // 3.2 SQL
        tableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");
        Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

        // 打印输出
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toRetractStream(aggTable, Row.class).print("agg");
        tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");

        env.execute();
    }
}

3.输出到外部文件系统

package com.practice.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

/**
 * @ClassName: TableTest3_FileOutput
 * @Description:
 * @Version: 1.0
 */
public class TableTest3_FileOutput {
    public static void main(String[] args) throws Exception {
        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 表的创建:连接外部系统,读取数据
        // 读取文件
        String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";
        tableEnv.connect(new FileSystem().path(filePath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("inputTable");

        Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();

        // 3. 查询转换
        // 3.1 Table API
        // 简单转换
        Table resultTable = inputTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // 聚合统计
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // 3.2 SQL
        tableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");
        Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

        // 4. 输出到文件
        // 连接外部文件注册输出表
        String outputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\out.txt";
        tableEnv.connect(new FileSystem().path(outputPath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                                .field("id", DataTypes.STRING())
//                        .field("cnt", DataTypes.BIGINT())
                                .field("temperature", DataTypes.DOUBLE())
                )
                .createTemporaryTable("outputTable");

        //往文件里面写入数据不支持追加写入并更新的方式
        resultTable.insertInto("outputTable");
//        aggTable.insertInto("outputTable");

        env.execute();
    }
}

4.写入到kafka

package com.practice.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

/**
 * @ClassName: TableTest4_KafkaPipeLine
 * @Description:
 * @Version: 1.0
 */
public class TableTest4_KafkaPipeLine {
    public static void main(String[] args) throws Exception {
        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 连接Kafka,读取数据
        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic("sensor")
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("inputTable");

        // 3. 查询转换
        // 简单转换
        Table sensorTable = tableEnv.from("inputTable");
        Table resultTable = sensorTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // 聚合统计
        Table aggTable = sensorTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // 4. 建立kafka连接,输出到不同的topic下
        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic("sinktest")
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
//                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("outputTable");
        //不支持更新操作
        resultTable.insertInto("outputTable");

        env.execute();
    }
}

5.tableapi的更新模式

//追加(Append)模式
– 表只做插入操作,和外部连接器只交换插入(Insert)消息

// 撤回(Retract)模式
– 表和外部连接器交换添加(Add)和撤回(Retract)消息
– 插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update)
编码为上一条的 Retract 和下一条的 Add 消息

//更新插入(Upsert)模式
//更新插入必须制定key,且只生成一条记录
– 更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息

6.输出到 Kafka

tableEnv.connect(
new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092") )
.withFormat( new Csv() )
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable");
resultTable.insertInto("kafkaOutputTable");

7.输出到es

tableEnv.connect(
new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp") )
.inUpsertMode()
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");

8.输出到mysql

tableEnv.connect(
new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp") )
.inUpsertMode()
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");

9.Table 与 DataStream转换

9.1将 Table 转换成 DataStream

//追加模式(Append Mode) – 用于表只会被插入(Insert)操作更改的场景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

//撤回模式(Retract Mode) – 用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。
    -得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是
    -新增的数据(Insert),还是被删除的数据(Delete)
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv
.toRetractStream(aggResultTable , Row.class);

9.2将 DataStream 转换成表

//对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API做转换操作
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);

//默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来
DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");

9.3创建临时视图(Temporary View)

//基于 DataStream 创建临时视图
tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", 
dataStream, "id, temperature, timestamp as ts");

//基于 Table 创建临时视图
tableEnv.createTemporaryView("sensorView", sensorTable);

9.4查看执行计划

Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
//查看执行计划,可以通过 TableEnvironment.explain(table) 方法或
TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划
➢ 优化的逻辑查询计划
➢ 优化后的逻辑查询计划
➢ 实际执行计划。
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

10.动态表(Dynamic Tables)

package com.practice.apitest.beans;
/**
 * @ClassName: SensorReading
 * @Description:
 * @Version: 1.0
 */

// 传感器温度读数的数据类型
public class SensorReading {
    // 属性:id,时间戳,温度值
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
package com.practice.apitest.tableapi;

import com.practice.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @ClassName: TableTest5_TimeAndWindow
 * @Description:
 * @Version: 1.0
 */
public class TableTest5_TimeAndWindow {
    public static void main(String[] args) throws Exception {
        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 读入文件数据,得到DataStream
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // 3. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // 4. 将流转换成表,定义时间特性
        // pt:随便写,不要和关键字冲突就行,
        // proctime: 关键字,自动生成毫秒值
        //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, pt.proctime");
        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");

        tableEnv.createTemporaryView("sensor", dataTable);

        // 5. 窗口操作
        // 5.1 Group Window
        // table API
        Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                .groupBy("id, tw")
                .select("id, id.count, temp.avg, tw.end");

        // SQL
        Table resultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) " +
                "from sensor group by id, tumble(rt, interval '10' second)");

        // 5.2 Over Window
        // table API
        Table overResult = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                .select("id, rt, id.count over ow, temp.avg over ow");

        // SQL
        Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
                " from sensor " +
                " window ow as (partition by id order by rt rows between 2 preceding and current row)");

//        dataTable.printSchema();
//        tableEnv.toAppendStream(resultTable, Row.class).print("result");
//        tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
        tableEnv.toAppendStream(overResult, Row.class).print("result");
        tableEnv.toRetractStream(overSqlResult, Row.class).print("sql");

        env.execute();
    }
}

10.1定义处理时间Processing Time

//定义处理时间(Processing Time)
//TODO 这种写法目前还不支持,会报错
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
.field("pt", DataTypes.TIMESTAMP(3))
.proctime()
)

11.定义处理时间(Processing Time)

定义事件时间,同样有三种方法:
➢ 由 DataStream 转换成表时指定
➢ 定义 Table Schema 时指定
➢ 在创建表的 DDL 中定义

11.1定义事件时间(Event Time)

11.1.1 由 DataStream 转换成表时指定

在 DataStream 转换成 Table,使用 .rowtime 可以定义事件时间属性
// 将 DataStream转换为 Table,并指定时间字段
Table sensorTable =                        tableEnv.fromDataStream(dataStream,"id,timestamp.rowtime,temperature");
// 或者,直接追加时间字段
Table sensorTable = 
    tableEnv.fromDataStream(dataStream, " id, temperature, timestamp, rt.rowtime");

11.1.2定义 Table Schema 时指定

.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.rowtime(
new Rowtime()
.timestampsFromField("timestamp") // 从字段中提取时间戳
.watermarksPeriodicBounded(1000) // watermark延迟1秒 )
.field("temperature", DataTypes.DOUBLE())
)

11.1.3在创建表的 DDL 中定义

//interval '1' second    sql中一秒的写法
String sinkDDL=
"create table dataTable (" + " id varchar(20) not null, " 
    + " ts bigint, " 
    + " temperature double, " 
    + " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " 
    + " watermark for rt as rt - interval '1' second" + ") with (" 
    + " 'connector.type' = 'filesystem', " 
    + " 'connector.path' = '/sensor.txt', " 
    + " 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);

12.窗口(两种)

//Group Windows(分组窗口)
– 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

// Over Windows
– 针对每个输入行,计算相邻行范围内的聚合

12.1Group Windows

//1.Group Windows 是使用 window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。

//2.为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用
Table table = input
.window([w: GroupWindow] as "w") // 定义窗口,别名为 w
.groupBy("w, a") // 按照字段 a和窗口 w分组
.select("a, b.sum"); // 聚合

//3.Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层 DataStream 或 DataSet 的窗口操作

12.1.1滚动窗口(Tumbling windows)

//滚动窗口要用 Tumble 类来定义

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))

// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))

// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))

12.1.2滑动窗口(Sliding windows)

//滑动窗口要用 Slide 类来定义

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))

// Sliding Processing-time window 
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))

// Sliding Row-count window  滚动计数窗口
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))

12.1.3会话窗口(Session windows)

//会话窗口要用 Session 类来定义

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))

// Session Processing-time Window
.window(Session.withGap("10.minutes").on(“proctime").as("w"))

12.1.4SQL 中的 Group Windows

Group Windows 定义在 SQL 查询的 Group By 子句中
//1. 定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
//"select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second)from sensor group by id, tumble(rt, interval '10' second)"
➢ TUMBLE(time_attr, interval)

//2. 定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度    
➢ HOP(time_attr, interval, interval)

//3. 定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔
➢ SESSION(time_attr, interval)

12.1.5窗口实现

//toAppendStream 追加流,仅支持insert操作
//toRetractStream 撤回流

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @ClassName: TableTest5_TimeAndWindow
 * @Description:
 * @Version: 1.0
 */
public class TableTest5_TimeAndWindow {
    public static void main(String[] args) throws Exception {
        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 读入文件数据,得到DataStream
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // 3. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // 4. 将流转换成表,定义时间特性
        // pt:随便写,不要和关键字冲突就行,
        // proctime: 关键字,自动生成毫秒值
        //Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, pt.proctime");
        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime");

        tableEnv.createTemporaryView("sensor", dataTable);

        // 5. 窗口操作
        // 5.1 Group Window
        // table API
        Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                .groupBy("id, tw")
                .select("id, id.count, temp.avg, tw.end");

        // SQL
        Table resultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) " +
                "from sensor group by id, tumble(rt, interval '10' second)");

        // 5.2 Over Window
        // table API
        Table overResult = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                .select("id, rt, id.count over ow, temp.avg over ow");

        // SQL
        Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
                " from sensor " +
                " window ow as (partition by id order by rt rows between 2 preceding and current row)");

//        dataTable.printSchema();
//        tableEnv.toAppendStream(resultTable, Row.class).print("result");
//        tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
        tableEnv.toAppendStream(overResult, Row.class).print("result");
        tableEnv.toRetractStream(overSqlResult, Row.class).print("sql");

        env.execute();
    }
}

12.2Over Windows

12.2.1概念

Over window 聚合是标准 SQL 中已有的(over 子句),可以在查询的
SELECT 子句中定义
• Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合
• Over windows 使用 window(w:overwindows*)子句定义,并在 select
()方法中通过别名来引用
    Table table = input
    .window([w: OverWindow] as "w")
    .select("a, b.sum over w, c.min over w"); 
• Table API 提供了 Over 类,来配置 Over 窗口的属性

12.2.2无界 Over Windows

可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定
义 Over windows
• 无界的 over window 是使用常量指定的
// 无界的事件时间 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
//无界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// 无界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
//无界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))

12.2.3有界 Over Windows

有界的 over window 是用间隔的大小指定的

// 有界的事件时间 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// 有界的处理时间 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// 有界的事件时间 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// 有界的处理时间 Row-count over window
.window(Over.partitionBy("a").orderBy("procime").preceding("10.rows").as("w"))

12.2.4SQL 中的 Over Windows

用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
• 目前仅支持在当前行范围之前的窗口
• ORDER BY 必须在单一的时间属性上指定
    SELECT COUNT(amount) OVER (
    PARTITION BY user
    ORDER BY proctime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
    FROM Orders

函数(Functions)

1.函数
file

2.函数
file

用户自定义函数(UDF)

用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们
显著地扩展了查询的表达能力
• 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
• 函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当
用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,
这样Table API 或 SQL 解析器就可以识别并正确地解释它

标量函数(Scalar Functions)

用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值 • 为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类
Scalar Function,并实现(一个或多个)求值(eval)方法
• 标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval
public static class HashCode extends ScalarFunction {
    private int factor = 13;
    public HashCode(int factor) {
        this.factor = factor;
    }
    public int eval(String s) {
        return s.hashCode() * factor; 
    } 
}

表函数(Table Functions)

用户定义的表函数,也可以将0、1或多个标量值作为输入参数;与标量函数不同
的是,它可以返回任意数量的行作为输出,而不是单个值
• 为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类
TableFunction 并实现(一个或多个)求值方法
• 表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval
public static class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = ",";
    public Split(String separator) {
        this.separator = separator;
    }
    public void eval(String str) {
        for (String s : str.split(separator)) {
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    } 
}

聚合函数(Aggregate Functions)

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值
• 用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的

file
file

AggregationFunction要求必须实现的方法:
    – createAccumulator()
    – accumulate()
    – getValue()
• AggregateFunction 的工作原理如下: 
– 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;
  可以通过调用 createAccumulator() 方法创建空累加器
– 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
– 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果

表聚合函数(Table Aggregate Functions)

• 用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚   合为具有多行和多列的结果表 
• 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的

file

AggregationFunction 要求必须实现的方法:
    – createAccumulator()
    – accumulate()
    – emitValue()
• TableAggregateFunction 的工作原理如下: 
– 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据
  结构。通过调用 createAccumulator() 方法可以创建空累加器。
– 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
– 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-123456789987654321,http://hainiubl.com/topics/75813
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter