1. Basic environment
1.1 Install the JDK
This setup uses jdk-11.0.26_linux-x64_bin.tar.gz.
Create the target directory (if it does not already exist) and extract the archive:
mkdir -p /usr/local/java
tar -zxvf jdk-11.0.26_linux-x64_bin.tar.gz -C /usr/local/java/
Configure the environment variables:
vi /etc/profile
JAVA_HOME=/usr/local/java/jdk-11.0.26
CLASSPATH=.:${JAVA_HOME}/lib:$CLASSPATH
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH
Apply the changes:
source /etc/profile
If the variables still do not take effect, log out and back in, or reboot the server.
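You can verify that the JDK is on the PATH and the variables took effect, for example:
java -version        # should report version 11.0.26
echo $JAVA_HOME      # should print /usr/local/java/jdk-11.0.26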
1.2 Passwordless SSH login
Set up passwordless SSH login between all nodes in the cluster.
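A typical way to set this up (a sketch, assuming the three nodes localhost1, localhost2, localhost3 and the root user; repeat on each node):
ssh-keygen -t rsa                  # accept the defaults, empty passphrase
ssh-copy-id root@localhost1
ssh-copy-id root@localhost2
ssh-copy-id root@localhost3
ssh localhost2 hostname            # should print "localhost2" without prompting for a password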
2. Set up a Flink distributed cluster
1. Download
Version: flink-2.0.0-bin-scala_2.12.tgz
URL: https://www.apache.org/dyn/closer.lua/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz
2. Install
Copy the installation package to the Linux VM localhost1 through the VM's shared folder (mounted at /mnt/hgfs/), then copy it from the shared folder to the target directory /opt/software/.
Create the target directory (if needed) and extract the archive:
cd /opt/software/
mkdir -p /usr/local/applications
tar -zxvf flink-2.0.0-bin-scala_2.12.tgz -C /usr/local/applications/
3. Modify the Flink configuration
Edit the conf/config.yaml file under the Flink installation directory.
On localhost1:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost1
On localhost2:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost2
On localhost3:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost3
Edit the conf/masters file:
localhost1:8081
Edit the conf/workers file:
localhost1
localhost2
localhost3
Edit the conf/zoo.cfg file (optional; it can be left unchanged):
server.1=localhost1:2888:3888
server.2=localhost2:2888:3888
server.3=localhost3:2888:3888
4. Distribute the Flink software to the cluster
First stop and disable the firewall:
systemctl stop firewalld
systemctl disable firewalld
Distribute Flink to localhost2 and localhost3:
scp -r flink-2.0.0 root@localhost2:/usr/local/applications/flink-2.0.0
scp -r flink-2.0.0 root@localhost3:/usr/local/applications/flink-2.0.0
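Note that localhost2 and localhost3 also need the JDK and environment variables from section 1. If they are not set up there yet, one way to do it (a sketch, assuming the same paths on every node) is:
scp -r /usr/local/java root@localhost2:/usr/local/
scp -r /usr/local/java root@localhost3:/usr/local/
scp /etc/profile root@localhost2:/etc/profile
scp /etc/profile root@localhost3:/etc/profile
# then run "source /etc/profile" on localhost2 and localhost3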
5. Start the cluster
[root@localhost1 flink-2.0.0]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost1.
Starting taskexecutor daemon on host localhost1.
Starting taskexecutor daemon on host localhost2.
Starting taskexecutor daemon on host localhost3.
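Before opening the Web UI, you can sanity-check the daemons with jps on each node; a standalone session cluster typically shows a JobManager process (StandaloneSessionClusterEntrypoint) plus a TaskManager process (TaskManagerRunner) on localhost1, and only the TaskManager process on localhost2 and localhost3 (exact process names may differ between Flink versions):
jps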
6. View the Web UI
http://localhost1:8081/#/overview
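If a browser is not convenient, the JobManager's REST API on the same port can be queried instead; the /overview endpoint returns a JSON summary of TaskManagers, slots, and jobs (with the three workers above it should report 3 TaskManagers):
curl http://localhost1:8081/overview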
3. Flink Development
3.1 Word count example
A socket sends words in (simulated) real time; Flink receives the data in real time, aggregates the counts per word, and prints the results.
Create a Java project and add the Flink dependencies:
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>2.0.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Create the WordCount class:
package com.neilparker;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from the socket server started with nc on localhost1:7777
        DataStreamSource<String> source = executionEnvironment.socketTextStream("localhost1", 7777, "\n");

        // Split each line into words, emit (word, 1), then key by the word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> dataStream = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> collector) {
                        String[] splits = line.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.f0)
                .sum(1);

        dataStream.print();
        executionEnvironment.execute("wordcount stream process");
    }
}
Start nc to simulate a Socket server, then run the Java code, and then send some data from the nc command line. The nc session looks like this:
[root@localhost1 ~]# nc -lp 7777
hello neil hello jack
hello mike hello walker
hello sun
The Java console shows the word-count results:
5> (hello,1)
15> (neil,1)
14> (jack,1)
5> (hello,2)
4> (mike,1)
9> (walker,1)
5> (hello,3)
5> (hello,4)
15> (sun,1)
5> (hello,5)
3.2 Submit the code to the Flink cluster
First add the jar packaging plugin to the pom file:
<build>
    <plugins>
        <!-- plugin for building the jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Then package the project with Maven.
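For example, from the project root (a standard Maven invocation; the shaded jar is written to the target/ directory):
mvn clean package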

First start the nc command:
[root@localhost1 flink-2.0.0]# nc -lp 7777
Then submit the jar on the Flink Web UI; the job should show up as running.
Type some words into the nc command line, and the word-count results appear on the Task Manager page (in its Stdout output).
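As an alternative to the Web UI, the jar can also be submitted with the Flink CLI from localhost1; the jar path below is a placeholder for whatever your maven package step produced:
bin/flink run -c com.neilparker.WordCount /path/to/your-wordcount-jar.jar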