2024 高级爬虫笔记(四)协程、selenium

发布于:2024-12-19 ⋅ 阅读:(15) ⋅ 点赞:(0)

一、协程

1.1 概念

- 协程是一种用户态的轻量级线程

- 子程序
  在所有的语言中都是层级调用的,比如A中调用B,B在执行过程中调用C,C执行完返回,B执行完返回,最后是A执行完毕。
  这是通过栈实现的,一个函数就是一个执行的子程序,子程序的调用总是有一个入口、一次返回,调用的顺序是明确的

- 理解协程
  线程是系统级别的,它们是由操作系统调度。协程是程序级别,由程序员根据需求自己调度。
  我们把一个线程中的一个个函数称为子程序,那么一个子程序在执行的过程中可以中断去执行别的子程序,这就是协程。
  也就是说同一个线程下的一段代码1执行执行着就中断,然后去执行另一段代码2,当再次回来执行代码1时,接着从之前的中断的位置继续向下执行

- 优点
  a、最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  b、不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

- 缺点
  a、无法利用多核CPU,协程的本质是单个线程,它不能同时将多个CPU的多个核心使用上,失去了标准线程使用多CPU的能力。
  b、进行阻塞操作(操作IO)会阻塞整个程序

1.2、asyncio模块

1.2.1、概述

- asyncio模块是python3.4版本引入的标准库,直接内置了对异步IO的操作

- 编程模式
  是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用
  然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO

- 说明
  到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能

- 关键字的说明
  | event_loop  | 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 |
  | coroutine   | 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
  | task        | 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 |
  | async/await | python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口 |

1.2.2、asyncio基本使用

  import time
  import asyncio

  async def run(url):
      print("开始向'%s'要数据……"%(url))
      # 向百度要数据,网络IO
      await asyncio.sleep(5)
      data = "'%s'的数据"%(url)
      print("给你数据")
      return data

  # 定义一个回调函数
  def call_back(future):
      print("call_back:", future.result())

  coroutine = run("百度")
  # 创建一个任务对象
  task = asyncio.ensure_future(coroutine)

  # 给任务添加回调,在任务结束后调用回调函数
  task.add_done_callback(call_back)

  loop = asyncio.get_event_loop()
  loop.run_until_complete(task)

1.2.3、使用协程实现多任务异步执行

需求:同时请求"百度", "阿里", "腾讯", "新浪"四个网站,假设响应时长均为2import time
    import asyncio

    async def run(url):
        print("开始向'%s'要数据……" % (url))
        await asyncio.sleep(2)
        data = "'%s'的数据" % (url)
        return data

    def call_back(future):
        print("call_back:", future.result())

    async def main():
        tasks = []
        t1 = time.time()

        for url in ["百度", "阿里", "腾讯", "新浪"]:
            coroutine = run(url)
            task = asyncio.ensure_future(coroutine)
            task.add_done_callback(call_back)
            tasks.append(task)

        # 同时添加4个异步任务
        await asyncio.wait(tasks)
        t2 = time.time()
        print("总耗时:%.2f" % (t2 - t1))

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())

1.2.4、Task 概念及用法

1.2.4.1 概念
+ Task,是 python 中与事件循环进行交互的一种主要方式。
  创建 Task,意思就是把协程封装成 Task 实例,并追踪协程的 运行 / 完成状态,用于未来获取协程的结果。

+ Task 核心作用: 在事件循环中添加多个并发任务;
  具体来说,是通过 asyncio.create_task() 创建 Task,让协程对象加入时事件循环中,等待被调度执行。

**注意:**
  Python 3.7 以后的版本支持 asyncio.create_task() ,在此之前的写法为 loop.create_task() 
  开发过程中需要注意代码写 法对不同版本 python 的兼容性。

+ 需要指出的是,协程封装为 Task 后不会立马启动,当某个代码 await 这个 Task 的时候才会被执行。

  当多个 Task 被加入一个 task_list 的时候,添加 Task 的过程中 Task 不会执行
  必须要用 `await asyncio.wait() `或 `await asyncio.gather()` 将 Task 对象加入事件循环中异步执行。

+ 一般在开发中,常用的写法是这样的:
  -- 先创建 task_list 空列表;
  -- 然后用 asyncio.create_task() 创建 Task;
  -- 再把 Task 对象加入 task_list ;
  -- 最后使用 await asyncio.wait 或 await asyncio.gather 将 Task 对象加入事件循环中异步执行。

**注意:** 
  创建 Task 对象时,除了可以使用 asyncio.create_task() 之外,还可以用最低层级的 loop.create_task() 或 asyncio.ensure_future() 
1.2.4.2、Task 简单用法
  import asyncio
  import arrow

  def current_time():
      '''
      获取当前时间
      :return:
      '''
      cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
      return cur_time

  async def func(sleep_time):
      func_name_suffix = sleep_time # 使用 sleep_time (函数 I/O 等待时长)作为函数名后缀,以区分任务对象
      print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
      await asyncio.sleep(sleep_time)
      print(f"[{current_time()}]函数{func.__name__}-{func_name_suffix} 执行完毕")
      return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"

  async def run():
      task_list = []
      for i in range(5):
          task = asyncio.create_task(func(i))
          task_list.append(task)
      done, pending = await asyncio.wait(task_list)
      for done_task in done:
          print((f"[{current_time()}]得到执行结果 {done_task.result()}"))
  
  def main():
      loop = asyncio.get_event_loop()
      loop.run_until_complete(run())

  if __name__ == '__main__':
      main()

1.2.5、asyncio.wait和asyncio.gather的异同

  相同:从功能上看, asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。

  不同: asyncio.wait 会返回两个值: done 和 pending , done 为已完成的协程 Task , pending 为超时未完成的协程 Task ,需通过 future.result 调用 Task 的 result ;而 asyncio.gather 返回的是所有已完成 Task 的 result ,不需要再进行调用或其他操作,就可以得到全部结果。

+ asyncio.wait 用法:
  最常见的写法是: `await asyncio.wait(task_list) 。`

+ asyncio.gather 用法:
  最常见的用法是: `await asyncio.gather(*task_list)` ,注意这里 `task_list` 前面有一个 `* `。

1.3、aiohttp

1.3.1、安装与使用

pip install aiohttp

1.3.2、简单使用

+ 首先是学习客户端,也就是用来发送http请求的用法。首先看一段代码,会在代码中讲述需要注意的地方:
import aiohttp
import asyncio


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())

asyncio.run(main())

+ 代码解释:
  在网络请求中,一个请求就是一个会话,然后**aiohttp**使用的是**ClientSession**来管理会话,所以第一个重点,看一下**ClientSession**:
  在源码中,这个类的注释是使用HTTP请求接口的第一个类。
  然后上面的代码就是实例化一个*ClientSession*类然后命名为session,然后用session去发送请求。

1.3.3、在URL中传递参数

+ 有时候在发起网络请求的时候需要附加一些参数到url中,这一点也是支持的。

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        params = {'key1': 'value1', 'key2': 'value2'}
        async with session.get('http://httpbin.org/get',
                               params=params) as resp:
            print(resp.url)

asyncio.run(main())

1.3.4、读取响应内容

+ 我们可以读取到服务器的响应状态和响应内容,这也是使用请求的一个很重要的部分。
  通过`status`来获取响应状态码,`text()`来获取到响应内容

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text(encoding=utf-8))

1.3.5、非文本内容格式

对于网络请求,有时候是去访问一张图片,这种返回值是二进制的也是可以读取到的:

await resp.read()

将`text()`方法换成`read()`方法就好。

1.3.6、请求的自定义

+ 1、自定义Headers
  有时候做请求的时候需要自定义headers
  主要是为了让服务器认为我们是一个浏览器。然后就需要我们自己来定义一个headers:

headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/78.0.3904.108 Safari/537.36"
    }
await session.post(url, headers=headers)

+ 2、如果出现ssl验证失败的处理
import aiohttp
import asyncio
from aiohttp import TCPConnector


async def main():
    async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session:
        pass
asyncio.run(main())


+ 3、自定义cookie
  发送你自己的cookies给服务器,你可以为ClientSession对象指定cookies参数:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
           "cookies": {"cookies_are": "working"}}

+ 4、使用代理
  有时候在写爬虫的时候需要使用到代理,所以*aiohttp*也是支持使用代理的
  我们可以在发起请求的时候使用代理,只需要使用关键字`proxy`来指明就好
  但是有一个很难受的地方就是它只支持`http`代理,不支持**HTTPS**代理。使用起来大概是这样:

proxy = "http://127.0.0.1:10809
async with aiohttp.ClientSession(headers=headers) as session:
  async with session.get(url=login_url, proxy=proxy) as response:
    resu = await response.text()

1.4、aiofiles文件读写

1.4.1、安装

pip install aiofiles

1.4.2、使用实例

1、打开文件
import asyncio
import aiofiles

async def main():
    async with aiofiles.open('first.m3u8', mode='r') as f:
        contents = await f.read()
        print(contents)

if __name__ == '__main__':
    asyncio.run(main())

2、迭代
import asyncio
import aiofiles

async def main():
    async with aiofiles.open('filename') as f:
        async for line in f:
            print(line)

if __name__ == '__main__':
    asyncio.run(main())

1.5、并发控制
semaphore,信号量控制并发

semaphore = asyncio.Semaphore(10)

实例
import asyncio
import os
import aiofiles
import aiohttp
import requests
from bs4 import BeautifulSoup


def get_page_source(web):
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
    }
    response = requests.get(web, headers=headers)
    response.encoding = 'utf-8'
    return response.text

def parse_page_source(html):
    book_list = []
    soup = BeautifulSoup(html, 'html.parser')
    a_list = soup.find_all('div', attrs={'class': 'mulu-list quanji'})
    for a in a_list:
        a_list = a.find_all('a')
        for href in a_list:
            chapter_url = href['href']
            book_list.append(chapter_url)
    return book_list

def get_book_name(book_page):
    book_number = book_page.split('/')[-1].split('.')[0]
    book_chapter_name = book_page.split('/')[-2]
    return book_number, book_chapter_name

async def aio_download_one(chapter_url, signal):
    number, c_name = get_book_name(chapter_url)
    for c in range(10):
        try:
            async with signal:
                async with aiohttp.ClientSession() as session:
                    async with session.get(chapter_url) as resp:
                        page_source = await resp.text()
                        soup = BeautifulSoup(page_source, 'html.parser')
                        chapter_name = soup.find('h1').text
                        p_content = soup.find('div', attrs={'class': 'neirong'}).find_all('p')
                        content = [p.text + '\n' for p in p_content]
                        chapter_content = '\n'.join(content)
                        if not os.path.exists(f'{book_name}/{c_name}'):
                            os.makedirs(f'{book_name}/{c_name}')
                        async with aiofiles.open(f'{book_name}/{c_name}/{number}_{chapter_name}.txt', mode="w",
                                                 encoding='utf-8') as f:
                            await f.write(chapter_content)
                        print(chapter_url, "下载完毕!")
                        return ""
        except Exception as e:
            print(e)
            print(chapter_url, "下载失败!, 重新下载. ")
    return chapter_url

async def aio_download(url_list):
    tasks = []
    semaphore = asyncio.Semaphore(10)
    for h in url_list:
        tasks.append(asyncio.create_task(aio_download_one(h, semaphore)))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    url = 'https://www.51shucheng.net/daomu/guichuideng'
    book_name = '鬼吹灯'
    if not os.path.exists(book_name):
        os.makedirs(book_name)
    source = get_page_source(url)
    href_list = parse_page_source(source)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(aio_download(href_list))
    loop.close()

二、selenium

2.1 概述及安装

+ selenium本身是一个自动化测试工具。它可以让python代码调用浏览器。并获取到浏览器中加载的各种资源。 
  我们可以利用selenium提供的各项功能。 帮助我们完成数据的抓取。

+ 安装:pip install selenium

+ 它与其他库不同的地方是他要启动你电脑上的浏览器, 这就需要一个驱动程序来辅助.
  这里推荐用chrome浏览器
  chrome驱动地址:http://chromedriver.storage.googleapis.com/index.html
  根据你电脑的不同自行选择
  然后关键的来了. 把你下载的浏览器驱动放在python解释器所在的文件夹
  Windwos查看Python路径:  py -0p     

+ 前期准备工作完毕,上代码看看 感受一下selenium:

    from selenium.webdriver import Chrome  # 导入谷歌浏览器的类

    # 创建浏览器对象
    web = Chrome()  # 如果你的浏览器驱动放在了解释器文件夹

    web.get("http://www.baidu.com")  # 输入网址
    print(web.title)  # 打印title

    运行一下你会发现神奇的事情发生了. 浏览器自动打开了. 并且输入了网址. 也能拿到网页上的title标题

2.2、selenium的基本使用

selenium通过控制浏览器,所以对应的获取的数据都是elements中的内容

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()

# 访问百度
driver.get("http://www.baidu.com/")

# 截图
driver.save_screenshot("baidu.png")

# 搜索关键字 杜卡迪
driver.find_element(By.ID, "kw").send_keys("杜卡迪")

# 点击id为su的搜索按钮
driver.find_element(By.ID, "su").click()

# 获取页面内容,cookie, url
driver.page_source   
driver.get_cookies()
driver.current_url

# 退出当前页面
driver.close()  

# 退出浏览器
driver.quit()   

2.3、selenium元素定位的方法

2.3.1 元素定位的八种方法:

   1、By.ID  使用id值定位
     el = driver.find_element(By.ID, '')
     el = driver.find_element_by_id()            

   2、By.XPATH 使用xpath定位
     el = driver.find_element(By.XPATH, '')
     el = driver.find_element_by_xpath()         

   3、By.TAG_NAME. 使用标签名定位
     el = driver.find_element(By.TAG_NAME, '')
     el = driver.find_element_by_tag_name()     

   4、By.LINK_TEXT使用超链接文本定位
     el = driver.find_element(By.LINK_TEXT, '')
     el = driver.find_element_by_link_text()

   5、By.PARTIAL_LINK_TEXT  使用部分超链接文本定位
     el = driver.find_element(By.PARTIAL_LINK_TEXT  , '')
     el = driver.find_element_by_partial_link_text()

   6、By.NAME   使用name属性值定位
     el = driver.find_element(By.NAME, '')
     el = driver.find_element_by_name()

   7、By.CLASS_NAME     使用class属性值定位
     el = driver.find_element(By.CLASS_NAME, '')   
     el = driver.find_element_by_class_name()

   8、By.CSS_SELECTOR   使用css选择器定位
     el = driver.find_element(By.CSS_SELECTOR, '')  
     el = driver.find_element_by_css_selector()

**注意:**
+ `by_link_text`和`by_partial_link_text`的区别:
  全部文本和包含某个文本

2.3.2、元素的操作:

> find_element_by_xxx方法仅仅能够获取元素对象,接下来就可以对元素执行以下操作 从定位到的元素中提取数据的方法

1. 从定位到的元素中获取数据
el.get_attribute(key)           # 获取key属性名对应的属性值
el.text                         # 获取开闭标签之间的文本内容

2. 对定位到的元素的操作
el.click()                      # 对元素执行点击操作
el.submit()                     # 对元素执行提交操作
el.clear()                      # 清空可输入元素中的数据
el.send_keys(data)              # 向可输入元素输入数据

使用示例:
from selenium import webdriver
from selenium.webdriver.common.by import By

driver =webdriver.Chrome()

driver.get("https://www.douban.com/")

# 打印页面内容 (获取到以后可以进行后续的xpath,bs4 或者存储等)
print(driver.page_source)

ret4 = driver.find_elements(By.TAG_NAME, "h1")
print(ret4[0].text)

ret5 = driver.find_elements(By.LINK_TEXT, "下载豆瓣 App")
print(ret5[0].get_attribute("href"))

driver.close()

2.4、selenium的其他操作

2.4.1、后台浏览器

我们已经基本了解了selenium的基本使用了. 但是呢, 不知各位有没有发现, 每次打开浏览器的时间都比较长. 这就比较耗时了
我们写的是爬虫程序. 目的是数据. 并不是想看网页. 那能不能让浏览器在后台跑呢? 答案是可以的

from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options

opt = Options()
opt.add_argument("--headless")
opt.add_argument('--disable-gpu')
opt.add_argument("--window-size=4000,1600")  # 设置窗口大小

web = Chrome(options=opt)

2.4.2、selenium 处理cookie

+ 获取cookie
  dictCookies = driver.get_cookies()

+ 设置cookie
  driver.add_cookie(dictCookies)

+ 删除cookue
  #删除一条cookie
  driver.delete_cookie("CookieName")
  # 删除所有的cookie
  driver.delete_all_cookies()

2.4.3、页面等待

- 为什么需要等待
  如果网站采用了动态html技术,那么页面上的部分元素出现时间便不能确定
  这个时候就可以设置一个等待时间,强制等待指定时间
  等待结束之后进行元素定位,如果还是无法定位到则报错

- 页面等待的三种方法
  1、强制等待
    import time
    time.sleep(n)      # 阻塞等待设定的秒数之后再继续往下执行

  2、显式等待(自动化web测试使用,爬虫基本不用)
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    
      # 显式等待指定某个条件,然后设置最长等待时间10,在10秒内每隔0.5秒使用指定条件去定位元素,如果定位到元素则直接结束等待,如果在10秒结束之后仍未定位到元素则报错
    WebDriverWait(driver, 10,0.5).until( EC.presence_of_element_located((By.ID, "myDynamicElement"))

  3、隐式等待
    # 在指定的n秒内每隔一段时间尝试定位元素,如果n秒结束还未被定位出来则报错
    driver.implicitly_wait(10)    

2.4.4、switch方法切换的操作

+ 一个浏览器肯定会有很多窗口,所以我们肯定要有方法来实现窗口的切换。切换窗口的方法如下:

也可以使用 window_handles 方法来获取每个窗口的操作对象。例如:

# 获取当前所有的窗口
current_windows = driver.window_handles

# 根据窗口索引进行切换
driver.switch_to.window(current_windows[1])

driver.switch_to.window(web.window_handles[-1])  # 跳转到最后一个窗口
driver.switch_to.window(current_windows[0])  # 回到第一个窗口

+ 当你触发了某个事件之后,页面出现了弹窗提示,处理这个提示或者获取提示信息方法如下:
alert = driver.switch_to_alert()

2.4.5. 页面前进和后退

driver.forward()     # 前进
driver.back()        # 后退
driver.refresh()          # 刷新
driver.close()       # 关闭当前窗口

2.4.6、设置浏览器最大窗口

driver.maximize_window()  #最大化浏览器窗口

2.5、selenium的优缺点

- 优点
  - selenium能够执行页面上的js,对于js渲染的数据和模拟登陆处理起来非常容易
  - 使用难度简单
  - 爬取速度慢,爬取频率更像人的行为,天生能够应对一些反爬措施
- 缺点
  - 由于selenium操作浏览器,因此会将发送所有的请求,因此占用网络带宽
  - 由于操作浏览器,因此占用的内存非常大(相比较之前的爬虫)
  - 速度慢,对于效率要求高的话不建议使用

网站公告

今日签到

点亮在社区的每一天
去签到