[Practical] Still writing Flink jobs with DataStream only? You're out! Sharing code for fast local development and debugging of Flink SQL

Shared by leeston9 ⋅ 2023-03-04 21:28:29 ⋅ 924 reads

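Here is the code (a pseudo chain-of-responsibility pattern).
The code structure is as follows:
[Figure: code structure]
Maven: I have recently been studying the source code of the JDBC connector and of the conversion between BinaryRowData and GenericRowData, so some dependencies are superfluous; feel free to remove any you do not need.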

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <log4j.version>2.17.1</log4j.version>
        <slf4j.version>1.7.32</slf4j.version>
        <flink.version>1.15.1</flink.version>
        <project.version>1.15.1</project.version>
        <postgres.version>42.2.10</postgres.version>
        <oracle.version>19.3.0.0</oracle.version>
        <otj-pg-embedded.version>0.13.4</otj-pg-embedded.version>
    </properties>

     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Fixes the case where the Flink application is missing the JSON deserialization format.
        To use JSON deserialization in Flink, the application must include the flink-json dependency; add the following to pom.xml: -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 -->
        <dependency>
            <groupId>com.amazon.redshift</groupId>
            <artifactId>redshift-jdbc42</artifactId>
            <version>2.1.0.1</version>
        </dependency>

        <!-- Dependencies required to connect to PostgreSQL -->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-sql-client</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--        </dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.3.1</version>
        </dependency>

        <!-- RocksDB-related dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>5.18.4</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-compiler</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

        <!-- Dependencies required by the Flink JDBC connector -->
        <!-- Postgres -->

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgres.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- Oracle -->
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>${oracle.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- MySQL tests -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
            <!--            <scope>test</scope>-->
        </dependency>

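Handler:


import java.util.ArrayList;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Parent class: holds the shared state (conf, tableEnv) and the link to the next handler
public abstract class Handler {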
    protected Handler nextHandler;
    protected StreamTableEnvironment tableEnv;
    protected ArrayList<Object> arrayList;
    protected Configuration conf;

    public Handler() {
        arrayList = new ArrayList<>();
    }

    public Handler setNextHandler(Handler nextHandler) {
        this.nextHandler = nextHandler;
        return this;
    }

    public abstract void handle(Handler subHandler);

    public  void handle() throws Exception {
        throw new Exception("SubHandler Handle Failed!");

    }
}
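
To make the call order concrete, here is a minimal, dependency-free sketch of the same wiring idea. `Step` and `ChainSketch` are hypothetical stand-ins that are not part of the project; they only record names instead of touching Flink. The point it illustrates is that `setNextHandler` returns `this` rather than its argument, so the chain is built by nesting calls from the inside out, and the first handler is passed along as the shared context.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Handler: records its name instead of touching Flink.
class Step {
    private final String name;
    private Step next;
    // Shared state lives on the first step, like conf/tableEnv on the first Handler.
    final List<String> trace = new ArrayList<>();

    Step(String name) {
        this.name = name;
    }

    // Returns this (not next), mirroring Handler.setNextHandler.
    Step setNext(Step next) {
        this.next = next;
        return this;
    }

    // context is the step that started the chain and carries the shared state.
    void handle(Step context) {
        context.trace.add(name);
        if (next != null) {
            next.handle(context);
        }
    }

    // Entry point: the first step passes itself along as the context.
    void handle() {
        handle(this);
    }
}

class ChainSketch {
    static List<String> run() {
        Step conf = new Step("conf");
        Step env = new Step("env");
        Step ddl = new Step("ddl");
        Step dml = new Step("dml");
        // Nest from the inside out: each call returns its receiver, so the
        // outermost expression evaluates to conf, the head of the chain.
        conf.setNext(env.setNext(ddl.setNext(dml))).handle();
        return conf.trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [conf, env, ddl, dml]
    }
}
```

If `setNext` returned its argument instead, the nested expression would evaluate to the innermost step and the chain would start in the wrong place, which is why the nesting style and the return value have to agree.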

ConfHandler:


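import java.util.Objects;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;

public class ConfHandler extends Handler {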
    private final String confPath;

    public ConfHandler(String confPath) {
        this.confPath = confPath;
    }

    @Override
    public void handle(Handler subHandler) {
        System.out.println(" ================ ConfHandler ================ ");
//        PropertyConfigurator.configure("src/main/resources/log4j.properties");
        if (null == subHandler.conf) {
            subHandler.conf = new Configuration();
        }
        subHandler.conf.setString("rest.port", "9101");
        subHandler.conf.set(RestOptions.BIND_PORT, "9101");
        subHandler.conf.set(WebOptions.LOG_PATH, "logs/FlinkTestLocal.log");
        subHandler.conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "logs/taskManager.log");
        for (String line : Objects.requireNonNull(Objects.requireNonNull(FileUtil.LoadFile(this.confPath)).replaceAll("hdfs:///", "file:///D:/")).trim().split("\n", -1)) {
            if (!line.startsWith("%flink")
                    && !line.startsWith("flink.execution.jars")
                    && !line.startsWith("flink.udf.jars")
                    && !line.startsWith("yarn")
                    && !line.startsWith("execution.checkpointing")
                    && line.trim().length() > 0) {
                String[] split = line.split("\\s+");
                String confKey = split[0];
                String confValue = line.substring(confKey.length()).trim();
                subHandler.conf.setString(confKey, confValue);
            }
        }

        // Default configuration added by the tool (a blank-key placeholder; blank keys are filtered out later)
        subHandler.conf.setString(" ", "");
        if (null != this.nextHandler) this.nextHandler.handle(subHandler);
    }

    @Override
    public void handle() throws Exception {
        this.handle(this);
    }
}
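
The line-parsing rule inside the loop above can be tried in isolation. The following is a minimal sketch, assuming a plain `Map` in place of Flink's `Configuration`; `ConfLineSketch` is a hypothetical name, it only lists a subset of the skipped prefixes, and unlike the original it trims each line before checking prefixes so that leading whitespace and Windows line endings do not matter.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConfLineSketch {
    // A subset of the prefixes that ConfHandler skips; extend as needed.
    private static final String[] SKIPPED_PREFIXES = {"%flink", "yarn", "execution.checkpointing"};

    private static boolean isSkipped(String line) {
        for (String prefix : SKIPPED_PREFIXES) {
            if (line.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Parses "key value" lines: the key is the first whitespace-delimited token,
    // the value is the rest of the line with surrounding whitespace removed.
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String rawLine : text.split("\n", -1)) {
            String line = rawLine.trim(); // also drops a trailing '\r'
            if (line.isEmpty() || isSkipped(line)) {
                continue;
            }
            String key = line.split("\\s+")[0];
            String value = line.substring(key.length()).trim();
            conf.put(key, value);
        }
        return conf;
    }

    public static void main(String[] args) {
        String text = "state.backend rocksdb\n\nyarn.queue default\ntable.exec.state.ttl   6000\r\n";
        System.out.println(parse(text));
    }
}
```

Because the value is taken as "everything after the key", values that themselves contain spaces survive intact.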

EnvHandler:


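import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnvHandler extends Handler {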
    private final String envMode;

    public EnvHandler(String envMode) {
        this.envMode = envMode;
    }

    @Override
    public void handle(Handler subHandler) {
        if (this.envMode.equals("stream")) {
            System.out.println("=============== =StreamEnvHandler ================= ");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(subHandler.conf);
            subHandler.tableEnv = StreamTableEnvironment.create(env);
        } else if (this.envMode.equals("table")) {
            System.out.println("================ TableEnvHandler ================= ");
            Configuration sConf = new Configuration();
            System.out.println(" [------ your steamEnv configuration ------] ");
            for (Map.Entry<String, String> userConf : subHandler.conf.toMap().entrySet()) {
                if (!userConf.getKey().startsWith("table") && userConf.getKey().trim().length() > 0) {
                    sConf.setString(userConf.getKey(), userConf.getValue());
                    System.out.println(userConf.getKey() + "=" + userConf.getValue());
                }
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(sConf);
            ExecutionConfig executionConfig = env.getConfig();
            HashMap<Object, Object> configProp = (HashMap) JSONObject.parseObject(JSONObject.toJSONString(executionConfig), Map.class);
            System.out.println("\n [====== Total steamEnv configuration ====]\n ");
            for (Map.Entry<Object, Object> entry : configProp.entrySet())
                System.out.println(entry.getKey() + " -----> " + entry.getValue());

            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    //.useBlinkPlanner()
                    .inStreamingMode().build();
            subHandler.tableEnv = StreamTableEnvironment.create(env, settings);
            TableConfig config = subHandler.tableEnv.getConfig();
            config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
            config.setSqlDialect(SqlDialect.DEFAULT);
            Configuration tConf = config.getConfiguration();
            System.out.println(" [------ your tableEnv configuration ------] ");
            for (Map.Entry<String, String> userConf : subHandler.conf.toMap().entrySet()) {
                if (userConf.getKey().startsWith("table") && userConf.getKey().trim().length() > 0) {
                    tConf.setString(userConf.getKey(), userConf.getValue());
                    System.out.println(userConf.getKey() + "=" + userConf.getValue());
                }

            }
            System.out.println("\n [====== Total tableEnv configuration ====]\n");
            for (Map.Entry<String, String> entry : subHandler.conf.toMap().entrySet()) {
                if (entry.getKey().trim().length() > 0)
                    System.out.println(entry.getKey() + " -----> " + entry.getValue());
            }

        } else {
            System.out.println("wrong stream mode!");
            System.exit(-2);
        }
        if (null != this.nextHandler) this.nextHandler.handle(subHandler);
    }

    @Override
    public void handle() throws Exception {
        this.handle(this);
    }
}
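
In "table" mode the handler routes each user setting to one of two places: keys starting with `table` go to the table environment, everything else goes to the stream environment, and blank keys are dropped. Here is a minimal sketch of just that routing rule, using plain maps; `PrefixSplitSketch` and `select` are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PrefixSplitSketch {
    // Returns the entries for one side: keys starting with "table" when
    // tableSide is true, all other non-blank keys when it is false.
    static Map<String, String> select(Map<String, String> all, boolean tableSide) {
        Map<String, String> selected = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : all.entrySet()) {
            String key = entry.getKey();
            if (key.trim().isEmpty()) {
                continue; // blank keys are ignored on both sides
            }
            if (key.startsWith("table") == tableSide) {
                selected.put(key, entry.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, String> all = new LinkedHashMap<>();
        all.put("table.exec.state.ttl", "6000");
        all.put("state.backend", "rocksdb");
        all.put(" ", "");
        System.out.println("stream side: " + select(all, false));
        System.out.println("table side:  " + select(all, true));
    }
}
```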

DDLHandler:

import java.util.Objects;

public class DDLHandler extends Handler {
    private String ddlPath;

    public DDLHandler(String ddlPath) {
        this.ddlPath = ddlPath;
    }

    @Override
    public void handle(Handler subHandler) {
        System.out.println("================== DDLHandler =====================");
        for (String ddlLine : Objects.requireNonNull(Objects.requireNonNull(FileUtil.LoadFile(ddlPath))
                .replaceAll("\";'", "\001"))
                .trim().split(";")) {
            if (ddlLine.trim().length() > 0
                    && !ddlLine.trim().startsWith("%flink")
                    && !ddlLine.trim().toUpperCase().startsWith("DROP")
            ) {
                System.out.println("------------------------------------------");
                System.out.println(ddlLine.replaceAll("\001", "\";'"));
                String tarSql = ddlLine.replaceAll("\001", "\";'").trim();
                if (tarSql.endsWith(";"))
                    subHandler.tableEnv.executeSql(tarSql.substring(0, tarSql.length() - 1)).print();
                else
                    subHandler.tableEnv.executeSql(tarSql).print();

            }
        }
        if (null != this.nextHandler) this.nextHandler.handle(subHandler);
    }

    @Override
    public void handle() throws Exception {
        this.handle(this);
    }
}
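
The statement-splitting logic can be exercised without Flink. The sketch below assumes the same rules as the handler: the literal sequence `";'` is masked with a placeholder so its semicolon does not split a statement, the script is split on `;`, and empty fragments, `%flink` lines and `DROP` statements are skipped. `SqlSplitSketch` is a hypothetical name, and it uses `String.replace` (literal matching) where the handler uses `replaceAll`.

```java
import java.util.ArrayList;
import java.util.List;

class SqlSplitSketch {
    private static final String PROTECTED = "\";'";     // the three characters ";'
    private static final String PLACEHOLDER = "\u0001"; // same character as "\001" in the handler

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        // Mask the protected sequence so the split below does not cut through it.
        String masked = script.replace(PROTECTED, PLACEHOLDER);
        for (String part : masked.trim().split(";")) {
            String sql = part.replace(PLACEHOLDER, PROTECTED).trim();
            if (sql.isEmpty()
                    || sql.startsWith("%flink")
                    || sql.toUpperCase().startsWith("DROP")) {
                continue;
            }
            statements.add(sql);
        }
        return statements;
    }

    public static void main(String[] args) {
        String script = "DROP TABLE IF EXISTS t;\n"
                + "CREATE TABLE t (a INT) WITH ('connector' = 'datagen');\n";
        for (String sql : split(script)) {
            System.out.println(sql);
        }
    }
}
```

One consequence of splitting on `;` this way: any text after the last `;` of a statement, such as a trailing `--` comment, becomes a fragment of its own and would be handed to the executor as if it were a statement. Keeping comments before the terminating semicolon avoids that.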

DMLHandler:

import java.util.Objects;

public class DMLHandler extends Handler {
    private String dmlPath;

    public DMLHandler(String dmlPath) {
        this.dmlPath = dmlPath;
    }

    @Override
    public void handle(Handler subHandler) {
        System.out.println("================== DMLHandler =====================");
        for (String line : Objects.requireNonNull(Objects.requireNonNull(FileUtil.LoadFile(dmlPath))
                        .replaceAll("\";'", "\001"))
                .trim().split(";")) {
            if (line.trim().length() > 0
                    && !line.trim().startsWith("%flink")
                    && !line.trim().toUpperCase().startsWith("DROP")
            ) {
                System.out.println("------------------------------------------");
                System.out.println(line.replaceAll("\001", "\";'"));
                subHandler.tableEnv.executeSql(line.replaceAll("\001", "\";'")).print();

            }
        }
        if (null != this.nextHandler) this.nextHandler.handle(subHandler);
    }

    @Override
    public void handle() throws Exception {
        this.handle(this);
    }
}

DQLHandler:

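import java.util.Objects;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;

public class DQLHandler extends Handler {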
    private String dqlPath;

    public DQLHandler(String dqlPath) {
        this.dqlPath = dqlPath;
    }

    @Override
    public void handle(Handler subHandler) {
        System.out.println("================== DQLHandler =====================");
        for (String line : Objects.requireNonNull(Objects.requireNonNull(FileUtil.LoadFile(dqlPath))
                .replaceAll("\";'", "\001"))
                .trim().split(";")) {
            if (line.trim().length() > 0
                    && !line.trim().startsWith("%flink")
                    && !line.trim().toUpperCase().startsWith("DROP")
            ) {
                System.out.println("------------------------------------------");
//                TableResult result = subHandler.tableEnv.executeSql(line.replaceAll("\001", "\";'"));
                Table table = subHandler.tableEnv.sqlQuery(line.replaceAll("\001", "\";'"));
                table.printExplain();
                TableResult result = table.execute();
                System.out.println("SCHEMA:" + result.getResolvedSchema().getColumnDataTypes());
                System.out.println("SCHEMA FOR JSON: " + JSONObject.toJSONString(result.getResolvedSchema().getColumnDataTypes()));
                System.out.println("ResultKind: " + result.getResultKind());
                System.out.println("ResolvedSchema: " + result.getResolvedSchema());
                result.print();

            }
        }
        if (null != this.nextHandler) this.nextHandler.handle(subHandler);
    }
    @Override
    public void handle() throws Exception {
        this.handle(this);
    }
}

The conf folder needs the following configuration files:

  1. Configuration for the Flink env and tableEnv: basicConf. A simple example:
    component_id 10001
    jobmanager.memory.process.size 2048m
    taskmanager.memory.process.size 10240m
    taskmanager.numOfTaskSlots 1
    table.exec.resource.default-parallelism 1
    table.exec.state.ttl 6000
    execution.checkpointing.interval 600000
    execution.checkpointing.min-pause 780000
    state.backend rocksdb
    state.backend.incremental true
    state.checkpoints.dir hdfs:///flink/checkpoints/10001

    DDL CREATE TABLE statements used in Flink SQL: DDL.sql. Example:
    Because the SQL contains backticks (`), the way it is displayed here may be affected.

DROP TABLE if EXISTS LEE_KAFKA_01;
CREATE TABLE LEE_KAFKA_01
(
col1 VARCHAR,
col2 VARCHAR,
col3 VARCHAR,
col4 timestamp(3),
WATERMARK FOR col4 AS col4 - INTERVAL '0' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'TEST-01',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'lee-group-1',
'format' = 'json',
'json.fail-on-missing-field' = 'true'
);

DROP TABLE if EXISTS LEE_KAFKA_02;
CREATE TABLE LEE_KAFKA_02
(
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1000',
'fields.f_sequence.kind' = 'sequence',
'fields.f_sequence.start' = '1',
'fields.f_sequence.end' = '1000000',
'fields.f_random.min' = '10',
'fields.f_random.max' = '100',
'fields.f_random_str.length' = '10'
);

DROP TABLE if EXISTS LEE_KAFKA_022;
CREATE TABLE LEE_KAFKA_022
(
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1000',
'fields.f_sequence.kind' = 'sequence',
'fields.f_sequence.start' = '1',
-- The sequence is loaded into memory before the job can start; the longer the sequence, the longer startup takes
'fields.f_sequence.end' = '1000000',
'fields.f_random.min' = '10',
'fields.f_random.max' = '100',
'fields.f_random_str.length' = '10'
)
;

DROP TABLE if EXISTS LEE_KAFKA_03;
CREATE TABLE LEE_KAFKA_03
(
col1 VARCHAR,
col2 VARCHAR,
col3 VARCHAR,
col4 timestamp(3)
) WITH (
'connector' = 'kafka',
'topic' = 'TEST-01',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'lee-group-1',
'format' = 'json',
'json.fail-on-missing-field' = 'true'
);

CREATE TABLE finance_log
(
_id varchar(128),
user_id varchar(128),
cny double precision,
coin bigint,
via varchar(128),
rid varchar(128),
session_nick_name varchar(1024),
session_id varchar(128),
remark varchar(128),
returncoin bigint,
to_id varchar(128),
transfer_ratio varchar(128),
amount varchar(128),
qd varchar(128),
`timestamp` Timestamp(3),
en_mobile varchar(128)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://XXXXX.XXXXXX.XX-XXXX-1.redshift.amazonaws.com.cn:5439/XXXXX',
'table-name' = 'DB.YOUR_TABLE',
'driver' = 'org.postgresql.Driver',
'username' = 'USERNAME',
'password' = 'XXXXXXX'
);

CREATE TABLE blackhole_table
(
_id VARCHAR(100),
user_id VARCHAR(100),
cny DOUBLE PRECISION,
coin BIGINT,
via VARCHAR(100),
rid VARCHAR(100),
session_nick_name VARCHAR(100),
session_id VARCHAR(100),
remark VARCHAR(100),
returncoin BIGINT,
to_id VARCHAR(100),
transfer_ratio VARCHAR(100),
amount VARCHAR(100),
qd VARCHAR(100),
`timestamp` TIMESTAMP,
en_mobile VARCHAR(100)
) WITH (
'connector' = 'blackhole'
);

Flink SQL DML statements: DML.sql example
```roomsql

INSERT INTO blackhole_table
select b.* from
           (SELECT 'id_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS _id,
                   'user_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS user_id,
                   FLOOR(RAND() * 1000) AS cny,
                   cast(FLOOR(RAND() * 1000) as bigint) AS coin,
                   'via_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS via,
                   'rid_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS rid,
                   'nickname_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS session_nick_name,
                   'session_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS session_id,
                   'remark_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS remark,
                   CAST(FLOOR(RAND() * 1000) AS BIGINT) AS returncoin,
                   'to_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS to_id,
                   'ratio_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS transfer_ratio,
                   CAST(FLOOR(RAND() * 1000) AS VARCHAR) || ' USD' AS amount,
                   'qd_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS qd,
                   -- TO_TIMESTAMP(FROM_UNIXTIME(numeric_col))
                   localtimestamp AS `timestamp`,
                   'mobile_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS en_mobile
FROM LEE_KAFKA_02) a left join
    (SELECT 'id_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS _id,
    'user_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS user_id,
  FLOOR(RAND() * 1000) AS cny,
  cast(FLOOR(RAND() * 1000) as bigint) AS coin,
    'via_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS via,
    'rid_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS rid,
    'nickname_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS session_nick_name,
    'session_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS session_id,
    'remark_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS remark,
  CAST(FLOOR(RAND() * 1000)AS BIGINT) AS returncoin,
    'to_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS to_id,
    'ratio_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS transfer_ratio,
  CAST(FLOOR(RAND() * 1000) AS VARCHAR) || ' USD' AS amount,
    'qd_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS qd,
   localtimestamp AS `timestamp`,
    'mobile_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS en_mobile
FROM LEE_KAFKA_022) b
on 1=1
and a.`timestamp`  BETWEEN b.`timestamp` - INTERVAL '20' SECOND  and b.`timestamp` + INTERVAL '10' SECOND

UNION ALL

select b.* from
    (SELECT 'id_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS _id,
            'user_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS user_id,
            FLOOR(RAND() * 1000) AS cny,
            cast(FLOOR(RAND() * 1000) as bigint) AS coin,
            'via_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS via,
            'rid_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS rid,
            'nickname_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS session_nick_name,
            'session_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS session_id,
            'remark_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS remark,
            CAST(FLOOR(RAND() * 1000) AS BIGINT) AS returncoin,
            'to_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS to_id,
            'ratio_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS transfer_ratio,
            CAST(FLOOR(RAND() * 1000) AS VARCHAR) || ' USD' AS amount,
            'qd_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS qd,
            -- TO_TIMESTAMP(FROM_UNIXTIME(numeric_col))
            localtimestamp AS `timestamp`,
            'mobile_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS en_mobile
     FROM LEE_KAFKA_022) a left join
    (SELECT 'id_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS _id,
            'user_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS user_id,
            FLOOR(RAND() * 1000) AS cny,
            cast(FLOOR(RAND() * 1000) as bigint) AS coin,
            'via_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS via,
            'rid_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS rid,
            'nickname_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS session_nick_name,
            'session_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS session_id,
            'remark_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS remark,
            CAST(FLOOR(RAND() * 1000)AS BIGINT) AS returncoin,
            'to_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS to_id,
            'ratio_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS transfer_ratio,
            CAST(FLOOR(RAND() * 1000) AS VARCHAR) || ' USD' AS amount,
            'qd_' || CAST(FLOOR(RAND() * 100) AS VARCHAR) AS qd,
            localtimestamp AS `timestamp`,
            'mobile_' || CAST(FLOOR(RAND() * 10000) AS VARCHAR) AS en_mobile
     FROM LEE_KAFKA_02) b

    on 1=1
        and a.`timestamp`  BETWEEN b.`timestamp` - INTERVAL '20' SECOND  and b.`timestamp` + INTERVAL '10' SECOND
```

Flink SQL DQL statements: DQL.sql example:


SELECT distinct *
    FROM (
        SELECT DISTINCT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY localtimestamp DESC) as rownum
          FROM (
                SELECT window_start,
                       window_end,
                       f_random,
                       LAST_VALUE(f_sequence) as s_last,
                       MAX(localtimestamp) m_localtimestamp,
                       COUNT(*) as cnt_total,
                       COUNT(DISTINCT localtimestamp) time_dis_count
                FROM TABLE(
                            TUMBLE(TABLE LEE_KAFKA_02, DESCRIPTOR(ts), INTERVAL '10' SECONDS)
                         )
                GROUP BY window_start, window_end, f_random
                order by  m_localtimestamp desc -- UPDATETIMEOUT 10 MINUTE -- operator-level TTL setting
               )
          ) a
        left join LEE_KAFKA_01 b
    on cast(a.f_random as varchar) = b.col1
    and a.m_localtimestamp
        between col4 - interval '1' hour and col4 + interval '1' hour
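-- Take the latest rows (rownum < 10, i.e. at most 9 per window) from the result grouped by window_start, window_end, f_random. The Kafka table has no data and is joined arbitrarily; since table b is empty, backpressure is expected.
WHERE rownum < 10;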

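User entry class: Client.java example:
Note: for local testing, set the environment variable CONF_FILE_DIR to the location of the conf folder. DQL and DML cannot be used at the same time. The web UI port is 9101.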

import java.io.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by Lijun at 2023/02/17
 */
public class Client {
    private static final Logger LOGGER =  LoggerFactory.getLogger(Client.class);
    public static void main(String[] args) throws Exception {
        LOGGER.info("------------------START---------------------");
        // Add default configuration
        System.out.println("Preparing to delete files...");
        FileUtil.deleteFolder(new File("logs"));
        // System.exit(-11);
        // The Flink web UI listens on port 9101
        if(null == System.getenv("CONF_FILE_DIR")){
            System.err.println("Environment variable [CONF_FILE_DIR] is not set, exiting!");
            LOGGER.error("can not find the conf dir, please set in your Env!");
            System.exit(-2);
        }
        String confDir = System.getenv("CONF_FILE_DIR");
        EnvHandler envHandler = new EnvHandler("table");
        ConfHandler confHandler = new ConfHandler(confDir + "/basicConf");
        DDLHandler ddlHandler = new DDLHandler(confDir + "/DDL.sql");
        DMLHandler dmlHandler = new DMLHandler(confDir + "/DML.sql");
        DQLHandler dqlHandler = new DQLHandler(confDir +  "/DQL.sql");

        // Use this chain for DML statements
        confHandler.setNextHandler(envHandler.setNextHandler(ddlHandler.setNextHandler(dmlHandler))).handle();
        // Use this chain for DQL statements
//        confHandler.setNextHandler(envHandler.setNextHandler(ddlHandler.setNextHandler(dqlHandler))).handle();
    }
}
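
Since everything hinges on CONF_FILE_DIR pointing at the right folder, it can help to check the folder's contents before building the chain. This is a minimal sketch, not part of the original client: `ConfDirCheckSketch` and `missingFiles` are hypothetical names, and requiring all four files at once is an assumption (the client only loads the files of the handlers that are actually chained).

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class ConfDirCheckSketch {
    // File names taken from the Client example above.
    private static final String[] REQUIRED = {"basicConf", "DDL.sql", "DML.sql", "DQL.sql"};

    // Returns the names of required files that are not regular files under confDir.
    static List<String> missingFiles(Path confDir) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!Files.isRegularFile(confDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String dir = System.getenv("CONF_FILE_DIR");
        if (dir == null) {
            System.err.println("CONF_FILE_DIR is not set");
            return;
        }
        System.out.println("missing: " + missingFiles(Paths.get(dir)));
    }
}
```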

log4j.properties configuration: adds separate log output locations for the JobManager and the TaskManager:

# Root logger option
log4j.rootLogger=INFO, file
# Direct log messages to stdout
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Reduce Noise
log4j.logger.org.apache.flink=INFO
log4j.logger.org.apache.flink.util.ShutdownHookUtil=ERROR
log4j.logger.org.apache.flink.runtime.entrypoint.ClusterEntrypoint=INFO
log4j.logger.org.apache.flink.runtime.rest.RestServerEndpoint=INFO
log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=INFO
log4j.logger.org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher=INFO
log4j.logger.org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager=INFO
log4j.logger.org.apache.flink.runtime.metrics.dump.QueryableStateUtils=INFO
log4j.logger.org.apache.flink.runtime.metrics.MetricRegistry=INFO
log4j.logger.org.apache.flink.runtime.rpc.akka.AkkaRpcService=INFO
# Disable Metrics for now
log4j.logger.org.apache.flink.metrics=ERROR
# Error file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/FlinkTestLocal.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %-60c %x - %m%n
log4j.logger.akka=WARN
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=WARN
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

# Define your own TaskManager appender
log4j.logger.org.apache.flink.runtime.taskmanager=INFO, LeeTaskManager

# Define the file appender for handler logger
log4j.appender.LeeTaskManager=org.apache.log4j.RollingFileAppender
log4j.appender.LeeTaskManager.File=logs/taskManager.log
log4j.appender.LeeTaskManager.MaxFileSize=10MB
log4j.appender.LeeTaskManager.MaxBackupIndex=10
log4j.appender.LeeTaskManager.layout=org.apache.log4j.PatternLayout
log4j.appender.LeeTaskManager.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

util helper class (used when deleting logs):

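import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FileUtil {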
    public static void deleteFolder(File folder) {
        if (folder.isDirectory()) {
            File[] files = folder.listFiles();
            if (files != null) {
                for (File file : files) {
                    deleteFolder(file);
                }
            }
        }
        try {
            if (!folder.delete()) {
                System.err.println("Failed to delete " + folder.getAbsolutePath());
            }
        } catch (Exception e) {
            System.err.println("Failed to delete " + folder.getAbsolutePath() + ": " + e.getMessage());
            e.printStackTrace();
        }
    }
    public static String LoadFile(String filePath) {

        filePath = filePath.replaceAll("\\\\", "/");
        try {
            InputStream inputStream = !filePath.contains("/") ? FileUtil.class.getClassLoader().getResourceAsStream(filePath)
                    : Files.newInputStream(Paths.get(filePath));

            ByteArrayOutputStream bots = new ByteArrayOutputStream();
            int _byte;
            while (true) {
                assert inputStream != null;
                if ((_byte = inputStream.read()) == -1) break;
                bots.write(_byte);
            }
            String res = bots.toString();
            bots.close();
            inputStream.close();
            return res;
        } catch (
                Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void deleteFile(File file) {
        if (!file.exists()) {
            return;
        }
        File[] files = file.listFiles();
        if (file.isFile()) {
            file.delete();
            return;
        }
        if (files.length == 0) {
            file.delete();
        } else {
            for (File value : files) {
                if (value.isDirectory()) {
                    deleteFile(value);
                    value.delete();
                } else {
                    value.delete();
                }
            }
            file.delete();
        }
    }
}
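
As an alternative to the `File.delete()` based methods above, here is a minimal sketch of recursive deletion with `java.nio.file`. `DeleteSketch` is a hypothetical name. The main difference is that `Files.delete` throws an `IOException` describing why an entry could not be removed, whereas `File.delete()` only returns `false`. If a file is still held open by another component (for example a log appender), deletion may still fail; the sketch only makes the reason visible.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DeleteSketch {
    // Deletes root and everything below it. Returns one message per entry
    // that could not be deleted; an empty list means everything was removed.
    static List<String> deleteRecursively(Path root) throws IOException {
        List<String> failures = new ArrayList<>();
        if (!Files.exists(root)) {
            return failures;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            try {
                Files.delete(path);
            } catch (IOException e) {
                failures.add(path + ": " + e);
            }
        }
        return failures;
    }

    public static void main(String[] args) throws IOException {
        for (String failure : deleteRecursively(Paths.get("logs"))) {
            System.err.println("Failed to delete " + failure);
        }
    }
}
```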

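After configuring everything, run Client.java to start your job. Taking a DQL job as an example, the console prints the following (the log-deletion method in the utility class is somewhat buggy; feel free to fix it yourself):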

 com.lee.flinkSqlD.client.Client
Preparing to delete files...
 ================ ConfHandler ================ 
Failed to delete D:\workspace\FlinkTestLocalApache\FlinkTestLocalApache\logs\FlinkTestLocal.log
Failed to delete D:\workspace\FlinkTestLocalApache\FlinkTestLocalApache\logs\taskManager.log
Failed to delete D:\workspace\FlinkTestLocalApache\FlinkTestLocalApache\logs
================ TableEnvHandler ================= 
 [------ your steamEnv configuration ------] 
taskmanager.memory.process.size=10240m
component_id=10001
web.log.path=logs/FlinkTestLocal.log
state.backend.incremental=true
jobmanager.memory.process.size=2048m
rest.port=9101
taskmanager.log.path=logs/taskManager.log
rest.bind-port=9101
state.backend=rocksdb
taskmanager.numOfTaskSlots=1
state.checkpoints.dir=file:///D:/flink/checkpoints/10001

 [====== Total steamEnv configuration ====]

autoWatermarkInterval -----> 200
globalJobParameters -----> {}
maxParallelism -----> -1
parallelism -----> 12
numberOfExecutionRetries -----> -1
taskCancellationInterval -----> -1
defaultKryoSerializerClasses -----> {}
latencyTrackingInterval -----> 0
taskCancellationTimeout -----> -1
registeredPojoTypes -----> []
forceAvroEnabled -----> false
registeredKryoTypes -----> []
dynamicGraph -----> false
latencyTrackingConfigured -----> false
executionRetryDelay -----> 10000
forceKryoEnabled -----> false
registeredTypesWithKryoSerializers -----> {}
executionMode -----> PIPELINED
closureCleanerEnabled -----> true
autoTypeRegistrationDisabled -----> false
defaultKryoSerializers -----> {}
closureCleanerLevel -----> RECURSIVE
registeredTypesWithKryoSerializerClasses -----> {}
materializationMaxAllowedFailures -----> 3
periodicMaterializeIntervalMillis -----> 600000
defaultInputDependencyConstraint -----> ANY
restartStrategy -----> {"description":"Cluster level default restart strategy"}
objectReuseEnabled -----> false
useSnapshotCompression -----> false
 [------ your tableEnv configuration ------] 
table.exec.resource.default-parallelism=1
table.exec.state.ttl=6000

 [====== Total tableEnv configuration ====]

taskmanager.memory.process.size -----> 10240m
component_id -----> 10001
web.log.path -----> logs/FlinkTestLocal.log
table.exec.resource.default-parallelism -----> 1
state.backend.incremental -----> true
jobmanager.memory.process.size -----> 2048m
rest.port -----> 9101
taskmanager.log.path -----> logs/taskManager.log
table.exec.state.ttl -----> 6000
rest.bind-port -----> 9101
state.backend -----> rocksdb
taskmanager.numOfTaskSlots -----> 1
state.checkpoints.dir -----> file:///D:/flink/checkpoints/10001
================== DDLHandler =====================
------------------------------------------

CREATE TABLE LEE_KAFKA_01
(
    col1 VARCHAR,
    col2 VARCHAR,
    col3 VARCHAR,
    col4 timestamp(3),
    WATERMARK FOR col4 AS col4 - INTERVAL '0' SECOND
) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-01',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'lee-group-1',
      'format' = 'json',
      'json.fail-on-missing-field' = 'true'
      )
OK
------------------------------------------

CREATE TABLE LEE_KAFKA_02
(
    f_sequence   INT,
    f_random     INT,
    f_random_str STRING,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1000',
      'fields.f_sequence.kind' = 'sequence',
      'fields.f_sequence.start' = '1',
      'fields.f_sequence.end' = '1000000',
      'fields.f_random.min' = '10',
      'fields.f_random.max' = '100',
      'fields.f_random_str.length' = '10'
      )

OK
------------------------------------------

CREATE TABLE LEE_KAFKA_022
(
    f_sequence   INT,
    f_random     INT,
    f_random_str STRING,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1000',
      'fields.f_sequence.kind' = 'sequence',
      'fields.f_sequence.start' = '1',
      -- The sequence is loaded into memory before the job can start, so the longer the sequence, the slower the startup
      'fields.f_sequence.end' = '1000000',
      'fields.f_random.min' = '10',
      'fields.f_random.max' = '100',
      'fields.f_random_str.length' = '10'
      )

OK
------------------------------------------

CREATE TABLE LEE_KAFKA_03
(
    col1 VARCHAR,
    col2 VARCHAR,
    col3 VARCHAR,
    col4 timestamp(3)
) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST-01',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'lee-group-1',
      'format' = 'json',
      'json.fail-on-missing-field' = 'true'
      )
OK
------------------------------------------

CREATE TABLE finance_log
(
    _id               varchar(128),
    user_id           varchar(128),
    cny               double precision,
    coin              bigint,
    via               varchar(128),
    rid               varchar(128),
    session_nick_name varchar(1024),
    session_id        varchar(128),
    remark            varchar(128),
    returncoin        bigint,
    to_id             varchar(128),
    transfer_ratio    varchar(128),
    amount            varchar(128),
    qd                varchar(128),
    `timestamp`       Timestamp(3),
    en_mobile         varchar(128)
) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:postgresql://xxxx-xxxx.xxx.cn-north-1.redshift.amazonaws.com.cn:5439/xxx',
      'table-name' = 'xxx.xx',
      'driver' = 'org.postgresql.Driver',
      'username' = 'xxx',
      'password' = 'xxx'
      )
OK
------------------------------------------

CREATE TABLE blackhole_table
(
    _id               VARCHAR(100),
    user_id           VARCHAR(100),
    cny               DOUBLE PRECISION,
    coin              BIGINT,
    via               VARCHAR(100),
    rid               VARCHAR(100),
    session_nick_name VARCHAR(100),
    session_id        VARCHAR(100),
    remark            VARCHAR(100),
    returncoin        BIGINT,
    to_id             VARCHAR(100),
    transfer_ratio    VARCHAR(100),
    amount            VARCHAR(100),
    qd                VARCHAR(100),
    `timestamp`       TIMESTAMP,
    en_mobile         VARCHAR(100)
) WITH (
      'connector' = 'blackhole'
      )
OK
================== DQLHandler =====================
------------------------------------------
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}])
+- LogicalProject(window_start=[$0], window_end=[$1], f_random=[$2], s_last=[$3], m_localtimestamp=[$4], cnt_total=[$5], time_dis_count=[$6], rownum=[$7], col1=[$8], col2=[$9], col3=[$10], col4=[$11])
   +- LogicalFilter(condition=[<($7, 10)])
      +- LogicalProject(window_start=[$0], window_end=[$1], f_random=[$2], s_last=[$3], m_localtimestamp=[$4], cnt_total=[$5], time_dis_count=[$6], rownum=[$7], col1=[$9], col2=[$10], col3=[$11], col4=[$12])
         +- LogicalJoin(condition=[AND(=($8, $9), >=($4, -($12, 3600000:INTERVAL HOUR)), <=($4, +($12, 3600000:INTERVAL HOUR)))], joinType=[left])
            :- LogicalProject(window_start=[$0], window_end=[$1], f_random=[$2], s_last=[$3], m_localtimestamp=[$4], cnt_total=[$5], time_dis_count=[$6], rownum=[ROW_NUMBER() OVER (PARTITION BY $0, $1 ORDER BY LOCALTIMESTAMP DESC NULLS LAST)], f_random0=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
            :  +- LogicalAggregate(group=[{0, 1, 2}], s_last=[LAST_VALUE($3)], m_localtimestamp=[MAX($4)], cnt_total=[COUNT()], time_dis_count=[COUNT(DISTINCT $4)])
            :     +- LogicalProject(window_start=[$4], window_end=[$5], f_random=[$1], f_sequence=[$0], $f4=[LOCALTIMESTAMP])
            :        +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3), 10000:INTERVAL SECOND)], rowType=[RecordType(INTEGER f_sequence, INTEGER f_random, VARCHAR(2147483647) f_random_str, TIMESTAMP(3) *ROWTIME* ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
            :           +- LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[$3])
            :              +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
            :                 +- LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
            :                    +- LogicalTableScan(table=[[default_catalog, default_database, LEE_KAFKA_02]])
            +- LogicalWatermarkAssigner(rowtime=[col4], watermark=[-($3, 0:INTERVAL SECOND)])
               +- LogicalTableScan(table=[[default_catalog, default_database, LEE_KAFKA_01]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4])
+- Exchange(distribution=[hash[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4]])
   +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7 AS rownum, col1, col2, col3, col4])
      +- Join(joinType=[LeftOuterJoin], where=[AND(=($8, col1), >=(m_localtimestamp, -(col4, 3600000:INTERVAL HOUR)), <=(m_localtimestamp, +(col4, 3600000:INTERVAL HOUR)))], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7, $8, col1, col2, col3, col4], leftInputSpec=[HasUniqueKey], rightInputSpec=[NoUniqueKey])
         :- Exchange(distribution=[hash[$8]])
         :  +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7_0 AS $7, $8])
         :     +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[$7 DESC], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7, $8, $7_0])
         :        +- Exchange(distribution=[single])
         :           +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, LOCALTIMESTAMP() AS $7, CAST(f_random AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $8])
         :              +- WindowAggregate(groupBy=[f_random], window=[TUMBLE(time_col=[ts], size=[10 s])], select=[f_random, LAST_VALUE(f_sequence) AS s_last, MAX($f4) AS m_localtimestamp, COUNT(*) AS cnt_total, COUNT(DISTINCT $f4) AS time_dis_count, start('w$) AS window_start, end('w$) AS window_end])
         :                 +- Exchange(distribution=[hash[f_random]])
         :                    +- Calc(select=[f_random, f_sequence, LOCALTIMESTAMP() AS $f4, ts])
         :                       +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
         :                          +- Calc(select=[f_sequence, f_random, LOCALTIMESTAMP() AS ts])
         :                             +- TableSourceScan(table=[[default_catalog, default_database, LEE_KAFKA_02]], fields=[f_sequence, f_random, f_random_str])
         +- Exchange(distribution=[hash[col1]])
            +- Calc(select=[col1, col2, col3, CAST(col4 AS TIMESTAMP(3)) AS col4])
               +- TableSourceScan(table=[[default_catalog, default_database, LEE_KAFKA_01, watermark=[-(col4, 0:INTERVAL SECOND)]]], fields=[col1, col2, col3, col4])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4])
+- Exchange(distribution=[hash[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, rownum, col1, col2, col3, col4]])
   +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7 AS rownum, col1, col2, col3, col4])
      +- Join(joinType=[LeftOuterJoin], where=[(($8 = col1) AND (m_localtimestamp >= (col4 - 3600000:INTERVAL HOUR)) AND (m_localtimestamp <= (col4 + 3600000:INTERVAL HOUR)))], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7, $8, col1, col2, col3, col4], leftInputSpec=[HasUniqueKey], rightInputSpec=[NoUniqueKey])
         :- Exchange(distribution=[hash[$8]])
         :  +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7_0 AS $7, $8])
         :     +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[], orderBy=[$7 DESC], select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, $7, $8, $7_0])
         :        +- Exchange(distribution=[single])
         :           +- Calc(select=[window_start, window_end, f_random, s_last, m_localtimestamp, cnt_total, time_dis_count, LOCALTIMESTAMP() AS $7, CAST(f_random AS VARCHAR(2147483647)) AS $8])
         :              +- WindowAggregate(groupBy=[f_random], window=[TUMBLE(time_col=[ts], size=[10 s])], select=[f_random, LAST_VALUE(f_sequence) AS s_last, MAX($f4) AS m_localtimestamp, COUNT(*) AS cnt_total, COUNT(DISTINCT $f4) AS time_dis_count, start('w$) AS window_start, end('w$) AS window_end])
         :                 +- Exchange(distribution=[hash[f_random]])
         :                    +- Calc(select=[f_random, f_sequence, LOCALTIMESTAMP() AS $f4, ts])
         :                       +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
         :                          +- Calc(select=[f_sequence, f_random, LOCALTIMESTAMP() AS ts])
         :                             +- TableSourceScan(table=[[default_catalog, default_database, LEE_KAFKA_02]], fields=[f_sequence, f_random, f_random_str])
         +- Exchange(distribution=[hash[col1]])
            +- Calc(select=[col1, col2, col3, CAST(col4 AS TIMESTAMP(3)) AS col4])
               +- TableSourceScan(table=[[default_catalog, default_database, LEE_KAFKA_01, watermark=[-(col4, 0:INTERVAL SECOND)]]], fields=[col1, col2, col3, col4])

SCHEMA:[TIMESTAMP(3) NOT NULL, TIMESTAMP(3) NOT NULL, INT, INT, TIMESTAMP(3) NOT NULL, BIGINT NOT NULL, BIGINT NOT NULL, BIGINT NOT NULL, STRING, STRING, STRING, TIMESTAMP(3) *ROWTIME*]
SCHEMA FOR JSON: [{"children":[],"conversionClass":"java.time.LocalDateTime","logicalType":{"children":[],"defaultConversion":"java.time.LocalDateTime","kind":"REGULAR","nullable":false,"precision":3,"typeRoot":"TIMESTAMP_WITHOUT_TIME_ZONE"}},{"children":[],"conversionClass":"java.time.LocalDateTime","logicalType":{"children":[],"defaultConversion":"java.time.LocalDateTime","kind":"REGULAR","nullable":false,"precision":3,"typeRoot":"TIMESTAMP_WITHOUT_TIME_ZONE"}},{"children":[],"conversionClass":"java.lang.Integer","logicalType":{"children":[],"defaultConversion":"java.lang.Integer","nullable":true,"typeRoot":"INTEGER"}},{"children":[],"conversionClass":"java.lang.Integer","logicalType":{"children":[],"defaultConversion":"java.lang.Integer","nullable":true,"typeRoot":"INTEGER"}},{"children":[],"conversionClass":"java.time.LocalDateTime","logicalType":{"children":[],"defaultConversion":"java.time.LocalDateTime","kind":"REGULAR","nullable":false,"precision":3,"typeRoot":"TIMESTAMP_WITHOUT_TIME_ZONE"}},{"children":[],"conversionClass":"java.lang.Long","logicalType":{"children":[],"defaultConversion":"java.lang.Long","nullable":false,"typeRoot":"BIGINT"}},{"children":[],"conversionClass":"java.lang.Long","logicalType":{"children":[],"defaultConversion":"java.lang.Long","nullable":false,"typeRoot":"BIGINT"}},{"children":[],"conversionClass":"java.lang.Long","logicalType":{"children":[],"defaultConversion":"java.lang.Long","nullable":false,"typeRoot":"BIGINT"}},{"children":[],"conversionClass":"java.lang.String","logicalType":{"children":[],"defaultConversion":"java.lang.String","length":2147483647,"nullable":true,"typeRoot":"VARCHAR"}},{"children":[],"conversionClass":"java.lang.String","logicalType":{"children":[],"defaultConversion":"java.lang.String","length":2147483647,"nullable":true,"typeRoot":"VARCHAR"}},{"children":[],"conversionClass":"java.lang.String","logicalType":{"children":[],"defaultConversion":"java.lang.String","length":2147483647,"nullable":true,"typeRoot":"VARCHAR"}},{"children":[],"conversionClass":"java.time.LocalDateTime","logicalType":{"children":[],"defaultConversion":"java.time.LocalDateTime","kind":"ROWTIME","nullable":true,"precision":3,"typeRoot":"TIMESTAMP_WITHOUT_TIME_ZONE"}}]
ResultKind: SUCCESS_WITH_CONTENT
ResolvedSchema: (
  `window_start` TIMESTAMP(3) NOT NULL,
  `window_end` TIMESTAMP(3) NOT NULL,
  `f_random` INT,
  `s_last` INT,
  `m_localtimestamp` TIMESTAMP(3) NOT NULL,
  `cnt_total` BIGINT NOT NULL,
  `time_dis_count` BIGINT NOT NULL,
  `rownum` BIGINT NOT NULL,
  `col1` STRING,
  `col2` STRING,
  `col3` STRING,
  `col4` TIMESTAMP(3) *ROWTIME*
)
+----+-------------------------+-------------------------+-------------+-------------+-------------------------+----------------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |            window_start |              window_end |    f_random |      s_last |        m_localtimestamp |            cnt_total |       time_dis_count |               rownum |                           col1 |                           col2 |                           col3 |                    col4 |
+----+-------------------------+-------------------------+-------------+-------------+-------------------------+----------------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          79 |        8824 | 2023-03-04 21:21:09.202 |                   94 |                   27 |                    1 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          48 |        8925 | 2023-03-04 21:21:09.202 |                   85 |                   25 |                    2 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          91 |        8910 | 2023-03-04 21:21:09.202 |                  112 |                   35 |                    3 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          98 |        8885 | 2023-03-04 21:21:09.202 |                   91 |                   32 |                    4 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          52 |        8997 | 2023-03-04 21:21:09.202 |                   99 |                   35 |                    5 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          95 |        8988 | 2023-03-04 21:21:09.202 |                   93 |                   34 |                    6 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          29 |        8948 | 2023-03-04 21:21:09.202 |                   96 |                   34 |                    7 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          64 |        8947 | 2023-03-04 21:21:09.202 |                   98 |                   32 |                    8 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
| +I | 2023-03-04 21:21:00.000 | 2023-03-04 21:21:10.000 |          24 |        8977 | 2023-03-04 21:21:09.202 |                   88 |                   30 |                    9 |                         <NULL> |                         <NULL> |                         <NULL> |                  <NULL> |
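
Every row above falls into the same window, 21:21:00.000 to 21:21:10.000, because the plan uses a 10 s TUMBLE window and tumbling windows are aligned to the epoch by default. The arithmetic can be sketched in plain Java as follows (TumbleWindowSketch and its methods are hypothetical names for illustration; this is not Flink's own code, and it assumes a window offset of zero).

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical illustration of epoch-aligned tumbling window bounds (offset 0 assumed). */
final class TumbleWindowSketch {

    /** Start (inclusive) of the tumbling window of the given size that contains ts. */
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        // A TIMESTAMP without time zone is treated as epoch milliseconds in UTC.
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // floorMod keeps the result correct for timestamps before 1970 as well.
        long start = millis - Math.floorMod(millis, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    /** End (exclusive) of the same window. */
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        // Timestamp taken from the m_localtimestamp column of the result table.
        LocalDateTime ts = LocalDateTime.parse("2023-03-04T21:21:09.202");
        Duration size = Duration.ofSeconds(10);
        System.out.println("window_start = " + windowStart(ts, size));
        System.out.println("window_end   = " + windowEnd(ts, size));
    }
}
```

For the timestamp 21:21:09.202 this yields the bounds 21:21:00 and 21:21:10, matching the window_start and window_end columns of the table.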

Open the web UI at localhost:9101. Note that test and production environments must not create the env object through localEnv.

[Screenshot: Flink web UI]

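Summary: to develop more SQL jobs, you only need to write the basicConf, DDL, and DML or DQL files under the conf folder; the code itself does not need to change! A screenshot of the console output follows:
[Screenshot: console output]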
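
To make the "only edit the conf files" idea concrete, here is a minimal sketch of reading per-job settings from key=value text. It assumes basicConf is a plain properties-style file, which is only a guess based on the key=value lines in the console output; BasicConfSketch and parse are hypothetical names, and the real loading logic is whatever the ConfHandler in the project does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical illustration; the project's real ConfHandler may work differently. */
final class BasicConfSketch {

    /** Parses properties-style text (key=value, '#' comments) into a map sorted by key. */
    static Map<String, String> parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        Map<String, String> sorted = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            sorted.put(key, props.getProperty(key));
        }
        return sorted;
    }

    public static void main(String[] args) throws IOException {
        // Sample settings copied from the console output in this post.
        String basicConf = "# per-job settings\n"
                + "rest.port=9101\n"
                + "state.backend=rocksdb\n"
                + "table.exec.state.ttl=6000\n";
        // Print in the same "key -----> value" style as the console output.
        parse(basicConf).forEach((k, v) -> System.out.println(k + " -----> " + v));
    }
}
```

With this kind of layout, a new job differs from an existing one only in its text files, which is the property the summary relies on.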
That's all. If you found this useful, please bookmark it, and enjoy how simple SQL development can be.

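Copyright notice: this is an original work. Reposting is permitted, provided the source and author are credited with a hyperlink; otherwise legal liability will be pursued. From 海汼部落 - leeston9, http://hainiubl.com/topics/76231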