scrapy 使用Selenium与Scrapy处理动态加载网页内容的解决方法

发布于:2024-05-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

引言
在爬虫技术领域,处理动态加载的网页内容常常是一项挑战,尤其是对于那些通过用户滚动或其他交互动态加载更多内容的网站。本文将介绍如何结合使用Selenium和Scrapy来有效处理这类网页。

初探Selenium与Scrapy的结合
首先,我们探索如何在Scrapy的下载器中间件(Downloader Middleware)中使用Selenium处理动态加载内容的网页。关键在于模拟用户滚动行为,以加载并捕获所有内容。注意:该中间件需要在 settings.py 的 DOWNLOADER_MIDDLEWARES 中启用后才会生效。

# Define here the models for your downloader middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html

from scrapy import signals
from scrapy.http import HtmlResponse
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import random,time
from fake_useragent import UserAgent
# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter
from scrapy.utils.project import get_project_settings


class SeleniumMiddleware:
    """Downloader middleware that renders every request in a Selenium Chrome.

    For each request the page is opened in the browser, the guest-mode and
    refresh buttons are clicked when present, the page is scrolled until the
    scroll offset stops changing, and the resulting DOM is handed back to
    Scrapy as an ``HtmlResponse`` (so the normal download is skipped).

    The timing and selector values are class attributes so a subclass can
    tune them without touching the logic.
    """

    # A fixed desktop User-Agent is used on purpose: a randomly generated one
    # may turn out to be a mobile UA, which changes the page that is served.
    USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"

    LOGIN_CONTAINER_ID = "loginContainer"
    GUEST_BUTTON_CLASS = 'css-txolmk-DivGuestModeContainer'
    REFRESH_BUTTON_CLASS = 'css-z9i4la-Button-StyledButton'

    # Pauses, in seconds. These are sample values and may need adjusting for
    # the actual page being crawled.
    PAGE_LOAD_WAIT = 3
    AFTER_GUEST_WAIT = 3
    AFTER_REFRESH_WAIT = 6
    SCROLL_PAUSE = 2

    # Safety cap so a feed that keeps growing cannot block the crawl forever.
    MAX_SCROLLS = 500

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware and make sure the browser is closed with the spider."""
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def __init__(self):
        """Start a Chrome instance with the automation hints switched off."""
        options = Options()
        # Disable the Blink "AutomationControlled" feature.
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument(f"user-agent={self.USER_AGENT}")
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.driver = webdriver.Chrome(options=options)

    def spider_closed(self, spider):
        """Quit the browser when the spider finishes."""
        self.driver.quit()

    def process_request(self, request, spider):
        """Load ``request.url`` in the browser and return the fully scrolled page.

        Args:
            request: The Scrapy request being processed.
            spider: The spider that issued the request (unused).

        Returns:
            An ``HtmlResponse`` built from the browser's current URL and page
            source.
        """
        self.driver.get(request.url)
        # Give the page time to finish its initial load.
        time.sleep(self.PAGE_LOAD_WAIT)

        # Enter as a guest, but only when the login container is on the page.
        self._click_first(
            self.GUEST_BUTTON_CLASS,
            "以游客模式进入的按钮没有找到",
            container_id=self.LOGIN_CONTAINER_ID,
        )
        time.sleep(self.AFTER_GUEST_WAIT)

        self._click_first(self.REFRESH_BUTTON_CLASS, "刷新按钮没有找到")
        time.sleep(self.AFTER_REFRESH_WAIT)

        self._scroll_to_bottom()

        body = self.driver.page_source.encode('utf-8')
        return HtmlResponse(self.driver.current_url, body=body, encoding='utf-8', request=request)

    def _click_first(self, class_name, missing_message, container_id=None):
        """Click the first element with ``class_name``; print a note if that fails.

        This is best effort: a missing or unclickable element must not abort
        the request, so browser-side errors are reported and swallowed.

        Args:
            class_name: CSS class of the element to click.
            missing_message: Text printed when the click could not be done.
            container_id: Optional element id that must exist on the page
                before the click is attempted.

        Returns:
            True when the click was performed, False otherwise.
        """
        # Imported here so this class does not rely on the file's import block.
        from selenium.common.exceptions import WebDriverException

        try:
            if container_id is not None:
                # find_element raises when the element is absent, which skips
                # the click below.
                self.driver.find_element(By.ID, container_id)
            # IndexError covers the "no matching element" case.
            self.driver.find_elements(By.CLASS_NAME, class_name)[0].click()
        except (WebDriverException, IndexError):
            print(missing_message)
            return False
        return True

    def _scroll_to_bottom(self):
        """Scroll down until the scroll offset stops changing.

        Returns:
            True when the offset stopped changing, False when ``MAX_SCROLLS``
            was reached first.
        """
        previous_offset = 0
        for _ in range(self.MAX_SCROLLS):
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(self.SCROLL_PAUSE)
            # Current distance of the viewport from the top of the document.
            current_offset = self.driver.execute_script("return document.documentElement.scrollTop || window.pageYOffset || document.body.scrollTop;")
            if current_offset == previous_offset:
                print("到底了")
                return True
            previous_offset = current_offset
        print("达到最大滚动次数,停止滚动")
        return False
    
 

完整的Scrapy爬虫实例
下面是一个使用Selenium和Scrapy爬取TikTok用户主页视频列表的示例。

import scrapy,json,html,base64
import datetime,hashlib
from tiktokSelenium.items import TiktokseleniumItem

class TiktokSpider(scrapy.Spider):
    """Collect the video tiles listed on a TikTok user page.

    One ``TiktokseleniumItem`` is yielded per tile, carrying the author, the
    video link and id, the escaped title, the displayed view count and its
    numeric value, the crawl date tag and a unique key.
    """

    name = "tiktok"
    allowed_domains = ["tiktok.com"]
    start_urls = [
        "https://www.tiktok.com/@espn"
    ]

    # XPath of one video tile on the user page.
    ITEM_XPATH = '//div[@class="css-x6y88p-DivItemContainerV2 e19c29qe8"]'
    # XPath of the view counter, relative to the tile's link element.
    HITS_XPATH = 'div/div/strong[@class="video-count css-dirst9-StrongVideoCount e148ts222"]/text()'
    # Multipliers for the abbreviated counters ("1.2K", "3M", "1B").
    HIT_MULTIPLIERS = {"K": 10 ** 3, "M": 10 ** 6, "B": 10 ** 9}

    def __init__(self, *args, **kwargs):
        """Initialise the spider and remember the date tag for this run.

        Positional and keyword arguments are forwarded to ``scrapy.Spider`` so
        that spider arguments keep working.
        """
        super().__init__(*args, **kwargs)
        # Items are tagged with yesterday's date, not today's.
        self.fenrundate = self.get_yesterday_day()

    def parse(self, response):
        """Yield one item per video tile found in ``response``.

        Tiles without a link are skipped with a warning; a missing title or
        view counter is treated as an empty string.
        """
        print("==================response.text=======================")
        tiles = response.xpath(self.ITEM_XPATH)
        print(len(tiles))
        author_url = response.url
        author_name = author_url.split("@")[-1]

        for sel in tiles:
            anchors = sel.xpath('div/div/div/a')
            # Link to the video.
            href = sel.xpath('div/div/div/a/@href').extract_first()
            if not anchors or not href:
                self.logger.warning("Skipping a video tile without a link on %s", author_url)
                continue
            link = anchors[0]
            # The video id is the last path segment of the link.
            vid = href.split("/")[-1]
            vclick = link.xpath(self.HITS_XPATH).extract_first(default="")
            title = html.escape(sel.xpath('div[2]/div/@aria-label').extract_first(default=""))

            cvideo_item = TiktokseleniumItem()
            cvideo_item['author_url'] = author_url
            cvideo_item['author_name'] = author_name
            cvideo_item['video_id'] = vid
            cvideo_item['video_url'] = href
            cvideo_item['video_title'] = title
            cvideo_item['video_hits'] = vclick
            cvideo_item['date'] = self.fenrundate
            cvideo_item['video_real_hits'] = self.convert_to_real_hits(vclick)
            # The key is derived from the escaped title and the displayed count.
            cvideo_item['unique_key'] = self.str_md5("_".join([title, vclick]))
            yield cvideo_item

    def get_yesterday_day(self):
        """Return yesterday's local date formatted as ``YYYYMMDD``."""
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        return yesterday.strftime("%Y%m%d")

    def convert_to_real_hits(self, strs):
        """Convert a displayed view count such as ``"1.2K"`` into an integer.

        Args:
            strs: The counter text. A trailing ``K``, ``M`` or ``B`` (either
                case) multiplies the number by a thousand, a million or a
                billion. ``None`` and blank text count as zero.

        Returns:
            The count as an ``int``.

        Raises:
            ValueError: If the text is not a number with an optional suffix.
        """
        if strs is None:
            return 0
        text = str(strs).strip().replace(",", "")
        if not text:
            return 0

        multiplier = self.HIT_MULTIPLIERS.get(text[-1].upper())
        if multiplier is None:
            number, multiplier = text, 1
        else:
            number = text[:-1]
        # float() instead of eval(): the text comes from a scraped page and
        # must never be executed. round() instead of int() so that binary
        # floating point error cannot drop the result by one.
        return round(float(number) * multiplier)

    def str_md5(self, strs):
        """Return the hex MD5 digest of the Base64-encoded UTF-8 bytes of ``strs``."""
        # The Base64 step is kept so keys stay identical to ones already produced.
        encoded = base64.b64encode(strs.encode("utf-8"))
        return hashlib.md5(encoded).hexdigest()