python下Pool与target方法写在同一个类里要注意的坑
admin
2023-07-09 16:44:21
0

在工作中遇到要对开发的接口做压力测试,以前没有做过开清楚什么压测工具好用,正好接口不是什么复杂的接口,curl -X post "接口地址" --data-binary @二进制认证文件 OK!(@表示验证数据是文件类型)

既然这样那我就写个脚本好了,脚本内容如下:

======================================================================
#!/usr/bin/evn python
#_coding:utf8_
from multiprocessing import Pool,Queue
import time,subprocess,os
class YaCe(object):
def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):
self.api = api
self.binfile = binfile
self.status = status
self.maxpool = maxpool
self.maxrequest = maxrequest
self.qu = qu
def prorequest(self):
for i in range(self.maxrequest):
self.qu.put(i)
print(i)
for i in range(int(self.maxpool)):
self.qu.put(None)
print("None")

def conumers(self,i):
    while True:
        data = self.qu.get(True)
        if data == None:
            print("进程%s任务完成..."%i)
            break
        else:
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                if "CgoyMDAwMDAwMDAw" in command:
                    print("进程%s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write(command+"\n")
                    f.close()
                else:
                                        print("进程%s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write("Faild\n")
                        f.write(command+"\n")
                    f.close()
            else:
                logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time
                #print("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                if "CAES+" in command:
                    print("进程%s__%s..."%(str(i),str(data)))
                    info = command.split('\n')[-3:]
                    info1 = "\n".join(info)
                    with open(logfile,"a") as f:
                        f.write(info1+"\n")
                    f.close()
                else:
                    print("进程%s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write("Faild\n")
                        f.write(command+"\n")
                    f.close()
def multirun(self):
    ps = int(int(self.maxpool) - 1)
    p = Pool(ps)
    for i in range(self.maxpool):
        print("开启子进程%s"%i)
        p.apply_async(self.conumers,args=(self,i))
    print('等待所有添加的进程运行完毕。。。')
    p.close()
    p.join()
    endtime = time.strftime("%Y%m%d_%X",time.localtime())
    if self.status == "success":
        logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
    else:
        logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time
    with open(logfile,"a") as f:
        f.write("============[%s]============\n"%endtime)
    f.close()
    print('End!!,PID:%s'% os.getpid())

if name == "main":
q = Queue()
Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))
Yc.prorequest()
print("++++++")
global date_time
datetime = time.strftime("%Y%m%d%X",time.localtime())
Yc.multirun()

====================================================================================
问题
到这里写完了,测试的问题来了,从脚本来看如果运行成功,会有多进程在处理队列的输出,可是结果的输出确是如下:

01
2
3
4
5
6
7
8
9
None
None
++++++
开启子进程0
开启子进程1
等待所有添加的进程运行完毕。。。
End!!,PID:4819

原因
子进程conumers方法完全没有运行,也没有报错这就尴尬了;查了大量的文档资料;发现这个pool方法都使用了queue.Queue将task传递给工作进程。multiprocessing必须将数据序列化以在进程间传递。方法只有在模块的顶层时才能被序列化,跟类绑定的方法不能被序列化,就会出现上面的异常 ; 那肿么办,我不是一个轻易放弃的人,终于被我找到了方法;

注意
解决方作者是在python3下测试了,python2下用脚本的subprocess要换成value,command = commands.getstatusoutput

解决方法1(亲测)
1.首先要看报错,需要对脚本修改如下:
YaCe类下的multirun方法下修改
for i in range(self.maxpool):
print("开启子进程%s"%i)
p.apply_async(self.conumers,args=(self,i))

for i in range(self.maxpool):
print("开启子进程%s"%i)
res = p.apply_async(self.conumers,args=(self,i))
print(res.get)
这就可以看到报错:
cPickle.PicklingError: Can't pickle : attribute lookup builtin.instancemethod failed

2.解决方法如下在脚本中加一个新的函数
(1).def conumers_wrapper(cls_instance,i):
return cls_instance.conumers(i)

(2).修改YaCe下multirun方法
for i in range(self.maxpool):
print("开启子进程%s"%i)
res = p.apply_async(self.conumers,args=(self,i))
print(res.get())

for i in range(self.maxpool):
print("开启子进程%s"%i)
res = p.apply_async(conumers_wrapper,args=(self,i))
print(res.get)

问题解决了,运行一下脚本结果还有报错:
RuntimeError: Queue objects should only be shared between processes through inheritance

原因
这里不可以用Queue,要改用Manager.Queue;因为进程之前的同共离用Queue会用问题;

完结
最终代码如下:

==================================================================================
#!/usr/bin/evn python
#_coding:utf8_
from multiprocessing import Pool,Queue,Manager
import time,subprocess,os
class YaCe(object):
def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):
self.api = api
self.binfile = binfile
self.status = status
self.maxpool = maxpool
self.maxrequest = maxrequest
self.qu = qu
def prorequest(self):
for i in range(self.maxrequest):
self.qu.put(i)
print(i)
for i in range(int(self.maxpool)):
self.qu.put(None)
print("None")

def conumers(self,i):
    while True:
        data = self.qu.get(True)
        if data == None:
            print("进程%s任务完成..."%i)
            break
        else:
            #print("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            #command = subprocess.getoutput("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                if "CgoyMDAwMDAwMDAw" in command:
                    print("进程%s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write(command+"\n")
                    f.close()
                else:
                    with open(logfile,"a") as f:
                        f.write("Faild\n")
                        f.write(command+"\n")
                    f.close()
            else:
                logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time
                #print("time curl -X POST '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                #command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                if "CAES+" in command:
                    print("进程%s__%s..."%(str(i),str(data)))
                    info = command.split('\n')[-3:]
                    info1 = "\n".join(info)
                    with open(logfile,"a") as f:
                        f.write(info1+"\n")
                    f.close()
                else:
                    print("进程%s__%s..."%(str(i),str(data)))
                    with open(logfile,"a") as f:
                        f.write("Faild\n")
                        f.write(command+"\n")
                    f.close()
def multirun(self):
    ps = int(int(self.maxpool) - 1)
    p = Pool(ps)
    for i in range(self.maxpool):
        print("开启子进程%s"%i)
        p.apply_async(conumers_wrapper,args=(self,i))
    #print(res.get)
    print('等待所有添加的进程运行完毕。。。')
    p.close()
    p.join()
    endtime = time.strftime("%Y%m%d_%X",time.localtime())
    if self.status == "success":
        logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
    else:
        logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time
    with open(logfile,"a") as f:
        f.write("============[%s]============\n"%endtime)
    f.close()
    print('End!!,PID:%s'% os.getpid())

def conumers_wrapper(cls_instance,i):
return cls_instance.conumers(i)

if name == "main":
q = Manager().Queue()
Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))
Yc.prorequest()
print("++++++")
global date_time
datetime = time.strftime("%Y%m%d%X",time.localtime())
Yc.multirun()

相关内容

热门资讯

美伊谈判濒临破裂之际,伊朗议长... 因为以色列持续对黎巴嫩进行军事打击,伊朗宣布暂停同美国的谈判。不过美国总统特朗普称,对话仍在继续。谈...
罕见!以军政策发生“重大转变” 新华社北京6月1日电 题:罕见纵深推进,以军对黎行动会否搅动美伊谈判新华社记者刘品然 阚静文 席玥以...
山西太原发现一处新石器遗址,出... 山西省太原市文物保护研究院协同相关科研机构,近期在太原市阳曲县西盘威村发现一处新石器时代重要遗址——...
伊媒发布穆杰塔巴罕见照片 伊朗塔斯尼姆通讯社6月1日发布了一张最高领袖穆杰塔巴的照片。照片中,穆杰塔巴面露笑容,抱着一个婴儿。...
福建“泡药杨梅”曝光后,浙江杨... 这两天,浙江本地杨梅少量进入市场。虽然受到此前福建 “泡药杨梅” 事件影响,市场整体销量相比去年同期...
尺素金声 | 前4月规上工业企... 5月27日,国家统计局发布最新数据显示,今年前4月,全国规上工业企业实现利润同比增长18.2%,增速...
郑丽文:台湾民众越来越了解“台... 针对台湾《联合报》民调显示,63%受访者民意希望维持现状,即将访美的中国国民党主席郑丽文1日表示,民...
美前副总统:共和党失去了方向,... 2026年是美国的中期选举年,共和党选情不利,可能在年底的选举中遭遇挫败。美国前副总统彭斯5月31日...
南枝原来去过中国?《给阿嬷的情... 《给阿嬷的情书》票房口碑双丰收,目前票房已突破13亿。凤凰卫视最新一期《问答神州》专访了该片导演蓝鸿...
法国海军扣押一艘俄“影子舰队”... 近日,法国海军在大西洋海域扣押了一艘据称从俄罗斯摩尔曼斯克出发的油轮,引发俄方强烈不满。俄新社6月1...