看Flink JDBCSink 的源码

midoll 828 2023-02-16

看Flink JDBCSink 的源码

进行源码分析之前,首先看一下Flink DataStream JDBC-Sink的官方示例:


// Official DataStream JDBC sink example. Every ps.setXxx(index, ...) call must use the
// 1-based position of the matching "?" in the INSERT statement.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
	.fromElements(...)
	.addSink(JdbcSink.sink(
		"insert into books (id, title, author, price, qty) values (?, ?, ?, ?, ?)",
		(ps, t) -> {
			ps.setInt(1, t.id);
			ps.setString(2, t.title);
			ps.setString(3, t.author);
			ps.setDouble(4, t.price);
			// qty is the 5th placeholder; binding it to index 1 would overwrite id and
			// leave parameter 5 unset, so the insert would fail.
			ps.setInt(5, t.qty);
		},
		new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
			.withUrl(getDbMetadata().getUrl())
			.withDriverName(getDbMetadata().getDriverClass())
			.build()
	));
env.execute();

可以看到,传给addSink的SinkFunction是由JdbcSink类的静态方法sink构建的。


/**
 * Creates a JDBC sink that uses {@link JdbcExecutionOptions#defaults()} as its execution
 * options.
 *
 * <p>This is a convenience overload; it simply delegates to the four-argument {@code sink}.
 */
public static <T> SinkFunction<T> sink(
		String sql,
		JdbcStatementBuilder<T> statementBuilder,
		JdbcConnectionOptions connectionOptions) {
	final JdbcExecutionOptions defaultExecutionOptions = JdbcExecutionOptions.defaults();
	return sink(sql, statementBuilder, defaultExecutionOptions, connectionOptions);
}

第2个sink方法,可以传入具体的执行配置JdbcExecutionOptions:


/**
 * Creates a JDBC sink with explicit execution options.
 *
 * <p>The returned {@link GenericJdbcSinkFunction} wraps a {@link JdbcOutputFormat} built from:
 * a {@link SimpleJdbcConnectionProvider} for {@code connectionOptions}; the given
 * {@code executionOptions}; a factory producing a simple batch statement executor for
 * {@code sql} and {@code statementBuilder} (values passed through unchanged); and an identity
 * record extractor.
 */
public static <T> SinkFunction<T> sink(
		String sql,
		JdbcStatementBuilder<T> statementBuilder,
		JdbcExecutionOptions executionOptions,
		JdbcConnectionOptions connectionOptions) {
	SimpleJdbcConnectionProvider connectionProvider =
			new SimpleJdbcConnectionProvider(connectionOptions);
	JdbcOutputFormat<T, T, JdbcBatchStatementExecutor<T>> outputFormat =
			new JdbcOutputFormat<>(
					connectionProvider,
					executionOptions,
					// The runtime context is not consulted: each call yields a fresh simple
					// executor for the same SQL and statement builder.
					ctx -> JdbcBatchStatementExecutor.simple(
							sql, statementBuilder, Function.identity()),
					JdbcOutputFormat.RecordExtractor.identity());
	return new GenericJdbcSinkFunction<>(outputFormat);
}

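所以整个调用链路为JdbcSink–>GenericJdbcSinkFunction–>JdbcOutputFormat:JdbcSink负责构建,GenericJdbcSinkFunction负责对接Flink Sink的生命周期(open/invoke/snapshotState/close),JdbcOutputFormat负责真正的批量写入。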

我们先分析下JdbcOutputFormat类。

JdbcOutputFormat

首先看一下JdbcOutputFormat的构造函数:


public JdbcOutputFormat(
		@Nonnull JdbcConnectionProvider connectionProvider,
		@Nonnull JdbcExecutionOptions executionOptions,
		@Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
		@Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
	this.connectionProvider = checkNotNull(connectionProvider);
	this.executionOptions = checkNotNull(executionOptions);
	this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
	this.jdbcRecordExtractor = checkNotNull(recordExtractor);
}

JdbcConnectionProvider

第1个参数是JDBC连接的提供接口:


/** Supplies, validates, closes and re-creates the JDBC {@link Connection} used by the sink. */
@Internal
public interface JdbcConnectionProvider {
    /** Returns the current connection; may be {@code null} if none has been established. */
    @Nullable
    Connection getConnection();

    /** Checks whether the current connection is valid. */
    boolean isConnectionValid() throws SQLException;

    /** Returns the existing connection, establishing a new one if there is none. */
    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

    /** Closes the current connection. */
    void closeConnection();

    /** Closes the current connection and establishes a new one. */
    Connection reestablishConnection() throws SQLException, ClassNotFoundException;
}

Flink提供了1个默认实现SimpleJdbcConnectionProvider。


/**
 * Default {@link JdbcConnectionProvider} that caches a single, lazily created JDBC connection.
 *
 * <p>When a driver class name is configured, connections are opened through that
 * {@link Driver} (found among the drivers registered with {@link DriverManager}, or else loaded
 * and instantiated reflectively). Without a driver name,
 * {@link DriverManager#getConnection(String, String, String)} is used.
 *
 * <p>Marked {@code @NotThreadSafe}: the cached driver and connection are plain fields accessed
 * without synchronization.
 */
@NotThreadSafe
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);

    private static final long serialVersionUID = 1L;

    private final JdbcConnectionOptions jdbcOptions;

    // Not serialized; both are (re)created lazily by getLoadedDriver()/getOrEstablishConnection().
    private transient Driver loadedDriver;
    private transient Connection connection;

    static {
        // NOTE(review): presumably forces DriverManager's static initialization once, up front,
        // so it cannot race with driver classes initialized concurrently via Class.forName —
        // confirm against upstream history.
        DriverManager.getDrivers();
    }

    public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    /** Returns the cached connection, or {@code null} if none is open. */
    @Override
    public Connection getConnection() {
        return connection;
    }

    /**
     * Returns {@code true} if a connection is cached and {@link Connection#isValid} succeeds
     * within the configured connection-check timeout.
     */
    @Override
    public boolean isConnectionValid() throws SQLException {
        return connection != null
                && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
    }

    /**
     * Resolves the driver class {@code driverName}. It first returns a matching driver already
     * registered with {@link DriverManager}. Otherwise it loads the class through the thread
     * context class loader and instantiates it with its no-arg constructor.
     *
     * @throws ClassNotFoundException if the class cannot be loaded
     * @throws SQLException if the class cannot be instantiated as a {@link Driver}
     */
    private static Driver loadDriver(String driverName)
            throws SQLException, ClassNotFoundException {
        Preconditions.checkNotNull(driverName);
        // Reuse a driver instance that DriverManager has already registered.
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getName().equals(driverName)) {
                return driver;
            }
        }
        // Otherwise create a new driver instance reflectively.
        Class<?> clazz =
                Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
        try {
            // Class#newInstance() is deprecated since Java 9 because it rethrows any exception
            // from the constructor, including undeclared checked ones. Going through the
            // Constructor wraps those in InvocationTargetException instead; either way the
            // failure is reported as an SQLException below.
            return (Driver) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception ex) {
            throw new SQLException("Fail to create driver of class " + driverName, ex);
        }
    }

    /** Returns the configured driver, resolving and caching it on first use. */
    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
        if (loadedDriver == null) {
            loadedDriver = loadDriver(jdbcOptions.getDriverName());
        }
        return loadedDriver;
    }

    /**
     * Returns the cached connection, opening one if none is cached.
     *
     * @throws SQLException if the connection cannot be opened, including when the configured
     *     driver does not accept the URL (SQL state {@code 08001})
     * @throws ClassNotFoundException if the configured driver class cannot be loaded
     */
    @Override
    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        if (connection != null) {
            return connection;
        }
        if (jdbcOptions.getDriverName() == null) {
            // No driver configured: let DriverManager pick one for the URL.
            connection =
                    DriverManager.getConnection(
                            jdbcOptions.getDbURL(),
                            jdbcOptions.getUsername().orElse(null),
                            jdbcOptions.getPassword().orElse(null));
        } else {
            Driver driver = getLoadedDriver();
            Properties info = new Properties();
            jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
            jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
            // Connect through the configured driver; Driver#connect returns null if it does
            // not handle this URL.
            connection = driver.connect(jdbcOptions.getDbURL(), info);
            if (connection == null) {
                throw new SQLException(
                        "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
            }
        }
        return connection;
    }

    /** Closes and forgets the cached connection; close failures are only logged. */
    @Override
    public void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                LOG.warn("JDBC connection close failed.", e);
            } finally {
                connection = null;
            }
        }
    }

    /** Closes the cached connection (if any) and opens a new one. */
    @Override
    public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
        closeConnection();
        return getOrEstablishConnection();
    }
}

JdbcExecutionOptions

第2个参数JdbcExecutionOptions定义了执行配置:批处理时间间隔(batchIntervalMs)、批次大小(batchSize)和最大重试次数(maxRetries)。


/**
 * Batching and retry settings for the JDBC sink: batch interval (ms), batch size and maximum
 * number of retries. Instances are immutable and created through {@link Builder}.
 */
@PublicEvolving
public class JdbcExecutionOptions implements Serializable {
    /** Default maximum number of retries (3). */
    public static final int DEFAULT_MAX_RETRY_TIMES = 3;
    /** Default batch interval in milliseconds (0). */
    private static final int DEFAULT_INTERVAL_MILLIS = 0;
    /** Default batch size in records (5000). */
    public static final int DEFAULT_SIZE = 5000;

    private final long batchIntervalMs;
    private final int batchSize;
    private final int maxRetries;

    private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
        // Only the retry count is validated; it must not be negative.
        Preconditions.checkArgument(maxRetries >= 0);
        this.batchIntervalMs = batchIntervalMs;
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

    /** Returns the batch interval in milliseconds. */
    public long getBatchIntervalMs() {
        return batchIntervalMs;
    }

    /** Returns the batch size in records. */
    public int getBatchSize() {
        return batchSize;
    }

    /** Returns the maximum number of retries. */
    public int getMaxRetries() {
        return maxRetries;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final JdbcExecutionOptions other = (JdbcExecutionOptions) o;
        return maxRetries == other.maxRetries
                && batchSize == other.batchSize
                && batchIntervalMs == other.batchIntervalMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(batchIntervalMs, batchSize, maxRetries);
    }

    /** Returns a builder pre-filled with the default settings. */
    public static Builder builder() {
        return new Builder();
    }

    /** Returns options with all default settings. */
    public static JdbcExecutionOptions defaults() {
        return new Builder().build();
    }

    /** Builder for {@link JdbcExecutionOptions}. */
    public static final class Builder {
        private long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
        private int batchSize = DEFAULT_SIZE;
        private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;

        /** Sets the batch size in records. */
        public Builder withBatchSize(int size) {
            batchSize = size;
            return this;
        }

        /** Sets the batch interval in milliseconds. */
        public Builder withBatchIntervalMs(long intervalMs) {
            batchIntervalMs = intervalMs;
            return this;
        }

        /** Sets the maximum number of retries; must not be negative when built. */
        public Builder withMaxRetries(int maxRetries) {
            maxRetryTimes = maxRetries;
            return this;
        }

        /** Creates the options from the values set so far. */
        public JdbcExecutionOptions build() {
            return new JdbcExecutionOptions(batchIntervalMs, batchSize, maxRetryTimes);
        }
    }
}

StatementExecutorFactory

第3个参数是语句执行器的工厂接口StatementExecutorFactory,用于创建批量执行SQL的JdbcBatchStatementExecutor。


/**
 * A {@link SerializableFunction} that takes the operator's {@link RuntimeContext} and returns a
 * batch statement executor of type {@code T} (some {@link JdbcBatchStatementExecutor}).
 */
public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
		extends SerializableFunction<RuntimeContext, T> {}

它是一个接口,泛型参数T须为JdbcBatchStatementExecutor的子类型,并继承了可序列化的函数接口SerializableFunction<RuntimeContext, T>。

函数接口实现会以RuntimeContext作为输入,然后输出T,即JdbcBatchStatementExecutor的实现类实例。

接着看一下JdbcBatchStatementExecutor是何方神圣?


/**
 * Collects records via {@link #addToBatch} and submits them through prepared statements with
 * {@link #executeBatch}.
 *
 * @param <T> type of the records added to the batch
 */
@Internal
public interface JdbcBatchStatementExecutor<T> {

    /** Creates the prepared statement(s) on the given connection. */
    void prepareStatements(Connection connection) throws SQLException;

    /** Adds one record to the pending batch. */
    void addToBatch(T record) throws SQLException;

    /** Submits the pending batch to the database. */
    void executeBatch() throws SQLException;

    /** Closes the prepared statement(s). */
    void closeStatements() throws SQLException;

    /**
     * Creates a {@code KeyedBatchStatementExecutor} from the SQL, a function mapping each record
     * to a key, and a statement builder for those keys.
     */
    static <T, K> JdbcBatchStatementExecutor<T> keyed(
            String sql, Function<T, K> keyExtractor, JdbcStatementBuilder<K> statementBuilder) {
        return new KeyedBatchStatementExecutor<>(sql, keyExtractor, statementBuilder);
    }

    /**
     * Creates a {@code SimpleBatchStatementExecutor}: each record is converted with
     * {@code valueTransformer} and bound to the statement with {@code paramSetter}.
     */
    static <T, V> JdbcBatchStatementExecutor<T> simple(
            String sql, JdbcStatementBuilder<V> paramSetter, Function<T, V> valueTransformer) {
        return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);
    }
}

JdbcBatchStatementExecutor有如下实现类:
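(原图缺失:此处为JdbcBatchStatementExecutor实现类的截图,其中包括上面keyed()/simple()工厂方法分别创建的KeyedBatchStatementExecutor和SimpleBatchStatementExecutor。)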
具体实现逻辑都很简单:先把每条record放入batch缓存中;当batch满了、执行snapshot时或者定时任务到期时(若有设置),由JdbcOutputFormat调用executeBatch()方法,将批次数据写入数据库。

以SimpleBatchStatementExecutor为例:


/**
 * {@link JdbcBatchStatementExecutor} that runs a single SQL statement for every buffered value.
 *
 * <p>Each record is converted with {@code valueTransformer} and kept in an in-memory list.
 * {@link #executeBatch()} binds every buffered value with the statement builder, adds it to the
 * JDBC batch, executes the batch and then empties the list.
 *
 * @param <T> type of the incoming records
 * @param <V> type of the values bound to the prepared statement
 */
class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);

    private final String sql;
    // Binds one buffered value to the prepared statement's parameters.
    private final JdbcStatementBuilder<V> parameterSetter;
    private final Function<T, V> valueTransformer;
    // Values waiting for the next executeBatch() call.
    private final List<V> batch;

    private transient PreparedStatement st;

    SimpleBatchStatementExecutor(
            String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
        this.sql = sql;
        this.parameterSetter = statementBuilder;
        this.valueTransformer = valueTransformer;
        this.batch = new ArrayList<>();
    }

    /** Prepares {@code sql} on the given connection. */
    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        st = connection.prepareStatement(sql);
    }

    /** Converts the record and appends the result to the in-memory buffer. */
    @Override
    public void addToBatch(T record) {
        final V value = valueTransformer.apply(record);
        batch.add(value);
    }

    /** Writes all buffered values as one JDBC batch; a no-op when nothing is buffered. */
    @Override
    public void executeBatch() throws SQLException {
        if (batch.isEmpty()) {
            return;
        }
        for (V value : batch) {
            parameterSetter.accept(st, value);
            st.addBatch();
        }
        st.executeBatch();
        // Reached only if executeBatch() did not throw; otherwise the values stay buffered.
        batch.clear();
    }

    /** Closes the prepared statement, if one is open. */
    @Override
    public void closeStatements() throws SQLException {
        if (st == null) {
            return;
        }
        st.close();
        st = null;
    }
}

在构建SimpleBatchStatementExecutor时,sql和parameterSetter是最重要的2个输入。

sql是SQL语句,不解释。

parameterSetter是JdbcStatementBuilder函数接口。


/**
 * Sets the parameters of a {@link PreparedStatement} from a value of type {@code T}. It may
 * throw {@link SQLException} and is also {@link Serializable}; it is usually written as a
 * lambda.
 */
@PublicEvolving
public interface JdbcStatementBuilder<T>
        extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}

其继承了Flink的自定义BiConsumerWithException函数式接口:


/**
 * Two-argument consumer whose {@code accept} may throw an exception of type {@code E}.
 *
 * @param <T> type of the first argument
 * @param <U> type of the second argument
 * @param <E> type of exception that {@code accept} may throw
 */
@FunctionalInterface
public interface BiConsumerWithException<T, U, E extends Throwable> {

    /**
     * Consumes the two inputs.
     *
     * @throws E if consumption fails
     */
    void accept(T t, U u) throws E;

    /**
     * Adapts {@code biConsumerWithException} to a JDK {@link BiConsumer}. Any
     * {@link Throwable} raised by {@code accept} is handed to {@code ExceptionUtils.rethrow}.
     */
    static <A, B> BiConsumer<A, B> unchecked(
            BiConsumerWithException<A, B, ?> biConsumerWithException) {
        return (first, second) -> {
            try {
                biConsumerWithException.accept(first, second);
            } catch (Throwable failure) {
                ExceptionUtils.rethrow(failure);
            }
        };
    }
}

由于是函数式接口,所以外部传入的时候,一般为Lambda表达式或匿名类。

开头官方示例中传入的就是典型的Lambda表达式:


(ps, t) -> {
	ps.setInt(1, t.id);
	ps.setString(2, t.title);
	ps.setString(3, t.author);
	ps.setDouble(4, t.price);
	// qty binds to the 5th "?" placeholder (JDBC parameter indices are 1-based).
	ps.setInt(5, t.qty);
}

RecordExtractor
第4个参数是记录提取函数接口。


/**
 * Serializable {@link Function} that maps an input record of type {@code F} to a value of type
 * {@code T}.
 *
 * @param <F> input record type
 * @param <T> extracted value type
 */
public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
	/** Returns an extractor that returns its input unchanged. */
	static <T> RecordExtractor<T, T> identity() {
		return record -> record;
	}
}

看完了构造函数,接着看一下JdbcOutputFormat的其他方法实现:


/**
 * Output format that buffers records into a {@link JdbcBatchStatementExecutor} and writes them
 * to the database in batches.
 *
 * @param <In> type of the incoming records
 * @param <JdbcIn> type of the values handed to the statement executor
 * @param <JdbcExec> type of the batch statement executor
 */
@Internal
public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
        extends RichOutputFormat<In> implements Flushable, InputTypeConfigurable {

    protected final JdbcConnectionProvider connectionProvider;
    // Serializer for the input type; when non-null, records are copied before being buffered.
    @Nullable private TypeSerializer<In> serializer;

    ...
	
    /**
     * Opens the JDBC connection and creates the batch statement executor. When timed flushing
     * applies, it also starts a single-threaded scheduler that flushes every
     * {@code batchIntervalMs} milliseconds.
     *
     * @param taskNumber index of this parallel instance (not read by this method)
     * @param numTasks number of parallel instances (not read by this method)
     * @throws IOException if the connection cannot be established
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            connectionProvider.getOrEstablishConnection();
        } catch (Exception e) {
            throw new IOException("unable to open JDBC writer", e);
        }
        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);

        final long intervalMs = executionOptions.getBatchIntervalMs();
        // An interval of 0 disables timed flushing. A batch size of 1 already flushes on every
        // record in writeRecord, so a timer would have nothing to do.
        final boolean timedFlushEnabled = intervalMs != 0 && executionOptions.getBatchSize() != 1;
        if (!timedFlushEnabled) {
            return;
        }

        this.scheduler =
                Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
        // Runs on the scheduler thread. It locks this instance, like writeRecord/flush/close,
        // and records a failure in flushException instead of throwing.
        final Runnable timedFlush =
                () -> {
                    synchronized (JdbcOutputFormat.this) {
                        if (!closed) {
                            try {
                                flush();
                            } catch (Exception e) {
                                flushException = e;
                            }
                        }
                    }
                };
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        timedFlush, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    ...

    /**
     * Buffers one record and flushes once the buffered count reaches a positive batch size.
     *
     * <p>The record is copied first when a serializer is set. The extractor is applied to that
     * copy, and the original record is passed along to {@code addToBatch} together with the
     * extracted value.
     *
     * @throws IOException if a previous timed flush failed, or buffering/flushing fails
     */
    @Override
    public final synchronized void writeRecord(In record) throws IOException {
        checkFlushException();

        try {
            final JdbcIn extracted = jdbcRecordExtractor.apply(copyIfNecessary(record));
            addToBatch(record, extracted);
            batchCount++;
            // A batch size <= 0 disables size-triggered flushing.
            final int flushThreshold = executionOptions.getBatchSize();
            if (flushThreshold > 0 && batchCount >= flushThreshold) {
                flush();
            }
        } catch (Exception e) {
            throw new IOException("Writing records to JDBC failed.", e);
        }
    }

    /**
     * Returns {@code record} itself when no serializer is set, otherwise the copy produced by
     * {@code serializer.copy}.
     */
    private In copyIfNecessary(In record) {
        if (serializer == null) {
            return record;
        }
        return serializer.copy(record);
    }

    /**
     * Adds the extracted value to the statement executor's batch. {@code original} is not used
     * here; presumably it is available for subclasses that override this method.
     */
    protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
        this.jdbcStatementExecutor.addToBatch(extracted);
    }

    /**
     * Writes the buffered batch to the database, retrying on {@link SQLException}.
     *
     * <p>At most {@code maxRetries + 1} attempts are made. After a failed attempt that is not the
     * last one, the connection is checked. If it is not valid, {@code updateExecutor(true)} is
     * called. The thread then sleeps {@code 1000 * i} ms before the next attempt. The batch
     * counter is reset only after a successful attempt.
     *
     * @throws IOException if the last attempt fails, if re-establishing the connection fails, or
     *     if the thread is interrupted while waiting (the original SQLException is the cause)
     */
    @Override
    public synchronized void flush() throws IOException {
        checkFlushException();
        // Attempt indices 0..maxRetries: one initial attempt plus maxRetries retries.
        for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
            try {
                attemptFlush();
                batchCount = 0;
                break;
            } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
                if (i >= executionOptions.getMaxRetries()) {
                    throw new IOException(e);
                }
                try {
                    if (!connectionProvider.isConnectionValid()) {
                        updateExecutor(true);
                    }
                } catch (Exception exception) {
                    LOG.error(
                            "JDBC connection is not valid, and reestablish connection failed.",
                            exception);
                    throw new IOException("Reestablish JDBC connection failed", exception);
                }
                try {
                    // Linear back-off: 0 s after the 1st failure, 1 s after the 2nd,
                    // 2 s after the 3rd, and so on.
                    Thread.sleep(1000 * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException(
                            "unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    /** Makes one attempt to submit the pending batch through the statement executor. */
    protected void attemptFlush() throws SQLException {
        this.jdbcStatementExecutor.executeBatch();
    }

    /**
     * Closes this output format. It stops the timed flush task, flushes records still buffered,
     * closes the prepared statements and closes the connection.
     *
     * <p>Statements and the connection are released in {@code finally} blocks, so they are
     * closed even when the final flush fails. Previously that failure skipped both and leaked
     * them. Statements are closed only on the first call; the connection provider is asked to
     * close the connection on every call.
     *
     * @throws RuntimeException if the final flush fails; {@code checkFlushException()} is called
     *     last and presumably also surfaces an earlier timed-flush failure
     */
    @Override
    public synchronized void close() {
        try {
            if (!closed) {
                closed = true;

                if (this.scheduledFuture != null) {
                    scheduledFuture.cancel(false);
                    this.scheduler.shutdown();
                }

                try {
                    if (batchCount > 0) {
                        try {
                            flush();
                        } catch (Exception e) {
                            LOG.warn("Writing records to JDBC failed.", e);
                            throw new RuntimeException("Writing records to JDBC failed.", e);
                        }
                    }
                } finally {
                    // Runs even if the final flush threw, so the statements never leak.
                    try {
                        if (jdbcStatementExecutor != null) {
                            jdbcStatementExecutor.closeStatements();
                        }
                    } catch (SQLException e) {
                        LOG.warn("Close JDBC writer failed.", e);
                    }
                }
            }
        } finally {
            // Always release the connection, including when the final flush failed.
            connectionProvider.closeConnection();
        }
        checkFlushException();
    }

    /** Returns a new {@code Builder} instance. */
    public static Builder builder() {
        return new Builder();
    }

    ...

几个重要点记一下:

  • 若执行配置里的批处理时间间隔不为0,且批次大小不为1,则会创建定时任务线程池,以固定时间间隔强制触发批数据提交。之所以这样设计,主要是为了保证实时性,比如批缓存设置的为1000,此时缓存中的数据为990,之后上游长时间未有新的数据到达。这种情况下,数据库中的数据长时间比实际数据落后990条,实时性和一致性受到了很大影响。设置了定时间隔提交后,如果缓存数据长时间不再增加,就会强制触发批数据提交,保证了一致性,提高了实时性。
  • Flink的flush()操作具有重试策略,且每次重试之间的间隔不断拉大。
  • JdbcOutputFormat的writeRecord、flush和close方法都用synchronized修饰,定时flush任务也在synchronized (JdbcOutputFormat.this)块中执行,因此写入线程和定时任务线程不会同时操作批缓存。
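  • 注意:JdbcSink写入数据库的时机有三个:1、writeRecord时batch满了触发flush;2、open方法中创建的定时任务每隔batchIntervalMs触发一次flush;3、算子做checkpoint时,snapshotState方法触发flush(此外,close时也会把剩余数据flush)。这里要注意:writeRecord只有在batchSize>0时才会按条数触发flush,而定时任务只有在batchIntervalMs不为0时才会创建。如果把batchSize设为0,同时batchIntervalMs保持默认值0,实际flush就只能靠checkpoint完成;如果没有开启checkpoint,数据在作业结束前就写不进去。因此不要把batchSize设为0:若要求每条数据立即写入,可以将batchSize设为1(吞吐较低);或者保持batchSize>0,同时设置batchIntervalMs>0,由定时任务兜底。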

跟完了最内层的JdbcOutputFormat,接着看外层的GenericJdbcSinkFunction。

GenericJdbcSinkFunction

/**
 * {@link RichSinkFunction} that delegates all writing to a {@link JdbcOutputFormat}.
 *
 * <p>It keeps no checkpointed state of its own. Every checkpoint forces the output format to
 * flush, so buffered records reach the database when {@link #snapshotState} runs.
 */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, InputTypeConfigurable {
    private final JdbcOutputFormat<T, ?, ?> outputFormat;

    /** Wraps the given output format, which must not be {@code null}. */
    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) {
        this.outputFormat = Preconditions.checkNotNull(outputFormat);
    }

    /** Gives the output format the runtime context and opens it for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final RuntimeContext runtimeContext = getRuntimeContext();
        outputFormat.setRuntimeContext(runtimeContext);
        final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
        outputFormat.open(subtaskIndex, parallelism);
    }

    /** Hands each record to the output format. */
    @Override
    public void invoke(T value, Context context) throws IOException {
        outputFormat.writeRecord(value);
    }

    /** Nothing to restore. */
    @Override
    public void initializeState(FunctionInitializationContext context) {}

    /** Flushes the output format's buffered batch on every checkpoint. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    /** Closes the output format. */
    @Override
    public void close() {
        outputFormat.close();
    }

    /** Forwards the input type information to the output format. */
    @Override
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        outputFormat.setInputType(type, executionConfig);
    }
}

总结

  • DataStream的jdbc-sink的实现路径为JdbcSink–>GenericJdbcSinkFunction–>JdbcOutputFormat。

  • JdbcSink.sink方法的第2个参数类型是JdbcStatementBuilder,本质上是1个继承自Flink自定义函数式接口BiConsumerWithException的BiConsumer式接口,一般通过Lambda表达式构建。

  • 拓展一下,通常我们在构建程序时,均是面向对象的,所以最后持久化的时候,流里面的泛型一般也为POJO。但按照官方示例的方法,针对每个POJO,我们均需要构建1个Insert SQL和1个JdbcStatementBuilder函数接口,但其实上述2个参数,可以通过POJO基于反射动态构建出来,所以我们可以在Flink官方实现的基础上做一步增强,使其支持POJO Sink。


# flink # jdbcsink