Spark分布式环境搭建_Linux版7.0
具体思路:先进行单机配置,然后复制Linux虚拟机、分发配置好的框架。
一、准备软件包、框架包、和系统包
二、安装VMware workstation player(免费)
三、Centos安装,参见centos安装文档
四、 配置centos7(node1节点)
登录,用户名root,密码: 123456
查看网络IP地址,记录IP地址(当前node1节点IP:192.168.172.131)
ifconfig (获得IP地址) # ssh登录,Windows终端 ssh root@192.168.172.131 # Windows10 配置hosts # 地址:C:\Windows\System32\drivers\etc\hosts # 最后一行追加 192.168.172.131 node1 192.168.172.132 node2 192.168.172.133 node3 # windows端的host配置后,ssh终端可按照如下方式登录 ssh root@node1 ssh root@node2 ssh root@node3
配置静态IP地址
vim /etc/sysconfig/network-scripts/ifcfg-ens33
BOOTPROTO="static" IPADDR="192.168.172.131" (添加,node1可与ifconfig中IP保持一致,其余node需要修改) NETMASK="255.255.255.0" GATEWAY="192.168.172.2" (最后一位必须为2,前三位一致) DNS1="192.168.10.20" (windows终端输入ipconfig /all) DNS2="192.168.10.21" MACADDR="00:0c:29:c6:fb:05" (node1节点可不用修改,其余node节点必须修改)
service network restart (重启网卡服务)
ping baidu.com (测试外网连通性)
修改hostname
vim /etc/hostname (node1不用修改、node2、node3需要修改)
修改hosts映射
vim /etc/hosts (添加三个节点的IP映射,同时修改Windows的hosts文件)
192.168.172.131 node1 192.168.172.132 node2 192.168.172.133 node3
关闭防火墙
# 查看防火墙状态 systemctl status firewalld # 关闭防火墙 systemctl stop firewalld # 禁止开机启动 systemctl disable firewalld
关闭SELinux
vim /etc/selinux/config SELINUX=disabled
创建software和server文件夹
mkdir -p /export/software mkdir -p /export/server
配置环境变量(预先配置)
# 用户环境变量,配置两份!!! ## profile为全局环境变量,ssh终端登录使用 vim /etc/profile ## .bashrc为远程解释器环境变量,为pycharm的ssh远程解释器登录使用 vim ~/.bashrc
# 环境变量地址使用软链接地址 #(需在具体路径设置软链接:ln -s /export/server/hadoop3.x.x /export/server/hadoop) # 设置系统环境变量 PYARROW_IGNORE_TIMEZONE=1 # Java export JAVA_HOME=/export/server/jdk export PATH=$JAVA_HOME/bin:$PATH #Anaconda export ANACONDA_HOME=/export/server/anaconda3 export PATH=$PATH:$ANACONDA_HOME/bin #Spark export SPARK_HOME=/export/server/spark export PYSPARK_PYTHON=/export/server/anaconda3/envs/pyspark/bin/python3.9 #Hadoop export HADOOP_HOME=/export/server/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
# 载入环境变量 source /etc/profile source ~/.bashrc
上传安装包
# scp 上传文件到服务器/export/software scp C:\Users\1945\Desktop\Spark部署资源\Anaconda3-2022.05-Linux-x86_64.sh root@node1:/export/software scp C:\Users\1945\Desktop\Spark部署资源\hadoop-3.3.3.tar.gz root@node1:/export/software scp C:\Users\1945\Desktop\Spark部署资源\spark-3.3.0-bin-hadoop3.tgz root@node1:/export/software scp C:\Users\1945\Desktop\Spark部署资源\jdk-8u333-linux-x64.tar.gz root@node1:/export/software scp C:\Users\1945\Desktop\Spark部署资源\apache-zookeeper-3.8.0-bin.tar.gz root@node1:/export/software
五、安装Jdk、Anaconda3
安装jdk
# 打开软件存放目录 cd /export/software # 解压到/export/server tar -zxvf jdk-8u333-linux-x64.tar.gz -C /export/server/ # 设置软链接 cd /export/server/ ln -s jdk1.8.0_333 jdk # 测试java java -version
安装anaconda3
# 1. 打开软件存放目录 cd /export/software # 2. 解压到/export/server sh Anaconda3-2022.05-Linux-x86_64.sh # 3. 自定义目录 (yes init) /export/server/anaconda3 # 4. 重新进入ssh exit ssh root@node1 # 5. (base)进入、配置国内镜像源 vim ~/.condarc # 6. 粘贴,并保存 channels: - defaults show_channel_urls: true default_channels: - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2 custom_channels: conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud # 6. 创建pyspark虚拟环境 conda create -n pyspark python=3.9 (anconda3中的是3.9.12)
六、复制虚拟机
复制虚拟机(手动)
改ip,mac地址,hostname(node2,node3)
# 同第四章节 vim /etc/sysconfig/network-scripts/ifcfg-ens33 ... # node2 IPADDR="192.168.172.132" MACADDR="00:11:22:33:44:55" vim /etc/hostname (node2) sudo reboot # node3 IPADDR="192.168.172.133" MACADDR="00:01:02:03:04:05" vim /etc/hostname (node3)
ssh免密码登录
# ssh免密登录(根据需求二选一)(推荐2) # node1生成公钥私钥 (一路回车)(仅在node1执行) ssh-keygen # node1配置免密登录到node1 node2 node3 (仅在node1执行) ssh-copy-id node1 ssh-copy-id node2 ssh-copy-id node3 # 三台虚拟机相互登录 (每台都执行) ssh-keygen -t rsa ssh-copy-id node1 scp /root/.ssh/authorized_keys node2:/root/.ssh (分发:仅在node1执行) scp /root/.ssh/authorized_keys node3:/root/.ssh (分发:仅在node1执行)
时间同步
ntpdate ntp5.aliyun.com (每台都执行)
七、安装Hadoop
解压hadoop包,顺便解压spark包 (都在node1执行,然后直接把配置好的包发给node2、node3)
# 进入软件包目录 cd /export/software # 解压 hadoop-3.3.3.tar.gz tar -zxvf hadoop-3.3.3.tar.gz -C /export/server/ # 解压 spark-3.3.0-bin-hadoop3.tgz tar -zxvf spark-3.3.0-bin-hadoop3.tgz -C /export/server/ # 设置软链接 cd /export/server/ ln -s hadoop-3.3.3 hadoop ln -s spark-3.3.0-bin-hadoop3 spark
修改配置文件
cd /export/server/hadoop/etc/hadoop
hadoop-env.sh
vim hadoop-env.sh
# 导入Java地址 export JAVA_HOME=/export/server/jdk # 文件最后添加 export HDFS_NAMENODE_USER=root export HDFS_DATANODE_USER=root export HDFS_SECONDARYNAMENODE_USER=root export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root
core-site.xml (xml文件都是)
vim core-site.xml <configuration> # xml文件都是插入在这里 </configuration>
<!-- 设置默认使用的文件系统 Hadoop支持file、HDFS、GFS、ali|Amazon云等文件系统 --> <property> <name>fs.defaultFS</name> <value>hdfs://node1:8020</value> </property> <!-- 设置Hadoop本地保存数据路径 --> <property> <name>hadoop.tmp.dir</name> <value>/export/data/hadoop-3.3.3</value> </property> <!-- 设置HDFS web UI用户身份 --> <property> <name>hadoop.http.staticuser.user</name> <value>root</value> </property> <!-- 整合hive 用户代理设置 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property>
hdfs-site.xml
vim hdfs-site.xml
<!-- 设置SNN进程运行机器位置信息 --> <property> <name>dfs.namenode.secondary.http-address</name> <value>node2:9868</value> </property>
mapred-site.xml
vim mapred-site.xml
<!-- 设置MR程序默认运行模式: yarn集群模式 local本地模式 --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <!-- MR程序历史服务器端地址 --> <property> <name>mapreduce.jobhistory.address</name> <value>node1:10020</value> </property> <!-- 历史服务器web端地址 --> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>node1:19888</value> </property> <property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property>
yarn-site.xml
vim yarn-site.xml
<!-- 设置YARN集群主角色运行机器位置 --> <property> <name>yarn.resourcemanager.hostname</name> <value>node1</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- 是否将对容器实施物理内存限制 --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!-- 是否将对容器实施虚拟内存限制。 --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> <!-- 开启日志聚集 --> <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <!-- 设置yarn历史服务器地址 --> <property> <name>yarn.log.server.url</name> <value>http://node1:19888/jobhistory/logs</value> </property> <!-- 保存的时间7天 --> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>604800</value> </property>
workers
vim workers
node1 node2 node3
分发hadoop包
cd /export/server scp -r hadoop-3.3.3 root@node2:$PWD scp -r hadoop-3.3.3 root@node3:$PWD #分别给node2、3做软链接 cd /export/server/ ln -s hadoop-3.3.3 hadoop ln -s hadoop-3.3.3 hadoop
hadoop首次启动格式化
# node1执行hadoop格式化
hdfs namenode -format
INFO common.Storage: Storage directory /export/data/hadoop-3.3.3/dfs/name has been successfully formatted.
八、安装spark
配置workers
# 进入spark配置目录 cd /export/server/spark/conf
# 改名, 去掉后面的.template后缀 mv workers.template workers # 编辑worker文件 vim workers # 将里面的localhost删除, 追加 node1 node2 node3 # 功能: 这个文件就是指示了 当前SparkStandAlone环境下, 有哪些worker
配置spark-env.sh (!!!区分StandAlone 和 StandAlone HA模式!!!)
# 1. 改名 mv spark-env.sh.template spark-env.sh vim spark-env.sh
# 1. !!!以StandAlone模式运行,请在底部追加如下内容!!! # 设置JAVA安装目录 JAVA_HOME=/export/server/jdk export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native # HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群 HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop # 指定spark老大Master的IP和提交任务的通信端口 # 告知Spark的master运行在哪个机器上 export SPARK_MASTER_HOST=node1 # 告知sparkmaster的通讯端口 export SPARK_MASTER_PORT=7077 # 告知spark master的 webui端口 SPARK_MASTER_WEBUI_PORT=8080 # worker cpu可用核数 export SPARK_WORKER_CORES=2 # worker可用内存 export SPARK_WORKER_MEMORY=2g # worker的工作通讯地址 SPARK_WORKER_PORT=7078 # worker的 webui地址 SPARK_WORKER_WEBUI_PORT=8081 # 设置历史服务器 # 配置的意思是 将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
# 2. !!!以StandAlone HA模式运行,请在底部追加如下内容!!! # 设置JAVA安装目录 JAVA_HOME=/export/server/jdk export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native # HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群 HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop # 指定spark老大Master的IP和提交任务的通信端口 # 告知Spark的master运行在哪个机器上 # export SPARK_MASTER_HOST=node1 # 告知sparkmaster的通讯端口 export SPARK_MASTER_PORT=7077 # 告知spark master的 webui端口 SPARK_MASTER_WEBUI_PORT=8080 # worker cpu可用核数 export SPARK_WORKER_CORES=2 # worker可用内存 export SPARK_WORKER_MEMORY=2g # worker的工作通讯地址 SPARK_WORKER_PORT=7078 # worker的 webui地址 SPARK_WORKER_WEBUI_PORT=8081 # 设置历史服务器 # 配置的意思是 将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true" SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha" # spark.deploy.recoveryMode 指定HA模式 基于Zookeeper实现 # 指定Zookeeper的连接地址 # 指定在Zookeeper中注册临时节点的路径
# 先启动hadoop集群才行用hdfs文件系统 start-all.sh # 在HDFS上创建程序运行历史记录存放的文件夹: hadoop fs -mkdir /sparklog hadoop fs -chmod 777 /sparklog
配置spark-defaults.conf文件
# 1. 改名 mv spark-defaults.conf.template spark-defaults.conf vim spark-defaults.conf # 先输入:set paste,回车,进入粘贴模式,再按i插入 :set paste
# 1. 修改内容, 追加如下内容 # 开启spark的日期记录功能 spark.eventLog.enabled true # 设置spark日志记录的路径 spark.eventLog.dir hdfs://node1:8020/sparklog/ # 设置spark日志是否启动压缩 spark.eventLog.compress true # 2. 提前解决yarn的spark的jar包依赖 spark.yarn.jars hdfs://node1:8020/spark/jars/*
配置log4j2.properties 文件 (去除日志info打印)
# 1. 改名 mv log4j2.properties.template log4j2.properties vim log4j2.properties # 2. 修改rootLogger rootLogger.level = warn
将Spark安装文件夹 分发到其它的服务器上
# 1. 分发spark安装文件目录 cd /export/server/ scp -r spark-3.3.0-bin-hadoop3 node2:/export/server/ scp -r spark-3.3.0-bin-hadoop3 node3:/export/server/ # 2. 软链接 cd /export/server/ ln -s spark-3.3.0-bin-hadoop3 spark cd /export/server/ ln -s spark-3.3.0-bin-hadoop3 spark
九、安装zookeeper
解压zookeeper安装包
# 进入软件包目录 (node1) cd /export/software # 解压安装包到server目录 tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /export/server/ # 进入server目录 cd /export/server/ # 软链接 ln -s apache-zookeeper-3.8.0-bin zookeeper
修改zookeeper配置文件
# 创建zookeeper的data文件夹 mkdir -p /export/server/zookeeper/zkdatas/ # 在node1节点上修改zookeeper 配置文件 cd /export/server/zookeeper/conf/ mv zoo_sample.cfg zoo.cfg vim zoo.cfg
# 修改以下内容 #Zookeeper的数据存放目录 dataDir=/export/server/zookeeper/zkdatas # 保留多少个快照 autopurge.snapRetainCount=3 # 日志多少小时清理一次 autopurge.purgeInterval=1 # 集群中服务器地址 server.1=node1:2888:3888 server.2=node2:2888:3888 server.3=node3:2888:3888
添加myid配置
echo 1 > /export/server/zookeeper/zkdatas/myid
安装包分发,并修改myid的值
cd /export/server/ scp -r /export/server/apache-zookeeper-3.8.0-bin/ node2:$PWD scp -r /export/server/apache-zookeeper-3.8.0-bin/ node3:$PWD # node2 修改信息 cd /export/server/ ln -s apache-zookeeper-3.8.0-bin zookeeper echo 2 > /export/server/zookeeper/zkdatas/myid # node3修改信息 cd /export/server/ ln -s apache-zookeeper-3.8.0-bin zookeeper echo 3 > /export/server/zookeeper/zkdatas/myid
三台机器启动zookeeper服务 (不依赖hadoop 可单独启动)
# 三台机器都要执行 /export/server/zookeeper/bin/zkServer.sh start # 三台主机分别查看启动状态 /export/server/zookeeper/bin/zkServer.sh status
十、StandAlone模式启动
启动hadoop集群
# 刚刚写入hdfs已经启动过了,首次安装时,不用启动 start-all.sh
启动spark历史服务器
cd /export/server/spark sbin/start-history-server.sh
启动master和worker
sbin/start-all.sh
提交pi.py
# 集群运行模式:--master spark://node1:7077 (集群地址在node1:8080可查) bin/spark-submit --master spark://node1:7077 /export/server/spark/examples/src/main/python/pi.py 100 # 本地运行模式:--master local[*] bin/spark-submit --master local[*] /export/server/spark/examples/src/main/python/pi.py 100
十一、StandAlone HA模式启动
启动集群
# 启动hadoop,安装部署时,此时不用启动 start-all.sh # 启动zookeeper,安装zookeeper时,已经启动过,现在不用启动 /export/server/zookeeper/bin/zkServer.sh start # 启动spark历史服务器 /export/server/spark/sbin/start-history-server.sh # 在node1上 启动一个master 和全部worker /export/server/spark/sbin/start-all.sh # 在node2上启动一个备用的master进程 /export/server/spark/sbin/start-master.sh
master主备切换
# 提交一个spark任务到当前alive master上: cd /export/server/spark bin/spark-submit --master spark://node1:7077 /export/server/spark/examples/src/main/python/pi.py 1000 # 在提交成功后, 将alive 的master直接kill掉 kill -9 3163(后面这个数字代码根据jps自己查master) # 主节点故障后,被kill了,现在要重新启动 # 切换到node1节点 sbin/start-master.sh
十二、Spark on Yarn模式启动
配置依赖Spark的jar包(首次安装时执行)
# 1. 依赖环境及环境变量前面已写入 # 2. hdfs上创建存储spark相关jar包目录 hadoop fs -mkdir -p /spark/jars/ # 3. 上传$SPARK_HOME/jars所有jar包 hadoop fs -put /export/server/spark/jars/* /spark/jars/
正常启动
# 启动hadoop,首次安装时,已经启动过 start-all.sh # 启动zookeeper(三台机器都要启动)首次安装时,已经启动过 /export/server/zookeeper/bin/zkServer.sh start # 启动spark历史服务器 /export/server/spark/sbin/start-history-server.sh # 在node1上 启动一个master 和全部worker /export/server/spark/sbin/start-all.sh # 在node2上启动一个备用的master进程 /export/server/spark/sbin/start-master.sh
连接到yarn中
# 进入spark目录 cd /export/server/spark # pyspark的yarn运行模式,yarn服务器地址:8088 bin/pyspark --master yarn --deploy-mode client|cluster # --deploy-mode 选项是指定部署模式, 默认是 客户端模式 # client就是客户端模式 # cluster就是集群模式 # --deploy-mode 仅可以用在YARN模式下
# Scala的yarn运行模式 bin/spark-shell --master yarn --deploy-mode client|cluster # 注意: 交互式环境 pyspark 和 spark-shell 无法运行 cluster模式
# spark-submit # bin/spark-submit (PI) bin/spark-submit --master yarn --deploy-mode client|cluster /xxx/xxx/xxx.py 参数 bin/spark-submit --master yarn /export/server/spark/examples/src/main/python/pi.py 100
十三、Linux集群安装pyspark类库
切换到pyspark虚拟环境(三台虚拟机都要安装)
# 切换到pyspark虚拟环境 conda activate pyspark
pip 安装pyspark(三台虚拟机都要安装)
# pip安装pyspark pip install pyspark koalas
十四、配置PyCharm本地开发环境
配置本地PySpark环境变量(供本地Local模式开发测试用)
# Windows上可直接pip安装pyspark,无需额外的spark框架包 # 只需设置JAVA_HOME、HADOOP_HOME环境变量 # 配置Path环境变量 path=D:\ProgramData\Anaconda3 path=D:\ProgramData\Anaconda3\Scripts path=D:\ProgramData\Anaconda3\Library\bin path=D:\ProgramData\Anaconda3\Library\mingw-w64\bin # 配置HOME环境变量 JAVA_HOME=D:\Program Files\Java\jdk1.8.0_333 HADOOP_HOME=D:\Program Files\hadoop-3.3.3
创建本地Anaconda3虚拟环境
# 1. 安装本地Anaconda3 # 2. 在C:\Users\1945下新建一个.condarc文件,并粘贴国内源地址 # 3. 创建虚拟环境(管理员权限) ## 管理员权限运行:Anaconda Prompt (Anaconda3) conda create -n pyspark python=3.9 # 4. 切换到pyspark虚拟环境 conda activate pyspark # 5. 在当前的虚拟环境安装pyspark包 pip install pyspark findspark -i https://pypi.tuna.tsinghua.edu.cn/simple
配置pycharm本地python解释器
# 1. 下载pycharm专业版到Windows主机 # 2. 新建项目pyspark,解释器选择已存在的解释器,即pyspark所在的虚拟环境 D:\ProgramData\Anaconda3\envs\pyspark\python.exe # 3. 进入项目主界面,运行main.py文件测试此解释器是否可以运行 # 4. 打开设置-解释器设置-添加远程解释器-ssh解释器 # 5. 输入ssh解释器参数 主机名:node1 端口:22 用户名:root next:输入密码123456 # 6. 填写node1上的解释器位置 /export/server/anaconda3/envs/pyspark/bin/python3.9
十五、正式开发
pyspark分布式任务的相关设置
# 1. 并行度设置 (全局并行度)(设置为CPU总核心的2-10倍) ## 配置文件中:conf/spark-defaults.conf spark.default.parallelism 100 ## 客户端提交参数 bin/spark-submit --conf "spark.default.parallelism=100" ## 代码中设置 conf=SparkConf() conf.set("spark.default.parallelism","100") # 2. executor的数量设置 一个集群多少台机器 就多少个executor
SparkSQL的Shuffle 分区数目
# 在SparkSQL中当job产生shuffle时,默认的分区数是200(yarn模式较合适、local模式2、4、10) # 1. 配置文件conf/spark-defaults.conf spark.sql.shuflle.partions=100 # 2. 在客户端提交参数中 bin/spark-submit --conf "spark.sql.shuflle.partions=100" # 3. 在代码中可以设置 spark = SparkSession.builder. \ appName("test"). \ master("local[*]"). \ config("spark.sql.shuflle.partions","2") getOrCreate()
将案例提交到Yarn中运行
# 1. PyCharm直接执行 ## 环境变量,让pycharm直接提交yarn的时候,知道hadoop的配置在哪,可以去读取yarn的信息 ## ~/.basnrc设置or代码导入 import os os.environ["JAVA_HOME"]="/export/server/jdk" os.environ["LD_LIBRARY_PATH"]="/export/server/hadoop/lib/native" os.environ["YARN_CONF_DIR"] = "/export/server/hadoop/etc/hadoop" os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/envs/pyspark/bin/python3.9" ## 在集群运行,本地文件就不可用,需要用HDFS文件 rdd=sc.textFile("hdfs://node1:8020/input/order.txt") # 如果在pycharm中直接提交到yarn,依赖了其他的python文件,可以通过设置属性来指定依赖代码 ## 如果在代码中运行,那么依赖的其他文件,可以通过spark.submit.pyFiles属性来设置 ## conf对象,可以通过setAPI设置数据,参数1是key,参数2是value,支持.zip(一堆),也支持单个.py文件 conf.set("spark.submit.pyFiles","defs.py") # 2. 在服务器上通过spark-submit提交到集群运行 ## --py-files 可以帮你指定 你依赖的其他python代码,支持.zip(一堆),也支持单个.py文件 /export/server/spark/binspark-submit --master yarn --py-files ./defs.zip ./main.py
将文件上传到hdfs
# 将data上传到Linux集群 scp... # 将集群data上传到hdfs hadoop fs -put ./data.txt /input/ # 如果存在远程user用户无法读写hdfs文件,需要hadoop用户远程授权 hadoop fs -chmod 777 /user
Spark 3.0 新特性
# 1. SparkSql的性能优化 # 2. 自适应查询(AQE)提高SparkSql性能 (3.0及以上版本请自觉打开这个) set spark.sql.adaptiva.enabled=True; · 动态合并较小分区 · 动态调整join策略 · 动态优化倾斜join # 3. 动态分区裁剪 # 4. 增强的Python API: PySpark和Koalas(现在已被pyspark.pandas代替) # (@@@@@@@@@@@@@@@@@@@ pyspark.pandas @@@@@@@@@@@@@@@@@@@@@) · pyspark.pandas:实现对pandas api的支持,并将底层运行转为分布式的pandas · 可以用原生PySpark提供的DataFrame API 开发 · 也能使用pyspark.pandas提供的pandas的API开发
Koalas 类库实例代码
""" # (@@@@@@@@@@@@@@@@@@@ pyspark.pandas @@@@@@@@@@@@@@@@@@@@@) # (@@@@@@@@@@@@@@@@@@@ pyspark.pandas ====> pandas用户神器 @@@@@@@@@@@@@@@@@@@@@) """ import pandas as pd import numpy as np import pyspark.pandas as ks # 将pandas的dataframe对象转换为koalas对象 pdf=pd.read_csv("path") # pandas 的对象 kdf=ks.from_pandas(pdf) # 将pandas对象转换为koalas对象 # 将spark的DataFrame对象转换为koalas对象 sdf=spark.createDataFrame(pdf) kdf=sdf.to_koalas() # koalas自己生成dataframe对象 kdf=ks.DataFrame({'A':['foo','bar'],'B':[1,2]}) # koalas与pyspark (合众国而攻一强) kdf.to_spark_io('zoo.orc',format='orc') kdf.read_spark_io('zoo.orc',format='orc').head(10)
Example:pyspark实现随机森林
""" Random Forest Classifier Example. """ import os import findspark import pandas as pd import pyspark.pandas as pyd from pyspark.ml import Pipeline from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors from pyspark.sql import SparkSession from sklearn.datasets import load_breast_cancer if __name__ == "__main__": findspark.init() os.environ["PYARROW_IGNORE_TIMEZONE"] = '1' spark = SparkSession \ .builder \ .appName("随机森林针对乳腺癌数据分析") \ .master("yarn") \ .getOrCreate() # 加载数据 cancer = load_breast_cancer() data = pd.DataFrame(data=cancer.data, columns=cancer.feature_names) data["label"] = cancer.target data = pyd.from_pandas(data) data = data.to_spark(index_col='index') # 合并特征 data = data.rdd.map(lambda x: (Vectors.dense(x[1:-1]), x[-1])).toDF(["features", "label"]) data.show() # 索引标签,将元数据添加到标签列。 # 适合整个数据集以包含索引中的所有标签。 labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) # 自动识别分类特征,并对其进行索引。 # 设置 maxCategories,使具有 > 4 个不同值的特征被视为连续的。 featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data) # 将数据分成训练集和测试集(30% 用于测试) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # 训练一个随机森林模型。 rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=60, maxDepth=16,numTrees=60) # 将索引标签转换回原始标签。 labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels) # 在管道中链接索引和随机森林 pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter]) # 训练模型。 运行索引器。 model = pipeline.fit(trainingData) # 做预测 predictions = model.transform(testData) # 选择要显示的示例行 predictions.select("predictedLabel", "label", "features").show(5) # 选择(预测,真实标签)并计算测试误差 evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction",metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Right = {}".format(accuracy * 100)) rfModel = model.stages[2] print(rfModel) # summary only spark.stop()
附加:Dask分布式机器学习框架
dask部署及运行
# 1. 多台Linux虚拟机 # 2. 配置主机和Linux虚拟机环境:Anaconda3 # 3. 创建conda虚拟环境(全部机) conda create -n dask python=3.9 # 4. 进入虚拟环境 conda activate dask pip install dask[complete] # 5. 启动 主机,dask环境cmd:dask-scheduler tcp://192.168.31.79:8786 Linux集群,dask环境cmd:dask-worker 192.168.31.79:8786 # 6. Pycharm开发
针对不同情景的dask分布式
# 1. 块式集成方法 dask 提供针对具有均匀分布的块式数据集的集成方法,如dask的dataframe.. 也就是说,支持整个过程的假设是,数据在分区之间的分布相对均匀。融合的每个成员学习的参数应该相对相似,因此在应用于相同数据时会给出相对相似的预测。 # 基本思想是将某个子估计器的副本拟合到 dask 数组或 DataFrame 的每个块(或分区)中 # 结果类似于投票平均式,所以要求数据在每个块中必须均匀分布 import sklearn.linear_model subestimator = sklearn.linear_model.RidgeClassifier(random_state=0) clf = dask_ml.ensemble.BlockwiseVotingClassifier( subestimator, classes=[0, 1] ) clf.fit(X, y) preds = clf.predict(X)
# 2. 针对小数据问题的扩展Scikit-Learn # 网格搜索 dask并行搜索 import joblib with joblib.parallel_backend('dask'): grid_search.fit(data.data, data.target)
# 3. 针对大数据集进行评分预测 # 使用ParallelPostFit进行包装 from sklearn.linear_model import LogisticRegressionCV from dask_ml.wrappers import ParallelPostFit clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2") clf.fit(X_train, y_train) y_pred = clf.predict(X_large) y_pred
# 4. 在大数据集上训练模型 使用dask自带的少量机器学习算法api import dask_ml.datasets import dask_ml.cluster import matplotlib.pyplot as plt from dask_ml. import ..