大数据Hadoop之——Flink1.17.0安装与使用(非常详细)

发布于:2025-07-07 ⋅ 阅读:(39) ⋅ 点赞:(0)

一、前期准备

1、查看网卡

2、配置静态IP 

vi /etc/sysconfig/network-scripts/ifcfg-ens32  ----  根据自己网卡设置。 

3、设置主机名 

hostnamectl --static set-hostname  主机名

例如:

hostnamectl --static set-hostname  hadoop001

4、配置IP与主机名映射

vi /etc/hosts

5、关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

6、配置免密登录

传送门 

二、JDK的安装

传送门

注意:

Flink1.16.0版本也支持使用JDK8,后续版本对JDK8的支持将会移除。

从Flink 1.17.0版本开始,必须使用Java 11或更高版本来运行Flink。这是因为Flink为了支持最新的Java API和语言特性,需要Java 11中引入的一些新功能。

虽然使用JDK8 也可以,但从 Flink 1.17 开始,部分依赖于 Flink 的第三方库已经弃用了对 JDK 8 的支持,并要求使用 JDK 11 或更高版本。

考虑到Flink后期与一些大数据框架进行整合,这些大数据框架对JDK11的支持并不完善,例如:Hive3.1.3版本还不支持JDK11,所以采用JDK8来开发Flink。

三、Flink的本地安装

1、Flink的下载安装  

​1.1. 下载

Index of /dist/flink/flink-1.17.0

https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

​下载 flink-1.17.0-bin-scala_2.12.tgz 安装包

1.2 上传
使用xshell上传到指定安装路径

此处是安装路径是 /opt/module

1.3 解压重命名

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz

mv flink-1.17.0 flink

1.4 配置环境变量

vi  /etc/profile

export JAVA_HOME=/opt/module/java

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export FLINK_HOME=/opt/module/flink

export PATH=$PATH:$JAVA_HOME/bin:$FLINK_HOME/bin

1.5 加载环境变量

source  /etc/profile

验证环境变量是否生效:

env | grep HOME

env | grep PATH

2、修改flink-conf.yaml配置文件

cd /opt/module/flink
vi conf/flink-conf.yaml

# JobManager节点地址
jobmanager.rpc.address: hadoop001
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop001
rest.bind-address: 0.0.0.0


# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop001

3、启动集群

cd /opt/module/flink

./bin/start-cluster.sh

4、查看进程

5、查看WebUI

默认端口 :8081

http://192.168.200.151:8081/

6、停止集群 

cd /opt/module/flink

./bin/stop-cluster.sh

三、Flink Standalone搭建

  准备三台节点:hadoop001、hadoop002、hadoop003,每个节点部署不同角色。

进程 节点

Master(JobManager)

Slave(TaskManager)

hadoop001
Slave(TaskManager) hadoop002
Slave(TaskManager) hadoop003

以下内容是在Flink本地安装的基础上进行的。

1、修改workers文件

cd /opt/module/flink
vi conf/workers

hadoop001

hadoop002

hadoop003

2、分发文件

scp -r  /etc/profile  root@hadoop002:/etc/profile

scp -r  /etc/profile  root@hadoop003:/etc/profile

scp -r  /opt/module/java root@hadoop002:/opt/module/java 

scp -r  /opt/module/java root@hadoop003:/opt/module/java 

scp -r  /opt/module/flink root@hadoop002:/opt/module/flink

scp -r  /opt/module/flink root@hadoop003:/opt/module/flink

让三台机器文件生效

ssh hadoop001 "source /etc/profile"
ssh hadoop002 "source /etc/profile"
ssh hadoop003 "source /etc/profile"

3、启动集群

cd /opt/module/flink

./bin/start-cluster.sh

4、查看WebUI

默认端口 :8081

http://192.168.200.151:8081/

5、测试

     尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:

cd /opt/module/flink

./bin/flink run examples/streaming/WordCount.jar

 

6、停止集群 

cd /opt/module/flink

./bin/stop-cluster.sh

四、Flink Standalone HA搭建(不推荐后面由Yarn管理)

        HA的主要作用:可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager

进程 节点

Master(JobManager)

Slave(TaskManager)

FlinkZooKeeperQuorumPeer

hadoop001

Master(JobManager)

Slave(TaskManager)

FlinkZooKeeperQuorumPeer

hadoop002

Slave(TaskManager)

FlinkZooKeeperQuorumPeer

hadoop003

下面内容在 Flink Standalone 搭建前提下修改。

1、修改flink-conf.yaml配置文件

cd /opt/module/flink
vi conf/flink-conf.yaml

# 将高可用模式设置为ZooKeeper,默认集群不会开启高可用状态
high-availability: zookeeper
 
# ZooKeeper集群主机名(或IP)与端口列表,多个以逗号分隔
high-availability.zookeeper.quorum: hadoop001:2181,hadoop002:2181,hadoop003:2181
 
# 用于持久化JobManager元数据(JobGraph、应用程序JAR文件等)的HDFS地址,以便进行故障恢复,ZooKeeper上存储的只是元数据所在的位置路径信息
high-availability.storageDir: /opt/module/flink/ha

2、修改master文件

cd /opt/module/flink
vi conf/masters

hadoop001:8081

hadoop002:8082

3、分发文件

scp -r $FLINK_HOME/conf/flink-conf.yaml root@hadoop002:$FLINK_HOME/conf/flink-conf.yaml

scp -r $FLINK_HOME/conf/flink-conf.yaml root@hadoop003:$FLINK_HOME/conf/flink-conf.yaml

scp -r $FLINK_HOME/conf/masters root@hadoop002:$FLINK_HOME/conf/masters

scp -r $FLINK_HOME/conf/masters root@hadoop003:$FLINK_HOME/conf/masters

4、安装与启动Zookeeper

传送门

5、启动集群

cd /opt/module/flink

./bin/start-cluster.sh

6、查看WebUI

默认端口 :8081

http://192.168.200.151:8081/

7、测试

     尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:

cd /opt/module/flink

./bin/flink run examples/streaming/WordCount.jar

8、停止集群 

cd /opt/module/flink

./bin/stop-cluster.sh

五、YARN运行模式(重点) 

       YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

        本节是在 三、Flink Standalone搭建 的基础上进行修改。

1、安装启动Hadoop集群

      在将Flink任务部署YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。

传送门

2、配置环境变量

vi /etc/profile

export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HADOOP_CLASSPATH=`hadoop classpath`

不要丢掉 一漂符号`

注意:重点是 HADOOP_CLASSPATH=`hadoop classpath` 是执行命令 hadoop classpath,使环境变量能够加载到hadoop的类路径和包路径。此时可以将hadoopFlink进行解耦,不用纠结使用hadoop的哪个版本。 

3、修改配置文件flink-conf.yaml

1.此文件中的 hadoop001 无需修改,启动时候yarn自动分配代理的主机名和端口

# JobManager节点地址.
jobmanager.rpc.address: hadoop001
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop001
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop001

2、设置加载检查

vi /opt/module/flink/conf/flink-conf.yaml

classloader.check-leaked-classloader: false

注意:如果上面配置中是避免启动过程中报如下异常。

Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.

at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders

4、分发文件

scp -r  /etc/profile  root@hadoop002:/etc/profile

scp -r  /etc/profile  root@hadoop003:/etc/profile

scp -r  /opt/module/flink/conf/flink-conf.yaml  root@hadoop002:/opt/module/flink/conf/flink-conf.yaml

scp -r  /opt/module/flink/conf/flink-conf.yaml  root@hadoop003:/opt/module/flink/conf/flink-conf.yaml

三台机器分别执行 source /etc/profile

5、Session会话模式

       YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。

5.1.启动Hadoop集群

cd /opt/module/hadoop

sbin/start-all.sh

5.2.启动Flink的会话模式

cd /opt/module/flink

bin/yarn-session.sh -d -nm flinkTest

参数说明:

-d:分离模式,执行命令后不会占用窗口,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。

 注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的

       YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,

       此时Yarn为Flink动态分配资源并启动JobManager。用户可以通过Web UI或者命令行两种方式提交作业。 

5.3.命令行提交作业

新打开一个窗口,使用命令行方式提交作业,提交后Yarn会自动的为Flink分配资源启动对应的TaskManager。运行自己到 webui上进行查看。

cd /opt/module/flink

./bin/flink run examples/streaming/WordCount.jar

 

5.4.小结

      通过前面来看,在yarn的每个应用其实就是对应 flink 的 一个集群

6、单作业模式

        前面不变,只是提交方式改变。

cd /opt/module/flink

bin/flink run -d -t yarn-per-job examples/streaming/WordCount.jar

        如果报错如图所示,则配置flink-conf.yaml

vi /opt/module/flink/conf/flink-conf.yaml

env.java.home: /opt/module/java
env.hadoop.conf.dir: /opt/module/hadoop/etc/hadoop

      修改后分发文件

scp -r  /opt/module/flink/conf/flink-conf.yaml  root@hadoop002:/opt/module/flink/conf/flink-conf.yaml

scp -r  /opt/module/flink/conf/flink-conf.yaml  root@hadoop003:/opt/module/flink/conf/flink-conf.yaml

7、应用模式

        前面不变,只是提交方式改变。

cd /opt/module/flink

bin/flink run-application -t yarn-application examples/streaming/WordCount.jar

       如果 Flink 本身的依赖和j插件的jar,用户可以预先上传到HDFS,而不需要每次单独发送到集群,这就使得作业提交更加轻量了。

1、创建HDFS,目录

hdfs dfs -mkdir /flink-dist
hdfs dfs -mkdir /flink-jars

2、上传 Flink 本身的依赖和用户jar
hdfs dfs -put /opt/module/flink/lib/ /flink-dist
hdfs dfs -put /opt/module/flink/plugins/ /flink-dist
hdfs dfs -put /opt/module/flink/examples/streaming/WordCount.jar /flink-dist

3、提交作业

cd /opt/module/flink

bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop001:9000/flink-dist" hdfs://hadoop001:9000/flink-jars/WordCount.jar