我们将会讲述一个故事,来解释 python 中并发与并行的不同之处。
在这个故事中,我们将看到一个单人进行多任务处理的场景(类似并发 ,以及一个多人分别处理自己任务的场景(类似并行 ;我们会站在餐厅的角度观察这些场景的实际效果,并观察它们如何快速有效地为顾客服务;然后我们将在 Python 中实现这些“餐厅”;最后,我们会比较这两种不同的并发选项,并解释如何择机使用它们。
解释的内容包含
让我们从定义开始
如果一个系统可以同时支持两个 多个正在进行中的操作,则称该系统是并发的。
如果一个系统可以支持同时执行两个 多个动作,则称该系统是并行的。
这些定义之间的关键概念和区别在于“进行中”这一短语。— 并发的艺术[1]
如果你被绕晕了,现在让我们直接通过制作午餐的故事来讲解。
在午餐时间,你拐进了一条之前从未注意到的街道。这里有两种可供选择的食物来源 一个叫做 Concurrent Burgers 的市场摊位和一个叫做 Parallel Salads 的商店。
两者看起来都很美味,但都在排长队,所以你想知道哪一个会先为你服务。
Concurrent Burgers 由一位手臂上有蟒蛇纹身的中年女士经营,她在工作时开怀大笑。她正在执行以下任务
她在每个任务之间无缝切换 有一刻,她正在检查烤架上的肉饼并将煮熟的取出,下一刻她正在接受订单,再下一刻如果有任何肉饼已经准备好,她会制作一个汉堡并完成这笔订单。
Parallel Salads 配备了许多相同的人,他们在工作时面带微笑并礼貌地交谈。他们每个人都为一位顾客制作沙拉。他们接受订单,将所有原料 一个新鲜的碗中,浇上调味汁,尽情地混合,在一个容器中装满一份健康的沙拉,然后丢掉碗。与此同时,另一个克隆人拿起脏碗并清洗它们。
两家的主要区别在于员工数量和执行任务的方式
你注意到 两家餐厅都以相同的速度为顾客提供服务。Concurrent Burgers 中的女士同时制作多个汉堡,并且受到她的小烤架输出熟肉饼的速度的限制。
Parallel Salads 雇用多名男子一次制作一份沙拉,并且受到将制作单份沙拉的材料放在一起所需时间长度的限制。
你很快意识到 Concurrent Burgers 受 I/O 限制,而 Parallel Salads 受 CPU 限制
在一个固执己见的朋友打断你并邀请你 他们的队列之前,你无法做出决定,你可能会在相同的状态下保持五分钟的困惑。
需要注意的是,Parallel Salads 是并发,也是并行的,因为“两个 多个操作同时进行”。并行处理是并发处理的一个子集。
这两个商店为并发和并行任务之间的区别提供了一种直观的视角。下面我们将研究如何在 Python 中实现这两者。
Python 有两个可用于并发的选项
同时它内置了这个库以实现并行性
在云上运行 Python 程序时,还有另一种并行选项
让我们看一下使用 threading 和 asyncio 的 Concurrent Burgers 的两种可能实现。在这两种情况下,都有一个工人接单、做肉饼和做汉堡。
对于 threading 和 asyncio,都只有一个处理器在运行,但它在需要执行的不同任务之间跳转。threading 和 asyncio 之间的区别在于如何切换任务。
通过 threading,工作人员可以在执行期间随时切换任务。这名工人正在下订单时突然切换到检查馅饼 制作汉堡,然后又随时切换到其他任务之一。
让我们来看一下使用 threading 实现的 Concurrent Burgers
from concurrent.futures import ThreadPoolExecutor
import queues
# Note: Some methods and variables are skipped
# to focus only on the threading details
def run_concurrent_burgers():
# Create blocking queues
customers = queue.Queue()
orders = queue.Queue(maxsize=5) # Process up to 5 orders at once
cooked_patties = queue.Queue()
# The grill is entirely independent of the worker,
# and turns raw patties into cooked patties.
# This is like reading from disk or doing a network request
grill = Grill()
# Run the three tasks using a thread pool executor
with ThreadPoolExecutor() as executor:
executor.submit(take_orders, customers, orders)
executor.submit(cook_patties, grill, cooked_patties)
executor.submit(make_burgers, orders, cooked_patties)
def take_orders(customers, orders):
while True:
customer = customers.get()
order = take_order(customer)
orders.put(order)
def cook_patties(grill, cook_patties):
for position in range(len(grill)):
grill[position] = raw_patties.pop()
while True:
for position, patty in enumerate(grill):
if patty.cooked:
cooked_patties.put(patty)
grill[position] = raw_patties.pop()
# Don't check again for another minute
threading.sleep(60)
def make_burgers(orders, cooked_patties):
while True:
patty = cooked_patties.get()
order = orders.get()
burger = order.make_burger(patty)
customer = order.shout_for_customer()
customer.serve(burger)
接受订单、烹饪肉饼和制作汉堡的每一项任务都是一个无限循环,不断执行其动作。
在 run_concurrent_burgers
中,我们在单独的线程中启动每个任务。我们可以为每个任务手动创建一个线程,但是有一个更好的接口,称为 ThreadPoolExecutor
,它为我们提交给它的每个任务创建一个线程。
当使用多个线程时,我们必须确保一次只有一个线程在读取 写入任何状态。否则我们可能会遇到两个线程拿着同一个馅饼的情况,我们最终会遇到一个相当愤怒的顾客;这个问题被称为线程安全。
为了避免这个问题,我们使用 Queues
来传递状态。在单个任务中,调用 get 时 Queues
会阻塞,直到有客户、订单 小馅饼准备好。操作系统不会尝试切换到任何被阻塞的线程,这为我们提供了一种安全切换状态的简单方法。只要将状态放入 Queues
线程不再使用它,那么获取状态的线程就知道它在使用时不会改变。
threading 的优点
threading 的缺点
在 asyncio 中有一个事件循环来管理所有任务。任务可以处于多种不同的状态,但最重要的两个状态是就绪 等待。在每个循环中,事件循环都会检查 是否有任何处于等待状态的任务由于另一个任务完成而准备就绪。然后它选择一个就绪任务并运行它,直到任务完成 需要等待另一个任务,这通常是一个 I/O 操作,比如从磁盘读取 发出一个 http 请求。
有两个关键字涵盖了 asyncio 的大部分用途 async 和 await。
让我们来看一下使用 asyncio 实现的 Concurrent Burgers
import asyncio
# Note: Some methods and variables are skipped
# to focus only on the asyncio details
def run_concurrent_burgers():
# These queues give up control
customers = asyncio.Queue()
orders = asyncio.Queue(maxsize=5) # Only process up to five orders at once
cooked_patties = asyncio.Queue()
# The grill runs entirely independently to the worker,
# and turn raw patties into cooked patties
grill = Grill()
# Run all tasks using the default asyncio event loop
asyncio.gather(
take_orders(customers, orders),
cook_patties(grill, cooked_patties),
make_burgers(orders, cooked_patties),
)
# Declare asyncio tasks with async def
async def take_orders(customers, orders):
while True:
# Allow switching to another task here
# and at all other awaits
customer = await customers.get()
order = take_order(customer)
await orders.put(order)
async def cook_patties(grill, cooked_patties):
for position in range(len(grill)):
grill[position] = raw_patties.pop()
while True:
for position, patty in enumerate(grill):
if patty.cooked:
# put_noawait allows us to add to the queue without
# creating a new task and giving up control
cooked_patties.put_noawait(patty)
grill[position] = raw_patties.pop()
# Wait 30 seconds before checking again
await asyncio.sleep(30)
async def make_burgers(orders, cooked_patties):
while True:
patty = await cooked_patties.get()
order = await orders.get()
burger = order.make_burger(patty)
customer = await order.shout_for_customer()
customer.serve(burger)
接受订单、烹饪肉饼和制作汉堡的每一项任务都是用 async def
声明的。在这些任务中,每次调用 await
时,worker 都会切换到一个新任务。会出现以下场景
接单的时候
做馅饼的时候
做汉堡的时候
最后一个难题是在 run_concurrent_burger
中,它调用 asyncio.gather
来安排所有任务由事件循环运行,在这种情况下,事件循环就是我们的工作人员。
正如我们确切地知道,任务切换时我们实际上不需要担心共享状态。我们可以只使用队列列表来实现这一点,并且知道两个任务不会意外地持有同一个馅饼。然而,强烈推荐使用 asyncio
队列,因为它们允许我们通过提供暂停当前任务的合理点来非常轻松地在任务之间进行协作。
使用 asyncio
的一个有趣方面是 async
关键字改变了函数的接口,因为它不能直接从非异步函数调用。这可以被认为是一件好事 坏事。一方面,你可以说它损害了可组合性,因为你不能混合 asyncio
和普通函数。另一方面,如果 asyncio
只用于 I/O,这会迫使 I/O 和业务逻辑分离,将 asyncio
代码限制在应用程序的边缘,并使代码库更易于理解和测试。显式标记 I/O 是类型函数式语言中相当普遍的做法 在 Haskell
中是必需的。
Asyncio 的优点
asyncio
此处有一些benchmarks[2]Asyncio 的缺点
好了,这就是上篇的内容。如果大家觉得本文内容有帮助,请点赞转发支持一下。下篇将介绍并行的实践,并且总结该如何从 4 种并行和并发方案中做出选择。请持续关注哦~
并发的艺术: https://www.oreilly.com/library/view/the-art-of/9780596802424/
[2]benchmarks: https://www.techempower.com/benchmarks/#section=data-r19&hw=ph&test=fortune&l=zijzen-1r
[3]参考原文: https://sourcery.ai/blog/concurrency/
- EOF -
1、一个 print 函数,挺会玩啊?
2、For-else Python中一个奇怪但有用的特性
3、比默认的 Python shell 好太多,IPython 实用小技巧合集
觉得本文对你有帮助?请分享给更多人
推荐关注「」,提升Python技能
点赞和在看就是最大的支持??
Powered by 小羊羔外链网 8.3.11
©2015 - 2024 小羊羔外链网
您的IP:3.15.218.254,2024-04-20 17:54:23,Processed in 0.04862 second(s).