使用 Maven 构建 Flink 项目的正确姿势

midoll 316 2022-03-29

使用 Maven 构建 Flink 项目的正确姿势

通过本文你能 get 到以下点:

  • 使用 Maven 构建的 Flink 项目 module 如何规划
  • Shade 插件解决 Jar 包依赖冲突(不限于 Flink,Spark 同样适用)

痛点

先从痛点开始讲起,通常由 Maven 来构建 Flink 项目,如下图所示,一般会按照业务来划分 module。
image-1669012275555

本项目是 zhisheng 老师关于 flink 学习的项目(https://github.com/zhisheng17/flink-learning 欢迎大家关注),由于是学习项目所以按照 Flink 的知识点来划分 module,例如 sql、state、window 都是单独的 module。我们日常工作中一般会按照业务来划分 module,例如一般大数据组都会做一个数据管理平台也就是业界所说的 dmp 平台,提供各种维度 pv、uv 及其他指标的查询,互联网公司做 App的也都会有推送平台,市场推广平台,数据服务化平台,推荐平台,数据质量监控平台,做互联网广告的话还会有 dsp adx ssp 等平台,一些用户量比较大的 App 可能单独还会有一个业务数据统计的平台。以上这些平台都属于不同的业务线,且都需要用到 Flink 提供 ETL 或者实时统计的需求,如果上述所有业务统计都由一个团队来承担的话,一般会把上述所有的业务都放到一个 Flink 项目来管理,一般都会按照业务来划分 module,一个业务对应 Maven 项目的一个 module。

代码开发完发布时,也会按照 module 来打包发布。由于 Flink 安装包只提供了 flink core 相关的 jar 包,所以代码打包时,需要把其他第三方依赖全都打到 jar 包里,否则,Flink 任务发布时,就会出现 ClassNotFound。痛点来了,代码中引入的第三方依赖太多了,所以每个业务打 jar 包时,都会出现 jar 包很大的现象,准确的数字是我们六只以上业务打出来的 jar 包都在 200M 以上。Jar 包大,带来了两个问题:

  • 打包太慢了,测试阶段每次打包电脑风扇飞速运转,肉疼
  • 开发调试阶段,如果想在集群环境运行,需要将 Jar 包上传到集群,Jar 包大,上传慢

基于上述两个问题,每解决一个 BUG,都需要好几分钟用来打包上传,效率太低了,这就是痛点。那怎么解决呢?

解决思路

Flink 的安装目录下有个 lib 目录,当每次启动一个 Flink session 或 flink cluster 时,都会加载 lib 目录下的 jar 包,依赖这一点可以将所有业务的一些第三方公共依赖提前打包好,提前放到 lib 目录下,这样开发的业务代码打包时就可以不打这些公共依赖了,只把业务代码和相应业务代码单独的依赖打进去。这种思路就需要在 Flink 项目里单独提取出一个 common 模块,也就是上图中红色箭头标注的 module,把所有业务线公共的依赖都放到 common module 里,common 单独打包一份放置到集群中 Flink 安装目录的 lib 目录下。之后所有的业务代码打包时,就不需要再打 common 包含的内容了。

笔者按照上述思路,将公共依赖提取到 common 后,对 common 打包发现 Jar 包 200M 左右,一下子笔者就放心了,意味着以后每次改完业务代码打包时,我的 Jar 体积会比之前小 200M。举个例子吧,有一个 dmp 业务的 module 之前每次打包 201M,现在已经把其中的 200M 提前传到了集群中,所以每次打包只需要打我们剩余的 1M 就可以了。为什么所有业务的公共依赖会占这么大呢,因为 Flink 安装包默认只包含了Flink core,像一些很常用的而且是 Flink 源码包含的代码比如 Kafka 连接器默认都不包含,都需要我们单独打包。其余还有很多 Flink 源码里的代码都没被包含在内,所以需要我们打包的内容挺多。再加上我们项目里有一个比较大的依赖,关于 ip 解析城市的依赖由于映射关系比较多占了将近 100 M,而且很多业务都需要用到这个功能,所以我们之前每个业务打包时,都需要把这个 ip 解析依赖打一遍,现在我们将其提取到 common 后,终身只需要打包一次即可,如果后续 common 里增加了其他依赖,可能会再打包一次。

原理理解了,具体怎么实操呢?大家先了解一个 maven 插件:shade。maven-shade-plugin 提供了两大基本功能:

  1. 将依赖的 jar 包打包到当前 jar 包(常规打包是不会将所依赖jar包打进来的)
  2. 对依赖的 jar 包进行重命名(用于类的隔离)

我们需要在 common 中配置 shade 插件,使 common 中不仅要打包 common 中自己开发的代码,还要打包 common 依赖的第三方 jar 包。具体如何配置呢?

common module 配置

Flink 官网走一波:

https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html#appendix-template-for-building-a-jar-with-dependencies


<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>my.programs.main.clazz</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

以上配置从上面 Flink 官方文档里复制出来的,大家注意改其中一行 my.programs.main.clazz ,这个 common Jar 包并不用来发布任务,可以将该行改为 即可,其他位置不用改动将其复制到 common 的 pom 文件即可。配置好之后,我们把公共依赖的代码都可以放在 common 这个 module 中,然后就可以打包后将其上传到 Flink 安装目录的 lib 目录下即可。打包命令:mvn -pl module-common -am clean install -D maven.test.skip=true

,注意这里必须是 mvn clean install 而不是 mvn clean package,后面会介绍为什么。

业务 module 配置

业务 module 中,主要开发具体的业务代码,每个业务 module 肯定会依赖上面所讲的 common module,可能还会有第三方依赖但是 common 中没有,为什么呢?假如 dmp module 有一个功能,其他 module 中没有该功能,那么我们可能不会把该依赖放到 common 中,所以业务代码打包时,分了两种情况:

  • 仅依赖 common
  • 除了依赖 common,还依赖其他第三方 jar

业务代码仅依赖 common 的情况

这种情况比较简单,简单的 maven 默认打包方式就好了,maven 默认只会把我们开发的代码打成 jar 包。我们依赖的第三方 jar 包已经被 common 放到了 lib 目录下,所以这种情况打包时不需要考虑第三方依赖。打包命令:mvn -pl module-dmp clean package -D maven.test.skip=true,注意,这里不能有 -am 参数,-am 参数表示构建指定模块,同时构建指定模块依赖的其他模块,此时,我们不想重复构建 common,所以不能加。在执行 dmp 的打包命令之前,必须先执行 common 的打包命令,且 common 打包必须使用 install 命令,而不使用 package 命令,为什么呢?install 命令比 package 多了一步:把打好的可执行jar包(war包或其它形式的包)布署到本地 maven 仓库。这样才能保障 module-dmp 打包时,能在本地 maven 仓库找到依赖的 jar 包。package 命令只是打包好,将jar 包放到 target 目录,module-dmp 打包时在 maven 仓库并找不到依赖的 common jar 包,所以打包会报错。

业务代码除了依赖 common,还依赖其他第三方 jar

假如 dmp module 除了依赖 common,还依赖其他第三方 jar,为了将单独依赖 class 打包到当前 jar 包,我们还是要像 common 的 pom 配置一样,复制上面一大段 shade 插件配置到 dmp 的 pom 中。但是此时,同学们可能会想,此时 dmp module 也依赖了 common,会不会也把 common 再继续打进去呢?会的,会打进去。如果打进去,我们的痛点没有解决,每次 jar 包还是很大,那怎么解决呢?如下如所示,加一个provided 配置即可。


<dependency>
    <groupId>suishen.bigdata</groupId>
    <artifactId>module-common</artifactId>
    <version>1.0-SNAPSHOT</version>
    <scope>provided</scope>
</dependency>

scope 配置为 provided 表示被依赖项目可以参与编译、测试、运行等阶段,但是在打包阶段做了 exclude 的动作,也就是不会被打包到 jar 中。

通过以上方案对项目配置,终于可以做到每次打的 jar 包都很小了,打包快,上传 jar 包也快。

Shade 插件解决 Jar 包依赖冲突(Spark 同样适用)

上面介绍 Shade 功能时,还有第二个功能点:对依赖的 jar 包进行重命名。

痛点

当我们开发的 Flink 或 Spark 代码在本地运行没毛病,放到集群就报错,而且报 NoSuchMethodError 或者ClassNotFoundException 的异常或者其他与类加载相关的JVM异常。这种场景一般是同一个类在集群中有两份,且这两个版本不同所导致。假如你的代码中依赖了 guava 的 a版本,但是 Flink 的 lib 目录下的某个 jar 包里放着 guava 的 b 版本,此时就会导致依赖冲突。解决方案

  • 修改你的代码,使其使用的依赖库版本与 Flink lib 目录下所使用的版本相同

  • 使用 maven-shade-plugin 插件对依赖 jar 包重命名的方式来打包。具体的原理就是:shade 插件可以将 guava 重命名为 myguava,那么 shade 会将你代码中使用 guava 的地方全替换成 myguava,当你的代码在集群运行时,你的代码根本不用去理会集群上的 guava,因为你依赖的是 myguava,所以再也不用担心集群上的 jar 包会影响到你了。

具体如何配置呢?在 configuration 标签内加 relocations 标签,relocations 内可以放多个 relocation,每个 relocation 内设置我们要替换的包。替换后的包名可以随意命名,一般小改动即可,如下所示,将 org.apache.kafka 改为 org.myshade.kafka。


<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <pattern>org.apache.kafka</pattern>
                                <shadedPattern>org.myshade.kafka</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

# flink