看Flink JDBCSink 的源码
进行源码分析之前,首先看一下Flink DataStream JDBC-Sink的官方示例:
// Official-style usage example: write a stream of book records to a JDBC table.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
        .addSink(
                JdbcSink.sink(
                        // One "?" placeholder per column; they are bound by 1-based index below.
                        "insert into books (id, title, author, price, qty) values (?, ?, ?, ?, ?)",
                        (ps, t) -> {
                            ps.setInt(1, t.id);
                            ps.setString(2, t.title);
                            ps.setString(3, t.author);
                            ps.setDouble(4, t.price);
                            // qty is the 5th placeholder. Binding it to index 1 would overwrite
                            // id and leave parameter 5 unset.
                            ps.setInt(5, t.qty);
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl(getDbMetadata().getUrl())
                                .withDriverName(getDbMetadata().getDriverClass())
                                .build()));
env.execute();
可以看到,addSink传入的SinkFunction是通过JdbcSink类的sink方法构建的,该方法的源码如下:
/**
 * Creates a JDBC sink that runs with the default execution options.
 *
 * @param sql the SQL statement to execute for each record
 * @param statementBuilder binds a record's fields to the prepared statement
 * @param connectionOptions JDBC connection settings
 * @return the sink function produced by the four-argument overload
 */
public static <T> SinkFunction<T> sink(
        String sql,
        JdbcStatementBuilder<T> statementBuilder,
        JdbcConnectionOptions connectionOptions) {
    // Fill in the default execution options, then hand over to the four-argument overload.
    final JdbcExecutionOptions defaultExecutionOptions = JdbcExecutionOptions.defaults();
    return sink(sql, statementBuilder, defaultExecutionOptions, connectionOptions);
}
第2个sink方法,可以传入具体的执行配置JdbcExecutionOptions:
/**
 * Creates a JDBC sink with explicit execution options.
 *
 * <p>The returned sink is a {@code GenericJdbcSinkFunction} wrapping a {@code JdbcOutputFormat}
 * that is wired with a simple connection provider, the given execution options, a simple batch
 * statement executor and an identity record extractor.
 *
 * @param sql the SQL statement to execute for each record
 * @param statementBuilder binds a record's fields to the prepared statement
 * @param executionOptions batching / retry settings
 * @param connectionOptions JDBC connection settings
 */
public static <T> SinkFunction<T> sink(
String sql,
JdbcStatementBuilder<T> statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
// Create the GenericJdbcSinkFunction that is returned to the caller.
return new GenericJdbcSinkFunction<>(
// It takes a JdbcOutputFormat as its only argument.
new JdbcOutputFormat<>(
// Connection provider: the simple implementation built from connectionOptions.
new SimpleJdbcConnectionProvider(connectionOptions),
executionOptions,
// Statement executor factory: always creates the simple batch executor.
context ->
JdbcBatchStatementExecutor.simple(
sql, statementBuilder, Function.identity()),
JdbcOutputFormat.RecordExtractor.identity()));
}
所以整个方法的传递路径为JdbcSink-->GenericJdbcSinkFunction-->JdbcOutputFormat。
我们先分析下JdbcOutputFormat类。
JdbcOutputFormat
首先看一下JdbcOutputFormat的构造函数:
/**
 * Creates the output format from its four collaborators. Each argument is passed through
 * {@code checkNotNull} before being stored.
 *
 * @param connectionProvider supplies the JDBC connection
 * @param executionOptions batching / retry settings
 * @param statementExecutorFactory creates the batch statement executor
 * @param recordExtractor converts an incoming record ({@code In}) into the value handed to the
 *     executor ({@code JdbcIn})
 */
public JdbcOutputFormat(
@Nonnull JdbcConnectionProvider connectionProvider,
@Nonnull JdbcExecutionOptions executionOptions,
@Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
@Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
this.connectionProvider = checkNotNull(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
this.jdbcRecordExtractor = checkNotNull(recordExtractor);
}
JdbcConnectionProvider
第1个参数是JDBC连接的提供接口:
/** Provides and manages a JDBC connection. */
@Internal
public interface JdbcConnectionProvider {
/** Returns the existing database connection; may be {@code null} (see {@code @Nullable}). */
@Nullable
Connection getConnection();
/** Checks whether the current connection is valid. */
boolean isConnectionValid() throws SQLException;
/** Returns the existing database connection, establishing a new one if there is none. */
Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
/** Closes the database connection. */
void closeConnection();
/** Closes the existing connection and creates a new one. */
Connection reestablishConnection() throws SQLException, ClassNotFoundException;
}
Flink提供了1个默认实现SimpleJdbcConnectionProvider。
/**
 * Simple {@link JdbcConnectionProvider} that lazily opens a single connection and caches it.
 *
 * <p>The cached driver and connection are {@code transient}, so they are not serialized with
 * the provider and are (re)loaded lazily on first use. The class is annotated
 * {@code @NotThreadSafe}; callers are responsible for synchronization.
 */
@NotThreadSafe
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);

    private static final long serialVersionUID = 1L;

    private final JdbcConnectionOptions jdbcOptions;

    private transient Driver loadedDriver;
    private transient Connection connection;

    static {
        // Touch DriverManager while this class is being initialized.
        // NOTE(review): presumably forces DriverManager's own static initialization up front
        // to avoid class-loading deadlocks - confirm against upstream history.
        DriverManager.getDrivers();
    }

    public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    /** Returns the cached connection, or {@code null} if none has been established yet. */
    @Override
    public Connection getConnection() {
        return connection;
    }

    /**
     * Returns {@code true} if a connection exists and the driver reports it valid within the
     * configured check timeout.
     */
    @Override
    public boolean isConnectionValid() throws SQLException {
        return connection != null
                && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
    }

    /**
     * Resolves a driver instance by class name.
     *
     * @param driverName fully qualified driver class name, must not be null
     * @return an already registered driver of that class, or a freshly instantiated one
     * @throws SQLException if the driver class cannot be instantiated
     * @throws ClassNotFoundException if the driver class cannot be found
     */
    private static Driver loadDriver(String driverName)
            throws SQLException, ClassNotFoundException {
        Preconditions.checkNotNull(driverName);
        // If DriverManager already has a driver of this class registered, reuse it.
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getName().equals(driverName)) {
                return driver;
            }
        }
        // Otherwise create a new driver instance reflectively via the context class loader.
        Class<?> clazz =
                Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
        try {
            // Class.newInstance() is deprecated since Java 9 because it rethrows checked
            // constructor exceptions undeclared; go through the no-arg constructor instead.
            return (Driver) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception ex) {
            throw new SQLException("Fail to create driver of class " + driverName, ex);
        }
    }

    /** Loads the configured driver on first use and caches it. */
    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
        if (loadedDriver == null) {
            loadedDriver = loadDriver(jdbcOptions.getDriverName());
        }
        return loadedDriver;
    }

    /**
     * Returns the cached connection, or opens one if there is none.
     *
     * <p>An existing connection is returned as-is, without a validity check.
     *
     * @throws SQLException if connecting fails or the driver does not accept the URL
     * @throws ClassNotFoundException if the configured driver class cannot be found
     */
    @Override
    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        if (connection != null) {
            return connection;
        }
        if (jdbcOptions.getDriverName() == null) {
            // No explicit driver configured: let DriverManager pick one for the URL.
            connection =
                    DriverManager.getConnection(
                            jdbcOptions.getDbURL(),
                            jdbcOptions.getUsername().orElse(null),
                            jdbcOptions.getPassword().orElse(null));
        } else {
            Driver driver = getLoadedDriver();
            Properties info = new Properties();
            jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
            jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
            // Connect through the explicitly configured driver.
            connection = driver.connect(jdbcOptions.getDbURL(), info);
            if (connection == null) {
                // A null result from Driver.connect is reported as "no suitable driver".
                throw new SQLException(
                        "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
            }
        }
        return connection;
    }

    /** Closes the cached connection, if any; close failures are logged, not rethrown. */
    @Override
    public void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                LOG.warn("JDBC connection close failed.", e);
            } finally {
                // Drop the reference even if close() failed so the next call reconnects.
                connection = null;
            }
        }
    }

    /** Closes the current connection and opens a new one. */
    @Override
    public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
        closeConnection();
        return getOrEstablishConnection();
    }
}
JdbcExecutionOptions
第2个参数主要定义执行的配置(批次大小、批次提交间隔、最大重试次数)。
/**
 * Immutable execution settings for JDBC writes: batch flush interval, batch size and the
 * maximum number of retries. Instances are created through {@link #builder()} or
 * {@link #defaults()}.
 */
@PublicEvolving
public class JdbcExecutionOptions implements Serializable {

    /** Default maximum number of retries: 3. */
    public static final int DEFAULT_MAX_RETRY_TIMES = 3;

    /** Default flush interval: 0 milliseconds. */
    private static final int DEFAULT_INTERVAL_MILLIS = 0;

    /** Default batch size: 5000 records. */
    public static final int DEFAULT_SIZE = 5000;

    private final long batchIntervalMs;
    private final int batchSize;
    private final int maxRetries;

    /** Only {@code maxRetries} is validated; it must not be negative. */
    private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
        Preconditions.checkArgument(maxRetries >= 0);
        this.batchIntervalMs = batchIntervalMs;
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

    public long getBatchIntervalMs() {
        return batchIntervalMs;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    /** Two instances are equal when they have the same class and all three settings match. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final JdbcExecutionOptions other = (JdbcExecutionOptions) o;
        if (batchIntervalMs != other.batchIntervalMs) {
            return false;
        }
        if (batchSize != other.batchSize) {
            return false;
        }
        return maxRetries == other.maxRetries;
    }

    @Override
    public int hashCode() {
        return Objects.hash(batchIntervalMs, batchSize, maxRetries);
    }

    /** Returns a builder pre-populated with the default values. */
    public static Builder builder() {
        return new Builder();
    }

    /** Returns options with every setting left at its default. */
    public static JdbcExecutionOptions defaults() {
        return new Builder().build();
    }

    /** Builder for {@link JdbcExecutionOptions}. */
    public static final class Builder {

        private long intervalMs = DEFAULT_INTERVAL_MILLIS;
        private int size = DEFAULT_SIZE;
        private int maxRetries = DEFAULT_MAX_RETRY_TIMES;

        public Builder withBatchSize(int size) {
            this.size = size;
            return this;
        }

        public Builder withBatchIntervalMs(long intervalMs) {
            this.intervalMs = intervalMs;
            return this;
        }

        public Builder withMaxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        /** Builds the options; fails if {@code maxRetries} is negative. */
        public JdbcExecutionOptions build() {
            return new JdbcExecutionOptions(intervalMs, size, maxRetries);
        }
    }
}
StatementExecutorFactory
第3个参数是预编译对象执行工厂类。
/**
 * Factory that produces a {@link JdbcBatchStatementExecutor} from a {@code RuntimeContext}.
 * It is a {@code SerializableFunction<RuntimeContext, T>} and declares no methods of its own.
 *
 * @param <T> the concrete executor type produced by this factory
 */
public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
extends SerializableFunction<RuntimeContext, T> {}
其是一个接口,其泛型为JdbcBatchStatementExecutor的子类,并继承了可序列化的函数接口SerializableFunction<RuntimeContext, T>。
函数接口实现会以RuntimeContext作为输入,然后输出T,即JdbcBatchStatementExecutor的实现类实例。
接着看一下JdbcBatchStatementExecutor是何方神圣?
/**
 * Executes records against a database in JDBC batches.
 *
 * @param <T> type of the records added to the batch
 */
@Internal
public interface JdbcBatchStatementExecutor<T> {
/** Creates the prepared statement(s) on the given connection. */
void prepareStatements(Connection connection) throws SQLException;
/** Adds one record to the current batch. */
void addToBatch(T record) throws SQLException;
/** Submits the batched records to the database. */
void executeBatch() throws SQLException;
/** Closes the prepared statement(s). */
void closeStatements() throws SQLException;
/** Creates a {@code KeyedBatchStatementExecutor} from the SQL, key extractor and statement builder. */
static <T, K> JdbcBatchStatementExecutor<T> keyed(
String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> statementBuilder) {
return new KeyedBatchStatementExecutor<>(sql, keyExtractor, statementBuilder);
}
/** Creates a {@code SimpleBatchStatementExecutor} from the SQL, parameter setter and value transformer. */
static <T, V> JdbcBatchStatementExecutor<T> simple(
String sql, JdbcStatementBuilder<V> paramSetter, Function<T, V> valueTransformer) {
return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
}
}
JdbcBatchStatementExecutor有多个实现类,例如上面两个静态工厂方法创建的KeyedBatchStatementExecutor和SimpleBatchStatementExecutor。
具体实现逻辑也都很简单,就是先将每条record插入到batch缓存中,当batch满了、snapshot时或者定时任务到期时(若有设置),执行executeBatch()方法将批次数据入库。
以SimpleBatchStatementExecutor为例:
/**
 * Batch executor that buffers transformed records in memory and writes them with a single
 * prepared statement when {@link #executeBatch()} is called.
 *
 * @param <T> type of the incoming records
 * @param <V> type of the values bound to the prepared statement
 */
class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);

    private final String sql;
    /** Binds one buffered value to the prepared statement. */
    private final JdbcStatementBuilder<V> parameterSetter;
    private final Function<T, V> valueTransformer;
    /** In-memory buffer of values waiting to be written. */
    private final List<V> batch;

    private transient PreparedStatement st;

    SimpleBatchStatementExecutor(
            String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
        this.batch = new ArrayList<>();
        this.valueTransformer = valueTransformer;
        this.parameterSetter = statementBuilder;
        this.sql = sql;
    }

    /** Prepares the statement for the configured SQL on the given connection. */
    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        st = connection.prepareStatement(sql);
    }

    /** Transforms the record and appends the result to the in-memory buffer. */
    @Override
    public void addToBatch(T record) {
        final V value = valueTransformer.apply(record);
        batch.add(value);
    }

    /** Writes all buffered values to the database in one JDBC batch; no-op when empty. */
    @Override
    public void executeBatch() throws SQLException {
        if (batch.isEmpty()) {
            return;
        }
        for (V value : batch) {
            // Bind this value's parameters, then queue it on the statement.
            parameterSetter.accept(st, value);
            st.addBatch();
        }
        st.executeBatch();
        // Buffer is only cleared after a successful execution.
        batch.clear();
    }

    /** Closes the prepared statement if one was created. */
    @Override
    public void closeStatements() throws SQLException {
        if (st == null) {
            return;
        }
        st.close();
        st = null;
    }
}
在构建SimpleBatchStatementExecutor时,sql和parameterSetter是最重要的2个输入。
sql是SQL语句,不解释。
parameterSetter是JdbcStatementBuilder函数接口。
@PublicEvolving
/**
 * Sets the parameters of a {@code PreparedStatement} from a value of type {@code T}. It is a
 * serializable {@code BiConsumerWithException<PreparedStatement, T, SQLException>} and declares
 * no methods of its own.
 *
 * @param <T> type of the value whose fields are bound to the statement
 */
public interface JdbcStatementBuilder<T>
extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}
其继承了Flink的自定义BiConsumerWithException函数式接口:
/**
 * A two-argument consumer whose {@link #accept} is allowed to throw an exception of type
 * {@code E}.
 *
 * @param <T> type of the first argument
 * @param <U> type of the second argument
 * @param <E> type of the exception that may be thrown
 */
@FunctionalInterface
public interface BiConsumerWithException<T, U, E extends Throwable> {

    /**
     * Consumes the two inputs.
     *
     * @throws E if consuming fails
     */
    void accept(T t, U u) throws E;

    /**
     * Adapts a {@code BiConsumerWithException} to a plain JDK {@link BiConsumer}. Anything
     * thrown by the wrapped consumer is handed to {@code ExceptionUtils.rethrow}.
     */
    static <A, B> BiConsumer<A, B> unchecked(
            BiConsumerWithException<A, B, ?> biConsumerWithException) {
        return (first, second) -> {
            try {
                biConsumerWithException.accept(first, second);
            } catch (Throwable failure) {
                ExceptionUtils.rethrow(failure);
            }
        };
    }
}
由于是函数式接口,所以外部传入的时候,一般为Lambda表达式或匿名类。
如章节1案例中就是典型的Lambda表达式:
(ps, t) -> {
    ps.setInt(1, t.id);
    ps.setString(2, t.title);
    ps.setString(3, t.author);
    ps.setDouble(4, t.price);
    // qty is the 5th placeholder; index 1 would overwrite id and leave parameter 5 unset.
    ps.setInt(5, t.qty);
}
RecordExtractor
第4个参数是记录提取函数接口。
/**
 * Serializable function that extracts a value of type {@code T} from a record of type
 * {@code F}.
 */
public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
/** Returns an extractor that returns its input unchanged. */
static <T> RecordExtractor<T, T> identity() {
return x -> x;
}
}
看完了构造函数,接着看一下JdbcOutputFormat的其他方法实现:
@Internal
public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
extends RichOutputFormat<In> implements Flushable, InputTypeConfigurable {
protected final JdbcConnectionProvider connectionProvider;
// 输入泛型的序列化器
@Nullable private TypeSerializer<In> serializer;
...
/**
 * Opens the JDBC connection, creates the statement executor and, if configured, starts the
 * periodic flush task.
 *
 * @throws IOException if the connection cannot be established
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
connectionProvider.getOrEstablishConnection();
} catch (Exception e) {
throw new IOException("unable to open JDBC writer", e);
}
jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
// Only when the batch interval is non-zero and the batch size is not 1: start a
// one-thread scheduled executor that calls flush() with a fixed delay.
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
// Same monitor as the synchronized writeRecord()/flush()/close() methods.
synchronized (JdbcOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
// Record the failure instead of throwing on the scheduler thread.
// NOTE(review): presumably surfaced later by checkFlushException() - confirm.
flushException = e;
}
}
}
},
executionOptions.getBatchIntervalMs(),
executionOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
}
...
/**
 * Buffers one record and flushes once the configured batch size is reached.
 *
 * @throws IOException if buffering or flushing the record fails
 */
@Override
public final synchronized void writeRecord(In record) throws IOException {
    checkFlushException();
    try {
        // The extractor sees a copy when a serializer is configured, otherwise the record itself.
        final JdbcIn extracted = jdbcRecordExtractor.apply(copyIfNecessary(record));
        addToBatch(record, extracted);
        batchCount += 1;

        // A batch size of 0 or less disables size-triggered flushing.
        final int flushThreshold = executionOptions.getBatchSize();
        final boolean sizeTriggerEnabled = flushThreshold > 0;
        if (sizeTriggerEnabled && batchCount >= flushThreshold) {
            flush();
        }
    } catch (Exception e) {
        throw new IOException("Writing records to JDBC failed.", e);
    }
}
/** Returns a serializer-made copy of the record when a serializer is set, else the record itself. */
private In copyIfNecessary(In record) {
    if (serializer != null) {
        return serializer.copy(record);
    }
    return record;
}
/**
 * Hands the extracted value to the statement executor's batch. The original record is not
 * used by this implementation.
 */
protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
jdbcStatementExecutor.addToBatch(extracted);
}
/**
 * Writes the buffered batch to the database, retrying on {@code SQLException}.
 *
 * <p>Up to {@code maxRetries + 1} attempts are made. After a failed attempt the connection is
 * checked and, if invalid, the executor is updated via {@code updateExecutor(true)}; then the
 * thread sleeps before the next attempt.
 *
 * @throws IOException if all attempts fail, re-establishing the connection fails, or the
 *     thread is interrupted while waiting to retry
 */
@Override
public synchronized void flush() throws IOException {
checkFlushException();
// One initial attempt plus up to getMaxRetries() retries.
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
attemptFlush();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
// Out of retries: give up and surface the last SQL error.
if (i >= executionOptions.getMaxRetries()) {
throw new IOException(e);
}
try {
if (!connectionProvider.isConnectionValid()) {
updateExecutor(true);
}
} catch (Exception exception) {
LOG.error(
"JDBC connection is not valid, and reestablish connection failed.",
exception);
throw new IOException("Reestablish JDBC connection failed", exception);
}
try {
// The pause before the next attempt grows linearly: 0 s after the first
// failure, 1 s after the second, 2 s after the third, and so on.
Thread.sleep(1000 * i);
} catch (InterruptedException ex) {
// Restore the interrupt flag; the IOException carries the SQL error as cause.
Thread.currentThread().interrupt();
throw new IOException(
"unable to flush; interrupted while doing another attempt", e);
}
}
}
}
/** Performs a single flush attempt by executing the statement executor's batch. */
protected void attemptFlush() throws SQLException {
jdbcStatementExecutor.executeBatch();
}
/**
 * Closes this output format: cancels the periodic flush task, flushes any remaining buffered
 * records, closes the statements and finally the connection.
 *
 * <p>Statements and connection are released in {@code finally} blocks, so a failing final
 * flush no longer leaks them. On the success path the order of operations is unchanged.
 *
 * @throws RuntimeException if the final flush fails
 */
@Override
public synchronized void close() {
    try {
        if (!closed) {
            closed = true;

            if (this.scheduledFuture != null) {
                scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }

            try {
                if (batchCount > 0) {
                    try {
                        flush();
                    } catch (Exception e) {
                        LOG.warn("Writing records to JDBC failed.", e);
                        throw new RuntimeException("Writing records to JDBC failed.", e);
                    }
                }
            } finally {
                // Always release the statements, even when the final flush failed.
                try {
                    if (jdbcStatementExecutor != null) {
                        jdbcStatementExecutor.closeStatements();
                    }
                } catch (SQLException e) {
                    LOG.warn("Close JDBC writer failed.", e);
                }
            }
        }
    } finally {
        // Always release the connection, whether or not the steps above succeeded.
        connectionProvider.closeConnection();
    }
    checkFlushException();
}
/** Returns a new {@code Builder}. */
public static Builder builder() {
return new Builder();
}
...
几个重要点记一下:
- 若执行配置里的批处理时间间隔不为0,且批次大小不为1,则会创建定时任务线程池,以固定时间间隔强制触发批数据提交。之所以这样设计,主要是为了保证实时性,比如批缓存设置的为1000,此时缓存中的数据为990,之后上游长时间未有新的数据到达。这种情况下,数据库中的数据长时间比实际数据落后990条,实时性和一致性受到了很大影响。设置了定时间隔提交后,如果缓存数据长时间不再增加,就会强制触发批数据提交,保证了一致性,提高了实时性。
- Flink的flush()操作具有重试策略,且每次重试之间的间隔不断拉大。
- JdbcOutputFormat是线程安全的,由synchronized关键字来保证。
- 注意:JdbcSink写入数据库的时机有三个:1、writeRecord时batch满了触发flush;2、open方法中创建的定时任务,每间隔batchIntervalMs触发一次flush;3、算子做checkpoint时,调用snapshotState方法触发flush。这里要注意,如果设置batchSize为0(且没有设置batchIntervalMs),实际flush是靠checkpoint触发的,如果checkpoint没有开启,那么作业运行期间数据就写不进去了(只有在close()时才会再flush一次);因此建议设置batchSize为1,而不是官方推荐的0。
跟完了最内层的JdbcOutputFormat,接着看外层的GenericJdbcSinkFunction。
GenericJdbcSinkFunction
/**
 * Sink function (extends {@code RichSinkFunction}) that delegates every operation to a wrapped
 * {@code JdbcOutputFormat}.
 */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, InputTypeConfigurable {

    private final JdbcOutputFormat<T, ?, ?> outputFormat;

    /** The only constructor argument is the output format to delegate to; it must be non-null. */
    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) {
        this.outputFormat = Preconditions.checkNotNull(outputFormat);
    }

    /** Passes the runtime context to the output format and opens it for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final RuntimeContext runtimeContext = getRuntimeContext();
        outputFormat.setRuntimeContext(runtimeContext);
        final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
        outputFormat.open(subtaskIndex, parallelism);
    }

    /** Writes the value through the output format. */
    @Override
    public void invoke(T value, Context context) throws IOException {
        outputFormat.writeRecord(value);
    }

    /** Intentionally empty. */
    @Override
    public void initializeState(FunctionInitializationContext context) {}

    /** Flushes the output format so buffered records are written when a snapshot is taken. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    @Override
    public void close() {
        outputFormat.close();
    }

    @Override
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        outputFormat.setInputType(type, executionConfig);
    }
}
总结
- DataStream的jdbc-sink的实现路径为JdbcSink-->GenericJdbcSinkFunction-->JdbcOutputFormat。
- JdbcSink.sink方法的第2个参数本质上是1个Flink自定义的BiConsumer函数式接口,一般通过Lambda表达式构建。
- 拓展一下,通常我们在构建程序时,均是面向对象的,所以最后持久化的时候,流里面的泛型一般也为POJO。但按照官方示例的方法,针对每个POJO,我们均需要构建1个Insert SQL和1个JdbcStatementBuilder函数接口,但其实上述2个参数,可以通过POJO基于反射动态构建出来,所以我们可以在Flink官方实现的基础上做一步增强,使其支持POJO Sink。