Python script 脚本化

目录

要点: 脚本化是一种对用户友好的封装,不仅能提高程序的健壮性,还能很容易的集成到分析流程中。

Python 脚本简介

脚本化,就是把程序写成有输入和输出的独立程序文件。就像shell的 ls 命令, 比对软件 STAR 一样,用户只需要调用脚本名字、给出参数,程序就能判断参数是否合法,并返回结果。把程序像黑盒一样封装起来,方便用户使用,还能轻易整合到 snakemake 等流程中。

Python 脚本化相关技术

接收参数可以使用内置模块 sys.argv 或 argparse,以及第三方模块 click。


sys.argv 的写法太死板,只有1-2个参数可以使用,多了就太麻烦了。

我最喜欢的是 argparse 模块,这个模块是内置模块,但是功能完全够用,非常方便。

功能最全面的的是click,数据格式检查、转换都自动化完成了。

使用 sys.argv 接收参数

脚本内部把参数列表保存在数组 sys.argv 中,下标0是该脚本文件名本身,下标1是第一个参数,以此类推。

获取的参数是字符串形式,要做算术运算需要主动转为数字形式,比如 int(str2)。

一般还需要验证参数,比如个数、类型是否正确,输出文件是否已经存在等。

# 实例(Ubuntu 20.04 + Python 3.7.0) #例1: 接收参数,并对第一个参数加100后输出 $ cat t1.py import sys arr=sys.argv; print("Para length=", len(arr),"; arr=", arr) print("paras:", arr[0], arr[1], arr[2]) n1=int(arr[1]) + 100 #刚获取的参数是字符串,要转为需要的格式 print("n1=", n1)

运行该脚本,并传入参数:

$ python3 t1.py 2 arg2 arg3 Para length= 4 ; arr= ['t1.py', '2', 'arg2', 'arg3'] paras: t1.py 2 arg2 n1= 102 # 如果传入参数个数不够,还可能报错。 $ python3 t1.py 5 Para length= 2 ; arr= ['t1.py', '5'] Traceback (most recent call last): File "t1.py", line 5, in <module> print("paras:", arr[0], arr[1], arr[2]) IndexError: list index out of range

小结:sys.argv 形式传入参数的方式比较简单,但是也很死板,因为传入的参数是一个有序的列表,所以在命令行中必须按照脚本规定的顺序去输入参数,这种方法比较适合脚本中需要的参数个数很少且参数固定的脚本。

使用 argparse 模块 接收参数

argparse 模块是 Python 内置的一个用于命令项选项与参数解析的模块,argparse 模块可以让人轻松编写用户友好的命令行接口。通过在程序中定义好我们需要的参数,然后 argparse 将会从 sys.argv 解析出这些参数。argparse 模块还会自动生成帮助和使用手册,并在用户给程序传入无效参数时报出错误信息。

帮助文档: argparse — Parser for command-line options, arguments and sub-commands, argparse Tutorial

$ cat t2.py import argparse # step1 创建 参数解析器 实例。description 中描述该脚本做什么以及怎么做 parser = argparse.ArgumentParser(description='Test for argparse') # step2 添加参数及其属性。前2个是参数前缀的两种形式,help是帮助,type是类型,default是默认值 parser.add_argument('name', help='positional arguments, required') parser.add_argument('title', help='positional arguments, optional', default="") parser.add_argument('--year', '-y', help='year 属性,非必要参数,但是有默认值', type=int, default=2022) parser.add_argument('-b', '--body', help='body 属性,必要参数', required=True) #前2个参数的位置随意,必须参数 # step3 解析参数。 args = parser.parse_args() print(args) print("name=", args.name) if ""!=args.title: print("title=", args.title) print("year=", args.year) print("body=", args.body)

没有-/--前缀的是必须参数,必须指定,有默认值也不行。有-/--的是可选参数,可以给默认值,可以使用required=True指定是必选参数。

尝试调用该脚本:

$ python3 t2.py name="White Ice" title="CEO" --year 2099 -b "body of this msg" Namespace(body='body of this msg', name='name=White Ice', title='title=CEO', year=2099) name= name=White Ice title= title=CEO year= 2099 body= body of this msg # $ python3 t2.py name="White Ice" "CEO" -b 'msg' Namespace(body='msg', name='name=White Ice', title='CEO', year=2022) name= name=White Ice title= CEO year= 2022 body= msg # 如果啥都不知道,没输入参数呢? $ python3 t2.py usage: t2.py [-h] [--year YEAR] -b BODY name title t2.py: error: the following arguments are required: name, title, -b/--body # 主动提示调用方式,可选参数使用[],后面是必须参数。 # 还有-h自动生成的帮助文档: $ python3 t2.py -h usage: t2.py [-h] [--year YEAR] -b BODY name title Test for argparse positional arguments: name positional arguments, required title positional arguments, optional optional arguments: -h, --help show this help message and exit --year YEAR, -y YEAR year 属性,非必要参数,但是有默认值 -b BODY, --body BODY body 属性,必要参数

add_argument() 方法定义如何解析命令行参数:

ArgumentParser.add_argument(name or flags...[, action][, nargs][, const][, default][, type][, choices][, required][, help][, metavar][, dest])

每个参数解释如下:
name or flags - 选项字符串的名字或者列表,例如 foo 或者 -f, --foo。
action - 命令行遇到参数时的动作,默认值是 store。
store_const,表示赋值为const;
append,将遇到的值存储成列表,也就是如果参数重复则会保存多个值;
append_const,将参数规范中定义的一个值保存到一个列表;
count,存储遇到的次数;此外,也可以继承 argparse.Action 自定义参数解析;
nargs - 应该读取的命令行参数个数,可以是具体的数字,或者是?号,当不指定值时对于 Positional argument 使用 default,对于 Optional argument 使用 const;或者是 * 号,表示 0 或多个参数;或者是 + 号表示 1 或多个参数。
const - action 和 nargs 所需要的常量值。
default - 不指定参数时的默认值。
type - 命令行参数应该被转换成的类型。
choices - 参数可允许的值的一个容器。
required - 可选参数是否可以省略 (仅针对可选参数)。
help - 参数的帮助信息,当指定为 argparse.SUPPRESS 时表示不显示该参数的帮助信息.
metavar - 在 usage 说明中的参数名称,对于必选参数默认就是参数名称,对于可选参数默认是全大写的参数名称.
dest - 解析后的参数名称,默认情况下,对于可选参数选取最长的名称,中划线转换为下划线.

更多的参数介绍和使用可以查看官方文档:Python 官方文档:argparse (https://docs.python.org/zh-cn/3/library/argparse.html?highlight=argparse#module-argparse)

小结:其实我非常喜欢这个内置的命令行参数模块,因为它不仅方便使用,更重要的是它就是内置的,不需要单独安装依赖。

使用 click 第三方模块 接收参数

Click 是 Flask 的团队 pallets 开发的优秀开源项目,它为命令行工具的开发封装了大量方法,使开发者只需要专注于功能实现。基于optparse 而不是 argparse。这是一个第三方库,专门为了命令行而生的非常有名的 Python 命令行模块。

主要功能: 任意嵌入命令,自动生成帮助页,支持运行时子命令的lazy loading。

查版本号 
$ pip3 list | grep -i click 
## click 8.0.3

如果没有就手动安装 
$ pip3 install click -i https://pypi.douban.com/simple/
$ cat t3.py import click @click.command() @click.option("--count", default=1, help="Number of greetings.") @click.option('--name', prompt='Your name', help='The person to greet.') def hello(count, name): """Simple program that greets NAME for a total of COUNT times.""" for x in range(count): click.echo(f"hello {name}!") if __name__ == '__main__': hello() 调用1: $ python3 t3.py Your name: John hello John! 调用2: $ python3 t3.py --count 3 Your name: John hello John! hello John! hello John! 调用3: $ python3 t3.py --help Usage: t3.py [OPTIONS] Simple program that greets NAME for a total of COUNT times. Options: --count INTEGER Number of greetings. --name TEXT The person to greet. --help Show this message and exit.

可以看到 click 是使用装饰器的方式给函数添加命令行属性,比较特殊的是最后调用函数的时候是没有带上参数的,因为参数会自动通过命令行的形式传入。其他设置参数的属性跟前面的 argparse 的方式非常相似,具体的参数可以参考文档和其他的教程用法

小结:click 库也是一个非常人性化的命令行参数模块,它其实非常强大,强大到把所有的命令行参数可能涉及的情况都考虑到了,需要自己去探索。

略作修改后测试,发现:
1)函数调用 hello() 下一条语句不能执行到。
2) hello()上一条可以执行,但是拿不到传入的参数。也就是说参数只在装饰器修饰的函数范围内有效,要想在其他地方有效,就要在该函数内调用其他函数,并传参。
3)函数内的 click.echo 也可以替换为我们熟悉的 print("hello %s!" % name)。
4)也可以设置 type=int 来强制转换参数类型。

参数校验与主动报错

先检查参数个数,再检查参数类型,还要检查输出文件是否存在,如果存在是否覆盖?

主动报错,就是在参数不合法时,主动抛出错误,终止程序的运行,并指出错误或给出合理化建议。

如果文件不存在,则报错:

import os

x="tmp.py" # this file name
#x="tmp123456789.py" #not exist

if os.path.exists(x):
  print("file exist")
else:
  raise Exception("File not exist.")

print("==Exception demo end==")

如果不是文件,则报错:

import os
#inputFile = "tmp.py"
inputFile = "tmp123456.py" #os.environ.get('PYTHONSTARTUP')
if inputFile and os.path.isfile(inputFile):
  pass;
else:
  raise Exception( "input file not exist or not a file: %s" % inputFile )

另一种主动报错方法是使用断言 assert,这个比较简短,短脚本用用没啥大问题,但-O编译时可能会跳过断言语句。

x = 23
assert x > 0, "x is not zero or negative"
assert x%2 == 0, "x is not an even number"

主动退出脚本: sys.exit(0)

import sys
 
# 正常退出,默认返回0
sys.exit()

# 退出并返回错误代码
sys.exit(1)

使用 "__main__" 程序入口

所有代码都放到函数中,然后在入口依次调用这些函数。 有明确的函数入口,会使程序更结构化,更清晰易读。

def fn1():
  print("fn1")

def fn2():
  print("fn2")


if __name__ == "__main__":
  fn1()
  fn2()

打印时间戳进度条

运行时间长的脚本需要进度条,以缓解用户的焦虑,告诉用户程序运行到哪了,方便用户估计还要等多久。一般有伴随着进度条直接写时分秒的,还有写运行时间的。

显示时分秒的

def time_now(): import time return time.strftime("%Y/%m/%d %H:%M:%S", time.localtime()) def task(): i=0 while True: i=i+1 if i%1e6 ==0: print( "[%s]" % time_now(), "Processing line ", i ) if i>1e7: break; if __name__ == "__main__": print( "[%s] Start Analysis ..." % time_now() ) print( "["+time_now()+"]", "processing line 0" ) task() 输出: [2022/02/19 09:57:08] Start Analysis ... [2022/02/19 09:57:08] processing line 0 [2022/02/19 09:57:08] Processing line 1000000 [2022/02/19 09:57:08] Processing line 2000000 ...

其他时间显示形式:

# style2
time.strftime("%Y%m%d-%H%M%S", time.localtime())  ##'20220218-162528'

# style3
import datetime
now_time = datetime.datetime.now() ## 2022-02-18 16:25:55.565440

显示运行时间

import time start=time.time(); #do sth; time.sleep(1.45) timeString=time.strftime("%Y%m%d-%H%M%S", time.localtime()); print('耗时',round(time.time()-start,2),'s; ', timeString, sep='') # 耗时1.45s; 20220218-162228


超大文件秒读

fr.readlines() 读取大文件很慢(不推荐)

# 要把整个文件读入内存,启动特别慢,不适用超大文件
fr=open("test.txt", 'r', encoding="utf-8") #后2个参数可以省略

for line in fr.readlines():
    print('line is:', line)
fr.close()

【推荐!】对超大文件秒读

fr=open("test.fq", 'r', encoding="utf-8") #后2个参数可以省略
while True:
    line=fr.readline()
    if not line:
        break
    print('read line is:', line)
fr.close()


或者for:
fqFile="c12ROW27.keep.fq"
fr=open(fqFile,'r',encoding="utf-8")
i=0
for line in fr:
    i+=1
    if i>10:
        break
    print('[%d]' % i, line.strip())
fr.close()

如果是 gz 压缩后的文本文件,可以不解压用gzip包直接读取

import gzip
fname="ref_R2_extracted_nonPolyA_Reads.fq.gz"
fr=gzip.open(fname,'rb')

i=0
while True:
    i+=1
    if i>10:
        break
    line=fr.readline()
    if not line:
        break
    line2=line.decode() #转2进制为常规字符串
    print(i,line2.strip())
    
fr.close()

pysam 模块读写 bam 文件

https://pysam.readthedocs.io/en/latest/index.html

Pysam is a python module for reading, manipulating and writing genomic data sets.

一般在交互模式(或jupyter)探索,在脚本模式下批量执行。

更多内容,查看: pysam.html

一个处理 bam 文件的简单py脚本实例

对 STAR 的输出文件bam进行过滤,获取 uniq mapping 的reads(只保留 MAPQ==255的序列),并按照正负链分成2个bam文件。

$ vim scripts/split_bam_by_strand.py # v0.1 import argparse, os parser = argparse.ArgumentParser(description='Split bam by pA or nonpA') parser.add_argument('--input', '-I', help='input filename, bam format, required', required=True) parser.add_argument('--outputP', '-P', help='output filename, bam format, save reads on + strand, required', required=True) parser.add_argument('--outputN', '-N', help='output filename, bam format, save reads on - strand, required', required=True) args = parser.parse_args() # get params inputFile=args.input outputFile1=args.outputP outputFile2=args.outputN if (not inputFile.endswith(".bam")) or (not outputFile1.endswith(".bam")) or (not outputFile2.endswith(".bam")): raise Exception("Input file and output files must be .bam!") if not os.path.exists(inputFile): raise Exception("Input file not exist:" + inputFile) if os.path.exists(outputFile1): raise Exception("Output file already exist:" + outputFile1) if os.path.exists(outputFile2): raise Exception("Output file already exist:" + outputFile2) # part II import pysam,re def split_bam_by_strand(inputFile, outputFile1, outputFile2): print( "Function: split bam by strand. \nVersion 0.1 work now\nparams: -I %s -P %s -N %s\n" % (inputFile, outputFile1, outputFile2) ) samfile = pysam.AlignmentFile(inputFile, "rb") # write samOutPos=pysam.AlignmentFile( outputFile1, "wb", template=samfile) samOutNeg=pysam.AlignmentFile( outputFile2, "wb", template=samfile) i=0 #total pos=0; neg=0; for line in samfile: i+=1 # 1. only keep reads with MAPQ==255 if line.mapq != 255: continue # 2. split by flag if line.flag == 0: samOutPos.write(line) pos+=1 elif line.flag==16: samOutNeg.write(line) neg+=1 # close files samfile.close() samOutPos.close() samOutNeg.close() print(f"total: {i}, pos:{pos}, neg:{neg}, keep:{pos+neg}, keep_ratio:{round( (neg+pos)/i ,2)}") if __name__ == "__main__": split_bam_by_strand(inputFile, outputFile1, outputFile2) 运行: $ samtools index L_Aligned.sortedByCoord.out.bam $ python3 /data/pipeline/scripts/split_bam_by_strand.py -I L_Aligned.sortedByCoord.out.bam -P pos.bam -N neg.bam Function: split bam by strand. Version 0.1 work now params: -I L_Aligned.sortedByCoord.out.bam -P pos.bam -N neg.bam total: 51187, pos:7771, neg:6777, keep:14548, keep_ratio:0.28 检查文件: $ samtools view pos.bam |wc 7771 131631 3372502 $ samtools view neg.bam |wc 6777 114855 2954822 查看该文件: $ samtools view L_Aligned.sortedByCoord.out.bam |awk '{print $2"\t"$5}' |sort|uniq -c 1463 0 0 1656 0 1 7771 0 255 1587 0 3 1177 16 0 1800 16 1 6777 16 255 1788 16 3 9125 256 0 3750 256 1 1517 256 3 6465 272 0 4453 272 1 1858 272 3

参考资料