上次我们讨论使用进程和线程的异步编程时,这里有一个链接。 强烈建议在阅读本文之前先阅读这篇文章,因为我们将使用那里提到的概念。 今天我们将简要讨论何时使用什么以及生产者-消费者范式的实现。
流程 | 线程 |
进程适用于 CPU 利用率较高的应用程序。 考虑处理大量数字或处理大量数据。 这些任务最好跨进程或 CPU 执行。 | 线程适用于我们有耗时的 I/O 操作的场景,例如:文件 I/O、http 调用或套接字连接。 进行此类 I/O 调用的线程等待响应,而其他线程可能会继续运行。 |
生产者-消费者模式
生产者-消费者设计模式今天广泛用于一系列应用程序中。 大多数集成接口都使用这种方法。 消息队列 (MQ) 广泛使用此范例并将其扩展到发布者/订阅者模式、P2P 模式或推/拉模式。
使用这种模式的基本思想是将一组任务分配给异步运行的多个线程。 我们还可以实现生产者和消费者的链接,并使用既充当生产者又充当消费者的中间节点。
以 android 通知为例,服务器将通知推送(生成)到 FCM,FCM 使用它们并进一步推送到各个设备。
另一个现实生活中的例子可能是邮政服务,人们将邮件发送给邮政服务,后者消费它们,然后将它们生产给充当消费者的收件人。
队列
在处理生产者和消费者时,我们需要一个队列。 生产者需要一个地方来推送消息,消费者需要一个地方来读取它们。 考虑到队列是这里广泛使用的数据结构,我们通常希望以 FIFO 方式处理消息。 尽管 List 和 Arrays 等其他数据结构应该可以正常工作。 对生产者和消费者来说非常重要 分享 队列的同一个对象.
生产者
生产者是异步运行并在队列中添加消息的对象。 然后推送到队列中的消息可供消费者使用。 如果消息被消费,生产者不会打扰,他们的工作是有限的,它是推送消息。 可能有多个生产者向队列生成消息。
消费者
消费者不断轮询队列,等待消息到达队列。 一旦消息可用,消费者就开始处理消息。 可能有多个消费者通过队列轮询。
毒药(可选)
帮助确定何时返回正在运行的线程。 当生产者/消费者收到毒药时,这是他们停止的信号。 这确保在处理结束时没有挂起的线程。
自己做:
有多个单一生产者和消费者的例子。 让我们提高一个档次。
假设:
有一些任务(即。 初始任务, final_task)可以比其他人相对更快地完成并且有任务(即。 中介任务) 这需要大量时间来处理。
设计:
将只有一个线程用于运行 初始任务 另一个为 final_task 但是,将有三个线程专用于 中介任务. 只要 初始任务 完成后,它需要被可用的线程拾取,然后执行 中介任务. 一旦完成,结果就会被生成到最终队列中,然后被消耗以执行 final_task.
import time from queue import Queue from threading import Thread Poison = -1 class TimeConsumingNode(Thread): def __init__(self, consumer_queue: Queue = None, producer_queue: Queue = None, task_time: int = 0): super(TimeConsumingNode, self).__init__() self.consumer_queue = consumer_queue self.producer_queue = producer_queue self.task_time = task_time def run(self) -> None: global Poison while True: consumed_message = self.consumer_queue.get() print(self, f"consumed {consumed_message}") if consumed_message == Poison: print(self, "got poison") # Pass the poison to intermediary producers. self.consumer_queue.put(consumed_message) # Pass the poison to the final consumer. self.producer_queue.put(consumed_message) break # Mock processing the consumed message. time.sleep(self.task_time) # Mock some heavy tasks. time.sleep(self.task_time) print(self, f"took at least {self.task_time * 2} for consuming " f"{consumed_message} and for producing {consumed_message}") self.producer_queue.put(consumed_message) # Comparatively faster task node. class ProducerNode(Thread): def __init__(self, consumed_queue: Queue = None, produced_queue: Queue = None, task_time: int = 0): super(ProducerNode, self).__init__() self.consumed_queue = consumed_queue self.produced_queue = produced_queue self.task_time = task_time def run(self) -> None: global Poison while not self.consumed_queue.empty(): some_number = self.consumed_queue.get() time.sleep(self.task_time) print(self, f"Producing {some_number}") self.produced_queue.put(some_number) # Add poison. print(self, f"Adding poison") self.produced_queue.put(Poison) # Last task again comparatively faster one. class ConsumerNode(Thread): def __init__(self, consumed_queue: Queue = None, task_time: int = 0): super(ConsumerNode, self).__init__() self.consumed_queue = consumed_queue self.task_time = task_time self.poisons = 0 def run(self) -> None: global Poison while True: consumed_message = self.consumed_queue.get() if consumed_message == Poison: self.poisons = self.poisons + 1 print(self, f"got {self.poisons} poisons") if self.poisons == 3: break # Mock some time taking task time.sleep(self.task_time / 2) print(self, f"took {self.task_time / 2} to consume {consumed_message}") if __name__ == "__main__": my_queue = Queue() my_queue.put(2) my_queue.put(3) my_queue.put(4) my_queue.put(5) my_queue.put(6) intermediary_queue = Queue() final_queue = Queue() first_prod_node = ProducerNode(consumed_queue=my_queue, produced_queue=intermediary_queue, task_time=3) tc_node_1 = TimeConsumingNode(consumer_queue=intermediary_queue, producer_queue=final_queue, task_time=4) tc_node_2 = TimeConsumingNode(consumer_queue=intermediary_queue, producer_queue=final_queue, task_time=5) tc_node_3 = TimeConsumingNode(consumer_queue=intermediary_queue, producer_queue=final_queue, task_time=2) final_node = ConsumerNode(consumed_queue=final_queue, task_time=1) first_prod_node.start() tc_node_1.start() tc_node_2.start() tc_node_3.start() final_node.start() first_prod_node.join() tc_node_1.join() tc_node_2.join() tc_node_3.join() final_node.join() |
输出:
风险:
如果实施不当,可能会导致
- 挂线
- 死锁
- 读取队列时出现 IndexOutOfBound 异常。
结论
生产者-消费者模式是一种非常有用的设计,可以在不同程度上加以利用,以实现多个耗时任务的异步处理。 该概念已被广泛纳入现代消息队列,即。 卡夫卡,RabbitMQ, Cloud AWS、GCP等提供的MQ。 他们强大而危险! 明智地使用它们!