[Hands-On Guide] How to Write a Custom Flink SQL Connector: An Enterprise-Grade Code Tutorial on How a Flink HTTP Connector Works

Shared by leeston9 · 2023-10-14 00:48:51

Flink offers low-latency data processing, but it is hard for newcomers to develop against directly. Flink SQL is the easiest entry point and the core technology of real-time stream-computing platforms, and companies with the engineering capacity usually extend open-source Flink with features of their own. In this post I will show how to develop your own Flink SQL connector. Connector code is simpler than operator code and easier for readers to follow, so I will use an HTTP connector as the example and walk through the overall approach and development flow.
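To make the goal concrete, here is a rough sketch of how the finished connector is meant to be used from Flink SQL. The option keys 'connector', 'request.url', 'request.method', and the 'properties.headers.' prefix are the ones defined later in HttpOptions; the key for the lookup response column comes from HttpOptions.LOOKUP_RESP_COLUMN, whose key string is not shown in this excerpt, so 'lookup.resp.column' below is a hypothetical placeholder. The ##id## token is the URL templating syntax implemented later in headersAndUrlBodyFormat.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HttpConnectorUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // declare an HTTP-backed dimension table; 'resp' receives the JSON response as a ROW
        tEnv.executeSql(
                "CREATE TABLE http_dim (" +
                "  id STRING," +
                "  resp ROW<code INT, msg STRING>" +
                ") WITH (" +
                "  'connector' = 'http'," +
                "  'request.url' = 'https://example.com/api?id=##id##'," +
                "  'request.method' = 'GET'," +
                "  'properties.headers.Content-Type' = 'application/json'," +
                "  'lookup.resp.column' = 'resp'" +  // hypothetical key; see HttpOptions.LOOKUP_RESP_COLUMN
                ")");
    }
}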
The project code structure is shown in a directory-layout screenshot in the original post.
Step 1: Flink discovers and loads implementations of specific interfaces through the Service Provider Interface (SPI) mechanism. This mechanism lets you register your custom connector factory as a service that Flink can find and use at runtime.
We need to implement the DynamicTableFactory interfaces (DynamicTableSourceFactory and DynamicTableSinkFactory) and register the factory class via a services file (shown after the code) so that FactoryUtil can discover and process it.
So we create a class HttpDynamicTableFactory. Its job is to instantiate HttpDynamicTableSink and HttpDynamicTableSource, to resolve and validate all table options, and to assemble the information the lookup function and sink function need, such as the schema's DataType. The full code follows:

import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.config.HttpOptions;
import com.lee.connector.http.exception.ColumnNotFoundException;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.table.HttpDynamicTableSink;
import com.lee.connector.http.table.HttpDynamicTableSource;
import com.lee.connector.http.util.TableUtils;
import lombok.SneakyThrows;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by leeston.
 * Register this class in META-INF/services so FactoryUtil can discover this connector.
 */
public class HttpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    private static final Logger LOG = LogManager.getLogger(HttpDynamicTableFactory.class);
    /**
     * connector identifier
     */
    private static final String IDENTIFIER = "http";

    @SneakyThrows
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        // sink options, real value
        ReadableConfig readableConfig = helper.getOptions();
        // tableSchema
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = resolvedSchema.getColumns();
        DataType physicalDataType = resolvedSchema.toPhysicalRowDataType();
        // tableColumns to FieldsDataType (contains all column dataTypes --> fieldDataTypes)
        DataType fieldsDataType = this.column2FieldsDataType(columns);
        validateOptions(context, readableConfig, false);
        // user sink options
        HttpConfigs sinkOptions = getHttpOptions(readableConfig,
                fieldsDataType,
                physicalDataType,
                extractHeadersByCatalogOptions(
                        context.getCatalogTable().getOptions()
                ));

        LOG.info("httpConfigs for Sink Option: {}", sinkOptions.toString());
        return new HttpDynamicTableSink(sinkOptions);
    }

    private Map<String, String> extractHeadersByCatalogOptions(Map<String, String> options) {
        return options.entrySet()
                .stream()
                .filter(entry -> entry.getKey().startsWith(HttpOptions.PROPERTIES_HEADER_PREFIX))
                // strip the headers prefix; use substring rather than replaceAll, since the
                // prefix contains '.' which replaceAll would treat as a regex metacharacter
                .collect(Collectors.toMap(entry -> entry.getKey().substring(HttpOptions.PROPERTIES_HEADER_PREFIX.length()),
                        Map.Entry::getValue, (v1, v2) -> v2));
    }

    /**
     * @param readableConfig    resolved table options
     * @param fieldsRowDataType row type of all columns (physical and computed)
     * @param physicalDataType  row type of the physical columns only
     * @param headers           HTTP headers extracted from the catalog options
     * @return HttpConfigs built from the options above
     */
    private HttpConfigs getHttpOptions(ReadableConfig readableConfig, DataType fieldsRowDataType, DataType physicalDataType, Map<String, String> headers) throws UnSupportedException, ColumnNotFoundException {
        // either fieldsRowDataType or physicalDataType works here
        List<String> childFieldNames = TableUtils.getFieldNames(fieldsRowDataType);
        HttpConfigs.HttpLookupBuilder httpLookupBuilder = HttpConfigs.builder()
                .setUrl(readableConfig.get(HttpOptions.REQUEST_URL))
                .setMethod(readableConfig.get(HttpOptions.REQUEST_METHOD))
                .setHeaders(headers)
                .setFieldsDataType(fieldsRowDataType)
                .setPhysicalDataType(physicalDataType)
                .setCharset(TableUtils.UserTypes.getCharSetName())
                .setColumnNames(childFieldNames);

        if (readableConfig.getOptional(HttpOptions.PROPERTIES_DO_INPUT).isPresent()) {
            httpLookupBuilder.setDoInput(readableConfig.get(HttpOptions.PROPERTIES_DO_INPUT));
        }
        switch (readableConfig.get(HttpOptions.REQUEST_METHOD)) {
            case "GET":
                httpLookupBuilder.setDoInput(true);
                httpLookupBuilder.setDoOutput(false);
                httpLookupBuilder.setUseCache(true);
                break;
            case "POST":
                httpLookupBuilder.setDoInput(true);
                httpLookupBuilder.setDoOutput(true);
                httpLookupBuilder.setUseCache(false);
                readableConfig.getOptional(HttpOptions.POST_BODY).ifPresent(httpLookupBuilder::setBody);
                break;
            default:
                String errorMsgPattern = "Unsupported method: %s. %sSupported methods are [%s, %s]";
                throw new UnSupportedException(String.format(errorMsgPattern
                        , readableConfig.get(HttpOptions.REQUEST_METHOD)
                        , readableConfig.get(HttpOptions.REQUEST_METHOD).equalsIgnoreCase(TableUtils.UserTypes.GET.name())
                                || readableConfig.get(HttpOptions.REQUEST_METHOD).equalsIgnoreCase(TableUtils.UserTypes.POST.name())
                                ? String.format("Do you mean '%s'? ", readableConfig.get(HttpOptions.REQUEST_METHOD).toUpperCase()) : ""
                        , TableUtils.UserTypes.GET.name()
                        , TableUtils.UserTypes.POST.name()
                ));
        }

        readableConfig.getOptional(HttpOptions.CACHE_MAX_SIZE).ifPresent(httpLookupBuilder::setCacheMaxSize);
        readableConfig.getOptional(HttpOptions.CACHE_TTL).ifPresent(duration -> {
            long cacheTtl = duration.toMillis();
            httpLookupBuilder.setCacheTimeToLive(cacheTtl);
        });

        readableConfig.getOptional(HttpOptions.CONNECT_TIMEOUT).ifPresent(connectTimeoutDuration -> {
            int timeout = (int) connectTimeoutDuration.toMillis();
            httpLookupBuilder.setConnectTimeout(timeout);
        });
        readableConfig.getOptional(HttpOptions.READ_TIMEOUT).ifPresent(readTimeoutDuration -> {
            int timeout = (int) readTimeoutDuration.toMillis();
            httpLookupBuilder.setReadTimeout(timeout);
        });
        readableConfig.getOptional(HttpOptions.FAIL_MAX_RETRIES).ifPresent(httpLookupBuilder::setMaxRetryTimes);
        readableConfig.getOptional(HttpOptions.FAIL_INTERPRET).ifPresent(httpLookupBuilder::setFailInterpret);
        readableConfig.getOptional(HttpOptions.OUTPUT_CHARSET).ifPresent(httpLookupBuilder::setCharset);
        readableConfig.getOptional(HttpOptions.PAGE_QUERY_ENABLED).ifPresent(httpLookupBuilder::setPageQueryEnabled);
        readableConfig.getOptional(HttpOptions.PAGE_QUERY_KEY).ifPresent(httpLookupBuilder::setPageQueryKey);

        if (readableConfig.getOptional(HttpOptions.OUTPUT_FORMAT).isPresent()
                && !readableConfig.get(HttpOptions.OUTPUT_FORMAT)
                .equalsIgnoreCase(TableUtils.UserTypes.JSON.name()))
            throw new UnSupportedException("value for key 'output.format' only supports 'JSON', but found: " + readableConfig.get(HttpOptions.OUTPUT_FORMAT));
        httpLookupBuilder.setOutputFormat(HttpOptions.OUTPUT_FORMAT.defaultValue());
        if (readableConfig.getOptional(HttpOptions.LOOKUP_RESP_COLUMN).isPresent()) {
            String respColumn = readableConfig.get(HttpOptions.LOOKUP_RESP_COLUMN);
            if (childFieldNames.contains(respColumn))
                httpLookupBuilder.setLookupRespColumn(respColumn);
            else
                throw new ColumnNotFoundException(String.format("wrong property ['%s'='%s']: table column not found: '%s' ",
                        HttpOptions.LOOKUP_RESP_COLUMN.key()
                        , respColumn
                        , respColumn));
        }
        return httpLookupBuilder.build();
    }

    // use factoryUtil to validate
    private void validateOptions(Context context, ReadableConfig readableConfig, boolean isLookup) {

        HashSet<ConfigOption<?>> userAllOpSet = new HashSet<>();
        // validate options
        Set<ConfigOption<?>> requiredOptions = requiredOptions();
        Set<ConfigOption<?>> optionalOptions = optionalOptions();
        if (isLookup)
            requiredOptions.add(HttpOptions.LOOKUP_RESP_COLUMN);
        userAllOpSet.addAll(optionalOptions);
        userAllOpSet.addAll(requiredOptions);
        FactoryUtil.validateFactoryOptions(requiredOptions, optionalOptions, readableConfig);
        // all options declared on the catalog table
        Map<String, String> allOptionsByCatalog = context.getCatalogTable().getOptions();
        allOptionsByCatalog.forEach((k, v) -> {
            if (k.startsWith(HttpOptions.PROPERTIES_HEADER_PREFIX)) {
                userAllOpSet.add(ConfigOptions
                        .key(k).stringType()
                        .noDefaultValue()
                        .withDescription(String.format("Http headers : key=%s, value=%s", k, v)));
            }
        });

        HashSet<String> consumedOptionKeys = new HashSet<>();
        // add the keys of userAllOpSet to consumedOptionKeys
        userAllOpSet.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);

        // let FactoryUtil verify that no unknown keys remain
        FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), allOptionsByCatalog.keySet(), consumedOptionKeys);
    }

    @SneakyThrows
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper dynamicSourceHelper = FactoryUtil.createTableFactoryHelper(this, context);
        // obtain the resolved user options (one ReadableConfig is enough)
        ReadableConfig readableConfig = dynamicSourceHelper.getOptions();
        // create resolvedSchema
        ResolvedSchema sourceResolvedSchema = context.getCatalogTable().getResolvedSchema();
        // physicalDataType, not contains the computed dataType
        DataType physicalDataType = sourceResolvedSchema.toPhysicalRowDataType();
        // get fieldDataType , important because source function will use it
        DataType fieldDataType = column2FieldsDataType(sourceResolvedSchema.getColumns());
        // validateOptions: requiredOptions, optionalOptions
        validateOptions(context, readableConfig, true);
        // use infos of above to init source configs
        HttpConfigs httpLookupOptions = getHttpOptions(readableConfig, fieldDataType, physicalDataType, extractHeadersByCatalogOptions(context.getCatalogTable().getOptions()));
        LOG.info("httpConfigs for Lookup Option: {}", httpLookupOptions.toString());
        // construct the HttpDynamicTableSource with both the full field row type and the physical row type
        return new HttpDynamicTableSource(fieldDataType, physicalDataType, httpLookupOptions);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * @return options that must be present in the DDL's WITH properties
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(HttpOptions.REQUEST_URL);
        requiredOptions.add(HttpOptions.REQUEST_METHOD);
        requiredOptions.add(HttpOptions.CONNECTOR);
        return requiredOptions;
    }

    /**
     * @return optional options for the connector's DDL WITH properties
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> optionalKeySet = new HashSet<>();
        optionalKeySet.add(HttpOptions.FAIL_INTERPRET);
        optionalKeySet.add(HttpOptions.FAIL_MAX_RETRIES);
        optionalKeySet.add(HttpOptions.CONNECT_TIMEOUT);
        optionalKeySet.add(HttpOptions.READ_TIMEOUT);
        optionalKeySet.add(HttpOptions.OUTPUT_FORMAT);

        optionalKeySet.add(HttpOptions.CACHE_ENABLED);
        optionalKeySet.add(HttpOptions.CACHE_MAX_SIZE);
        optionalKeySet.add(HttpOptions.CACHE_TTL);

        optionalKeySet.add(HttpOptions.POST_BODY);
        optionalKeySet.add(HttpOptions.OUTPUT_CHARSET);
        optionalKeySet.add(HttpOptions.PROPERTIES_DO_INPUT);
        optionalKeySet.add(HttpOptions.PROPERTIES_DO_OUTPUT);
        optionalKeySet.add(HttpOptions.PAGE_QUERY_ENABLED);
        optionalKeySet.add(HttpOptions.PAGE_QUERY_KEY);
        return optionalKeySet;
    }

    public DataType column2FieldsDataType(List<Column> columns) {
        // Column --> DataTypes.Field
        Stream<DataTypes.Field> fieldStream = columns
                .stream()
                .map(tableColumn -> DataTypes.FIELD(tableColumn.getName(), tableColumn.getDataType()));
        // the array constructor reference tells Stream#toArray how to allocate the result
        DataTypes.Field[] fields = fieldStream.toArray(DataTypes.Field[]::new);
        // return the row type over all fields, physical and computed alike
        return DataTypes.ROW(fields).notNull();
    }

}
  1. Create a directory named META-INF/services under the resources directory.
  2. Inside META-INF/services, create a file named org.apache.flink.table.factories.Factory. This file name must exactly match the fully qualified name of the interface you want Flink's SPI mechanism to discover.
  3. In the org.apache.flink.table.factories.Factory file, write the fully qualified name of your connector factory class, one class name per line.
    When Flink starts, the ServiceLoader scans the classpath for org.apache.flink.table.factories.Factory files and loads every class listed in them, for example:
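A minimal registration file looks like this. I assume here that HttpDynamicTableFactory lives in a com.lee.connector.http.factory package; its actual package declaration is not shown in the excerpt, so adjust the line to wherever your factory class really sits:

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.lee.connector.http.factory.HttpDynamicTableFactory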
Step 2: Start with the simpler sink-side logic. We build HttpDynamicTableSink, whose main job is to override the getSinkRuntimeProvider method:
package com.lee.connector.http.table;

import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.funtion.HttpSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class HttpDynamicTableSink implements DynamicTableSink {
    private static final Logger LOG = LogManager.getLogger(HttpDynamicTableSink.class);
    private final HttpConfigs sinkOptions;

    public HttpDynamicTableSink(HttpConfigs sinkOptions) {
        this.sinkOptions = sinkOptions;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // +I, -D, +U (insert, delete, update-after)
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        LOG.info("Create HttpSinkFunction with \n :{} " + sinkOptions.toString());
        return SinkFunctionProvider.of(new HttpSinkFunction<>(this.sinkOptions));
    }

    @Override
    public DynamicTableSink copy() {
        return new HttpDynamicTableSink(sinkOptions);
    }

    @Override
    public String asSummaryString() {
        return "HTTP sink";
    }
}

Step 3: HttpDynamicTableSink then builds the HttpSinkFunction, which mainly overrides the invoke method. The code is as follows:

import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.util.TableUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import static com.lee.connector.http.funtion.HttpLookupFunction.getHttpURLConnection;

public class HttpSinkFunction<I> extends RichSinkFunction<I> {
    private String requestUrl;
    private final Map<String, String> headers;
    private final String requestMethod;
    private final String outputFormat;
    private String postBody;
    private final String charset;
    private final boolean failInterpret;
    private final boolean cacheEnabled;
    private final boolean doInput;
    private final boolean doOutput;
    private final int maxRetryTimes;
    private final Duration connectTimeout;
    private final Duration readTimeout;
    private final DataType fieldsDataType;
    private final DataType physicalDataType;
    private static final Logger LOG = LogManager.getLogger(HttpSinkFunction.class);

    public HttpSinkFunction(HttpConfigs httpSinkConfigs) {
        this.requestUrl = httpSinkConfigs.getUrl();
        this.headers = httpSinkConfigs.getHeaders();
        this.requestMethod = httpSinkConfigs.getRequestMethod();
        this.outputFormat = httpSinkConfigs.getOutputFormat();
        this.postBody = httpSinkConfigs.getBody();
        this.charset = httpSinkConfigs.getCharset();
        this.failInterpret = httpSinkConfigs.isFailInterpret();
        this.cacheEnabled = httpSinkConfigs.isUseCache();
        this.doInput = httpSinkConfigs.isDoInput();
        this.doOutput = httpSinkConfigs.isDoOutput();
        this.maxRetryTimes = httpSinkConfigs.getMaxRetryTimes() == 0 ? 3 : httpSinkConfigs.getMaxRetryTimes();
        this.connectTimeout = Duration.ofMillis(httpSinkConfigs.getConnectTimeout()); /* duration in milliseconds */
        this.readTimeout = Duration.ofMillis(httpSinkConfigs.getReadTimeout()); /* duration in milliseconds */
        this.fieldsDataType = httpSinkConfigs.getFieldsDataType();
        this.physicalDataType = httpSinkConfigs.getPhysicalDataType();
    }

    // the actual sink logic
    @Override
    public void invoke(I input, Context context) throws Exception {
        int currTimes = 0;
        while (currTimes <= maxRetryTimes) {
            boolean succeed = true;
            try {
                ++currTimes;
                if (null != input) {
                    RowData row = (RowData) input;
                    // Map(columnName -> (field value, field pos))
                    Map<String, Tuple2<?, Integer>> richDataRelation = TableUtils.buildRichDataRelation(row, this.physicalDataType);
                    LOG.info("Input Data Type: " + input.getClass());
                    headersAndUrlBodyFormat(richDataRelation);
                    HttpURLConnection conn = this.getConn(this.requestUrl);
                    String errMsg = String.format("unknown request method: %s !, Supports are : [%s, %s]",
                            requestMethod,
                            TableUtils.UserTypes.GET.name(),
                            TableUtils.UserTypes.POST.name());
                    switch (requestMethod) {
                        case "GET":
                            LOG.info("[SINK]: request method: {} , url: {}", requestMethod, this.requestUrl);
                            break;
                        case "POST":
                            String data = postBody;
                            if (!TableUtils.isValidJSON(data, LOG) && failInterpret)
                                throw new UnSupportedException("POST body must be valid JSON, but found: " + data);

                            LOG.info("[SINK]: request method: {} , data: {}", requestMethod, data);
                            try (OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), this.charset)) {
                                writer.write(data);
                            } catch (IOException e) {
                                errMsg = "[SINK]: send response data error! " + e.getMessage();
                                LOG.error(errMsg, e);
                                throw new IOException(errMsg);
                            }
                            break;
                        default:
                            throw new UnSupportedException(errMsg);
                    }
                    succeed = TableUtils.responseStreamHandler(input, conn, headers, this.requestUrl, LOG).f0;
                }
            } catch (Exception e) {
                succeed = false;
                LOG.error(e.getMessage(), e);
                if (failInterpret)
                    throw e;
            } finally {
                LOG.info(String.format("Row :%s sink finished.", input));
            }

            if (succeed)
                break;
            else
                LOG.warn("Attempt {} failed, retrying...", currTimes);
        }
    }

    // build the connection via the JRE's built-in HttpURLConnection
    private HttpURLConnection getConn(String urlStr) throws Exception {
        return getHttpURLConnection(urlStr, doOutput, doInput, requestMethod, this.connectTimeout, this.readTimeout, this.cacheEnabled, this.headers);
    }

    // RowData to a generic Object array
    private Object[] toExternal(RowData rowData) {
        int len = rowData.getArity();
        Object[] values = new Object[len];
        // resolve each field's LogicalType once; equivalently:
        // ((RowType) fieldsDataType.getLogicalType()).getFields().get(i).getLogicalType()
        List<LogicalType> fieldTypes = LogicalTypeChecks.getFieldTypes(this.fieldsDataType.getLogicalType());
        for (int i = 0; i < len; i++) {
            values[i] = TableUtils.getRowDataValue(fieldTypes.get(i), rowData, i);
        }
        return values;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * @param richData Map(columnName -> (value, pos))
     */
    private void headersAndUrlBodyFormat(Map<String, Tuple2<?, Integer>> richData) {
        for (Map.Entry<String, Tuple2<?, Integer>> richEntry : richData.entrySet()) {
            String value = richEntry.getValue().f0 == null ? "null" : richEntry.getValue().f0.toString();
            // use String#replace (literal) rather than replaceAll: values may contain regex metacharacters
            this.headers.forEach((k, v) -> headers.put(k, v.replace("##" + richEntry.getKey() + "##", value)));
            this.requestUrl = this.requestUrl.replace("##" + richEntry.getKey() + "##", value);
            if (null != this.postBody)
                this.postBody = this.postBody.replace("##" + richEntry.getKey() + "##", value);
        }
    }
}
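To make the templating above concrete, here is a standalone sketch of what headersAndUrlBodyFormat does to a URL template; the column names and values are made up for illustration:

import java.util.LinkedHashMap;
import java.util.Map;

public class TemplatingSketch {
    public static void main(String[] args) {
        String urlTemplate = "https://example.com/api?id=##id##&name=##name##";
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42);
        row.put("name", "lee");
        // literal replacement of each ##column## token with the row's value
        for (Map.Entry<String, Object> e : row.entrySet()) {
            urlTemplate = urlTemplate.replace("##" + e.getKey() + "##", String.valueOf(e.getValue()));
        }
        System.out.println(urlTemplate); // prints https://example.com/api?id=42&name=lee
    }
}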

Step 4: Next comes the lookup side. First we create the class HttpDynamicTableSource, which mainly overrides the getLookupRuntimeProvider method:

package com.lee.connector.http.table;

import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.funtion.HttpLookupFunction;
import com.lee.connector.http.util.TableUtils;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class HttpDynamicTableSource implements LookupTableSource {

    private static final Logger LOG = LogManager.getLogger(HttpDynamicTableSource.class);
    private final DataType fieldRowDataType; /* row type covering all columns, i.e. every column's logical type */
    private final DataType physicalRowDataType;
    private final HttpConfigs lookupConfigs;

    public HttpDynamicTableSource(DataType fieldRowDataType, DataType physicalRowDataType, HttpConfigs lookupConfigs) {
        this.fieldRowDataType = fieldRowDataType;
        this.physicalRowDataType = physicalRowDataType;
        this.lookupConfigs = lookupConfigs;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        int[][] lookupKeys = context.getKeys();
        String[] keyNamesArr = new String[lookupKeys.length];
        // field names referenced by the join-on condition
        List<String> fieldNames = TableUtils.getFieldNames(this.physicalRowDataType);
        for (int i = 0; i < lookupKeys.length; i++) {
            // nested join keys are not supported, so the first index of each key path identifies the field
            keyNamesArr[i] = fieldNames.get(lookupKeys[i][0]);
        }
        return buildLookupFunction(keyNamesArr, this.fieldRowDataType, this.physicalRowDataType);
    }

    private LookupRuntimeProvider buildLookupFunction(String[] keyNamesArr, DataType fieldRowDataType, DataType physicalRowDataType) {
        HttpLookupFunction.PhysicalJoinedColumnData columnData = new HttpLookupFunction.PhysicalJoinedColumnData(keyNamesArr, fieldRowDataType, physicalRowDataType);
        HttpLookupFunction lookupFunction = HttpLookupFunction.builder()
                .setColumnData(columnData)
                .setLookupConfigs(lookupConfigs)
                .build();
        LOG.info("Using blocking version of http lookup table");
        return TableFunctionProvider.of(lookupFunction);
    }

    @Override
    public DynamicTableSource copy() {
        return new HttpDynamicTableSource(this.fieldRowDataType, this.physicalRowDataType, this.lookupConfigs);
    }

    @Override
    public String asSummaryString() {
        return "HTTP lookup source";
    }
}
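For orientation, the lookup source is driven by a lookup join. Continuing the usage sketch from the introduction (same tEnv, plus an 'orders' stream table with a processing-time attribute, both illustrative), a query like the following is what makes the planner call getLookupRuntimeProvider, and the dimension-side column in the ON clause is what arrives in context.getKeys():

// lookup-join sketch: the ON column d.id maps to an index in context.getKeys()
tEnv.executeSql(
        "SELECT o.order_id, d.resp " +
        "FROM orders AS o " +
        "JOIN http_dim FOR SYSTEM_TIME AS OF o.proc_time AS d " +
        "ON o.user_id = d.id");

With this query, context.getKeys() would contain a single key path pointing at the position of the 'id' column in http_dim's schema.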

Step 5: Now we create the lookup function itself, which mainly overrides the eval method; it is essentially a UDF (a TableFunction). The code is HttpLookupFunction:

package com.lee.connector.http.funtion;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.config.HttpOptions;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.exception.UnSupportedLogicalTypeException;
import com.lee.connector.http.util.TableUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.cache.Cache;
import org.apache.flink.calcite.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class HttpLookupFunction extends TableFunction<RowData> {
    private PhysicalJoinedColumnData columnData;

    private static final Logger LOG = LogManager.getLogger(HttpLookupFunction.class);
    // lookup configs
    private final HttpConfigs lookupConfigs;
    private String requestUrl;
    private final Map<String, String> headers;
    private final String requestMethod;
    private final String outputFormat;
    private String postBody;
    private final String charset;
    private final List<String> fieldColumnNames; /* All field column names*/

    private boolean pageQueryEnabled;
    private String pageQueryKey;
    private String lookupRespColumn;

    private final Duration connectTimeout;
    private final Duration readTimeout;
    private final int cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private final boolean failInterpret;
    private final boolean cacheEnabled;
    private final boolean doInput;
    private final boolean doOutput;

    private static volatile JsonRowDataDeserializationSchema deSerSchema;

    private transient Cache<RowData, List<RowData>> cache;

    public HttpLookupFunction(PhysicalJoinedColumnData columnData, HttpConfigs lookupConfigs) {
        this.lookupConfigs = lookupConfigs;
        this.requestUrl = lookupConfigs.getUrl();
        this.headers = lookupConfigs.getHeaders();
        this.requestMethod = lookupConfigs.getRequestMethod();
        this.postBody = lookupConfigs.getBody();
        this.outputFormat = StringUtils.isAnyBlank(this.postBody) ? "JSON" : "";
        this.charset = lookupConfigs.getCharset();
        this.columnData = columnData;
        this.fieldColumnNames = lookupConfigs.getColumnNames();
        this.pageQueryEnabled = lookupConfigs.isPageQueryEnabled();
        this.pageQueryKey = lookupConfigs.getPageQueryKey();
        this.connectTimeout = Duration.ofMillis(lookupConfigs.getConnectTimeout());
        this.readTimeout = Duration.ofMillis(lookupConfigs.getReadTimeout());
        this.cacheMaxSize = lookupConfigs.getCacheMaxSize();
        this.cacheExpireMs = lookupConfigs.getCacheTimeToLive();
        this.maxRetryTimes = lookupConfigs.getMaxRetryTimes() == 0 ? 3 : lookupConfigs.getMaxRetryTimes();
        this.failInterpret = lookupConfigs.isFailInterpret();
        this.cacheEnabled = lookupConfigs.isUseCache();
        this.doInput = lookupConfigs.isDoInput();
        this.doOutput = lookupConfigs.isDoOutput();
        this.lookupRespColumn = lookupConfigs.getLookupRespColumn();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        this.cache = this.cacheMaxSize == -1 || cacheExpireMs == -1
                ? null
                : CacheBuilder.newBuilder().expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                .maximumSize(cacheMaxSize)
                .build();
    }

    /**
     * Lookup flow:
     * 1> obtain the joined keys
     * 2> query the lookup endpoint with the joined keys
     * 3> build the result RowData
     * 4> the planner then checks the join-on condition against the returned RowData
     */
    public void eval(Object... joinedAndSortedKeys) throws Exception {
        // find the relationship of joined key name and value.
        Map<String, Tuple2<?, Integer>> keyNameAndValPosMap = zipColumnDataKeyNameAndJoinedValue(this.columnData, joinedAndSortedKeys);
        GenericRowData keyRow = GenericRowData.of(joinedAndSortedKeys);
        if (null != cache && this.cacheEnabled) {
            List<RowData> cacheRows = cache.getIfPresent(keyRow);
            if (null != cacheRows) {
                // cache hit: emit the cached rows and skip the HTTP request
                cacheRows.forEach(this::collect);
                return;
            }
        }
        // cache miss (or caching disabled): perform the HTTP lookup
        ArrayList<RowData> rows = new ArrayList<>();
        int pageNum = 1;
        boolean lastPageEmpty = false; // used to stop paging once a page comes back empty
        // e.g. https://example.com/api/resources?page=2&size=10
        while (this.pageQueryEnabled || pageNum == 1) {
            if (this.pageQueryEnabled && requestMethod.equals(TableUtils.UserTypes.GET.name())) {
                this.requestUrl = TableUtils.URIFormat(this.requestUrl, this.pageQueryKey, pageNum, LOG);
            }
            int alreadyTimes = 0;
            while (alreadyTimes <= maxRetryTimes) {
                boolean succ;
                try {
                    ++alreadyTimes;
                    headersAndUrlBodyFormat(keyNameAndValPosMap);
                    HttpURLConnection conn = getConn(requestUrl);
                    String errMsg = "unknown request method: " + requestMethod + "! Supported methods are [GET, POST]";
                    switch (requestMethod) {
                        case "GET":
                            LOG.info("[LOOKUP]: request method: {} , url: {}", requestMethod, requestUrl);
                            break;
                        case "POST":
                            String data = postBody;
                            LOG.info("[LOOKUP]: request method: {} , data: {}", requestMethod, data);
                            try (OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), this.charset)) {
                                writer.write(data);
                            } catch (IOException e) {
                                errMsg = "[LOOKUP]: send response data error! " + e.getMessage();
                                LOG.error(errMsg, e);
                                throw new IOException(errMsg);
                            }
                            break;
                        default:
                            LOG.error(errMsg);
                            throw new UnSupportedException(errMsg);
                    }
                    // handle the response data
                    Tuple2<Boolean, String> httpStreamReadRes = TableUtils.responseStreamHandler(keyRow, conn, headers, requestUrl, LOG);
                    succ = httpStreamReadRes.f0;
                    JSONArray resJsonObjs = constructJson(httpStreamReadRes.f1);
                    if (null != cache && cacheEnabled)
                        // refresh the cached rows (the same list instance is mutated below, so the cache sees new rows)
                        cache.put(keyRow, rows);
                    lastPageEmpty = (null == resJsonObjs || resJsonObjs.isEmpty());
                    if (!lastPageEmpty) {
                        // Locate the column that receives the response. A single ROW-typed column holds the lookup result, so the endpoint can return JSON of any shape.
                        if (fieldColumnNames.contains(this.lookupRespColumn)) {
                            List<RowType.RowField> rowFieldList = ((RowType) this.columnData.getPhysicalRowDataType().getLogicalType()).getFields();
                            List<RowType.RowField> filteredRowFields = rowFieldList
                                    .stream().filter(rowField -> rowField.getName().equals(lookupRespColumn)
                                            && rowField.getType().getTypeRoot().name().equals(TableUtils.UserTypes.ROW.name()))
                                    .collect(Collectors.toList());
                            if (filteredRowFields.isEmpty())
                                throw new UnSupportedLogicalTypeException(String.format("%s must be 'ROW' type.", lookupRespColumn));

                            // construct result and collect
                            RowType.RowField resultRowColumn = filteredRowFields.get(0);
                            int resRowIndex = rowFieldList.indexOf(resultRowColumn);
                            // RowType rowType = new RowType(Collections.singletonList(resultRowColumn));
                            RowType rowType = (RowType) resultRowColumn.getType();
                            // rowDataSchema
                            JsonRowDataDeserializationSchema deSchema = deSerSchema(rowType);
                            resJsonObjs.forEach(jsonObj -> {
                                try {
                                    // this row data shall contain all columns, including physical and compute columns
                                    GenericRowData genericRowData = new GenericRowData(rowFieldList.size());
                                    byte[] jsonBytes = jsonObj.toString().getBytes(null == this.charset
                                            ? HttpOptions.OUTPUT_CHARSET.defaultValue()
                                            : this.charset);
                                    RowData rowData = deSchema.deserialize(jsonBytes);
                                    genericRowData.setField(resRowIndex, rowData);
                                    keyNameAndValPosMap.forEach((key, value) -> genericRowData.setField(value.f1, value.f0));
                                    LOG.debug("genericRowData: " + genericRowData);
                                    collect(genericRowData);
                                    rows.add(genericRowData);
                                } catch (IOException e) {
                                    LOG.error(e.getMessage(), e);
                                    e.printStackTrace();
                                }
                            });
                        }
                    }

                } catch (Exception e) {
                    succ = false;
                    if (failInterpret) {
                        LOG.error(e.getMessage(), e);
                        throw e;
                    }
                    LOG.error(e.getMessage(), e);
                }
                if (succ)
                    break;
                else
                    LOG.warn("Attempt {} failed, retrying...", alreadyTimes);
            }
            if (this.pageQueryEnabled && lastPageEmpty)
                break; // no more pages: stop the pagination loop
            pageNum++;
        }

        if (null != cache && cacheEnabled)
            cache.put(keyRow, rows);

    }

    public JSONArray constructJson(String content) {

        return JSONArray.parseArray(content.startsWith("[") && content.endsWith("]")
                ? content : "[" + content + "]");
    }

    @Deprecated
    public JSONArray constructJsonWithRespColumnKey(String content) {
        JSONArray array = new JSONArray();
        if (!content.trim().startsWith("[")) {
            JSONObject jsonO = new JSONObject();
            jsonO.put(this.lookupRespColumn, JSONObject.parseObject(content));
            array.add(jsonO);
        } else {
            JSONArray jsonArray = JSONObject.parseArray(content);
            for (Object o : jsonArray) {
                JSONObject jsonO = new JSONObject();
                JSONObject jsonObject = JSONObject.parseObject(o.toString());
                jsonO.put(this.lookupRespColumn, jsonObject);
                array.add(jsonO);
            }
        }
        return array;
    }

    private HttpURLConnection getConn(String urlStr) throws Exception {
        return getHttpURLConnection(urlStr, doOutput, doInput, requestMethod, this.connectTimeout, this.readTimeout, this.cacheEnabled, this.headers);
    }

    static HttpURLConnection getHttpURLConnection(String urlStr, boolean doOutput, boolean doInput, String requestMethod, Duration connectTimeout, Duration readTimeout, boolean cacheEnabled, Map<String, String> headers) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
        httpURLConnection.setDoOutput(doOutput);
        httpURLConnection.setDoInput(doInput);
        httpURLConnection.setRequestMethod(requestMethod);
        httpURLConnection.setConnectTimeout((int) connectTimeout.toMillis());
        httpURLConnection.setReadTimeout((int) readTimeout.toMillis());
        httpURLConnection.setUseCaches(cacheEnabled);
        if (headers != null)
            headers.forEach(httpURLConnection::setRequestProperty);
        return httpURLConnection;
    }

    /**
     * Joined and sorted key names together with the row data types they resolve against.
     */
    public static class PhysicalJoinedColumnData implements Serializable {
        private final String[] joinedKeyNames;
        private final DataType physicalRowDataType;
        private final DataType fieldRowDataType; // all fields and their types

        public PhysicalJoinedColumnData(String[] joinedKeyNames, DataType fieldRowDataType, DataType physicalRowDataType) {
            this.joinedKeyNames = joinedKeyNames;
            this.physicalRowDataType = physicalRowDataType;
            this.fieldRowDataType = fieldRowDataType;
        }

        public String[] getJoinedKeyNames() {
            return joinedKeyNames;
        }

        public DataType getPhysicalRowDataType() {
            return physicalRowDataType;
        }

        public DataType getFieldRowDataType() {
            return fieldRowDataType;
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public static HttpLookupFunctionBuilder builder() {
        return new HttpLookupFunctionBuilder();
    }

    /**
     * @param rowData            PhysicalJoinedColumnData whose key names are sorted in the same order as joinedSortedValues
     * @param joinedSortedValues the joined keys' values, in sorted order
     * @return Map of key name -> (value, position in the physical row)
     */
    public Map<String, Tuple2<?, Integer>> zipColumnDataKeyNameAndJoinedValue(PhysicalJoinedColumnData rowData, Object[] joinedSortedValues) {
        List<String> physicalFieldNames = TableUtils.getFieldNames(rowData.getPhysicalRowDataType());
        Map<String, Tuple2<?, Integer>> keyNameAndValMap = new ConcurrentHashMap<>();
        for (int index = 0; index < joinedSortedValues.length; index++) {
            String joinedKeyName = rowData.getJoinedKeyNames()[index];
            keyNameAndValMap.put(joinedKeyName, Tuple2.of(joinedSortedValues[index], physicalFieldNames.indexOf(joinedKeyName)));
        }
        return keyNameAndValMap;
    }

    private void headersAndUrlBodyFormat(Map<String, Tuple2<?, Integer>> richData) {
        for (Map.Entry<String, Tuple2<?, Integer>> richEntry : richData.entrySet()) {
            String richK = richEntry.getKey();
            String richV = null == richEntry.getValue().f0 ? "null" : richEntry.getValue().f0.toString();
            // literal replacement: values may contain regex metacharacters, so avoid replaceAll
            this.headers.forEach((k, v) -> headers.put(k, v.replace("##" + richK + "##", richV)));
            this.requestUrl = this.requestUrl.replace("##" + richK + "##", richV);
            if (null != this.postBody)
                this.postBody = this.postBody.replace("##" + richK + "##", richV);
        }
    }

    private static JsonRowDataDeserializationSchema deSerSchema(RowType rowType) {
        // lazily initialized once; all lookup calls share the same response row type
        if (null == deSerSchema)
            deSerSchema = new JsonRowDataDeserializationSchema(
                    rowType,
                    InternalTypeInfo.of(rowType),
                    false,
                    false,
                    TimestampFormat.SQL
            );
        return deSerSchema;
    }

    public static class HttpLookupFunctionBuilder {
        private PhysicalJoinedColumnData columnData;
        private HttpConfigs lookupConfigs;

        public HttpLookupFunctionBuilder setColumnData(PhysicalJoinedColumnData columnData) {
            this.columnData = columnData;
            return this;
        }

        public HttpLookupFunctionBuilder setLookupConfigs(HttpConfigs lookupConfigs) {
            this.lookupConfigs = lookupConfigs;
            return this;
        }

        public HttpLookupFunction build() {
            return new HttpLookupFunction(this.columnData, this.lookupConfigs);
        }
    }
}

Step 6: The utility class I wrote for this connector, TableUtils:

package com.lee.connector.http.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lee.connector.http.exception.UnSupportedLogicalTypeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TableUtils {

    public static List<String> getFieldNames(DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        return LogicalTypeChecks.isCompositeType(logicalType)
                ? LogicalTypeChecks.getFieldNames(logicalType)
                : Collections.emptyList();
    }

    public static GenericRowData buildGenericRowData(List<?> values) {
        GenericRowData res = new GenericRowData(values.size());
        for (int i = 0; i < values.size(); i++) {
            res.setField(i, values.get(i));
        }
        return res;
    }

    public static GenericRowData buildEmptyRow(int size) {
        return new GenericRowData(size);
    }

    /**
     * @param lineRow RowData
     * @return colName -> (value, pos)
     */
    public static Map<String, Tuple2<?, Integer>> buildRichDataRelation(RowData lineRow, DataType physicalDataType) throws UnSupportedLogicalTypeException {
        LogicalType logicalType = physicalDataType.getLogicalType();
        if (!logicalType.getTypeRoot().name().equals(UserTypes.ROW.name())) {
            throw new UnSupportedLogicalTypeException("Can only support for [RowType], but found:  " + logicalType.getTypeRoot().name());
        }
        ConcurrentHashMap<String, Tuple2<?, Integer>> richDataMap = new ConcurrentHashMap<>();
        List<RowType.RowField> rowFields = ((RowType) logicalType).getFields();
        if (lineRow.getArity() != rowFields.size()) {
            throw new InternalError(String.format("Internal error, this is a bug!, rowData: %s, rowData length: %s, rowFields: %s",
                    lineRow,
                    lineRow.getArity(),
                    StringUtils.join(rowFields.stream()
                            .map((Function<RowType.RowField, Object>) RowType.RowField::getName)
                            .collect(Collectors.toList()))
            ));
        }
        for (int pos = 0; pos < rowFields.size(); pos++)
            // name, value, pos --> lineRow pos is the same as rowFields pos, so using pos to get field name and field value
            richDataMap.put(rowFields.get(pos).getName(), Tuple2.of(getRowDataValue(rowFields.get(pos).getType(), lineRow, pos), pos));
        return richDataMap;
    }

    public static <T> Tuple2<Boolean, String> responseStreamHandler(T input, HttpURLConnection conn, Map<String, String> headers, String requestUrl, Logger LOG) throws IOException {
        int respCode = conn.getResponseCode();
        String readRes;
        // read the input stream on success and the error stream otherwise;
        // getInputStream() throws an IOException for HTTP error codes
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                respCode >= 200 && respCode < 300 ? conn.getInputStream() : conn.getErrorStream()))) {
            readRes = bufferedLineReader(input, respCode, conn, reader, headers, requestUrl, LOG);
        } finally {
            conn.disconnect();
        }
        return Tuple2.of(respCode == 200, readRes);
    }

    public static <T> String bufferedLineReader(T input, int respCode, HttpURLConnection conn, BufferedReader bufferedReader, Map<String, String> headers, String requestUrl, Logger LOG) throws IOException {
        String inputLine;
        StringBuilder builder = new StringBuilder();
        while ((inputLine = bufferedReader.readLine()) != null)
            builder.append(inputLine);
        String msg = "Http Response Code: " + respCode
                + ", Response Massage:" + conn.getResponseMessage() + ", "
                + builder + ", Submitted RowData: "
                + input.toString()
                + ", Request Url: " + requestUrl + ", Http Headers:" + headers;
        if (respCode >= 200 && respCode < 300)
            LOG.info(msg);
        else
            LOG.error(msg);
        return builder.toString();
    }

    /**
     * +--------------------------------+-----------------------------------------+
     * | SQL Data Types                 | Internal Data Structures                |
     * +--------------------------------+-----------------------------------------+
     * | BOOLEAN                        | boolean                                 |
     * +--------------------------------+-----------------------------------------+
     * | CHAR / VARCHAR / STRING        | StringData                              |
     * +--------------------------------+-----------------------------------------+
     * | BINARY / VARBINARY / BYTES     | byte[]                                  |
     * +--------------------------------+-----------------------------------------+
     * | DECIMAL                        | DecimalData                             |
     * +--------------------------------+-----------------------------------------+
     * | TINYINT                        | byte                                    |
     * +--------------------------------+-----------------------------------------+
     * | SMALLINT                       | short                                   |
     * +--------------------------------+-----------------------------------------+
     * | INT                            | int                                     |
     * +--------------------------------+-----------------------------------------+
     * | BIGINT                         | long                                    |
     * +--------------------------------+-----------------------------------------+
     * | FLOAT                          | float                                   |
     * +--------------------------------+-----------------------------------------+
     * | DOUBLE                         | double                                  |
     * +--------------------------------+-----------------------------------------+
     * | DATE                           | int (number of days since epoch)        |
     * +--------------------------------+-----------------------------------------+
     * | TIME                           | int (number of milliseconds of the day) |
     * +--------------------------------+-----------------------------------------+
     * | TIMESTAMP                      | TimestampData                           |
     * +--------------------------------+-----------------------------------------+
     * | TIMESTAMP WITH LOCAL TIME ZONE | TimestampData                           |
     * +--------------------------------+-----------------------------------------+
     * | INTERVAL YEAR TO MONTH         | int (number of months)                  |
     * +--------------------------------+-----------------------------------------+
     * | INTERVAL DAY TO MONTH          | long (number of milliseconds)           |
     * +--------------------------------+-----------------------------------------+
     * | ROW / structured types         | RowData                                 |
     * +--------------------------------+-----------------------------------------+
     * | ARRAY                          | ArrayData                               |
     * +--------------------------------+-----------------------------------------+
     * | MAP / MULTISET                 | MapData                                 |
     * +--------------------------------+-----------------------------------------+
     * | RAW                            | RawValueData                            |
     * +--------------------------------+-----------------------------------------+
     *
     * @param logicalType Sql Data type
     * @param record      GenericRowData/BinaryRowData ...
     * @param pos         position
     * @return Internal Data Structure Type
     */
    public static Object getRowDataValue(LogicalType logicalType, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        } else {
            switch (logicalType.getTypeRoot()) {
                case BOOLEAN:
                    return record.getBoolean(pos);
                case TINYINT:
                    return record.getByte(pos); /* TINYINT maps to byte */
                case SMALLINT:
                    return record.getShort(pos);
                case INTEGER:
                    return record.getInt(pos);
                case BIGINT:
                    return record.getLong(pos);
                case FLOAT:
                    return record.getFloat(pos);
                case DOUBLE:
                    return record.getDouble(pos);
                case CHAR:
                case VARCHAR:
                    return record.getString(pos);
                case DATE:
                    // the internal representation is an int (days since epoch); convert it to java.sql.Date
                    return Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos)));
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                    // note: this must be the TIMESTAMP root, not TIME; TIME is stored as an int (millis of day)
                    int timestampPrecision = ((TimestampType) logicalType).getPrecision();
                    return record.getTimestamp(pos, timestampPrecision).toTimestamp();
                case DECIMAL:
                    int decimalPrecision = ((DecimalType) logicalType).getPrecision();
                    int decimalScale = ((DecimalType) logicalType).getScale();
                    return record.getDecimal(pos, decimalPrecision, decimalScale);
                case ARRAY:
                case MAP:
                case MULTISET:
                case ROW:
                case RAW:
                default:
                    throw new UnsupportedOperationException("Unsupported type: " + logicalType);

            }
        }
    }

    public static boolean isValidJSON(final String json, Logger LOG) {
        boolean valid = false;
        try {
            final ObjectMapper mapper = new ObjectMapper();
            mapper.readTree(json);
            valid = true;
        } catch (Exception e) {
            // invalid JSON: log it and fall through to return false
            LOG.error(e.getMessage(), e);
        }
        return valid;
    }

    public static String URIFormat(String url, String pageQueryKey, int pageNum, Logger LOG) {
        if (null == pageQueryKey) {
            throw new NullPointerException("page.query.enabled was set to 'true' but page.query.key is null; did you mean 'page' or 'pageNum'?");
        }
        URI uri;
        String urlRes = url;
        try {
            uri = new URI(url);
            String query = uri.getQuery();
            if (null == query) {
                throw new URISyntaxException(url, "Bad URL, please recheck it!");
            }
            Scanner scanner = new Scanner(query);
            scanner.useDelimiter("&");
            while (scanner.hasNext()) {
                String[] param = scanner.next().split("=");
                if (param.length > 1 && param[0].equals(pageQueryKey)) {
                    // replace the whole key=value pair literally, rather than the bare value anywhere in the URL
                    urlRes = url.replace(param[0] + "=" + param[1], param[0] + "=" + pageNum);
                }
            }
        } catch (URISyntaxException e) {
            LOG.error("bad URL: {}", url, e);
        }
        return urlRes;
    }

    public static void main(String[] args) throws IOException {
        RowType rowType = new RowType(Arrays.asList(
                new RowType.RowField("k1", DataTypes.STRING().getLogicalType()),
                new RowType.RowField("k2", DataTypes.STRING().getLogicalType()),
                new RowType.RowField("k3", DataTypes.INT().getLogicalType()),
                new RowType.RowField("k4", new RowType(Arrays.asList(
                        new RowType.RowField("kk1", DataTypes.STRING().getLogicalType()),
                        new RowType.RowField("kk2", DataTypes.STRING().getLogicalType()),
                        new RowType.RowField("kk3", new ArrayType(new RowType(Arrays.asList(
                                new RowType.RowField("kkk1", DataTypes.STRING().getLogicalType()),
                                new RowType.RowField("kkk2", DataTypes.STRING().getLogicalType())
                        ))))
                )))
        ));

        JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
                rowType,
                InternalTypeInfo.of(rowType),
                false,
                true,
                TimestampFormat.SQL
        );

        String jsonString = "{...}";  // 你的 JSON 数据
        byte[] jsonBytes = jsonString.getBytes(StandardCharsets.UTF_8);
        RowData rowData = deserializationSchema.deserialize(jsonBytes);

        System.out.println(rowData);

    }

    public enum UserTypes {
        GET,
        POST,
        JSON,
        ROW;
        public static String getCharSetName() {
            return "UTF-8";
        }
    }
}
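
For reference, here is a minimal, self-contained sketch (not part of the original post; the class name RowDataValueDemo and the sample values are made up) of what getRowDataValue returns: CHAR/VARCHAR columns come back as StringData, INTEGER as Integer, and unsupported roots such as ROW throw UnsupportedOperationException.

import com.lee.connector.http.util.TableUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.Arrays;
import java.util.List;

public class RowDataValueDemo {
    public static void main(String[] args) {
        // one row with a VARCHAR column and an INT column
        GenericRowData row = GenericRowData.of(StringData.fromString("leeston"), 30);
        List<LogicalType> types = Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
        for (int i = 0; i < row.getArity(); i++) {
            // prints "leeston" (a StringData) and 30 (an Integer)
            System.out.println(TableUtils.getRowDataValue(types.get(i), row, i));
        }
    }
}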

七: HttpOptions, the class that defines the table options the connector accepts. Code as follows:

package com.lee.connector.http.config;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

public class HttpOptions {
    private static final String IDENTIFIER = "http";
    public static final String PROPERTIES_HEADER_PREFIX = "properties.headers.";
    public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
            .stringType().defaultValue(IDENTIFIER).withDescription("the connector's name");

    public static final ConfigOption<String> REQUEST_URL = ConfigOptions.key("request.url")
            .stringType().noDefaultValue().withDescription("http request url");

    public static final ConfigOption<String> REQUEST_METHOD = ConfigOptions.key("request.method")
            .stringType().noDefaultValue().withDescription("http request method");

    public static final ConfigOption<String> REQUEST_TYPE = ConfigOptions.key("request.type").stringType()
            .noDefaultValue().withDescription("the http request type, support for GET or POST currently. ");

    // & is a placeholder
    public static final ConfigOption<String> PROPERTIES_HEADER = ConfigOptions.key(String.format("%s.&", "properties.headers"))
            .stringType()
            .defaultValue("properties.header.application/json")
            .withDescription("the properties.header of http, default properties.header.application/json");

    public static final ConfigOption<Boolean> FAIL_INTERPRET = ConfigOptions.key("fail.interpret").booleanType()
            .defaultValue(true).withDescription("interpret or not when failed");

    public static final ConfigOption<Integer> FAIL_MAX_RETRIES = ConfigOptions.key("fail.max-retries").intType()
            .defaultValue(3).withDescription("fail.max-retries");

    // connect.timeout

    public static final ConfigOption<Duration> CONNECT_TIMEOUT = ConfigOptions.key("connect.timeout")
            .durationType().defaultValue(Duration.ofMillis(10000L)).withDescription("connection timeout, default is 10 seconds");

    // read.timeout
    public static final ConfigOption<Duration> READ_TIMEOUT = ConfigOptions.key("read.timeout")
            .durationType().defaultValue(Duration.ofMillis(10000L))
            .withDescription("read timeout, default is 10 seconds");

    // output.format
    public static final ConfigOption<String> OUTPUT_FORMAT = ConfigOptions.key("output.format")
            .stringType().defaultValue("JSON").withDescription("only support for json currently");

    // do input
    public static final ConfigOption<Boolean> PROPERTIES_DO_INPUT = ConfigOptions.key("do.input")
            .booleanType()
            .defaultValue(true)
            .withDescription("whether the url connection is used for input, default true");
    // do output
    public static final ConfigOption<Boolean> PROPERTIES_DO_OUTPUT = ConfigOptions.key("do.output")
            .booleanType()
            .noDefaultValue()
            .withDescription("whether the url connection is used for output, no default value");

    // cache.enabled
    public static final ConfigOption<Boolean> CACHE_ENABLED = ConfigOptions.key("cache.enabled")
            .booleanType().defaultValue(true).withDescription("whether or not to use the lookup cache, default true");

    // cache.max-size
    public static final ConfigOption<Integer> CACHE_MAX_SIZE = ConfigOptions.key("cache.max-size")
            .intType().noDefaultValue().withDescription("when the cache is enabled, the maximum number of records to cache");

    // cache.expire-ms
    public static final ConfigOption<Duration> CACHE_TTL = ConfigOptions.key("cache.expire-ms")
            .durationType().defaultValue(Duration.ofMillis(-1))
            .withDescription("cache TTL in milliseconds");

    // post.body
    public static final ConfigOption<String> POST_BODY = ConfigOptions.key("post.body")
            .stringType().noDefaultValue().withDescription("the post body when request type is POST");

    // output.charset
    public static final ConfigOption<String> OUTPUT_CHARSET = ConfigOptions.key("output.charset")
            .stringType().defaultValue("UTF-8").withDescription("output charset");

    // page.query.enabled
    public static final ConfigOption<Boolean> PAGE_QUERY_ENABLED = ConfigOptions.key("page.query.enabled")
            .booleanType().defaultValue(false).withDescription("whether pagination query is enabled, default false");

    // page.query.key
    public static final ConfigOption<String> PAGE_QUERY_KEY = ConfigOptions.key("page.query.key")
            .stringType().noDefaultValue().withDescription("the pagination query parameter name, e.g. pageNum");

    // lookup.resp.column
    public static final ConfigOption<String> LOOKUP_RESP_COLUMN = ConfigOptions.key("lookup.resp.column")
            .stringType().noDefaultValue().withDescription("the column that receives the lookup query response; its logical type must be ROW");

}
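
These ConfigOptions only take effect once the factory declares them. Below is a hedged sketch (the post does not show the author's exact sets, so the grouping here is an assumption) of how HttpDynamicTableFactory could expose them through the standard requiredOptions()/optionalOptions() hooks of DynamicTableFactory, so that FactoryUtil validates them automatically:

import org.apache.flink.configuration.ConfigOption;

import java.util.HashSet;
import java.util.Set;

import static com.lee.connector.http.config.HttpOptions.*;

public class HttpFactoryOptionsSketch {
    // options without a default value that the user must supply
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> required = new HashSet<>();
        required.add(REQUEST_URL);
        required.add(REQUEST_METHOD);
        return required;
    }

    // everything else either falls back to its defaultValue or may stay unset
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optional = new HashSet<>();
        optional.add(REQUEST_TYPE);
        optional.add(CONNECT_TIMEOUT);
        optional.add(READ_TIMEOUT);
        optional.add(OUTPUT_FORMAT);
        optional.add(OUTPUT_CHARSET);
        optional.add(POST_BODY);
        optional.add(FAIL_INTERPRET);
        optional.add(FAIL_MAX_RETRIES);
        optional.add(CACHE_ENABLED);
        optional.add(CACHE_MAX_SIZE);
        optional.add(CACHE_TTL);
        optional.add(PAGE_QUERY_ENABLED);
        optional.add(PAGE_QUERY_KEY);
        optional.add(LOOKUP_RESP_COLUMN);
        return optional;
    }
}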

八: HttpConfigs, the configuration class that HttpDynamicTableSink and HttpDynamicTableSource rely on when building the lookup function and the sink function:

package com.lee.connector.http.config;

import org.apache.flink.table.types.DataType;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HttpConfigs implements Serializable {
    private String url;
    private int connectTimeout;
    private int readTimeout;
    private String body;
    private String requestMethod;
    private String outputFormat;
    private String charset;
    private boolean useCache;
    private boolean doInput;
    private boolean doOutput;
    private boolean pageQueryEnabled;
    private String pageQueryKey;
    private String lookupRespColumn;
    private Map<String, String> headers = Collections.emptyMap();
    private final List<String> aliasPaths = Collections.emptyList();
    private final List<String> argList = Collections.emptyList();
    private List<String> columnNames = Collections.emptyList();

    private DataType fieldsDataType;

    private DataType physicalDataType;

    // lookup cache
    /**
     * maximum number of cached records
     */
    private int cacheMaxSize = -1;
    /**
     * TTL (in milliseconds) of cached lookup records
     */
    private long cacheTimeToLive = -1;
    /**
     * maximum number of retries on failure
     */
    private int maxRetryTimes = 3;
    /**
     * whether to interrupt (fail) the job on error
     */
    private boolean failInterpret = true;

    public HttpConfigs(String url, int connectTimeout, int readTimeout, String body, String requestMethod, String outputFormat, String charset, boolean useCache, boolean doInput, boolean doOutput, boolean pageQueryEnabled, String pageQueryKey, String lookupRespColumn, Map<String, String> headers, List<String> columnNames, DataType fieldsDataType, DataType physicalDataType, int cacheMaxSize, long cacheTimeToLive, int maxRetryTimes, boolean failInterpret) {
        this.url = url;
        this.connectTimeout = connectTimeout;
        this.readTimeout = readTimeout;
        this.body = body;
        this.requestMethod = requestMethod;
        this.outputFormat = outputFormat;
        this.charset = charset;
        this.useCache = useCache;
        this.doInput = doInput;
        this.doOutput = doOutput;
        this.pageQueryEnabled = pageQueryEnabled;
        this.pageQueryKey = pageQueryKey;
        this.lookupRespColumn = lookupRespColumn;
        this.headers = headers;
        this.columnNames = columnNames;
        this.fieldsDataType = fieldsDataType;
        this.physicalDataType = physicalDataType;
        this.cacheMaxSize = cacheMaxSize;
        this.cacheTimeToLive = cacheTimeToLive;
        this.maxRetryTimes = maxRetryTimes;
        this.failInterpret = failInterpret;
    }

    public String getUrl() {
        return url;
    }

    public int getConnectTimeout() {
        return connectTimeout;
    }

    public int getReadTimeout() {
        return readTimeout;
    }

    public String getBody() {
        return body;
    }

    public String getRequestMethod() {
        return requestMethod;
    }

    public String getOutputFormat() {
        return outputFormat;
    }

    public boolean isPageQueryEnabled() {
        return pageQueryEnabled;
    }

    public String getPageQueryKey() {
        return pageQueryKey;
    }

    public String getLookupRespColumn() {
        return lookupRespColumn;
    }

    public String getCharset() {
        return charset;
    }

    public boolean isUseCache() {
        return useCache;
    }

    public boolean isDoInput() {
        return doInput;
    }

    public boolean isDoOutput() {
        return doOutput;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public List<String> getArgList() {
        return argList;
    }

    public DataType getFieldsDataType() {
        return fieldsDataType;
    }

    public DataType getPhysicalDataType() {
        return physicalDataType;
    }

    public int getCacheMaxSize() {
        return cacheMaxSize;
    }

    public long getCacheTimeToLive() {
        return cacheTimeToLive;
    }

    public int getMaxRetryTimes() {
        return maxRetryTimes;
    }

    public List<String> getColumnNames() {
        return columnNames;
    }

    public boolean isFailInterpret() {
        return failInterpret;
    }

    public List<String> getAliasPaths() {
        return aliasPaths;
    }

    public static HttpLookupBuilder builder() {
        return new HttpLookupBuilder();
    }

    public static class HttpLookupBuilder {
        private String url;
        private int connectTimeout;
        private int readTimeout;
        private String body;
        private String requestMethod;
        private String outputFormat;
        private String charset;
        private boolean useCache;
        private boolean doInput;
        private boolean doOutput;
        private boolean pageQueryEnabled;
        private String pageQueryKey;
        private String lookupRespColumn;
        private Map<String, String> headers = Collections.emptyMap();
        private List<String> columnNames = Collections.emptyList();
        private DataType fieldsDataType;
        private DataType physicalDataType;
        private int cacheMaxSize;
        private long cacheTimeToLive;
        private int maxRetryTimes;
        private boolean failInterpret;

        public HttpLookupBuilder setUrl(String url) {
            this.url = url;
            return this;
        }

        public HttpLookupBuilder setConnectTimeout(int connectTimeout) {
            this.connectTimeout = connectTimeout;
            return this;
        }

        public HttpLookupBuilder setReadTimeout(int readTimeout) {
            this.readTimeout = readTimeout;
            return this;
        }

        public HttpLookupBuilder setBody(String body) {
            this.body = body;
            return this;
        }

        // duplicate of setRequestMethod below; both assign the same field
        public HttpLookupBuilder setMethod(String requestMethod) {
            this.requestMethod = requestMethod;
            return this;
        }

        public HttpLookupBuilder setOutputFormat(String outputFormat) {
            this.outputFormat = outputFormat;
            return this;
        }

        public HttpLookupBuilder setCharset(String charset) {
            this.charset = charset;
            return this;
        }

        public HttpLookupBuilder setUseCache(boolean useCache) {
            this.useCache = useCache;
            return this;
        }

        public HttpLookupBuilder setDoInput(boolean doInput) {
            this.doInput = doInput;
            return this;
        }

        public HttpLookupBuilder setDoOutput(boolean doOutput) {
            this.doOutput = doOutput;
            return this;
        }

        public HttpLookupBuilder setFieldsDataType(DataType fieldsDataType) {
            this.fieldsDataType = fieldsDataType;
            return this;
        }

        public HttpLookupBuilder setPhysicalDataType(DataType physicalDataType) {
            this.physicalDataType = physicalDataType;
            return this;
        }

        public HttpLookupBuilder setCacheMaxSize(int cacheMaxSize) {
            this.cacheMaxSize = cacheMaxSize;
            return this;
        }

        public HttpLookupBuilder setCacheTimeToLive(long cacheTimeToLive) {
            this.cacheTimeToLive = cacheTimeToLive;
            return this;
        }

        public HttpLookupBuilder setMaxRetryTimes(int maxRetryTimes) {
            this.maxRetryTimes = maxRetryTimes;
            return this;
        }

        public HttpLookupBuilder setFailInterpret(boolean failInterpret) {
            this.failInterpret = failInterpret;
            return this;
        }

        public HttpLookupBuilder setRequestMethod(String requestMethod) {
            this.requestMethod = requestMethod;
            return this;
        }

        public HttpLookupBuilder setPageQueryEnabled(boolean pageQueryEnabled) {
            this.pageQueryEnabled = pageQueryEnabled;
            return this;
        }

        public HttpLookupBuilder setPageQueryKey(String pageQueryKey) {
            this.pageQueryKey = pageQueryKey;
            return this;
        }

        public HttpLookupBuilder setLookupRespColumn(String lookupRespColumn) {
            this.lookupRespColumn = lookupRespColumn;
            return this;
        }

        public HttpLookupBuilder setHeaders(Map<String, String> headers) {
            this.headers = headers;
            return this;
        }

        public HttpLookupBuilder setColumnNames(List<String> columnNames) {
            this.columnNames = columnNames;
            return this;
        }

        public HttpConfigs build() {
            return new HttpConfigs(
                    this.url,
                    this.connectTimeout,
                    this.readTimeout,
                    this.body,
                    this.requestMethod,
                    this.outputFormat,
                    this.charset,
                    this.useCache,
                    this.doInput,
                    this.doOutput,
                    this.pageQueryEnabled,
                    this.pageQueryKey,
                    this.lookupRespColumn,
                    this.headers,
                    this.columnNames,
                    this.fieldsDataType,
                    this.physicalDataType,
                    this.cacheMaxSize,
                    this.cacheTimeToLive,
                    this.maxRetryTimes,
                    this.failInterpret
            );
        }
    }

    @Override
    public String toString() {
        return "HttpConfigs{" +
                "url='" + url + '\'' +
                ", connectTimeout=" + connectTimeout +
                ", readTimeout=" + readTimeout +
                ", body='" + body + '\'' +
                ", requestMethod='" + requestMethod + '\'' +
                ", outputFormat='" + outputFormat + '\'' +
                ", charset='" + charset + '\'' +
                ", useCache=" + useCache +
                ", doInput=" + doInput +
                ", doOutput=" + doOutput +
                ", pageQueryEnabled=" + pageQueryEnabled +
                ", pageQueryKey='" + pageQueryKey + '\'' +
                ", lookupRespColumn='" + lookupRespColumn + '\'' +
                ", headers=" + headers +
                ", aliasPaths=" + aliasPaths +
                ", argList=" + argList +
                ", columnNames=" + columnNames +
                ", fieldsDataType=" + fieldsDataType +
                ", physicalDataType=" + physicalDataType +
                ", cacheMaxSize=" + cacheMaxSize +
                ", cacheTimeToLive=" + cacheTimeToLive +
                ", maxRetryTimes=" + maxRetryTimes +
                ", failInterpret=" + failInterpret +
                '}';
    }
}
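
Usage sketch: how a caller might assemble an HttpConfigs with the builder above. The values mirror the sink SQL in section 十; the post's actual getHttpOptions wiring is not shown, so treat this as illustrative:

import com.lee.connector.http.config.HttpConfigs;

public class HttpConfigsBuilderDemo {
    public static void main(String[] args) {
        HttpConfigs configs = HttpConfigs.builder()
                .setUrl("http://localhost:8080/users/chenuser")
                .setRequestMethod("POST")
                .setConnectTimeout(60000)
                .setReadTimeout(60000)
                .setBody("{\"name\" : \"##res_col1##\"}")
                .setOutputFormat("JSON")
                .setCharset("UTF-8")
                .setUseCache(false)
                .setMaxRetryTimes(2)
                .setFailInterpret(false)
                .build();
        // toString() prints every field, which is handy when logging table options
        System.out.println(configs);
    }
}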

九: The exception classes involved: ColumnNotFoundException, UnSupportedException, and UnSupportedLogicalTypeException.

ColumnNotFoundException:

package com.lee.connector.http.exception;

public class ColumnNotFoundException extends Exception{
    public ColumnNotFoundException(String msg) {
        super(msg);
    }
}

UnSupportedException:

package com.lee.connector.http.exception;

public class UnSupportedException extends Exception{
    public UnSupportedException(String msg) {
        super(msg);
    }
}

UnSupportedLogicalTypeException:

package com.lee.connector.http.exception;

public class UnSupportedLogicalTypeException extends Exception{
    public UnSupportedLogicalTypeException(String msg) {
        super(msg);
    }
}
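
Where these exceptions fit in: a hedged sketch (the post omits the real validateOptions body, so the check below is an assumed example) of rejecting an unsupported request.method with UnSupportedException:

import com.lee.connector.http.exception.UnSupportedException;

public class ValidateSketch {
    static void checkRequestMethod(String method) throws UnSupportedException {
        // the connector only supports GET and POST (see the REQUEST_TYPE description above)
        if (!"GET".equalsIgnoreCase(method) && !"POST".equalsIgnoreCase(method)) {
            throw new UnSupportedException("request.method only supports GET or POST, got: " + method);
        }
    }

    public static void main(String[] args) throws UnSupportedException {
        checkRequestMethod("POST"); // passes
        checkRequestMethod("PUT");  // throws UnSupportedException
    }
}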

十: Testing the HTTP sink:

CREATE TABLE kafka_table
(
    id     String,
    `name` String,
    gender String,
    age    String,
    score  String,
    eventT timestamp(3)

) WITH (
      'connector' = 'kafka',
      'topic' = 'kafka_test_0001',
      'properties.bootstrap.servers' = '192.168.1.102:9092',
      'scan.startup.mode' = 'latest-offset',
      'properties.group.id' = 'group_id_test_01',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false'
      );

-- When the http table serves as a sink table, each ##colN## placeholder is matched by column name; when it serves as a lookup table, placeholders are matched by the order of the ON condition!
CREATE TABLE HTTP_SINK_TABLE
(
    res_col1 String,
    res_col2 String,
    eventT   TIMESTAMP(3),
    eventT2 as localtimestamp,
    procTime as procTime(),
    res_col3 String,
    res_col4 as res_col1,
    WATERMARK FOR eventT AS eventT - interval '0' second
) WITH (
      'connector' = 'http',
      -- test GET first
      'request.method' = 'POST',
--       'request.url' = 'http://localhost:8080/users/leeuser/##res_col3##',
      'request.url' = 'http://localhost:8080/users/chenuser',
      'read.timeout' = '60000',
      'connect.timeout' = '60000',
      -- 'output.format' = 'JSON',
      'post.body' = '{"name" : "##res_col1##", "age": ##res_col2##, "gender": "##res_col3##"}',
      'fail.interpret' = 'false',
      'fail.max-retries' = '3',
      'properties.headers.Content-type' = 'application/json'
      );

insert into HTTP_SINK_TABLE(res_col1, res_col2, res_col3)
select 'leeston', '30', 'male' from kafka_table;

Job graph in the Flink web UI:
[screenshot: job graph]
Log output showing the HTTP request triggered by the sink:
[screenshot: sink request log]
SQL example for an HTTP lookup table:

CREATE TABLE kafka_table
(
    id     String,
    `name` String,
    gender String,
    age    String,
    score  String,
    eventT timestamp(3)

) WITH (
      'connector' = 'kafka',
      'topic' = 'kafka_test_0001',
      'properties.bootstrap.servers' = '192.168.1.102:9092',
      'scan.startup.mode' = 'latest-offset',
      'properties.group.id' = 'group_id_test_01',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false'
      );

CREATE TABLE HTTP_LOOKUP_TABLE
(
    res_col1 String,
    res_col2 String,
    eventT   TIMESTAMP(3),
    eventT2 as localtimestamp,
    -- procTime as procTime(),
    res_col3 String,
    col_res_row ROW<name String, gender String, age int>
) WITH (
      'connector' = 'http',
      'request.method' = 'POST',
      'request.url' = 'http://localhost:8080/users/chenuser',
      'read.timeout' = '60000',
      'connect.timeout' = '60000',
      -- ##col##: the column inside ## must appear in the ON condition
      'post.body' = '{"name" : "##res_col1##", "age": ##res_col2##, "gender": "##res_col3##"}',
      'fail.interpret' = 'true',
      'properties.headers.Content-type' = 'application/json',
      -- 'page.query.enabled' = 'true',
      'fail.max-retries' = '2',
      'cache.enabled' = 'false',
      'output.format' = 'JSON',
      'cache.max-size' = '1000',
      'cache.expire-ms' = '50000',
      'lookup.resp.column' = 'col_res_row'
      );

      -- HTTP LOOKUP TABLE TEST
select a.col1,a.col2,a.col3, a.procTime, b.col_res_row.gender,b.* from (
    select '30' as col1,
           'male' as col2,
           'leeston' as col3,
           procTime() as procTime
    from kafka_table
) a
left join HTTP_LOOKUP_TABLE FOR SYSTEM_TIME AS OF a.procTime as b
-- +I(30,male,leeston)
on a.col3 = b.res_col1
and a.col1 = b.res_col2
and a.col2 = b.res_col3;

Result of the lookup join:

ResultKind: SUCCESS_WITH_CONTENT
ResolvedSchema: (
  `col1` CHAR(2) NOT NULL,
  `col2` CHAR(4) NOT NULL,
  `col3` CHAR(7) NOT NULL,
  `procTime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
  `gender` STRING,
  `res_col1` STRING,
  `res_col2` STRING,
  `eventT` TIMESTAMP(3),
  `eventT2` TIMESTAMP(3),
  `res_col3` STRING,
  `col_res_row` ROW<`name` STRING, `gender` STRING, `age` INT>
)
+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |                           col1 |                           col2 |                           col3 |                procTime |                         gender |                       res_col1 |                       res_col2 |                  eventT |                 eventT2 |                       res_col3 |                    col_res_row |
+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I |                             30 |                           male |                        leeston | 2023-10-14 01:55:37.094 |                       male_res |                        leeston |                             30 |                  <NULL> | 2023-10-14 01:55:37.014 |                           male |        (leeston, male_res, 31) |

Flink web UI:
[screenshot: web UI]
Log details:
[screenshot: log details]
Other output details are omitted.

Food for thought: the HTTP connector implemented above only works for lookup joins and as a sink. What if you want to build your own source connector, e.g. a socket source? You can study the datagen, kafka, or hudi connectors: all three implement the source side, but in quite different ways, so I won't go through them here; see the sketch below for the core source-side interface.
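
A minimal, hedged sketch of the extra piece a source connector needs (class name and wiring are illustrative, not taken from the datagen/kafka/hudi code): the table source implements ScanTableSource and hands Flink a SourceFunction via SourceFunctionProvider:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class SocketDynamicTableSource implements ScanTableSource {
    // e.g. a SourceFunction that reads lines from a socket and emits RowData
    private final SourceFunction<RowData> sourceFunction;

    public SocketDynamicTableSource(SourceFunction<RowData> sourceFunction) {
        this.sourceFunction = sourceFunction;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly(); // an append-only source
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return SourceFunctionProvider.of(sourceFunction, false); // false = unbounded
    }

    @Override
    public DynamicTableSource copy() {
        return new SocketDynamicTableSource(sourceFunction);
    }

    @Override
    public String asSummaryString() {
        return "Socket Table Source (sketch)";
    }
}

The factory would then return this from createDynamicTableSource instead of (or in addition to) the lookup-style source.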

Note: the write-up above omits more than 90% of the code commentary and the detailed source-code walkthrough. If you are interested in custom Flink Table/SQL connectors, feel free to contact me directly and we can dig deeper. All of the code was hand-written by me line by line; please do not use it commercially without my permission. Thanks.

Copyright notice: this is an original work. Reposting is permitted, but you must credit the source and author with a hyperlink; otherwise legal liability will be pursued. From 海牛部落 - leeston9, http://hainiubl.com/topics/76386