Flink On Yarn实操演示

发布于:2024-06-21 ⋅ 阅读:(70) ⋅ 点赞:(0)

这课的学习我们基本上了解了flink on line的一些基本原理,以及相应的一些集群的一些参数配置。通过本节课的学习,我们将去通过一些实际的一些操作,然后来去看flink on your的一个集群的具体的一个部署的一个流程。我们接下来看一下这个flink on娅的一个集群的一个部署。这个的话其实,我们首先检查一下它的一个环境的一个要求。这个环境的一个要求的话是要求他要有hadoop版本的这样的一个2.4.1的以上。它要有HDFS的一个环境,以及hadoop相关的一些依赖包。这几个的话是我们需要在当前的这样的一个客户端的一个机器里面的话,是需要有这样的一个提供。同样的我们这样的一个SDFS的一个环境的话,是在这样的一个机器里面,大家其实可以输入这样的一个命令的话,可以有相应的这样的一个环境的一个HDFS的一个环境OK。

我们来看一下环境的一些配置。首先下载和解压相应的一些安装包。我们把所有的这样的一个榜样的一个环境的一个flink的一个安装包已经提前下载下来。我们把它解压到flink on your的这样的一个路径里面来。然后下一步的话,我们在这样的一个环境的一个路径里面进行相应的这样一个操作。在下面的话,我们需要配置hadoop config的一些一些环境变量。在这个地方的话,其实我们需要去看一下我们自己系统内部的ETC profile的这样的一个环境的一个文件。在这里的话,其实我们看到我们的hadoop的这样的一个环境变量也已经。

配置。在这个路径里面的话,其实是存放了我们所有好多个客户端相关的一些配置。客户端相关的配置的话是可以到对应的这样的一个好多个集群上面进行一个获取。

另外一个的话是我们的一个hadoop的一个class pass一个配置。Class pass配置的目的是想通过class pass这种方式直接获取到我们hadoop相关的一些安装包。包括像HDFS1样,以及像一些hadoop common的一些安装包的话,都是通过这样的一个class pass的一个路径去提供。对于我们当前的这样的一个环境的话,是使用的是cloud的CDH。它的一个安装包的话是通过在hato部的一个环境里面进行相应的这样的一个获取,去指定它一个对应的一个路径。最后的话是我们把配置完成了之后。

我们如果发现我们hadoop class pass配置完成了之后。作业执行的过程中,如果还是报好都不依赖包找不到的这种情况下,我们可以到我们的一个官方提供的这样的一个仓库地址。去下载我们之前已经打好的这样的一个shit进来的一个hadoop相关的一些安装包。这个的话就是flink sheet普优博的这样的一个包。这个包的话可以通过我们这样的一个官方的一个路径去把它下载下来。对于我们这样的一个环境里面的话,我们已经把这样的一个hadoop的一个依赖包已经下载到对应的这样的一个内部里面的话。基本上就能保证我在运行on娅的这样的一个环境里面,所有的一些基础的一些依赖,包括一些环境的一些参数,都基本上就配置完毕。

我们所有的一些基础环境全部都ready了之后,下面的话其实我们就要看如何去启动我对应的这样的一个hadoop的一个,就是启动我们的一个flink的一个集群OK首先我们来看一下基于session这种模式的一个启动和一个部署。首先我们其实看到对于三胜里面的话,我们如果是使用到的是session的集群模式的话,我们在这个地方的话,其实可以通过运行三胜的一个命令直接可以去启动。一些默认的一些参数,包括task manager以及slot的一些数量的话,都是可以通过系统默认的方式进行一个提供。

我们启动完毕了之后,系统会返回给对应的这样的一个jump manager的一个web的一个地址。通过copy这样的一个地址,我把它copy到浏览器里面,就可以登录到我们用yarn session的这种模式启动的这样的一个flink的一个集群。这里的话,我们把对应刚才生成的这样的一个地址copy到我们的浏览器的页面之后。那我们可以看得到对于我们刚才已经启动的这样的一个药的一个这样的一个session一个集群。我们就可以在我们的一个浏览器上面进行相应的这样的一个操作。所有的一些参数的一些配置什么的,都可以通过这样的一个页面的方式进行相应的这样的一个获取。它所提供的这样的一些所有的一些操作,基本上跟我们在stand alone里面去操作是完全一致的。这个的话其实对于12 session的这种集群的一个部署基本上就完成了。

接下来的话,其实我们就需要去讲解一下,我们如何去把我们对应的这样的一个作业去提交到我们的一个亚session的一个集群的时候。这个地方的话,我们事先可以通过HDFS杠pod去将我们本地的一个文本文件去传到我们HDFS的一个路径。HDFS的一个路径的话,这个地方的话其实我们可以通过HDFS的一些这样的一些参数,我们可以看到我们其实已经事先已经把对应的这样的一个文本文件已经传到我们的一个HDFS的一个环境OK这个地方其实可以看得到,我们已经通过这样的一种命令方式传到我对应的HDFS的一个存储的一个路径。下一步的话,我们就可以通过flink的提交的这样的一个命令,去把我们的这样的一个指定的,就把我们的一个作业去提交到我们的对应的这样的一个启动的一个123上的一个集群里面来。

我们进行一个回撤OK这个地方进行。我们启动一下我们的一个命令提交上去。对这个地方我们需要去到我们的执行环境里面去启动。OK这个时候的话,我们其实已经得到了这样的一个反馈结果。我们的一个drop已经提交到我们对应的集群上面,并且返回了对应的照牌ID,这个时候其实我们因为本身是一个批量的一个任务的,他的一个作业基本上就已经执行完毕,且返回了这样的一个结果。

那我们下面通过我们的页面看一下,页面里面的话,其实我们的world count的一个程序就已经提交到我们的一个机型上面,并且进行了一个执行。它这个执行的一个过程相对来说比较快。执行完毕了之后,这个任务就finish就完成了。整个的话其实我们对于123上的这样的一个集群的话,就完成了一个集群的一个启动,同时加上一些作业的一些提交。

这里面另外一点我们需要去注意一点,就是当我们用要塞上的这种集群提交的时候。我们可以去把我对应的这样的一个要session的这样的一个客户端的一个进程去attach到一个existing的一个session上面去。前提是你需要去获取我提交上去的这样的一个application ID。这个application ID的话是可以通过在日志里面去获取的到。这个application ID的话,比如说我们再去启动一个ER的一个session的一个时候,那么这个要塞上我不想再去单独启动一个集群。那我们可以通过杠ID的这种方式去指定我去attach到我对应的这样的一个application,就是我们的一个flink session的一个集群上面去,这个的话其实也可以去直接去attach到上面去。它其实跟我们前面,在这个过程中提交的这样的一个启动的这样的一个集群其实是一样的。这个过程也就是说它可以把我们刚才提交的这样的一个session的一个集群的一个客户端的一个进程attach上去。

这个时候的话,当我们去取消的时候,这个的整个的这个集群也会进行一个一个delete,或者说进行一个停止。这个是一个attach的一个操作。当我们要去停止我的一个集群的一个服务的时候,这个停止的一个集群的服务的方式主要是有两种两种。第一种的话是我们echo一个stop的一命令到我们的这个yet section里面,同时去指定它的一个application ID,我们看一下这样的一种模式去进行一个停止的一个操作。我们前面的话就把这样的一个OK。

我们现在aco stop去停止我们的对应的一个yard的一个session的一个集群。那我们通过这样的一种方式,其实我们前面那个在我们停止的时候,就是前面在我们那个attach的时候,其实已经把把我们之前的那个启动的一个集群已经把它Q掉了。所以说这个地方的话,我们已经找不到我们刚才启动的这样的一个集群。当然这个可以再重新去启动一个这样的一个123上的一个集群之后,我们再来去进行一个停止的一个操作。比如说我们在进入到我们的一个环境里面来。OK, 我们再通过其他的这种方式,我们把co stop这个里面的话指定它的一个ID,我们再到刚才OK,这个时候你看到已经返回了一个011的这样的一个地址,那我们去指定一下它的一个ID的一个信息。那么我们去aco stop一下,那么这个那个stop我们可以直接copy一下。OK, 我们看一下,这个时候的话,其实看得到我对应的这样的一个集群就已经正常的停止了,我们这是一种停止集群的一种方式。

另外一种的话,其实我们可以通过压的这种方式去进行一个任务的一个Q首先我们可以通过yr application杠list,它可以去获取到我对应的这样的一个集群的一个application一个列表。接下来的话,我们通过压application q的这种方式去指定我对应的这样的一个application的一个RD。然后他就可以通过亚的一个命令去care掉我整个的一个session的一个集群,这个的话其实就是停止我们的一个集群服务的两种方式OK。

我们接下来我们看一下基于我们的一个per job的一个模式。就是说我们基于我们或drop的模式的一个提交。这种的话就是直接通过命令的方式就可以提交我们的一个作业。同样这样的,我们比如说通过这样的一种方式,我们去提交我们的一个per job的一个作业,它这个地方的话是需要去指定一个杠M的一个我们的一个模式的话,是需要指定要cluster的这样的一种模式OK那我们把它copy下来了之后进行一个运行。这个时候其实也看到也单独分配了一个这样的一个样的一个application。这个的话其实我们任务执行完成了之后,它的一个执行的一个结果的话,就已经输出到我们的一个终端。它跟我们的session model,30这个application一旦启动或一旦数据处理完成了之后,整个的这样的一个环境,或者这样整个的一个flink的一个作业,就完全全部的包括它的一个runtime全部都已经停止,然后包括一些其他的一些。这些结果的话返回回来了之后,整个机群就终止,这个其实就是我们的一个per job的一个模式。

当然你也可以通过杠杆的这种方式,就是把这样的一个管理线程委托给我们的一个ER的一个进行管理。客户端这边的话,其实就可以直接提交完成了,进行一个退出的一个操作。这种就是一种detach的一种操作。对于我们for job的这种模式的去运行作业的话,其实可以看得到。它其实就有这么多,就是相对来说比较简单一点,没有太多的复杂的一些操作。

那我们来看一下基于application model的这种模式。F是model的话,前面我们已经提到了它的一个作业的一个没方法,都是在我们的一个集群上面进行操作。在集群上面进行操作的话,以及我们的一些用户的一些dependency的一些价包。这些价包和一些我们flink集群的一些dependence的一些价包的话,都是通过一些可以通过一些分布式的一些存储的方式,直接通过做一些从HDFS或一些其他的一些介质上面去获取。

比如说我们在这样的一个例子里面的话可以看得到。第一个我们去启动我对应的这样的一个作业的时候,如果不去指定我的一个下面看到的这样的一个yp provide这样的一个lib的一个路径的话。它的所有的flink相关的一些安装的一些包,都是通过本地的客户端去上传到我们的一个hadd的一个要的一个环境上面去。然后再去启动我们的一个drop manager等一些咨询一些服务。同时你去指定自己的一个application的一个价包,也就我们写的这样的一个可执行的一个价包。整个的话其实都是通过从本地去上传到我的一个集群。它其实跟我们的一个job的这种模式其实没有太大的一个区别。

最主要的话,其实我们在这个地方提供了一个这样的一些,可以去指定我的一个flink集群运行的一些环境的一些路径的一些参数。比如说我们可以在在这样的一个的一个这样的一个路径里面,这样配置的一个参数里面,去指定我的一个flink的一个运行的一些环境,包括我们一刚解压出来的flink 1.1的这样的一个可执行的一些环境。那些依赖的一些包包括它的一些病路径,以及康复的一些路径,以及它的一些leap的一些依赖,通过这种方式的话,去指定我的一个环境上面的一个依赖包。另外一个的话,通过本地去指定我的一个本地的一个应用的一个安装包,可执行包。

整个下来的话,其实就是我们整个的一个,就是从HDFS上面去获取我的集群相关的依赖,然后从本地上传我的用户,我的一个包。可以看得到从HDMS上面去拉取flink的一些环境依赖包。从本地上传我的用户自己编译的这样的一个应用的依赖包,这种也是一种模式,我们可以通过这种方式去提交一下我们的一个集群的一个作业。这个地方因为我们application的这个架包是没有的那我们可以换一个路径,待会试一下。比如说在这样的一个事例里面的话,我们可以看得到我们这个地方其实我们是提供了一个top speed window的这样的一个架构。它其实跟我们这样的一个路径里面是一样的。

我们其实也刚才可以再去指定一个路径出来试一下。比如说我们随便找一个flink提供的一些官方的一些实例。我们把对应的这样的一个路径告诉我我们的一个flink的提交的这个过程,我们把它配置进去OK。然后比如说我们同样去启动的是这个top speed这样的一个window的一个可应用的一个夹包OK。那我们把这样的一个路径去进行一个提交的一个操作,那我们需要回到我们的一个并录进的主目录里面去操作OK。这个时候其实我们的一个application model就正式的运行起来了。

OK我们下面也是通过提供的这样的一个端口和它的一个地址信息,可以去找到我的一个map的一个interface。同样的,我们把刚才已经返回过来的这样的一个地址信息的话,拷贝到浏览器里面了之后。那我们可以看得到我刚才提交的这样的一个card的一个top speed window的一个这样的一个官方的一个示例。它就已经正常的running起来了。这个running起来了之后,我们可以进去之后看到它的一个数据的一些流转的一些情况,包括它的一些算子的一些情况。这种模式的话,其实它每次都会返回给你一个i exception。当然这个可能是一些问题,就是说因为我们这种模式的话,它不再支持你通过session那种window再去提交一些用户的一些可执行的一些加班或可执行的一些作业。所以说这个其实是一种application model的这样的一种模式。

它最大的特点就是说把我们一些job graph的对象的一些生成,然后这些架的一些dependency的一些依赖包,都是通过直接从分布式存储上面HDMS上面直接拉取到我的hato BR的环境里面。然后这样的话提交和运行相应的这样的一个作业,减少了我们的一个客户端的一个压力。同样的,对于我们在这样的一个集群内部的话,我们也可以直接把我们的一个相关的一些包。比如说我们传到我们的一个HDFS的路径。比如说我们还是使用官方给的这样的一个这样实例,我们也是通过从HDMI上面去获取我的一个应用的一个架构。同时从HDMI上面去获取我flink相关的一些依赖的一些包。同样的我们也是可以把这样的一个实例进行一个启动,这种可能是因为集群没有足够的一个计算的一个资源了。这种情况的话,我们可以去把刚才提交的一些作业,把它进行一个就是先进行一个stop,然后再看一下集群能够释放出来更多我的一个资源。

啥玩意?Gun visit, 我们看一下我们刚才提交的一些集群的一些资源,相对来说已经提了很多了。所以说没有过多的一些运行的资源。我们通过application的这种方式去把它进行一个票的一个操作。OK, 然后还有一些其他的不用的一些都可以进行一个停止。还有05。OK, 这个基本上能够释放出来一些计算的一些资源,看到这个计算资源一旦释放完毕了之后,这个作业就正常的去提交到我的一个集群上面了。同样的,我们也是把返回的这样的一个地址去拷贝到浏览器里面。

刚才提交的这样的一个作业的情况,它其实正常的就运行起来了。通常情况下在一个application里面,它可以去同时去提交多个job。这个job的话是完全取决于我们在这个application架里面的话,可以去提交相应的这样的一个执行他的一个SSQ的的一个方法。这个SQ的的一个方法的话是可以去进行多次的一个调用,所以他就可以提交多个这样的一个job上来。

这里的话其实我们就讲完了这个per job的一个部署。这个per job的一个模式的一个部署,以及还有它的一个application model的一些部署,还有我们session的一个集群的一些部署。整个下来的话,其实这样三种模式的一个部署的话,基本上通过具体的一些操作的话,能够有一个非常清晰的一个了解和一个认识。通过本节课实际的操作的话,我们了解了flink on yard的一些基本的一些部署的一些步骤。下节课的话我们重点去了解一下flink on KY8S的一些部署的一些原理,以及相应的一些实践的一些操作。