python学习笔记,多线程与多进程部分
多进程
python中有很多多线程库,其中有一些是只能在Unix系操作系统上使用,原因是这些库是调用Unix系统中的一些特殊的系统调用实现的。
比如python的os
模块封装了常见的系统调用,其中就包括fork
,可以在python程序中轻松创建子进程。
Unix操作系统提供了一个fork()
系统调用,普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(父进程)复制了一份(子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0
,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
在Unix系统下,我们可以这样创建子进程:
1 2 3 4 5 6 7 8 9 10 11 12 import osprint ('Process (%s) start...' % os.getpid())pif = os.fork() if pid == 0 : print ("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid())) else : print ("I (%s) just created a child process (%s)." % (os.getpid(), pid)
运行结果如下:
1 2 3 Process (876 ) start... I (876 ) just created a child process (877 ). I am child process (877 ) and my parent is 876.
python是跨平台的,因此,windows下也应该有一个能进行多进程的模块,下面介绍multiprocessing
模块
multiprocessing
multiprocessing
模块是一款跨平台版本的多进程模块,该模块提供了一个Process
类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processimport osdef run_proc (name ): print ('Run Child process %s (%s)...' % (name, os.getpid())) if __name__ == '__main__' : print ('Parent process %s.' % os.getpid()) p = Process(target = run_proc, args = ('test' ,)) print ('Child process will start.' ) p.start() p.join() print ('Child process end.' )
测试结果如下:
1 2 3 4 Parent process 9788. Child process will start. Run Child process test (14640 )... Child process end.
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process
实例,用start()
方法启动,这样创建进程比fork()
还要简单。
join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
Pool
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Poolimport os, time, randomdef long_time_task (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__' : print ('Parent process %s.' % os.getpid()) p = Pool(4 ) for i in range (5 ): p.apply_async(long_time_task, args=(i,)) print ('Waiting for all subprocesses done...' ) p.close() p.join() print ('All subprocesses done.' )
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 Parent process 14096. Waiting for all subprocesses done... Run task 0 (6504 )... Run task 1 (6176 )... Run task 2 (16396 )... Run task 3 (9388 )... Task 3 runs 0.94 seconds. Run task 4 (9388 )... Task 4 runs 0.14 seconds. Task 2 runs 1.89 seconds. Task 0 runs 2.51 seconds. Task 1 runs 2.68 seconds. All subprocesses done.
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
主义此处4
号进程没有马上执行,而是等到3
号进程结束之后才进行,这是因为我们设置Pool(4)
,即大小为4,最多同时执行4个进程,如果将其设置为Pool(5)
,5个进程就能同时开始执行。
由于Pool
的默认大小是CPU的核数。
子进程
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
下面引用一个例子:
1 2 3 4 5 import subprocessprint ('$ nslookup www.python.org' )r = subprocess.call(['nslookup' , 'www.python.org' ]) print ('Exit code:' , r)
该例可以在python代码中运行命令nslookup www.python.org
运行结果:
1 2 3 4 5 6 7 8 9 10 $ nslookup www.python.org Server: 192.168 .19 .4 Address: 192.168 .19 .4 Non-authoritative answer: www.python.org canonical name = python.map .fastly.net. Name: python.map .fastly.net Address: 199.27 .79 .223 Exit code: 0
如果子进程还需要输入,则可以通过communicate()
方法输入:
1 2 3 4 5 6 7 import subprocessprint ('$ nslookup' )p = subprocess.Popen(['nslookup' ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n' ) print (output.decode('utf-8' ))print ('Exit code:' , p.returncode)
上面的代码相当于在命令行执行命令nslookup
,然后手动输入:
1 2 3 set q=mxpython.org exit
运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 $ nslookup Server: 192.168 .19 .4 Address: 192.168 .19 .4 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from : mail.python.org internet address = 82.94 .164 .166 mail.python.org has AAAA address 2001 :888 :2000 :d::a6 Exit code: 0
进程间通讯
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
我们以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from multiprocessing import Process, Queueimport os, time, randomdef write (q ): print ('Process to write: %s' % os.getpid()) for value in ['A' , 'B' , 'C' ]: print ('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) def read (q ): print ('Process to read: %s' % os.getpid()) while True : value = q.get(True ) print ('Get %s from queue.' % value) if __name__=='__main__' : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()
运行结果如下:
1 2 3 4 5 6 7 8 Process to write: 7060 Put A to queue... Process to read: 20224 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
在Unix/Linux下,multiprocessing
模块封装了fork()
调用,使我们不需要关注fork()
的细节。由于Windows没有fork
调用,因此,multiprocessing
需要“模拟”出fork
的效果,父进程所有python对象都必须通过pickle
序列化再传到子进程去,所以,如果multiprocessing
在Windows下调用失败了,要先考虑是不是pickle
失败了。
多线程
Threading
大部分高级语言通常都内置多线程支持,python也不例外,并且,python的线程是真正的Posix Thread
,而不是模拟出来的线程。
Python的标准库提供了两个模块:_thread
和threading
,_thread
是低级模块,threading
是高级模块,对_thread
进行了封装。绝大多数情况下,我们只需要使用threading
这个高级模块。
下面举个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import time, threadingdef loop (): print ('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5 : n = n + 1 print ('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1 ) print ('thread %s ended.' % threading.current_thread().name) print ('thread %s is running' % threading.current_thread().name)t = threading.Thread(target = loop, name = 'LoopThread' ) t.start() t.join() print ('thread %s ended.' % threading.current_thread().name)
执行结果如下:
1 2 3 4 5 6 7 8 9 thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended.
由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading
模块有个current_thread()
函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread
,子线程的名字在创建时指定,我们用LoopThread
命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1
,Thread-2
……
Lock
进程操作中,对于同一个变量,每个进程会各自拷贝一份,相互并不影响,但在线程操作中,变量与所有线程共性,任何线程对某一变量造成影响后,该影响会被传递到所有其他线程中。因此,线程之间如果需要访问一个共享的数据,需要处理同步
与互斥
下面介绍一个错误操作的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import time, threadingbalance = 0 def change_it (n ): global balance balance = balance + n balance = balance - n def run_thread (n ): for i in range (2000000 ): change_it(n) t1 = threading.Thread(target=run_thread, args=(5 ,)) t2 = threading.Thread(target=run_thread, args=(8 ,)) t1.start() t2.start() t1.join() t2.join() print (balance)
得到的结果如下:
然而这个结果显然没有意义,因为每次运行都会得到不同的结果。
但理论上来说,结果应该是0。
原因是高级语言在实际运行时,需要经过层层翻译,而balance = balance + n
这行代码实际翻译为汇编语言时,需要经过至少三个步骤:
取数
运算
存数
当两个线程交替运行时,如果T1
进行刀运算时,还没进行存数,但T2
进入了存数阶段,村数结束后,T1
也进入村数阶段,则T1
村入的数将覆盖T2
的结果。
经过上面的分析,我们发现两个线程对balance
的访问是互斥的。
python中的threading.lock()
为我们提供了实现互斥的功能,于是我们使用如下方式修改run_thread()
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 balance = 0 lock = threading.Lock() def run_thread (n ): for i in range (100000 ): lock.acquire() try : change_it(n) finally : lock.release()
当多个线程同时执行lock.acquire()
时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
但是使用临界区的方式实现进程同步与互斥时,需要注意仔细分析避免死锁
的发生。
多CPU
我们试试使用多个死循环线程:
1 2 3 4 5 6 7 8 9 10 import threading, multiprocessingdef loop (): x = 0 while True : x = x ^ 1 for i in range (multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start()
我们使用TaskManager
查看CPU的占用情况,但我们发现CPU占用率大约是25%左右,这说明死循环大约只占用了一个CPU核心,并没有跑满我的全部4核CPU。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满。
这是因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁
:Global Interpreter Lock,任何python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython
,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
不过,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
ThreadLocal
由于线程存在的同步与互斥问题。我们希望线程能够尽可能的去操作自己的局部变量,但局部变量在传递的过程中比较麻烦,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 def process_student (name ): std = Student(name) do_task_1(std) do_task_2(std) def do_task_1 (std ): do_subtask_1(std) do_subtask_2(std) def do_task_2 (std ): do_subtask_2(std) do_subtask_2(std)
在该线程中使用到的所有函数中,我们都需要接受这一参数。而每个线程操作的对象又不相同,所以也不能使用全局变量。
为此,我们可以想到用一个全局的dict
来存所有Student
对象,并将其与线程绑定。函数中使用当前线程进行访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 global_dict = {} def std_thread (name ): std = Student(name) global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1 (): std = global_dict[threading.current_thread()] def do_task_2 (): std = global_dict[threading.current_thread()]
于是这样就消除了需要反复传递某一参数的问题。
但是注意到std = global_dict[threading.current_thread()]
这样一条长长的代码重复了多次,显然不够优美。
ThreadLocal
对象为我们简化了这一操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threading local_school = threading.local() def process_student (): std = local_school.student print ('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread (name ): local_school.student = name process_student() t1 = threading.Thread(target = process_thread, args =('Alice' ,), name='Thread-A' ) t2 = threading.Thread(target = process_thread, args =('Bob' ,), name='Thread-B' ) t1.start() t2.start() t1.join() t2.join()
运行结果:
1 2 Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
ThreadLocal
最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
多线程vs多进程
对于多线程与多进程,以及批处理任务模式的好坏,如下博客进行了清晰的讨论:
分布式进程
多进程章节中,我们在同一台计算机上运行了多个进程,而进程还有个又是,就是支持分布式。
python的multiprocessing
模块不但支持多进程,还支持将多进程分不到多台计算机上运行。
下面我们使用Master-Worker
设计模式对其进行改写,将原有的Queue
通过网络暴露出去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import random, time, queuefrom multiprocessing.managers import BaseManagertask_queue = queue.Queue() result_queue = queue.Queue() class QueueManager (BaseManager ): pass QueueManager.register('get_task_queue' , callable =lambda : task_queue) QueueManager.register('get_result_queue' , callable =lambda : result_queue) manager = QueueManager(address=('' , 5000 ), authkey=b'abc' ) manager.start() task = manager.get_task_queue() result = manager.get_result_queue() for i in range (10 ): n = random.randint(0 , 10000 ) print ('Put task %d...' % n) task.put(n) print ('Try get results...' )for i in range (10 ): r = result.get(timeout=10 ) print ('Result: %s' % r) manager.shutdown() print ('master exit.' )if __name__ == '__main__'
此时对task_queue
等Queue
对象进行操作时,需要对通过manager.get_task_queue()
获取的对象进行操作。
可见,我们在分配任务时,不需要了解网络通信的细节,因为managers
模块封装得很好
接着我们再编写Worker
端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import time, sys, queuefrom multiprocessing.managers import BaseManagerclass QueueManager (BaseManager ): pass QueueManager.register('get_task_queue' ) QueueManager.register('get_result_queue' ) server_addr = '127.0.0.1' print ('Connect to server %s...' % server_addr)m = QueueManager(address=(server_addr, 5000 ), authkey=b'abc' ) m.connect() task = m.get_task_queue() result = m.get_result_queue() for i in range (10 ): try : n = task.get(timeout=1 ) print ('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1 ) result.put(r) except Queue.Empty: print ('task queue is empty.' ) print ('worker exit.' )if __name__ == '__main__' :
部署完成后,启动tsak_master.py
:
1 2 3 4 5 6 7 8 9 10 11 12 $ python3 task_master.py Put task 3411. .. Put task 1605. .. Put task 1398. .. Put task 4729. .. Put task 5300. .. Put task 7471. .. Put task 68. .. Put task 4219. .. Put task 339. .. Put task 7866. .. Try get results...
task_master.py
进程发送完任务后,开始等待result
队列的结果。接着启动test_worker.py
:
1 2 3 4 5 6 7 8 9 10 11 12 13 $ python3 task_worker.py Connect to server 127.0 .0 .1 ... run task 3411 * 3411. .. run task 1605 * 1605. .. run task 1398 * 1398. .. run task 4729 * 4729. .. run task 5300 * 5300. .. run task 7471 * 7471. .. run task 68 * 68. .. run task 4219 * 4219. .. run task 339 * 339. .. run task 7866 * 7866. .. worker exit.
task_worker.py
进程结束,在task_master.py
进程中会继续打印出结果:
1 2 3 4 5 6 7 8 9 10 Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956
实验结果
再windows下运行需要注意以下几个报错:
运行tesk_master.py
时
报错:OSError: [WinError 87] 参数错误
。
原因如下:
在win10环境下,pickle模块不能序列化lambda函数,所以需要自定义要使用的函数,而不用lambda函数
将原程序中的lambda修改为函数。再运行:python task_master.py
报错:
1 2 3 4 5 6 RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.
根据提示在程序中添加main函数
if name == ‘main ’:
再运行:python task_master.py
报错:OSError: [WinError 10049]在其上下文中,该请求的地址无效。
原因:OSError: [WinError 10049]
是由于 ip
地址为空造成的
设置端口验证的一行 manager = QueueManager(address=('',5000), authkey=b'abc')
中的地址添加127.0.0.1
在另一个命令行运行环境下运行python task_worker.py
(同样修改lambda函数,并添加main函数)
报错:ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。
经比对教程中的示例程序和教程后附加的例程,发现只有程序开头两行区别。
#!/usr/bin/env python3
# -- coding: utf-8 - -
将这两行添加到程序中。
如果时两台机器,需要再系统防火墙中把5000端口打开。