在大数据处理领域,Apache Flink凭借强大的实时流处理和批处理能力,成为众多开发者的首选工具。在日常工作中,开发Flink Jar任务是常见需求,但每次都需重复配置日志、梳理pom依赖、设置打包插件等,流程繁琐且易出错。为提升开发效率,减少重复劳动,将这些基础配置进行整理归纳十分必要。本文将围绕Flink项目的本地日志配置、pom依赖及插件配置展开详细介绍,为开发者提供一套可直接复用的基础配置方案,助力Flink项目高效开发。
一、本地日志配置:精准掌控运行信息
在Flink项目中,日志是了解任务运行状态、排查问题的重要依据。本地日志配置主要借助slf4j(Simple Logging Facade for Java)实现,通过在项目的src/main/resources
目录下创建log4j.properties
文件,即可完成相关配置。以下是详细的配置内容及其解析:
monitorInterval=30
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO
# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
- 全局日志级别与根日志配置:
rootLogger.level = INFO
设定了根日志级别为INFO,这意味着所有未单独配置日志级别的类和包,其日志输出都将遵循INFO级别,即只记录INFO及以上级别的日志信息(如WARN、ERROR)。rootLogger.appenderRef.file.ref = MainAppender
指定了根日志将使用名为MainAppender
的日志输出器进行输出。 - 第三方库日志级别配置:针对Akka、Kafka、Hadoop、Zookeeper等Flink常用的第三方库,通过单独配置
logger
,将它们的日志级别设定为INFO。这样既能获取这些库运行时的关键信息,又避免了过多低级别日志信息干扰视线。例如,logger.akka.name = akka
和logger.akka.level = INFO
表示将Akka相关的日志级别设置为INFO,开发者可根据实际需求调整这些库的日志级别。 - 日志输出器配置:
MainAppender
是一个基于文件滚动的日志输出器(appender.main.type = RollingFile
)。appender.main.append = true
表示日志将以追加的方式写入文件,避免覆盖原有日志。appender.main.fileName = ${sys:log.file}
指定了日志文件的名称,可通过系统属性log.file
动态设置;appender.main.filePattern = ${sys:log.file}.%i
定义了日志文件滚动后的命名规则,其中%i
会根据滚动次数自动递增。appender.main.layout.type = PatternLayout
设置了日志的输出格式,%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
具体规定了输出内容包含时间、日志级别、类名、线程信息和日志消息等。 - 日志滚动策略配置:
appender.main.policies.type = Policies
定义了日志滚动的策略集合。appender.main.policies.size.type = SizeBasedTriggeringPolicy
表示基于文件大小进行滚动,当单个日志文件大小达到100MB
(appender.main.policies.size.size = 100MB
)时,将触发滚动创建新的日志文件。appender.main.policies.startup.type = OnStartupTriggeringPolicy
确保在应用启动时也会进行一次日志文件滚动。appender.main.strategy.type = DefaultRolloverStrategy
指定了默认的滚动策略,appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
设置了最多保留10个历史日志文件,可通过环境变量MAX_LOG_FILE_NUMBER
进行自定义。 - 特定日志抑制配置:
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
和logger.netty.level = OFF
用于关闭Netty相关的一些无关警告日志,避免这些日志信息干扰开发者对关键日志的查看。
二、pom配置:构建项目的基石
Maven的pom文件是项目的核心配置文件,它管理着项目的依赖、构建等关键信息。以下是一个典型的Flink项目pom配置文件及其详细解析:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.nankong.flink</groupId>
<artifactId>flink-example</artifactId>
<version>1.0-SNAPSHOT</version>
<name>flink-example:</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.6</flink.version>
<scala.version>2.11</scala.version>
<log4j.version>2.0.9</log4j.version>
<slf4j.version>1.6.1</slf4j.version>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Streaming -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Gelly (如果需要图处理功能) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Connector for Kafka (如果需要与 Kafka 集成) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<finalName>Flink-Job</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 项目基本信息配置:
groupId
、artifactId
和version
用于唯一标识项目,name
为项目名称。在团队协作和项目管理中,这些信息有助于准确区分和管理不同的项目。 - 属性配置:通过
<properties>
标签定义了一系列属性,如maven.compiler.source
和maven.compiler.target
指定了Java编译版本为8;flink.version
、scala.version
、log4j.version
和slf4j.version
分别定义了Flink、Scala、Log4j和slf4j的版本号。使用属性定义版本号的好处是便于统一管理和修改,当需要升级某个依赖版本时,只需在一处修改属性值即可。 - 依赖配置:
- Flink核心依赖:
flink-core
是Flink的核心库,包含了Flink运行的基础功能和核心API,是所有Flink项目的必备依赖。 - Flink流处理依赖:
flink-streaming-java_${scala.version}
提供了Flink流处理的核心功能,用于开发实时流处理任务。 - Flink图处理依赖:
flink-gelly_${scala.version}
用于支持Flink的图处理功能,如果项目中涉及图计算相关业务,需添加此依赖。 - Kafka连接器依赖:
flink-connector-kafka_${scala.version}
允许Flink与Kafka进行集成,实现从Kafka读取数据或向Kafka写入数据,是处理消息队列场景的常用依赖。 - Flink Table API & SQL依赖:
flink-table-api-java-bridge_${scala.version}
和flink-table-planner_${scala.version}
用于支持Flink的Table API和SQL功能,方便开发者使用SQL语句进行数据处理。 - JSON处理依赖:
flink-json
提供了对JSON数据格式的处理支持,在处理JSON数据时非常实用。 - Flink客户端依赖:
flink-clients_${scala.version}
用于与Flink集群进行交互,如提交作业、管理任务等。 - 日志相关依赖:
slf4j-api
是slf4j的核心接口库,slf4j-simple
是slf4j的一个简单实现,用于在项目中输出日志。
- Flink核心依赖:
- 构建配置:
- 最终产物名称:
<finalName>Flink-Job</finalName>
指定了打包后Jar文件的名称。 - 打包插件配置:
maven-shade-plugin
插件用于将项目及其依赖打包成一个可执行的Jar文件。<excludes>
标签中排除了一些与日志相关的依赖重复打包,避免冲突;在<execution>
中,通过<filters>
进一步过滤掉一些不必要的文件(如签名文件),确保打包后的Jar文件更加精简、可靠。
- 最终产物名称:
通过以上对Flink项目本地日志配置和pom配置的详细介绍,开发者在后续开发Flink Jar任务时,可直接复用这些基础配置,减少重复工作,将更多精力投入到业务逻辑开发中。当然,实际项目中可根据具体需求对配置进行灵活调整和扩展,以满足多样化的开发场景。