进程间的同步
一、进程同步_信号传递(Event)
#encoding=utf-8import multiprocessingimport timedef wait_for_event(e):#无超时等待 """Wait for the event to be set before doing anything""" print('wait_for_event: starting') e.wait() # 等待收到能执行信号,如果一直未收到将一直阻塞 print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):#有超时等待 """Wait t seconds and then timeout""" print('wait_for_event_timeout: starting') e.wait(t)# 等待t秒超时,此时Event的状态仍未未设置,继续执行 print('wait_for_event_timeout: e.is_set()->', e.is_set()) e.set()# 初始内部标志为真 if __name__ == '__main__': e = multiprocessing.Event() print("begin,e.is_set()", e.is_set())#默认值是false w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start() #可将2改为5,看看执行结果 w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w2.start() print('main: waiting before calling Event.set()') time.sleep(3) # e.set() #可注释此句话看效果,要有set才能在wait的时候继续执行 print('main: event is set')
逻辑:函数a-wait_for_event:e.wait():死等,等别的进程调用e.set(),才会继续执行函数b-wait_for_event_timeoute.wait(t):死等t秒,等别的进程调用e.set()或经过t秒,才会继续执行,继续执行了,就会调用e.set(),实现函数a就可以从死等状态变为继续执行执行状态主程序:声明了一个信号对象e启动了进程a,执行函数a启动了进程b,执行函数b然后就结束了 只要有一个e.set(),所有死等的都开始运行了 运行结果:
二、管道pip(只能用2个进程之间进行通信)
Socket可以不同进程间的通讯
#encoding=utf-8import multiprocessing as mpdef proc_1(pipe): pipe.send('hello')#发消息 print('proc_1 received: %s' %pipe.recv())#收消息,没有消息就是死等,可跨进程 pipe.send("what is your name?") print('proc_1 received: %s' %pipe.recv())def proc_2(pipe): print('proc_2 received: %s' %pipe.recv()) pipe.send('hello, too') print('proc_2 received: %s' %pipe.recv()) pipe.send("I don't tell you!")if __name__ == '__main__': # 创建一个管道对象pipe pipe = mp.Pipe()#声明管道 print(len(pipe)) print(type(pipe)) # 将第一个pipe对象传给进程1 p1 = mp.Process(target = proc_1, args = (pipe[0], )) # 将第二个pipe对象传给进程2 p2 = mp.Process(target = proc_2, args = (pipe[1], )) p2.start() p1.start() p2.join() p1.join()
交换进程 结果是一样的,[1]和[0]
只能同一机器上运行
运行结果
三、进程同步(condition)例子:生产者和消费者
#encoding=utf-8import multiprocessing as mpimport threadingimport timedef consumer(cond): with cond: print("consumer before wait") cond.wait() # 等待消费 print("consumer after wait")def producer(cond): with cond: print("producer before notifyAll") cond.notify_all() # 通知消费者可以消费了 print("producer after notifyAll")if __name__ == '__main__': p1 = mp.Process(name = "p1", target = consumer, args=(condition,)) p2 = mp.Process(name = "p2", target = consumer, args=(condition,)) p3 = mp.Process(name = "p3", target = producer, args=(condition,)) p1.start() time.sleep(2) p2.start() time.sleep(2) p3.start()
运行结果
四、进程间共享数字变量
变量之间不影响
进程:文本区() 数据区 堆栈
主进程和子进程存的变量地址不一样,所以不会互相影响
变量之间影响了
#encoding=utf-8from multiprocessing import Process, Value, Arraydef f(n, a): n.value = n.value+1 for i in range(len(a)): a[i] = -a[i]if __name__ == '__main__': num = Value('d', 0.0) # 创建一个进程间共享的数字类型,默认值为0;d表示double双精度 arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10] p = Process(target = f, args = (num, arr)) p.start() p.join() p = Process(target = f, args = (num, arr)) p.start() p.join() print(num.value) # 获取共享变量num的值 print(arr[:])
五、加锁,避免冲突
#encoding=utf-8import timefrom multiprocessing import Process, Value, Lockclass Counter(object): def __init__(self, initval = 0): self.val = Value('i', initval) self.lock = Lock() def increment(self): with self.lock: self.val.value += 1 # 共享变量自加1 #print(“increment one time!”,self.value() ) #加此句死锁 def value(self): with self.lock: return self.val.valuedef func(counter): for i in range(50): time.sleep(0.01) counter.increment()if __name__ == '__main__': counter = Counter(0) procs = [Process(target = func, args = (counter,)) for i in range(10)] # 等价于 # for i in range(10): # Process(target = func, args = (counter,)) for p in procs: p.start() for p in procs: p.join() print(counter.value())
运行结果:
六、共享字符串
#encoding=utf-8from multiprocessing import Process, Manager, Valuefrom ctypes import c_char_pdef greet(shareStr): shareStr.value = shareStr.value + ", World!"if __name__ == '__main__': manager = Manager() shareStr = manager.Value(c_char_p, "Hello") #固定写法 process = Process(target = greet, args = (shareStr,)) process.start() process.join() print(shareStr.value)
运行结果:
七、共享列表和字典
#encoding=utf-8from multiprocessing import Process, Managerdef f( shareDict, shareList ): shareDict[1] = '1' shareDict['2'] = 2 shareDict[0.25] = None shareList.reverse() # 翻转列表if __name__ == '__main__': manager = Manager() shareDict = manager.dict() # 创建共享的字典类型 shareList = manager.list( range( 10 ) ) # 创建共享的列表类型 p = Process( target = f, args = ( shareDict, shareList ) ) p.start() p.join() print(shareDict)print(shareList)
运行结果:
八、共享实例对象
#encoding=utf-8import time, osimport randomfrom multiprocessing import Pool, Value, Lock, Managerfrom multiprocessing.managers import BaseManagerclass MyManager(BaseManager): passdef Manager(): m = MyManager() m.start() return mclass Counter(object): def __init__(self, initval=0): self.val = Value('i', initval) self.lock = Lock() def increment(self): with self.lock: self.val.value += 1 def value(self): with self.lock: return self.val.value#将Counter类注册到Manager管理类中MyManager.register('Counter', Counter)def long_time_task(name,counter): time.sleep(0.2) print('Run task %s (%s)...\n' % (name, os.getpid())) start = time.time() #time.sleep(random.random() * 3) for i in range(50): time.sleep(0.01) counter.increment() end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__': manager = Manager() # 创建共享Counter类实例对象的变量,Counter类的初始值0 counter = manager.Counter(0) print('Parent process %s.' % os.getpid()) p = Pool() for i in range(5): p.apply_async(long_time_task, args = (str(i), counter)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') print(counter.value())
运行结果: