高性能Python:Asyncio

并发编程是一种处理多个任务同时执行的编程方法。在Python中,asyncio是实现异步编程的强大工具。基于协程的概念,asyncio能够高效地处理I/O密集型任务。本文将介绍asyncio的基本原理和使用方法。

图片描述

我们为什么需要 asyncio

我们知道,在处理 I/O 操作时,使用多线程相比于普通的单线程可以大大提高效率。那么,为什么我们仍然需要 asyncio 呢?

多线程有许多优点,并且被广泛使用,但它也存在某些限制:

  • 例如,多线程的运行过程容易被中断,因此可能会出现竞争条件的情况。
  • 此外,线程切换本身也有一定的成本,并且线程的数量不能无限增加。因此,如果你的 I/O 操作非常繁重,多线程可能无法满足高效率和高质量的要求。

正是为了解决这些问题,asyncio 应运而生。

同步与异步

首先,我们来区分同步(Sync)和异步(Async)的概念。

  • 同步(Sync)意味着操作是一个接一个地执行。下一个操作只能在前一个操作完成后才能执行。
  • 异步(Async)意味着不同的操作可以交替执行。如果其中一个操作被阻塞,程序不会等待,而是会寻找可执行的操作继续进行。

asyncio 的工作原理

  1. 协程asyncio 使用协程来实现异步操作。协程是一种使用 async 关键字定义的特殊函数。在协程中,可以使用 await 关键字来暂停当前协程的执行,并等待异步操作的完成。
  2. 事件循环:事件循环是 asyncio 的核心机制之一。它负责调度和执行协程,并处理协程之间的切换。事件循环会不断轮询可执行的任务。一旦任务准备就绪(例如,当 I/O 操作完成或定时器到期时),事件循环会将其放入执行队列,并继续处理下一个任务。
  3. 异步任务: 在 asyncio 中,我们通过创建异步任务来执行协程。异步任务是通过 asyncio.create_task() 函数创建的,该函数将协程封装成一个可等待对象,并将其提交给事件循环进行处理。
  4. 异步 I/O 操作: asyncio 提供了一组异步 I/O 操作(例如网络请求、文件读写等),可以通过 await 关键字与协程和事件循环无缝集成。通过使用异步 I/O 操作,可以避免在等待 I/O 完成时的阻塞,从而提高程序性能和并发性。
  5. 回调函数: asyncio 还支持使用回调函数来处理异步操作的结果。可以使用 asyncio.ensure_future() 函数将回调函数封装成一个可等待对象,并将其提交给事件循环进行处理。
  6. 并发执行: asyncio 可以并发执行多个协程任务。事件循环将根据任务的就绪情况自动调度协程的执行,从而实现高效的并发编程。

总之,asyncio 的工作原理基于协程和事件循环的机制。通过使用协程进行异步操作,并由事件循环负责协程的调度和执行,asyncio 实现了高效的异步编程模型。

协程与异步编程

协程是 asyncio 中一个重要的概念。它们是轻量级的执行单元,可以在任务之间快速切换,而无需线程切换的开销。协程可以使用 async 关键字定义,await 关键字用于暂停协程的执行,并在某个操作完成后恢复执行。

以下是一个简单的示例代码,演示如何使用协程进行异步编程:

import asyncio

async def hello():
    print("Hello")

await asyncio.sleep(1)  # 模拟一个耗时操作
    print("World")

# 创建事件循环
loop = asyncio.get_event_loop()

# 将协程添加到事件循环并执行
loop.run_until_complete(hello())

在这个例子中,函数 hello() 是一个使用 async 关键字定义的协程。在协程内部,我们可以使用 await 来暂停其执行。这里,asyncio.sleep(1) 被用来模拟一个耗时的操作。run_until_complete() 方法将协程添加到事件循环中并运行它。

异步 I/O 操作

asyncio 主要用于处理 I/O 密集型任务,例如网络请求、文件读写。它提供了一系列用于异步 I/O 操作的 API,可以与 await 关键字结合使用,以轻松实现异步编程。

以下是一个简单的示例代码,展示如何使用 asyncio 进行异步网络请求:

import asyncio
import aiohttp

async def fetch(session, url):

async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'https://www.example.com')
        print(html)

# 创建事件循环
loop = asyncio.get_event_loop()

# 将协程添加到事件循环并执行
loop.run_until_complete(main())
在这个例子中,我们使用 aiohttp 库进行网络请求。函数 fetch() 是一个协程。它通过 session.get() 方法发起异步 GET 请求,并使用 await 关键字等待响应返回。函数 main() 是另一个协程。它在内部创建一个 ClientSession 对象以供重用,然后调用 fetch() 方法获取网页内容并打印出来。

注意:这里我们使用 aiohttp 而不是 requests 库,因为 requests 库与 asyncio 不兼容,而 aiohttp 库则兼容。为了充分利用 asyncio,特别是发挥其强大的功能,在许多情况下,需要相应的 Python 库。

多任务的并发执行

asyncio 还提供了一些机制来并发执行多个任务,例如 asyncio.gather()asyncio.wait()。以下是一个示例代码,展示了如何使用这些机制并发执行多个协程任务:

import asyncio

async def task1():
    print("任务 1 开始")
    await asyncio.sleep(1)


print("任务 1 完成")

async def task2():
    print("任务 2 开始")
    await asyncio.sleep(2)
    print("任务 2 完成")

async def main():
    await asyncio.gather(task1(), task2())

# 创建事件循环
loop = asyncio.get_event_loop()

# 将协程添加到事件循环并执行

loop.run_until_complete(main())

在这个例子中,我们定义了两个协程任务 task1()task2(),这两个任务都执行一些耗时的操作。协程 main() 通过 asyncio.gather() 同时启动这两个任务,并等待它们完成。并发执行可以提高程序的执行效率。

如何选择?

在实际项目中,我们应该选择多线程还是 asyncio?一位专家生动地总结道:

if io_bound:
    if io_slow:
        print('使用 Asyncio')
    else:
        print('使用多线程')
elif cpu_bound:
    print('使用多进程')
    • 如果它是I/O密集型并且I/O操作较慢,需要多个任务/线程的协作,那么使用asyncio更为合适。

 

    • 如果它是I/O密集型但I/O操作较快,仅需要有限数量的任务/线程,那么多线程就足够了。

 

    • 如果它是CPU密集型,那么需要多进程来提高程序运行效率。

实践

 

输入一个列表。对于列表中的每个元素,我们想要计算从0到该元素的所有整数的平方和。

 

同步实现

import time

def cpu_bound(number):

return sum(i * i for i in range(number))

def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)

end_time = time.perf_counter()
print('计算耗时 {} 秒'.format(end_time - start_time))

if __name__ == '__main__':
    main()

执行时间为 计算耗时 16.00943413000002 秒

使用 concurrent.futures 的异步实现

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def cpu_bound(number):
    return sum(i * i for i in range(number))

def calculate_sums(numbers):
    with ProcessPoolExecutor() as executor:


results = executor.map(cpu_bound, numbers)
results = [result for result in results]
print(results)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)

在这段代码中,`results` 变量通过 `executor.map` 方法调用 `cpu_bound` 函数并传入 `numbers` 列表进行计算。接着,`results` 变量被重新赋值为一个列表,该列表包含了从 `results` 中提取的每个 `result`。最后,使用 `print` 函数输出 `results` 的内容。

`main` 函数的定义中,首先记录了开始时间 `start_time`,然后生成了一个包含从 10000000 开始的 20 个数字的列表 `numbers`,最后调用 `calculate_sums` 函数并传入 `numbers` 列表进行计算。


end_time = time.perf_counter()
    print('计算耗时 {} 秒'.format(end_time - start_time))

if __name__ == '__main__':
    main()

执行时间为 计算耗时 7.314132894999999 秒

在这段改进的代码中,我们使用 concurrent.futures.ProcessPoolExecutor 来创建一个进程池,然后使用 executor.map() 方法提交任务并获取结果。请注意,在使用 executor.map() 之后,如果需要获取结果,可以将结果迭代为列表或使用其他方法处理结果。

多进程实现

import time
import multiprocessing

def cpu_bound(number):
    return sum(i * i for i in range(number))

def calculate_sums(numbers):


with multiprocessing.Pool() as pool:
    pool.map(cpu_bound, numbers)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()

print('计算耗时 {} 秒'.format(end_time - start_time))

if __name__ == '__main__':
    main()

执行时间为 计算耗时 5.024221667 秒

concurrent.futures.ProcessPoolExecutormultiprocessing 都是用于在 Python 中实现多进程并发的库。它们之间有一些区别:

  1. 基于接口的封装concurrent.futures.ProcessPoolExecutorconcurrent.futures 模块提供的高层接口。它封装了底层的多进程功能,使得编写多进程代码变得更加简单。而 multiprocessing 是 Python 的标准库之一,提供完整的多进程支持,并允许对进程进行直接操作。
  2. API 使用concurrent.futures.ProcessPoolExecutor 的使用方式类似于线程池。它将可调用对象(如函数)提交到进程池中执行,并返回一个 Future 对象,该对象可用于获取执行结果。multiprocessing 提供了更低级的进程管理和通信接口。可以显式创建、启动和控制进程,并且可以使用队列或管道在多个进程之间进行通信。

可扩展性和灵活性:由于 multiprocessing 提供了更低级的接口,因此相比于 concurrent.futures.ProcessPoolExecutor 更加灵活。通过直接操作进程,可以对每个进程实现更细粒度的控制,例如设置进程优先级和在进程之间共享数据。concurrent.futures.ProcessPoolExecutor 更适合于简单的任务并行化,隐藏了许多底层细节,使得编写多进程代码变得更加容易。

  • 跨平台支持concurrent.futures.ProcessPoolExecutormultiprocessing 都提供跨平台的多进程支持,可以在各种操作系统上使用。

 

总之,concurrent.futures.ProcessPoolExecutor 是一个高层接口,封装了底层的多进程功能,适合于简单的多进程任务并行化。multiprocessing 是一个更低级的库,提供了更多的控制和灵活性,适合于需要对进程进行细粒度控制的场景。您需要根据具体需求选择合适的库。如果只是简单的任务并行化,可以使用 concurrent.futures.ProcessPoolExecutor 来简化代码;如果需要更多的低级控制和通信,则可以使用 multiprocessing 库。


摘要

 

与多线程不同,asyncio 是单线程的,但其内部事件循环的机制允许它同时运行多个不同的任务,并且比多线程具有更大的自主控制能力。

 

asyncio 中的任务在执行过程中不会被中断,因此不会出现竞争条件的情况。

 

特别是在重 I/O 操作的场景中,asyncio 的运行效率高于多线程。因为在 asyncio 中,任务切换的成本远小于线程切换的成本,并且 asyncio 可以启动的任务数量远大于多线程中的线程数量。

 

然而,需要注意的是,在许多情况下,使用 asyncio 需要特定第三方库的支持,例如前面示例中的 aiohttp。如果 I/O 操作快速且不繁重,使用多线程也可以有效解决问题。

  • asyncio 是一个用于实现异步编程的 Python 库。
  • 协程是 asyncio 的核心概念,通过 asyncawait 关键字实现异步操作。
  • asyncio 提供强大的 API 用于异步 I/O 操作,并且可以轻松处理 I/O 密集型任务。
  • 通过诸如 asyncio.gather() 的机制,可以并发执行多个协程任务。

更多