Python学习—18 进程和线程

线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。

进程

fork调用

通过fork()系统调用,就可以生成一个子进程。

下面先了解下关于fork()的相关知识:

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python里的os模块封装了常见的系统调用,包括fork()

  1. # coding: utf-8
  2. import os
  3. print('Process (%s) start...' % os.getpid())
  4. pid = os.fork()
  5. if pid == 0:
  6. print('I am child Process %s , my parent is %s ' % (os.getpid(), os.getppid()))
  7. else:
  8. print('I am parent Process %s , my child is %s ' % (os.getpid(), pid))

上面代码无法运行在Windows系统上(没有fork调用),需要运行在Unix/Linux操作系统。输出:

  1. # ./user_process.py
  2. Process (25464) start...
  3. I am parent Process 25464 , my child is 25465
  4. I am child Process 25465 , my parent is 25464

multiprocessing

虽然fork()调用无法在Windows调用,但Python也提供了跨平台的多进程支持。使用multiprocessing即可创建跨平台多进程:

  1. # coding: utf-8
  2. from multiprocessing import Process
  3. import os
  4. def run_proc(name):
  5. print('Child Process %s %s is running...' % (name, os.getpid()))
  6. if __name__ == '__main__':
  7. print('Parent Process %s is running...' % os.getpid() )
  8. p = Process(target=run_proc, args=('testProcess', ))
  9. p.start()
  10. p.join()

输出:

  1. Parent Process 10488 is running...
  2. Child Process testProcess 3356 is running...

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。jonin()方法用于等待子进程结束后再继续往下运行,相当于阻塞了进程的异步执行。

这里的if __name__ == '__main__'用于仅允许在命令行下直接运行。如果是另一个模块引入该文件,里面的代码不会执行。

在python中,当一个module作为整体被执行时,moduel.__name__的值将是__main__;而当一个 module被其它module引用时,module.__name__将是module自己的名字,当然一个module被其它module引用时,其本身并不需要一个可执行的入口main了。

上面的多进程创建代码风格与后文讲的创建多线程很相似。

进程池

如果要启动大量的子进程,可以用进程池(Pool)的方式批量创建子进程:

user_process_pool.py

  1. # coding: utf-8
  2. from multiprocessing import Pool
  3. import os,time,random
  4. def run_task(name):
  5. print('Run task %s' % name)
  6. s_start = time.time()
  7. time.sleep(random.random())
  8. s_end = time.time()
  9. print('Task %s run %.2f sec' % (name, s_end - s_start))
  10. if __name__ == '__main__':
  11. print('Parent Process %s is running...' % os.getpid())
  12. p = Pool(5)
  13. for i in range(5):
  14. p.apply_async(run_task, args=(i,))
  15. print('all subProcess will running...')
  16. p.close()
  17. p.join()
  18. print('all subProcess running ok')

输出:

  1. Parent Process 10576 is running...
  2. all subProcess will running...
  3. Run task 0
  4. Run task 1
  5. Run task 2
  6. Run task 3
  7. Run task 4
  8. Task 3 run 0.16 sec
  9. Task 1 run 0.31 sec
  10. Task 4 run 0.38 sec
  11. Task 2 run 0.44 sec
  12. Task 0 run 0.89 sec
  13. all subProcess running ok

如果修改Pool()的参数为1,看下输出:

  1. Parent Process 6252 is running...
  2. all subProcess will running...
  3. Run task 0
  4. Task 0 run 0.77 sec
  5. Run task 1
  6. Task 1 run 0.22 sec
  7. Run task 2
  8. Task 2 run 0.73 sec
  9. Run task 3
  10. Task 3 run 0.54 sec
  11. Run task 4
  12. Task 4 run 0.02 sec
  13. all subProcess running ok

说明有Pool进程池的数量有多少,就可以最多同时运行多少个进程。如果不设置参数,Pool的默认大小是CPU的核数。

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

外部子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出:

  1. # coding:utf-8
  2. import subprocess
  3. r = subprocess.call(['ping', 'www.python.org'])
  4. print(r)

然后运行:

  1. $ python user_subprocess.py
  2. 正在 Ping www.python.org [151.101.72.223] 具有 32 字节的数据:
  3. 来自 151.101.72.223 的回复: 字节=32 时间=189ms TTL=53
  4. 来自 151.101.72.223 的回复: 字节=32 时间=183ms TTL=53
  5. 来自 151.101.72.223 的回复: 字节=32 时间=192ms TTL=53
  6. 来自 151.101.72.223 的回复: 字节=32 时间=192ms TTL=53
  7. 151.101.72.223 Ping 统计信息:
  8. 数据包: 已发送 = 4,已接收 = 4,丢失 = 0 (0% 丢失),
  9. 往返行程的估计时间(以毫秒为单位):
  10. 最短 = 183ms,最长 = 192ms,平均 = 189ms
  11. 0

相当于命令行运行:

  1. ping www.python.org

进程间通信

进程间肯定需要互相通信的。这里我们使用队列来实现一个简单的例子:

  1. # coding: utf-8
  2. from multiprocessing import Process,Queue
  3. import os,time
  4. def write(q):
  5. print('write Process %s is running... ' % os.getpid())
  6. for x in ['python', 'c', 'java']:
  7. q.put(x)
  8. print('write to Queue : %s' % x)
  9. def read(q):
  10. print('read Process %s is running... ' % os.getpid())
  11. while True:
  12. r = q.get(True)
  13. print('read from Queue : %s' % r)
  14. pass
  15. if __name__ == '__main__':
  16. print('MainProcess %s is running...' % os.getpid())
  17. q = Queue()
  18. p1 = Process(target=write, args=(q,))
  19. p2 = Process(target=read, args=(q,))
  20. p1.start()
  21. p2.start()
  22. p1.join()
  23. p2.join()
  24. # p2.terminate()

输出:

  1. MainProcess 9680 is running...
  2. write Process 10232 is running...
  3. write to Queue : python
  4. write to Queue : c
  5. write to Queue : java
  6. read Process 8024 is running...
  7. read from Queue : python
  8. read from Queue : c
  9. read from Queue : java

由于p2进程里是死循环,默认执行完毕后程序不会退出,可以使用p2.terminate()进程强行退出。

线程

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

创建线程

启动一个线程就是把一个函数传入并创建threading.Thread()实例,然后调用start()开始执行:

  1. # coding: utf-8
  2. import threading,time
  3. def test():
  4. print('Thread %s is running... ' % threading.current_thread().name)
  5. print('waiting 3 seconds... ')
  6. time.sleep(3)
  7. print('Hello Thread!')
  8. print('Thread %s is end. ' % threading.current_thread().name)
  9. print('Thread %s is running... ' % threading.current_thread().name)
  10. t = threading.Thread(target = test, name = 'TestThread')
  11. t.start()
  12. t.join()
  13. print('Thread %s is end. ' % threading.current_thread().name)

输出:

  1. Thread MainThread is running...
  2. Thread TestThread is running...
  3. waiting 3 seconds...
  4. Hello Thread!
  5. Thread TestThread is end.
  6. Thread MainThread is end.

t.start()用于启动线程。t.join()的作用是等待线程执行完毕,否则会不等待线程执行完毕就执行下面的代码了,因为线程执行是异步的。

主线程实例的名字叫MainThread,子线程的名字在创建时指定,这里我们用TestThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义。

线程锁

多线程与多进程最大的不同就是:对于同一个变量,对于多个线程是共享的,但多进程里每个进程各自会复制一份。

所以,在多线程里,最大的一个隐患就是多个线程同时改一个变量,可能就把变量内容改乱了。看下面例子如何改乱一个变量:

  1. # coding:utf-8
  2. import threading
  3. amount = 0
  4. def changeValue(x):
  5. global amount
  6. amount = amount + x
  7. amount = amount - x
  8. # 批量运行改值
  9. def batchRunThread(x):
  10. for i in range(100000):
  11. changeValue(x)
  12. # 创建2个线程
  13. t1 = threading.Thread(target=batchRunThread, args=(5,), name = 'Thread1')
  14. t2 = threading.Thread(target=batchRunThread, args=(15,), name = 'Thread2')
  15. t1.start()
  16. t2.start()
  17. t1.join()
  18. t2.join()
  19. print(amount)

正常情况下会输出0。但是运行多次,发现结果不一定是0。由于线程的调度是由操作系统决定的,当t1、t2线程交替执行时,只要循环次数足够多,结果就可能被改乱了。

原因是高级语言的一条语句执行在CPU执行是多个语句,即使是一个简单的计算:

  1. amount = amount + x

CPU会执行下列运算:
1、计算amount + x,存入临时变量中;
2、将临时变量的值赋给amount。

想要解决这个问题,就要使用线程锁:线程执行changeValue()时先获取锁,这时候其它线程想要执行changeValue(),想要等待之前那个线程释放锁。Python里通过threading.Lock()来实现。

  1. # coding:utf-8
  2. import threading
  3. lock = threading.Lock()
  4. amount = 0
  5. def changeValue(x):
  6. global amount
  7. amount = amount + x
  8. amount = amount - x
  9. # 批量运行改值
  10. def batchRunThread(x):
  11. for i in range(100000):
  12. lock.acquire() # 获得锁
  13. try:
  14. changeValue(x)
  15. finally:
  16. lock.release() # 释放锁
  17. # 创建2个线程
  18. t1 = threading.Thread(target=batchRunThread, args=(5,), name = 'Thread1')
  19. t2 = threading.Thread(target=batchRunThread, args=(15,), name = 'Thread2')
  20. t1.start()
  21. t2.start()
  22. t1.join()
  23. t2.join()
  24. print(amount)

这时候不管循环多少次,输出的结果永远是0。

想要注意的是,获得锁后一定要记得释放,否则其它线程一直在等待,就成了死锁。这里使用try...finally...保证最后一定会释放锁。

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。

如果锁使用不当,也可能造成死锁,程序无法正常运行。

1、说说进程与线程的区别与联系 - oyzway - 博客园
http://www.cnblogs.com/way_testlife/archive/2011/04/16/2018312.html
2、进程与线程的一个简单解释 - 阮一峰的网络日志
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

作者: 飞鸿影
版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」许可协议进行许可。
出处:https://www.cnblogs.com/52fhy/p/6389150.html