1.1 概念
- 协程是一种用户态的轻量级线程
- 子程序
- 理解协程
也就是说同一个线程下的一段代码1 执行执行着就中断,然后去执行另一段代码2 ,当再次回来执行代码1 时,接着从之前的中断的位置继续向下执行
- 优点
- 缺点
b、进行阻塞操作( 操作IO) 会阻塞整个程序
- asyncio模块是python3. 4 版本引入的标准库,直接内置了对异步IO的操作
- 编程模式
- 说明
到目前为止实现协程的不仅仅只有asyncio, tornado和gevent都实现了类似功能
- 关键字的说明
| event_loop | 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 |
| coroutine | 协程对象,指一个使用async 关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
| task | 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 |
| async / await | python3. 5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口 |
import time
import asyncio
async def run ( url) :
print ( "开始向'%s'要数据……" % ( url) )
await asyncio. sleep( 5 )
data = "'%s'的数据" % ( url)
print ( "给你数据" )
return data
def call_back ( future) :
print ( "call_back:" , future. result( ) )
coroutine = run( "百度" )
task = asyncio. ensure_future( coroutine)
task. add_done_callback( call_back)
loop = asyncio. get_event_loop( )
loop. run_until_complete( task)
需求:同时请求"百度" , "阿里" , "腾讯" , "新浪" 四个网站,假设响应时长均为2 秒
import time
import asyncio
async def run ( url) :
print ( "开始向'%s'要数据……" % ( url) )
await asyncio. sleep( 2 )
data = "'%s'的数据" % ( url)
return data
def call_back ( future) :
print ( "call_back:" , future. result( ) )
async def main ( ) :
tasks = [ ]
t1 = time. time( )
for url in [ "百度" , "阿里" , "腾讯" , "新浪" ] :
coroutine = run( url)
task = asyncio. ensure_future( coroutine)
task. add_done_callback( call_back)
tasks. append( task)
await asyncio. wait( tasks)
t2 = time. time( )
print ( "总耗时:%.2f" % ( t2 - t1) )
if __name__ == "__main__" :
loop = asyncio. get_event_loop( )
loop. run_until_complete( main( ) )
1.2.4、Task 概念及用法 概念
+ Task,是 python 中与事件循环进行交互的一种主要方式。
创建 Task,意思就是把协程封装成 Task 实例,并追踪协程的 运行 / 完成状态,用于未来获取协程的结果。
+ Task 核心作用: 在事件循环中添加多个并发任务;
具体来说,是通过 asyncio. create_task( ) 创建 Task,让协程对象加入时事件循环中,等待被调度执行。
** 注意: **
Python 3.7 以后的版本支持 asyncio. create_task( ) ,在此之前的写法为 loop. create_task( )
开发过程中需要注意代码写 法对不同版本 python 的兼容性。
+ 需要指出的是,协程封装为 Task 后不会立马启动,当某个代码 await 这个 Task 的时候才会被执行。
当多个 Task 被加入一个 task_list 的时候,添加 Task 的过程中 Task 不会执行
必须要用 `await asyncio. wait( ) `或 `await asyncio. gather( ) ` 将 Task 对象加入事件循环中异步执行。
+ 一般在开发中,常用的写法是这样的:
- - 先创建 task_list 空列表;
- - 然后用 asyncio. create_task( ) 创建 Task;
- - 再把 Task 对象加入 task_list ;
- - 最后使用 await asyncio. wait 或 await asyncio. gather 将 Task 对象加入事件循环中异步执行。
** 注意: **
创建 Task 对象时,除了可以使用 asyncio. create_task( ) 之外,还可以用最低层级的 loop. create_task( ) 或 asyncio. ensure_future( )、Task 简单用法
import asyncio
import arrow
def current_time ( ) :
cur_time = arrow. now( ) . to( 'Asia/Shanghai' ) . format ( 'YYYY-MM-DD HH:mm:ss' )
return cur_time
async def func ( sleep_time) :
func_name_suffix = sleep_time
print ( f"[ { current_time( ) } ] 执行异步函数 { func. __name__} - { func_name_suffix} " )
await asyncio. sleep( sleep_time)
print ( f"[ { current_time( ) } ]函数 { func. __name__} - { func_name_suffix} 执行完毕" )
return f"【[ { current_time( ) } ] 得到函数 { func. __name__} - { func_name_suffix} 执行结果】"
async def run ( ) :
task_list = [ ]
for i in range ( 5 ) :
task = asyncio. create_task( func( i) )
task_list. append( task)
done, pending = await asyncio. wait( task_list)
for done_task in done:
print ( ( f"[ { current_time( ) } ]得到执行结果 { done_task. result( ) } " ) )
def main ( ) :
loop = asyncio. get_event_loop( )
loop. run_until_complete( run( ) )
if __name__ == '__main__' :
main( )
相同: 从功能上看, asyncio. wait 和 asyncio. gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
不同: asyncio. wait 会返回两个值: done 和 pending , done 为已完成的协程 Task , pending 为超时未完成的协程 Task ,需通过 future. result 调用 Task 的 result ; 而 asyncio. gather 返回的是所有已完成 Task 的 result ,不需要再进行调用或其他操作,就可以得到全部结果。
+ asyncio. wait 用法:
最常见的写法是: `await asyncio. wait( task_list) 。`
+ asyncio. gather 用法:
最常见的用法是: `await asyncio. gather( * task_list) ` ,注意这里 `task_list` 前面有一个 `* `。
pip install aiohttp
+ 首先是学习客户端,也就是用来发送http请求的用法。首先看一段代码,会在代码中讲述需要注意的地方:
import aiohttp
import asyncio
async def main ( ) :
async with aiohttp. ClientSession( ) as session:
async with session. get( 'http://httpbin.org/get' ) as resp:
print ( resp. status)
print ( await resp. text( ) )
asyncio. run( main( ) )
+ 代码解释:
在网络请求中,一个请求就是一个会话,然后** aiohttp** 使用的是** ClientSession** 来管理会话,所以第一个重点,看一下** ClientSession** :
然后上面的代码就是实例化一个* ClientSession* 类然后命名为session,然后用session去发送请求。
+ 有时候在发起网络请求的时候需要附加一些参数到url中,这一点也是支持的。
import aiohttp
import asyncio
async def main ( ) :
async with aiohttp. ClientSession( ) as session:
params = { 'key1' : 'value1' , 'key2' : 'value2' }
async with session. get( 'http://httpbin.org/get' ,
params= params) as resp:
print ( resp. url)
asyncio. run( main( ) )
+ 我们可以读取到服务器的响应状态和响应内容,这也是使用请求的一个很重要的部分。
通过`status`来获取响应状态码,`text( ) `来获取到响应内容
async def main ( ) :
async with aiohttp. ClientSession( ) as session:
async with session. get( 'http://httpbin.org/get' ) as resp:
print ( resp. status)
print ( await resp. text( encoding= utf- 8 ) )
await resp. read( )
将`text( ) `方法换成`read( ) `方法就好。
+ 1 、自定义Headers
headers = {
"User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko)"
" Chrome/78.0.3904.108 Safari/537.36"
await session. post( url, headers= headers)
+ 2 、如果出现ssl验证失败的处理
import aiohttp
import asyncio
from aiohttp import TCPConnector
async def main ( ) :
async with aiohttp. ClientSession( connector= TCPConnector( ssl= False ) ) as session:
asyncio. run( main( ) )
+ 3 、自定义cookie
url = 'http://httpbin.org/cookies'
cookies = { 'cookies_are' : 'working' }
async with ClientSession( cookies= cookies) as session:
async with session. get( url) as resp:
assert await resp. json( ) == {
"cookies" : { "cookies_are" : "working" } }
+ 4 、使用代理
有时候在写爬虫的时候需要使用到代理,所以* aiohttp* 也是支持使用代理的
但是有一个很难受的地方就是它只支持`http`代理,不支持** HTTPS** 代理。使用起来大概是这样:
proxy = "http: // 127.0 .0 .1 : 10809
async with aiohttp. ClientSession( headers= headers) as session:
async with session. get( url= login_url, proxy= proxy) as response:
resu = await response. text( )
pip install aiofiles
1 、打开文件
import asyncio
import aiofiles
async def main ( ) :
async with aiofiles. open ( 'first.m3u8' , mode= 'r' ) as f:
contents = await f. read( )
print ( contents)
if __name__ == '__main__' :
asyncio. run( main( ) )
2 、迭代
import asyncio
import aiofiles
async def main ( ) :
async with aiofiles. open ( 'filename' ) as f:
async for line in f:
print ( line)
if __name__ == '__main__' :
asyncio. run( main( ) )
1.5 、并发控制
semaphore = asyncio. Semaphore( 10 )
import asyncio
import os
import aiofiles
import aiohttp
import requests
from bs4 import BeautifulSoup
def get_page_source ( web) :
headers = {
'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
response = requests. get( web, headers= headers)
response. encoding = 'utf-8'
return response. text
def parse_page_source ( html) :
book_list = [ ]
soup = BeautifulSoup( html, 'html.parser' )
a_list = soup. find_all( 'div' , attrs= { 'class' : 'mulu-list quanji' } )
for a in a_list:
a_list = a. find_all( 'a' )
for href in a_list:
chapter_url = href[ 'href' ]
book_list. append( chapter_url)
return book_list
def get_book_name ( book_page) :
book_number = book_page. split( '/' ) [ - 1 ] . split( '.' ) [ 0 ]
book_chapter_name = book_page. split( '/' ) [ - 2 ]
return book_number, book_chapter_name
async def aio_download_one ( chapter_url, signal) :
number, c_name = get_book_name( chapter_url)
for c in range ( 10 ) :
try :
async with signal:
async with aiohttp. ClientSession( ) as session:
async with session. get( chapter_url) as resp:
page_source = await resp. text( )
soup = BeautifulSoup( page_source, 'html.parser' )
chapter_name = soup. find( 'h1' ) . text
p_content = soup. find( 'div' , attrs= { 'class' : 'neirong' } ) . find_all( 'p' )
content = [ p. text + '\n' for p in p_content]
chapter_content = '\n' . join( content)
if not os. path. exists( f' { book_name} / { c_name} ' ) :
os. makedirs( f' { book_name} / { c_name} ' )
async with aiofiles. open ( f' { book_name} / { c_name} / { number} _ { chapter_name} .txt' , mode= "w" ,
encoding= 'utf-8' ) as f:
await f. write( chapter_content)
print ( chapter_url, "下载完毕!" )
return ""
except Exception as e:
print ( e)
print ( chapter_url, "下载失败!, 重新下载. " )
return chapter_url
async def aio_download ( url_list) :
tasks = [ ]
semaphore = asyncio. Semaphore( 10 )
for h in url_list:
tasks. append( asyncio. create_task( aio_download_one( h, semaphore) ) )
await asyncio. wait( tasks)
if __name__ == '__main__' :
url = 'https://www.51shucheng.net/daomu/guichuideng'
book_name = '鬼吹灯'
if not os. path. exists( book_name) :
os. makedirs( book_name)
source = get_page_source( url)
href_list = parse_page_source( source)
loop = asyncio. get_event_loop( )
loop. run_until_complete( aio_download( href_list) )
loop. close( )
2.1 概述及安装
+ selenium本身是一个自动化测试工具。它可以让python代码调用浏览器。并获取到浏览器中加载的各种资源。
我们可以利用selenium提供的各项功能。 帮助我们完成数据的抓取。
+ 安装:pip install selenium
+ 它与其他库不同的地方是他要启动你电脑上的浏览器, 这就需要一个驱动程序来辅助.
chrome驱动地址: http: // chromedriver. storage. googleapis. com/ index. html
然后关键的来了. 把你下载的浏览器驱动放在python解释器所在的文件夹
Windwos查看Python路径: py - 0p
+ 前期准备工作完毕,上代码看看 感受一下selenium:
from selenium. webdriver import Chrome
web = Chrome( )
web. get( "http://www.baidu.com" )
print ( web. title)
运行一下你会发现神奇的事情发生了. 浏览器自动打开了. 并且输入了网址. 也能拿到网页上的title标题
from selenium import webdriver
from selenium. webdriver. common. by import By
driver = webdriver. Chrome( )
driver. get( "http://www.baidu.com/" )
driver. save_screenshot( "baidu.png" )
driver. find_element( By. ID, "kw" ) . send_keys( "杜卡迪" )
driver. find_element( By. ID, "su" ) . click( )
driver. page_source
driver. get_cookies( )
driver. current_url
driver. close( )
driver. quit( )
2.3.1 元素定位的八种方法:
1 、By. ID 使用id 值定位
el = driver. find_element( By. ID, '' )
el = driver. find_element_by_id( )
2 、By. XPATH 使用xpath定位
el = driver. find_element( By. XPATH, '' )
el = driver. find_element_by_xpath( )
3 、By. TAG_NAME. 使用标签名定位
el = driver. find_element( By. TAG_NAME, '' )
el = driver. find_element_by_tag_name( )
4 、By. LINK_TEXT使用超链接文本定位
el = driver. find_element( By. LINK_TEXT, '' )
el = driver. find_element_by_link_text( )
5 、By. PARTIAL_LINK_TEXT 使用部分超链接文本定位
el = driver. find_element( By. PARTIAL_LINK_TEXT , '' )
el = driver. find_element_by_partial_link_text( )
6 、By. NAME 使用name属性值定位
el = driver. find_element( By. NAME, '' )
el = driver. find_element_by_name( )
7 、By. CLASS_NAME 使用class 属性值定位
el = driver. find_element( By. CLASS_NAME, '' )
el = driver. find_element_by_class_name( )
8 、By. CSS_SELECTOR 使用css选择器定位
el = driver. find_element( By. CSS_SELECTOR, '' )
el = driver. find_element_by_css_selector( )
** 注意:**
+ `by_link_text`和`by_partial_link_text`的区别:
> find_element_by_xxx方法仅仅能够获取元素对象,接下来就可以对元素执行以下操作 从定位到的元素中提取数据的方法
1. 从定位到的元素中获取数据
el. get_attribute( key)
el. text
2. 对定位到的元素的操作
el. click( )
el. submit( )
el. clear( )
el. send_keys( data)
from selenium import webdriver
from selenium. webdriver. common. by import By
driver = webdriver. Chrome( )
driver. get( "https://www.douban.com/" )
print ( driver. page_source)
ret4 = driver. find_elements( By. TAG_NAME, "h1" )
print ( ret4[ 0 ] . text)
ret5 = driver. find_elements( By. LINK_TEXT, "下载豆瓣 App" )
print ( ret5[ 0 ] . get_attribute( "href" ) )
driver. close( )
我们已经基本了解了selenium的基本使用了. 但是呢, 不知各位有没有发现, 每次打开浏览器的时间都比较长. 这就比较耗时了
我们写的是爬虫程序. 目的是数据. 并不是想看网页. 那能不能让浏览器在后台跑呢? 答案是可以的
from selenium. webdriver import Chrome
from selenium. webdriver. chrome. options import Options
opt = Options( )
opt. add_argument( "--headless" )
opt. add_argument( '--disable-gpu' )
opt. add_argument( "--window-size=4000,1600" )
web = Chrome( options= opt)
2.4.2、selenium 处理cookie
+ 获取cookie
dictCookies = driver. get_cookies( )
+ 设置cookie
driver. add_cookie( dictCookies)
+ 删除cookue
driver. delete_cookie( "CookieName" )
driver. delete_all_cookies( )
- 为什么需要等待
- 页面等待的三种方法
1 、强制等待
import time
time. sleep( n)
2 、显式等待( 自动化web测试使用,爬虫基本不用)
from selenium. webdriver. common. keys import Keys
from selenium. webdriver. common. by import By
from selenium. webdriver. support. ui import WebDriverWait
from selenium. webdriver. support import expected_conditions as EC
WebDriverWait( driver, 10 , 0.5 ) . until( EC. presence_of_element_located( ( By. ID, "myDynamicElement" ) )
3 、隐式等待
driver. implicitly_wait( 10 )
+ 一个浏览器肯定会有很多窗口,所以我们肯定要有方法来实现窗口的切换。切换窗口的方法如下:
也可以使用 window_handles 方法来获取每个窗口的操作对象。例如:
current_windows = driver. window_handles
driver. switch_to. window( current_windows[ 1 ] )
driver. switch_to. window( web. window_handles[ - 1 ] )
driver. switch_to. window( current_windows[ 0 ] )
+ 当你触发了某个事件之后,页面出现了弹窗提示,处理这个提示或者获取提示信息方法如下:
alert = driver. switch_to_alert( )
2.4.5. 页面前进和后退
driver. forward( )
driver. back( )
driver. refresh( )
driver. close( )
driver. maximize_window( )
- 优点
- selenium能够执行页面上的js,对于js渲染的数据和模拟登陆处理起来非常容易
- 使用难度简单
- 爬取速度慢,爬取频率更像人的行为,天生能够应对一些反爬措施
- 缺点
- 由于selenium操作浏览器,因此会将发送所有的请求,因此占用网络带宽
- 由于操作浏览器,因此占用的内存非常大( 相比较之前的爬虫)
- 速度慢,对于效率要求高的话不建议使用