Setting Up a Flink Distributed Cluster

Published: 2025-06-30

1. Base Environment

1.1 Install the JDK

This guide uses jdk-11.0.26_linux-x64_bin.tar.gz.

Extract it:

tar -zxvf jdk-11.0.26_linux-x64_bin.tar.gz -C /usr/local/java/

Configure the environment variables:

vi /etc/profile

JAVA_HOME=/usr/local/java/jdk-11.0.26
CLASSPATH=.:${JAVA_HOME}/lib:$CLASSPATH
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH

Apply the environment variables:

source /etc/profile 

Note that `source` only affects the current shell; if the variables are missing in other sessions, log in again (or reboot the server).

1.2 Passwordless SSH Login

Set up passwordless SSH login between all nodes in the cluster.
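A minimal sketch of the setup, assuming the three node names used later in this article (localhost1 to localhost3) and the root user; run it on every node:

```shell
# Generate a key pair once per node (skip if ~/.ssh/id_rsa already exists).
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
# Copy the public key to every node, including this one;
# ssh-copy-id asks for the password once per host.
for host in localhost1 localhost2 localhost3; do
  ssh-copy-id "root@${host}"
done
```

Afterwards, `ssh localhost2` from localhost1 (and every other pair) should log in without a password prompt.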

2. Build the Flink Distributed Cluster

1. Download

Version: flink-2.0.0-bin-scala_2.12.tgz
Download: https://www.apache.org/dyn/closer.lua/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz

2. Install

Copy the installation package to the Linux VM localhost1 via the hypervisor's shared folder, which is mounted at /mnt/hgfs/. From there, copy the package to the target directory /opt/software/.

Extract it:

cd /opt/software/
tar -zxvf flink-2.0.0-bin-scala_2.12.tgz -C /usr/local/applications/

3. Modify the Flink Configuration

Edit conf/config.yaml (relative to the Flink installation directory):

On localhost1:

jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123

taskmanager:
  bind-host: 0.0.0.0
  host: localhost1

On localhost2:

jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123

taskmanager:
  bind-host: 0.0.0.0
  host: localhost2

On localhost3:

jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123

taskmanager:
  bind-host: 0.0.0.0
  host: localhost3

Edit conf/masters:

localhost1:8081

Edit conf/workers:

localhost1
localhost2
localhost3

Edit conf/zoo.cfg (optional; only needed if you use the bundled ZooKeeper for high availability):

server.1=localhost1:2888:3888
server.2=localhost2:2888:3888
server.3=localhost3:2888:3888

4. Distribute Flink to the Cluster

First, stop the firewall on every node:

systemctl stop firewalld
 
systemctl disable firewalld

Distribute Flink to localhost2 and localhost3:

cd /usr/local/applications/
scp -r flink-2.0.0 root@localhost2:/usr/local/applications/
scp -r flink-2.0.0 root@localhost3:/usr/local/applications/
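For a larger cluster the copy step can be scripted; a minimal sketch using the hostnames and paths from above (the `echo` makes this a dry run that only prints the commands — remove it to actually copy):

```shell
# Dry-run distribution loop; drop the 'echo' to really run scp.
FLINK_HOME=/usr/local/applications/flink-2.0.0
WORKERS="localhost2 localhost3"
for host in $WORKERS; do
  echo scp -r "$FLINK_HOME" "root@${host}:/usr/local/applications/"
done
```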

5. Start the Cluster

[root@localhost1 flink-2.0.0]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost1.
Starting taskexecutor daemon on host localhost1.
Starting taskexecutor daemon on host localhost2.
Starting taskexecutor daemon on host localhost3.

6. View the Web UI

http://localhost1:8081/#/overview
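The same cluster status is exposed over Flink's REST API on the web UI port, which is handy for a quick health check from the shell (requires the cluster started above):

```shell
# Query the cluster overview (TaskManager count, slots, running jobs).
curl http://localhost1:8081/overview
```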

3. Flink Development

3.1 Word Count Example

A socket simulates words sent in real time; Flink receives the data as it arrives, aggregates the counts, and prints the results.

Create a Java project and import the Flink dependencies:

     <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>2.0.0</flink.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

Create the WordCount class:

package com.neilparker;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment executionEnvironment =  StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = executionEnvironment.socketTextStream("localhost1",7777,"\n");

        SingleOutputStreamOperator<Tuple2<String, Long>> dataStream = source.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>()
        {
            @Override
            public void flatMap (String string, Collector<Tuple2<String, Long>> collector) {
                String[] splits = string.split("\\s+"); // "\\s+" avoids empty tokens on repeated whitespace
                for (String word : splits) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(value -> value.f0).sum(1);
        dataStream.print();

        executionEnvironment.execute("wordcount stream process");
    }

}
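A detail worth noting about the tokenization: `split("\\s")` splits on exactly one whitespace character, so consecutive spaces produce empty tokens that would then be counted as a word, while `split("\\s+")` treats a whole run of whitespace as one separator. A quick standalone check in plain Java (no Flink required):

```java
import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        String line = "hello  neil"; // note the double space

        // "\\s" splits at every single whitespace char -> empty middle token
        System.out.println(Arrays.toString(line.split("\\s")));
        // [hello, , neil]

        // "\\s+" collapses the run of whitespace into one separator
        System.out.println(Arrays.toString(line.split("\\s+")));
        // [hello, neil]
    }
}
```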

Start nc to simulate a socket server,

then run the Java program,

then send some data from the nc command line.

The nc session looks like this:

[root@localhost1 ~]# nc -lp 7777
hello neil hello jack
hello mike hello walker
hello sun

The Java console shows the word-count results (the numeric prefix is the subtask that produced the line):

5> (hello,1)
15> (neil,1)
14> (jack,1)
5> (hello,2)
4> (mike,1)
9> (walker,1)
5> (hello,3)
5> (hello,4)
15> (sun,1)
5> (hello,5)

3.2 Submit the Job to the Flink Cluster

First, add a packaging plugin to the pom file:

 <build>
        <plugins>
                <!-- shade plugin: build a self-contained (fat) jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Then run mvn package.

Start nc first:

[root@localhost1 flink-2.0.0]# nc -lp 7777

Then submit the jar from the Flink web UI,

and the job starts running.

Type some words into the nc session;

the counts appear on the TaskManager's stdout page.
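Alternatively, the jar can be submitted with the `flink run` command-line client instead of the web UI; the jar path below is a placeholder for whatever `mvn package` produced under `target/`:

```shell
# Run from the Flink installation directory on localhost1.
# The jar path is a placeholder for your build artifact.
bin/flink run -c com.neilparker.WordCount /path/to/your-wordcount.jar
```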

