由于和线程相似,这里的前几个示例都是从线程示例中修改的。
内容目录
- multiprocessing基础
- 可导入的目标函数
- 确定当前进程
- 守护进程 Daemon
- 等待进程 join()
- 终止进程 terminate()
- 进程退出状态
- 调试 log_to_stderr
- 子类化进程
1.multiprocessing基础
生成进程的最简单的方法是用目标函数实例化一个进程对象,并调用start()来让它开始工作。
import multiprocessingdef worker(): """worker function""" print('Worker')if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker) jobs.append(p) p.start()
运行结果:
WorkerWorkerWorkerWorkerWorker
传递参数:
import multiprocessingdef worker(num): """thread worker function""" print('Worker:', num)if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
运行结果:
Worker: 1Worker: 0Worker: 3Worker: 2Worker: 4
2.可导入的目标函数
# main.pyimport multiprocessingimport multiprocessing_import_workerif __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process( target=multiprocessing_import_worker.worker, ) jobs.append(p) p.start()
其中worker()函数在multiprocessing_import_worker.py中定义:
# multiprocessing_import_worker.pydef worker(): """worker function""" print('Worker') return
运行结果:
WorkerWorkerWorkerWorkerWorker
3.确定当前进程
import multiprocessingimport timedef worker(): name = multiprocessing.current_process().name print(name, 'Starting') time.sleep(2) print(name, 'Exiting')def my_service(): name = multiprocessing.current_process().name print(name, 'Starting') time.sleep(3) print(name, 'Exiting')if __name__ == '__main__': service = multiprocessing.Process( name='my_service', target=my_service, ) worker_1 = multiprocessing.Process( name='worker 1', target=worker, ) worker_2 = multiprocessing.Process( # default name target=worker, ) worker_1.start() worker_2.start() service.start()
运行结果:可以看到默认的进程名字对应于Process-3, 和线程很类似。
worker 1 StartingProcess-3 Startingmy_service Startingworker 1 ExitingProcess-3 Exitingmy_service Exiting
4.守护进程 Daemon
默认情况下主进程会在子进程全部执行完毕后才退出,如果子进程设置为守护进程,便不再阻塞主进程退出。
为了将进程标记为守护进程只需将daemon属性设置为True。默认情况下,进程不是守护进程。import multiprocessingimport timeimport sysdef daemon(): p = multiprocessing.current_process() print('Starting:', p.name, p.pid) sys.stdout.flush() time.sleep(2) print('Exiting :', p.name, p.pid) sys.stdout.flush()def non_daemon(): p = multiprocessing.current_process() print('Starting:', p.name, p.pid) sys.stdout.flush() print('Exiting :', p.name, p.pid) sys.stdout.flush()if __name__ == '__main__': d = multiprocessing.Process( name='daemon', target=daemon, ) d.daemon = True n = multiprocessing.Process( name='non-daemon', target=non_daemon, ) n.daemon = False d.start() time.sleep(1) n.start()
运行结果:可以看到d进程还没执行完主进程就退出了。
Starting: daemon 24852Starting: non-daemon 23248Exiting : non-daemon 23248
在主程序退出之前,守护进程会自动终止,这将避免留下孤儿进程。
5.等待进程 join()
要等到进程完成工作再退出,请使用join()方法。
import multiprocessingimport timeimport sysdef daemon(): name = multiprocessing.current_process().name print('Starting:', name) time.sleep(2) print('Exiting :', name)def non_daemon(): name = multiprocessing.current_process().name print('Starting:', name) print('Exiting :', name)if __name__ == '__main__': d = multiprocessing.Process( name='daemon', target=daemon, ) d.daemon = True n = multiprocessing.Process( name='non-daemon', target=non_daemon, ) n.daemon = False d.start() time.sleep(1) n.start() d.join() n.join()
运行结果:
Starting: non-daemonExiting : non-daemonStarting: daemonExiting : daemon
默认情况下,join()无限期阻塞。也可以使用超时参数。
6.终止进程 terminate()
在一个进程调用terminate()会杀死子进程
import multiprocessingimport timedef slow_worker(): print('Starting worker') time.sleep(0.1) print('Finished worker')if __name__ == '__main__': p = multiprocessing.Process(target=slow_worker) print('BEFORE:', p, p.is_alive()) p.start() print('DURING:', p, p.is_alive()) p.terminate() print('TERMINATED:', p, p.is_alive()) p.join() print('JOINED:', p, p.is_alive())
运行结果:在终止它之后,使用join()是很重要的,以便让进程管理代码有时间来更新对象的状态以反映终止。
BEFORE:FalseDURING: TrueTERMINATED: TrueJOINED: False
7.进程退出状态
当进程退出时产生的状态码可以通过exitcode属性访问。允许的范围列在下面的表格中。
Exit Code | Meaning |
---|---|
== 0 | row 1 col 2 |
> 0 | the process had an error, and exited with that code |
< 0 | the process had an error, and exited with that code |
import multiprocessingimport sysimport timedef exit_error(): sys.exit(1)def exit_ok(): returndef return_value(): return 1def raises(): raise RuntimeError('There was an error!')def terminated(): time.sleep(3)if __name__ == '__main__': jobs = [] funcs = [ exit_error, exit_ok, return_value, raises, terminated, ] for f in funcs: print('Starting process for', f.__name__) j = multiprocessing.Process(target=f, name=f.__name__) jobs.append(j) j.start() jobs[-1].terminate() for j in jobs: j.join() print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
运行结果:
Starting process for exit_errorStarting process for exit_okStarting process for return_valueStarting process for raisesStarting process for terminatedProcess raises: exit_error.exitcode = 1 exit_ok.exitcode = 0Traceback (most recent call last): File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 258, in _bootstrap self.run() File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "E:\MyPython\image.py", line 19, in raises raise RuntimeError('There was an error!')RuntimeError: There was an error! return_value.exitcode = 0 raises.exitcode = 1 terminated.exitcode = -15
exit(0):无错误退出
exit(1):有错误退出 退出代码是告诉解释器的(或操作系统)8.调试 log_to_stderr
在调试并发性问题时,可以使用multiprocessing所提供的对象的内部构件。有一个方便的模块级函数为logtostderr()。它使用logging设置一个记录器对象,并添加一个处理程序,以便将日志消息发送到标准错误通道。
import multiprocessingimport loggingimport sysdef worker(): print('Doing some work') sys.stdout.flush()if __name__ == '__main__': multiprocessing.log_to_stderr(logging.DEBUG) p = multiprocessing.Process(target=worker) p.start() p.join()
运行结果:认情况下,logging级别被设置为NOTSET不会产生任何消息。
Doing some work[INFO/Process-1] child process calling self.run()[INFO/Process-1] process shutting down[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0[DEBUG/Process-1] running the remaining "atexit" finalizers[INFO/Process-1] process exiting with exitcode 0[INFO/MainProcess] process shutting down[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0[DEBUG/MainProcess] running the remaining "atexit" finalizers
若想直接操作日志请使用get_logger()
获取
import multiprocessingimport loggingimport sysdef worker(): print('Doing some work') sys.stdout.flush()if __name__ == '__main__': multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join()
运行结果:
[INFO/Process-1] child process calling self.run()Doing some work[INFO/Process-1] process shutting down[INFO/Process-1] process exiting with exitcode 0[INFO/MainProcess] process shutting down
9.子类化进程
通过multiprocessing.Process
可以创建进程,也可以通过自定义子类创建进程。
import multiprocessingclass Worker(multiprocessing.Process): def run(self): print('In {}'.format(self.name)) returnif __name__ == '__main__': jobs = [] for i in range(5): p = Worker() jobs.append(p) p.start() for j in jobs: j.join()
运行结果:
In Worker-2In Worker-4In Worker-3In Worker-1In Worker-5