Python parallel 多进程
目录
要点: 并发能大幅节省时间,特别是单细胞生信分析过程中的平行耗时任务。并发方式有 多进程、多线程、分布式等。
Python多进程技术
并发执行能最大限度使用硬件资源,缩短项目运行总时间。
一个进程 process 至少包含一个线程 thread。由于GIL(the Global Interpreter Lock, 全局解释器锁)的存在,python是“伪多线程”的,同一个时刻只能使用一个CPU。而多进程才能实质上利用多核,达到缩短项目总时间的目的。
python中的多进程主要使用到 multiprocessing 这个库。
from multiprocessing import Process
自定义简陋多进程包 ParallelMe.py
这是我的Python功力鼎盛时期写的并发脚本,反复改写,最后抽象为ParallelMe类。 进程池控制多进程worker函数数量,队列Queue实现读写进程间通信,writer进程把执行结果记录到日志文件。
适用情况:批量平行的耗时任务,如多文件的STAR、samtools等,以及请求不能太密集的批量下载任务,如wget等。
该类只能在linux上运行。使用简单,记住 ParallelMe三步曲 即可。
使用方法简单,只需要import引用,定义好 1)单个id可运行的自定义函数,返回需要的结果,或者直接函数内部做IO,返回空字符串或者返回id;建议用绝对路径; 2)id列表文件 3)定义好输出日志,里面记录着id和自定义函数的返回值。 难点:第二步 自定义函数 doLinuxCMD(id),这就是要扔给worker们执行的繁重任务。保证单id能运行。 函数内完成主要的输入和输出,函数仅提供所需的id,返回并log执行结果的状态,0表示执行成功。 易错点:自定义函数是否需要临时文件夹?如果需要,还需要继续编码解决该问题,否则还会混乱。 需要考虑硬件限制,合理设置进程池规模,防止内存和CPU爆满卡死。 为了防止切换带来的额外花销,进程池的进程数量不能大于CPU核数。
I. 引入类文件 ParallelMe.py
# define a class: ParallelMe[Run on Linux only!]
# version: 0.2.1
import subprocess
import time,multiprocessing,os,re,random,datetime
class ParallelMe(object):
#初始化属性
def __init__(self, doLinuxCMD_fn, id_list_file, core=3, hint_n=5,log_file_name='logs.txt',uniqLogName=False):
"""
待批量处理的函数 doLinuxCMD_fn 仅仅依赖一个id,然后id的list在文件 id_list_file 中提供。
if hint_n定义几次,就出几次进度提示,默认提示5次,最小是1次;
uniqLogName==True时,会对log文件加时间戳后缀,默认不加时间戳;
"""
# 必选值
self.doLinuxCMD=doLinuxCMD_fn
self.id_list_file=id_list_file
#默认值
self.core=core #使用的CPU逻辑核心数。该数字 x linux命令使用的进程数 要小于硬件CPU逻辑核心数
self.hint_n=hint_n;
self.log_file_name=log_file_name; #输出日志的文件名,内容是: id号 运行状态(0表示正常,否则表示异常)
#需要处理
#self.uniqLogName=uniqLogName;#这个文件名要加上时间戳,防止忘了修改日志文件名而被覆盖掉
if uniqLogName:
timsString=time.strftime("%Y%m%d-%H%M%S", time.localtime())
self.log_file_name=log_file_name + timsString
#
#self.q=Queue(core+5) #创建队列
self.queue=multiprocessing.Manager().Queue(core+5);
#定义worker: 读和处理数据,并行
def worker(self, cb):
#print("worker===> ",cb, os.getpid() );
#一个很耗时的计算
rs=str(cb)+"\t"+str( self.doLinuxCMD(cb) ) #part2 中定义的
self.queue.put(rs) #结果输出到管道
#保存的进程1个
def writer(self,log_file_name,ID_total,hint_n=10):
hit_n=int(hint_n);
if hint_n<1:
hint_n=1;
breaks=int(ID_total/hint_n) #显示hint_n次进度提示
i=0
with open(log_file_name, 'w') as f: #这里不能是变量名?
while True:
if i%breaks==0 or i==ID_total: #进度条
print(i," items processed in ", round(time.time()-self.start, 2)," seconds",sep="")
# 如果所有条目都保存过了,则退出死循环
if(i==ID_total):
break;
i+=1
rs=self.queue.get() #waite while queue is empty
f.write(rs+"\n") #写入文件
f.flush() #刷新缓存,一次性输出到文件
#主进程: 向进程池中提交任务,交给并行的worker()来处理
def main(self):
#1. 声明进程池对象
pool=multiprocessing.Pool(self.core)
#2. 读取id_list_file文件,分配任务给进程
fr=open(self.id_list_file,'r')
lines=fr.readlines();
ID_total=len(lines);
for lineR in lines:
line=lineR.strip()
arr=re.split(' ',line) ##print("start new process", line) #任务是一次发送完的
pool.apply_async( self.worker, args=(arr[0],) )
fr.close() #关闭文件
#3. 分完任务,开始启动保存进程,由writer()函数来处理
pOut = multiprocessing.Process(target=self.writer, args=(self.log_file_name,ID_total,self.hint_n,)) # args:元组参数,如果参数就一个,记得加逗号’,’
pOut.start()
#4. 等待读进程worker()全部结束
pool.close()
pool.join()
#5. 等待写循环结束
pOut.join()
#运行
def run(self):
#输出运行参数
self.start=time.time();#启动时的时间
print("function name:", self.doLinuxCMD);
print("id list file:", self.id_list_file);
print('CPU core number:', self.core);
print('hint number:', self.hint_n);
print('log_file_name:', self.log_file_name);
#
print("#"*40,'\n',datetime.datetime.now(),"\n","#"*40, sep="")
print('='*10, ">Begin of main process[", os.getpid(), "][child pid by parent ppid]", sep="")
self.main(); #开启多进程
print(time.time()-self.start,'s <', '='*10, "End of main process[", os.getpid(),']', sep="")
print("#"*40,"\nLog file: "+ os.getcwd()+"/"+self.log_file_name,"\n","#"*40, sep="")
# end of class
# 引入类
import sys
sys.path.append("/home/wangjl/test/")
# 该类和demo.py在同一个文件夹内直接import,如果在其他文件夹,需要先添加类所在路径。
# from ParallelMe import ParallelMe
II. 测试实例 demo.py (使用 ParallelMe 三步曲)
import os,subprocess,random
from ParallelMe import ParallelMe
##part 1 定义路径
print('Step1> define path: ',os.getcwd())
os.chdir('/home/wangjl/test') #定义工作目录,仅对python有效。对linux命令建议都使用绝对路径。
#part 2 定义linux命令,返回字符串,会被记录到日志文件中。
print("Step2> define the function to be run parallelly: doLinuxCMD(id)")
#目的:需要平行处理的linux命令。if the function can run on one id, it can run on a list of ids.
#要点: 使用id拼接linux命令。建议都用绝对路径。
def doLinuxCMD(id):
#构建命令,很复杂的linux命令
#这个linux命令为休眠一段时间。可以是linux脚本,有输入和输出,建议用绝对路径。
cmd="sleep "+str(random.randint(0,4));
#执行linux命令
(status, output)=subprocess.getstatusoutput(cmd)
#print(output) #查看linux命令输出到屏幕上的文字
rs=str(status)+"\t"+output# +"\t"+str(os.getpid())+"\t"+str(os.getppid());
return rs #返回状态码status,0表示命令正常执行,其他表示异常,需要查看output推测具体原因
#test
#doLinuxCMD(1) #status output pid ppid
#part 3 批量运行该linux命令
print("Step3> run the function parallelly");
#doLinuxCMD为函数,要有str返回值
#id_list为id列表文本文件名,一个id一行。建议用绝对路径
#core为并行个数(默认是3),要小于CPU个数,但是超过id总个数也没有意义;
id_list="/home/wangjl/test/a.txt"
myTasks=ParallelMe(doLinuxCMD, id_list, core=55);
myTasks.run()
III. 运行与输出、执行状态检查
## id list就是一行一个id号即可
$ head /home/wangjl/test/a.txt
1
2
3
...
50
数字或字符串,文件或任务的uniq标识。
$ python demo.py
## 输出:
Step3> run the function parallelly
function name:
id list file: /home/wangjl/test/a.txt
CPU core number: 55
hint number: 5
log_file_name: logs.txt
########################################
2025-03-26 22:09:46.501692
########################################
==========>Begin of main process[271161][child pid by parent ppid]
0 items processed in 0.31 seconds
10 items processed in 1.31 seconds
20 items processed in 1.33 seconds
30 items processed in 2.33 seconds
40 items processed in 3.33 seconds
50 items processed in 4.33 seconds
4.480337619781494s <==========End of main process[271161]
########################################
Log file: /home/wangjl/test/logs.txt
########################################
log文件第一列是任务id,第二列是运行状态,0表示成功执行。 检查log文件第二列,人工复查非0的id任务数量:
$ awk '{print $2}' /home/wangjl/test/logs.txt | sort | uniq -c
## 50 0
分布式
并发 参考资料
https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376 Python多进程和多线程(跑满CPU) https://blog.csdn.net/qq_40317897/article/details/89921083 多核CPU上python多线程并行的一个假象(转) https://blog.csdn.net/weixin_33922672/article/details/86234859