异步爬虫-协程的使用

协程的基本原理

  要想实现异步机制的爬虫,自然和协程逃脱不了关系。

1.案例引入

  在介绍协程之前,先引入https://www.httpbin.org/delay/5这个网站,访问这个网站需要服务器强制等待5秒才会返回响应。
  我们使用requests写一个遍历程序,直接遍历100次该网站,看下需要多久时间。

import requests
import logging
import time

logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')

TOTAL_NUMBER = 10
url = 'https://www.httpbin.org/delay/5'

start_time = time.time()

for _ in range(1,TOTAL_NUMBER+1):
    logging.info('scraping %s',url)
    response = requests.get(url)

end_time = time.time()
logging.info('total time %s seconds',end_time-start_time)

  这里使用的requests是单线程,由于每个页面都要等待至少5秒,请求100次至少花费500秒加上网站本身负载问题,总时间大约会在10分钟以上,耗时比较久,所以开启多线程爬取非常有必要。

2.基础概念知识

  • 阻塞
      程序在等待某个操作完成期间,自身无法干别的事情,则该程序在操作上是阻塞的。
  • 非阻塞
      程序在等待某个操作期间,自身不被阻塞可以继续干别的事情,则该程序在操作上是非阻塞的。
  • 同步
      不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通讯方式保持一致,此时这些程序单元是同步执行的。
      同步意味着有序。
  • 异步
      异步意味着无序。
  • 多进程
      就是利用cpu的多核优势,在同一时间内并行执行多个任务。
  • 协程
      又称微线程,是一种运行在用户态的一种轻量级线程。协程本质上是一个单进程。
      进程是线程的集合,一个任务对应一个线程。

3.协程的用法

  python中使用协程最常用的库莫过于asyncio。

  • event_loop:相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就调用对应的处理办法。
  • coroutine:协程。我们可以使用async关键字来定义一个方法,这个方法在调用时不会立即执行,而是会返回一个协程对象。
  • task:任务,这是对协程对象的进一步封装,包括协程对象的各个状态。
  • future:代表将来执行或者没有执行的结果,和task没有本质区别。
      async和await。前者来定义一个协程,后者用来挂起阻塞方法的执行。

4.准备工作

  Python3.5及以上。

5.定义协程

  先来定义一个协程,体验下它和普通进程实现上的不同之处。

import asyncio

async def execute(x):
    print('number:',x)

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling exxecute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

  首先,引入了asyncio包,这样才可以使用async和await关键字。然后使用了async定义了一个execute方法,该方法接受一个数字参数x,执行之后打印该数字。
  随后我们直接调用了execute方法,然而这个方法并没有执行,而是返回了一个coroutine对象。之后使用get_event_loop方法创建了一个时间循环loop,并调用loop对象的run_until_complete方法将协程对象注册到了事件循环中,接着启动,最后才能看到execute打印出了收的数字。
  由此可见,async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
  上面这个例子我们把协程对象coroutine传递给run_until_complete方法的时候,实际上就是将coroutine封装成task对象。task它是对协程对象的进一步封装,比协程对象多了运行状态,例如running,finished等,我们可以利用这些状态获得协程对象的执行情况。相比于上个例子如果想要显示的进行声明,代码如下:

import asyncio 

async def execute(x):
    print('Number',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:',task)
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

  这里定义了loop对象后,紧接着调用了他的create_task方法,将协程对象转化为task对象,随后打印输出发现处于pending状态。然后将task对象添加到事件循环中执行,并再次打印出task对象,发现其状态变成了finished,同时还可以看到其result变成了1,也就是我们定义的execute方法返回的结果。
  定义task对象还有另外一种方式,就是直接调用asyncio包的ensure_future方法,返回结果也是task对象,这样的话可以不借助loop对象。即使还没有声明loop,也可以提前定义好task对象。

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)
print('Task:',task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

  可以看到,运行效果是一致的。

6.绑定回调

  我们也可以为某个task对象绑定一个回调方法。示例如下:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:',task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:',task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)

  这里定义了request方法,用这个方法请求百度,并获取其状态码,但是没有编写任何print语句。随后定义了callback方法,这个方法接收一个参数,参数是task对象,在这个方法中调用print方法打印出了task对象的结果。这样就定义好了一个协程对象和一个回调方法。希望达到的小伙就是:当协程对象执行完毕后,就去执行声明的cellback方法。
  两者关联起来,只需调用add_done_callback方法。我们将callback方法传递给封装好的task对象,这样当task执行完毕后,就可以调用callback方法了。同时task对象还会作为参数传递给callback方法,调用task对象的result方法就可以获取任何返回结果了。
  实际上,即使不使用回调方法,在task执行完毕后,也可以直接调用result方法获取结果。

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:',task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('Task Result:',task.result())

7.多任务协程

  在上面的例子中,我们都只执行了一次请求,如果想执行多次请求,可以定义一个task列表,然后用asyncio中wait方法执行。

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    Status = requests.get(url)
    return Status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:',tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:',task.result())

  这里我们使用for循环创建了5个task,它们组成一个列表,然后把这个列表优先传递给asyncio包的wait方法,再将其注册到事件循环中,就可以发起5个任务了。最后,输出任务的执行结果。

8.协程实现

  接下来,我们正式看看协程在解决IO密集型任务方面有什么优势?
  在第一个代码中,我们用一个网络请求作为例子,这本身就是一个耗时等待的操作,因为在请求网页之后需要等待页面响应并返回结果。耗时等待操作一般都是IO操作,例如文件读取、网络请求等。协程在处理这种操作时是有很大优势的,但遇到需要等待的情况时,程序暂时挂起,转而执行其他操作,从而避免一直等待一个程序而耗费更多的时间,能够充分利用资源。
  首先,我们利用开头的访问的网站示例,演示下常犯的错误,后面再给出正确的例子做对比。这里还是拿之前的requests库进行请求。

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = requests.get(url)
    print('Get response from',url,'response',response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time',end-start)

  这里我们创建了10个task,然后将task列表传给wait方法并注册到事件循环中执行。但是运行后发现,这和正常请求没区别,依然是顺次执行的,耗费时长依然很久。为什么呢?
  其实,要进行异步处理,先得有挂起操作。上面方法都是一连串的执行下来,连挂起都没有,肯定不可能实现异步。
  awit关键字它可以将等待的操作进行挂起,转而执行别的协程,直到其他协程挂起或关闭。
  所以,代码的请求我们可以更改下。

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = await requests.get(url)
    print('Get response from',url,'response',response)

  更改后执行之前的代码,发现会报如下错误。
在这里插入图片描述
  这次确实挂起了也等待了,但是却报错。这个错误意思是requests返回的Response对象不能和await一起使用,根据官方文档说明,await后面的对象必须是如下格式之一。

  • 一个原生协程对象;
  • 一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象;
  • 由一个包含_await_方法的对象返回的一个迭代器。
      这里requests返回的Response都不符合,所以报错。
      既然await后面可以跟随一个协程对象,那么async把请求的方法改成协程对象可以吗?
import asyncio
import requests
import time

start = time.time()

async def get(url):
    return requests.get(url)

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = await get(url)
    print('Get response from',url,'response',response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time',end-start)

  这里将请求页面的方法独立出来,并用async修饰,就得到了一个协程对象。
  但是运行后发现,依然不是异步执行的,也就是说仅仅将涉及IO操作的代码封装到async修饰的方法里是不可行的。只有使用异步操作的请求方式才可以实现真正的异步,这里aiohttp就派上用场了。

9.使用aiohttp

  aiohttp是一个支持异步请求的库,它和asyncio配合使用,可以非常方便的进行异步请求操作。
  下面使用aiohttp将代码改写下:

import asyncio 
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response

async def resquest():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = await get(url)
    print('Get response from',url,'response',response)

tasks = [asyncio.ensure_future(resquest()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time',end-start)

  运行后我们将发现,10次请求的耗时由之前的60秒已经缩短到6秒多。
  这里我们使用了await,其后面跟着get方法。在执行10个协程的时候,如果遇到await,就会将当前协程挂起,转而执行其他协程,直到其他协程也挂起或执行完毕,再执行下一个协程。
  开始运行时,事件循环会执行第一个task,对于第一个task来说,当执行到第一个await跟着的get方法时,它会被挂起,但是第一个get方法是非阻塞的,挂起后会被立马唤醒,立即又进入执行,并创建了ClientSession对象。接着又遇到第二个await,session.get,然后就被挂起,这个请求的时间比较长,所以一直没有被唤醒,好在这个task已经被挂起了,接下来事件循环会找到当前未被挂起的协程继续执行,就去执行第二个task,以此类推,直到第十个循环task的第二个session.get也被挂起,这时全部task都被挂起了,只能等待,5秒之后几个请求几乎都会有响应,然后几个task也被唤醒接着执行,并输出结果。
  从理论上来说,只要服务器无限抗压,而且可以忽略IO传输时延,可以做到无限个task一起执行,并且在理想时间内得到结果。但是由于不同服务器处理task的实现机制不同,可能某些服务器不能承受那么么高的并发量,因此响应速度也会减慢。

import asyncio 
import aiohttp
import time

def test(number):
    start = time.time()

    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        await response.text()
        await session.close()
        return response
    
    async def request():
        url = 'https://www.baidu.com/'
        await get(url)

    tasks = [asyncio.ensure_future(request()) for _ in range(number)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    end = time.time()
    print('Number:',number,'Cost time:',end - start)

# for number in [1,3,5,10,15,30,75,100,200,500]:
#    test(number)
 for number in [500]:
     test(number)

  可以看到在服务器能够承受的情况下,即使增加了并发量,爬取速度也不会受太大影响。
  综上所述,能够将异步请求应用到爬虫中,速度提升将会相当可观。