在很多情况下,程序会进入“阻塞”状态,例如输入输出,访问某个网站,打开某个文件等等。当程序处于阻塞状态的时候,CPU 是不在工作的,造成了时间的浪费。

所以协程是解决这样一个问题,当程序某个时刻处于阻塞状态时,它可以暂停执行,去执行其它的任务,这样就节省了时间成本。

这似乎和线程池类似,但是从本质上来说两者不同。协程(在爬虫领域表现为异步爬虫)是单线程的。

提示:在本文中,暂时对 python 协程的实现不作解释,只提供理解。因为笔者还没看懂

asyncio

asyncio 是 python 中自带的、专门用处理 IO 异步的一个模块。

假设我们有两个任务要完成,分别放在函数 func1func2 中:

1
2
3
4
5
6
7
8
9
10
11
import time

def func1():
print("1")
time.sleep(3)
print("1.1")

def func2():
print("2")
time.sleep(2)
print("2.1")

其中,time.sleep() 方法模拟了每个任务所需要的时间。

下面只改变 func1func2 的改变完全类似。

首先,我们利用 async 关键字,将以上两个函数转化为协程对象(coroutine),代码如下:

1
2
3
4
async def func1():
print("1")
time.sleep(3)
print("1.1")

但是这样还不够,func1 中的 time.sleep() 方法是“同步”而非“异步”的。暂时可以这样理解:同步和异步程序在底层的实现不同,所以对于特定的需要异步来处理的程序,即导致 CPU 不再工作的“阻塞”程序,需要特定的,与同步程序不同的模块或 API。在这里,需要用 asyncio.sleep() 方法来替换。两者的效果,抛开同步还是异步不谈,完全相同。代码如下:

1
2
3
4
async def func1():
print("1")
asyncio.sleep(3) # 使用非阻塞的异步睡眠
print("1.1")

但是这样还不够,语法上会报错,我们还需要用 python 自带的 await 关键字显性的标注那些异步操作:

1
2
3
4
async def func1():
print("1")
await asyncio.sleep(3) # 使用非阻塞的异步睡眠
print("1.1")

这样,一个单独的任务已经被我们实现了。然后我们需要的是一个类似于线程池的东西,即如果一个任务阻塞了,可以去执行其它的任务。我们这样来实现:编写一个 main 函数,并在其中用 async.create_task() 方法来将协程对象(在本例中是func1,func2)转化成任务(Task)对象,并用 asyncio.gather() 方法进行执行。不要忘记 await凡是那些与异步有关的地方,都需要加上 await 关键字。这里写的或许过于笼统与不准确,但是对于异步爬虫目前是够用了。

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func1():
print("1")
await asyncio.sleep(3) # 使用非阻塞的异步睡眠
print("1.1")

async def func2():
print("2")
await asyncio.sleep(2) # 使用非阻塞的异步睡眠
print("2.1")

async def main():
f1 = asyncio.create_task(func1())
f2 = asyncio.create_task(func2())
await asyncio.gather(f1, f2);

if __name__ == '__main__':
asyncio.run(main());

aiohttp

上文已经说过,在异步爬虫中,以前用同步方法的地方要用异步方法进行替换。而 requests 模块的替代品就是 aiohttp 模块。

而在进行异步操作时,我们利用 async with 关键字,可以把这两个单词看成一个关键字,与 with 上下文管理器的底层实现略有不同。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import aiohttp
import asyncio

urls = [
"https://i1.huishahe.com/uploads/allimg/202206/9999/61c976a82d.jpg",
"https://i1.huishahe.com/uploads/tu/201911/9999/7b9852f16a.jpg",
"https://i1.huishahe.com/uploads/tu/201911/9999/766d82b8a0.jpg"
]

async def aiodownload(url: str):
name = url.split("/")[-1];
# aiohttp.ClientSession() # 等价于 requests 模块
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
with open(name, mode="wb") as f:
f.write(await resp.content.read());
# resp.text()
print(name, "搞定");

async def main():
tasks = [];
for url in urls:
tasks.append(asyncio.create_task(aiodownload(url)));

await asyncio.gather(*tasks);

if __name__ == '__main__':
asyncio.run(main());

可以看到还是略有不同,比如返回 byte 下的文件要用 read() 方法。

最后是利用协程爬取百度书库里面的西游记所有章节的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""
https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}
包括 title, price_status, cid


章节内部的内容。
https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|1569782244","need_bookinfo":1}

是否用异步?请求第一个页面只需要一次,而爬取每一个页面的信息是需要异步的。

"""

import requests
import asyncio
import aiohttp
from lxml import etree
import json

def GetUrl(book_id: int, cid: int) ->str:
return f'https://dushu.baidu.com/api/pc/getChapterContent?data={'{"book_id":"'}{book_id}","cid":"{book_id}|{cid}","need_bookinfo":1{"}"}'

async def Download(url: str, id: int):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
with open(f"./西游记/第{id}话.txt", "w", newline="", encoding="utf-8") as f:
# f.write(resp.json()["data"]["noval"]["content"]);
# dic = await resp.json();
f.write((await resp.json())["data"]["novel"]["content"]);


async def main():
mainurl = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}';
mainresp = requests.get(mainurl);
maindic = mainresp.json();
book_id = json.loads(mainurl.split("=")[-1])["book_id"];
print(book_id);
# print(maindic);
tasks = [];
i = 0;
for dic in maindic["data"]["novel"]["items"]:
i = i + 1;
nowurl = GetUrl(book_id, dic["cid"]);
# nowresp = requests.get(nowurl)
tasks.append(Download(nowurl, i));
# break;
await asyncio.gather(*tasks);

if __name__ == '__main__':
asyncio.run(main());

PS:笔者对于协程或者是 aiohttp 的理解并不深刻,所以这里挖一个坑,看看能否彻底搞懂协程的底层实现。