As a long-time Java developer, a chance opportunity moved me onto the big data team, so it was time to start grinding big data...
Learning Flink for the first time, everything I saw was jobs written in plain Java. After years of grinding Java, Spring Boot obviously had to come along for the ride. I'll spare you the blood and tears in between and go straight to the conclusions.
First, the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.hrfax.flink-jobs</groupId>
<artifactId>8102-analytics-jobs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>8102-analytics-jobs</name>
<packaging>pom</packaging>
<description>hrfax flink jobs</description>
<modules>
<module>commons</module>
<module>dispatcher-kafka-topic</module>
<module>eshare-oracle-cdc</module>
<module>datasource</module>
</modules>
<properties>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<mysql.version>5.1.47</mysql.version>
<turing.version>0.0.10-SNAPSHOT</turing.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!-- <version>2.3.7.RELEASE</version>-->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- others -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>cn.hrfax</groupId>
<artifactId>hrfax-redis-spring-boot-starter</artifactId>
<version>${turing.version}</version>
</dependency>
<dependency>
<groupId>cn.hrfax.flink-jobs</groupId>
<artifactId>commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hrfax.flink-jobs</groupId>
<artifactId>datasource</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>main.clazz</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Logging dependencies: bring in log4j2 and exclude logback (see spring-boot-starter-log4j2 plus the spring-boot-starter-logging exclusion in the pom above).
Packaging: use maven-shade-plugin to build a fat jar; see the pom at the top for the details. It also lets you specify the main class.
With Spring Boot on board, integrating the usual middleware is a must.
1. Integrating Redis
I used the company's starter directly and was lucky enough to hit no problems; an unwrapped Redis client would work just the same.
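For reference, here is a minimal sketch of the same idea using plain spring-data-redis instead of the in-house starter; the class name and key namespace are made up for illustration:

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

// Used on the driver side (before the job graph is built),
// so the template never needs to be serialized into an operator.
@Component
public class RedisWarmup {

    private final StringRedisTemplate redisTemplate;

    public RedisWarmup(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void markJobStarted(String jobName) {
        // "flink:job:" is an arbitrary example key namespace
        redisTemplate.opsForValue().set("flink:job:" + jobName, "started");
    }
}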
2. Connecting to Oracle (the company runs Oracle, painful)
So JdbcTemplate it is. The business logic is light and JDBC is only used to create tables, so I skipped MyBatis and the like, though you could certainly add them. A sketch of the idea follows.
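A minimal sketch of that setup, assuming a DataSource bean is already configured; the table name and DDL are placeholders:

import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class TableInitializer {

    private final JdbcTemplate jdbcTemplate;

    public TableInitializer(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void createTableIfAbsent() {
        // Oracle has no CREATE TABLE IF NOT EXISTS, so query the catalog first.
        Integer count = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM user_tables WHERE table_name = 'DWD_DEMO'",
                Integer.class);
        if (count == null || count == 0) {
            jdbcTemplate.execute(
                    "CREATE TABLE DWD_DEMO (ID VARCHAR2(64) PRIMARY KEY, PAYLOAD CLOB)");
        }
    }
}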
3. Integrating flink-oracle-cdc
This is where the big problems started.
- Resolving the Oracle CDC problem: it conflicted with the Oracle JDBC driver we already had. I settled on ojdbc8 and uploaded all the required jars to the cluster (take a look at what sits under flink/lib).
- The flink-table-planner-blink dependency stays provided and is likewise uploaded. A hedged sketch of wiring up the CDC source follows.
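For context, a sketch of what building the source looks like with flink-connector-oracle-cdc 2.2.1; the host, credentials, and table names below are placeholders:

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OracleCdcJob {

    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source = OracleSource.<String>builder()
                .hostname("oracle-host")       // placeholder host
                .port(1521)
                .database("ORCL")              // placeholder SID
                .schemaList("ESHARE")          // placeholder schema
                .tableList("ESHARE.ORDERS")    // placeholder table
                .username("flink_cdc")
                .password("******")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("oracle-cdc-demo");
    }
}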
Serialization and NPE issues inside operators
The Spring container
A container initialized this way can't be reached from inside the Flink stream; you get a NullPointerException. The only workaround is to initialize the container in the function's open() method, which is clearly not what I wanted, so I simply gave up on calling the container explicitly.
Serialization problems
1. Instance variables in a function must be serializable, and many Spring objects are not, so the only option is to new them inside the function. Luckily we don't use that many.
2. You can't inject beans into a function by name or type (@Autowired); only constructor injection works, otherwise you get a serialization error, for the same reason as point 1.
3. A bean injected into a function must not be annotated with @Configuration, or you get a serialization error, again for the reason in point 1.
Solution: just new the objects yourself, as in the sketch below.
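A sketch of the pattern, with placeholder table and SQL: keep only serializable configuration in fields, mark everything else transient, and build it in open():

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.jdbc.core.JdbcTemplate;

public class OracleUpsertSink extends RichSinkFunction<String> {

    // Only plain, serializable configuration travels with the job graph.
    private final String jdbcUrl;
    private final String username;
    private final String password;

    // Non-serializable resources are transient and created per task in open().
    private transient DruidDataSource dataSource;
    private transient JdbcTemplate jdbcTemplate;

    public OracleUpsertSink(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) {
        dataSource = new DruidDataSource();
        dataSource.setUrl(jdbcUrl);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void invoke(String value, Context context) {
        // placeholder statement; a real job maps the record to proper columns
        jdbcTemplate.update("INSERT INTO DWD_DEMO (ID, PAYLOAD) VALUES (?, ?)",
                String.valueOf(value.hashCode()), value);
    }

    @Override
    public void close() throws Exception {
        // see the note right below on closing the DataSource manually
        if (dataSource != null) {
            dataSource.close();
        }
        super.close();
    }
}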
Note: close the DataSource manually:
@Override
public void close() throws Exception {
    // instanceof already handles null, so no separate null check is needed
    if (dataSource instanceof DruidDataSource) {
        log.info("----------- closing DruidDataSource ------------");
        ((DruidDataSource) dataSource).close();
    }
    super.close();
}
Dependency problems are genuinely hard to untangle. My advice: drop every dependency you can live without and keep the list as small as possible.
Any dependency that isn't packaged into the jar must be uploaded to every node of the Flink cluster.
Below is the startup configuration for our test environment's session-mode cluster.
Checkpoint and state backend configuration
// 1.1 Enable checkpointing
env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 5 minutes
env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // restart strategy
// Tolerable checkpoint failures; the default of 0 means no failure is tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
// 1.2 Configure the state backend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
String os = System.getProperty("os.name");
String path;
if (os.toLowerCase().startsWith("win")) {
    path = dwdSinkOracleProperties.getWindowsDir();
} else if (os.startsWith("Mac OS X")) {
    path = dwdSinkOracleProperties.getMacOSDir();
} else {
    path = dwdSinkOracleProperties.getLinuxDir(); // e.g. file:///data/projects/tmp/
}
env.getCheckpointConfig().setCheckpointStorage(path);
Update 2022/10/09
Choosing how to manage dependencies
spring-boot-starter-parent comes with its own self-contained dependency management, and some Flink connector dependencies clash with the versions Spring Boot pins. For me it was Kafka: Spring Boot manages kafka-clients 3.0.1 while the Kafka connector expects 2.4.1, and the transitive version gets overridden by Spring Boot's.
Two ways to solve it:
- Declare the version in the pom's properties section (e.g. <kafka.version>2.4.1</kafka.version>), so the property value is what gets used.
- Stop inheriting from spring-boot-starter-parent and pin every dependency version by hand.
I went with the second option: more conflicts were likely to show up later, and this keeps the Flink-related dependencies from being overridden at all.
Problems encountered
1. Using maven-shade-plugin's PropertiesMergingResourceTransformer failed with a class-not-found error.
Fix: manually add spring-boot-maven-plugin (the jar that ships that transformer) as a dependency of the shade plugin.
2. NoClassDefFoundError: org/springframework/boot/bind/RelaxedPropertyResolver
This is a version conflict between druid and Spring Boot's spring-boot-autoconfigure.jar (RelaxedPropertyResolver was removed in Spring Boot 2.x).
Fix: exclude the transitive dependency from druid.
3. Exclude the DataSource auto-configuration (e.g. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)).
Rookie move on my part, biting off more than I could chew.
That's the end of it. I've since recommended the team just write plain Java after all: Flink keeps shipping more and more connectors, so there's less and less need to wrap things ourselves.
And then we went straight into a round of refactoring.