1.yield返回数据的原理?
为什么要用yield返回数据给管道?
- 遍历这个函数的返回值的时候,挨个把数据读到内存,不会造成内存的瞬间占用过高,
Python3
中的range
和python2
中的xrange
同理。 scrapy
是异步爬取,所以通过yield
能够将运行权限教给其他的协程任务去执行,这样整个程序运行效果会更高。
注意点:解析函数中的yield
能够传递的对象只能是:BaseItem
、Request
、dict
、None
前置知识,关于数据的存储
可以参考我另一篇文章python爬虫之数据存储_爬虫代码w是什么意思-CSDN博客
2.Mysql存储管道封装
第一步创建一个管道文件夹
咱们要用多管道,所以咱们不用自带的pipelines.py文件,虽然可以写在一个文件,但是如果写多个导致代码冗余。不好查看。
类名修改如下:
第二步在setting.py设置开启管道文件
按你的路径修改
数据越小,优先级越大
第四步设置管道
先下载 pip install pymsql
以豆瓣为例,这三个字段
初始数据
def __init__(self):
# 数据库连接信息
self.host = '127.0.0.1'
self.user = 'root'
self.passwd = '12345'
self.db = 'spider2'
self.charset = 'utf8mb4'
链接数据库
def open_spider(self, spider):
# 连接数据库
self.conn = pymysql.connect(
host=self.host,
user=self.user,
passwd=self.passwd,
db=self.db,
charset=self.charset
)
self.cursor = self.conn.cursor()
注意open_spider是爬虫开启时自动运行,当前启动的爬虫对象,可以通过它获取爬虫的名称、设置等信息。
建表,以爬虫名称为表名
self.cursor = self.conn.cursor()
# 根据爬虫名字创建表
table_name = spider.name
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
rating VARCHAR(50),
url VARCHAR(255)
)
"""
self.cursor.execute(create_table_sql)
self.conn.commit()
插入数据
def process_item(self, item, spider):
# 获取爬虫名字作为表名
table_name = spider.name
# 插入数据
insert_sql = f"""
INSERT INTO {table_name} (title, rating, url)
VALUES (%s, %s, %s)
"""
self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))
self.conn.commit()
return item
process_item
方法
作用:每当爬虫抓取到一个数据项(item
)时,该方法会被调用来处理这个数据项。这是管道的核心方法,用于对数据项进行清洗、验证、存储等操作。
return item 作用:返回处理后的 item
,以便后续的管道可以继续处理。如果抛出 DropItem
异常
,则丢弃该数据项,不再传递给后续的管道。
def close_spider(self, spider):
# 关闭数据库连接
self.cursor.close()
self.conn.close()
close_spider()在爬虫关闭时被调用,主要用于清理资源,如关闭数据库连接、文件等操作。
完整代码如下:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import pymysql
class Mysql_Pipeline:
def __init__(self):
# 数据库连接信息
self.host = '127.0.0.1'
self.user = 'root'
self.passwd = '12345'
self.db = 'spider2'
self.charset = 'utf8mb4'
def open_spider(self, spider):
# 连接数据库
self.conn = pymysql.connect(
host=self.host,
user=self.user,
passwd=self.passwd,
db=self.db,
charset=self.charset
)
self.cursor = self.conn.cursor()
# 根据爬虫名字创建表
table_name = spider.name
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
rating VARCHAR(50),
url VARCHAR(255)
)
"""
self.cursor.execute(create_table_sql)
self.conn.commit()
def process_item(self, item, spider):
# 获取爬虫名字作为表名
table_name = spider.name
# 插入数据
insert_sql = f"""
INSERT INTO {table_name} (title, rating, url)
VALUES (%s, %s, %s)
"""
self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))
self.conn.commit()
print('mysql插入成功')
return item
def close_spider(self, spider):
# 关闭数据库连接
self.cursor.close()
self.conn.close()
结果如下:
3.
MongoDB存储管道封装
直接创建管道
开启管道
ITEM_PIPELINES = {
'myspider.pipelines.Mysql_pipelines.Mysql_Pipeline': 300,
'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 200,
}
还是以豆瓣三个字段为例
pip insatll pymongo
import pymongo
from scrapy import signals
class MongoDBPipeline:
def __init__(self):
self.mongo_uri = 'mongodb://localhost:27017/' # MongoDB 的 URI 地址
self.mongo_db = 'spider' # MongoDB 的数据库名称
def open_spider(self, spider):
# 在爬虫启动时执行,用于初始化操作,如建立 MongoDB 连接
self.client = pymongo.MongoClient(self.mongo_uri)
#自动创建数据库
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
# 在爬虫关闭时执行,用于清理操作,如关闭 MongoDB 连接
self.client.close()
def process_item(self, item, spider):
# 处理每个数据项,将数据存储到 MongoDB 中
collection_name = spider.name # 使用爬虫名字作为集合名
#自动创建表
self.db[collection_name].insert_one(item) # 将数据插入到集合中
print('MongoDB插入成功!')
#字典字典插入
return item
原理差不多一样,链接数据库,插入数据库,关闭数据库。
双数据库插入
结果如下:
mongodb
mysql
但如果只想插入其中一个数据库
要么注释return item,优先级设置高,这样会抛出错误
要么注释管道开启
最好的还是在爬虫里重写配置
custom_settings = {
'ITEM_PIPELINES': {
'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 300,
}
}
这样管道设置以爬虫类为主,只插入mongodb
4.文件管道封装
图片存储
class FilePipeline:
def process_item(self, item, spider):
# getcwd(): 用于获取当前工作目录(Current Working Directory)的路径
#图片下载
if item.get("f_type"):
if item.get("f_type") == 'img':
download_path = os.getcwd() + f'/img/'
if not os.path.exists(download_path):
os.mkdir(download_path)
# 图片保存
image_name = item.get("title")
image_content = item.get("content")
if image_content:
with open(download_path+f'{image_name}.jpg', "wb") as f:
f.write(image_content)
print("图片保存成功: ", image_name)
一般用item.get("f_type")区分文件下载用os生成文件夹
图片存储一共就两个参数,响应体和名字,这样配置,
在爬虫里重写设置
custom_settings = {
'ITEM_PIPELINES': {
'myspider.pipelines.File_piplines.FilePipeline': 300,
}
}
二次请求图片url
以上有个小错误,把 item_img['title'] =item['title']改为item_img['title'] =item
结果如下:
利用item.get("f_type")区分文件下载,文件管道可以封装如下:
import os
import pickle
from itemadapter import ItemAdapter
import json
class FilePipeline:
def process_item(self, item, spider):
# getcwd(): 用于获取当前工作目录(Current Working Directory)的路径
#图片下载
if item.get("f_type"):
if item.get("f_type") == 'img':
download_path = os.getcwd() + f'/img/'
if not os.path.exists(download_path):
os.mkdir(download_path)
# 图片保存
image_name = item.get("title")
image_content = item.get("content")
if image_content:
with open(download_path+f'{image_name}.jpg', "wb") as f:
f.write(image_content)
print("图片保存成功: ", image_name)
elif item.get("f_type") == 'txt':
download_path = os.getcwd() + '/txt/'
if not os.path.exists(download_path):
os.mkdir(download_path)
# 文本保存
txt_name = item.get("title")
txt_content = item.get("content")
with open(download_path+f'{txt_name}.txt', 'a', encoding='utf-8') as f:
f.write(txt_content + '\n')
print('文本存储成功')
elif item.get("f_type") == 'json':
download_path = os.getcwd() + '/json/'
if not os.path.exists(download_path):
os.mkdir(download_path)
# 文本保存
json_name = item.get("title")
json_obj = item
with open(download_path+f'{json_name}.json', 'a', encoding='utf-8') as file:
file.write(json.dumps(json_obj, indent=2, ensure_ascii=False), )
print('json存储成功')
elif item.get("f_type") == 'music':
download_path = os.getcwd() + '/music/'
if not os.path.exists(download_path):
os.mkdir(download_path)
# 文本保存
music_name = item.get("title")
music_content = item.get("content")
with open(download_path+f'{music_name}.mp3', 'a', encoding='utf-8') as f:
f.write(music_content + '\n')
print('MP3存储成功')
else:
print('无事发生')
包括mp3,文本,json,图片等文件下载。
但是事实上,一个文件可以进行多次存储,这也是用yield返回给管道的主要原因
def parse(self, response, **kwargs):
# scrapy的response对象可以直接进行xpath
ol_list = response.xpath('//ol[@class="grid_view"]/li')
for ol in ol_list:
# 创建一个数据字典
item = {}
# 利用scrapy封装好的xpath选择器定位元素,并通过extract()或extract_first()来获取结果
item['title'] = ol.xpath('.//div[@class="hd"]/a/span[1]/text()').extract_first()#标题
item['content'] = ol.xpath('.//div[@class="bd"]/div/span[2]/text()').extract_first()#评分
item['f_type'] = 'txt'
yield item
item['url'] = ol.xpath('.//a/@href').extract_first()#链接
item['img_url'] = ol.xpath('.//img/@src').extract_first()
item['f_type'] = 'json'
yield item
# yield item
yield scrapy.Request(url= item['img_url'] , headers=self.headers, callback=self.img_parse,cb_kwargs={"item":item['title']},meta={"meta":item})#自定义一个回调方法
def img_parse(self, response,item):
item_img = {'f_type':"img"}
item_img['content'] = response.body
item_img['title'] =item
yield item_img
if __name__ == '__main__':
cmdline.execute('scrapy crawl douban'.split())
以上三个yield,返回三次 ,分别是文本保存,json保存,图片保存。
运行结果如下:
文本存储评分
管道封装到此一段落了。