python-subprocess-模块使用以及详解管道阻塞的坑

43次阅读

共计 4118 个字符,预计需要花费 11 分钟才能阅读完成。

本文内容均基于 python 2.7 版本,整理一下这个模块的一些日常套路和 一些要注意的坑,省去大家读官方文档的时间。但还是推荐认真读下 官方文档,并不长

功能和用途

subprocess 是 python 标准库中的一个模块,用于创建子进程和与子进程交互

该模块替换了一些过时的模块和函数

os.system
os.spawn*
os.popen*
popen2.*
commands.*

常见使用姿势

共计四个函数,包括:

  • 三个满足不同需求的简化函数(call, check_call, check_output)
  • 以及一个类 Popen,是上面三个简化函数的基础,也就是说上面三个简化版函数是封装了这个方式的一些特定用法而已
  • 三个 Popen 类的成员函数,用于父子进程间的交互

以下为详细说明


先引入包,为了让下面函数看起来清晰一些,包名临时简化一下为 ‘s’

import subprocess as s

s.call()
创建一个子进程并 等待其结束,返回一个 returncode(即 exit code,详见 Linux 基础)

# 函数定义
s.call(args, *, stdin=None, stdout=None, stderr=None, shell=False)

# 举例
>>> subprocess.call(["ls", "-l"])
0
>>> subprocess.call("exit 1", shell=True)
1

s.check_call()
跟上面的 s.call() 类似,只不过变成了 returncode 如果不是 0 的话就抛出异常 subprocess.CalledProcessError,这个对象包含 returncode 属性,可以用 try…except… 来处理(参考 Python 异常处理)

# 函数定义
s.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False)

# 举例
>>> subprocess.check_call(["ls", "-l"])
0
>>> subprocess.check_call("exit 1", shell=True)
Traceback (most recent call last):
   ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

s.check_output()
创建一个子进程 并等待其结束,返回子进程向 stdout 的输出结果。如果 returncode 不为 0,则抛出异常 subprocess.CalledProcessError

# 函数定义
s.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False)

# 举例
>>> subprocess.check_output(["echo", "Hello World!"])
'Hello World!\n'
>>> subprocess.check_output("exit 1", shell=True)
Traceback (most recent call last):
   ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

s.Popen()
这是整个 subprocess 模块最重要也是最基础的类,以上三个简化方法均是基于这个类。具体每个参数什么含义请看 官方文档

# 类定义
class subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)

Popen.poll()
检查子进程是否已经结束,返回 returncode

Popen.wait()
阻塞父进程,直到子进程结束,返回 returncode

Popen.communicate(input=None)
阻塞父进程,直到子进程结束。并可以与子进程交互:发送数据(即参数 input,类型必须为 string)到 stdin,以及从 stdout、stderr 读取信息


重点(坑)介绍


先来个开胃菜
以上函数(类)的首个参数 args,有两种使用方式

  1. 一个 list,如 ['ls', '-al'],一般 推荐 这种方式
  2. 一个字符串,如 'ls -al',这种方式以上函数中的 shell 参数必须为 True。这种方式存在安全风险,具体请看官方文档

再来一盘
要使父进程和子进程可以通信,必须将对应的流的管道(PIPE,自行搜索 Linux 管道)打开,如 stdout=subprocess.PIPE。其中 subprocess.PIPE 是一个特定的值(源码中 hardcode PIPE = -1),当参数列表中某个流被置为了这个值,则将会在创建子进程的同时为这种流创建一个管道对象(调用系统 api,如 p2cread, _ = _winapi.CreatePipe(None, 0)),用于父子进程间的通信
而管道这个东西,其实就是一段内存区间,每个系统内核对其的大小都有所限制,如 linux 上默认最大为 64KB,可以在终端用 ulimit -a 来查看

可选类型还有:

  1. subprocess.PIPE = -1
  2. subprocess.STDOUT = -2 重定向到 stdout
  3. subprocess.DEVNULL = -3 重定向到 /dev/null,即丢弃信息

开始正餐

我们先来看这样一个需求:
脚本 A 需调用一个外部程序 B 来做一些工作,并获取 B 的 stdout 输出

实现方式一:

import subprocess

child = subprocess.Popen(['./B'], stdout=subprocess.PIPE)

child.wait()

print child.stdout

这种方式官方文档中再三警告,会导致死锁问题,不怕坑自己的话就用吧,good luck

导致死锁的原因:
如上文提到,管道的大小是有所限制的,当子进程一直向 stdout 管道写数据且写满的时候,子进程将发生 I/O 阻塞;
而此时父进程只是干等子进程退出,也处于阻塞状态;
于是,GG。

实现方式二(加粗:推荐):

import subprocess

child = subprocess.Popen(['./B'], stdout=subprocess.PIPE)

stdout, stderr = child.communicate()

这是官方推荐的方式,但是也有坑:
看 communicate 的实现

for key, events in ready:
    if key.fileobj is self.stdin:
        chunk = input_view[self._input_offset :
                           self._input_offset + _PIPE_BUF]
        try:
            self._input_offset += os.write(key.fd, chunk)
        except BrokenPipeError:
            selector.unregister(key.fileobj)
            key.fileobj.close()
        else:
            if self._input_offset >= len(self._input):
                selector.unregister(key.fileobj)
                key.fileobj.close()
    elif key.fileobj in (self.stdout, self.stderr):
        data = os.read(key.fd, 32768)
        if not data:
            selector.unregister(key.fileobj)
            key.fileobj.close()
        self._fileobj2output[key.fileobj].append(data)

其实 communicate 就是帮我们做了循环读取管道的操作,保证管道不会被塞满;于是子进程可以很爽地写完自己要输出的数据,正常退出,避免了死锁

这里有注意点:
communicate 其实是循环读取管道中的数据(每次 32768 字节)并将其存在一个 list 里面,到最后将 list 中的所有数据连接起来(b''.join(list))返回给用户。于是就出现了一个坑:如果子进程输出内容非常多甚至无限输出,则机器内存会被撑爆,再次 GG

第三种实现
第三种的思路其实很简单,就是当子进程的可能输出非常大的时候,直接将 stdout 或 stderr 重定向到文件,避免了撑爆内存的问题。不过这种情形很少见,不常用


Tips:
communicate 的实现中可以看到一些有趣的东西,写一下想法,欢迎讨论

  1. 每次读取管道中的数据为 32768bytes,即 32KB。选用这个数据的原因猜想为

    • 取一个折中的值 16KB~64KB,适应不同系统对 PIPE SIZE 最大值的限制(MAC OS 初始默认 16KB,当管道满后自动扩大为 64KB,Linux 默认为 64KB)
    • 32KB 是 512bytes 的整数倍,适应内核对内存的分页机制,提高效率。类似的思路比较常见,如对以太网帧的尾部封装(现已被废弃)
  2. 从管道中读取数据并缓存到内存中的操作,并非循环用字符串拼接的方式,而是先将数据分段放进一个 list,最后再连接起来,原因

    • python 中的每次字符串拼接操作都会生成新对象,当数据量大的时候会有严重的性能问题
    • 而使用 list 合并的方式,仅在每次 list 需要扩容的时候需要将 list 整体搬迁到内存中的另一个空间;以及最后将所有 list 中的元素合并为一个字符串对象的时候生成新对象有性能消耗,相对来说代价非常低
  3. 在观察 linux 系统默认管道大小的时候,ulimit -a 的输出中,可以看到 PIPE BUF 4KBPIPE SIZE 64KB 两个值,查了一下了解到,前者是对管道单次写入大小的限制,而后者是管道总大小的限制。前者刚好对应了内核分页单页的大小。

正文完
 0