基于pyspark的北京历史天气数据分析及可视化_实时

发布于:2025-06-24 ⋅ 阅读:(17) ⋅ 点赞:(0)

基于pyspark的北京历史天气数据分析及可视化

项目概况

[👇👇👇👇👇👇👇👇]
点这里,查看所有项目
[👆👆👆👆👆👆👆👆]

数据类型

北京历史天气数据

开发环境

centos7

软件版本

python3.8.18、hadoop3.2.0、spark3.1.2、mysql5.7.38、scala2.12.18、jdk8、kafka2.8.2

开发语言

python

开发流程

数据上传(hdfs)->数据分析(spark)->数据写kafka(python)->实时分析(spark)->数据存储(mysql)->后端(flask)->前端(html+js+css)

可视化图表

在这里插入图片描述
在这里插入图片描述

操作步骤

python安装包


pip3 install kafka-python==2.0.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pandas==2.0.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask==3.0.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask-cors==4.0.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pyecharts==2.0.4 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install pymysql==1.1.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

启动MySQL


# 查看mysql是否启动 启动命令: systemctl start mysqld.service
systemctl status mysqld.service
# 进入mysql终端
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
# MySQL的用户名:root 密码:123456
mysql -uroot -p123456

启动Hadoop


# 离开安全模式: hdfs dfsadmin -safemode leave
# 启动hadoop
bash /export/software/hadoop-3.2.0/sbin/start-hadoop.sh

启动kafka


# 启动zookeeper
sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/zookeeper.properties
# 启动kafka
sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-start.sh -daemon /export/software/kafka_2.12-2.8.2/config/server.properties
# 创建topic
/export/software/kafka_2.12-2.8.2/bin/kafka-topics.sh --create --topic weather --replication-factor 1 --partitions 1 --zookeeper master:2181
# 启动消费者
/export/software/kafka_2.12-2.8.2/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic weather
# 关闭kafka
# sh /export/software/kafka_2.12-2.8.2/bin/kafka-server-stop.sh
# 关闭zookeeper
# sh /export/software/kafka_2.12-2.8.2/bin/zookeeper-server-stop.sh

准备目录


mkdir -p /data/jobs/project/
cd /data/jobs/project/

# 上传 "data" 目录下的 "beijing_weather_data.csv" 文件

head -5 beijing_weather_data.csv

上传文件到hdfs


cd /data/jobs/project/

hdfs dfs -mkdir -p /data/source/
hdfs dfs -rm -r /data/source/*
hdfs dfs -put -f beijing_weather_data.csv /data/source/
hdfs dfs -ls /data/source/

创建MySQL库


CREATE DATABASE IF NOT EXISTS echarts CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

spark数据分析


cd /data/jobs/project/

# 上传 "pyspark" 目录下的 "data_process.py" 文件

spark-submit \
--master local[*] \
--jars /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process.py /data/source/

# 可以进入MySQL进行校验
# select * from weather_info limit 10;
# select * from weather_year_h_temp limit 10;
# select * from weather_min_max_temp limit 10;

spark实时计算


cd /data/jobs/project/

# 上传 "pyspark" 目录下的 "data_process_realtime.py" 文件
# 上传 "pyspark" 目录下的 "spark-sql-kafka-0-10_2.12-3.1.2.jar" 文件

spark-submit \
--master local[*] \
--jars /data/jobs/project/spark-sql-kafka-0-10_2.12-3.1.2.jar,/export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,com.mysql:mysql-connector-j:8.0.33 \
--driver-class-path /export/software/spark-3.1.2-bin-hadoop3.2/jars/mysql-connector-j-8.0.33.jar \
/data/jobs/project/data_process_realtime.py

kafka生产者_读取csv写kafka


cd /data/jobs/project/

# 上传 "pyspark" 目录下的 "csv_to_kafka.py" 文件
# 向kafka中发送数据
python3 csv_to_kafka.py

启动可视化


mkdir -p /data/jobs/project/myapp/
cd /data/jobs/project/myapp/

# 上传 "可视化" 目录下的 "所有" 文件/文件夹

# windows本地运行: python app.py
python3 app.py pro


网站公告

今日签到

点亮在社区的每一天
去签到