Cluster Planning
| Node Role | Hostname | IP Address | Running Services |
|---|---|---|---|
| JobManager | hadoop1 | 192.168.16.219 | JobManager, Web UI |
| TaskManager | hadoop2 | 192.168.16.67 | TaskManager |
| TaskManager | hadoop3 | 192.168.16.249 | TaskManager |
Environment Preparation (All Nodes)
1. Install JDK 11 (if not already installed)
sudo yum install -y java-11-openjdk-devel
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk' | sudo tee /etc/profile.d/java.sh
source /etc/profile.d/java.sh
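A quick check that the JDK and JAVA_HOME are visible in the current shell:
java -version
echo $JAVA_HOME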
2. Configure host resolution (all nodes)
sudo tee -a /etc/hosts << EOF
192.168.16.219 hadoop1
192.168.16.67 hadoop2
192.168.16.249 hadoop3
EOF
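To confirm resolution works before going further, a simple check on each node:
# Should print the three IP/hostname pairs configured above
getent hosts hadoop1 hadoop2 hadoop3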
Flink Installation and Configuration (on the JobManager node, hadoop1)
1. Download and extract Flink
wget https://archive.apache.org/dist/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
sudo tar -zxvf flink-1.14.6-bin-scala_2.12.tgz -C /opt/
sudo ln -s /opt/flink-1.14.6 /opt/flink
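Optionally, verify the archive against the checksum Apache publishes next to it (the .sha512 URL below follows the standard archive layout):
wget https://archive.apache.org/dist/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz.sha512
sha512sum flink-1.14.6-bin-scala_2.12.tgz
cat flink-1.14.6-bin-scala_2.12.tgz.sha512
# Compare the two hashes manually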
2. Configure environment variables
sudo tee /etc/profile.d/flink.sh << 'EOF'
export FLINK_HOME=/opt/flink
export PATH=$PATH:$FLINK_HOME/bin
EOF
source /etc/profile
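If the PATH is set correctly, the CLI should now resolve from anywhere and report its version:
flink --version
# Expected output similar to: Version: 1.14.6, Commit ID: ...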
3. Configure the core Flink files
flink-conf.yaml
sudo tee $FLINK_HOME/conf/flink-conf.yaml << 'EOF'
# Basic settings
jobmanager.rpc.address: hadoop1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
# Network settings (leave taskmanager.host unset so each TaskManager
# advertises its own hostname; 0.0.0.0 is a bind address, not an advertised one)
rest.address: hadoop1
rest.port: 8081
# Checkpointing
execution.checkpointing.interval: 10000
state.backend: filesystem
state.checkpoints.dir: hdfs://hadoop1:9000/flink/checkpoints
# Hadoop integration
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop
classloader.resolve-order: parent-first
EOF
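With this configuration the cluster offers 2 TaskManagers × 4 slots = 8 task slots in total, and parallelism.default: 2 means each job claims 2 slots unless a higher parallelism is requested at submission time, so roughly four default-parallelism jobs can run concurrently.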
masters
sudo tee $FLINK_HOME/conf/masters << 'EOF'
hadoop1:8081
EOF
workers
sudo tee $FLINK_HOME/conf/workers << 'EOF'
hadoop2
hadoop3
EOF
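start-cluster.sh starts the TaskManagers over SSH based on the workers file, so hadoop1 needs passwordless SSH to both workers. If this is not already in place from the Hadoop setup, a one-time sketch:
ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa  # skip if a key already exists
ssh-copy-id hadoop2
ssh-copy-id hadoop3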
4. Configure Hadoop integration
# Copy the Hadoop configuration files into Flink
sudo cp /usr/local/hadoop/etc/hadoop/core-site.xml $FLINK_HOME/conf/
sudo cp /usr/local/hadoop/etc/hadoop/hdfs-site.xml $FLINK_HOME/conf/
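One caveat: since Flink 1.11 the binary distribution no longer bundles Hadoop, so the HDFS filesystem classes must come from the local Hadoop installation. The documented approach is to export HADOOP_CLASSPATH on every node (path assumptions match the rest of this guide):
echo 'export HADOOP_CLASSPATH=$(hadoop classpath)' | sudo tee -a /etc/profile.d/flink.sh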
Distribute Flink to All Nodes
# Copy to the TaskManager nodes
scp -r /opt/flink-1.14.6 hadoop2:/opt/
scp -r /opt/flink-1.14.6 hadoop3:/opt/
# Create symlinks
ssh hadoop2 "ln -s /opt/flink-1.14.6 /opt/flink"
ssh hadoop3 "ln -s /opt/flink-1.14.6 /opt/flink"
# Distribute the environment variable script
scp /etc/profile.d/flink.sh hadoop2:/etc/profile.d/
scp /etc/profile.d/flink.sh hadoop3:/etc/profile.d/
# /etc/profile.d/flink.sh takes effect automatically on the next login shell,
# so no extra step is needed on the workers (running "ssh hadoopX source ..."
# would only affect that single remote session)
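A quick sanity check that the binaries and the profile script actually landed on the workers:
ssh hadoop2 "ls /opt/flink/bin/taskmanager.sh && cat /etc/profile.d/flink.sh"
ssh hadoop3 "ls /opt/flink/bin/taskmanager.sh && cat /etc/profile.d/flink.sh"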
Create HDFS Directories (on hadoop1)
hdfs dfs -mkdir -p /flink/checkpoints
hdfs dfs -chmod 777 /flink/checkpoints
Start the Flink Cluster (on hadoop1)
# Start the cluster
$FLINK_HOME/bin/start-cluster.sh
# Start the history server (optional; requires the archiving settings noted below)
$FLINK_HOME/bin/historyserver.sh start
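The history server only serves data if job archiving is configured; a minimal sketch for flink-conf.yaml, with an archive path chosen here to sit next to the checkpoint directory (an assumption, not a default):
jobmanager.archive.fs.dir: hdfs://hadoop1:9000/flink/completed-jobs
historyserver.archive.fs.dir: hdfs://hadoop1:9000/flink/completed-jobs
historyserver.web.port: 8082
Create the HDFS directory beforehand, just like the checkpoint path.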
Verify Cluster Status
1. Check processes
# hadoop1 (JobManager)
jps | grep -E 'StandaloneSessionClusterEntrypoint|HistoryServer'
# hadoop2 and hadoop3 (TaskManager)
jps | grep TaskManager
2. Web UI access
http://192.168.16.219:8081
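The same status is available over the REST API, which is convenient from a shell; /overview reports the number of registered TaskManagers and available slots:
curl -s http://hadoop1:8081/overview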
3. Command-line verification
# List running and scheduled jobs
$FLINK_HOME/bin/flink list
# List registered TaskManagers via the REST API
curl -s http://hadoop1:8081/taskmanagers
Run Test Jobs
1. Submit the WordCount example
$FLINK_HOME/bin/flink run \
-m hadoop1:8081 \
$FLINK_HOME/examples/streaming/WordCount.jar
2. Submit the socket streaming example
# Terminal 1 (on hadoop1): start the data source
nc -lk 9999
# Terminal 2: submit the job
$FLINK_HOME/bin/flink run \
-m hadoop1:8081 \
$FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \
--hostname hadoop1 --port 9999
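The example writes its word counts to stdout on whichever TaskManager executes it, so the output appears in a .out file on hadoop2 or hadoop3 rather than in the submitting terminal:
# On the TaskManager node running the job
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.out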
Cluster Management Commands
Common commands
# Start the cluster
$FLINK_HOME/bin/start-cluster.sh
# Stop the cluster
$FLINK_HOME/bin/stop-cluster.sh
# Start the history server
$FLINK_HOME/bin/historyserver.sh start
# Stop the history server
$FLINK_HOME/bin/historyserver.sh stop
Job management
# Submit a job
$FLINK_HOME/bin/flink run -m hadoop1:8081 /path/to/job.jar
# Cancel a job
$FLINK_HOME/bin/flink cancel <jobid>
# Trigger a savepoint
$FLINK_HOME/bin/flink savepoint <jobid> hdfs://hadoop1:9000/flink/savepoints
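A job can later be resumed from that savepoint by passing the path (printed when the savepoint completes) to flink run with -s:
$FLINK_HOME/bin/flink run -s hdfs://hadoop1:9000/flink/savepoints/savepoint-<id> \
-m hadoop1:8081 /path/to/job.jar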
Hadoop YARN Integration (Optional)
1. Configure YARN mode
# Set the environment variable
echo 'export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop' | sudo tee -a /etc/profile.d/flink.sh
source /etc/profile
# Test the YARN integration
$FLINK_HOME/bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
$FLINK_HOME/examples/streaming/WordCount.jar
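Application mode starts one cluster per job; if a long-running session on YARN is preferred instead, a minimal sketch (memory sizes are illustrative):
# Start a detached YARN session, then submit jobs to it as usual
$FLINK_HOME/bin/yarn-session.sh -d \
-Djobmanager.memory.process.size=1600m \
-Dtaskmanager.memory.process.size=1728m
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/WordCount.jar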
2. Common YARN commands
# List YARN applications
yarn application -list
# Kill a Flink YARN application
yarn application -kill <applicationId>
Troubleshooting
1. View logs
# JobManager log
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# TaskManager log
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
2. Common problems
Port conflicts
# Check which process is using a port
netstat -tunlp | grep <port>
HDFS permission problems
# Relax HDFS permissions
hdfs dfs -chmod -R 777 /flink
Memory misconfiguration
# Increase the sizes in flink-conf.yaml
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
Production Recommendations
1. Security
# flink-conf.yaml
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/keystore.jks
security.ssl.internal.keystore-password: <password>
security.ssl.internal.key-password: <password>
security.ssl.internal.truststore: /path/to/truststore.jks
security.ssl.internal.truststore-password: <password>
2. High availability
# flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop1:9000/flink/ha/
high-availability.zookeeper.quorum: hadoop1:2181,hadoop2:2181,hadoop3:2181
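Standalone HA additionally needs a standby JobManager: list it in conf/masters and run it on that node (here hadoop2 is assumed to double as the standby), and set a distinct high-availability.cluster-id if several Flink clusters share the same ZooKeeper quorum:
# conf/masters
hadoop1:8081
hadoop2:8081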
3. Monitoring
# flink-conf.yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
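The Prometheus reporter ships under the plugins/ directory of recent Flink distributions, so the two lines above should be sufficient; each JobManager and TaskManager process then serves metrics on the first free port in the configured range, which can be spot-checked with curl:
curl -s http://hadoop1:9250/metrics | head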
4. Resource isolation
# Create a dedicated user for Flink
sudo useradd -m -s /bin/bash flink
sudo chown -R flink:flink /opt/flink-1.14.6
Uninstall Flink
# Stop services
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/historyserver.sh stop
# Remove files
sudo rm -rf /opt/flink-1.14.6
sudo rm /opt/flink
sudo rm /etc/profile.d/flink.sh
# Clean up HDFS
hdfs dfs -rm -r /flink
Version Upgrade
# 1. Stop the current cluster
$FLINK_HOME/bin/stop-cluster.sh
# 2. Back up the configuration
cp -r $FLINK_HOME/conf /tmp/flink-conf-backup
# 3. Download the new version
wget https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
# 4. Extract and migrate the configuration
tar -zxvf flink-1.15.0-bin-scala_2.12.tgz -C /opt/
cp /tmp/flink-conf-backup/* /opt/flink-1.15.0/conf/
# 5. Update the symlink
ln -sfn /opt/flink-1.15.0 /opt/flink
# 6. Start the new cluster
$FLINK_HOME/bin/start-cluster.sh
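Two caveats: configuration keys occasionally change between minor versions, so review the copied flink-conf.yaml against the new release's defaults, and drain running jobs with a savepoint before stopping the old cluster, for example:
# Stop a job and write a final savepoint in one step
$FLINK_HOME/bin/flink stop -p hdfs://hadoop1:9000/flink/savepoints <jobid>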
With the steps above, you can deploy an Apache Flink 1.14.6 standalone cluster on top of an existing Hadoop cluster and integrate it with HDFS.