上一篇文章我们写了怎么去修改Seatunnel代码,并提交PR,详情请查看手把手教你贡献一个Seatunnel Connector PR | 新增jtds方式链接sqlserver
今天我们接着来写一个全新的 Seatunnel Connector : Access 连接器。
话不多说,我们直接上代码:
我们可以看到写一个简单的新的connector只要包括client,config,exception,source,sink,util六个module。
一、编写代码
1. client:
client主要用来获取数据库链接,获取数据库元数据等。
AccessClient.java
package org.apache.seatunnel.connectors.seatunnel.access.client;
import java.io.Serializable;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
public class AccessClient implements Serializable {
private String driver;
private String url;
private String username;
private String password;
private String query;
private Connection connection;
public AccessClient(String driver, String url, String username, String password, String query) {
this.driver = driver;
this.url = url;
this.username = username;
this.password = password;
this.query = query;
}
public Connection getAccessConnection(String url, String username, String password) {
try {
Class.forName(driver);
this.connection = DriverManager.getConnection(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
return this.connection;
}
public ResultSetMetaData selectMetaData() throws Exception {
connection = this.getAccessConnection(url, username, password);
Statement statement = connection.createStatement();
ResultSet result = statement.executeQuery(query);
ResultSetMetaData metaData = result.getMetaData();
statement.close();
return metaData;
}
public ResultSetMetaData getTableSchema(String tableName) throws Exception {
connection = this.getAccessConnection(url, username, password);
Statement statement = connection.createStatement();
ResultSet result =
statement.executeQuery(String.format("select * from %s limit 1", tableName));
ResultSetMetaData metaData = result.getMetaData();
statement.close();
return metaData;
}
}
2. config
该模块用来定义配置文件需要读的关键词,比如driver、url、username、password等
AccessConfig.java
package org.apache.seatunnel.connectors.seatunnel.access.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
public class AccessConfig {
public static final Integer DEFAULT_BATCH_SIZE = 5000;
public static final Option<String> DRIVER =
Options.key("driver").stringType().noDefaultValue().withDescription("driver");
public static final Option<String> URL =
Options.key("url").stringType().noDefaultValue().withDescription("url");
public static final Option<String> USERNAME =
Options.key("username").stringType().noDefaultValue().withDescription("username");
public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");
public static final Option<String> QUERY =
Options.key("query").stringType().noDefaultValue().withDescription("query");
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("table");
public static final Option<String> FIELDS =
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("fields");
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(DEFAULT_BATCH_SIZE)
.withDescription("");
}
AccessParameters.java
package org.apache.seatunnel.connectors.seatunnel.access.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.List;
public class AccessParameters implements Serializable {
private String driver;
private String url;
private String username;
private String password;
private String query;
private List<String> fields;
private String table;
private Integer batchSize;
public void buildWithConfig(Config config) {
this.driver = config.getString(AccessConfig.DRIVER.key());
this.url = config.getString(AccessConfig.URL.key());
if (config.hasPath(AccessConfig.USERNAME.key())) {
this.username = config.getString(AccessConfig.USERNAME.key());
}
if (config.hasPath(AccessConfig.PASSWORD.key())) {
this.password = config.getString(AccessConfig.PASSWORD.key());
}
if (config.hasPath(AccessConfig.QUERY.key())) {
this.query = config.getString(AccessConfig.QUERY.key());
}
if (config.hasPath(AccessConfig.FIELDS.key())) {
this.fields = config.getStringList(AccessConfig.FIELDS.key());
}
if (config.hasPath(AccessConfig.TABLE.key())) {
this.table = config.getString(AccessConfig.TABLE.key());
}
if (config.hasPath(AccessConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(AccessConfig.BATCH_SIZE.key());
} else {
this.batchSize = AccessConfig.BATCH_SIZE.defaultValue();
}
}
}
3. exception
该模块主要用来定义代码中可能出现的异常信息,定义出出错误代码,错误信息等。
AccessConnectorErrorCode.java
package org.apache.seatunnel.connectors.seatunnel.access.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum AccessConnectorErrorCode implements SeaTunnelErrorCode {
FIELD_NOT_IN_TABLE("ACCESS-01", "Field is not existed in target table"),
CLOSE_CQL_CONNECT_FAILED("ACCESS-03", "Close connect of access failed");
private final String code;
private final String description;
AccessConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
public String getCode() {
return null;
}
public String getDescription() {
return null;
}
}
AccessConnectorException.java
package org.apache.seatunnel.connectors.seatunnel.access.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
public class AccessConnectorException extends SeaTunnelRuntimeException {
public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}
public AccessConnectorException(
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}
public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
}
}
4. util
util主要用来封装一些通用方法,像类型转换等
TypeConvertUtil.java
package org.apache.seatunnel.connectors.seatunnel.access.util;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.commons.lang.StringUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.sql.ResultSetMetaData;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.Arrays;
import java.util.HashSet;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.TABLE;
public class TypeConvertUtil {
public static SeaTunnelDataType<?> convert(String type) {
switch (type) {
case "INTEGER":
case "BIGINT":
return BasicType.INT_TYPE;
case "VARCHAR":
return BasicType.STRING_TYPE;
case "DECIMAL":
return BasicType.DOUBLE_TYPE;
case "TIMESTAMP":
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case "BOOLEAN":
return BasicType.BOOLEAN_TYPE;
default:
throw new AccessConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported this data type: " + type);
}
}
public static Object convertToObject(ResultSet result, String columnName, String columnType) {
Object value;
if ("INTEGER".equals(columnType) || "BIGINT".equals(columnType)) {
try {
value = result.getInt(columnName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
} else if ("VARCHAR".equals(columnType)) {
try {
value = result.getString(columnName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
} else if ("DECIMAL".equals(columnType)) {
try {
value = result.getDouble(columnName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
} else if ("TIMESTAMP".equals(columnType)) {
try {
value = result.getString(columnName);
if (null == value) {
} else {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = dateFormat.parse((String) value).getTime();
Instant instant = Instant.ofEpochMilli(timestamp);
value = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}
} catch (SQLException | ParseException e) {
throw new RuntimeException(e);
}
} else if ("BOOLEAN".equals(columnType)) {
try {
value = result.getBoolean(columnName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
} else {
try {
value = result.getObject(columnName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
return value;
}
public static void reconvertAndInject(
PreparedStatement statement, int index, String type, Object fieldValue) {
switch (type) {
case "INT":
case "INTEGER":
try {
statement.setInt(
index + 1,
null == fieldValue ? 0 : Integer.parseInt(fieldValue.toString()));
return;
} catch (SQLException e) {
throw new RuntimeException(e);
}
case "STRING":
try {
statement.setString(index + 1, null == fieldValue ? "" : fieldValue.toString());
return;
} catch (SQLException e) {
throw new RuntimeException(e);
}
case "DOUBLE":
try {
statement.setDouble(
index + 1,
null == fieldValue ? 0D : Double.parseDouble(fieldValue.toString()));
return;
} catch (SQLException e) {
throw new RuntimeException(e);
}
case "TIMESTAMP":
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
try {
if (null == fieldValue) {
statement.setTimestamp(index + 1, Timestamp.valueOf(LocalDateTime.MIN));
} else {
Date date = dateFormat.parse(fieldValue.toString().substring(0, 10));
statement.setTimestamp(index + 1, new Timestamp(date.getTime()));
}
return;
} catch (ParseException | SQLException e) {
throw new RuntimeException(e);
}
case "BOOLEAN":
try {
statement.setBoolean(
index + 1,
null == fieldValue ? null : new Boolean(fieldValue.toString()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public static SeaTunnelDataType<?>[] getSeaTunnelDataTypes(
AccessParameters accessParameters,
AccessClient accessClient,
Config pluginConfig,
String pluginName) {
String[] sqlColumns;
List<String> fields = accessParameters.getFields();
SeaTunnelDataType<?>[] seaTunnelDataTypes;
try {
ResultSetMetaData metaData =
accessClient.getTableSchema(pluginConfig.getString(TABLE.key()));
int columnCount = metaData.getColumnCount();
String columnString =
StringUtils.substringBetween(pluginConfig.getString(QUERY.key()), "(", ")");
sqlColumns = columnString.split(",");
String[] tableColumns = new String[metaData.getColumnCount()];
for (int i = 1; i <= columnCount; i++) {
tableColumns[i - 1] = metaData.getColumnName(i).toUpperCase();
}
seaTunnelDataTypes = new SeaTunnelDataType[sqlColumns.length];
Set<String> tableColumnSet = new HashSet<>(Arrays.asList(tableColumns));
int tableColumnSetSize = tableColumnSet.size();
if (fields == null || fields.isEmpty()) {
for (int j = 0; j < sqlColumns.length; j++) {
tableColumnSet.add(sqlColumns[j].trim().toUpperCase());
if (tableColumnSetSize == tableColumnSet.size()) {
accessParameters.setFields(Arrays.asList(sqlColumns));
for (int k = 1; k <= columnCount; k++) {
if (metaData.getColumnName(k)
.toUpperCase()
.equals(sqlColumns[j].trim().toUpperCase())) {
seaTunnelDataTypes[j] =
TypeConvertUtil.convert(metaData.getColumnTypeName(j + 1));
}
}
} else {
throw new AccessConnectorException(
AccessConnectorErrorCode.FIELD_NOT_IN_TABLE,
"Field "
+ sqlColumns[j]
+ " does not exist in table "
+ pluginConfig.getString(TABLE.key()));
}
}
}
} catch (Exception e) {
throw new AccessConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
pluginName, PluginType.SINK, e));
}
return seaTunnelDataTypes;
}
}
5. source
source用来读取access数据库,并把消息发送给seatunnel引擎;
AccessSource.java
该类用来获取需要读取的表的字段及类型信息,同时调用AccessSourceReader类来读取消息。
package org.apache.seatunnel.connectors.seatunnel.access.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import com.google.auto.service.AutoService;
import java.sql.ResultSetMetaData;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.DRIVER;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
(SeaTunnelSource.class)
public class AccessSource
extends AbstractSingleSplitSource<SeaTunnelRow> implements SupportColumnProjection {
private SeaTunnelRowType rowTypeInfo;
private final AccessParameters accessParameters = new AccessParameters();
public String getPluginName() {
return "Access";
}
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult =
CheckConfigUtil.checkAllExists(
pluginConfig,
DRIVER.key(),
URL.key(),
USERNAME.key(),
PASSWORD.key(),
QUERY.key());
if (!checkResult.isSuccess()) {
throw new AccessConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, checkResult.getMsg()));
}
this.accessParameters.buildWithConfig(pluginConfig);
AccessClient accessClient =
new AccessClient(
pluginConfig.getString(DRIVER.key()),
pluginConfig.getString(URL.key()),
pluginConfig.getString(USERNAME.key()),
pluginConfig.getString(PASSWORD.key()),
pluginConfig.getString(QUERY.key()));
try {
ResultSetMetaData metaData = accessClient.selectMetaData();
int columnSize = metaData.getColumnCount();
String[] fieldNames = new String[columnSize];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize];
for (int i = 1; i <= columnSize; i++) {
fieldNames[i - 1] = metaData.getColumnName(i);
seaTunnelDataTypes[i - 1] = TypeConvertUtil.convert(metaData.getColumnTypeName(i));
}
this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.rowTypeInfo;
}
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
return new AccessSourceReader(accessParameters, readerContext);
}
}
AccessSourceReader.java
该类用来读取数据消息,并把消息发送给seatunnel。
package org.apache.seatunnel.connectors.seatunnel.access.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
4j
public class AccessSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private Connection connection;
private final SingleSplitReaderContext readerContext;
private final AccessParameters accessParameters;
AccessSourceReader(AccessParameters accessParameters, SingleSplitReaderContext readerContext) {
this.accessParameters = accessParameters;
this.readerContext = readerContext;
}
public void open() throws Exception {
AccessClient accessClient =
new AccessClient(
accessParameters.getDriver(),
accessParameters.getUrl(),
accessParameters.getUsername(),
accessParameters.getPassword(),
accessParameters.getQuery());
connection =
accessClient.getAccessConnection(
accessParameters.getUrl(),
accessParameters.getUsername(),
accessParameters.getPassword());
}
public void close() throws IOException {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
Statement statement = connection.createStatement();
ResultSet result = statement.executeQuery(accessParameters.getQuery());
ResultSetMetaData metaData = result.getMetaData();
while (result.next()) {
Object[] datas = new Object[metaData.getColumnCount()];
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
String columnType = metaData.getColumnTypeName(i);
datas[i - 1] = TypeConvertUtil.convertToObject(result, columnName, columnType);
}
output.collect(new SeaTunnelRow(datas));
}
this.readerContext.signalNoMoreElement();
}
}
AccessSourceFactory.java
package org.apache.seatunnel.connectors.seatunnel.access.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class AccessSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
return "Access";
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(AccessConfig.DRIVER, AccessConfig.URL)
.bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD)
.optional(AccessConfig.QUERY)
.build();
}
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return AccessSource.class;
}
}
6. sink
sink模块用来从seatunnel中读取消息并发送给access数据库。
AccessSink.java
用来读取需要写出的字段及类型,并调用AccessSinkWriter类。
package org.apache.seatunnel.connectors.seatunnel.access.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import com.google.auto.service.AutoService;
import java.io.IOException;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.DRIVER;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
(SeaTunnelSink.class)
public class AccessSink
extends AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType seaTunnelRowType;
private final AccessParameters accessParameters = new AccessParameters();
private SeaTunnelDataType<?>[] seaTunnelDataTypes;
public String getPluginName() {
return "Access";
}
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult =
CheckConfigUtil.checkAllExists(
pluginConfig,
DRIVER.key(),
URL.key(),
USERNAME.key(),
PASSWORD.key(),
TABLE.key(),
QUERY.key());
if (!checkResult.isSuccess()) {
throw new AccessConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
this.accessParameters.buildWithConfig(pluginConfig);
AccessClient accessClient =
new AccessClient(
pluginConfig.getString(DRIVER.key()),
pluginConfig.getString(URL.key()),
pluginConfig.getString(USERNAME.key()),
pluginConfig.getString(PASSWORD.key()),
pluginConfig.getString(QUERY.key()));
seaTunnelDataTypes =
TypeConvertUtil.getSeaTunnelDataTypes(
accessParameters, accessClient, pluginConfig, getPluginName());
}
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
}
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new AccessSinkWriter(accessParameters, seaTunnelDataTypes);
}
}
AccessSinkWriter.java
用来将seatunnel中读取到的消息,发送到access数据库。
package org.apache.seatunnel.connectors.seatunnel.access.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
4j
public class AccessSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final AccessParameters accessParameters;
private Connection connection;
private final PreparedStatement statement;
private final SeaTunnelDataType<?>[] seaTunnelDataTypes;
public AccessSinkWriter(
AccessParameters accessParameters, SeaTunnelDataType<?>[] seaTunnelDataTypes) {
this.accessParameters = accessParameters;
this.seaTunnelDataTypes = seaTunnelDataTypes;
AccessClient accessClient =
new AccessClient(
accessParameters.getDriver(),
accessParameters.getUrl(),
accessParameters.getUsername(),
accessParameters.getPassword(),
accessParameters.getQuery());
connection =
accessClient.getAccessConnection(
accessParameters.getUrl(),
accessParameters.getUsername(),
accessParameters.getPassword());
try {
this.statement = connection.prepareStatement(initPrepareCQL());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public void write(SeaTunnelRow element) throws IOException {
try {
for (int i = 0; i < accessParameters.getFields().size(); i++) {
String type = this.seaTunnelDataTypes[i].toString();
TypeConvertUtil.reconvertAndInject(statement, i, type, element.getField(i));
}
statement.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public void close() throws IOException {
try {
if (this.connection != null) {
this.connection.close();
}
} catch (Exception e) {
throw new AccessConnectorException(
AccessConnectorErrorCode.CLOSE_CQL_CONNECT_FAILED, e);
}
}
private String initPrepareCQL() {
String[] placeholder = new String[accessParameters.getFields().size()];
Arrays.fill(placeholder, "?");
return String.format(
"INSERT INTO %s (%s) VALUES (%s)",
accessParameters.getTable(),
String.join(",", accessParameters.getFields()),
String.join(",", placeholder));
}
}
package org.apache.seatunnel.connectors.seatunnel.access.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class AccessSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
return "Access";
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(AccessConfig.DRIVER, AccessConfig.URL, AccessConfig.TABLE)
.bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD)
.optional(AccessConfig.QUERY)
.build();
}
}
二、测试代码
在seatunnel examples模块下的seatunnel-flink-connector-v2-example下面的
resources/examples下面创建access_to_access.conf配置文件
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
Access {
driver = "net.ucanaccess.jdbc.UcanaccessDriver"
url = "jdbc:ucanaccess://D:\\geely\\code\\access-test\\Database1.accdb"
username = ""
password = ""
query = "select ID, name, address, age, birthday, a, b, c, d, e, f, g, h from student"
}
}
sink {
Access {
driver = "net.ucanaccess.jdbc.UcanaccessDriver"
url = "jdbc:ucanaccess://D:\\geely\\code\\access-test\\Database1.accdb"
username = ""
password = ""
table = "student"
query = """insert into student(id, naMe, address, age, birthday, a, b, c, d, e, f, g, h) values(?,?,?,?,?,?,?,?,?,?,?,?,?);"""
}
}
修改SeaTunnelApiExample.java,将配置文件修改成access_to_access.conf文件
修改pom.xml
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-access</artifactId>
<version>${project.version}</version>
</dependency>
右键运行SeaTunnelApiExample.java
可以看到access数据库中的student表里面数据被读出来后有写了回去。
运行前:
运行后:
至此,我们就已经开发好了一个新的Seatunnel的connector;后面去提交PR就可以了。
更多大数据内容,请关注大数据技能圈公众号: