python进程使用Queue和Pipe通信
admin
2023-01-20 06:40:51
0

背景

当使用多个线程操作任务的时候,如果线程间有需要通信的地方,那么不可避免的要实现到线程间的通信,来互相通知消息,同步任务的执行。

一.通信

1.线程threading共享内存地址,进程与进程Peocess之间相互独立,互不影响(相当于深拷贝);

2.在线程间通信的时候可以使用Queue模块完成,进程间通信也可以通过Queue完成,但是此Queue并非线程的Queue,进程间通信Queue是将数据 pickle 后传给另一个进程的 Queue,用于父进程与子进程之间的通信或同一父进程的子进程之间通信;

queue

python中的queue模块其实是对数据结构中栈和队列这种数据结构的封装,把抽象的数据结构封装成类的属性和方法

使用Queue线程间通信:


1

2

3

4

5

#导入线程相关模块

import threading

import queue  

 

q = queue.Queue()

 

使用Queue进程间通信,适用于多个进程之间通信:


1

2

3

4

5

# 导入进程相关模块

from multiprocessing import Process

from multiprocessing import Queue

 

q = Queue()

 

使用Pipe进程间通信,适用于两个进程之间通信(一对一):


1

2

3

4

5

# 导入进程相关模块

from multiprocessing import Process

from multiprocessing import Pipe

 

pipe = Pipe()

 

 

二.python进程间通信Queue/Pipe使用

python提供了多种进程通信的方式,主要Queue和Pipe这两种方式,Queue用于多个进程间实现通信,Pipe用于两个进程的通信;

1.使用Queue进程间通信,Queue包含两个方法:

  • put():以插入数据到队列中,他还有两个可选参数:blocked和timeout。详情自行百度

  • get():从队列读取并且删除一个元素。同样,他还有两个可选参数:blocked和timeout。详情自行百度


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

# !usr/bin/env python

# -*- coding:utf-8 _*-

"""

@Author:何以解忧

@Blog(个人博客地址): shuopython.com

@WeChat Official Account(微信公众号):猿说python

@Github:www.github.com

 

@File:python_process_queue.py

@Time:2019/12/21 21:25

 

@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!

"""

 

from multiprocessing import Process

from multiprocessing import Queue

import os,time,random

 

#写数据进程执行的代码

def proc_write(q,urls):

    print ('Process is write....')

    for url in urls:

        q.put(url)

        print ('put %s to queue... ' %url)

        time.sleep(random.random())

 

#读数据进程的代码

def proc_read(q):

    print('Process is reading...')

    while True:

        url = q.get(True)

        print('Get %s from queue' %url)

 

if __name__ == '__main__':

    #父进程创建Queue,并传给各个子进程

    q = Queue()

    proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3']))

    proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6']))

    proc_reader = Process(target=proc_read,args=(q,))

    #启动子进程,写入

    proc_write1.start()

    proc_write2.start()

 

    proc_reader.start()

    #等待proc_write1结束

    proc_write1.join()

    proc_write2.join()

    #proc_raader进程是死循环,强制结束

    proc_reader.terminate()

    print("mian")

输出结果:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Process is write....

put url_1 to queue...

Process is write....

put url_4 to queue...

Process is reading...

Get url_1 from queue

Get url_4 from queue

put url_5 to queue...

Get url_5 from queue

put url_2 to queue...

Get url_2 from queue

put url_3 to queue...

Get url_3 from queue

put url_6 to queue...

Get url_6 from queue

mian

 

2.使用Pipe进程间通信

Pipe常用于两个进程,两个进程分别位于管道的两端 * Pipe方法返回(conn1,conn2)代表一个管道的两个端,Pipe方法有duplex参数,默认为True,即全双工模式,若为FALSE,conn1只负责接收信息,conn2负责发送,Pipe同样也包含两个方法:

send() : 发送信息;

recv() : 接收信息;


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

from multiprocessing import Process

from multiprocessing import Pipe

import os,time,random

#写数据进程执行的代码

def proc_send(pipe,urls):

    #print 'Process is write....'

    for url in urls:

 

        print ('Process is send :%s' %url)

        pipe.send(url)

        time.sleep(random.random())

 

#读数据进程的代码

def proc_recv(pipe):

    while True:

        print('Process rev:%s' %pipe.recv())

        time.sleep(random.random())

 

if __name__ == '__main__':

    #父进程创建pipe,并传给各个子进程

    pipe = Pipe()

    p1 = Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10) ]))

    p2 = Process(target=proc_recv,args=(pipe[1],))

    #启动子进程,写入

    p1.start()

    p2.start()

 

    p1.join()

    p2.terminate()

    print("mian")

输出结果:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Process is send :url_0

Process rev:url_0

Process is send :url_1

Process rev:url_1

Process is send :url_2

Process rev:url_2

Process is send :url_3

Process rev:url_3

Process is send :url_4

Process rev:url_4

Process is send :url_5

Process is send :url_6

Process is send :url_7

Process rev:url_5

Process is send :url_8

Process is send :url_9

Process rev:url_6

mian

 

三.测试queue.Queue来完成进程间通信能否成功?

当然我们也可以尝试使用线程threading的Queue是否能完成线程间通信,示例代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

from multiprocessing import Process

# from multiprocessing import Queue     # 进程间通信Queue,两者不要混淆

import queue                            # 线程间通信queue.Queue,两者不要混淆

import time

 

def p_put(q,*args):

    q.put(args)

    print('Has put %s' % args)

 

 

def p_get(q,*args):

    print('%s wait to get...' % args)

 

    print(q.get())

    print('%s got it' % args)

 

 

 

 

if __name__ == "__main__":

    q = queue.Queue()

    p1 = Process(target=p_put, args=(q,'p1', ))

    p2 = Process(target=p_get, args=(q,'p2', ))

    p1.start()

    p2.start()

直接异常报错:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Traceback (most recent call last):

  File "E:/Project/python_project/untitled10/123.py", line 38, in <module>

    p1.start()

  File "G:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start

    self._popen = self._Popen(self)

  File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen

    return _default_context.get_context().Process._Popen(process_obj)

  File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen

    return Popen(process_obj)

  File "G:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__

    reduction.dump(process_obj, to_child)

  File "G:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump

    ForkingPickler(file, protocol).dump(obj)

TypeError: can't pickle _thread.lock objects



相关内容

热门资讯

欧盟想对付中国汽车,英国“躺枪... 【文/观察者网 潘昱辰 编辑/高莘】据英国《金融时报》报道,3月4日,欧盟委员会正式公布《工业加速器...
最便宜的苹果笔记本!MacBo... 快科技3月7日消息,苹果本周正式推出了全新的入门级笔记本电脑MacBook Neo,官方起售价定为4...
干将新材料取得风味保持剂混料处... 国家知识产权局信息显示,干将新材料有限公司取得一项名为“一种风味保持剂混料处理机构”的专利,授权公告...
刚刚,Gemini攻克「宇宙弦... 新智元报道 编辑:定慧 【新智元导读】就在刚刚,Google Research团队用Gemini ...
华为无线专家:打造一张面向智能... 文/观察者网 吕栋 移动AI的发展异常迅猛,人类社会正快速迈入智能体互联网时代。 如今在中国,超...
算电协同首次被写入政府工作报告... 来源:证券日报网 “算电协同”被首次写入政府工作报告。3月5日,政府工作报告提出,“实施超大规模智算...
原创 天... 黑洞奇点奠定物理基础 罗杰·彭罗斯1931年出生在英国埃塞克斯郡科尔切斯特,从小家里就满是科学氛...
美军打伊朗是为了“世界末日,耶... 【文/观察者网 阮佳琪】美以伊战事紧张之际,本以为特朗普故技重施,找来一堆牧师围着自己“发功祈祷”,...
伊朗:最高领袖选举会议将在未来... 新华社德黑兰3月7日电 据伊朗伊斯兰革命卫队7日发布的消息,一名伊朗专家会议成员称,选举伊朗最高领袖...
腾讯QQ正式接入OpenCla... 3月7日,腾讯宣布为QQ新增AI生态能力,用户现可通过官方渠道将OpenClaw智能体接入QQ机器人...