Flink
1️⃣ 一 、知识要点
📖 1. Flink简介
- Apache Flink® — Stateful Computations over Data Streams
- Apache Flink 是一个分布式大数据处理引擎,可对有界数据流和无界数据流进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算
- 官网地址:http://flink.apache.org
1.1 处理无界和有界数据
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
数据可以被作为无界或者有界流来处理。
(1) 无界流
有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
(2) 有界流
有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
- Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
1.2 部署应用到任意地方
Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。
Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。 部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成
1.3 运行任意规模应用
Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。
Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字: 每天处理数万亿的事件 可以维护几TB大小的状态 可以部署上千个节点的集群
1.4 利用内存性能
- 有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。
📖 2. Flink 的应用场景
在实际生产的过程中,大量数据在不断地产生,例如金融交易数据、互联网订单数据、 GPS 定位数据、传感器信号、移动终端产生的数据、通信信号数据等,以及我们熟悉的网络 流量监控、服务器产生的日志数据,这些数据最大的共同点就是实时从不同的数据源中产生, 然后再传输到下游的分析系统。针对这些数据类型主要包括实时智能推荐、复杂事件处理、 实时欺诈检测、实时数仓与 ETL 类型、流数据分析类型、实时报表类型等实时业务场景,而 Flink 对于这些类型的场景都有着非常好的支持。
1、实时智能推荐
智能推荐会根据用户历史的购买行为,通过推荐算法训练模型,预测用户未来可能会购 买的物品。对个人来说,推荐系统起着信息过滤的作用,对 Web/App 服务端来说,推荐系统 起着满足用户个性化需求,提升用户满意度的作用。推荐系统本身也在飞速发展,除了算法 越来越完善,对时延的要求也越来越苛刻和实时化。利用 Flink 流计算帮助用户构建更加实 时的智能推荐系统,对用户行为指标进行实时计算,对模型进行实时更新,对用户指标进行 实时预测,并将预测的信息推送给 Wep/App 端,帮助用户获取想要的商品信息,另一方面也 帮助企业提升销售额,创造更大的商业价值。
2、复杂事件处理
对于复杂事件处理,比较常见的案例主要集中于工业领域,例如对车载传感器、机械设 备等实时故障检测,这些业务类型通常数据量都非常大,且对数据处理的时效性要求非常高。 通过利用 Flink 提供的 CEP(复杂事件处理)进行事件模式的抽取,同时应用 Flink 的 Sql 进行事件数据的转换,在流式系统中构建实时规则引擎,一旦事件触发报警规则,便立即将 告警结果传输至下游通知系统,从而实现对设备故障快速预警监测,车辆状态监控等目的。
3、实时欺诈检测
在金融领域的业务中,常常出现各种类型的欺诈行为,例如信用卡欺诈、信贷申请欺诈等,而如何保证用户和公司的资金安全,是来近年来许多金融公司及银行共同面对的挑战。 随着不法分子欺诈手段的不断升级,传统的反欺诈手段已经不足以解决目前所面临的问题。 以往可能需要几个小时才能通过交易数据计算出用户的行为指标,然后通过规则判别出具有 欺诈行为嫌疑的用户,再进行案件调查处理,在这种情况下资金可能早已被不法分子转移, 从而给企业和用户造成大量的经济损失。而运用 Flink 流式计算技术能够在毫秒内就完成对 欺诈判断行为指标的计算,然后实时对交易流水进行规则判断或者模型预测,这样一旦检测 出交易中存在欺诈嫌疑,则直接对交易进行实时拦截,避免因为处理不及时而导致的经济损失。
4、实时数仓与 ETL
结合离线数仓,通过利用流计算诸多优势和 SQL 灵活的加工能力,对流式数据进行实时 清洗、归并、结构化处理,为离线数仓进行补充和优化。另一方面结合实时数据 ETL 处理能力,利用有状态流式计算技术,可以尽可能降低企业由于在离线数据计算过程中调度逻辑的 复杂度,高效快速地处理企业需要的统计结果,帮助企业更好地应用实时数据所分析出来的结果。
5、流数据分析
实时计算各类数据指标,并利用实时结果及时调整在线系统相关策略,在各类内容投放、 无线智能推送领域有大量的应用。流式计算技术将数据分析场景实时化,帮助企业做到实时化分析 Web 应用或者 App 应用的各项指标,包括 App 版本分布情况、Crash 检测和分布等, 同时提供多维度用户行为分析,支持日志自主分析,助力开发者实现基于大数据技术的精细 化运营、提升产品质量和体验、增强用户黏性。
6、实时报表分析
实时报表分析是近年来很多公司采用的报表统计方案之一,其中最主要的应用便是实时 大屏展示。利用流式计算实时得出的结果直接被推送到前端应用,实时显示出重要指标的变 换情况。最典型的案例便是淘宝的双十一活动,每年双十一购物节,除疯狂购物外,最引人 注目的就是天猫双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购 买到数据采集、数据计算、数据校验,最终落到双十一大屏上展现的全链路时间压缩在 5 秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。 而在其他行业,企业也在构建自己的实时报表系统,让企业能够依托于自身的业务数据,快速提取出更多的数据价值,从而更好地服务于企业运行过程中。
📖 3. Flink基本技术栈
在flink整个软件架构体系中。同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建flink应用提供了丰富且友好的接口。
- API & libraries层
作为分布式数据处理框架,fink同时提供了支撑流计算和批计算的接口,同时在此基础之上抽象出不同的应用类型的组件库。
如:基于流处理的CEP(复杂事件处理库)、SQL&Table库、FlinkML(机器学习库)、Gelly(图处理库)
有流式处理API,批处理API。流式处理的支持事件处理,表操作。批处理的,支持机器学习,图计算,也支持表操作。
- Runtime核心层
该层主要负责对上层的接口提供基础服务,也就是flink分布式计算的核心实现。flink底层的执行引擎。
- 物理部署层
该层主要涉及到flink的部署模式,目前flink支持多种部署模式:
本地 local
集群 standalone/yarn
云 GCE/EC2 谷歌云、亚马逊云
kubenetes
📖 4. Flink基本架构
Flink 整个系统主要由两个组件组成,分别为 JobManager 和 TaskManager,Flink 架构也遵循 Master-Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker(Slave)节点。所有组件之间的通信都是借助于 Akka Framework,包括任务的状态以及 Checkpoint 触发等信息
Client
客户端负责将任务提交到集群,与 JobManager 构建 Akka 连接,然后将任务提交JobManager, 通过和 JobManager 之间进行交互获取任务执行状态。 客户端提交任务可以采用CLI 方式或者通过使用 Flink WebUI 提交,也可以在应用程序中指定 JobManager 的 RPC 网络端口构建 ExecutionEnvironment 提交 Flink 应用。
JobManager
JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。 JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。 JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。 当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。
TaskManager
TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。 TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。 可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。
📖 5. Flink的源码编译(了解)
我们可以对flink的源码进行编译,方便对我们各种hadoop的版本进行适配
参见:https://blog.csdn.net/h335146502/article/details/96483310
cd /kkb/soft
编译flink-shaded包
wget https://github.com/apache/flink-shaded/archive/release-7.0.tar.gz
tar -zxvf flink-shaded-release-7.0.tar.gz -C /kkb/install/
cd /kkb/install/flink-shaded-release-7.0/
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2
编译flink源码
wget http://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-src.tgz
tar -zxf flink-1.9.2-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.9.2/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2
📖 6. Local模式安装
1、安装jdk,配置JAVA_HOME,建议使用jdk1.8以上
2、安装包下载地址:
https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz
3、直接上传安装包到服务器
4、解压安装包并配置环境变量
tar -zxf flink-1.9.2-bin-scala_2.11.tgz -C /kkb/install/ 配置环境变量 sudo vim /etc/profile export FLINK_HOME=/kkb/install/flink-1.9.2 export PATH=:$FLINK_HOME/bin:$PATH
5、启动服务
local模式,什么配置项都不需要配,直接启动服务器即可
cd /kkb/install/flink-1.9.2 #启动flink bin/start-cluster.sh #停止flink bin/stop-cluster.sh
6、Web页面浏览
- http://node01:8081/#/overview
📖 7. Standalone模式安装
- (1)集群规划
主机名 | JobManager | TaskManager |
---|---|---|
node01 | 是 | 是 |
node02 | 是 | 是 |
node03 | 是 |
(2)依赖
- jdk1.8以上,配置JAVA_HOME
- 主机之间免密码
(3)安装步骤
node01修改以下配置文件
(a) 修改conf/flink-conf.yaml
#jobmanager地址
jobmanager.rpc.address: node01
#使用zookeeper搭建高可用
high-availability: zookeeper
##存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
(b) 修改conf/slaves
node01
node02
node03
(c) 修改conf/masters
node01:8081
node02:8081
(d)上传flink-shaded-hadoop-2-uber-2.7.5-10.0.jar到flink的lib目录下
将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 这个jar包上传到flink的安装目录的lib下
(e) 拷贝到其他节点
scp -r /kkb/install/flink-1.9.2 node02:/kkb/install
scp -r /kkb/install/flink-1.9.2 node03:/kkb/install
(f):node01(JobMananger)节点启动
注意:启动之前先启动hadoop和zookeeper集群
cd /kkb/install/flink-1.9.2
bin/start-cluster.sh
(g):访问
http://node01:8081
http://node02:8081
(h):关闭flink集群, 在主节点上执行
cd /kkb/install/flink-1.9.2
bin/stop-cluster.sh
- (4) StandAlone模式需要考虑的参数
jobmanager.heap.mb: jobmanager节点可用的内存大小
taskmanager.heap.mb: taskmanager节点可用的内存大小
taskmanager.numberOfTaskSlots: 每台taskmanager节点可用的cpu数量
parallelism.default: 默认情况下任务的并行度
taskmanager.tmp.dirs: taskmanager的临时数据存储目录
📖 8. Flink on Yarn模式安装
- 首先安装好Hadoop(yarn)
- 上传一个flink的包,配置好hadoop的环境变量就可以了
- flink on yarn有两种方式
8.1 第一种方式
内存集中管理模式(Yarn Session)
- 在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
8.2 第二种方式
- 内存Job管理模式==【yarn-cluster 推荐使用】==
- 在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
- 在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
8.3 不同模式的任务提交
第一种模式
【yarn-session.sh(开辟资源) + flink run(提交任务)】
1、在flink目录启动yarn-session
bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d # -n 表示申请2个容器, # -s 表示每个容器启动多少个slot # -tm 表示每个TaskManager申请1024M内存 # -d 表示以后台程序方式运行
2、使用 flink 脚本提交任务
bin/flink run examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output/result.txt ##说明:如果启动了很多的yarn-session, 在提交任务的时候可以通过参数 -yid 指定作业提交到哪一个yarn-session中运行 ##例如: bin/flink run \ -yid application_1597295374041_0008 \ examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output/result.txt
- 3、停止任务
yarn application -kill application_1587024622720_0001
第二种模式
【flink run -m yarn-cluster (开辟资源+提交任务)】
1、启动集群,执行任务
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 \ examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output1 注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。
help信息
yarn-session.sh 脚本参数
用法: 必选 -n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量) 可选 -D <arg> 动态属性 -d,--detached 独立运行 -jm,--jobManagerMemory <arg> JobManager的内存 [in MB] -nm,--name 在YARN上为一个自定义的应用设置一个名字 -q,--query 显示yarn中可用的资源 (内存, cpu核数) -qu,--queue <arg> 指定YARN队列. -s,--slots <arg> 每个TaskManager使用的slots数量 -tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB] -z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
flink run 脚本参数
run [OPTIONS] <jar-file> <arguments> "run" 操作参数: -c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定 -m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。 默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 连接指定host和port的jobmanager: ./bin/flink run -m node01:6123 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 启动一个新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀 例如:flink run -m yarn-cluster -yn 2 examples/batch/WordCount.jar
8.4 Flink on YARN集群部署
- (1) flink on yarn运行原理
- 其实Flink on YARN部署很简单,就是只要部署好hadoop集群即可,我们只需要部署一个Flink客户端,然后从flink客户端提交Flink任务即可。类似于spark on yarn模式。
📖 9. 入门案例演示
9.1 实时需求分析
实时统计每隔1秒统计最近2秒单词出现的次数
- 创建maven工程,添加pom依赖
<properties>
<flink.version>1.9.2</flink.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
9.1.1 实时代码开发(scala版本,flink完全倒向了java,别用scala了)
代码开发
package com.kaikeba.demo1 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time /** * 使用滑动窗口 * 每隔1秒钟统计最近2秒钟的每个单词出现的次数 */ object FlinkStream { def main(args: Array[String]): Unit = { //构建流处理的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //从socket获取数据 val sourceStream: DataStream[String] = env.socketTextStream("node01",9999) //导入隐式转换的包 import org.apache.flink.api.scala._ //对数据进行处理 val result: DataStream[(String, Int)] = sourceStream .flatMap(x => x.split(" ")) //按照空格切分 .map(x => (x, 1)) //每个单词计为1 .keyBy(0) //按照下标为0的单词进行分组 .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s处理2s的数据 .sum(1) //按照下标为1累加相同单词出现的次数 //对数据进行打印 result.print() //开启任务 env.execute("FlinkStream") } }
发送 socket 数据
##在node01上安装nc服务 sudo yum -y install nc nc -lk 9999
打成jar包提交到yarn中运行
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.kaikeba.demo1.FlinkStream original-flink_study-1.0-SNAPSHOT.jar
9.1.2 实时代码开发(java版本)
代码开发
package com.kaikeba.demo1; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * java代码开发实时统计每隔1秒统计最近2秒单词出现的次数 */ public class WindowWordCountJava { public static void main(String[] args) throws Exception { //步骤一:获取流式处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //步骤二:获取socket数据 DataStreamSource<String> sourceDstream = env.socketTextStream("node01", 9999); //步骤三:对数据进行处理 DataStream<WordCount> wordAndOneStream = sourceDstream.flatMap(new FlatMapFunction<String, WordCount>() { public void flatMap(String line, Collector<WordCount> collector) throws Exception { String[] words = line.split(" "); for (String word : words) { collector.collect(new WordCount(word, 1L)); } } }); DataStream<WordCount> resultStream = wordAndOneStream .keyBy("word") //按照单词分组 .timeWindow(Time.seconds(2), Time.seconds(1)) //每隔1s统计2s的数据 .sum("count"); //按照count字段累加结果 //步骤四:结果打印 resultStream.print(); //步骤五:任务启动 env.execute("WindowWordCountJava"); } public static class WordCount{ public String word; public long count; //记得要有这个空构建 public WordCount(){ } public WordCount(String word,long count){ this.word = word; this.count = count; } @Override public String toString() { return "WordCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
发送socket数据
#在node01上执行命令,发送数据 nc -lk 9999
9.2 离线需求分析
对文件进行单词计数,统计文件当中每个单词出现的次数。
9.2.1 离线代码开发(scala)
package com.kaikeba.demo1
import org.apache.flink.api.scala.{ DataSet, ExecutionEnvironment}
/**
* scala开发flink的批处理程序
*/
object FlinkFileCount {
def main(args: Array[String]): Unit = {
//todo:1、构建Flink的批处理环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//todo:2、读取数据文件
val fileDataSet: DataSet[String] = env.readTextFile("d:\\words.txt")
import org.apache.flink.api.scala._
//todo: 3、对数据进行处理
val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
.flatMap(x=> x.split(" "))
.map(x=>(x,1))
.groupBy(0)
.sum(1)
//todo: 4、打印结果
resultDataSet.print()
//todo: 5、保存结果到文件
resultDataSet.writeAsText("d:\\result")
env.execute("FlinkFileCount")
}
}
📖 10. Flink并行度&Slot&Task
Flink的每个TaskManager为集群提供solt。
每个task slot代表了TaskManager的一个固定大小的资源子集。
solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。
一般情况下你的slot数是你每个节点的cpu的核数。
10.1 并行度
一个Flink程序由多个任务组成(source、transformation和 sink)。 一个任务由多个并行的实例(线程)来执行, 一个任务的并行实例 (线程) 数目就被称为该任务的并行度。
10.2 并行度的设置
- 一个任务的并行度设置可以从多个级别指定
- Operator Level(算子级别)
- Execution Environment Level(执行环境级别)
- Client Level(客户端级别)
- System Level(系统级别)
- 这些并行度的优先级为
- Operator Level > Execution Environment Level > Client Level > System Level
10.2.1 算子级别
10.2.2 执行环境级别
10.2.3 客户端级别
并行度可以在客户端将job提交到Flink时设定,对于CLI客户端,可以通过-p参数指定并行度
bin/flink run -p 10 examples/batch/WordCount.jar
10.2.4 系统级别
在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度
parallelism.default: 1
10.3 并行度操作演示
为了方便在本地测试观察任务并行度信息,可以在本地工程添加以下依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency>
案例
- 注意获取程序的执行环境发生变化了
- val environment=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
package com.kaikeba.demo1 import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time /** * 本地调试并行度 */ object TestParallelism { def main(args: Array[String]): Unit = { //使用createLocalEnvironmentWithWebUI方法,构建本地流式处理环境 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI() //执行环境级别 //environment.setParallelism(4) import org.apache.flink.api.scala._ //接受socket数据 val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999) val countStream: DataStream[(String, Int)] = sourceStream .flatMap(x => x.split(" ")).setParallelism(5) //算子级别 .map(x => (x, 1)) .keyBy(0) .timeWindow(Time.seconds(2), Time.seconds(1)) .sum(1) countStream.print() environment.execute() } }
- 设置并行度,观察localhost:8081界面
2️⃣ 总结
掌握Flink的编程规范
了解Flink的集群模式