Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

python学习笔记,多线程与多进程部分

多进程

python中有很多多线程库,其中有一些是只能在Unix系操作系统上使用,原因是这些库是调用Unix系统中的一些特殊的系统调用实现的。

比如python的os模块封装了常见的系统调用,其中就包括fork,可以在python程序中轻松创建子进程。

Unix操作系统提供了一个fork()系统调用,普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(父进程)复制了一份(子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

在Unix系统下,我们可以这样创建子进程:

1
2
3
4
5
6
7
8
9
10
11
12
import os


# 获取当前进程的ID
print('Process (%s) start...' % os.getpid())

# Only works on Linux/Unix/Mac:
pif = os.fork()
if pid == 0:
print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid()))
else:
print("I (%s) just created a child process (%s)." % (os.getpid(), pid)

运行结果如下:

1
2
3
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

python是跨平台的,因此,windows下也应该有一个能进行多进程的模块,下面介绍multiprocessing模块

multiprocessing

multiprocessing模块是一款跨平台版本的多进程模块,该模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import os


# 子进程要执行的代码
def run_proc(name):
print('Run Child process %s (%s)...' % (name, os.getpid()))

if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Process(target = run_proc, args = ('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

测试结果如下:

1
2
3
4
Parent process 9788.
Child process will start.
Run Child process test (14640)...
Child process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 14096.
Waiting for all subprocesses done...
Run task 0 (6504)...
Run task 1 (6176)...
Run task 2 (16396)...
Run task 3 (9388)...
Task 3 runs 0.94 seconds.
Run task 4 (9388)...
Task 4 runs 0.14 seconds.
Task 2 runs 1.89 seconds.
Task 0 runs 2.51 seconds.
Task 1 runs 2.68 seconds.
All subprocesses done.

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

主义此处4号进程没有马上执行,而是等到3号进程结束之后才进行,这是因为我们设置Pool(4),即大小为4,最多同时执行4个进程,如果将其设置为Pool(5),5个进程就能同时开始执行。

由于Pool的默认大小是CPU的核数。

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面引用一个例子:

1
2
3
4
5
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

该例可以在python代码中运行命令nslookup www.python.org

运行结果:

1
2
3
4
5
6
7
8
9
10
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53

Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入:

1
2
3
4
5
6
7
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

1
2
3
set q=mx
python.org
exit

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53

Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6


Exit code: 0

进程间通讯

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
# Queue.get(block=True, timeout=None)
# 从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。
# 如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。
# 反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)。
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

运行结果如下:

1
2
3
4
5
6
7
8
Process to write: 7060
Put A to queue...
Process to read: 20224
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

多线程

Threading

大部分高级语言通常都内置多线程支持,python也不例外,并且,python的线程是真正的Posix Thread,而不是模拟出来的线程。

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

下面举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time, threading

# 新线程执行的代码
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running' % threading.current_thread().name)
t = threading.Thread(target = loop, name = 'LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

执行结果如下:

1
2
3
4
5
6
7
8
9
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

Lock

进程操作中,对于同一个变量,每个进程会各自拷贝一份,相互并不影响,但在线程操作中,变量与所有线程共性,任何线程对某一变量造成影响后,该影响会被传递到所有其他线程中。因此,线程之间如果需要访问一个共享的数据,需要处理同步互斥

下面介绍一个错误操作的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time, threading

# 假定这是你的银行存款:
balance = 0

def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(2000000):
change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

得到的结果如下:

1
-3

然而这个结果显然没有意义,因为每次运行都会得到不同的结果。

但理论上来说,结果应该是0。

原因是高级语言在实际运行时,需要经过层层翻译,而balance = balance + n这行代码实际翻译为汇编语言时,需要经过至少三个步骤:

  1. 取数
  2. 运算
  3. 存数

当两个线程交替运行时,如果T1进行刀运算时,还没进行存数,但T2进入了存数阶段,村数结束后,T1也进入村数阶段,则T1村入的数将覆盖T2的结果。

经过上面的分析,我们发现两个线程对balance的访问是互斥的。

python中的threading.lock()为我们提供了实现互斥的功能,于是我们使用如下方式修改run_thread()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

但是使用临界区的方式实现进程同步与互斥时,需要注意仔细分析避免死锁的发生。

多CPU

我们试试使用多个死循环线程:

1
2
3
4
5
6
7
8
9
10
import threading, multiprocessing

def loop():
x = 0
while True:
x = x ^ 1

for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()

我们使用TaskManager查看CPU的占用情况,但我们发现CPU占用率大约是25%左右,这说明死循环大约只占用了一个CPU核心,并没有跑满我的全部4核CPU。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满。

这是因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

ThreadLocal

由于线程存在的同步与互斥问题。我们希望线程能够尽可能的去操作自己的局部变量,但局部变量在传递的过程中比较麻烦,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)

def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)

def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)

在该线程中使用到的所有函数中,我们都需要接受这一参数。而每个线程操作的对象又不相同,所以也不能使用全局变量。

为此,我们可以想到用一个全局的dict来存所有Student对象,并将其与线程绑定。函数中使用当前线程进行访问:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
global_dict = {}

def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()

def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]

def do_task_2():
# 任何函数都可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]

于是这样就消除了需要反复传递某一参数的问题。

但是注意到std = global_dict[threading.current_thread()]这样一条长长的代码重复了多次,显然不够优美。

ThreadLocal对象为我们简化了这一操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()

t1 = threading.Thread(target = process_thread, args =('Alice',), name='Thread-A')
t2 = threading.Thread(target = process_thread, args =('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

运行结果:

1
2
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

多线程vs多进程

对于多线程与多进程,以及批处理任务模式的好坏,如下博客进行了清晰的讨论:

分布式进程

多进程章节中,我们在同一台计算机上运行了多个进程,而进程还有个又是,就是支持分布式。

python的multiprocessing模块不但支持多进程,还支持将多进程分不到多台计算机上运行。

下面我们使用Master-Worker设计模式对其进行改写,将原有的Queue通过网络暴露出去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# task_master.py
#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

if __name__ == '__main__'

此时对task_queueQueue对象进行操作时,需要对通过manager.get_task_queue()获取的对象进行操作。

可见,我们在分配任务时,不需要了解网络通信的细节,因为managers模块封装得很好

接着我们再编写Worker端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# task_worker.py
#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')

if __name__ == '__main__':

部署完成后,启动tsak_master.py

1
2
3
4
5
6
7
8
9
10
11
12
$ python3 task_master.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

task_master.py进程发送完任务后,开始等待result队列的结果。接着启动test_worker.py

1
2
3
4
5
6
7
8
9
10
11
12
13
$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

task_worker.py进程结束,在task_master.py进程中会继续打印出结果:

1
2
3
4
5
6
7
8
9
10
Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

实验结果

再windows下运行需要注意以下几个报错:

运行tesk_master.py

  1. 报错:OSError: [WinError 87] 参数错误

原因如下:

在win10环境下,pickle模块不能序列化lambda函数,所以需要自定义要使用的函数,而不用lambda函数

将原程序中的lambda修改为函数。再运行:python task_master.py

  1. 报错:
1
2
3
4
5
6
RuntimeError:

An attempt has been made to start a new process before the

current process has finished its bootstrapping phase.

根据提示在程序中添加main函数

if name == ‘main‘:

再运行:python task_master.py

  1. 报错:OSError: [WinError 10049]在其上下文中,该请求的地址无效。

原因:OSError: [WinError 10049]是由于 ip地址为空造成的

设置端口验证的一行 manager = QueueManager(address=('',5000), authkey=b'abc') 中的地址添加127.0.0.1

在另一个命令行运行环境下运行python task_worker.py

(同样修改lambda函数,并添加main函数)

  1. 报错:ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。

经比对教程中的示例程序和教程后附加的例程,发现只有程序开头两行区别。

#!/usr/bin/env python3

# -- coding: utf-8 --

将这两行添加到程序中。

如果时两台机器,需要再系统防火墙中把5000端口打开。

评论