In a previous post, we pointed out two drawbacks of Python’s subprocess.communicate method. In this post, we look at the first problem in more detail.
To reiterate, the problem is that communicate collects the subprocess’s output streams into strings, so the whole output has to fit in memory. If the subprocess is going to generate a huge amount of output, it can be better to process the output data in a stream-oriented manner; that way we use a constant amount of memory regardless of how much output is produced.
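To make the contrast concrete, here is a minimal Python 3 sketch of stream-oriented processing for the simplest case, where only stdout is piped. The helper count_output_bytes is hypothetical, and counting bytes stands in for whatever per-chunk work you actually need. Memory use is bounded by chunk_size no matter how much the child writes, whereas communicate would hold all of it.

```python
import subprocess
import sys


def count_output_bytes(args, chunk_size=65536):
    """Hypothetical helper: run a command and count its stdout bytes
    without ever holding more than one chunk in memory.

    Assumes only stdout is piped; stdin and stderr are inherited.
    """
    total = 0
    with subprocess.Popen(args, stdout=subprocess.PIPE) as proc:
        while True:
            # Blocks until chunk_size bytes are available or EOF is reached.
            chunk = proc.stdout.read(chunk_size)
            if not chunk:  # b"" signals EOF
                break
            total += len(chunk)  # "process" the chunk, then let it go
        proc.wait()
    return total


if __name__ == "__main__":
    # The child writes one million bytes; we never store them all at once.
    child = [sys.executable, "-c",
             "import sys; sys.stdout.buffer.write(b'x' * 1000000)"]
    print(count_output_bytes(child))
```

A plain blocking loop like this is only safe with a single pipe. With both stdout and stderr piped, the child can fill the stderr pipe and stall while we sit waiting on stdout, which is why communicate multiplexes the streams with select.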
If we look at the implementation of the communicate method, we see this:
```
 1  def _communicate_with_select(self, input):
 2      read_set = []
 3      write_set = []
 4      stdout = None # Return
 5      stderr = None # Return
 6
 7      if self.stdin and input:
 8          write_set.append(self.stdin)
 9      if self.stdout:
10          read_set.append(self.stdout)
11          stdout = []
12      if self.stderr:
13          read_set.append(self.stderr)
14          stderr = []
15
16      input_offset = 0
17      while read_set or write_set:
18          try:
19              rlist, wlist, xlist = select.select(read_set, write_set, [])
20          except select.error, e:
21              if e.args[0] == errno.EINTR:
22                  continue
23              raise
24
25          if self.stdin in wlist:
26              chunk = input[input_offset : input_offset + _PIPE_BUF]
27              bytes_written = os.write(self.stdin.fileno(), chunk)
28              input_offset += bytes_written
29              if input_offset >= len(input):
30                  self.stdin.close()
31                  write_set.remove(self.stdin)
32
33          if self.stdout in rlist:
34              data = os.read(self.stdout.fileno(), 1024)
35              if data == "":
36                  self.stdout.close()
37                  read_set.remove(self.stdout)
38              stdout.append(data)
39
40          if self.stderr in rlist:
41              data = os.read(self.stderr.fileno(), 1024)
42              if data == "":
43                  self.stderr.close()
44                  read_set.remove(self.stderr)
45              stderr.append(data)
46
47      return (stdout, stderr)
```
(There are actually several different communicate implementations in the module: a Windows-specific implementation, an implementation using the POSIX poll function, and one using POSIX select. We’re going to look at the select implementation; the modifications we make can be rolled into the other methods, too.)
For collecting stdout, the important part is lines 33-38. If the select call tells us that the stdout stream is ready for reading, we try to read up to 1024 bytes from it. If we get the empty string, we’ve reached EOF and can close down the stream. (We also no longer have to pass it to further select calls, since we know we’re done with it.) Either way, line 38 then appends the data to a list; at EOF that is just the empty string, which is harmless once the list is joined. The function that calls _communicate_with_select will eventually join this list of strings together, yielding a single string.
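The collect-then-join pattern just described can be seen in isolation in the following minimal Python 3 sketch. Under Python 3, os.read returns bytes, so EOF shows up as b"" rather than "". The function name collect_fd is hypothetical; it reads a raw file descriptor in 1024-byte chunks the same way lines 33-38 do, except that it stops at EOF instead of appending the empty string, which makes no difference to the joined result.

```python
import os


def collect_fd(fd, chunk_size=1024):
    """Hypothetical helper: read fd until EOF and return everything read.

    Mirrors the collect-then-join approach: every chunk is kept in a
    list, so memory use grows with the total amount of output.
    """
    chunks = []
    while True:
        data = os.read(fd, chunk_size)  # up to chunk_size bytes
        if data == b"":  # EOF: the write end has been closed
            break
        chunks.append(data)
    return b"".join(chunks)


if __name__ == "__main__":
    read_end, write_end = os.pipe()
    os.write(write_end, b"hello\nworld\n")  # small enough not to block
    os.close(write_end)  # closing the write end is what produces EOF
    print(collect_fd(read_end))
    os.close(read_end)
```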
It’s actually a very simple change to make _communicate_with_select process its output through a stream-based callback instead. For now, we assume that we’ve been given the callback in a stdout_callback variable. Then we can change line 38 to:
stdout_callback(data)
The callback can be any Python callable object; it should accept a single argument, which is the next chunk of data from stdout. Because line 38 also runs when the read returns the empty string, the callback is called one final time with "", which it can treat as the end-of-stream signal. We can make a similar change to line 45 to send the stderr data to its own stderr_callback.
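The listing above is Python 2 code that lives inside the Popen class, so it is hard to try in isolation. As a rough standalone equivalent, here is a minimal Python 3 sketch of the same select loop with callbacks in place of lists. It assumes a POSIX system (on Windows, select-based waiting only works with sockets, not pipes), leaves out stdin handling, and swaps the raw select.select call for the standard selectors module; the function name communicate_with_callbacks is hypothetical. Like line 38, which runs even when the read returns the empty string, it calls each callback a final time with b"" when its stream reaches EOF.

```python
import os
import selectors
import subprocess
import sys


def communicate_with_callbacks(proc, stdout_callback, stderr_callback,
                               chunk_size=1024):
    """Hypothetical helper: drain proc's stdout and stderr pipes, streaming
    each chunk to a callback instead of collecting it into a list.

    Each callback receives bytes chunks of arbitrary size, then one final
    b"" when its stream reaches EOF. Stdin is not handled. POSIX only.
    """
    sel = selectors.DefaultSelector()
    # Store each stream's callback as the selector key's data.
    if proc.stdout is not None:
        sel.register(proc.stdout, selectors.EVENT_READ, stdout_callback)
    if proc.stderr is not None:
        sel.register(proc.stderr, selectors.EVENT_READ, stderr_callback)

    while sel.get_map():  # loop until every registered stream hits EOF
        for key, _events in sel.select():
            data = os.read(key.fd, chunk_size)
            if data == b"":  # EOF: stop watching this stream
                sel.unregister(key.fileobj)
                key.fileobj.close()
            key.data(data)  # forward the chunk (or the b"" EOF marker)
    sel.close()
    return proc.wait()
```

Storing the callback as the selector key’s data removes the need for one if-block per stream. For example, passing sys.stdout.buffer.write as stdout_callback echoes the child’s output as it arrives; the final b"" simply writes nothing.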
One possible issue with the output callbacks in the previous section is that the data is sent into the callback in arbitrary chunks, so a single line of output may be split across two calls. We might prefer to guarantee that the callback will be called exactly once for each line of output. This would allow us, for instance, to easily interleave the output lines of a bunch of subprocesses into the output of the parent process, without having to worry about locking.
To use a line-based callback, we have to wrap it, creating an arbitrary-chunk callback that buffers data as necessary. Once it receives a full line of data, it sends it to the wrapped line callback.
```
 1  def wrap_line_callback(line_callback):
 2      class Callback(object):
 3          def __init__(self, line_callback):
 4              self.line_buffer = []
 5              self.line_callback = line_callback
 6
 7          def output_buffer(self):
 8              line = "".join(self.line_buffer)
 9              self.line_callback(line)
10              self.line_buffer = []
11
12          def data_callback(self, data):
13              # If we get an empty string, that represents the end of
14              # the input. If there is anything in the buffer, send
15              # it out first, then send an empty string on to the line
16              # callback.
17
18              if data == "":
19                  if len(self.line_buffer) > 0:
20                      self.output_buffer()
21
22                  self.line_callback("")
23                  return
24
25              # Otherwise, we split the new data into separate lines,
26              # each of which we call an "entry". We add each entry to
27              # the buffer. If the entry ends with a newline, we output
28              # the buffer and then clear it.
29
30              lines = data.splitlines(True)
31              for line in lines:
32                  self.line_buffer.append(line)
33                  if line.endswith(("\r\n", "\n", "\r")):
34                      self.output_buffer()
35
36              return
37
38      return Callback(line_callback).data_callback
```
line 1 — We start by declaring the wrapping function. It will take in a line-based callback, and return an arbitrary-chunk callback.
lines 2-5 — We’ll need to maintain some state between invocations of the arbitrary-chunk callback. Specifically, if a line of output data falls across a chunk boundary, we’ll need to hold onto the part in the first chunk until we receive the part in the second chunk. One relatively easy way to do this is to create a new class and store the state in attributes of self. Note that the Callback class is declared inside the wrap_line_callback function.
The buffer is a list of strings. This needs to be a list to support arbitrarily long lines, since we don’t know how many chunks we’ll need to store before outputting the line.
lines 7-10 — We also declare a method in the class that outputs the current contents of the saved buffer. Like the original communicate method, it joins the buffer list together into a single string; it then passes that string on to the wrapped line callback and resets the buffer to an empty list.
lines 12-23 — Next we can define the arbitrary-chunk callback. First, it checks to see if we’ve received an EOF indicator (the empty string). If so, we first output whatever is currently in the buffer; then we pass on the EOF to the line callback.
lines 25-34 — If we get some actual data, we first split the current chunk into separate lines. We pass True as the keepends argument of the splitlines method so that each split string keeps its line ending. This lets us tell whether the final string in the list is a complete line or the first part of a line that falls across a chunk boundary.
We then loop through the split strings, adding each one to the buffer. Whenever one ends with a line ending (\n for Unix, \r\n for Windows, or \r for classic Mac OS), we output the buffer. Note that only the last string in the lines list can end with something other than a line ending; every earlier element is only a separate element because splitlines found a line ending to split on!
line 38 — Finally, we instantiate the new class and return its arbitrary-chunk callback.
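The walkthrough describes Python 2 code, where chunks are str and EOF is "". Under Python 3, os.read returns bytes, and the str literals in the comparison, endswith, and join calls do not mix with bytes, so the code cannot be used unchanged. Below is a minimal Python 3 sketch that keeps the same logic but works on bytes. It also swaps the inner Callback class for a closure over a local list; that is a stylistic substitution, and a class-based version works just as well.

```python
def wrap_line_callback(line_callback):
    """Adapt a line-based callback into an arbitrary-chunk callback.

    Python 3 sketch: chunks are bytes, and an empty chunk (b"") marks EOF.
    A closure over line_buffer replaces the original's inner class.
    """
    line_buffer = []  # pieces of the current line seen so far

    def output_buffer():
        # Join the saved pieces into one line, hand it on, start afresh.
        line_callback(b"".join(line_buffer))
        line_buffer.clear()

    def data_callback(data):
        if data == b"":
            # EOF: flush a final line that had no line ending, then pass
            # the EOF marker on to the line callback.
            if line_buffer:
                output_buffer()
            line_callback(b"")
            return
        # Keep the line endings so we can tell whether the last piece is a
        # complete line or continues in the next chunk.
        for piece in data.splitlines(True):
            line_buffer.append(piece)
            if piece.endswith((b"\r\n", b"\n", b"\r")):
                output_buffer()

    return data_callback


if __name__ == "__main__":
    received = []
    callback = wrap_line_callback(received.append)
    # A line that straddles chunk boundaries is reassembled before delivery.
    for chunk in [b"first li", b"ne\nsecond\nthi", b"rd", b""]:
        callback(chunk)
    print(received)
```

The demo delivers b"first line\n", b"second\n", b"third", and finally the b"" EOF marker. Working on bytes also keeps the endswith check honest: bytes.splitlines only splits on \n, \r\n, and \r, whereas str.splitlines in Python 3 also splits on characters such as \x0b, \x0c, and \u2028. Like the original, this version treats a \r\n pair that arrives split across two chunks as two separate line endings.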
This gives us an output callback mechanism that keeps us from buffering the entire contents of the stdout and stderr streams in memory. In later posts, we’ll look at doing the same with the input data, and then we’ll address the second problem: dealing with more than one subprocess simultaneously.