爬虫——playwright获取亚马逊数据

发布于:2025-03-21 ⋅ 阅读:(17) ⋅ 点赞:(0)

playwright简介

playwright是微软新出的一个测试工具,与selenium类似,不过与selenium比起来还是有其自身的优势的(除了教程少是弱项)。

  • 任何浏览器 • 任何平台 • 一个 API

    • 跨浏览器。 Playwright 支持所有现代渲染引擎,包括 Chromium、WebKit 和 Firefox。
    • 跨平台。 在 Windows、Linux 和 macOS 上进行本地或 CI 测试,无头或有头。
    • 跨语言。 在 TypeScript、JavaScript、Python、.NET、Java 中使用 Playwright API。
    • 测试移动网络。 适用于 Android 的 Google Chrome 和 Mobile Safari 的原生移动模拟。 相同的渲染引擎可以在桌面和云端运行。
      (上面是抄官网的, 下面才是个人的使用体验)
  • 方便

    • 配置方便。与selenium的配置相比较的话,不需要下载额外的驱动,也不需要下载对应版本的浏览器。(就是配置地址不能修改)。
    • 可以异步selenium是只能阻塞式的执行,而playwright可以执行异步。
    • 内存占用少。好像确实是比较少一点。

使用playwright初窥亚马逊

通常情况,每家电商平台都有自己的反爬措施,亚马逊也不例外,所以要尝试一下,当然如果是scrapy这个爬虫框架的话,似乎可以越过反爬措施,内部的逻辑代码是什么没有深究,现在先拿过来用用。

安装playwright

同样的,这个第三方库也要安装,直接使用pip安装就行

pip install playwright

安装完毕之后,还需要安装一些依赖,只要执行一句话即可。这样就能安装了。

playwright install

如果不能安装,或者安装卡进度条了,可以查看这个教程

打开亚马逊页面

小试牛刀一把,看看这个库怎么样

import asyncio
import time
import traceback
from playwright.async_api import async_playwright

class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None

    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.goto("https://www.amazon.com/")
        time.sleep(60)
        await self.browser.close()

if __name__ == "__main__":
    asyncio.run(AmazonBrowser().start())

在这里插入图片描述
what!!! 不愧是美国注明的电商平台,上来就是验证码=。=还有点不好搞定。

那试试添加headers,应该可以

class AmazonBrowser:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.page = None
        self.playwright = None
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": "https://www.google.com/", # 伪造,告诉亚马逊网站,说我是从Google搜索页面点击进来的。
        }

在这里插入图片描述
诶嘿! 来了!

搞数据

在第一个页面里面,没有我们想要的数据,所以我们需要输入内容,然后从新的页面中搞到要的数据。
获取元素 → 搜索 → 搞数据 应该是这个思路。

搜索

我们需要获取2个元素,一个元素是搜索框,另一个元素是搜索按钮(当然搞回车按键也行),从F12里面可以得知,搜索框是下面的内容:
在这里插入图片描述
搜索按钮是下面的:
在这里插入图片描述
先搞定输入框:

import random # 引入random库
class AmazonBrowser:
    async def start(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        self.page = await self.browser.new_page()
        await self.page.set_extra_http_headers(self.headers)
        await self.page.goto("https://www.amazon.com/")
		search_box = self.page.locator("input#twotabsearchtextbox")# 这里的input有个id,就是twotabsearchtextbox,这个id应该是独立的,所以直接用它。
        await search_box.wait_for(state="visible")
        await search_box.type('键盘', delay=random.uniform(50, 200)) 
        time.sleep(60)

【细节讲解】

  • locator:在Playwright中,locator的作用是用于定位页面上的元素,语法有很多:
    • "text=登录"是通过文本匹配,
    • "#username"是通过ID匹配,
    • "//button[@type='submit']"是通过xpath匹配,
    • "a.s-pagination-next.s-pagination-separator"通过css匹配,
    • "a.s-pagination-next.s-pagination-separator:not([aria-disabled="true"])",这个则是排除具有aria-disabled="true"属性的元素。
  • type:在playwright中,type的作用时将文字打入输入框中,与之类似的方法是fill,只不过前者是将文字挨个键入,而后者则是将文字一次性输入,在严格监听页面的网页中,type的的功能用的会比较多,因为可以模拟人的操作,而fill则不行。

再执行看看~
在这里插入图片描述
芜湖~
在这里插入图片描述
同样的,找到搜索按钮并点击就能跳转过去了。

class AmazonBrowser:
    async def start(self):
    # ...
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await self.page.locator("input#nav-search-submit-button").click()

在这里插入图片描述

修改bug

有的人可能会遇到另外一种情况:
在这里插入图片描述
这个时候搜索输入框和搜索按钮的id就已经不一样了,这种情况的搜索输入框是id="nav-bb-search",搜索按钮是没有id的只有class="nav-bb-button",所以要修改上面的代码

class AmazonBrowser:
    async def start(self):
		#...
		# 同时检测两种搜索框的存在
        twotab_exists = await self.page.locator("input#twotabsearchtextbox").count()
        navbb_exists = await self.page.locator("input#nav-bb-search").count()
        if twotab_exists:
            search_box = self.page.locator("input#twotabsearchtextbox")
            search_btn = self.page.locator("input#nav-search-submit-button")
        elif navbb_exists:
            search_box = self.page.locator("input#nav-bb-search")
            search_btn = self.page.locator("input.nav-bb-button")
        else:
            raise Exception("No search box found")
        await search_box.wait_for(state="visible")
        await search_box.type("键盘", delay=random.uniform(50, 200))
        await search_btn.click()

数据获取

我们先来看看要搞哪些数据:
在这里插入图片描述
从这个截图上看,有图片,标题,评分,价格。不过有一点是:第二个图片里面有2个价格,到底那个是真实的价格呢?点进去发现,第一个价格是原价,第二个价格好像是二手价格。
在这里插入图片描述
那就弄原价吧,不然太麻烦了。可是他的标题有了,要是标题重复了怎么办?从F12里面可以发现data-asin应该是唯一的,那我们就把这个ASIN弄到手,当然后面也有一个uuid,弄那个也行。
在这里插入图片描述

# 新增依赖库
from urllib.parse import urljoin

class AmazonBrowser:
    async def start(self):
    	#...
    	await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        batch_data = []
        for index, item in enumerate(items):
        	asin = await item.get_attribute("data-asin")
        	uuid = await item.get_attribute("data-uuid")
        	img = item.locator('img.s-image').first
        	await img.scroll_into_view_if_needed()
        	await self.page.wait_for_timeout(500)
        	src = await img.get_attribute('src')
			href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
			product = await item.locator('h2.a-size-medium > span').first.inner_text()
			price_section = item.locator('span.a-price[data-a-color="base"]')
			if await price_section.count() > 0:
				price = await price_section.locator('span.a-offscreen').first.inner_text()
			else:
				price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
            		price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
            	else:
            		price = '暂无价格'
            batch_data.append({
                        'Product': product,
                        'UUID': uuid,
                        'ASIN': asin,
                        'ImageURL': src,
                        'URL': urljoin(self.target_url, href) if href else None,
                        'Price': price,
                    })

【细节讲解】

  • wait_for_selector:是等待页面上符合选择器的元素出现,最长等待15秒。
    • div[role="listitem"][data-asin]:是选择<div>元素,要求这个元素里面有[role="listitem"]以及[data-asin]这两个东西。
  • await,异步标志,有些情况下需要使用,有些情况下则不需要使用:
    • 需要使用的情况:
      • 涉及网络请求:如 goto()fetch()
      • 获取异步返回的内容:如 text_content()inner_text()
      • 涉及页面交互的操作:如点击、输入、等待
      • 需要时间完成的操作:如 wait_for_selector()
    • 不需要使用的情况:
      • locator()对象
      • 只定义选择器,不交互
      • 直接获取属性,如browser.name

翻页

一页当然是不行的,一页才16条数据,所以肯定要翻页的。同理从F12里面找到翻页标签,那就是
在这里插入图片描述
然后仅有这个也不够,我们还要知道终止,也就是不能点击的情况下,是什么情况的,所以翻到20页得到
在这里插入图片描述
所以需要设计选择器,来判断下一页按钮能不能按下去,或者是能不能检测到终止的<span>

class AmazonBrowser:
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待

在这里插入图片描述

优化结构

但是仅有循环,没有获取数据,这也不是我们想要的,所以接下来是优化结构。将数据获取也翻页分开。

class AmazonBrowser:
	# ...
	async def data_collection(self):
        await self.page.wait_for_selector('div[role="listitem"][data-asin]', timeout=15000)
        items = await self.page.locator('div[role="listitem"][data-asin]').all()
        for index, item in enumerate(items):
            asin = await item.get_attribute("data-asin")
            uuid = await item.get_attribute("data-uuid")
            img = item.locator('img.s-image').first
            await img.scroll_into_view_if_needed()
            await self.page.wait_for_timeout(500)
            src = await img.get_attribute('src')
            href = await item.locator('a.a-link-normal.s-line-clamp-2').get_attribute('href')
            product = await item.locator('h2.a-size-medium > span').first.inner_text()
            price_section = item.locator('span.a-price[data-a-color="base"]')
            if await price_section.count() > 0:
                price = await price_section.locator('span.a-offscreen').first.inner_text()
            else:
                price_section_secondary = item.locator('div.a-row.a-size-base.a-color-secondary')
                if await price_section_secondary.count() > 0:
                    price = await price_section_secondary.locator('span.a-color-base').first.inner_text()
                else:
                    price = '暂无价格'
			# self.batch_data要提前声明哦
            self.batch_data.append({
                'Product': product,
                'UUID': uuid,
                'ASIN': asin,
                'ImageURL': src,
                'URL': urljoin(self.target_url, href) if href else None,
                'Price': price,
            })
	async def start(self):
		#...
		disabled_selector = 'span.s-pagination-next.s-pagination-disabled'
		enabled_selector = 'a.s-pagination-next:not([aria-disabled="true"])'
		# 当检测到disable_selector时就继续
		while True:
			if await self.page.locator(disabled_selector).count() > 0:
				break
			# 在这里添加
			await self.data_collection()
			next_btn = self.page.locator(enabled_selector)
			await next_btn.scroll_into_view_if_needed()
			        # 点击前确保元素可交互
        	await next_btn.wait_for(state="visible")
        	await next_btn.click()
        	await self.page.wait_for_timeout(2000)  # 基础等待

这样就搞定啦,但是数据要怎么保存呢?可以有两种方式,一种方式是保存在数据库里面,比如mongodb,mysql等等,还可以保存在csv文件里面。

简单保存

在这里就简单的保存成csv文件的样子吧,也就是直接使用pandas保存就行了,下一篇章就搞一下mongodb

# 导入依赖库
import pandas as pd
class AmazonBrowser:
	def __init__(self):
		self.df = pd.DataFrame()
	#... 
	async def start(self):
		while True:
			#...while循环内容
		new_df = pd.DataFrame(self.batch_data)
        self.df = pd.concat([self.df, new_df], ignore_index=True)
        self.df.to_csv(path)

就酱~
在这里插入图片描述