背景
上篇讲了修改JdbcOutputFormat实现在多并行度情况下解决jdbcsink死锁的问题,但是好像代码写死了,有的地方不会发生死锁也做了加分布式锁的逻辑;所以这段逻辑的加上条件执行,什么条件,就是Connector Options,我们增加一个配置,sink.buffer-flush.locks-enable ,加锁逻辑里先判断一下是否有这个配置
步骤
- JdbcExecutionOptions 增加一个field:locksEnable 修改构造方法、增加设置和获取该属性的方法
- JdbcConnectorOptions 这个是tableApi配置类,增加一个SINK_BUFFER_FLUSH_LOCKS_ENABLE的configoption
- JdbcDynamicTableFactory 这个是tableApi DynamicTable配置类,创建动态表时,将ddl的with参数赋给JdbcExecutionOptions
- JdbcOutputFormat 在加分布式锁的地方,提前给出判断