增加flink-jdbc-connector的Connector Options配置

midoll 365 2023-02-24

背景

上篇讲了修改JdbcOutputFormat实现在多并行度情况下解决jdbcsink死锁的问题,但是好像代码写死了,有的地方不会发生死锁也做了加分布式锁的逻辑;所以这段逻辑的加上条件执行,什么条件,就是Connector Options,我们增加一个配置,sink.buffer-flush.locks-enable ,加锁逻辑里先判断一下是否有这个配置

步骤

  1. JdbcExecutionOptions 增加一个field:locksEnable 修改构造方法、增加设置和获取该属性的方法
    image-1677228832764
  2. JdbcConnectorOptions 这个是tableApi配置类,增加一个SINK_BUFFER_FLUSH_LOCKS_ENABLE的configoption
    image-1677228852861
  3. JdbcDynamicTableFactory 这个是tableApi DynamicTable配置类,创建动态表时,将ddl的with参数赋给JdbcExecutionOptions
    image-1677228906833
  4. JdbcOutputFormat 在加分布式锁的地方,提前给出判断
    image-1677228931348
    image-1677228948617
    image-1677228987738