Python 多进程卡住

Posted by Sz Zheng on 2019-06-26

Python多进程编程遇到的一个问题

1. Queue的通信出现死锁问题:

1.1 知识:

多进程通信使用Pipe和Queue,区别在于Pipe只是两个进程间通信,而Queue则是允许多个进程。
种类有Queue, multiprocessing.queues.SimpleQueue, JoinableQueue
它们的模型都是基于Queue.Queue的,这是Python自带的Queue,但是这个Queue没有task_done, join方法。
如果用JoinableQueue,必须在每个task移除时调用JoinableQueue.task_done方法。
用Managers也可以创建共享Queue。

  • Queue.Empty
  • Queue.Full

它们都是定义在Python自带的Queue下的,不在multiprocessing中

Queue.join_thread()会保证buffer中的data被送到pipe中
Queue.cancel_join_thread()则不会保证data不丢失

1.2 问题:

queue总是hang,代码是按照autotvm的写法写的,这段是核心:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def kill_child_processes(parent_pid, sig=signal.SIGTERM):
"""kill all child processes recursively"""
try:
parent = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = parent.children(recursive=True)
for process in children:
try:
process.send_signal(sig)
except psutil.NoSuchProcess:
return

def _execute_func(func, queue, args, kwargs):
"""execute function and return the result or exception to a queue"""
try:
res = func(*args, **kwargs)
except Exception as exc: # pylint: disable=broad-except
res = exc
queue.put(res)


def call_with_timeout(queue, timeout, func, args, kwargs):
"""A wrapper to support timeout of a function call"""

# start a new process for timeout (cannot use thread because we have c function)
p = Process(target=_execute_func, args=(func, queue, args, kwargs))
p.start()
p.join(timeout=timeout)

queue.put(executor.TimeoutError())

kill_child_processes(p.pid)
p.terminate()
p.join()

对于需要跑的任务func,这样使用:

1
2
3
4
queue = Queue(2)
process = Process(target=call_with_timeout,
args=(queue, self.timeout, func, args, kwargs))
process.start()

得到结果的方法是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def get(self, timeout=None):
try:
res = self._queue.get(block=True, timeout=timeout)
except Empty:
raise executor.TimeoutError()
if self._process.is_alive():
kill_child_processes(self._process.pid)
self._process.terminate()
self._process.join()
self._queue.close()
self._queue.join_thread()
self._done = True
del self._queue
del self._process
return res

这些都是autotvm中的代码,其思路是用嵌套的进程launch任务,然后用join的timeout控制时间。
实际用autotvm也没有问题,但是当我照抄这段代码自己用的时候经常会遇到hanging问题,代码卡住不动。

1.3 问题解决

因为autotvm用起来从来没有卡住的问题,一直不理解为什么会卡住。通过查multiprocessing的文档发现,
queue的使用最好不要被多个进程共享,而且要等到queue中的数据取出来后再kill子进程,否则容易卡住。上面的代码里call_with_timeout里就出现了同一个queue被父子两个进程使用的问题,把这段代码改掉后就不再出现卡住问题了。不过令人疑惑的是autotvm为什么这么用就没问题,而且跑起来非常快…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def call_with_timeout(func, queue, timeout, args, kwargs):
q = multi.Queue() # 这个queue不和父进程共享
p = multi.Process(target=exec_func, args=(func, q, args, kwargs))
p.start()
# 先取完数据再kill子进程
try:
res = q.get(block=True, timeout=timeout)
except Empty:
res = multi.TimeoutError()
except Exception as e:
print("Exception in process {}: {}".format(os.getpid(), str(e)))
res = e
kill_child_processes(p.pid)
p.terminate()
p.join()
queue.put(res) # 注意这个queue不和子进程共享