Python script 脚本化
要点: 脚本化是一种对用户友好的封装,不仅能提高程序的健壮性,还能很容易的集成到分析流程中。
脚本化,就是把程序写成有输入和输出的独立程序文件。就像shell的 ls 命令, 比对软件 STAR 一样,用户只需要调用脚本名字、给出参数,程序就能判断参数是否合法,并返回结果。把程序像黑盒一样封装起来,方便用户使用,还能轻易整合到 snakemake 等流程中。
接收参数可以使用内置模块 sys.argv 或 argparse,以及第三方模块 click。
sys.argv 的写法太死板,只有1-2个参数可以使用,多了就太麻烦了。
我最喜欢的是 argparse 模块,这个模块是内置模块,但是功能完全够用,非常方便。
功能最全面的的是click,数据格式检查、转换都自动化完成了。
脚本内部把参数列表保存在数组 sys.argv 中,下标0是该脚本文件名本身,下标1是第一个参数,以此类推。
获取的参数是字符串形式,要做算术运算需要主动转为数字形式,比如 int(str2)。
一般还需要验证参数,比如个数、类型是否正确,输出文件是否已经存在等。
# 实例(Ubuntu 20.04 + Python 3.7.0)
#例1: 接收参数,并对第一个参数加100后输出
$ cat t1.py
import sys
arr=sys.argv;
print("Para length=", len(arr),"; arr=", arr)
print("paras:", arr[0], arr[1], arr[2])
n1=int(arr[1]) + 100 #刚获取的参数是字符串,要转为需要的格式
print("n1=", n1)
运行该脚本,并传入参数:
$ python3 t1.py 2 arg2 arg3
Para length= 4 ; arr= ['t1.py', '2', 'arg2', 'arg3']
paras: t1.py 2 arg2
n1= 102
# 如果传入参数个数不够,还可能报错。
$ python3 t1.py 5
Para length= 2 ; arr= ['t1.py', '5']
Traceback (most recent call last):
File "t1.py", line 5, in <module>
print("paras:", arr[0], arr[1], arr[2])
IndexError: list index out of range
小结:sys.argv 形式传入参数的方式比较简单,但是也很死板,因为传入的参数是一个有序的列表,所以在命令行中必须按照脚本规定的顺序去输入参数,这种方法比较适合脚本中需要的参数个数很少且参数固定的脚本。
argparse 模块是 Python 内置的一个用于命令项选项与参数解析的模块,argparse 模块可以让人轻松编写用户友好的命令行接口。通过在程序中定义好我们需要的参数,然后 argparse 将会从 sys.argv 解析出这些参数。argparse 模块还会自动生成帮助和使用手册,并在用户给程序传入无效参数时报出错误信息。
帮助文档:
argparse — Parser for command-line options, arguments and sub-commands,
argparse Tutorial
$ cat t2.py
import argparse
# step1 创建 参数解析器 实例。description 中描述该脚本做什么以及怎么做
parser = argparse.ArgumentParser(description='Test for argparse')
# step2 添加参数及其属性。前2个是参数前缀的两种形式,help是帮助,type是类型,default是默认值
parser.add_argument('name', help='positional arguments, required')
parser.add_argument('title', help='positional arguments, optional', default="")
parser.add_argument('--year', '-y', help='year 属性,非必要参数,但是有默认值', type=int, default=2022)
parser.add_argument('-b', '--body', help='body 属性,必要参数', required=True) #前2个参数的位置随意,必须参数
# step3 解析参数。
args = parser.parse_args()
print(args)
print("name=", args.name)
if ""!=args.title:
print("title=", args.title)
print("year=", args.year)
print("body=", args.body)
没有-/--前缀的是必须参数,必须指定,有默认值也不行。有-/--的是可选参数,可以给默认值,可以使用required=True指定是必选参数。
尝试调用该脚本:
$ python3 t2.py name="White Ice" title="CEO" --year 2099 -b "body of this msg"
Namespace(body='body of this msg', name='name=White Ice', title='title=CEO', year=2099)
name= name=White Ice
title= title=CEO
year= 2099
body= body of this msg
#
$ python3 t2.py name="White Ice" "CEO" -b 'msg'
Namespace(body='msg', name='name=White Ice', title='CEO', year=2022)
name= name=White Ice
title= CEO
year= 2022
body= msg
# 如果啥都不知道,没输入参数呢?
$ python3 t2.py
usage: t2.py [-h] [--year YEAR] -b BODY name title
t2.py: error: the following arguments are required: name, title, -b/--body
# 主动提示调用方式,可选参数使用[],后面是必须参数。
# 还有-h自动生成的帮助文档:
$ python3 t2.py -h
usage: t2.py [-h] [--year YEAR] -b BODY name title
Test for argparse
positional arguments:
name positional arguments, required
title positional arguments, optional
optional arguments:
-h, --help show this help message and exit
--year YEAR, -y YEAR year 属性,非必要参数,但是有默认值
-b BODY, --body BODY body 属性,必要参数
add_argument() 方法定义如何解析命令行参数:
ArgumentParser.add_argument(name or flags...[, action][, nargs][, const][, default][, type][, choices][, required][, help][, metavar][, dest])
每个参数解释如下:
name or flags - 选项字符串的名字或者列表,例如 foo 或者 -f, --foo。
action - 命令行遇到参数时的动作,默认值是 store。
store_const,表示赋值为const;
append,将遇到的值存储成列表,也就是如果参数重复则会保存多个值;
append_const,将参数规范中定义的一个值保存到一个列表;
count,存储遇到的次数;此外,也可以继承 argparse.Action 自定义参数解析;
nargs - 应该读取的命令行参数个数,可以是具体的数字,或者是?号,当不指定值时对于 Positional argument 使用 default,对于 Optional argument 使用 const;或者是 * 号,表示 0 或多个参数;或者是 + 号表示 1 或多个参数。
const - action 和 nargs 所需要的常量值。
default - 不指定参数时的默认值。
type - 命令行参数应该被转换成的类型。
choices - 参数可允许的值的一个容器。
required - 可选参数是否可以省略 (仅针对可选参数)。
help - 参数的帮助信息,当指定为 argparse.SUPPRESS 时表示不显示该参数的帮助信息.
metavar - 在 usage 说明中的参数名称,对于必选参数默认就是参数名称,对于可选参数默认是全大写的参数名称.
dest - 解析后的参数名称,默认情况下,对于可选参数选取最长的名称,中划线转换为下划线.
更多的参数介绍和使用可以查看官方文档:Python 官方文档:argparse (https://docs.python.org/zh-cn/3/library/argparse.html?highlight=argparse#module-argparse)
小结:其实我非常喜欢这个内置的命令行参数模块,因为它不仅方便使用,更重要的是它就是内置的,不需要单独安装依赖。
Click 是 Flask 的团队 pallets 开发的优秀开源项目,它为命令行工具的开发封装了大量方法,使开发者只需要专注于功能实现。基于optparse 而不是 argparse。这是一个第三方库,专门为了命令行而生的非常有名的 Python 命令行模块。
主要功能: 任意嵌入命令,自动生成帮助页,支持运行时子命令的lazy loading。
查版本号
$ pip3 list | grep -i click
## click 8.0.3
如果没有就手动安装
$ pip3 install click -i https://pypi.douban.com/simple/
$ cat t3.py
import click
@click.command()
@click.option("--count", default=1, help="Number of greetings.")
@click.option('--name', prompt='Your name', help='The person to greet.')
def hello(count, name):
"""Simple program that greets NAME for a total of COUNT times."""
for x in range(count):
click.echo(f"hello {name}!")
if __name__ == '__main__':
hello()
调用1:
$ python3 t3.py
Your name: John
hello John!
调用2:
$ python3 t3.py --count 3
Your name: John
hello John!
hello John!
hello John!
调用3:
$ python3 t3.py --help
Usage: t3.py [OPTIONS]
Simple program that greets NAME for a total of COUNT times.
Options:
--count INTEGER Number of greetings.
--name TEXT The person to greet.
--help Show this message and exit.
可以看到 click 是使用装饰器的方式给函数添加命令行属性,比较特殊的是最后调用函数的时候是没有带上参数的,因为参数会自动通过命令行的形式传入。其他设置参数的属性跟前面的 argparse 的方式非常相似,具体的参数可以参考文档和其他的教程用法
小结:click 库也是一个非常人性化的命令行参数模块,它其实非常强大,强大到把所有的命令行参数可能涉及的情况都考虑到了,需要自己去探索。
略作修改后测试,发现:
1)函数调用 hello() 下一条语句不能执行到。
2) hello()上一条可以执行,但是拿不到传入的参数。也就是说参数只在装饰器修饰的函数范围内有效,要想在其他地方有效,就要在该函数内调用其他函数,并传参。
3)函数内的 click.echo 也可以替换为我们熟悉的 print("hello %s!" % name)。
4)也可以设置 type=int 来强制转换参数类型。
先检查参数个数,再检查参数类型,还要检查输出文件是否存在,如果存在是否覆盖?
主动报错,就是在参数不合法时,主动抛出错误,终止程序的运行,并指出错误或给出合理化建议。
如果文件不存在,则报错:
import os
x="tmp.py" # this file name
#x="tmp123456789.py" #not exist
if os.path.exists(x):
print("file exist")
else:
raise Exception("File not exist.")
print("==Exception demo end==")
如果不是文件,则报错:
import os
#inputFile = "tmp.py"
inputFile = "tmp123456.py" #os.environ.get('PYTHONSTARTUP')
if inputFile and os.path.isfile(inputFile):
pass;
else:
raise Exception( "input file not exist or not a file: %s" % inputFile )
另一种主动报错方法是使用断言 assert,这个比较简短,短脚本用用没啥大问题,但-O编译时可能会跳过断言语句。
x = 23
assert x > 0, "x is not zero or negative"
assert x%2 == 0, "x is not an even number"
import sys
# 正常退出,默认返回0
sys.exit()
# 退出并返回错误代码
sys.exit(1)
所有代码都放到函数中,然后在入口依次调用这些函数。
有明确的函数入口,会使程序更结构化,更清晰易读。
def fn1():
print("fn1")
def fn2():
print("fn2")
if __name__ == "__main__":
fn1()
fn2()
运行时间长的脚本需要进度条,以缓解用户的焦虑,告诉用户程序运行到哪了,方便用户估计还要等多久。一般有伴随着进度条直接写时分秒的,还有写运行时间的。
显示时分秒的
def time_now():
import time
return time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
def task():
i=0
while True:
i=i+1
if i%1e6 ==0:
print( "[%s]" % time_now(), "Processing line ", i )
if i>1e7:
break;
if __name__ == "__main__":
print( "[%s] Start Analysis ..." % time_now() )
print( "["+time_now()+"]", "processing line 0" )
task()
输出:
[2022/02/19 09:57:08] Start Analysis ...
[2022/02/19 09:57:08] processing line 0
[2022/02/19 09:57:08] Processing line 1000000
[2022/02/19 09:57:08] Processing line 2000000
...
其他时间显示形式:
# style2
time.strftime("%Y%m%d-%H%M%S", time.localtime()) ##'20220218-162528'
# style3
import datetime
now_time = datetime.datetime.now() ## 2022-02-18 16:25:55.565440
显示运行时间
import time
start=time.time();
#do sth;
time.sleep(1.45)
timeString=time.strftime("%Y%m%d-%H%M%S", time.localtime());
print('耗时',round(time.time()-start,2),'s; ', timeString, sep='')
# 耗时1.45s; 20220218-162228
fr.readlines() 读取大文件很慢(不推荐)
# 要把整个文件读入内存,启动特别慢,不适用超大文件
fr=open("test.txt", 'r', encoding="utf-8") #后2个参数可以省略
for line in fr.readlines():
print('line is:', line)
fr.close()
【推荐!】对超大文件秒读
fr=open("test.fq", 'r', encoding="utf-8") #后2个参数可以省略
while True:
line=fr.readline()
if not line:
break
print('read line is:', line)
fr.close()
或者for:
fqFile="c12ROW27.keep.fq"
fr=open(fqFile,'r',encoding="utf-8")
i=0
for line in fr:
i+=1
if i>10:
break
print('[%d]' % i, line.strip())
fr.close()
如果是 gz 压缩后的文本文件,可以不解压用gzip包直接读取
import gzip
fname="ref_R2_extracted_nonPolyA_Reads.fq.gz"
fr=gzip.open(fname,'rb')
i=0
while True:
i+=1
if i>10:
break
line=fr.readline()
if not line:
break
line2=line.decode() #转2进制为常规字符串
print(i,line2.strip())
fr.close()
https://pysam.readthedocs.io/en/latest/index.html
Pysam is a python module for reading, manipulating and writing genomic data sets.
一般在交互模式(或jupyter)探索,在脚本模式下批量执行。
更多内容,查看: pysam.html
一个处理 bam 文件的简单py脚本实例
对 STAR 的输出文件bam进行过滤,获取 uniq mapping 的reads(只保留 MAPQ==255的序列),并按照正负链分成2个bam文件。
$ vim scripts/split_bam_by_strand.py
# v0.1
import argparse, os
parser = argparse.ArgumentParser(description='Split bam by pA or nonpA')
parser.add_argument('--input', '-I', help='input filename, bam format, required', required=True)
parser.add_argument('--outputP', '-P', help='output filename, bam format, save reads on + strand, required', required=True)
parser.add_argument('--outputN', '-N', help='output filename, bam format, save reads on - strand, required', required=True)
args = parser.parse_args()
# get params
inputFile=args.input
outputFile1=args.outputP
outputFile2=args.outputN
if (not inputFile.endswith(".bam")) or (not outputFile1.endswith(".bam")) or (not outputFile2.endswith(".bam")):
raise Exception("Input file and output files must be .bam!")
if not os.path.exists(inputFile):
raise Exception("Input file not exist:" + inputFile)
if os.path.exists(outputFile1):
raise Exception("Output file already exist:" + outputFile1)
if os.path.exists(outputFile2):
raise Exception("Output file already exist:" + outputFile2)
# part II
import pysam,re
def split_bam_by_strand(inputFile, outputFile1, outputFile2):
print( "Function: split bam by strand. \nVersion 0.1 work now\nparams: -I %s -P %s -N %s\n" % (inputFile, outputFile1, outputFile2) )
samfile = pysam.AlignmentFile(inputFile, "rb")
# write
samOutPos=pysam.AlignmentFile( outputFile1, "wb", template=samfile)
samOutNeg=pysam.AlignmentFile( outputFile2, "wb", template=samfile)
i=0 #total
pos=0; neg=0;
for line in samfile:
i+=1
# 1. only keep reads with MAPQ==255
if line.mapq != 255:
continue
# 2. split by flag
if line.flag == 0:
samOutPos.write(line)
pos+=1
elif line.flag==16:
samOutNeg.write(line)
neg+=1
# close files
samfile.close()
samOutPos.close()
samOutNeg.close()
print(f"total: {i}, pos:{pos}, neg:{neg}, keep:{pos+neg}, keep_ratio:{round( (neg+pos)/i ,2)}")
if __name__ == "__main__":
split_bam_by_strand(inputFile, outputFile1, outputFile2)
运行:
$ samtools index L_Aligned.sortedByCoord.out.bam
$ python3 /data/pipeline/scripts/split_bam_by_strand.py -I L_Aligned.sortedByCoord.out.bam -P pos.bam -N neg.bam
Function: split bam by strand.
Version 0.1 work now
params: -I L_Aligned.sortedByCoord.out.bam -P pos.bam -N neg.bam
total: 51187, pos:7771, neg:6777, keep:14548, keep_ratio:0.28
检查文件:
$ samtools view pos.bam |wc
7771 131631 3372502
$ samtools view neg.bam |wc
6777 114855 2954822
查看该文件:
$ samtools view L_Aligned.sortedByCoord.out.bam |awk '{print $2"\t"$5}' |sort|uniq -c
1463 0 0
1656 0 1
7771 0 255
1587 0 3
1177 16 0
1800 16 1
6777 16 255
1788 16 3
9125 256 0
3750 256 1
1517 256 3
6465 272 0
4453 272 1
1858 272 3