Hand-Rolling Your Own SeaTunnel Access Connector

Digest   2024-11-15 00:00   Chongqing

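The previous article covered how to modify SeaTunnel code and submit a PR; for details, see "A Step-by-Step Guide to Contributing a SeaTunnel Connector PR | Adding jTDS-Based SQL Server Connections".

Today we continue by writing a brand-new SeaTunnel connector: the Access connector.

Without further ado, let's go straight to the code.

A simple new connector needs only six packages: client, config, exception, source, sink, and util.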

I. Writing the Code

1. client

The client package is mainly responsible for obtaining the database connection and fetching database metadata.

AccessClient.java

package org.apache.seatunnel.connectors.seatunnel.access.client;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class AccessClient implements Serializable {
    private String driver;
    private String url;
    private String username;
    private String password;
    private String query;
    // A JDBC connection is not serializable, so keep it out of the serialized state.
    private transient Connection connection;

    public AccessClient(
            String driver, String url, String username, String password, String query) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
        this.query = query;
    }

    // Loads the JDBC driver and opens a connection to the Access database.
    public Connection getAccessConnection(String url, String username, String password) {
        try {
            Class.forName(driver);
            this.connection = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        return this.connection;
    }

    // Runs the configured query and returns the metadata of its result set.
    public ResultSetMetaData selectMetaData() throws Exception {
        connection = this.getAccessConnection(url, username, password);
        Statement statement = connection.createStatement();
        ResultSet result = statement.executeQuery(query);
        ResultSetMetaData metaData = result.getMetaData();
        statement.close();
        return metaData;
    }

    // Returns the column metadata of the given table.
    public ResultSetMetaData getTableSchema(String tableName) throws Exception {
        connection = this.getAccessConnection(url, username, password);
        Statement statement = connection.createStatement();
        ResultSet result =
                statement.executeQuery(String.format("select * from %s limit 1", tableName));
        ResultSetMetaData metaData = result.getMetaData();
        statement.close();
        return metaData;
    }
}

2. config

This package defines the keys read from the configuration file, such as driver, url, username, and password.

AccessConfig.java

package org.apache.seatunnel.connectors.seatunnel.access.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;

public class AccessConfig {
    public static final Integer DEFAULT_BATCH_SIZE = 5000;

    public static final Option<String> DRIVER =
            Options.key("driver").stringType().noDefaultValue().withDescription("driver");

    public static final Option<String> URL =
            Options.key("url").stringType().noDefaultValue().withDescription("url");

    public static final Option<String> USERNAME =
            Options.key("username").stringType().noDefaultValue().withDescription("username");

    public static final Option<String> PASSWORD =
            Options.key("password").stringType().noDefaultValue().withDescription("password");

    public static final Option<String> QUERY =
            Options.key("query").stringType().noDefaultValue().withDescription("query");

    public static final Option<String> TABLE =
            Options.key("table").stringType().noDefaultValue().withDescription("table");

    // A list of column names; AccessParameters reads this key with getStringList,
    // so it is declared as a list option without a default value.
    public static final Option<List<String>> FIELDS =
            Options.key("fields").listType().noDefaultValue().withDescription("fields");

    public static final Option<Integer> BATCH_SIZE =
            Options.key("batch_size")
                    .intType()
                    .defaultValue(DEFAULT_BATCH_SIZE)
                    .withDescription("batch size");
}

AccessParameters.java

package org.apache.seatunnel.connectors.seatunnel.access.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.List;

@Setter
@Getter
public class AccessParameters implements Serializable {
    private String driver;
    private String url;
    private String username;
    private String password;
    private String query;
    private List<String> fields;
    private String table;
    private Integer batchSize;

    public void buildWithConfig(Config config) {
        // Required options.
        this.driver = config.getString(AccessConfig.DRIVER.key());
        this.url = config.getString(AccessConfig.URL.key());

        // Optional options are read only when present.
        if (config.hasPath(AccessConfig.USERNAME.key())) {
            this.username = config.getString(AccessConfig.USERNAME.key());
        }
        if (config.hasPath(AccessConfig.PASSWORD.key())) {
            this.password = config.getString(AccessConfig.PASSWORD.key());
        }
        if (config.hasPath(AccessConfig.QUERY.key())) {
            this.query = config.getString(AccessConfig.QUERY.key());
        }
        if (config.hasPath(AccessConfig.FIELDS.key())) {
            this.fields = config.getStringList(AccessConfig.FIELDS.key());
        }
        if (config.hasPath(AccessConfig.TABLE.key())) {
            this.table = config.getString(AccessConfig.TABLE.key());
        }
        // Fall back to the default batch size when none is configured.
        if (config.hasPath(AccessConfig.BATCH_SIZE.key())) {
            this.batchSize = config.getInt(AccessConfig.BATCH_SIZE.key());
        } else {
            this.batchSize = AccessConfig.BATCH_SIZE.defaultValue();
        }
    }
}
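The optional-key handling in buildWithConfig comes down to "use the configured value when present, otherwise fall back to a default". The following is a minimal sketch of that pattern, assuming a plain Map in place of SeaTunnel's Config; the class ParameterSketch and its require helper are hypothetical names used only for illustration and are not part of the connector.

```java
import java.util.Map;

// Hypothetical stand-in for AccessParameters; a plain Map replaces SeaTunnel's Config.
class ParameterSketch {
    static final int DEFAULT_BATCH_SIZE = 5000;

    final String driver;
    final String url;
    final String username; // optional, stays null when not configured
    final int batchSize;

    ParameterSketch(Map<String, Object> config) {
        // Required keys: fail fast when they are missing.
        this.driver = require(config, "driver");
        this.url = require(config, "url");
        // Optional keys: null or a default value when absent.
        this.username = (String) config.get("username");
        Object size = config.get("batch_size");
        this.batchSize = size == null ? DEFAULT_BATCH_SIZE : (Integer) size;
    }

    private static String require(Map<String, Object> config, String key) {
        Object value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        return value.toString();
    }
}
```

With only driver and url configured, batchSize resolves to the default of 5000 and username stays null.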

3. exception

This package defines the exceptions the code may raise, including their error codes and error messages.

AccessConnectorErrorCode.java

package org.apache.seatunnel.connectors.seatunnel.access.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum AccessConnectorErrorCode implements SeaTunnelErrorCode {
    FIELD_NOT_IN_TABLE("ACCESS-01", "Field is not existed in target table"),
    CLOSE_CQL_CONNECT_FAILED("ACCESS-03", "Close connect of access failed");

    private final String code;
    private final String description;

    AccessConnectorErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    // Return the stored values; returning null here would hide the error details.
    @Override
    public String getCode() {
        return code;
    }

    @Override
    public String getDescription() {
        return description;
    }
}

AccessConnectorException.java

package org.apache.seatunnel.connectors.seatunnel.access.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class AccessConnectorException extends SeaTunnelRuntimeException {
    public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
        super(seaTunnelErrorCode, errorMessage);
    }

    public AccessConnectorException(
            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
        super(seaTunnelErrorCode, errorMessage, cause);
    }

    public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
        super(seaTunnelErrorCode, cause);
    }
}
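To illustrate how an error-code enum and its exception work together, here is a minimal self-contained sketch. The interface ErrorCode and the classes SketchErrorCode and SketchConnectorException are hypothetical stand-ins for SeaTunnelErrorCode, AccessConnectorErrorCode, and AccessConnectorException, and the message format is an assumption chosen for illustration. It shows why the getters should return the stored code and description: both end up in the message the user sees.

```java
// Hypothetical stand-in for SeaTunnelErrorCode so the sketch compiles on its own.
interface ErrorCode {
    String getCode();

    String getDescription();
}

enum SketchErrorCode implements ErrorCode {
    FIELD_NOT_IN_TABLE("ACCESS-01", "Field is not existed in target table"),
    CLOSE_CONNECT_FAILED("ACCESS-03", "Close connect of access failed");

    private final String code;
    private final String description;

    SketchErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return code;
    }

    @Override
    public String getDescription() {
        return description;
    }
}

// Hypothetical stand-in for AccessConnectorException; the message layout is illustrative.
class SketchConnectorException extends RuntimeException {
    private final ErrorCode errorCode;

    SketchConnectorException(ErrorCode errorCode, String message) {
        super(
                String.format(
                        "ErrorCode:[%s], ErrorDescription:[%s] - %s",
                        errorCode.getCode(), errorCode.getDescription(), message));
        this.errorCode = errorCode;
    }

    ErrorCode getErrorCode() {
        return errorCode;
    }
}
```

If getCode() or getDescription() returned null, the formatted message would contain the literal text "null" in place of the code or description.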

4. util

The util package wraps common helper methods, such as type conversion.

TypeConvertUtil.java

package org.apache.seatunnel.connectors.seatunnel.access.util;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;

import org.apache.commons.lang.StringUtils;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.TABLE;

public class TypeConvertUtil {

    // Maps a JDBC column type name to the corresponding SeaTunnel data type.
    // Note: BIGINT is also mapped to INT, which only suits values within the int range.
    public static SeaTunnelDataType<?> convert(String type) {
        switch (type) {
            case "INTEGER":
            case "BIGINT":
                return BasicType.INT_TYPE;
            case "VARCHAR":
                return BasicType.STRING_TYPE;
            case "DECIMAL":
                return BasicType.DOUBLE_TYPE;
            case "TIMESTAMP":
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            case "BOOLEAN":
                return BasicType.BOOLEAN_TYPE;
            default:
                throw new AccessConnectorException(
                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                        "Unsupported this data type: " + type);
        }
    }

    // Reads one column of the current row and converts it to the Java object SeaTunnel expects.
    public static Object convertToObject(ResultSet result, String columnName, String columnType) {
        try {
            if ("INTEGER".equals(columnType) || "BIGINT".equals(columnType)) {
                return result.getInt(columnName);
            } else if ("VARCHAR".equals(columnType)) {
                return result.getString(columnName);
            } else if ("DECIMAL".equals(columnType)) {
                return result.getDouble(columnName);
            } else if ("TIMESTAMP".equals(columnType)) {
                String value = result.getString(columnName);
                if (value == null) {
                    return null;
                }
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                long timestamp = dateFormat.parse(value).getTime();
                Instant instant = Instant.ofEpochMilli(timestamp);
                return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            } else if ("BOOLEAN".equals(columnType)) {
                return result.getBoolean(columnName);
            } else {
                return result.getObject(columnName);
            }
        } catch (SQLException | ParseException e) {
            throw new RuntimeException(e);
        }
    }

    // Converts a SeaTunnel field value back to a JDBC value and binds it to the statement.
    // The index is zero-based; JDBC parameter indexes start at 1.
    public static void reconvertAndInject(
            PreparedStatement statement, int index, String type, Object fieldValue) {
        try {
            switch (type) {
                case "INT":
                case "INTEGER":
                    statement.setInt(
                            index + 1,
                            null == fieldValue ? 0 : Integer.parseInt(fieldValue.toString()));
                    return;
                case "STRING":
                    statement.setString(index + 1, null == fieldValue ? "" : fieldValue.toString());
                    return;
                case "DOUBLE":
                    statement.setDouble(
                            index + 1,
                            null == fieldValue ? 0D : Double.parseDouble(fieldValue.toString()));
                    return;
                case "TIMESTAMP":
                    if (null == fieldValue) {
                        // LocalDateTime.MIN does not map to a meaningful Timestamp; bind NULL.
                        statement.setNull(index + 1, Types.TIMESTAMP);
                    } else {
                        // Only the date part (yyyy-MM-dd) of the value is written.
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
                        Date date = dateFormat.parse(fieldValue.toString().substring(0, 10));
                        statement.setTimestamp(index + 1, new Timestamp(date.getTime()));
                    }
                    return;
                case "BOOLEAN":
                    // Passing a null Boolean to setBoolean would throw on unboxing; null becomes false.
                    statement.setBoolean(
                            index + 1,
                            null != fieldValue && Boolean.parseBoolean(fieldValue.toString()));
                    return;
                default:
                    throw new AccessConnectorException(
                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                            "Unsupported this data type: " + type);
            }
        } catch (ParseException | SQLException e) {
            throw new RuntimeException(e);
        }
    }

    // Resolves the SeaTunnel type of every column named in the sink's INSERT statement
    // and verifies that each of those columns exists in the target table.
    public static SeaTunnelDataType<?>[] getSeaTunnelDataTypes(
            AccessParameters accessParameters,
            AccessClient accessClient,
            Config pluginConfig,
            String pluginName) {
        String[] sqlColumns;
        List<String> fields = accessParameters.getFields();
        SeaTunnelDataType<?>[] seaTunnelDataTypes;
        try {
            ResultSetMetaData metaData =
                    accessClient.getTableSchema(pluginConfig.getString(TABLE.key()));
            int columnCount = metaData.getColumnCount();
            // The column list is the text between the first "(" and ")" of the INSERT statement.
            String columnString =
                    StringUtils.substringBetween(pluginConfig.getString(QUERY.key()), "(", ")");
            sqlColumns = columnString.split(",");
            String[] tableColumns = new String[metaData.getColumnCount()];
            for (int i = 1; i <= columnCount; i++) {
                tableColumns[i - 1] = metaData.getColumnName(i).toUpperCase();
            }
            seaTunnelDataTypes = new SeaTunnelDataType[sqlColumns.length];
            Set<String> tableColumnSet = new HashSet<>(Arrays.asList(tableColumns));
            int tableColumnSetSize = tableColumnSet.size();
            if (fields == null || fields.isEmpty()) {
                for (int j = 0; j < sqlColumns.length; j++) {
                    // If adding the column does not grow the set, the table already has it.
                    tableColumnSet.add(sqlColumns[j].trim().toUpperCase());
                    if (tableColumnSetSize == tableColumnSet.size()) {
                        accessParameters.setFields(Arrays.asList(sqlColumns));
                        for (int k = 1; k <= columnCount; k++) {
                            if (metaData.getColumnName(k)
                                    .toUpperCase()
                                    .equals(sqlColumns[j].trim().toUpperCase())) {
                                // Use the type of the matching table column k, not position j + 1.
                                seaTunnelDataTypes[j] =
                                        TypeConvertUtil.convert(metaData.getColumnTypeName(k));
                            }
                        }
                    } else {
                        throw new AccessConnectorException(
                                AccessConnectorErrorCode.FIELD_NOT_IN_TABLE,
                                "Field "
                                        + sqlColumns[j]
                                        + " does not exist in table "
                                        + pluginConfig.getString(TABLE.key()));
                    }
                }
            }
        } catch (Exception e) {
            throw new AccessConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            pluginName, PluginType.SINK, e));
        }
        return seaTunnelDataTypes;
    }
}
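The field check in getSeaTunnelDataTypes pulls the column list out of the configured INSERT statement and compares it, ignoring case, against the columns of the target table. A minimal standalone sketch of that idea follows. InsertColumnSketch, extractColumns, and missingColumns are hypothetical names, and plain lists stand in for ResultSetMetaData, so this is an illustration under those assumptions rather than the connector's actual code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper; plain lists stand in for ResultSetMetaData.
class InsertColumnSketch {

    // Extracts the names between the first "(" and the next ")" of an INSERT statement.
    static List<String> extractColumns(String insertSql) {
        int open = insertSql.indexOf('(');
        int close = insertSql.indexOf(')', open + 1);
        if (open < 0 || close < 0) {
            throw new IllegalArgumentException("No column list found in: " + insertSql);
        }
        List<String> columns = new ArrayList<>();
        for (String raw : insertSql.substring(open + 1, close).split(",")) {
            columns.add(raw.trim());
        }
        return columns;
    }

    // Returns the SQL columns that the table does not contain, compared case-insensitively.
    static List<String> missingColumns(List<String> sqlColumns, List<String> tableColumns) {
        Set<String> known = new HashSet<>();
        for (String column : tableColumns) {
            known.add(column.toUpperCase(Locale.ROOT));
        }
        List<String> missing = new ArrayList<>();
        for (String column : sqlColumns) {
            if (!known.contains(column.toUpperCase(Locale.ROOT))) {
                missing.add(column);
            }
        }
        return missing;
    }
}
```

Collecting all missing columns first, instead of failing on the first one, makes it possible to report every misspelled field in a single error message.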

5. source

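The source package reads from the Access database and sends the records to the SeaTunnel engine.

AccessSource.java

This class obtains the field names and types of the table to be read, and creates an AccessSourceReader to read the records.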

package org.apache.seatunnel.connectors.seatunnel.access.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import com.google.auto.service.AutoService;

import java.sql.ResultSetMetaData;

import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.DRIVER;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.USERNAME;

@AutoService(SeaTunnelSource.class)
public class AccessSource extends AbstractSingleSplitSource<SeaTunnelRow>
        implements SupportColumnProjection {
    private SeaTunnelRowType rowTypeInfo;
    private final AccessParameters accessParameters = new AccessParameters();

    @Override
    public String getPluginName() {
        return "Access";
    }

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        // Validate that all required options are present.
        CheckResult checkResult =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        DRIVER.key(),
                        URL.key(),
                        USERNAME.key(),
                        PASSWORD.key(),
                        QUERY.key());

        if (!checkResult.isSuccess()) {
            throw new AccessConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            getPluginName(), PluginType.SOURCE, checkResult.getMsg()));
        }

        this.accessParameters.buildWithConfig(pluginConfig);

        AccessClient accessClient =
                new AccessClient(
                        pluginConfig.getString(DRIVER.key()),
                        pluginConfig.getString(URL.key()),
                        pluginConfig.getString(USERNAME.key()),
                        pluginConfig.getString(PASSWORD.key()),
                        pluginConfig.getString(QUERY.key()));
        try {
            // Derive the row type from the metadata of the configured query.
            ResultSetMetaData metaData = accessClient.selectMetaData();
            int columnSize = metaData.getColumnCount();
            String[] fieldNames = new String[columnSize];
            SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize];
            for (int i = 1; i <= columnSize; i++) {
                fieldNames[i - 1] = metaData.getColumnName(i);
                seaTunnelDataTypes[i - 1] =
                        TypeConvertUtil.convert(metaData.getColumnTypeName(i));
            }
            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.rowTypeInfo;
    }

    @Override
    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
            SingleSplitReaderContext readerContext) throws Exception {
        return new AccessSourceReader(accessParameters, readerContext);
    }
}

AccessSourceReader.java

This class reads the data records and sends them to SeaTunnel.

package org.apache.seatunnel.connectors.seatunnel.access.source;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

@Slf4j
public class AccessSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {

    private Connection connection;

    private final SingleSplitReaderContext readerContext;

    private final AccessParameters accessParameters;

    AccessSourceReader(
            AccessParameters accessParameters, SingleSplitReaderContext readerContext) {
        this.accessParameters = accessParameters;
        this.readerContext = readerContext;
    }

    @Override
    public void open() throws Exception {
        AccessClient accessClient =
                new AccessClient(
                        accessParameters.getDriver(),
                        accessParameters.getUrl(),
                        accessParameters.getUsername(),
                        accessParameters.getPassword(),
                        accessParameters.getQuery());
        connection =
                accessClient.getAccessConnection(
                        accessParameters.getUrl(),
                        accessParameters.getUsername(),
                        accessParameters.getPassword());
    }

    @Override
    public void close() throws IOException {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        // try-with-resources closes the statement and result set once reading is done.
        try (Statement statement = connection.createStatement();
                ResultSet result = statement.executeQuery(accessParameters.getQuery())) {
            ResultSetMetaData metaData = result.getMetaData();
            int columnCount = metaData.getColumnCount();

            while (result.next()) {
                Object[] datas = new Object[columnCount];
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnName(i);
                    String columnType = metaData.getColumnTypeName(i);
                    datas[i - 1] = TypeConvertUtil.convertToObject(result, columnName, columnType);
                }
                output.collect(new SeaTunnelRow(datas));
            }
        }
        // The source is bounded: tell the engine there is nothing more to read.
        this.readerContext.signalNoMoreElement();
    }
}

AccessSourceFactory.java

package org.apache.seatunnel.connectors.seatunnel.access.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class AccessSourceFactory implements TableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "Access";
    }

    @Override
    public OptionRule optionRule() {
        return OptionRule.builder()
                .required(AccessConfig.DRIVER, AccessConfig.URL)
                .bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD)
                .optional(AccessConfig.QUERY)
                .build();
    }

    @Override
    public Class<? extends SeaTunnelSource> getSourceClass() {
        return AccessSource.class;
    }
}

6. sink

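The sink package receives records from SeaTunnel and writes them to the Access database.

AccessSink.java

This class resolves the fields to be written and their types, and creates the AccessSinkWriter.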

package org.apache.seatunnel.connectors.seatunnel.access.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import com.google.auto.service.AutoService;

import java.io.IOException;

import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.DRIVER;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.USERNAME;

@AutoService(SeaTunnelSink.class)
public class AccessSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
    private SeaTunnelRowType seaTunnelRowType;
    private final AccessParameters accessParameters = new AccessParameters();

    private SeaTunnelDataType<?>[] seaTunnelDataTypes;

    @Override
    public String getPluginName() {
        return "Access";
    }

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        // Validate that all required options are present.
        CheckResult checkResult =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        DRIVER.key(),
                        URL.key(),
                        USERNAME.key(),
                        PASSWORD.key(),
                        TABLE.key(),
                        QUERY.key());
        if (!checkResult.isSuccess()) {
            throw new AccessConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            getPluginName(), PluginType.SINK, checkResult.getMsg()));
        }
        this.accessParameters.buildWithConfig(pluginConfig);

        AccessClient accessClient =
                new AccessClient(
                        pluginConfig.getString(DRIVER.key()),
                        pluginConfig.getString(URL.key()),
                        pluginConfig.getString(USERNAME.key()),
                        pluginConfig.getString(PASSWORD.key()),
                        pluginConfig.getString(QUERY.key()));

        // Resolve the types of the columns named in the INSERT statement.
        seaTunnelDataTypes =
                TypeConvertUtil.getSeaTunnelDataTypes(
                        accessParameters, accessClient, pluginConfig, getPluginName());
    }

    @Override
    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.seaTunnelRowType = seaTunnelRowType;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
        return this.seaTunnelRowType;
    }

    @Override
    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
            throws IOException {
        return new AccessSinkWriter(accessParameters, seaTunnelDataTypes);
    }
}

AccessSinkWriter.java

This class writes the records received from SeaTunnel to the Access database.

package org.apache.seatunnel.connectors.seatunnel.access.sink;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException;
import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

@Slf4j
public class AccessSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private final AccessParameters accessParameters;
    private Connection connection;
    private final PreparedStatement statement;
    private final SeaTunnelDataType<?>[] seaTunnelDataTypes;

    public AccessSinkWriter(
            AccessParameters accessParameters, SeaTunnelDataType<?>[] seaTunnelDataTypes) {
        this.accessParameters = accessParameters;
        this.seaTunnelDataTypes = seaTunnelDataTypes;
        AccessClient accessClient =
                new AccessClient(
                        accessParameters.getDriver(),
                        accessParameters.getUrl(),
                        accessParameters.getUsername(),
                        accessParameters.getPassword(),
                        accessParameters.getQuery());
        connection =
                accessClient.getAccessConnection(
                        accessParameters.getUrl(),
                        accessParameters.getUsername(),
                        accessParameters.getPassword());
        try {
            this.statement = connection.prepareStatement(initPrepareCQL());
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void write(SeaTunnelRow element) throws IOException {
        try {
            // Bind every field of the row to its placeholder, then run the INSERT.
            for (int i = 0; i < accessParameters.getFields().size(); i++) {
                String type = this.seaTunnelDataTypes[i].toString();
                TypeConvertUtil.reconvertAndInject(statement, i, type, element.getField(i));
            }
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            // Release the prepared statement before closing the connection.
            if (this.statement != null) {
                this.statement.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (Exception e) {
            throw new AccessConnectorException(
                    AccessConnectorErrorCode.CLOSE_CQL_CONNECT_FAILED, e);
        }
    }

    // Builds "INSERT INTO table (f1,f2,...) VALUES (?,?,...)" from the configured fields.
    private String initPrepareCQL() {
        String[] placeholder = new String[accessParameters.getFields().size()];
        Arrays.fill(placeholder, "?");
        return String.format(
                "INSERT INTO %s (%s) VALUES (%s)",
                accessParameters.getTable(),
                String.join(",", accessParameters.getFields()),
                String.join(",", placeholder));
    }
}
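The statement built by initPrepareCQL can be tried out in isolation. The sketch below reproduces the same string construction with only the JDK; InsertSqlSketch and buildInsert are hypothetical names, and the empty-field check is an added assumption that the original method does not perform.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical standalone version of the INSERT builder used by the sink writer.
class InsertSqlSketch {

    // Builds a parameterized INSERT with one "?" placeholder per field.
    static String buildInsert(String table, List<String> fields) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("At least one field is required");
        }
        String placeholders = String.join(",", Collections.nCopies(fields.size(), "?"));
        return String.format(
                "INSERT INTO %s (%s) VALUES (%s)",
                table, String.join(",", fields), placeholders);
    }
}
```

For the table student with the fields id, name, and age, buildInsert returns INSERT INTO student (id,name,age) VALUES (?,?,?). Because the values are bound through placeholders, only the table and column names are concatenated into the SQL text.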
AccessSinkFactory.java

package org.apache.seatunnel.connectors.seatunnel.access.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class AccessSinkFactory implements TableSinkFactory {
    @Override
    public String factoryIdentifier() {
        return "Access";
    }

    @Override
    public OptionRule optionRule() {
        return OptionRule.builder()
                .required(AccessConfig.DRIVER, AccessConfig.URL, AccessConfig.TABLE)
                .bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD)
                .optional(AccessConfig.QUERY)
                .build();
    }
}

II. Testing the Code

In the seatunnel-flink-connector-v2-example module under SeaTunnel's examples module, create a configuration file named access_to_access.conf in resources/examples.

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  Access {
    driver = "net.ucanaccess.jdbc.UcanaccessDriver"
    url = "jdbc:ucanaccess://D:\\geely\\code\\access-test\\Database1.accdb"
    username = ""
    password = ""
    query = "select ID, name, address, age, birthday, a, b, c, d, e, f, g, h from student"
  }
}

sink {
  Access {
    driver = "net.ucanaccess.jdbc.UcanaccessDriver"
    url = "jdbc:ucanaccess://D:\\geely\\code\\access-test\\Database1.accdb"
    username = ""
    password = ""
    table = "student"
    query = """insert into student(id, naMe, address, age, birthday, a, b, c, d, e, f, g, h) values(?,?,?,?,?,?,?,?,?,?,?,?,?);"""
  }
}

Modify SeaTunnelApiExample.java so that it loads the access_to_access.conf file.

Add the connector dependency to pom.xml:

<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-access</artifactId>
    <version>${project.version}</version>
</dependency>

Right-click SeaTunnelApiExample.java and run it.

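You can see that the rows in the student table of the Access database were read out and then written back.

Before the run: [screenshot of the student table]

After the run: [screenshot of the student table]

At this point we have finished developing a new SeaTunnel connector; all that remains is to submit the PR.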
