Scrapy进阶封装(第三阶段:多管道封装,多文件存储)

发布于:2025-06-27 ⋅ 阅读:(17) ⋅ 点赞:(0)

1.yield返回数据的原理?

为什么要用yield返回数据给管道?

  • 遍历这个函数的返回值的时候,挨个把数据读到内存,不会造成内存的瞬间占用过高,Python3中的rangepython2中的xrange同理。
  • scrapy是异步爬取,所以通过yield能够将运行权限教给其他的协程任务去执行,这样整个程序运行效果会更高。

注意点:解析函数中的yield能够传递的对象只能是:BaseItemRequestdictNone

前置知识,关于数据的存储

可以参考我另一篇文章python爬虫之数据存储_爬虫代码w是什么意思-CSDN博客

2.Mysql存储管道封装

第一步创建一个管道文件夹

咱们要用多管道,所以咱们不用自带的pipelines.py文件,虽然可以写在一个文件,但是如果写多个导致代码冗余。不好查看。

类名修改如下:

 第二步在setting.py设置开启管道文件

按你的路径修改

 数据越小,优先级越大

第四步设置管道

先下载  pip install pymsql

以豆瓣为例,这三个字段

 初始数据

    def __init__(self):
        # 数据库连接信息
        self.host = '127.0.0.1'
        self.user = 'root'
        self.passwd = '12345'
        self.db = 'spider2'
        self.charset = 'utf8mb4'

链接数据库

 def open_spider(self, spider):
        # 连接数据库
        self.conn = pymysql.connect(
            host=self.host,
            user=self.user,
            passwd=self.passwd,
            db=self.db,
            charset=self.charset
        )
        self.cursor = self.conn.cursor()

注意open_spider是爬虫开启时自动运行,当前启动的爬虫对象,可以通过它获取爬虫的名称、设置等信息。

建表,以爬虫名称为表名

 self.cursor = self.conn.cursor()

        # 根据爬虫名字创建表
        table_name = spider.name
        create_table_sql = f"""
          CREATE TABLE IF NOT EXISTS {table_name} (
              id INT AUTO_INCREMENT PRIMARY KEY,
              title VARCHAR(255),
              rating VARCHAR(50),
              url VARCHAR(255)
          )
          """
        self.cursor.execute(create_table_sql)
        self.conn.commit()

 插入数据

    def process_item(self, item, spider):
        # 获取爬虫名字作为表名
        table_name = spider.name
        # 插入数据
        insert_sql = f"""
          INSERT INTO {table_name} (title, rating, url)
          VALUES (%s, %s, %s)
          """
        self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))
        self.conn.commit()
        return item

process_item 方法

作用:每当爬虫抓取到一个数据项(item)时,该方法会被调用来处理这个数据项。这是管道的核心方法,用于对数据项进行清洗、验证、存储等操作。

return item 作用:返回处理后的 item,以便后续的管道可以继续处理。如果抛出 DropItem 异常

 ,则丢弃该数据项,不再传递给后续的管道。

        def close_spider(self, spider):
            # 关闭数据库连接
            self.cursor.close()
            self.conn.close()

close_spider()在爬虫关闭时被调用,主要用于清理资源,如关闭数据库连接、文件等操作。

完整代码如下:

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter

import pymysql
class Mysql_Pipeline:
    def __init__(self):
        # 数据库连接信息
        self.host = '127.0.0.1'
        self.user = 'root'
        self.passwd = '12345'
        self.db = 'spider2'
        self.charset = 'utf8mb4'

    def open_spider(self, spider):
        # 连接数据库
        self.conn = pymysql.connect(
            host=self.host,
            user=self.user,
            passwd=self.passwd,
            db=self.db,
            charset=self.charset
        )
        self.cursor = self.conn.cursor()

        # 根据爬虫名字创建表
        table_name = spider.name
        create_table_sql = f"""
          CREATE TABLE IF NOT EXISTS {table_name} (
              id INT AUTO_INCREMENT PRIMARY KEY,
              title VARCHAR(255),
              rating VARCHAR(50),
              url VARCHAR(255)
          )
          """
        self.cursor.execute(create_table_sql)
        self.conn.commit()

    def process_item(self, item, spider):
        # 获取爬虫名字作为表名
        table_name = spider.name
        # 插入数据
        insert_sql = f"""
          INSERT INTO {table_name} (title, rating, url)
          VALUES (%s, %s, %s)
          """
        self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))
        self.conn.commit()
        print('mysql插入成功')
        return item

        def close_spider(self, spider):
            # 关闭数据库连接
            self.cursor.close()
            self.conn.close()

 结果如下:

 

3. MongoDB存储管道封装

直接创建管道

开启管道

ITEM_PIPELINES = {
   'myspider.pipelines.Mysql_pipelines.Mysql_Pipeline': 300,
   'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 200,
}

还是以豆瓣三个字段为例

pip insatll pymongo

import pymongo
from scrapy import signals

class MongoDBPipeline:
    def __init__(self):
        self.mongo_uri = 'mongodb://localhost:27017/'  # MongoDB 的 URI 地址
        self.mongo_db = 'spider'   # MongoDB 的数据库名称
    def open_spider(self, spider):
        # 在爬虫启动时执行,用于初始化操作,如建立 MongoDB 连接
        self.client = pymongo.MongoClient(self.mongo_uri)
        #自动创建数据库
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        # 在爬虫关闭时执行,用于清理操作,如关闭 MongoDB 连接
        self.client.close()

    def process_item(self, item, spider):
        # 处理每个数据项,将数据存储到 MongoDB 中
        collection_name = spider.name  # 使用爬虫名字作为集合名
        #自动创建表
        self.db[collection_name].insert_one(item)  # 将数据插入到集合中
        print('MongoDB插入成功!')
        #字典字典插入
        return item

 原理差不多一样,链接数据库,插入数据库,关闭数据库。

双数据库插入

结果如下:

mongodb 

 

mysql 

 

但如果只想插入其中一个数据库

要么注释return item,优先级设置高,这样会抛出错误

要么注释管道开启

 最好的还是在爬虫里重写配置

    custom_settings = {
        'ITEM_PIPELINES': {
            'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 300,
        }
    }

这样管道设置以爬虫类为主,只插入mongodb

 

 4.文件管道封装

图片存储

class FilePipeline:
    def process_item(self, item, spider):
        # getcwd(): 用于获取当前工作目录(Current Working Directory)的路径
        #图片下载
        if item.get("f_type"):
            if  item.get("f_type") == 'img':
                download_path = os.getcwd() + f'/img/'
                if not os.path.exists(download_path):
                    os.mkdir(download_path)
                # 图片保存
                image_name = item.get("title")
                image_content = item.get("content")
                if image_content:
                    with open(download_path+f'{image_name}.jpg', "wb") as f:
                        f.write(image_content)
                        print("图片保存成功: ", image_name)

一般用item.get("f_type")区分文件下载用os生成文件夹

图片存储一共就两个参数,响应体和名字,这样配置,

在爬虫里重写设置

  custom_settings = {
        'ITEM_PIPELINES': {
            'myspider.pipelines.File_piplines.FilePipeline': 300,
        }
    }

二次请求图片url

以上有个小错误,把 item_img['title'] =item['title']改为item_img['title'] =item

 结果如下:

利用item.get("f_type")区分文件下载,文件管道可以封装如下:

import os
import pickle

from itemadapter import ItemAdapter

import json
class FilePipeline:
    def process_item(self, item, spider):
        # getcwd(): 用于获取当前工作目录(Current Working Directory)的路径
        #图片下载
        if item.get("f_type"):
            if  item.get("f_type") == 'img':
                download_path = os.getcwd() + f'/img/'
                if not os.path.exists(download_path):
                    os.mkdir(download_path)
                # 图片保存
                image_name = item.get("title")
                image_content = item.get("content")
                if image_content:
                    with open(download_path+f'{image_name}.jpg', "wb") as f:
                        f.write(image_content)
                        print("图片保存成功: ", image_name)
            elif item.get("f_type") == 'txt':
                download_path = os.getcwd() + '/txt/'
                if not os.path.exists(download_path):
                    os.mkdir(download_path)
                # 文本保存
                txt_name = item.get("title")
                txt_content = item.get("content")
                with open(download_path+f'{txt_name}.txt', 'a', encoding='utf-8') as f:
                    f.write(txt_content + '\n')
                    print('文本存储成功')
            elif item.get("f_type") == 'json':
                download_path = os.getcwd() + '/json/'
                if not os.path.exists(download_path):
                    os.mkdir(download_path)
                # 文本保存
                json_name = item.get("title")
                json_obj = item
                with open(download_path+f'{json_name}.json', 'a', encoding='utf-8') as file:
                    file.write(json.dumps(json_obj, indent=2, ensure_ascii=False), )
                    print('json存储成功')
            elif item.get("f_type") == 'music':
                download_path = os.getcwd() + '/music/'
                if not os.path.exists(download_path):
                    os.mkdir(download_path)
                # 文本保存
                music_name = item.get("title")
                music_content = item.get("content")
                with open(download_path+f'{music_name}.mp3', 'a', encoding='utf-8') as f:
                    f.write(music_content + '\n')
                    print('MP3存储成功')
        else:
            print('无事发生')

包括mp3,文本,json,图片等文件下载。 

 但是事实上,一个文件可以进行多次存储,这也是用yield返回给管道的主要原因

    def parse(self, response, **kwargs):
        # scrapy的response对象可以直接进行xpath
        ol_list = response.xpath('//ol[@class="grid_view"]/li')
        for ol in ol_list:
            # 创建一个数据字典
            item = {}
            # 利用scrapy封装好的xpath选择器定位元素,并通过extract()或extract_first()来获取结果
            item['title'] = ol.xpath('.//div[@class="hd"]/a/span[1]/text()').extract_first()#标题
            item['content'] = ol.xpath('.//div[@class="bd"]/div/span[2]/text()').extract_first()#评分
            item['f_type'] = 'txt'
            yield item
            item['url'] = ol.xpath('.//a/@href').extract_first()#链接
            item['img_url'] = ol.xpath('.//img/@src').extract_first()
            item['f_type'] = 'json'
            yield item
            # yield item
            yield scrapy.Request(url= item['img_url'] , headers=self.headers, callback=self.img_parse,cb_kwargs={"item":item['title']},meta={"meta":item})#自定义一个回调方法
    def img_parse(self, response,item):
        item_img = {'f_type':"img"}
        item_img['content'] = response.body
        item_img['title'] =item
        yield item_img

if __name__ == '__main__':
    cmdline.execute('scrapy crawl douban'.split())

以上三个yield,返回三次 ,分别是文本保存,json保存,图片保存。

运行结果如下:

文本存储评分

管道封装到此一段落了。 


网站公告

今日签到

点亮在社区的每一天
去签到