0%

二探并发(Actor)

上回谈到利用消息队列在线程(进程)间传递消息以此来实现线程间的通信。

共享的消息队列

这是建立在并发只是提高系统吞吐率的基础上,在这种假设下每一个worker所做的工作都一样,因此可以共享一个消息队列,谁拿到下一条指令都无所谓。

如果我们不是同胞呢

假如我们需要实现一个各个worker各司其职异步工作的系统(例如我们的IP-phone)该怎么办呢?假设我们还使用上面的方法,所有的worker共享一个消息队列,这时候就会产生问题:消息没办法发送给特定的worker。

怎么解决这个问题呢?一个简单粗暴的解决方案是给每个消息加上标识,以标识出这条消息所要发送的对象:

1
2
3
4
5
queue = Queue([('Send_to_worker_A', 'do_something1'),
('Send_to_worker_B', 'do_something2'),
('Send_to_worker_B', 'do_something3'),
('Send_to_worker_C', 'do_something4'),
('Send_to_worker_A', 'do_something5')])

看起来问题迎刃而解了对吧?但我们引入了一个新的问题。

如果我不小心拆开了送给别人的信

如果送给worker A的消息被worker B接收到了,他得到条消息其实并没有什么用,而本该得到这条消息的worker A却没得到它,从而这条信息就丢失了。既浪费了worker B的时间,由消耗了work A的青春,甚至有可能因为跳步而使得整个系统陷入到什么诡异的Bug中。

这个问题又怎么解决呢?从直觉上看,给每一个worker添加如下策略就能解决问题:得到别人的消息就把这条消息塞回消息队列中。

1
2
3
4
5
6
7
8
9
10
class Worker_B:

def run(self):
while True:
if not self.queue.empty():
data = self.queue.get()
if data['to_whom'] == 'worker_B':
do_something_with_data
else:
self.queue.put(data)

看似修修补补解决了问题…实际上又引入了一个新的问题。

这看起来是一条死路

第一,消息队列是一个队列,这意味着FIFO,就算引入了具有优先级的消息队列,本质上这一点也并没有改变。这样一来,本该第一个交给worker A的消息经worker B一倒腾,成为了当前消息队列中最晚递交给worker A的了,可能使得系统陷入诡异的Bug中。

第二,即使我们设法让分发错了的消息重新插回消息队列的队首,如果不引入锁机制,使得错收了消息worker B的收信-检查-放回操作成为一个原子操作的话,那么在work B执行放回操作的时候,可能其他worker已经在继续进行接收消息操作了,仍然有可能造成时序上的大问题,而引入锁机制索然能解决逻辑上的错误问题,但是这样会使得每次轮询中只有一个worker真正能够工作,而其他得到错误消息执行收信-检查-放回操作的worker白白浪费自己的时间片,效率低下。

所以,看起来针对为了异步工作而引入并发的系统来说,每个worker共享同一个消息队列并不是一个好办法。

根治这个问题的办法就是针对每一组执行同一任务的worker都建立一个专门的消息队列(邮箱),甚至每个worker都设置自己的消息队列,完全放弃了线程间共享内存的能力。后一种方法就是我今天要讲的主角:Actor模型(参与者模式)

Actor模型

首先我们来看看维基百科对Actor模型的定义与概念:

概念

在计算机科学中,参与者模式(英语:Actor model)是一种并发运算上的模型。“参与者”是一种程序上的抽象概念,被视为并发运算的基本单元:当一个参与者接收到一则消息,它可以做出一些决策、创建更多的参与者、发送更多的消息、决定要如何回答接下来的消息。参与者模式在1973年于Carl Hewitt、Peter Bishop及Richard Steiger的论文中提出。

参与者模型推崇的哲学是“一切皆是参与者”,这与面向对象编程的“一切皆是对象”类似,但是面向对象编程通常是顺序执行的,而参与者模型是并行执行的。参与者是一个运算实体,回应接受到的消息,同时并行的:

  • 发送有限数量的消息给其他参与者;
  • 创建有限数量的新参与者;
  • 指定接受到下一个消息时的行为。

以上操作不含有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,是参与者模型的根本优势。这允许进行异步通信,同时满足消息传递的控制结构。消息接收者是通过地址区分的,有时也被称作“邮件地址”。因此参与者只能和它拥有地址的参与者通信。它可以通过接受到的信息获取地址,或者获取它创建的参与者的地址。参与者模型的特征是,参与者内部或之间进行并行计算,参与者可以动态创建,参与者地址包含在消息中,交互只有通过直接的异步消息通信,不限制消息到达的顺序。

实现

Actor足够简单,为并发而生,并且具有足够好的封装性可以隔离变化(例如:不关心是多线程还是多进程)。

让我们来看看一个典型的Actor的python线程实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

from queue import Queue
from threading import Thread, Event


class WorkerExit(Exception):
# 用来结束任务的异常
pass


class BaseWorker(object):
# Actor
def __init__(self):
self._mailbox = Queue()
super(BaseWorker, self).__init__()

def send(self, msg):
self._mailbox.put(msg)

def recv(self):
msg = self._mailbox.get()
if msg is WorkerExit:
raise WorkerExit()
return msg

def close(self):
self.send(WorkerExit)

def start(self):
self._terminated = Event()
t = Thread(target=self._bootstrap)

t.daemon = True
t.start()

def _bootstrap(self):
try:
self.run()
except WorkerExit:
pass
finally:
self._terminated.set()

def join(self):
self._terminated.wait()

def run(self):
raise NotImplementedError

简单地解释一下,这个Actor(我起名为BaseWorker,在真实的项目中作为父类使用),维持两个数据结构:Queue作为邮箱以及Event作为阻塞主线程的杀器。

这里作为外部接口的核心操作只有一个send()方法。请注意,我们不限制能够传递的消息类型,这意味着无比的灵活性。

在Actor内部我们使用一个线程来运行run()方法结合recv()方法执行所指定的具体工作。值得一提的是,我们设置了一个哨兵信号WorkerExit用来停止任务,请注意WorkerExit的运行原理是当他被识别后作为一个异常被抛出,而在异常处理这里我们甚至做更多事情,当然这里我们只是在捕获到这个异常时停之线程运行。而这一异常处理的实现有赖于用来包装run()方法的_bootstrap()

在Actor模型的哲学下我们将这个简单的例子拓展从而走得更远。

总结

并发编程需要一种较之平常更为抽象的思维方式,也可以作为“高内聚,低耦合”这一思想的良好实践。

而我在这片对我而言的全新领域的探索中Actor的确是一盏指路明灯,算是领着我真正入了门,因此在这里分享。 :)

Welcome to my other publishing channels