Flink delivers highly timely data processing, but it is hard for newcomers to develop against directly. Flink SQL is the easiest entry point and the main technology for building real-time computing platforms, and companies with the capacity usually extend open-source Flink with their own features. In this post I show how to develop your own Flink SQL connector. Connector code is considerably simpler than operator code and easier to follow, so I use an HTTP connector as the example and walk through the general idea and development process:
Code structure:
Part 1: Flink discovers and loads classes that implement specific interfaces through the Service Provider Interface (SPI) mechanism. This mechanism lets you register a custom connector factory as a service, and Flink can then discover and use it at runtime.
We need to implement the DynamicTableFactory interface (here through DynamicTableSourceFactory and DynamicTableSinkFactory) and register the factory class as described below so that FactoryUtil can discover and handle it.
We therefore create a class HttpDynamicTableFactory. Its main job is to instantiate HttpDynamicTableSink and HttpDynamicTableSource, to initialize and validate all the table options, and to assemble the information the lookup function and sink function need, such as the DataType. The full code follows:
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.config.HttpOptions;
import com.lee.connector.http.exception.ColumnNotFoundException;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.table.HttpDynamicTableSink;
import com.lee.connector.http.table.HttpDynamicTableSource;
import com.lee.connector.http.util.TableUtils;
import lombok.SneakyThrows;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Created by leeston.
 * Register this class in META-INF/services so that FactoryUtil can discover this connector.
 */
public class HttpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static Logger LOG = LogManager.getLogger(HttpDynamicTableFactory.class);
/**
* connector identifier
*/
private static final String IDENTIFIER = "http";
@SneakyThrows
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
// sink options, real value
ReadableConfig readableConfig = helper.getOptions();
// tableSchema
ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
List<Column> columns = resolvedSchema.getColumns();
DataType physicalDataType = resolvedSchema.toPhysicalRowDataType();
// tableColumns to FieldsDataType (contains all column dataTypes --> fieldDataTypes)
DataType fieldsDataType = this.column2FieldsDataType(columns);
validateOptions(context, readableConfig, false);
// user sink options
HttpConfigs sinkOptions = getHttpOptions(readableConfig,
fieldsDataType,
physicalDataType,
extractHeadersByCatalogOptions(
context.getCatalogTable().getOptions()
));
LOG.info("httpConfigs for Sink Option: {}", sinkOptions.toString());
return new HttpDynamicTableSink(sinkOptions);
}
private Map<String, String> extractHeadersByCatalogOptions(Map<String, String> options) {
return options.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(HttpOptions.PROPERTIES_HEADER_PREFIX))
// strip the header prefix; substring avoids replaceAll's regex semantics,
// where the dots in the prefix would match any character
.collect(Collectors.toMap(entry -> entry.getKey().substring(HttpOptions.PROPERTIES_HEADER_PREFIX.length()),
Map.Entry::getValue, (v1, v2) -> v2));
}
/**
 * @param readableConfig    the resolved table options
 * @param fieldsRowDataType row type covering all columns, including computed ones
 * @param physicalDataType  row type covering the physical columns only
 * @param headers           HTTP headers extracted from the catalog options
 * @return the assembled HttpConfigs
 */
private HttpConfigs getHttpOptions(ReadableConfig readableConfig, DataType fieldsRowDataType, DataType physicalDataType, Map<String, String> headers) throws UnSupportedException, ColumnNotFoundException {
// either fieldsRowDataType or physicalDataType works here
List<String> childFieldNames = TableUtils.getFieldNames(fieldsRowDataType);
HttpConfigs.HttpLookupBuilder httpLookupBuilder = HttpConfigs.builder()
.setUrl(readableConfig.get(HttpOptions.REQUEST_URL))
.setMethod(readableConfig.get(HttpOptions.REQUEST_METHOD))
.setHeaders(headers)
.setFieldsDataType(fieldsRowDataType)
.setPhysicalDataType(physicalDataType)
.setCharset(TableUtils.UserTypes.getCharSetName())
.setColumnNames(childFieldNames);
if (readableConfig.getOptional(HttpOptions.PROPERTIES_DO_INPUT).isPresent()) {
httpLookupBuilder.setDoInput(readableConfig.get(HttpOptions.PROPERTIES_DO_INPUT));
}
switch (readableConfig.get(HttpOptions.REQUEST_METHOD)) {
case "GET":
httpLookupBuilder.setDoInput(true);
httpLookupBuilder.setDoOutput(false);
httpLookupBuilder.setUseCache(true);
break;
case "POST":
httpLookupBuilder.setDoInput(true);
httpLookupBuilder.setDoOutput(true);
httpLookupBuilder.setUseCache(false);
readableConfig.getOptional(HttpOptions.POST_BODY).ifPresent(httpLookupBuilder::setBody);
break;
default:
String errorMsgPattern = "Unsupported method: %s. %sSupported methods are [%s, %s]";
throw new UnSupportedException(String.format(errorMsgPattern
, readableConfig.get(HttpOptions.REQUEST_METHOD)
, readableConfig.get(HttpOptions.REQUEST_METHOD).equalsIgnoreCase(TableUtils.UserTypes.GET.name())
|| readableConfig.get(HttpOptions.REQUEST_METHOD).equalsIgnoreCase(TableUtils.UserTypes.POST.name())
? String.format("Did you mean '%s'? ", readableConfig.get(HttpOptions.REQUEST_METHOD).toUpperCase()) : ""
, TableUtils.UserTypes.GET.name()
, TableUtils.UserTypes.POST.name()
));
}
readableConfig.getOptional(HttpOptions.CACHE_MAX_SIZE).ifPresent(httpLookupBuilder::setCacheMaxSize);
readableConfig.getOptional(HttpOptions.CACHE_TTL).ifPresent(duration -> {
long cacheTtl = duration.toMillis();
httpLookupBuilder.setCacheTimeToLive(cacheTtl);
});
readableConfig.getOptional(HttpOptions.CONNECT_TIMEOUT).ifPresent(connectTimeoutDuration -> {
int timeout = (int) connectTimeoutDuration.toMillis();
httpLookupBuilder.setConnectTimeout(timeout);
});
readableConfig.getOptional(HttpOptions.READ_TIMEOUT).ifPresent(readTimeoutDuration -> {
int timeout = (int) readTimeoutDuration.toMillis();
httpLookupBuilder.setReadTimeout(timeout);
});
readableConfig.getOptional(HttpOptions.FAIL_MAX_RETRIES).ifPresent(httpLookupBuilder::setMaxRetryTimes);
readableConfig.getOptional(HttpOptions.FAIL_INTERPRET).ifPresent(httpLookupBuilder::setFailInterpret);
readableConfig.getOptional(HttpOptions.OUTPUT_CHARSET).ifPresent(httpLookupBuilder::setCharset);
readableConfig.getOptional(HttpOptions.PAGE_QUERY_ENABLED).ifPresent(httpLookupBuilder::setPageQueryEnabled);
readableConfig.getOptional(HttpOptions.PAGE_QUERY_KEY).ifPresent(httpLookupBuilder::setPageQueryKey);
if (readableConfig.getOptional(HttpOptions.OUTPUT_FORMAT).isPresent()
&& !readableConfig.get(HttpOptions.OUTPUT_FORMAT)
.equalsIgnoreCase(TableUtils.UserTypes.JSON.name()))
throw new UnSupportedException("value for key: 'output.format' only supports for 'JSON', but found: " + readableConfig.get(HttpOptions.OUTPUT_FORMAT));
httpLookupBuilder.setOutputFormat(HttpOptions.OUTPUT_FORMAT.defaultValue());
if (readableConfig.getOptional(HttpOptions.LOOKUP_RESP_COLUMN).isPresent()) {
String respColumn = readableConfig.get(HttpOptions.LOOKUP_RESP_COLUMN);
if (childFieldNames.contains(respColumn))
httpLookupBuilder.setLookupRespColumn(respColumn);
else
throw new ColumnNotFoundException(String.format("wrong property ['%s'='%s']: table column not found: '%s' ",
HttpOptions.LOOKUP_RESP_COLUMN.key()
, respColumn
, respColumn));
}
return httpLookupBuilder.build();
}
// use factoryUtil to validate
private void validateOptions(Context context, ReadableConfig readableConfig, boolean isLookup) {
HashSet<ConfigOption<?>> userAllOpSet = new HashSet<>();
// validate required and optional options
Set<ConfigOption<?>> requiredOptions = requiredOptions();
Set<ConfigOption<?>> optionalOptions = optionalOptions();
if (isLookup)
requiredOptions.add(HttpOptions.LOOKUP_RESP_COLUMN);
userAllOpSet.addAll(optionalOptions);
userAllOpSet.addAll(requiredOptions);
FactoryUtil.validateFactoryOptions(requiredOptions, optionalOptions, readableConfig);
// all options declared on the catalog table
Map<String, String> allOptionsByCatalog = context.getCatalogTable().getOptions();
allOptionsByCatalog.forEach((k, v) -> {
if (k.startsWith(HttpOptions.PROPERTIES_HEADER_PREFIX)) {
userAllOpSet.add(ConfigOptions
.key(k).stringType()
.noDefaultValue()
.withDescription(String.format("Http headers : key=%s, value=%s", k, v)));
}
});
HashSet<String> consumedOptionKeys = new HashSet<>();
// collect the keys of all known options into consumedOptionKeys
userAllOpSet.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
// let FactoryUtil reject any unconsumed keys
FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), allOptionsByCatalog.keySet(), consumedOptionKeys);
}
@SneakyThrows
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper dynamicSourceHelper = FactoryUtil.createTableFactoryHelper(this, context);
// obtain user options
ReadableConfig readableConfig = dynamicSourceHelper.getOptions();
// obtain the resolved schema
ResolvedSchema sourceResolvedSchema = context.getCatalogTable().getResolvedSchema();
// physical data type, excluding computed columns
DataType physicalDataType = sourceResolvedSchema.toPhysicalRowDataType();
// full field data type; important because the source function will use it
DataType fieldDataType = column2FieldsDataType(sourceResolvedSchema.getColumns());
// validateOptions: requiredOptions, optionalOptions
validateOptions(context, readableConfig, true);
// use the info gathered above to init the source configs
HttpConfigs httpLookupOptions = getHttpOptions(readableConfig, fieldDataType, physicalDataType, extractHeadersByCatalogOptions(context.getCatalogTable().getOptions()));
LOG.info("httpConfigs for Lookup Option: {}", httpLookupOptions);
// construct HttpDynamicTableSource with both the full field row type and the physical row type
return new HttpDynamicTableSource(fieldDataType, physicalDataType, httpLookupOptions);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
/**
 * @return options that must be present in the DDL WITH clause
 */
@Override
public Set<ConfigOption<?>> requiredOptions() {
HashSet<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(HttpOptions.REQUEST_URL);
requiredOptions.add(HttpOptions.REQUEST_METHOD);
requiredOptions.add(HttpOptions.CONNECTOR);
return requiredOptions;
}
/**
 * @return optional options for the connector's DDL WITH clause
 */
@Override
public Set<ConfigOption<?>> optionalOptions() {
HashSet<ConfigOption<?>> optionalKeySet = new HashSet<>();
optionalKeySet.add(HttpOptions.FAIL_INTERPRET);
optionalKeySet.add(HttpOptions.FAIL_MAX_RETRIES);
optionalKeySet.add(HttpOptions.CONNECT_TIMEOUT);
optionalKeySet.add(HttpOptions.READ_TIMEOUT);
optionalKeySet.add(HttpOptions.OUTPUT_FORMAT);
optionalKeySet.add(HttpOptions.CACHE_ENABLED);
optionalKeySet.add(HttpOptions.CACHE_MAX_SIZE);
optionalKeySet.add(HttpOptions.CACHE_TTL);
optionalKeySet.add(HttpOptions.POST_BODY);
optionalKeySet.add(HttpOptions.OUTPUT_CHARSET);
optionalKeySet.add(HttpOptions.PROPERTIES_DO_INPUT);
optionalKeySet.add(HttpOptions.PROPERTIES_DO_OUTPUT);
optionalKeySet.add(HttpOptions.PAGE_QUERY_ENABLED);
optionalKeySet.add(HttpOptions.PAGE_QUERY_KEY);
return optionalKeySet;
}
public DataType column2FieldsDataType(List<Column> columns) {
// Column --> DataTypes.Field
Stream<DataTypes.Field> fieldStream = columns
.stream()
.map(tableColumn -> DataTypes.FIELD(tableColumn.getName(), tableColumn.getDataType()));
DataTypes.Field[] fields = fieldStream.toArray(DataTypes.Field[]::new);
// return the row type over all fields, including physical and computed columns
return DataTypes.ROW(fields).notNull();
}
}
- Create a directory named META-INF/services under the resources directory.
- Inside META-INF/services, create a file named org.apache.flink.table.factories.Factory. The file name must exactly match the interface you want Flink's SPI mechanism to discover.
- In the org.apache.flink.table.factories.Factory file, write the fully qualified name of your connector factory class, one class name per line.
When Flink starts, the ServiceLoader scans the classpath for org.apache.flink.table.factories.Factory files and loads all the classes listed there.
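For example, the registration file reduces to a single line. The package name below is my assumption (the factory listing above does not show its package declaration); substitute your factory's actual fully qualified class name:
src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:
com.lee.connector.http.factory.HttpDynamicTableFactory
To verify the registration, you can list the discovered factories with the same Java SPI mechanism that FactoryUtil relies on; a minimal sketch:
import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;
public class SpiDiscoveryDemo {
    public static void main(String[] args) {
        // prints every Factory registered via META-INF/services on the classpath,
        // which should include our "http" factory
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}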
Part 2: Start with the simpler sink side. We need to build HttpDynamicTableSink, which mainly overrides the getSinkRuntimeProvider method:
package com.lee.connector.http.table;
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.funtion.HttpSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class HttpDynamicTableSink implements DynamicTableSink {
private static final Logger LOG = LogManager.getLogger(HttpDynamicTableSink.class);
private final HttpConfigs sinkOptions;
public HttpDynamicTableSink(HttpConfigs sinkOptions) {
this.sinkOptions = sinkOptions;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
// accepts +I, -D and +U records (UPDATE_BEFORE is not consumed)
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
LOG.info("Create HttpSinkFunction with \n :{} " + sinkOptions.toString());
return SinkFunctionProvider.of(new HttpSinkFunction<>(this.sinkOptions));
}
@Override
public DynamicTableSink copy() {
return new HttpDynamicTableSink(sinkOptions);
}
@Override
public String asSummaryString() {
return "HTTP sink";
}
}
Part 3: Next, build the HttpSinkFunction that HttpDynamicTableSink wires in; it mainly overrides the invoke method. The code follows:
package com.lee.connector.http.funtion;
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.util.TableUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.Map;
import static com.lee.connector.http.funtion.HttpLookupFunction.getHttpURLConnection;
public class HttpSinkFunction<I> extends RichSinkFunction<I> {
private String requestUrl;
private final Map<String, String> headers;
private final String requestMethod;
private final String outputFormat;
private String postBody;
private final String charset;
private final boolean failInterpret;
private final boolean cacheEnabled;
private final boolean doInput;
private final boolean doOutput;
private final int maxRetryTimes;
private final Duration connectTimeout;
private final Duration readTimeout;
private final DataType fieldsDataType;
private final DataType physicalDataType;
public static Logger LOG = LogManager.getLogger(HttpSinkFunction.class);
public HttpSinkFunction(HttpConfigs httpSinkConfigs) {
this.requestUrl = httpSinkConfigs.getUrl();
this.headers = httpSinkConfigs.getHeaders();
this.requestMethod = httpSinkConfigs.getRequestMethod();
this.outputFormat = httpSinkConfigs.getOutputFormat();
this.postBody = httpSinkConfigs.getBody();
this.charset = httpSinkConfigs.getCharset();
this.failInterpret = httpSinkConfigs.isFailInterpret();
this.cacheEnabled = httpSinkConfigs.isUseCache();
this.doInput = httpSinkConfigs.isDoInput();
this.doOutput = httpSinkConfigs.isDoOutput();
this.maxRetryTimes = httpSinkConfigs.getMaxRetryTimes() == 0 ? 3 : httpSinkConfigs.getMaxRetryTimes();
this.connectTimeout = Duration.ofMillis(httpSinkConfigs.getConnectTimeout()); /* milliseconds */
this.readTimeout = Duration.ofMillis(httpSinkConfigs.getReadTimeout()); /* milliseconds */
this.fieldsDataType = httpSinkConfigs.getFieldsDataType();
this.physicalDataType = httpSinkConfigs.getPhysicalDataType();
}
// the actual sink logic
@Override
public void invoke(I input, Context context) throws Exception {
int currTimes = 0;
while (currTimes <= maxRetryTimes) {
boolean succeed = true;
try {
++currTimes;
if (null != input) {
RowData row = (RowData) input;
// Map(columnName -> (field value, field pos))
Map<String, Tuple2<?, Integer>> richDataRelation = TableUtils.buildRichDataRelation(row, this.physicalDataType);
LOG.info("Input Data Type: " + input.getClass());
headersAndUrlBodyFormat(richDataRelation);
HttpURLConnection conn = this.getConn(this.requestUrl);
String errMsg = String.format("unknown request method: %s !, Supports are : [%s, %s]",
requestMethod,
TableUtils.UserTypes.GET.name(),
TableUtils.UserTypes.POST.name());
switch (requestMethod) {
case "GET":
LOG.info("[SINK]: request method: {} , url: {}", requestMethod, this.requestUrl);
break;
case "POST":
String data = postBody;
if (!TableUtils.isValidJSON(data, LOG) && failInterpret)
throw new UnSupportedException("the POST body must be valid JSON ('output.format' only supports 'JSON'), but found: " + data);
LOG.info("[SINK]: request method: {} , data: {}", requestMethod, data);
try (OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), this.charset)) {
writer.write(data);
} catch (IOException e) {
errMsg = "[SINK]: send response data error! " + e.getMessage();
LOG.error(errMsg, e);
throw new IOException(errMsg);
}
break;
default:
throw new UnSupportedException(errMsg);
}
succeed = TableUtils.responseStreamHandler(input, conn, headers, this.requestUrl, LOG).f0;
}
} catch (Exception e) {
succeed = false;
LOG.error(e.getMessage(), e);
if (failInterpret)
throw e;
} finally {
LOG.info("Row: {} sink attempt finished.", input);
}
if (succeed)
break;
else
LOG.warn("attempt {} failed, retrying", currTimes);
}
}
// initialize the connection using the JRE's built-in HttpURLConnection
private HttpURLConnection getConn(String urlStr) throws Exception {
return getHttpURLConnection(urlStr, doOutput, doInput, requestMethod, this.connectTimeout, this.readTimeout, this.cacheEnabled, this.headers);
}
// RowData to generic type array
private Object[] toExternal(RowData rowData) {
int idx = 0;
int len = rowData.getArity();
Object[] values = new Object[len];
for (int i = 0; i < len; i++) {
values[idx] = TableUtils.getRowDataValue(
// 1. get children LogicalType by LogicalTypeChecks.getFieldTypes(this.fieldsDataType.getLogicalType()).get(idx)
// 2. get children LogicalType by RowType: ((RowType) fieldsDataType.getLogicalType()).getFields().get(idx).getLogicalType()
LogicalTypeChecks.getFieldTypes(this.fieldsDataType.getLogicalType()).get(idx)
, rowData, idx);
++idx;
}
return values;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
/**
* @param richData Map(name -> value, pos)
*/
private void headersAndUrlBodyFormat(Map<String, Tuple2<?, Integer>> richData) {
// Object[] rowInfo = toExternal(row);
for (Map.Entry<String, Tuple2<?, Integer>> richEntry : richData.entrySet()) {
this.headers.forEach((k, v) -> headers.put(k, v.replaceAll("##" + richEntry.getKey() + "##", String.valueOf(
richEntry.getValue().f0 == null ? "null" : richEntry.getValue().f0))));
this.requestUrl = this.requestUrl.replaceAll("##" + richEntry.getKey() + "##", richEntry.getValue().f0 == null
? "null"
: richEntry.getValue().f0.toString());
if (null != this.postBody)
this.postBody = this.postBody.replaceAll("##" + richEntry.getKey() + "##", richEntry.getValue().f0 == null
? "null"
: richEntry.getValue().f0.toString());
}
}
}
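To make the ##column## placeholder convention used by headersAndUrlBodyFormat concrete, here is a minimal sketch; the URL, body and row values are made up purely for illustration:
// hypothetical illustration of the ##columnName## substitution performed by
// headersAndUrlBodyFormat: tokens in the URL, headers and POST body are
// replaced with the current row's column values
String requestUrl = "https://example.com/user?id=##id##";
String postBody = "{\"name\": \"##name##\"}";
// given a row where id=42 and name=Tom:
requestUrl = requestUrl.replaceAll("##id##", "42");   // -> https://example.com/user?id=42
postBody = postBody.replaceAll("##name##", "Tom");    // -> {"name": "Tom"}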
Part 4: Next comes the lookup side, which first needs the class HttpDynamicTableSource; it mainly overrides the getLookupRuntimeProvider method. For example, with physical columns (id, name, resp) and a join condition ... ON k.uid = h.id, context.getKeys() returns [[0]], so keyNamesArr becomes ["id"]:
package com.lee.connector.http.table;
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.funtion.HttpLookupFunction;
import com.lee.connector.http.util.TableUtils;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class HttpDynamicTableSource implements LookupTableSource {
private static final Logger LOG = LogManager.getLogger(HttpDynamicTableSource.class);
private final DataType fieldRowDataType; /* All Columns' Data Type which contains all RowData's every column logicalType*/
private final DataType physicalRowDataType;
private final HttpConfigs lookupConfigs;
public HttpDynamicTableSource(DataType fieldRowDataType, DataType physicalRowDataType, HttpConfigs lookupConfigs) {
this.fieldRowDataType = fieldRowDataType;
this.physicalRowDataType = physicalRowDataType;
this.lookupConfigs = lookupConfigs;
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// return new HttpLookupFunction(physicalRowDataType, lookupConfigs);
int[][] lookupKeys = context.getKeys();
String[] keyNamesArr = new String[lookupKeys.length];
// field names referenced by the join's ON clause indices
List<String> fieldNames = TableUtils.getFieldNames(this.physicalRowDataType);
for (int i = 0; i < lookupKeys.length; i++) {
// fetch lookupKeys, not support nested join keys, so the first element is the field's name
keyNamesArr[i] = fieldNames.get(lookupKeys[i][0]);
}
return buildLookupFunction(keyNamesArr, this.fieldRowDataType, this.physicalRowDataType);
}
private LookupRuntimeProvider buildLookupFunction(String[] keyNamesArr, DataType fieldRowDataType, DataType physicalRowDataType) {
HttpLookupFunction.PhysicalJoinedColumnData columnData = new HttpLookupFunction.PhysicalJoinedColumnData(keyNamesArr, fieldRowDataType, physicalRowDataType);
HttpLookupFunction lookupFunction = HttpLookupFunction.builder()
.setColumnData(columnData)
.setLookupConfigs(lookupConfigs)
.build();
LOG.info("Using blocking version of http lookup table");
return TableFunctionProvider.of(lookupFunction);
}
@Override
public DynamicTableSource copy() {
return new HttpDynamicTableSource(this.fieldRowDataType, this.physicalRowDataType, this.lookupConfigs);
}
@Override
public String asSummaryString() {
return "HTTP lookup source";
}
}
Part 5: Now create the lookup function itself, which mainly overrides the eval method. It is essentially a UDF; the concrete code is HttpLookupFunction:
package com.lee.connector.http.funtion;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.lee.connector.http.config.HttpConfigs;
import com.lee.connector.http.config.HttpOptions;
import com.lee.connector.http.exception.UnSupportedException;
import com.lee.connector.http.exception.UnSupportedLogicalTypeException;
import com.lee.connector.http.util.TableUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.cache.Cache;
import org.apache.flink.calcite.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class HttpLookupFunction extends TableFunction<RowData> {
private PhysicalJoinedColumnData columnData;
// lookupConfigs
public static Logger LOG = LogManager.getLogger(HttpLookupFunction.class);
private final HttpConfigs lookupConfigs;
private String requestUrl;
private final Map<String, String> headers;
private final String requestMethod;
private final String outputFormat;
private String postBody;
private final String charset;
private final List<String> fieldColumnNames; /* All field column names*/
private boolean pageQueryEnabled;
private String pageQueryKey;
private String lookupRespColumn;
private final Duration connectTimeout;
private final Duration readTimeout;
private final int cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
private final boolean failInterpret;
private final boolean cacheEnabled;
private final boolean doInput;
private final boolean doOutput;
private static volatile JsonRowDataDeserializationSchema deSerSchema;
private transient Cache<RowData, List<RowData>> cache;
public HttpLookupFunction(PhysicalJoinedColumnData columnData, HttpConfigs lookupConfigs) {
this.lookupConfigs = lookupConfigs;
this.requestUrl = lookupConfigs.getUrl();
this.headers = lookupConfigs.getHeaders();
this.requestMethod = lookupConfigs.getRequestMethod();
this.postBody = lookupConfigs.getBody();
this.outputFormat = StringUtils.isAnyBlank(this.postBody) ? "JSON" : "";
this.charset = lookupConfigs.getCharset();
this.columnData = columnData;
this.fieldColumnNames = lookupConfigs.getColumnNames();
this.pageQueryEnabled = lookupConfigs.isPageQueryEnabled();
this.pageQueryKey = lookupConfigs.getPageQueryKey();
this.connectTimeout = Duration.ofMillis(lookupConfigs.getConnectTimeout());
this.readTimeout = Duration.ofMillis(lookupConfigs.getReadTimeout());
this.cacheMaxSize = lookupConfigs.getCacheMaxSize();
this.cacheExpireMs = lookupConfigs.getCacheTimeToLive();
this.maxRetryTimes = lookupConfigs.getMaxRetryTimes() == 0 ? 3 : lookupConfigs.getMaxRetryTimes();
this.failInterpret = lookupConfigs.isFailInterpret();
this.cacheEnabled = lookupConfigs.isUseCache();
this.doInput = lookupConfigs.isDoInput();
this.doOutput = lookupConfigs.isDoOutput();
this.lookupRespColumn = lookupConfigs.getLookupRespColumn();
}
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
this.cache = this.cacheMaxSize == -1 || cacheExpireMs == -1
? null
: CacheBuilder.newBuilder().expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
}
/**
 * Lookup flow:
 * 1. obtain the joined keys
 * 2. query the lookup endpoint with the joined keys
 * 3. build the result rows (RowData)
 * 4. the join's ON condition then validates the returned RowData
 */
public void eval(Object... joinedAndSortedKeys) throws Exception {
// find the relationship of joined key name and value.
Map<String, Tuple2<?, Integer>> keyNameAndValPosMap = zipColumnDataKeyNameAndJoinedValue(this.columnData, joinedAndSortedKeys);
GenericRowData keyRow = GenericRowData.of(joinedAndSortedKeys);
if (null != cache && this.cacheEnabled) {
List<RowData> cacheRows = cache.getIfPresent(keyRow);
if (null != cacheRows) {
// cache hit: emit the cached rows and skip the HTTP call
cacheRows.forEach(this::collect);
return;
}
}
// cache miss or cache disabled: query over HTTP
ArrayList<RowData> rows = new ArrayList<>();
int pageNum = 1;
boolean morePages = true;
// e.g. https://example.com/api/resources?page=2&size=10
while (pageNum == 1 || (this.pageQueryEnabled && morePages)) {
if (this.pageQueryEnabled && requestMethod.equals(TableUtils.UserTypes.GET.name())) {
this.requestUrl = TableUtils.URIFormat(this.requestUrl, this.pageQueryKey, pageNum, LOG);
}
int alreadyTimes = 0;
while (alreadyTimes <= maxRetryTimes) {
boolean succ;
try {
++alreadyTimes;
headersAndUrlBodyFormat(keyNameAndValPosMap);
HttpURLConnection conn = getConn(requestUrl);
String errMsg = "unknown request method: {} !" + requestMethod;
switch (requestMethod) {
case "GET":
LOG.info("[LOOKUP]: request method: {} , url: {}", requestMethod, requestUrl);
break;
case "POST":
String data = postBody;
LOG.info("[LOOKUP]: request method: {} , data: {}", requestMethod, data);
try (OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), this.charset)) {
writer.write(data);
} catch (IOException e) {
errMsg = "[LOOKUP]: send response data error! " + e.getMessage();
LOG.error(errMsg, e);
throw new IOException(errMsg);
}
break;
default:
LOG.error(errMsg);
throw new UnSupportedException(errMsg);
}
// to solve response data
Tuple2<Boolean, String> httpStreamReadRes = TableUtils.responseStreamHandler(keyRow, conn, headers, requestUrl, LOG);
succ = httpStreamReadRes.f0;
JSONArray resJsonObjs = constructJson(httpStreamReadRes.f1);
if (null != cache && cacheEnabled)
// register the result list for this key; rows appended below share this list instance
cache.put(keyRow, rows);
if (null == resJsonObjs || resJsonObjs.isEmpty()) {
// an empty page means there is nothing further to fetch
morePages = false;
} else {
// locate the receiving field: a single ROW-typed column receives the whole
// lookup response, so JSON of any shape can be accepted
if (fieldColumnNames.contains(this.lookupRespColumn)) {
List<RowType.RowField> rowFieldList = ((RowType) this.columnData.getPhysicalRowDataType().getLogicalType()).getFields();
List<RowType.RowField> filteredRowFiled = rowFieldList
.stream().filter(rowField -> rowField.getName().equals(lookupRespColumn)
&& rowField.getType().getTypeRoot().name().equals(TableUtils.UserTypes.ROW.name()))
.collect(Collectors.toList());
if (filteredRowFiled.size() == 0)
throw new UnSupportedLogicalTypeException(String.format("%s must be 'ROW' type.", lookupRespColumn));
// construct result and collect
RowType.RowField resultRowColumn = filteredRowFiled.get(0);
int resRowIndex = rowFieldList.indexOf(resultRowColumn);
// RowType rowType = new RowType(Collections.singletonList(resultRowColumn));
RowType rowType = (RowType) resultRowColumn.getType();
// rowDataSchema
JsonRowDataDeserializationSchema deSchema = deSerSchema(rowType);
resJsonObjs.forEach(jsonObj -> {
try {
// this row data shall contain all columns, including physical and compute columns
GenericRowData genericRowData = new GenericRowData(rowFieldList.size());
byte[] jsonBytes = jsonObj.toString().getBytes(null == this.charset
? HttpOptions.OUTPUT_CHARSET.defaultValue()
: this.charset);
RowData rowData = deSchema.deserialize(jsonBytes);
genericRowData.setField(resRowIndex, rowData);
keyNameAndValPosMap.forEach((key, value) -> genericRowData.setField(value.f1, value.f0));
LOG.debug("genericRowData: " + genericRowData);
// System.err.println("genericRowData: " + genericRowData);
collect(genericRowData);
rows.add(genericRowData);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
e.printStackTrace();
}
});
}
}
} catch (Exception e) {
succ = false;
LOG.error(e.getMessage(), e);
if (failInterpret)
throw e;
}
if (succ)
break;
else
LOG.warn("attempt {} failed, retrying!", alreadyTimes);
}
pageNum++;
}
if (null != cache && cacheEnabled)
cache.put(keyRow, rows);
}
public JSONArray constructJson(String content) {
return JSONArray.parseArray(content.startsWith("[") && content.endsWith("]")
? content : "[" + content + "]");
}
@Deprecated
public JSONArray constructJsonWithRespColumnKey(String content) {
JSONArray array = new JSONArray();
if (!content.trim().startsWith("[")) {
JSONObject jsonO = new JSONObject();
jsonO.put(this.lookupRespColumn, JSONObject.parseObject(content));
array.add(jsonO);
} else {
JSONArray jsonArray = JSONObject.parseArray(content);
for (Object o : jsonArray) {
JSONObject jsonO = new JSONObject();
JSONObject jsonObject = JSONObject.parseObject(o.toString());
jsonO.put(this.lookupRespColumn, jsonObject);
array.add(jsonO);
}
}
return array;
}
private HttpURLConnection getConn(String urlStr) throws Exception {
return getHttpURLConnection(urlStr, doOutput, doInput, requestMethod, this.connectTimeout, this.readTimeout, this.cacheEnabled, this.headers);
}
static HttpURLConnection getHttpURLConnection(String urlStr, boolean doOutput, boolean doInput, String requestMethod, Duration connectTimeout, Duration readTimeout, boolean cacheEnabled, Map<String, String> headers) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setDoOutput(doOutput);
httpURLConnection.setDoInput(doInput);
httpURLConnection.setRequestMethod(requestMethod);
httpURLConnection.setConnectTimeout((int) connectTimeout.toMillis());
httpURLConnection.setReadTimeout((int) readTimeout.toMillis());
httpURLConnection.setUseCaches(cacheEnabled);
if (headers != null)
headers.forEach(httpURLConnection::setRequestProperty);
return httpURLConnection;
}
/**
* joined and sorted key names
*/
public static class PhysicalJoinedColumnData implements Serializable {
private final String[] joinedKeyNames;
private final DataType physicalRowDataType;
private final DataType fieldRowDataType; // contains all fields and their types
public PhysicalJoinedColumnData(String[] joinedKeyNames, DataType fieldRowDataType, DataType physicalRowDataType) {
this.joinedKeyNames = joinedKeyNames;
this.physicalRowDataType = physicalRowDataType;
this.fieldRowDataType = fieldRowDataType;
}
public String[] getJoinedKeyNames() {
return joinedKeyNames;
}
public DataType getPhysicalRowDataType() {
return physicalRowDataType;
}
public DataType getFieldRowDataType() {
return fieldRowDataType;
}
}
@Override
public void close() throws Exception {
super.close();
}
public static HttpLookupFunctionBuilder builder() {
return new HttpLookupFunctionBuilder();
}
/**
* @param rowData PhysicalJoinedColumnData --> which columns' name were sorted as the same as joinedSortedValues
* @param joinedSortedValues --> sorted joined keys' value
* @return ConcurrentHashMap --> key --> (value, physicalColPos)
*/
public Map<String, Tuple2<?, Integer>> zipColumnDataKeyNameAndJoinedValue(PhysicalJoinedColumnData rowData, Object[] joinedSortedValues) {
List<String> physicalFieldNames = TableUtils.getFieldNames(rowData.getPhysicalRowDataType());
Map<String, Tuple2<?, Integer>> keyNameAndValMap = new ConcurrentHashMap<>();
for (int index = 0; index < joinedSortedValues.length; index++) {
String joinedKeyName = rowData.getJoinedKeyNames()[index];
keyNameAndValMap.put(joinedKeyName, Tuple2.of(joinedSortedValues[index], physicalFieldNames.indexOf(joinedKeyName)));
}
return keyNameAndValMap;
}
private void headersAndUrlBodyFormat(Map<String, Tuple2<?, Integer>> richData) {
for (Map.Entry<String, Tuple2<?, Integer>> richEntry : richData.entrySet()) {
String richK = richEntry.getKey();
Object richV = null == richEntry.getValue().f0 ? "null" : richEntry.getValue().f0;
this.headers.forEach((k, v) -> headers.put(k, v.replaceAll("##" + richK + "##", richV.toString())));
this.requestUrl = this.requestUrl.replaceAll("##" + richK + "##", richV.toString());
if (null != this.postBody)
this.postBody = this.postBody.replaceAll("##" + richK + "##", richV.toString());
}
}
private static JsonRowDataDeserializationSchema deSerSchema(RowType rowType) {
if (null == deSerSchema)
deSerSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
false,
TimestampFormat.SQL
);
return deSerSchema;
}
public static class HttpLookupFunctionBuilder {
private PhysicalJoinedColumnData columnData;
private HttpConfigs lookupConfigs;
public HttpLookupFunctionBuilder setColumnData(PhysicalJoinedColumnData columnData) {
this.columnData = columnData;
return this;
}
public HttpLookupFunctionBuilder setLookupConfigs(HttpConfigs lookupConfigs) {
this.lookupConfigs = lookupConfigs;
return this;
}
public HttpLookupFunction build() {
return new HttpLookupFunction(this.columnData, this.lookupConfigs);
}
}
}
Part 6: The helper class I wrote for this connector, TableUtils:
package com.lee.connector.http.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lee.connector.http.exception.UnSupportedLogicalTypeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TableUtils {
public static List<String> getFieldNames(DataType dataType) {
LogicalType logicalType = dataType.getLogicalType();
return LogicalTypeChecks.isCompositeType(logicalType)
? LogicalTypeChecks.getFieldNames(logicalType)
: Collections.emptyList();
}
public static GenericRowData buildGenericRowData(List<?> values) {
Function<List<?>, GenericRowData> transFunction = v -> {
GenericRowData res = new GenericRowData(v.size());
for (int i = 0; i < v.size(); i++) {
res.setField(i, v.get(i));
}
return res;
};
return transFunction.apply(values);
}
public static GenericRowData buildEmptyRow(int size) {
return new GenericRowData(size);
}
/**
 * @param lineRow          RowData
 * @param physicalDataType the table's physical row type
 * @return colName -> (value, pos)
 */
public static Map<String, Tuple2<?, Integer>> buildRichDataRelation(RowData lineRow, DataType physicalDataType) throws UnSupportedLogicalTypeException {
LogicalType logicalType = physicalDataType.getLogicalType();
if (!logicalType.getTypeRoot().name().equals(UserTypes.ROW.name())) {
throw new UnSupportedLogicalTypeException("Can only support for [RowType], but found: " + logicalType.getTypeRoot().name());
}
ConcurrentHashMap<String, Tuple2<?, Integer>> richDataMap = new ConcurrentHashMap<>();
List<RowType.RowField> rowFields = ((RowType) logicalType).getFields();
if (lineRow.getArity() != rowFields.size()) {
throw new InternalError(String.format("Internal error, this is a bug!, rowData: %s, rowData length: %s, rowFields: %s",
lineRow,
lineRow.getArity(),
StringUtils.join(rowFields.stream()
.map((Function<RowType.RowField, Object>) RowType.RowField::getName)
.collect(Collectors.toList()))
));
}
for (int pos = 0; pos < rowFields.size(); pos++)
// name, value, pos --> lineRow pos is the same as rowFields pos, so using pos to get field name and field value
richDataMap.put(rowFields.get(pos).getName(), Tuple2.of(getRowDataValue(rowFields.get(pos).getType(), lineRow, pos), pos));
return richDataMap;
}
public static <T> Tuple2<Boolean, String> responseStreamHandler(T input, HttpURLConnection conn, Map<String, String> headers, String requestUrl, Logger LOG) throws IOException {
int respCode = conn.getResponseCode();
String readRes;
// read the normal stream on 200, the error stream otherwise
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
respCode == 200 ? conn.getInputStream() : conn.getErrorStream()))) {
readRes = bufferedLineReader(input, respCode, conn, reader, headers, requestUrl, LOG);
} finally {
conn.disconnect();
}
return Tuple2.of(respCode == 200, readRes);
}
public static <T> String bufferedLineReader(T input, int respCode, HttpURLConnection conn, BufferedReader bufferedReader, Map<String, String> headers, String requestUrl, Logger LOG) throws IOException {
String inputLine;
StringBuilder builder = new StringBuilder();
while ((inputLine = bufferedReader.readLine()) != null)
builder.append(inputLine);
String msg = "Http Response Code: " + respCode
+ ", Response Massage:" + conn.getResponseMessage() + ", "
+ builder + ", Submitted RowData: "
+ input.toString()
+ ", Request Url: " + requestUrl + ", Http Headers:" + headers;
if (respCode >= 200 && respCode < 300)
LOG.info(msg);
else
LOG.error(msg);
return builder.toString();
}
/**
* +--------------------------------+-----------------------------------------+
* | SQL Data Types | Internal Data Structures |
* +--------------------------------+-----------------------------------------+
* | BOOLEAN | boolean |
* +--------------------------------+-----------------------------------------+
* | CHAR / VARCHAR / STRING | StringData |
* +--------------------------------+-----------------------------------------+
* | BINARY / VARBINARY / BYTES | byte[] |
* +--------------------------------+-----------------------------------------+
* | DECIMAL | DecimalData |
* +--------------------------------+-----------------------------------------+
* | TINYINT | byte |
* +--------------------------------+-----------------------------------------+
* | SMALLINT | short |
* +--------------------------------+-----------------------------------------+
* | INT | int |
* +--------------------------------+-----------------------------------------+
* | BIGINT | long |
* +--------------------------------+-----------------------------------------+
* | FLOAT | float |
* +--------------------------------+-----------------------------------------+
* | DOUBLE | double |
* +--------------------------------+-----------------------------------------+
* | DATE | int (number of days since epoch) |
* +--------------------------------+-----------------------------------------+
* | TIME | int (number of milliseconds of the day) |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP | TimestampData |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP WITH LOCAL TIME ZONE | TimestampData |
* +--------------------------------+-----------------------------------------+
* | INTERVAL YEAR TO MONTH | int (number of months) |
* +--------------------------------+-----------------------------------------+
* | INTERVAL DAY TO SECOND | long (number of milliseconds) |
* +--------------------------------+-----------------------------------------+
* | ROW / structured types | RowData |
* +--------------------------------+-----------------------------------------+
* | ARRAY | ArrayData |
* +--------------------------------+-----------------------------------------+
* | MAP / MULTISET | MapData |
* +--------------------------------+-----------------------------------------+
* | RAW | RawValueData |
* +--------------------------------+-----------------------------------------+
*
* @param logicalType Sql Data type
* @param record GenericRowData/BinaryRowData ...
* @param pos position
* @return Internal Data Structure Type
*/
public static Object getRowDataValue(LogicalType logicalType, RowData record, int pos) {
if (record.isNullAt(pos)) {
return null;
} else {
switch (logicalType.getTypeRoot()) {
case BOOLEAN:
return record.getBoolean(pos);
case TINYINT:
return record.getByte(pos); /* byte backs TINYINT */
case SMALLINT:
return record.getShort(pos);
case INTEGER:
return record.getInt(pos);
case BIGINT:
return record.getLong(pos);
case FLOAT:
return record.getFloat(pos);
case DOUBLE:
return record.getDouble(pos);
case CHAR:
case VARCHAR:
return record.getString(pos);
case DATE:
// days since epoch (int) to java.sql.Date
return Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos)));
case TIMESTAMP_WITHOUT_TIME_ZONE:
// TimestampData to java.sql.Timestamp; the cast to TimestampType is only valid for this case
int timestampPrecision = ((TimestampType) logicalType).getPrecision();
return record.getTimestamp(pos, timestampPrecision).toTimestamp();
case DECIMAL:
int decimalPrecision = ((DecimalType) logicalType).getPrecision();
int decimalScale = ((DecimalType) logicalType).getScale();
return record.getDecimal(pos, decimalPrecision, decimalScale);
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
}
public static boolean isValidJSON(final String json, Logger LOG) {
boolean valid = false;
try {
final ObjectMapper mapper = new ObjectMapper();
mapper.readTree(json);
valid = true;
} catch (Exception e) {
// invalid JSON: log it and fall through to return false
LOG.error(e.getMessage(), e);
}
return valid;
}
public static String URIFormat(String url, String pageQueryKey, int pageNum, Logger LOG) {
if (null == pageQueryKey) {
throw new NullPointerException("page.query.enabled was set to 'true' but page.query.key is null; did you mean 'page' or 'pageNum'?");
}
URI uri;
String urlRes = url;
try {
uri = new URI(url);
String query = uri.getQuery();
if(null == query){
throw new URISyntaxException(url, "Bad URL, please recheck it!");
}
Scanner scanner = new Scanner(query);
scanner.useDelimiter("&");
while (scanner.hasNext()) {
String[] param = scanner.next().split("=");
if (param.length == 2 && param[0].equals(pageQueryKey)) {
// replace only this parameter's value rather than every occurrence in the URL
urlRes = url.replace(param[0] + "=" + param[1], param[0] + "=" + pageNum);
}
}
} catch (URISyntaxException e) {
LOG.error("bad URL: {}, " + e.getMessage(), url, e);
}
return urlRes;
}
public static void main(String[] args) throws IOException {
RowType rowType = new RowType(Arrays.asList(
new RowType.RowField("k1", DataTypes.STRING().getLogicalType()),
new RowType.RowField("k2", DataTypes.STRING().getLogicalType()),
new RowType.RowField("k3", DataTypes.INT().getLogicalType()),
new RowType.RowField("k4", new RowType(Arrays.asList(
new RowType.RowField("kk1", DataTypes.STRING().getLogicalType()),
new RowType.RowField("kk2", DataTypes.STRING().getLogicalType()),
new RowType.RowField("kk3", new ArrayType(new RowType(Arrays.asList(
new RowType.RowField("kkk1", DataTypes.STRING().getLogicalType()),
new RowType.RowField("kkk2", DataTypes.STRING().getLogicalType())
))))
)))
));
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.SQL
);
String jsonString = "{...}"; // your JSON data
byte[] jsonBytes = jsonString.getBytes(StandardCharsets.UTF_8);
RowData rowData = deserializationSchema.deserialize(jsonBytes);
System.out.println(rowData);
}
public enum UserTypes {
GET,
POST,
JSON,
ROW;
public static String getCharSetName() {
return "UTF-8";
}
}
}
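As a quick sanity check of buildRichDataRelation, here is a minimal sketch; the ROW type and values are assumptions chosen purely for illustration, and the class is assumed to sit in the same package as TableUtils:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
public class BuildRichDataRelationDemo {
    public static void main(String[] args) throws Exception {
        // assumed physical type: ROW<id STRING, age INT>
        DataType physical = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT()));
        RowData row = GenericRowData.of(StringData.fromString("u1"), 18);
        // expected mapping: id -> (u1, 0), age -> (18, 1), i.e. column name -> (value, position)
        Map<String, Tuple2<?, Integer>> relation = TableUtils.buildRichDataRelation(row, physical);
        System.out.println(relation);
    }
}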
Part 7: The class that defines the HTTP table options, HttpOptions. Code follows:
package com.lee.connector.http.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.time.Duration;
public class HttpOptions {
private static final String IDENTIFIER = "http";
public static final String PROPERTIES_HEADER_PREFIX = "properties.headers.";
public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
.stringType().defaultValue(IDENTIFIER).withDescription("the connector's name");
public static final ConfigOption<String> REQUEST_URL = ConfigOptions.key("request.url")
.stringType().noDefaultValue().withDescription("http request url");
public static final ConfigOption<String> REQUEST_METHOD = ConfigOptions.key("request.method")
.stringType().noDefaultValue().withDescription("http request method");
public static final ConfigOption<String> REQUEST_TYPE = ConfigOptions.key("request.type").stringType()
.noDefaultValue().withDescription("the http request type, support for GET or POST currently. ");
// & is a placeholder
public static final ConfigOption<String> PROPERTIES_HEADER = ConfigOptions.key(String.format("%s.&", "properties.headers"))
.stringType()
.defaultValue("properties.header.application/json")
.withDescription("the properties.header of http, default properties.header.application/json");
public static final ConfigOption<Boolean> FAIL_INTERPRET = ConfigOptions.key("fail.interpret").booleanType()
.defaultValue(true).withDescription("whether a failure should interrupt the job");
public static final ConfigOption<Integer> FAIL_MAX_RETRIES = ConfigOptions.key("fail.max-retries").intType()
.defaultValue(3).withDescription("fail.max-retries");
// connect.timeout
public static final ConfigOption<Duration> CONNECT_TIMEOUT = ConfigOptions.key("connect.timeout")
.durationType().defaultValue(Duration.ofMillis(10000L)).withDescription("connection timeout, default is 10 seconds");
// read.timeout
public static final ConfigOption<Duration> READ_TIMEOUT = ConfigOptions.key("read.timeout")
.durationType().defaultValue(Duration.ofMillis(10000L))
.withDescription("read timeout, default is 10 seconds");
// output.format
public static final ConfigOption<String> OUTPUT_FORMAT = ConfigOptions.key("output.format")
.stringType().defaultValue("JSON").withDescription("only support for json currently");
// do input
public static final ConfigOption<Boolean> PROPERTIES_DO_INPUT = ConfigOptions.key("do.input")
.booleanType()
.defaultValue(true)
.withDescription("if the url connection use for input, default true");
// do output
public static final ConfigOption<Boolean> PROPERTIES_DO_OUTPUT = ConfigOptions.key("do.output")
.booleanType()
.noDefaultValue()
.withDescription("if the url connection use for output, no default value");
// cache.enabled
public static final ConfigOption<Boolean> CACHE_ENABLED = ConfigOptions.key("cache.enabled")
.booleanType().defaultValue(true).withDescription("whether or not to use cache, default true");
// cache.max-size
public static final ConfigOption<Integer> CACHE_MAX_SIZE = ConfigOptions.key("cache.max-size")
.intType().noDefaultValue().withDescription("when use cache, how many records need to be cached");
// cache.expire-ms
public static final ConfigOption<Duration> CACHE_TTL = ConfigOptions.key("cache.expire-ms")
.durationType().defaultValue(Duration.ofMillis(-1))
.withDescription("cache TTL in milliseconds");
// post.body
public static final ConfigOption<String> POST_BODY = ConfigOptions.key("post.body")
.stringType().noDefaultValue().withDescription("the post body when request type is POST");
// output.charset
public static final ConfigOption<String> OUTPUT_CHARSET = ConfigOptions.key("output.charset")
.stringType().defaultValue("UTF-8").withDescription("output charset");
// page.query.enabled
public static final ConfigOption<Boolean> PAGE_QUERY_ENABLED = ConfigOptions.key("page.query.enabled")
.booleanType().defaultValue(false).withDescription("Pagination Query Enabled");
// page.query.key
public static final ConfigOption<String> PAGE_QUERY_KEY = ConfigOptions.key("page.query.key")
.stringType().noDefaultValue().withDescription("page.query.key, egg: pageNum");
// lookup.resp.column
public static final ConfigOption<String> LOOKUP_RESP_COLUMN = ConfigOptions.key("lookup.resp.column")
.stringType().noDefaultValue().withDescription("the column that receives the lookup query response; its logical type must be 'ROW'");
}
Part 8: The configuration class used when building the lookup function and sink function, shared by HttpDynamicTableSink and HttpDynamicTableSource: HttpConfigs:
package com.lee.connector.http.config;
import org.apache.flink.table.types.DataType;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class HttpConfigs implements Serializable {
private String url;
private int connectTimeout;
private int readTimeout;
private String body;
private String requestMethod;
private String outputFormat;
private String charset;
private boolean useCache;
private boolean doInput;
private boolean doOutput;
private boolean pageQueryEnabled;
private String pageQueryKey;
private String lookupRespColumn;
private Map<String, String> headers = Collections.emptyMap();
private final List<String> aliasPaths = Collections.emptyList();
private final List<String> argList = Collections.emptyList();
private List<String> columnNames = Collections.emptyList();
private DataType fieldsDataType;
private DataType physicalDataType;
// lookup cache
/**
 * maximum number of cached entries
 */
private int cacheMaxSize = -1;
/**
 * TTL of cached lookup rows before the cached state expires
 */
private long cacheTimeToLive = -1;
/**
 * number of retries on failure
 */
private int maxRetryTimes = 3;
/**
 * whether a failure should interrupt the job
 */
private boolean failInterpret = true;
public HttpConfigs(String url, int connectTimeout, int readTimeout, String body, String requestMethod, String outputFormat, String charset, boolean useCache, boolean doInput, boolean doOutput, boolean pageQueryEnabled, String pageQueryKey, String lookupRespColumn, Map<String, String> headers, List<String> columnNames, DataType fieldsDataType, DataType physicalDataType, int cacheMaxSize, long cacheTimeToLive, int maxRetryTimes, boolean failInterpret) {
this.url = url;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.body = body;
this.requestMethod = requestMethod;
this.outputFormat = outputFormat;
this.charset = charset;
this.useCache = useCache;
this.doInput = doInput;
this.doOutput = doOutput;
this.pageQueryEnabled = pageQueryEnabled;
this.pageQueryKey = pageQueryKey;
this.lookupRespColumn = lookupRespColumn;
this.headers = headers;
this.columnNames = columnNames;
this.fieldsDataType = fieldsDataType;
this.physicalDataType = physicalDataType;
this.cacheMaxSize = cacheMaxSize;
this.cacheTimeToLive = cacheTimeToLive;
this.maxRetryTimes = maxRetryTimes;
this.failInterpret = failInterpret;
}
public String getUrl() {
return url;
}
public int getConnectTimeout() {
return connectTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
public String getBody() {
return body;
}
public String getRequestMethod() {
return requestMethod;
}
public String getOutputFormat() {
return outputFormat;
}
public boolean isPageQueryEnabled() {
return pageQueryEnabled;
}
public String getPageQueryKey() {
return pageQueryKey;
}
public String getLookupRespColumn() {
return lookupRespColumn;
}
public String getCharset() {
return charset;
}
public boolean isUseCache() {
return useCache;
}
public boolean isDoInput() {
return doInput;
}
public boolean isDoOutput() {
return doOutput;
}
public Map<String, String> getHeaders() {
return headers;
}
public List<String> getArgList() {
return argList;
}
public DataType getFieldsDataType() {
return fieldsDataType;
}
public DataType getPhysicalDataType() {
return physicalDataType;
}
public int getCacheMaxSize() {
return cacheMaxSize;
}
public long getCacheTimeToLive() {
return cacheTimeToLive;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public List<String> getColumnNames() {
return columnNames;
}
public boolean isFailInterpret() {
return failInterpret;
}
public List<String> getAliasPaths() {
return aliasPaths;
}
public static HttpLookupBuilder builder() {
return new HttpLookupBuilder();
}
public static class HttpLookupBuilder {
private String url;
private int connectTimeout;
private int readTimeout;
private String body;
private String requestMethod;
private String outputFormat;
private String charset;
private boolean useCache;
private boolean doInput;
private boolean doOutput;
private boolean pageQueryEnabled;
private String pageQueryKey;
private String lookupRespColumn;
private Map<String, String> headers = Collections.emptyMap();
private List<String> columnNames = Collections.emptyList();
private DataType fieldsDataType;
private DataType physicalDataType;
private int cacheMaxSize;
private long cacheTimeToLive;
private int maxRetryTimes;
private boolean failInterpret;
public HttpLookupBuilder setUrl(String url) {
this.url = url;
return this;
}
public HttpLookupBuilder setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
public HttpLookupBuilder setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
return this;
}
public HttpLookupBuilder setBody(String body) {
this.body = body;
return this;
}
public HttpLookupBuilder setMethod(String requestMethod) {
this.requestMethod = requestMethod;
return this;
}
public HttpLookupBuilder setOutputFormat(String outputFormat) {
this.outputFormat = outputFormat;
return this;
}
public HttpLookupBuilder setCharset(String charset) {
this.charset = charset;
return this;
}
public HttpLookupBuilder setUseCache(boolean useCache) {
this.useCache = useCache;
return this;
}
public HttpLookupBuilder setDoInput(boolean doInput) {
this.doInput = doInput;
return this;
}
public HttpLookupBuilder setDoOutput(boolean doOutput) {
this.doOutput = doOutput;
return this;
}
public HttpLookupBuilder setFieldsDataType(DataType fieldsDataType) {
this.fieldsDataType = fieldsDataType;
return this;
}
public HttpLookupBuilder setPhysicalDataType(DataType physicalDataType) {
this.physicalDataType = physicalDataType;
return this;
}
public HttpLookupBuilder setCacheMaxSize(int cacheMaxSize) {
this.cacheMaxSize = cacheMaxSize;
return this;
}
public HttpLookupBuilder setCacheTimeToLive(long cacheTimeToLive) {
this.cacheTimeToLive = cacheTimeToLive;
return this;
}
public HttpLookupBuilder setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
return this;
}
public HttpLookupBuilder setFailInterpret(boolean failInterpret) {
this.failInterpret = failInterpret;
return this;
}
// duplicates setMethod(...) above; kept so either name compiles, but one of the two could be removed
public HttpLookupBuilder setRequestMethod(String requestMethod) {
this.requestMethod = requestMethod;
return this;
}
public HttpLookupBuilder setPageQueryEnabled(boolean pageQueryEnabled) {
this.pageQueryEnabled = pageQueryEnabled;
return this;
}
public HttpLookupBuilder setPageQueryKey(String pageQueryKey) {
this.pageQueryKey = pageQueryKey;
return this;
}
public HttpLookupBuilder setLookupRespColumn(String lookupRespColumn) {
this.lookupRespColumn = lookupRespColumn;
return this;
}
public HttpLookupBuilder setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
}
public HttpLookupBuilder setColumnNames(List<String> columnNames) {
this.columnNames = columnNames;
return this;
}
public HttpConfigs build() {
return new HttpConfigs(
this.url,
this.connectTimeout,
this.readTimeout,
this.body,
this.requestMethod,
this.outputFormat,
this.charset,
this.useCache,
this.doInput,
this.doOutput,
this.pageQueryEnabled,
this.pageQueryKey,
this.lookupRespColumn,
this.headers,
this.columnNames,
this.fieldsDataType,
this.physicalDataType,
this.cacheMaxSize,
this.cacheTimeToLive,
this.maxRetryTimes,
this.failInterpret
);
}
}
@Override
public String toString() {
return "HttpConfigs{" +
"url='" + url + '\'' +
", connectTimeout=" + connectTimeout +
", readTimeout=" + readTimeout +
", body='" + body + '\'' +
", requestMethod='" + requestMethod + '\'' +
", outputFormat='" + outputFormat + '\'' +
", charset='" + charset + '\'' +
", useCache=" + useCache +
", doInput=" + doInput +
", doOutput=" + doOutput +
", pageQueryEnabled=" + pageQueryEnabled +
", pageQueryKey='" + pageQueryKey + '\'' +
", lookupRespColumn='" + lookupRespColumn + '\'' +
", headers=" + headers +
", aliasPaths=" + aliasPaths +
", argList=" + argList +
", columnNames=" + columnNames +
", fieldsDataType=" + fieldsDataType +
", physicalDataType=" + physicalDataType +
", cacheMaxSize=" + cacheMaxSize +
", cacheTimeToLive=" + cacheTimeToLive +
", maxRetryTimes=" + maxRetryTimes +
", failInterpret=" + failInterpret +
'}';
}
}
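For orientation, here is a minimal usage sketch (my own illustration, not part of the connector source) showing how a factory could assemble an HttpConfigs through this builder. The hard-coded values mirror the lookup-table DDL in section 十, and fieldsDataType / physicalDataType are assumed to come from the resolved schema:
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.table.types.DataType;
// Hypothetical assembly of HttpConfigs: values are hard-coded for
// illustration instead of being read from a ReadableConfig, and only a
// subset of the table's columns is listed.
public class HttpConfigsUsageExample {
    public static HttpConfigs buildLookupConfigs(DataType fieldsDataType, DataType physicalDataType) {
        return HttpConfigs.builder()
                .setUrl("http://localhost:8080/users/chenuser")
                .setRequestMethod("POST")
                .setConnectTimeout(60000)
                .setReadTimeout(60000)
                .setBody("{\"name\" : \"##res_col1##\", \"age\": ##res_col2##, \"gender\": \"##res_col3##\"}")
                .setOutputFormat("JSON")
                .setCharset("UTF-8") // assumed default; not set in the DDL
                .setUseCache(false)
                .setCacheMaxSize(1000)
                .setCacheTimeToLive(50000L)
                .setMaxRetryTimes(2)
                .setFailInterpret(true)
                .setLookupRespColumn("col_res_row")
                .setHeaders(Collections.singletonMap("Content-type", "application/json"))
                .setColumnNames(Arrays.asList("res_col1", "res_col2", "res_col3"))
                .setFieldsDataType(fieldsDataType)
                .setPhysicalDataType(physicalDataType)
                .build();
    }
}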
九: The exception classes involved: ColumnNotFoundException, UnSupportedException, and UnSupportedLogicalTypeException.
ColumnNotFoundException:
package com.lee.connector.http.exception;
public class ColumnNotFoundException extends Exception {
public ColumnNotFoundException(String msg) {
super(msg);
}
}
Class UnSupportedException:
package com.lee.connector.http.exception;
public class UnSupportedException extends Exception {
public UnSupportedException(String msg) {
super(msg);
}
}
Class UnSupportedLogicalTypeException:
package com.lee.connector.http.exception;
public class UnSupportedLogicalTypeException extends Exception {
public UnSupportedLogicalTypeException(String msg) {
super(msg);
}
}
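These exception classes carry no extra state; they simply make validation failures explicit. As a hypothetical illustration (the helper below is mine, not the connector's actual code), ColumnNotFoundException might be raised when validating 'lookup.resp.column' against the table schema:
import java.util.List;
import com.lee.connector.http.exception.ColumnNotFoundException;
// Illustrative validation sketch: fail fast if 'lookup.resp.column'
// does not name a column of the table.
public class OptionValidationExample {
    public static void checkLookupRespColumn(String lookupRespColumn, List<String> columnNames)
            throws ColumnNotFoundException {
        if (lookupRespColumn != null && !columnNames.contains(lookupRespColumn)) {
            throw new ColumnNotFoundException(
                    "Column '" + lookupRespColumn + "' declared in 'lookup.resp.column' "
                            + "was not found in the table schema: " + columnNames);
        }
    }
}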
十: Testing the HTTP sink:
CREATE TABLE kafka_table
(
id String,
`name` String,
gender String,
age String,
score String,
eventT timestamp(3)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_test_0001',
'properties.bootstrap.servers' = '192.168.1.102:9092',
'scan.startup.mode' = 'latest-offset',
'properties.group.id' = 'group_id_test_01',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
);
-- When http is used as a sink table, the ##coln## placeholders map to field names; when it is used as a lookup table, they map to the order of the ON condition!
CREATE TABLE HTTP_SINK_TABLE
(
res_col1 String,
res_col2 String,
eventT TIMESTAMP(3),
eventT2 as localtimestamp,
procTime as procTime(),
res_col3 String,
res_col4 as res_col1,
WATERMARK FOR eventT AS eventT - interval '0' second
) WITH (
'connector' = 'http',
-- test GET first
'request.method' = 'POST',
-- 'request.url' = 'http://localhost:8080/users/leeuser/##res_col3##',
'request.url' = 'http://localhost:8080/users/chenuser',
'read.timeout' = '60000',
'connect.timeout' = '60000',
-- 'output.format' = 'JSON',
'post.body' = '{"name" : "##res_col1##", "age": ##res_col2##, "gender": "##res_col3##"}',
'fail.interpret' = 'false',
'fail.max-retries' = '3',
'properties.headers.Content-type' = 'application/json'
);
insert into HTTP_SINK_TABLE(res_col1, res_col2, res_col3)
select 'leeston', '30', 'male' from kafka_table;
View the job graph in the Flink web UI (screenshot omitted).
Check the logs for the result of the HTTP request triggered by the sink (log output omitted).
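To make the ##col## substitution concrete, here is a minimal, self-contained sketch (my own simplification: the real sink resolves values from RowData by field position rather than from a Map) of how a template such as 'post.body' is rendered per record:
import java.util.LinkedHashMap;
import java.util.Map;
// Simplified sketch of ##col## placeholder rendering.
public class PlaceholderExample {
    public static String render(String template, Map<String, String> row) {
        String result = template;
        for (Map.Entry<String, String> e : row.entrySet()) {
            result = result.replace("##" + e.getKey() + "##", e.getValue());
        }
        return result;
    }
    public static void main(String[] args) {
        String body = "{\"name\" : \"##res_col1##\", \"age\": ##res_col2##, \"gender\": \"##res_col3##\"}";
        Map<String, String> row = new LinkedHashMap<>();
        row.put("res_col1", "leeston");
        row.put("res_col2", "30");
        row.put("res_col3", "male");
        System.out.println(render(body, row));
        // prints: {"name" : "leeston", "age": 30, "gender": "male"}
    }
}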
SQL example for an HTTP lookup table:
CREATE TABLE kafka_table
(
id String,
`name` String,
gender String,
age String,
score String,
eventT timestamp(3)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_test_0001',
'properties.bootstrap.servers' = '192.168.1.102:9092',
'scan.startup.mode' = 'latest-offset',
'properties.group.id' = 'group_id_test_01',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
);
CREATE TABLE HTTP_LOOKUP_TABLE
(
res_col1 String,
res_col2 String,
eventT TIMESTAMP(3),
eventT2 as localtimestamp,
-- procTime as procTime(),
res_col3 String,
col_res_row ROW<name String, gender String, age int>
) WITH (
'connector' = 'http',
'request.method' = 'POST',
'request.url' = 'http://localhost:8080/users/chenuser',
'read.timeout' = '60000',
'connect.timeout' = '60000',
-- ##col##: the column must appear in the ON condition
'post.body' = '{"name" : "##res_col1##", "age": ##res_col2##, "gender": "##res_col3##"}',
'fail.interpret' = 'true',
'properties.headers.Content-type' = 'application/json',
-- 'page.query.enabled' = 'true',
'fail.max-retries' = '2',
'cache.enabled' = 'false',
'output.format' = 'JSON',
'cache.max-size' = '1000',
'cache.expire-ms' = '50000',
'lookup.resp.column' = 'col_res_row'
);
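The cache.* options imply a small read-through cache in front of the HTTP call when 'cache.enabled' is 'true'. A plausible sketch of how 'cache.max-size' and 'cache.expire-ms' could be applied (assuming a Guava cache; the connector's actual caching code may differ):
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.RowData;
// Plausible sketch only: a Guava cache keyed by the rendered request,
// sized and expired according to the table options.
public class LookupCacheExample {
    private final Cache<String, RowData> cache;
    public LookupCacheExample(long cacheMaxSize, long cacheTimeToLiveMs) {
        this.cache = CacheBuilder.newBuilder()
                .maximumSize(cacheMaxSize)
                .expireAfterWrite(cacheTimeToLiveMs, TimeUnit.MILLISECONDS)
                .build();
    }
    public RowData getOrNull(String requestKey) {
        return cache.getIfPresent(requestKey); // null => fall through to the HTTP call
    }
    public void put(String requestKey, RowData row) {
        cache.put(requestKey, row);
    }
}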
-- HTTP LOOKUP TABLE TEST
select a.col1,a.col2,a.col3, a.procTime, b.col_res_row.gender,b.* from (
select '30' as col1,
'male' as col2,
'leeston' as col3,
procTime() as procTime
from kafka_table
) a
left join HTTP_LOOKUP_TABLE FOR SYSTEM_TIME AS OF a.procTime as b
-- +I(30,male,leeston)
on a.col3 = b.res_col1
and a.col1 = b.res_col2
and a.col2 = b.res_col3
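As the DDL comments note, when the table is used for lookup, the join-key values arrive in ON-clause order. An illustrative skeleton (not the connector's actual lookup function) of how eval() could bind those keys to the ##col## placeholders:
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;
// Illustrative skeleton: eval() is invoked with the key values in the
// order of the ON clause (here: a.col3, a.col1, a.col2), so the i-th
// key fills the placeholder of the i-th lookup key column.
public class HttpLookupFunctionSketch extends TableFunction<RowData> {
    private final String[] keyColumnNames; // e.g. {"res_col1", "res_col2", "res_col3"}
    private final String bodyTemplate;
    public HttpLookupFunctionSketch(String[] keyColumnNames, String bodyTemplate) {
        this.keyColumnNames = keyColumnNames;
        this.bodyTemplate = bodyTemplate;
    }
    public void eval(Object... keys) {
        String body = bodyTemplate;
        for (int i = 0; i < keys.length; i++) {
            body = body.replace("##" + keyColumnNames[i] + "##", String.valueOf(keys[i]));
        }
        // ... issue the HTTP request with `body`, parse the response,
        // then emit matched rows via collect(rowData)
    }
}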
The result of the lookup join:
ResultKind: SUCCESS_WITH_CONTENT
ResolvedSchema: (
`col1` CHAR(2) NOT NULL,
`col2` CHAR(4) NOT NULL,
`col3` CHAR(7) NOT NULL,
`procTime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
`gender` STRING,
`res_col1` STRING,
`res_col2` STRING,
`eventT` TIMESTAMP(3),
`eventT2` TIMESTAMP(3),
`res_col3` STRING,
`col_res_row` ROW<`name` STRING, `gender` STRING, `age` INT>
)
+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | col1 | col2 | col3 | procTime | gender | res_col1 | res_col2 | eventT | eventT2 | res_col3 | col_res_row |
+----+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 30 | male | leeston | 2023-10-14 01:55:37.094 | male_res | leeston | 30 | <NULL> | 2023-10-14 01:55:37.014 | male | (leeston, male_res, 31) |
The Flink web UI (screenshot omitted).
Log details (output omitted).
Other output details are omitted.
Reflection: the HTTP connector implemented above can only serve lookup joins and sinks. What if you need to build a custom source connector, e.g. for a socket? You can study the datagen, kafka, or hudi connectors: all three implement the source side, yet their approaches differ considerably, so I won't expand on them here; a bare-bones scan-source skeleton is sketched below for orientation.
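This is my own sketch under stated assumptions, not a working connector: a hypothetical single-STRING-column table, the legacy SourceFunction API for brevity, and no DeserializationSchema wiring.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
// Bare-bones scan source: each line read from the socket becomes a
// one-field row. Real connectors add schema-aware deserialization.
public class SocketDynamicTableSource implements ScanTableSource {
    private final String hostname;
    private final int port;
    public SocketDynamicTableSource(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // 'false' marks the source as unbounded (a streaming source)
        return SourceFunctionProvider.of(new SocketSourceFunction(hostname, port), false);
    }
    @Override
    public DynamicTableSource copy() {
        return new SocketDynamicTableSource(hostname, port);
    }
    @Override
    public String asSummaryString() {
        return "Socket Table Source";
    }
    private static class SocketSourceFunction implements SourceFunction<RowData> {
        private final String hostname;
        private final int port;
        private volatile boolean running = true;
        SocketSourceFunction(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
        }
        @Override
        public void run(SourceContext<RowData> ctx) throws Exception {
            try (Socket socket = new Socket(hostname, port);
                 BufferedReader reader =
                         new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String line;
                while (running && (line = reader.readLine()) != null) {
                    ctx.collect(GenericRowData.of(StringData.fromString(line)));
                }
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}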
Note: the walkthrough above omits well over 90% of the code commentary and the detailed source-code processing flow. If you are interested in custom Flink Table/SQL connectors, feel free to contact me directly for a deeper discussion. All of this code was hand-written by me line by line; please do not use it for commercial purposes without my permission. Thank you.