Hello everyone, I'm python222_小锋老师. I came across a nice Python-based visualization dashboard project for NetEase Cloud Music hot playlists (Flask + pandas + ECharts + the requests library) and would like to share it with you.
Project Video Demo
【免费】基于Python的网易云音乐热门歌单可视化大屏项目(flask+pandas+echarts+爬虫) Python毕业设计_哔哩哔哩_bilibili
Project Introduction
The digital era is driving the informatization of society as a whole. As digital media continue to evolve, multimedia content keeps growing richer and its reach keeps widening. Music is a good example: today's music culture is diverse and music resources are extraordinarily abundant. Against this big-data backdrop, this project designs and implements a highly interactive, visually engaging data-visualization dashboard built on playlist data from the NetEase Cloud Music platform. By mining the platform's massive playlist data (including, but not limited to, playlist type distribution, hot song rankings, user preference analysis, and regional differences in music style), it gives listeners, music creators, and platform operators an intuitive, comprehensive view of the music ecosystem. Visualization techniques such as dynamic charts, heat maps, and word clouds turn complex data into a vivid visual story that not only shows how musical trends rise and shift but also reveals deeper patterns in user behavior. The dashboard thereby raises the value density of music data and provides solid data support for precision marketing, content creation, and user services in the music industry.
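To make the Flask + pandas + ECharts stack described above concrete, here is a minimal backend sketch of one chart endpoint. It assumes the crawler shown later has already filled the playlists table in the cloudmusic MySQL database; the route path /api/type_count, the SQLAlchemy connection string, and the returned field names are illustrative assumptions, not the project's actual code.

# Minimal sketch (not the project's actual server code): a Flask route that
# reads the crawled playlists table with pandas and returns JSON that an
# ECharts bar chart can consume. Route name and field names are assumptions.
import pandas as pd
from flask import Flask, jsonify
from sqlalchemy import create_engine

app = Flask(__name__)
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/cloudmusic")

@app.route("/api/type_count")
def type_count():
    # Total play count per playlist type, top 10, ready for a bar chart
    df = pd.read_sql("SELECT type, play_count FROM playlists", engine)
    agg = df.groupby("type")["play_count"].sum().sort_values(ascending=False).head(10)
    return jsonify({"categories": agg.index.tolist(), "values": agg.tolist()})

if __name__ == "__main__":
    app.run(debug=True)

On the page, an ECharts bar chart would then map categories to xAxis.data and values to its series data.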
System Showcase
Partial Code
# -*- coding: utf-8 -*-
import os
import re
import csv
import json
import time
import pymysql
import requests
from bs4 import BeautifulSoup
from multiprocessing import Pool
# The old crawler was slow to respond, and its API endpoint (https://api.imjad.cn/) is going to stop working.
# The open-source alternative https://github.com/mixmoe/HibiAPI has sparse installation docs,
# but it lists publicly hosted instances (see its "公开搭建实例" section).
# That led to https://api.obfs.dev/docs#operation/playlist_api_netease_playlist_get
# Swapping in that endpoint was all that was needed.
# coder: SuccessKey
# Request headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# URL of the playlist category page
type_url = "https://music.163.com/discover/playlist"
# Connect to the MySQL database
db = pymysql.connect(
host="localhost",
user="root",
password="123456",
port=3306,
db="cloudmusic"
)
cursor = db.cursor()
"""获取歌单类型"""
def get_playlist_type(url):
response = requests.get(url=url, headers=headers)
html = response.text
soup = BeautifulSoup(html, 'lxml')
types = [t.text for t in soup.select("a.s-fc1")][1:]
# for t in types:
# print(t)
return types
"""获取歌单id"""
# CSV文件名,用于保存已爬取的歌单ID和类型
csv_filename = "playlist_ids.csv"
# 获取歌单ID并保存
def get_playlist_id(url):
response = requests.get(url=url, headers=headers)
html = response.text
soup = BeautifulSoup(html, 'lxml')
    # Extract all playlist IDs on the page
ids = [re.sub(r"\D+", "", i['href']) for i in soup.select("a.msk")]
t = re.search('https.*cat=(.*)&limit', url).group(1)
    # Debug output: the playlist IDs that were found and their category
    print(f"Fetched playlist IDs: {ids}, category: {t}")
    # Append the playlist IDs and category to the CSV file
    save_ids_to_csv(ids, t)
# Append IDs and their category to the CSV file
def save_ids_to_csv(ids, t):
    # Open the file in append mode and write each ID with its category
with open(csv_filename, mode='a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
for id in ids:
            writer.writerow([id, t])  # one row per playlist: ID, category
    print(f"Saved {len(ids)} playlist IDs and their category to {csv_filename}")
# Load the playlist IDs and categories that already exist in the CSV file
def load_ids_from_csv():
    existing_ids = {}  # dict mapping playlist ID -> category
if os.path.exists(csv_filename):
with open(csv_filename, mode='r', newline='', encoding='utf-8') as file:
reader = csv.reader(file)
for row in reader:
                existing_ids[row[0]] = row[1]  # playlist ID as key, category as value
return existing_ids
"""获取歌单信息"""
def get_playlist_info(ids, t):
# playlist_url = "https://api.imjad.cn/cloudmusic/?type=playlist&id={}"
# playlist_url = "https://api.obfs.dev/api/netease/playlist?id={}"
playlist_url = "https://hibi.moecube.com/api/netease/playlist?id={}"
url = playlist_url.format(ids)
print(url)
response = requests.get(url=url, headers=headers)
json_text = response.text
# print(url)
# print(response.text)
json_playlist = json.loads(json_text)["playlist"]
    # Playlist ID, name, category, tags, creation time, last update time,
    # play count, subscriber count, share count, comment count,
    # plus the creator's nickname, gender, user type, VIP type, province and city
playlistID = str(json_playlist["id"])
name = json_playlist["name"]
playlistType = t
tags = "、".join(json_playlist["tags"])
    # createTime/updateTime are millisecond timestamps; drop the last three digits to get seconds
    createTime = time.strftime("%Y-%m-%d", time.localtime(int(str(json_playlist["createTime"])[:-3])))
    updateTime = time.strftime("%Y-%m-%d", time.localtime(int(str(json_playlist["updateTime"])[:-3])))
tracks_num = len(json_playlist["trackIds"])
playCount = json_playlist["playCount"]
subscribedCount = json_playlist["subscribedCount"]
shareCount = json_playlist["shareCount"]
commentCount = json_playlist["commentCount"]
nickname = json_playlist['creator']['nickname']
gender = str(json_playlist['creator']['gender'])
userType = str(json_playlist['creator']['userType'])
vipType = str(json_playlist['creator']['vipType'])
province = str(json_playlist['creator']['province'])
city = str(json_playlist['creator']['city'])
    # Map the gender code and the province/city codes to readable values
    # (NetEase gender code 1 = male; any other value is treated as female here)
    if gender == '1':
        gender = '男'
    else:
        gender = '女'
    # Look up province/city names in the administrative-region code file country.csv
with open("country.csv", encoding="utf-8") as f:
rows = csv.reader(f)
for row in rows:
if row[0] == province:
province = row[1]
if row[0] == city:
city = row[1]
if province == '香港特别行政区':
city = '香港特别行政区'
if province == '澳门特别行政区':
city = '澳门特别行政区'
if province == '台湾省':
city = '台湾省'
    # If the province code was not found in country.csv, treat the creator as overseas
    if province == str(json_playlist['creator']['province']):
        province = '海外'
        city = '海外'
    # If only the city code was missing, fall back to the province name
    if city == str(json_playlist['creator']['city']):
        city = province
playlist = [playlistID, name, playlistType, tags, createTime, updateTime,
tracks_num, playCount, subscribedCount, shareCount, commentCount,
nickname, gender, userType, vipType, province, city]
print(playlist)
save_to_playlists(playlist)
"""保存到数据库"""
# 保存歌单信息到数据库,如果已存在就更新
def save_to_playlists(l):
sql = """
INSERT INTO playlists(id, name, type, tags, create_time, update_time,
tracks_num, play_count, subscribed_count, share_count, comment_count,
nickname, gender, user_type, vip_type, province, city)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
type = VALUES(type),
tags = VALUES(tags),
create_time = VALUES(create_time),
update_time = VALUES(update_time),
tracks_num = VALUES(tracks_num),
play_count = VALUES(play_count),
subscribed_count = VALUES(subscribed_count),
share_count = VALUES(share_count),
comment_count = VALUES(comment_count),
nickname = VALUES(nickname),
gender = VALUES(gender),
user_type = VALUES(user_type),
vip_type = VALUES(vip_type),
province = VALUES(province),
city = VALUES(city)
"""
try:
cursor.execute(sql, (
l[0], l[1], l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[10],
l[11], l[12], l[13], l[14], l[15], l[16]
))
db.commit()
except Exception as e:
db.rollback()
print(f"Error: {e}")
def main():
types = get_playlist_type(type_url)
urls = []
for t in types:
        # 35 playlists per page; crawl up to 37 pages per category
        for i in range(37):
print(url)
urls.append(url)
    pool = Pool(10)  # 10 worker processes; note they all append to the same CSV file
for url in urls:
pool.apply_async(get_playlist_id, args=(url,))
pool.close()
pool.join()
def main2():
# print(load_ids_from_csv())
data = load_ids_from_csv()
for k, v in data.items():
# print(k, v)
        try:
            get_playlist_info(k, v)
        except Exception as e:
            # Skip playlists whose API request or JSON parsing fails
            print(f"Skipped playlist {k}: {e}")
if __name__ == "__main__":
# main()
main2()
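The crawler's save_to_playlists() relies on a playlists table keyed by the playlist ID. The DDL is not part of this excerpt, so the following is a hypothetical schema sketch inferred from the INSERT statement above; the column types are assumptions and should be adjusted to the project's real table definition.

# Hypothetical playlists table, inferred from the INSERT in save_to_playlists().
# Column types are assumptions; the project's own DDL may differ.
import pymysql

ddl = """
CREATE TABLE IF NOT EXISTS playlists (
    id               VARCHAR(32) PRIMARY KEY,
    name             VARCHAR(255),
    type             VARCHAR(64),
    tags             VARCHAR(255),
    create_time      DATE,
    update_time      DATE,
    tracks_num       INT,
    play_count       BIGINT,
    subscribed_count BIGINT,
    share_count      BIGINT,
    comment_count    BIGINT,
    nickname         VARCHAR(128),
    gender           VARCHAR(8),
    user_type        VARCHAR(16),
    vip_type         VARCHAR(16),
    province         VARCHAR(64),
    city             VARCHAR(64)
) DEFAULT CHARSET = utf8mb4
"""

conn = pymysql.connect(host="localhost", user="root", password="123456",
                       port=3306, db="cloudmusic")
with conn.cursor() as cur:
    cur.execute(ddl)
conn.commit()
conn.close()

A primary key on id is what lets the ON DUPLICATE KEY UPDATE clause above keep repeated crawls idempotent.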
Source Code Download
Link: https://pan.baidu.com/s/1JcGAqWi2yBa12SlUHXQgeQ
Extraction code: 1234