python提供了一个跨平台的多进程支持——multiprocessing模块,其包含Process类来代表一个进程对象
1、Process语法结构:( 注: 传参的时候一定使用关键字传参 )
2、自定义进程类:需要继承Process类
自定义类的时候必须注意的事项:
第一,必须继承Process类的构造方法
第二,必须重写Process类的run()方法
第三,不能用实例化对象直接调用run()方法,而是调用start()方法
第四,在进程改变实例化对象的数据时,这个数据是被隔离的,即改变数据不成功
# 自定义进程类 class ProcessClass(Process): g_num = 100 def __init__ (self, interval): # 这行代码必须添加上 super(). __init__ () self.interval = interval self.result = " 初始化 " def run(self): global g_num g_num = 120 print ( " 子进程{},开始执行,父进程为{} " .format(os.getpid(), os.getppid())) start = time.time() time.sleep( 2 ) stop = time.time() print ( " {}执行结束,耗时{:0.2f}秒 " .format(os.getpid(), stop- start)) self.result = " 运行之后的结果 " if __name__ == " __main__ " : t_start = time.time() print ( " 当前进程{} " .format(os.getpid())) p = ProcessClass(2 ) p.start() p.join() t_stop = time.time() # 数据隔离 print ( " 子进程 任务 运行结果: " , p.result) # -----> 初始化 数据未改变 print (ProcessClass.g_num) # ------>100 数据未改变 print ( " (%s)执行结束,耗时%0.2f " % (os.getpid(), t_stop - t_start))
3、进程间的通信
像之前改变数据,就要使用到进程间的通信,可以使用multiprocessing模块的Queue类来实现进程间的通信
Queue的常用方法:
qsize(): 返回当前队列包含的消息(数据)数量
empty():如果队列为空,返回True,否则False
full():如果队列满了,返回True,否则False
get() 或者 put()分别时阻塞式获取 或者 阻塞式存储队列的一条消息(数据),然后获取 或者 添加这条消息,如果队列为空 或者 队列满了,在运行的过程阻塞
get_nowait() 或者 put_nowait()分别时非阻塞式获取 或者 非阻塞式存储队列的一条消息(数据),然后移除 和 添加这条消息,如果队列为空 或者 队列满了,会抛出相应的异常
实例如下:
# 自定义进程类 class ProcessClass(Process): g_num = 100 def __init__ (self, interval, q): # 这行代码必须添加上 super(). __init__ () self.interval = interval self.result = " 初始化 " # 初始化一个队列实例化对象 self.q = q def run(self): global g_num g_num = 120 print ( " 子进程{},开始执行,父进程为{} " .format(os.getpid(), os.getppid())) start = time.time() time.sleep(self.interval) stop = time.time() print ( " {}执行结束,耗时{:0.2f}秒 " .format(os.getpid(), stop- start)) self.result = " 运行之后的结果 " # 将消息(数据)添加到队列中 self.q.put(g_num) self.q.put(self.result) def get_data(self): return self.result if __name__ == " __main__ " : # 初始化一个队列实例化对象,参数为队列的长度 queues = Queue(5 ) print ( " 当前进程{} " .format(os.getpid())) p = ProcessClass(2 , queues) t_start = time.time() p.start() p.join() t_stop = time.time() # 数据隔离 print ( " 子进程 任务 运行结果: " , p.get_data()) # -----> 初始化 数据未改变 print (ProcessClass.g_num) # ------>100 数据未改变 print ( " 子进程 任务 运行结果: " , queues.get()) # -----> 120 数据未改变 print ( " 子进程 任务 运行结果: " , queues.get()) # -----> 运行之后的结果 数据未改变 print ( " (%s)执行结束,耗时%0.2f " % (os.getpid(), t_stop - t_start))
4、进程池:
如果进程有成千上万个,手动创建进程的工作量巨大,这个时候应该用到multiprocessing模块中的Pool类
这个类下有几个方法比较常用:
apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行 ,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给 func的参数列表,kwds为传递给func的关键字参数列表;
apply(func[, args[, kwds]]):使用阻塞方式调用func close():关闭Pool,使其不再接受新的任务;
terminate():不管任务是否完成,立即终止;
join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用
主要实例:
# 进程池 def worker(msg): start = time.time() print ( " {}开始执行,进程号为{} " .format(msg, os.getpid())) # random.random()生成0~1之间的随机数 time.sleep(random.random()*3 ) # time.sleep(3) stop = time.time() print (msg, " 执行完毕,耗时{:.2f} " .format(stop- start)) if __name__ == " __main__ " : # 定义一个进程池,最大数为3 po = Pool(3 ) for i in range(10 ): # 非阻塞式操作Pool.apply_async(要调用的目标,(传递给目标的参数元组,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker, (i, )) # 阻塞式操作 # po.apply(worker, (i, )) print ( " start " .center(24, " - " )) po.close() po.join() print ( " end " .center(24, " - " ))