博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python_多进程_进程间的同步
阅读量:5334 次
发布时间:2019-06-15

本文共 6346 字,大约阅读时间需要 21 分钟。

进程间的同步

一、进程同步_信号传递(Event)

#encoding=utf-8import multiprocessingimport timedef wait_for_event(e):#无超时等待    """Wait for the event to be set before doing anything"""    print('wait_for_event: starting')    e.wait() # 等待收到能执行信号,如果一直未收到将一直阻塞    print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):#有超时等待    """Wait t seconds and then timeout"""     print('wait_for_event_timeout: starting')     e.wait(t)# 等待t秒超时,此时Event的状态仍未未设置,继续执行     print('wait_for_event_timeout: e.is_set()->', e.is_set())     e.set()# 初始内部标志为真 if __name__ == '__main__':     e = multiprocessing.Event()     print("begin,e.is_set()", e.is_set())#默认值是false     w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))     w1.start()          #可将2改为5,看看执行结果     w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))      w2.start()     print('main: waiting before calling Event.set()')     time.sleep(3)     # e.set()   #可注释此句话看效果,要有set才能在wait的时候继续执行     print('main: event is set')
逻辑:函数a-wait_for_event:e.wait():死等,等别的进程调用e.set(),才会继续执行函数b-wait_for_event_timeoute.wait(t):死等t秒,等别的进程调用e.set()或经过t秒,才会继续执行,继续执行了,就会调用e.set(),实现函数a就可以从死等状态变为继续执行执行状态主程序:声明了一个信号对象e启动了进程a,执行函数a启动了进程b,执行函数b然后就结束了 只要有一个e.set(),所有死等的都开始运行了 运行结果:

二、管道pip(只能用2个进程之间进行通信)

Socket可以不同进程间的通讯

#encoding=utf-8import multiprocessing as mpdef proc_1(pipe):    pipe.send('hello')#发消息    print('proc_1 received: %s' %pipe.recv())#收消息,没有消息就是死等,可跨进程    pipe.send("what is your name?")    print('proc_1 received: %s' %pipe.recv())def proc_2(pipe):    print('proc_2 received: %s' %pipe.recv())    pipe.send('hello, too')    print('proc_2 received: %s' %pipe.recv())    pipe.send("I don't tell you!")if __name__ == '__main__':  # 创建一个管道对象pipe  pipe = mp.Pipe()#声明管道  print(len(pipe))  print(type(pipe))  # 将第一个pipe对象传给进程1  p1 = mp.Process(target = proc_1, args = (pipe[0], ))  # 将第二个pipe对象传给进程2  p2 = mp.Process(target = proc_2, args = (pipe[1], ))  p2.start()  p1.start()  p2.join()  p1.join()

交换进程  结果是一样的,[1]和[0]

只能同一机器上运行

 运行结果

 

三、进程同步(condition)例子:生产者和消费者

#encoding=utf-8import multiprocessing as mpimport threadingimport timedef consumer(cond):  with cond:    print("consumer before wait")    cond.wait() # 等待消费    print("consumer after wait")def producer(cond):  with cond:    print("producer before notifyAll")    cond.notify_all() # 通知消费者可以消费了    print("producer after notifyAll")if __name__ == '__main__':         p1 = mp.Process(name = "p1", target = consumer, args=(condition,))  p2 = mp.Process(name = "p2", target = consumer, args=(condition,))  p3 = mp.Process(name = "p3", target = producer, args=(condition,))  p1.start()  time.sleep(2)  p2.start()  time.sleep(2)  p3.start()

运行结果

四、进程间共享数字变量

变量之间不影响

进程:文本区() 数据区  堆栈

主进程和子进程存的变量地址不一样,所以不会互相影响

 

变量之间影响了

#encoding=utf-8from multiprocessing import Process, Value, Arraydef f(n, a):    n.value = n.value+1    for i in range(len(a)):        a[i] = -a[i]if __name__ == '__main__':    num = Value('d', 0.0) # 创建一个进程间共享的数字类型,默认值为0;d表示double双精度    arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]    p = Process(target = f, args = (num, arr))    p.start()    p.join()    p = Process(target = f, args = (num, arr))    p.start()    p.join()    print(num.value) # 获取共享变量num的值    print(arr[:])

五、加锁,避免冲突

#encoding=utf-8import timefrom multiprocessing import Process, Value, Lockclass Counter(object):    def __init__(self, initval = 0):        self.val = Value('i', initval)        self.lock = Lock()    def increment(self):        with self.lock:            self.val.value += 1 # 共享变量自加1            #print(“increment one time!”,self.value() )               #加此句死锁    def value(self):        with self.lock:            return self.val.valuedef func(counter):    for i in range(50):        time.sleep(0.01)        counter.increment()if __name__ == '__main__':    counter = Counter(0)    procs = [Process(target = func, args = (counter,)) for i in range(10)]    # 等价于    # for i in range(10):      # Process(target = func, args = (counter,))    for p in procs: p.start()    for p in procs: p.join()    print(counter.value())

运行结果:

六、共享字符串

#encoding=utf-8from multiprocessing import Process, Manager, Valuefrom ctypes import c_char_pdef greet(shareStr):    shareStr.value = shareStr.value + ", World!"if __name__ == '__main__':    manager = Manager()    shareStr = manager.Value(c_char_p, "Hello") #固定写法    process = Process(target = greet, args = (shareStr,))    process.start()    process.join()    print(shareStr.value)

运行结果:

七、共享列表和字典

#encoding=utf-8from multiprocessing import Process, Managerdef f( shareDict, shareList ):    shareDict[1] = '1'    shareDict['2'] = 2    shareDict[0.25] = None    shareList.reverse() # 翻转列表if __name__ == '__main__':    manager = Manager()    shareDict = manager.dict() # 创建共享的字典类型    shareList = manager.list( range( 10 ) ) # 创建共享的列表类型    p = Process( target = f, args = ( shareDict, shareList ) )    p.start()    p.join()    print(shareDict)print(shareList)

运行结果:

八、共享实例对象

#encoding=utf-8import time, osimport randomfrom multiprocessing import Pool, Value, Lock, Managerfrom multiprocessing.managers import BaseManagerclass MyManager(BaseManager):   passdef Manager():    m = MyManager()    m.start()    return mclass Counter(object):    def __init__(self, initval=0):        self.val = Value('i', initval)        self.lock = Lock()    def increment(self):        with self.lock:            self.val.value += 1    def value(self):        with self.lock:            return self.val.value#将Counter类注册到Manager管理类中MyManager.register('Counter', Counter)def long_time_task(name,counter):    time.sleep(0.2)    print('Run task %s (%s)...\n' % (name, os.getpid()))    start = time.time()    #time.sleep(random.random() * 3)    for i in range(50):        time.sleep(0.01)        counter.increment()    end = time.time()    print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__':    manager = Manager()    # 创建共享Counter类实例对象的变量,Counter类的初始值0    counter = manager.Counter(0)     print('Parent process %s.' % os.getpid())    p = Pool()    for i in range(5):        p.apply_async(long_time_task, args = (str(i), counter))    print('Waiting for all subprocesses done...')    p.close()    p.join()    print('All subprocesses done.')    print(counter.value())

运行结果:

 

转载于:https://www.cnblogs.com/rychh/articles/11374774.html

你可能感兴趣的文章
CSS 透明度级别 及 背景透明
查看>>
Linux 的 date 日期的使用
查看>>
PHP zip压缩文件及解压
查看>>
SOAP web service用AFNetWorking实现请求
查看>>
Java变量类型,实例变量 与局部变量 静态变量
查看>>
mysql操作命令梳理(4)-中文乱码问题
查看>>
Python环境搭建(安装、验证与卸载)
查看>>
一个.NET通用JSON解析/构建类的实现(c#)
查看>>
Windows Phone开发(5):室内装修 转:http://blog.csdn.net/tcjiaan/article/details/7269014
查看>>
详谈js面向对象 javascript oop,持续更新
查看>>
关于这次软件以及pda终端的培训
查看>>
jQuery上传插件Uploadify 3.2在.NET下的详细例子
查看>>
如何辨别一个程序员的水平高低?是靠发量吗?
查看>>
新手村之循环!循环!循环!
查看>>
正则表达式的用法
查看>>
线程安全问题
查看>>
SSM集成activiti6.0错误集锦(一)
查看>>
下拉刷新
查看>>
linux的子进程调用exec( )系列函数
查看>>
MSChart的研究
查看>>