基于pyspark的北京历史天气数据分析及可视化
项目概况
[👇👇👇👇👇👇👇👇]
点这里,查看所有项目
[👆👆👆👆👆👆👆👆]
数据类型
北京历史天气数据
开发环境
centos7
软件版本
python3.8.18、hadoop3.2.0、spark3.1.2、mysql5.7.38、scala2.12.18、jdk8、kafka2.8.2
开发语言
python
开发流程
数据上传(hdfs)->数据分析(spark)->数据写kafka(python)->实时分析(spark)->数据存储(mysql)->后端(flask)->前端(html+js+css)
可视化图表
操作步骤
python安装包
pip3 install kafka-python==2.0.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pandas==2.0.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask==3.0.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask-cors==4.0.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pyecharts==2.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pymysql==1.1.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
启动MySQL
# 查看mysql是否启动 启动命令: systemctl start mysqld.service
systemctl status mysqld.service
# 进入mysql终端
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
mysql -uroot -p123456
启动Hadoop
# 离开安全模式: hdfs dfsadmin -safemode leave
# 启动hadoop
bash /export/software/hadoop-3.2.0/sbin/start-hadoop.sh
启动kafka
# 启动zookeeper
sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/zookeeper.properties
# 启动kafka
sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/server.properties
# 创建topic
/export/software/kafka_2.12-2.8.2/bin/kafka-topics.sh --create --topic weather --replication-factor 1 --partitions 1 --zookeeper master:2181
# 启动消费者
/export/software/kafka_2.12-2.8.2/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic weather
# 关闭kafka
# sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-stop.sh
# 关闭zookeeper
# sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-stop.sh
准备目录
mkdir -p /data/jobs/project/
cd /data/jobs/project/
# 上传 "data" 目录下的 "beijing_weather_data.csv" 文件
head -5 beijing_weather_data.csv
上传文件到hdfs
cd /data/jobs/project/
hdfs dfs -mkdir -p /data/source/
hdfs dfs -rm -r /data/source/*
hdfs dfs -put -f beijing_weather_data.csv /data/source/
hdfs dfs -ls /data/source/
创建MySQL库
CREATE DATABASE IF NOT EXISTS echarts CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
spark数据分析
cd /data/jobs/project/
# 上传 "pyspark" 目录下的 "data_process.py" 文件
spark-submit \
--master local[*] \
--jars /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process.py /data/source/
# 可以进入MySQL进行校验
# select * from weather_info limit 10;
# select * from weather_year_h_temp limit 10;
# select * from weather_min_max_temp limit 10;
spark实时计算
cd /data/jobs/project/
# 上传 "pyspark" 目录下的 "data_process_realtime.py" 文件
# 上传 "pyspark" 目录下的 "spark-sql-kafka-0-10_2.12-3.1.2.jar" 文件
spark-submit \
--master local[*] \
--jars /data/jobs/project/spark-sql-kafka-0-10_2.12-3.1.2.jar,/export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,com.mysql:mysql-connector-j:8.0.33 \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process_realtime.py
kafka生产者_读取csv写kafka
cd /data/jobs/project/
# 上传 "pyspark" 目录下的 "csv_to_kafka.py" 文件
# 向kafka中发送数据
python3 csv_to_kafka.py
启动可视化
mkdir -p /data/jobs/project/myapp/
cd /data/jobs/project/myapp/
# 上传 "可视化" 目录下的 "所有" 文件/文件夹
# windows本地运行: python app.py
python3 app.py pro