Flink基础简介和安装部署

发布于:2025-03-27 ⋅ 阅读:(30) ⋅ 点赞:(0)

一、Flink基础简介

1、什么是Flink

Flink是⼀个分布式,高性能,随时可⽤的以及准确的流处理计算框架,Flink可以对无界数据(流处理)和有界数据(批处理)进⾏有状态计算的分布式,⾼性能的计算框架。

⽆界数据流:数据流是有⼀个开始但是没有结束;
有界数据流:数据流是有⼀个明确的开始和结束,数据流是有边界的。

2、Flink流处理特性

1.支持高吞吐、低延迟、高性能的流处理
2.支持带有事件时间的窗口(Window)操作
3.支持有状态计算的 Exactly-once 语义
4.支持高度灵活的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作
5.支持具有 Backpressure(背压) 功能的持续流模型
6.支持基于轻量级分布式快照(Snapshot)实现的容错
7.一个运行时同时支持 Batch on Streaming 处理和 Streaming 处理
8.Flink 在 JVM 内部实现了自己的内存管理
9.支持迭代计算; 
10.程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果有必要进行缓存

3、Flink四大基石

checkpoint,state,time,window

checkpoint:
基于chandy-lamport算法实现分布式计算任务的⼀致性语义;

state:
flink中的状态机制,flink 天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;

time:
flink中⽀持基于事件时间和处理时间进⾏计算,spark streaming只能按照process time进⾏处理;基于事件时间的计算我们可以解决数据延迟和乱序等问题。

window:
flink提供了更多丰富的window,基于时间,基于数量,session window,同样⽀持滚动和滑动窗
⼝的计算。

4、Flink中的角色

JobManager: 负责资源申请,任务分发,任务调度执行,checkpoint的协调执行;
TaskManager: 负责任务的执行,基于Dataflow(Spark中DAG)划分出的Task;与JobManager保持⼼跳,汇报任务状态。

二、Flink集群搭建

1、Local模式

单机模式,适合自测学习使用,下面是简单部署步骤

①上传Flink安装包

准备服务器JDK1.8及以上版本,配置免密登录;详情可参考hadoop搭建模块
上传安装包 flink-1.7.2-bin-hadoop27-scala_2.11.tgz 然后解压到/opt目录(之前上传hadoop的目录),注意修改所属用户和用户组

注意: 服务器需要配置JDK8的环境

tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz
mv flink-1.7.2 flink
chown -R root:root flink

如下图所示:
在这里插入图片描述

②启动交互窗口

进入Flink的bin目录下启动shell交互式窗口

cd /opt/flink
bin/start-scala-shell.sh local

如下图所示
在这里插入图片描述

③提交任务测试

注意: 提前创建一个测试文件/root/words.txt,随便写入一些文本作为测试文件
在这里插入图片描述

benv.readTextFile("/root/words.txt").flatMap(_.split("")).map((_,1)).groupBy(0).sum(1).print()
④访问WebUI页面查看
http://192.168.88.101:8081

在这里插入图片描述

⑤退出停止集群
:quit

在这里插入图片描述

2、Standalone模式

Flink自带集群,资源管理由Flink集群管理,下面是原理图和部署步骤

提交作业
分发任务
分发任务
交换中间结果
发送状态,汇报心跳
交换中间结果
发送状态,汇报心跳
客户端
JobManager
负责申请和管理资源,并管理资源和任务
TaskManager
执行任务
TaskManager
执行任务

hadoop-1: 部署JobManager+TaskManager
hadoop-2: 部署TaskManager
hadoop-3: 部署TaskManager

①修改配置⽂件 conf/flink-conf.yaml
jobmanager.rpc.address: hadoop-1
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024
taskmanager.heap.size: 1024
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
taskmanager.tmp.dirs: /opt/flink/tmp
web.submit.enable: true
②修改conf/masters⽂件
hadoop-1:8081
③修改conf/slaves⽂件
hadoop-1
hadoop-2
hadoop-3
④分发flink⽬录到其它节点
scp -r /opt/flink hadoop-2:/opt/flink
scp -r /opt/flink hadoop-3:/opt/flink

#flink on yarn模式需要配置hadoop_conf_dir到/etc/profile中,并分发到其他节点
vim /etc/profile
#添加这一行
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
#执行 source /etc/profile
#分发 到其他节点
scp -r /etc/profile hadoop-2:/etc/profile
scp -r /etc/profile hadoop-3:/etc/profile到其他节点
⑤启动集群
启动集群
bin/start-cluster.sh

停止集群
bin/stop-cluster.sh

单独启停jobmanager
bin/jobmanager.sh start/stop

单独启停taskmanager
bin/taskmanager.sh start/stop

在这里插入图片描述
在这里插入图片描述
注意 : 如果6123端口被占用,启动服务后这里的三个数据都会显示为0

# 检查 6123 端口是否被占用
netstat -tuln | grep 6123
# 查询PID
lsof -i :6123
# 如果占用,杀死相关进程(谨慎操作!)
sudo kill -9 <PID>
⑥提交测试作业
/opt/flink/bin/flink run  /opt/flink/examples/batch/WordCount.jar --input hdfs://hadoop-1:8020/input/wordcount/hello.txt --output  hdfs://hadoop-1:8020/output/result.txt --parallelism 2

在这里插入图片描述

注意: 启动hadoop的是hadoop 用户, 需要授权root用户output目录的写权限才可以
授权命令(完全开放权限,仅供测试,生产环境不推荐): hdfs dfs -chmod 777 /output

3、Flink on Yarn模式

资源管理交给Yarn实现的模式