Spark编程基础Python
前言:大数据技术
技术层面 | 功能 |
---|---|
数据采集 | 利用ETL工具将分布的、异构数据源中的数据如关系数据、平面数据文件等,抽取到临时中间层后进行清洗、转换、集成,最后加载到数据仓库或数据集市中,成为联机分析处理、数据挖掘的基础;或者也可以把实时采集的数据作为流计算系统的输入,进行实时处理分析 |
数据存储和管理 | 利用分布式文件系统、数据仓库、关系数据库、NoSQL数据库、云数据库等,实现对结构化、半结构化和非结构化海量数据的存储和管理 |
数据处理与分析 | 利用分布式并行编程模型和计算框架,结合机器学习和数据挖掘算法,实现对海量数据的处理和分析;对分析结果进行可视化呈现,帮助人们更好地理解数据、分析数据 |
数据隐私和安全 | 在从大数据中挖掘潜在的巨大商业价值和学术价值的同时,构建隐私数据保护体系和数据安全体系,有效保护个人隐私和数据安全 |
大数据计算模式 | 解决问题 | 代表产品 |
---|---|---|
批处理计算 | 针对大规模数据的批量处理 | MapReduce、Spark等 |
流计算 | 针对流数据的实时计算 | Flink、Strom、S4、Streams等 |
图计算 | 针对大规模图结构数据的处理 | Pregel、Graphx、PowerGraph、Hama等 |
查询分析计算 | 大规模数据的存储管理和查询分析 | Dremel、Hive等 |
1. Spark简介
spark架构图:信息查询,流式计算,机器学习,图计算
Spark与Hadoop的对比
Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题
相比于Hadoop MapReduce,Spark主要具有如下优点:
Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比Hadoop MapReduce更灵活
Spark提供了内存计算,可将中间结果放到内存中,对于迭代运算效率更高
Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制
结论:Spark更适合与迭代次数较多的,比如机器学习,数据挖掘
注意:Spark只是一个计算框架与MapReduce对等,可能会取代Map Reduce,但是不会取代Hadoop整体
Flink和Spark还存在一些明显的区别,具体如下:
(1)Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。从技术发展方向看,用批处理来模拟流计算有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流计算来模拟批处理,在技术上有更好的扩展性。
(2)Flink和Spark都支持流计算,二者的区别在于,Flink是一条一条地处理数据,而Spark是基于RDD的小批量处理,所以,Spark在流式处理方面,不可避免地会增加一些延时,实时性没有Flink好。Flink的流计算性能和Storm差不多,可以支持毫秒级的响应,而Spark则只能支持秒级响应。
(3)当全部运行在Hadoop YARN之上时,Flink的性能要略好于Spark,因为,Flink支持增量迭代,具有对迭代进行自动优化的功能。总结:Spark在生态上更加完善,然后Flink在流计算有绝对的优势
Spark生态系统
应用场景 | 时间跨度 | 其他框架 | Spark生态系统中的组件 |
---|---|---|---|
复杂的批量数据处理 | 小时级 | MapReduce, Hive | Spark |
基于历史数据的交互式查询 | 分钟级、秒 级 | Impala, Dremel. Drill | Spark SQL |
基于实时数据流的数据处理 | 毫秒、秒级 | Storm, S4 | Spark Streaming 、Structured Streaming |
基于历史数据的数据挖掘 | - | Mahout | MLlib |
图结构数据的处理 | - | Pregel, Hama | GraphX |
- RDD:是Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
- DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
- Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task
- 应用(Application):用户编写的Spark应用程序
- 任务(Task):运行在Executor上的工作单元
- 作业(Job):一个作业包含多个RDD及作用于相应RDD上的各种操作
- 阶段(Stage):是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合,代表了一组关联的、相互之间,没有Shuffle依赖关系的任务组成的任务集
Spark运行基本流程
spark流水线优化:窄依赖和宽依赖
窄依赖可以实现“流水线”优化
宽依赖无法实现“流水线”优化
如上图所示,平行线都是可以优化的在一个Stage中,因为他们不用shuffle,可以在一个Stage中表达,如果需要shuffle就需要划分
RDD在Spark架构中的运行过程
Spark的运行模式
./bin/pyspark --master 会进入spark交互界面
Spark on Yarn:在这种情况下,一般是hadoop集群了,所以无需部署Spark集群,只需要找一台服务器,充当Spark客户端,即可提交任务到Yarn集群中运行
Client模式一般使用于学习和测试的场景,Cluster模式适用于生产环境
Cluster模式 Client模式(默认) Driver运行位置 YARN容器内部 客户端进程内 通讯效率 高 低于Cluster模式 日志查看 日志输出在容器内,不方便查看 日志输出在客户端的标准输出流中,方便查看 生产环境是否使用 推荐使用 不推荐使用 稳定性 稳定 受到客户端进程影响
2. spark配置安装
spark安装 local单节点
分布式环境下spark配置
在所有节点上完成包括安装Anaconda3、设置国内源、创建pyspark环境等。验证pyspark虚拟环境中python的版本。
Hadoop02,Hadoop03中修改配置 ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_371
export JRE_HOME= J A V A H O M E / j r e e x p o r t C L A S S P A T H = . : {JAVA_HOME}/jre export CLASSPATH=.: JAVAHOME/jreexportCLASSPATH=.:{JAVA_HOME}/lib: J R E H O M E / l i b e x p o r t H A D O O P H O M E = / u s r / l o c a l / h a d o o p e x p o r t S P A R K H O M E = / u s r / l o c a l / s p a r k e x p o r t H A D O O P C O N F D I R = {JRE_HOME}/lib export HADOOP_HOME=/usr/local/hadoop export SPARK_HOME=/usr/local/spark export HADOOP_CONF_DIR= JREHOME/libexportHADOOPHOME=/usr/local/hadoopexportSPARKHOME=/usr/local/sparkexportHADOOPCONFDIR=HADOOP_HOME/etc/hadoop
export PYSPARK_PYTHON=/home/hadoop/miniconda3/env/pysaprk/bin/python3.8
export PATH= P A T H : PATH: PATH:{JAVA_HOME}/bin: H A D O O P H O M E / b i n : HADOOP_HOME/bin: HADOOPHOME/bin:SPARK_HOME/binSpark配置
配置works文件
在hadoop01节点上执行如下命令将workers.template改名为workers:
$ cd /usr/local/spark/ $ sudo mv ./conf/workers.template ./conf/workers
在workers文件中设置Spark集群的Worker节点。编辑workers文件的内容,把默认内容localhost替换成如下内容:
hadoop01
hadoop02
hadoop03注意不要有多余的空格之类的
配置spark-env.sh文件
#PART1
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_371
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export YARN_CONF_DIR==/usr/local/hadoop/etc/hadoop#PART2
export SPARK_MASTER_HOST=hadoop01
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export SPARK_HISTORY_OPTS=“-Dspark.history.fs.logDirectory=hdfs://hadoop01:9000/sparklog -Dspark.history.fs.cleaner.enabled=true”#PART3
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=1G
export SPARK_DRIVER_MEMORY=1G
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8082创建历史服务器日志目录
# 在Hadoop01上启动hdfs cd /usr/local/hadoop ./sbin/start-dfs.sh cd /usr/local/hadoop ./bin/hdfs dfs -mkdir /sparklog ./bin/hdfs dfs -chmod 777 /sparklog #将这个hdfs文件的权限赋予hadoop01用户
配置spark-defaults.conf文件
# 在 haoop01上执行如下命令 cd /usr/local/hadoop sudo mv spark-defaults.conf.template spark-defaults.conf vim spark-defaults.conf #修改内容如下 spark.eventLog.enabled true # 开启Spark的日志记录功能 spark.eventLog.dir hdfs://hadoop01:9000/sparklog #日志记录的保存路径 spark.eventLog.compress true # 是否启动压缩
配置Worker节点
在hadoop01节点上执行如下命令,将Master节点(hadoop01节点)上的/usr/local/spark文件夹复制到各个Worker节点上(即hadoop02和hadoop03节点):
$ cd /usr/local/ $ tar -zcf ~/spark.master.tar.gz ./spark #将./spark打包为tar.gz $ cd ~ $ scp ./spark.master.tar.gz hadoop02:/home/hadoop $ scp ./spark.master.tar.gz hadoop03:/home/hadoop
在hadoop02和hadoop03节点上分别执行如下操作:
$ cd ~ $ sudo rm -rf /usr/local/spark/ $ sudo tar -zxf ~/spark.master.tar.gz -C /usr/local $ sudo chown -R hadoop /usr/local/spark
启动集群服务
启动Hadoop集群
./sbin/start-all.sh
- 启动历史服务器 maser节点上
spark/sbin/start-history-server.sh
- 启动历史服务器 maser节点上
启动maser节点,在Hadoop01节点上
spark/sbin/start-master.sh
启动所有Worker节点Hadoop01节点上
spark/sbin/start-workers.sh
查看进程情况
5409 DataNode
6434 ResourceManager
7156 Master
7380 Worker
7418 Jps
5228 NameNode
7022 HistoryServer
6607 NodeManager
查看spark集群信息
http://hadoop01:8081
关闭集群/usr/local/spark
- 先关闭master
./sbin/stop-master.sh
- 关闭workers
./sbin/stop-workers.sh
- 在/usr/local/hadoop关闭Hadoop集群
./sbin/stop-all.sh
- 先关闭master
spark代码常见的运行方式
- 启动python版本的Spark交互式执行环境
启动pyspark ./bin/pyspark即可进入spark交互页面
使用spark-submit命令提交应用程序
注意:在集群模式使用standalone模式需要先启动 sbin/start-master.sh sbin/start-workers.sh
spark-submit命令提交应用程序的,命令格式如下
./bin/spark-submit
--master #上图中可以加的参数
--deploy-mode
… #其他参数
--jar xxx #添加其他的依赖包
#python代码文件
[application-arguments] #传递给主类的主方法参数
开发Spark独立应用程序
编写程序
在命令行直接运行.py文件
- 进入虚拟环境
- 运行命令 Python wordCount.py
或者是通过spark-submit运行应用程序
进入虚拟环境
示例: ./bin/spark-submit \ --master local[*] \ /usr/local/spark/examples/src/main/python/pi.py | grep "Pi is" ./bin/spark-submit \ --master spark://hadoop01:7077 \ /usr/local/spark/mycode/python/wordCount.py 注意:这里使用standalone模式需要先启动master和workers,如果不是分布式环境直接local[*]就可以了
总结
所有操作都在pyspark虚拟环境下操作,
使用 spark/bin/pyspark交互式进行编写
或者是使用 submit 提交
3. RDD创建
- 本地数据加载,在usr/local/spark文件夹下:./bin/pyspark 这个类库中执行
>>> lines = sc.textFile("file:///usr/local/spark/mycode/word.text")
>>> lines.foreach(print)
注意这里的sc 是直接可用的,由pyspark创建的
- 在pycharm中创建程序
#coding:utf-8
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("My app")
sc = SparkContext(conf)