Integrating Spring Boot into Flink: a story of blood and tears

midoll 2022-09-19

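As a longtime Java developer, I moved to the big data development team by chance, and so I started grinding away at big data...
This was my first time learning Flink, and every job I came across was written in plain Java. After years of Java, I obviously had to bring Spring Boot along. I'll skip the blood and tears along the way and go straight to the conclusions.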

First, the pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.11</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.hrfax.flink-jobs</groupId>
    <artifactId>8102-analytics-jobs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>8102-analytics-jobs</name>
    <packaging>pom</packaging>
    <description>hrfax flink jobs</description>

    <modules>
        <module>commons</module>
        <module>dispatcher-kafka-topic</module>
        <module>eshare-oracle-cdc</module>
        <module>datasource</module>
    </modules>

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <mysql.version>5.1.47</mysql.version>
        <turing.version>0.0.10-SNAPSHOT</turing.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!--            <version>2.3.7.RELEASE</version>-->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-connector-oracle-cdc</artifactId>
                <version>2.2.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>

            <!--   Other         -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.5</version>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>5.8.5</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.fastjson2</groupId>
                <artifactId>fastjson2</artifactId>
                <version>2.0.12</version>
            </dependency>

            <dependency>
                <groupId>cn.hrfax</groupId>
                <artifactId>hrfax-redis-spring-boot-starter</artifactId>
                <version>${turing.version}</version>
            </dependency>

            <dependency>
                <groupId>cn.hrfax.flink-jobs</groupId>
                <artifactId>commons</artifactId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <groupId>cn.hrfax.flink-jobs</groupId>
                <artifactId>datasource</artifactId>
                <version>${project.version}</version>
            </dependency>

        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.4</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <createDependencyReducedPom>false</createDependencyReducedPom>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
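                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.handlers</resource>
                                    </transformer>
                                    <!-- AppendingTransformer takes a single resource, so reference.conf gets its own transformer -->
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>reference.conf</resource>
                                    </transformer>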
                                    <transformer
                                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                        <resource>META-INF/spring.factories</resource>
                                    </transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.schemas</resource>
                                    </transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- replace main.clazz with the fully qualified name of the job's main class -->
                                        <mainClass>main.clazz</mainClass>
                                    </transformer>
                                </transformers>

                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

Logging dependencies: use log4j2 (spring-boot-starter-log4j2) and exclude logback (spring-boot-starter-logging).


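Packaging: build a fat jar with maven-shade-plugin (see the pom at the top for details); its ManifestResourceTransformer also lets you specify the main class.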
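After mvn package it is worth checking that the shaded jar really carries the main class and the merged Spring metadata files. A minimal sketch using only java.util.jar; the class name FatJarCheck and the jar path passed on the command line are hypothetical:

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper: inspects a shaded job jar produced by maven-shade-plugin.
final class FatJarCheck {

    // Main-Class written by ManifestResourceTransformer, or null if the manifest has none.
    static String mainClass(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            return manifest == null ? null
                    : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    // True if the jar contains the entry, e.g. the merged META-INF/spring.factories.
    static boolean hasEntry(String jarPath, String entryName) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jarPath = args[0]; // path to the shaded jar under target/
        System.out.println("Main-Class: " + mainClass(jarPath));
        for (String entry : new String[] {"META-INF/spring.factories", "META-INF/spring.handlers"}) {
            System.out.println(entry + " present: " + hasEntry(jarPath, entry));
        }
    }
}
```

If Main-Class prints null or the literal main.clazz, the ManifestResourceTransformer value was never filled in for that module.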

With Spring Boot in place, naturally we have to integrate all kinds of middleware.

1. Integrating Redis

I used the company's in-house starter directly and luckily ran into no problems; a plain Redis client without any wrapper works just as well.

2. Connecting to an Oracle database (the company uses Oracle, which hurts)

JdbcTemplate is a must here. There isn't much business logic and JDBC is only used to create tables, so I didn't bring in MyBatis or similar frameworks, though you can if you want to.

This is where things went badly wrong:

  • Solving the Oracle CDC problem
  • It conflicted with the Oracle JDBC driver we were already using. I settled on ojdbc8 and uploaded all the required jars to flink/lib on the cluster.
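When two Oracle JDBC drivers end up on the classpath (say one inside the job jar and one in flink/lib), it helps to list every location that provides the driver class. A minimal sketch based on ClassLoader.getResources; the class name DuplicateClassFinder is hypothetical, and oracle.jdbc.OracleDriver is only the default argument:

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: finds every classpath location that provides a given class.
final class DuplicateClassFinder {

    // More than one hit usually means two versions of the same library are competing.
    static List<URL> locations(String className, ClassLoader loader) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        return new ArrayList<>(Collections.list(loader.getResources(resource)));
    }

    public static void main(String[] args) throws IOException {
        String name = args.length > 0 ? args[0] : "oracle.jdbc.OracleDriver";
        List<URL> hits = locations(name, Thread.currentThread().getContextClassLoader());
        System.out.println(name + " found " + hits.size() + " time(s)");
        for (URL hit : hits) {
            System.out.println("  " + hit);
        }
    }
}
```

To see what a TaskManager sees, call locations(...) from inside the job with the function's own class loader rather than from a standalone run. The same check works for logging conflicts, for example org.slf4j.impl.StaticLoggerBinder for SLF4J 1.7 bindings.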



Serialization and NullPointerException problems inside operators

The Spring container

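When the container is defined and initialized outside the operators, it cannot be reached inside the Flink stream and throws a NullPointerException: the functions are serialized and run in the TaskManager JVMs, where that container was never created.
The only option is to initialize the container in the function's open() method, which is clearly not what I wanted,
so I gave up on calling the container explicitly.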
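Why open() is the only place that works can be shown without Flink at all: a function object reaches the TaskManager through Java serialization, so transient fields arrive empty and static state set up in the client's main() only exists in the client JVM. The plain-Java simulation below mimics that; ContainerHolder stands in for a Spring ApplicationContext (a Map here), and EnrichFunction and Ship are hypothetical names, not Flink APIs:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class ContainerDemo {
    public static void main(String[] args) throws Exception {
        EnrichFunction shipped = Ship.roundTrip(new EnrichFunction());
        shipped.open();                           // without this, map() throws NullPointerException
        System.out.println(shipped.map("flink")); // hello flink
    }
}

// Stand-in for a Spring container: built lazily, once per JVM (i.e. once per TaskManager).
final class ContainerHolder {
    static final AtomicInteger INITIALIZATIONS = new AtomicInteger();
    private static volatile Map<String, Object> container;

    static Map<String, Object> get() {
        Map<String, Object> c = container;
        if (c == null) {
            synchronized (ContainerHolder.class) {
                c = container;
                if (c == null) {
                    INITIALIZATIONS.incrementAndGet();
                    c = new HashMap<>();
                    c.put("greeting", "hello"); // a real job would create its beans here
                    container = c;
                }
            }
        }
        return c;
    }
}

// Mimics a Flink rich function: constructed on the client, used on a TaskManager.
class EnrichFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    // transient: not shipped with the function, so it stays null until open() runs
    private transient Map<String, Object> beans;

    void open() {
        beans = ContainerHolder.get();
    }

    String map(String value) {
        return beans.get("greeting") + " " + value;
    }
}

final class Ship {
    // Serialize and deserialize, roughly what happens when the job is submitted.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

The double-checked static holder is one design choice among several: it keeps every parallel subtask in the same JVM from building its own container.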

Serialization problems

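1. Instance fields of a function must be serializable. Many Spring objects are not, so they have to be created with new inside the function; luckily we don't use many of them.
2. Inside a function, beans cannot be injected by name or by type (@Autowired); only constructor injection works, otherwise you get a serialization error, for the same reason as in point 1.
3. A bean injected into a function must not be a class annotated with @Configuration, otherwise you get a serialization error, again for the reason in point 1.
Solution: just create the objects with new, as I did.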
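Flink refuses to run a job whose functions cannot be Java-serialized. The check below reproduces that failure, and the fix described above (a transient field created with new), in plain Java so it can run in a unit test before submitting. JdbcHelper, BadSink and GoodSink are hypothetical names:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializabilityCheck {
    // Returns null if obj can be Java-serialized, otherwise a short description of the failure.
    static String problem(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return "not serializable: " + e.getMessage(); // the message is the offending class name
        } catch (IOException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println("BadSink:  " + problem(new BadSink()));
        System.out.println("GoodSink: " + problem(new GoodSink()));
    }
}

// Hypothetical helper that does not implement Serializable, like many Spring beans.
class JdbcHelper {
    String table() {
        return "demo_table";
    }
}

// Fails: the helper is an ordinary field, so serializing the function drags it along.
class BadSink implements Serializable {
    private static final long serialVersionUID = 1L;
    private final JdbcHelper helper = new JdbcHelper();

    String table() {
        return helper.table();
    }
}

// Works: the helper is transient and created with new on the worker side.
class GoodSink implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient JdbcHelper helper;

    void open() { // corresponds to RichFunction#open in Flink
        helper = new JdbcHelper();
    }

    String table() {
        return helper.table();
    }
}
```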
Note: close the DataSource manually in the function's close() method:

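```java
@Override
public void close() throws Exception {
    try {
        // instanceof is already false for null, so no separate null check is needed
        if (dataSource instanceof DruidDataSource) {
            log.info("----------- Closing DruidDataSource ------------");
            ((DruidDataSource) dataSource).close();
        }
    } finally {
        super.close(); // always let Flink run its own cleanup
    }
}
```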

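Dependencies are really painful to get right. My advice: leave out any dependency you can do without and keep the dependency list as small as possible.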

Any dependency that is not packaged into the job jar must be uploaded to every node of the Flink cluster (e.g. into flink/lib).

Startup configuration of our session-mode cluster in the test environment

Checkpoint and state backend configuration


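```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;

// env is the job's StreamExecutionEnvironment; dwdSinkOracleProperties is our own config bean
// 1.1 Enable checkpointing
env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 5 minutes
env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L); // abort a checkpoint not finished within 10 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // restart strategy: up to 3 attempts, 5 s apart
// Number of tolerable checkpoint failures; the default of 0 means no checkpoint failure is tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
// 1.2 Set the state backend (RocksDB, from flink-statebackend-rocksdb)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
String os = System.getProperty("os.name");
String path;
if (os.toLowerCase().startsWith("win")) {
    path = dwdSinkOracleProperties.getWindowsDir();
} else if (os.startsWith("Mac OS X")) {
    path = dwdSinkOracleProperties.getMacOSDir();
} else {
    path = dwdSinkOracleProperties.getLinuxDir(); // e.g. file:///data/projects/tmp/
}
env.getCheckpointConfig().setCheckpointStorage(path);
```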
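The OS switch above can be pulled into a pure helper so it can be unit-tested without starting Flink. A minimal sketch; the helper name CheckpointDirs and the Windows/macOS directories in main are hypothetical. One small deviation from the snippet: it matches any os.name starting with "mac" (case-insensitive) instead of exactly "Mac OS X".

```java
import java.util.Locale;

// Hypothetical helper: chooses the checkpoint directory from the os.name system property.
final class CheckpointDirs {

    static String forOs(String osName, String windowsDir, String macDir, String linuxDir) {
        String os = osName.toLowerCase(Locale.ROOT);
        if (os.startsWith("win")) {
            return windowsDir;
        }
        if (os.startsWith("mac")) {
            return macDir;
        }
        return linuxDir; // everything else, e.g. the Linux cluster nodes
    }

    public static void main(String[] args) {
        String dir = forOs(System.getProperty("os.name"),
                "file:///D:/flink/checkpoints",  // example only
                "file:///tmp/flink/checkpoints", // example only
                "file:///data/projects/tmp/");   // the Linux path from the snippet above
        System.out.println("checkpoint dir: " + dir);
    }
}
```

Keep in mind that on a multi-node cluster the checkpoint directory has to be reachable from the JobManager and every TaskManager (HDFS, S3 or a shared mount); a plain local file:// path only behaves on a single machine or on shared storage.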

Update 2022/10/09

Choosing how to manage dependencies

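spring-boot-starter-parent comes with its own dependency management, and some of the versions it pins conflict with the ones Flink connectors need. In my case it was Kafka: Spring Boot manages kafka-clients at 3.0.1, while the Flink Kafka connector depends on 2.4.1. Because managed versions take precedence over transitive ones, the version defined by Spring Boot overrides the connector's.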
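There are two ways to solve this:

  • Declare the version in properties, e.g. <kafka.version>2.4.1</kafka.version>;
    the managed version is then the one specified in properties.
  • Don't inherit spring-boot-starter-parent, and specify the version of every dependency manually.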

I went with the second option: more dependency conflicts may come up later, and this way none of the Flink-related dependencies can be overridden.
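To confirm which kafka-clients version actually ends up in the shaded jar after either fix, one option is to read the pom.properties file that Maven writes into the jars it builds, under META-INF/maven/<groupId>/<artifactId>/; maven-shade-plugin keeps these files unless a filter removes them. A minimal sketch; the class name DependencyVersion is hypothetical:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: reports the version Maven recorded for a packaged artifact.
final class DependencyVersion {

    // Returns the "version" entry of the artifact's pom.properties, or null if it is not on the classpath.
    static String of(String groupId, String artifactId, ClassLoader loader) throws IOException {
        String path = "META-INF/maven/" + groupId + "/" + artifactId + "/pom.properties";
        try (InputStream in = loader.getResourceAsStream(path)) { // a null resource is allowed here
            if (in == null) {
                return null;
            }
            Properties props = new Properties();
            props.load(in);
            return props.getProperty("version");
        }
    }

    public static void main(String[] args) throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        System.out.println("kafka-clients: " + of("org.apache.kafka", "kafka-clients", cl));
    }
}
```

Run against the job's classpath, it would be expected to report 2.4.1 once the connector's version wins; Spring Boot's managed version showing up instead means the override is still in effect.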

Problems I ran into

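1. Using maven-shade-plugin's PropertiesMergingResourceTransformer fails with a class-not-found error.
Fix: manually add spring-boot-maven-plugin as a dependency of maven-shade-plugin. The transformer class lives in that artifact, and once the project no longer inherits spring-boot-starter-parent, nothing declares it for the shade plugin anymore.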

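2. NoClassDefFoundError: org/springframework/boot/bind/RelaxedPropertyResolver
This was caused by a version conflict between Druid and Spring Boot's spring-boot-autoconfigure jar (RelaxedPropertyResolver only exists in Spring Boot 1.x, so the error means 1.x-era code is running against Spring Boot 2.x).
Fix: exclude Druid's transitive dependencies.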

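3. Exclude the DataSource auto-configuration, for example with @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) or the spring.autoconfigure.exclude property.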

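Turns out I was a rookie trying to look bigger than I was.
In the end, I recommended that the team go back to writing jobs in plain Java: Flink's connectors keep getting richer, so there is less and less need to wrap things ourselves.
So we went straight into a round of refactoring.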


# springboot # flink