2353 lines
78 KiB
Python
2353 lines
78 KiB
Python
|
"""
|
||
|
http://amoffat.github.io/sh/
|
||
|
"""
|
||
|
#===============================================================================
|
||
|
# Copyright (C) 2011-2015 by Andrew Moffat
|
||
|
#
|
||
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
# of this software and associated documentation files (the "Software"), to deal
|
||
|
# in the Software without restriction, including without limitation the rights
|
||
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
|
# copies of the Software, and to permit persons to whom the Software is
|
||
|
# furnished to do so, subject to the following conditions:
|
||
|
#
|
||
|
# The above copyright notice and this permission notice shall be included in
|
||
|
# all copies or substantial portions of the Software.
|
||
|
#
|
||
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||
|
# THE SOFTWARE.
|
||
|
#===============================================================================
|
||
|
|
||
|
|
||
|
__version__ = "1.11"
|
||
|
__project_url__ = "https://github.com/amoffat/sh"
|
||
|
|
||
|
|
||
|
|
||
|
import platform
|
||
|
|
||
|
if "windows" in platform.system().lower():
|
||
|
raise ImportError("sh %s is currently only supported on linux and osx. \
|
||
|
please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
|
||
|
support." % __version__)
|
||
|
|
||
|
|
||
|
import sys
|
||
|
IS_PY3 = sys.version_info[0] == 3
|
||
|
|
||
|
import traceback
|
||
|
import os
|
||
|
import re
|
||
|
from glob import glob as original_glob
|
||
|
import time
|
||
|
from types import ModuleType
|
||
|
from functools import partial
|
||
|
import inspect
|
||
|
from contextlib import contextmanager
|
||
|
|
||
|
from locale import getpreferredencoding
|
||
|
DEFAULT_ENCODING = getpreferredencoding() or "UTF-8"
|
||
|
|
||
|
|
||
|
if IS_PY3:
|
||
|
from io import StringIO
|
||
|
from io import BytesIO as cStringIO
|
||
|
from queue import Queue, Empty
|
||
|
|
||
|
# for some reason, python 3.1 removed the builtin "callable", wtf
|
||
|
if not hasattr(__builtins__, "callable"):
|
||
|
def callable(ob):
|
||
|
return hasattr(ob, "__call__")
|
||
|
else:
|
||
|
from StringIO import StringIO
|
||
|
from cStringIO import OutputType as cStringIO
|
||
|
from Queue import Queue, Empty
|
||
|
|
||
|
IS_OSX = platform.system() == "Darwin"
|
||
|
THIS_DIR = os.path.dirname(os.path.realpath(__file__))
|
||
|
SH_LOGGER_NAME = "sh"
|
||
|
|
||
|
|
||
|
import errno
|
||
|
import warnings
|
||
|
|
||
|
import pty
|
||
|
import termios
|
||
|
import signal
|
||
|
import gc
|
||
|
import select
|
||
|
import threading
|
||
|
import tty
|
||
|
import fcntl
|
||
|
import struct
|
||
|
import resource
|
||
|
from collections import deque
|
||
|
import logging
|
||
|
import weakref
|
||
|
|
||
|
|
||
|
# TODO remove with contexts in next version
|
||
|
def with_context_warning():
|
||
|
warnings.warn("""
|
||
|
with contexts are deprecated because they are not thread safe. they will be \
|
||
|
removed in the next version. use subcommands instead \
|
||
|
http://amoffat.github.io/sh/#sub-commands. see \
|
||
|
https://github.com/amoffat/sh/issues/195
|
||
|
""".strip(), stacklevel=3)
|
||
|
|
||
|
|
||
|
|
||
|
if IS_PY3:
|
||
|
raw_input = input
|
||
|
unicode = str
|
||
|
basestring = str
|
||
|
|
||
|
|
||
|
_unicode_methods = set(dir(unicode()))
|
||
|
|
||
|
|
||
|
def encode_to_py3bytes_or_py2str(s):
|
||
|
""" takes anything and attempts to return a py2 string or py3 bytes. this
|
||
|
is typically used when creating command + arguments to be executed via
|
||
|
os.exec* """
|
||
|
|
||
|
fallback_encoding = "utf8"
|
||
|
|
||
|
if IS_PY3:
|
||
|
# if we're already bytes, do nothing
|
||
|
if isinstance(s, bytes):
|
||
|
pass
|
||
|
else:
|
||
|
s = str(s)
|
||
|
try:
|
||
|
s = bytes(s, DEFAULT_ENCODING)
|
||
|
except UnicodeEncodeError:
|
||
|
s = bytes(s, fallback_encoding)
|
||
|
else:
|
||
|
# attempt to convert the thing to unicode from the system's encoding
|
||
|
try:
|
||
|
s = unicode(s, DEFAULT_ENCODING)
|
||
|
# if the thing is already unicode, or it's a number, it can't be
|
||
|
# coerced to unicode with an encoding argument, but if we leave out
|
||
|
# the encoding argument, it will convert it to a string, then to unicode
|
||
|
except TypeError:
|
||
|
s = unicode(s)
|
||
|
|
||
|
# now that we have guaranteed unicode, encode to our system encoding,
|
||
|
# but attempt to fall back to something
|
||
|
try:
|
||
|
s = s.encode(DEFAULT_ENCODING)
|
||
|
except:
|
||
|
s = s.encode(fallback_encoding)
|
||
|
return s
|
||
|
|
||
|
|
||
|
class ErrorReturnCode(Exception):
|
||
|
""" base class for all exceptions as a result of a command's exit status
|
||
|
being deemed an error. this base class is dynamically subclassed into
|
||
|
derived classes with the format: ErrorReturnCode_NNN where NNN is the exit
|
||
|
code number. the reason for this is it reduces boiler plate code when
|
||
|
testing error return codes:
|
||
|
|
||
|
try:
|
||
|
some_cmd()
|
||
|
except ErrorReturnCode_12:
|
||
|
print("couldn't do X")
|
||
|
|
||
|
vs:
|
||
|
try:
|
||
|
some_cmd()
|
||
|
except ErrorReturnCode as e:
|
||
|
if e.exit_code == 12:
|
||
|
print("couldn't do X")
|
||
|
|
||
|
it's not much of a savings, but i believe it makes the code easier to read """
|
||
|
|
||
|
truncate_cap = 750
|
||
|
|
||
|
def __init__(self, full_cmd, stdout, stderr):
|
||
|
self.full_cmd = full_cmd
|
||
|
self.stdout = stdout
|
||
|
self.stderr = stderr
|
||
|
|
||
|
|
||
|
if self.stdout is None:
|
||
|
exc_stdout = "<redirected>"
|
||
|
else:
|
||
|
exc_stdout = self.stdout[:self.truncate_cap]
|
||
|
out_delta = len(self.stdout) - len(exc_stdout)
|
||
|
if out_delta:
|
||
|
exc_stdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
|
||
|
|
||
|
if self.stderr is None:
|
||
|
exc_stderr = "<redirected>"
|
||
|
else:
|
||
|
exc_stderr = self.stderr[:self.truncate_cap]
|
||
|
err_delta = len(self.stderr) - len(exc_stderr)
|
||
|
if err_delta:
|
||
|
exc_stderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
|
||
|
|
||
|
msg = "\n\n RAN: %r\n\n STDOUT:\n%s\n\n STDERR:\n%s" % \
|
||
|
(full_cmd, exc_stdout.decode(DEFAULT_ENCODING, "replace"),
|
||
|
exc_stderr.decode(DEFAULT_ENCODING, "replace"))
|
||
|
super(ErrorReturnCode, self).__init__(msg)
|
||
|
|
||
|
|
||
|
class SignalException(ErrorReturnCode): pass
|
||
|
class TimeoutException(Exception):
|
||
|
""" the exception thrown when a command is killed because a specified
|
||
|
timeout (via _timeout) was hit """
|
||
|
def __init__(self, exit_code):
|
||
|
self.exit_code = exit_code
|
||
|
super(Exception, self).__init__()
|
||
|
|
||
|
SIGNALS_THAT_SHOULD_THROW_EXCEPTION = (
|
||
|
signal.SIGABRT,
|
||
|
signal.SIGBUS,
|
||
|
signal.SIGFPE,
|
||
|
signal.SIGILL,
|
||
|
signal.SIGINT,
|
||
|
signal.SIGKILL,
|
||
|
signal.SIGPIPE,
|
||
|
signal.SIGQUIT,
|
||
|
signal.SIGSEGV,
|
||
|
signal.SIGTERM,
|
||
|
signal.SIGSYS,
|
||
|
)
|
||
|
|
||
|
|
||
|
# we subclass AttributeError because:
|
||
|
# https://github.com/ipython/ipython/issues/2577
|
||
|
# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
|
||
|
class CommandNotFound(AttributeError): pass
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_((\d+)|SIG\w+)")
|
||
|
rc_exc_cache = {}
|
||
|
|
||
|
|
||
|
def get_exc_from_name(name):
|
||
|
""" takes an exception name, like:
|
||
|
|
||
|
ErrorReturnCode_1
|
||
|
SignalException_9
|
||
|
SignalException_SIGHUP
|
||
|
|
||
|
and returns the corresponding exception. this is primarily used for
|
||
|
importing exceptions from sh into user code, for instance, to capture those
|
||
|
exceptions """
|
||
|
|
||
|
exc = None
|
||
|
try:
|
||
|
return rc_exc_cache[name]
|
||
|
except KeyError:
|
||
|
m = rc_exc_regex.match(name)
|
||
|
if m:
|
||
|
base = m.group(1)
|
||
|
rc_or_sig_name = m.group(2)
|
||
|
|
||
|
if base == "SignalException":
|
||
|
try:
|
||
|
rc = -int(rc_or_sig_name)
|
||
|
except ValueError:
|
||
|
rc = -getattr(signal, rc_or_sig_name)
|
||
|
else:
|
||
|
rc = int(rc_or_sig_name)
|
||
|
|
||
|
exc = get_rc_exc(rc)
|
||
|
return exc
|
||
|
|
||
|
|
||
|
def get_rc_exc(rc_or_sig_name):
|
||
|
""" takes a exit code, signal number, or signal name, and produces an
|
||
|
exception that corresponds to that return code. positive return codes yield
|
||
|
ErrorReturnCode exception, negative return codes yield SignalException
|
||
|
|
||
|
we also cache the generated exception so that only one signal of that type
|
||
|
exists, preserving identity """
|
||
|
|
||
|
try:
|
||
|
rc = int(rc_or_sig_name)
|
||
|
except ValueError:
|
||
|
rc = -getattr(signal, rc_or_sig_name)
|
||
|
|
||
|
try:
|
||
|
return rc_exc_cache[rc]
|
||
|
except KeyError:
|
||
|
pass
|
||
|
|
||
|
if rc > 0:
|
||
|
name = "ErrorReturnCode_%d" % rc
|
||
|
base = ErrorReturnCode
|
||
|
else:
|
||
|
name = "SignalException_%d" % abs(rc)
|
||
|
base = SignalException
|
||
|
|
||
|
exc = type(name, (base,), {"exit_code": rc})
|
||
|
rc_exc_cache[rc] = exc
|
||
|
return exc
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def which(program):
|
||
|
def is_exe(fpath):
|
||
|
return (os.path.exists(fpath) and
|
||
|
os.access(fpath, os.X_OK) and
|
||
|
os.path.isfile(os.path.realpath(fpath)))
|
||
|
|
||
|
fpath, fname = os.path.split(program)
|
||
|
if fpath:
|
||
|
if is_exe(program):
|
||
|
return program
|
||
|
else:
|
||
|
if "PATH" not in os.environ:
|
||
|
return None
|
||
|
for path in os.environ["PATH"].split(os.pathsep):
|
||
|
exe_file = os.path.join(path, program)
|
||
|
if is_exe(exe_file):
|
||
|
return exe_file
|
||
|
|
||
|
return None
|
||
|
|
||
|
def resolve_program(program):
|
||
|
path = which(program)
|
||
|
if not path:
|
||
|
# our actual command might have a dash in it, but we can't call
|
||
|
# that from python (we have to use underscores), so we'll check
|
||
|
# if a dash version of our underscore command exists and use that
|
||
|
# if it does
|
||
|
if "_" in program:
|
||
|
path = which(program.replace("_", "-"))
|
||
|
if not path:
|
||
|
return None
|
||
|
return path
|
||
|
|
||
|
|
||
|
# we add this thin wrapper to glob.glob because of a specific edge case where
|
||
|
# glob does not expand to anything. for example, if you try to do
|
||
|
# glob.glob("*.py") and there are no *.py files in the directory, glob.glob
|
||
|
# returns an empty list. this empty list gets passed to the command, and
|
||
|
# then the command fails with a misleading error message. this thin wrapper
|
||
|
# ensures that if there is no expansion, we pass in the original argument,
|
||
|
# so that when the command fails, the error message is clearer
|
||
|
def glob(arg):
|
||
|
return original_glob(arg) or arg
|
||
|
|
||
|
|
||
|
|
||
|
class Logger(object):
|
||
|
""" provides a memory-inexpensive logger. a gotcha about python's builtin
|
||
|
logger is that logger objects are never garbage collected. if you create a
|
||
|
thousand loggers with unique names, they'll sit there in memory until your
|
||
|
script is done. with sh, it's easy to create loggers with unique names if
|
||
|
we want our loggers to include our command arguments. for example, these
|
||
|
are all unique loggers:
|
||
|
|
||
|
ls -l
|
||
|
ls -l /tmp
|
||
|
ls /tmp
|
||
|
|
||
|
so instead of creating unique loggers, and without sacrificing logging
|
||
|
output, we use this class, which maintains as part of its state, the logging
|
||
|
"context", which will be the very unique name. this allows us to get a
|
||
|
logger with a very general name, eg: "command", and have a unique name
|
||
|
appended to it via the context, eg: "ls -l /tmp" """
|
||
|
def __init__(self, name, context=None):
|
||
|
self.name = name
|
||
|
if context:
|
||
|
context = context.replace("%", "%%")
|
||
|
self.context = context
|
||
|
self.log = logging.getLogger("%s.%s" % (SH_LOGGER_NAME, name))
|
||
|
|
||
|
def _format_msg(self, msg, *args):
|
||
|
if self.context:
|
||
|
msg = "%s: %s" % (self.context, msg)
|
||
|
return msg % args
|
||
|
|
||
|
def get_child(self, name, context):
|
||
|
new_name = self.name + "." + name
|
||
|
new_context = self.context + "." + context
|
||
|
l = Logger(new_name, new_context)
|
||
|
return l
|
||
|
|
||
|
def info(self, msg, *args):
|
||
|
self.log.info(self._format_msg(msg, *args))
|
||
|
|
||
|
def debug(self, msg, *args):
|
||
|
self.log.debug(self._format_msg(msg, *args))
|
||
|
|
||
|
def error(self, msg, *args):
|
||
|
self.log.error(self._format_msg(msg, *args))
|
||
|
|
||
|
def exception(self, msg, *args):
|
||
|
self.log.exception(self._format_msg(msg, *args))
|
||
|
|
||
|
|
||
|
def friendly_truncate(s, max_len):
|
||
|
if len(s) > max_len:
|
||
|
s = "%s...(%d more)" % (s[:max_len], len(s) - max_len)
|
||
|
return s
|
||
|
|
||
|
|
||
|
class RunningCommand(object):
|
||
|
""" this represents an executing Command object. it is returned as the
|
||
|
result of __call__() being executed on a Command instance. this creates a
|
||
|
reference to a OProc instance, which is a low-level wrapper around the
|
||
|
process that was exec'd
|
||
|
|
||
|
this is the class that gets manipulated the most by user code, and so it
|
||
|
implements various convenience methods and logical mechanisms for the
|
||
|
underlying process. for example, if a user tries to access a
|
||
|
backgrounded-process's stdout/err, the RunningCommand object is smart enough
|
||
|
to know to wait() on the process to finish first. and when the process
|
||
|
finishes, RunningCommand is smart enough to translate exit codes to
|
||
|
exceptions. """
|
||
|
|
||
|
def __init__(self, cmd, call_args, stdin, stdout, stderr):
|
||
|
# self.ran is used for auditing what actually ran. for example, in
|
||
|
# exceptions, or if you just want to know what was ran after the
|
||
|
# command ran
|
||
|
if IS_PY3:
|
||
|
self.ran = " ".join([arg.decode(DEFAULT_ENCODING, "ignore") for arg in cmd])
|
||
|
else:
|
||
|
self.ran = " ".join(cmd)
|
||
|
|
||
|
|
||
|
friendly_cmd = friendly_truncate(self.ran, 20)
|
||
|
friendly_call_args = friendly_truncate(str(call_args), 20)
|
||
|
|
||
|
# we're setting up the logger string here, instead of __repr__ because
|
||
|
# we reserve __repr__ to behave as if it was evaluating the child
|
||
|
# process's output
|
||
|
logger_str = "<Command %r call_args %s>" % (friendly_cmd,
|
||
|
friendly_call_args)
|
||
|
|
||
|
self.log = Logger("command", logger_str)
|
||
|
self.call_args = call_args
|
||
|
self.cmd = cmd
|
||
|
|
||
|
self.process = None
|
||
|
self._process_completed = False
|
||
|
should_wait = True
|
||
|
spawn_process = True
|
||
|
|
||
|
|
||
|
# with contexts shouldn't run at all yet, they prepend
|
||
|
# to every command in the context
|
||
|
if call_args["with"]:
|
||
|
spawn_process = False
|
||
|
Command._prepend_stack.append(self)
|
||
|
|
||
|
|
||
|
if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]:
|
||
|
should_wait = False
|
||
|
|
||
|
# we're running in the background, return self and let us lazily
|
||
|
# evaluate
|
||
|
if call_args["bg"]:
|
||
|
should_wait = False
|
||
|
|
||
|
# redirection
|
||
|
if call_args["err_to_out"]:
|
||
|
stderr = OProc.STDOUT
|
||
|
|
||
|
|
||
|
# set up which stream should write to the pipe
|
||
|
# TODO, make pipe None by default and limit the size of the Queue
|
||
|
# in oproc.OProc
|
||
|
pipe = OProc.STDOUT
|
||
|
if call_args["iter"] == "out" or call_args["iter"] is True:
|
||
|
pipe = OProc.STDOUT
|
||
|
elif call_args["iter"] == "err":
|
||
|
pipe = OProc.STDERR
|
||
|
|
||
|
if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True:
|
||
|
pipe = OProc.STDOUT
|
||
|
elif call_args["iter_noblock"] == "err":
|
||
|
pipe = OProc.STDERR
|
||
|
|
||
|
|
||
|
# there's currently only one case where we wouldn't spawn a child
|
||
|
# process, and that's if we're using a with-context with our command
|
||
|
if spawn_process:
|
||
|
self.log.info("starting process")
|
||
|
self.process = OProc(self.log, cmd, stdin, stdout, stderr,
|
||
|
self.call_args, pipe)
|
||
|
|
||
|
if should_wait:
|
||
|
self.wait()
|
||
|
|
||
|
|
||
|
def wait(self):
|
||
|
if not self._process_completed:
|
||
|
self._process_completed = True
|
||
|
|
||
|
exit_code = self.process.wait()
|
||
|
if self.process.timed_out:
|
||
|
# if we timed out, our exit code represents a signal, which is
|
||
|
# negative, so let's make it positive to store in our
|
||
|
# TimeoutException
|
||
|
raise TimeoutException(-exit_code)
|
||
|
else:
|
||
|
self.handle_command_exit_code(exit_code)
|
||
|
|
||
|
# https://github.com/amoffat/sh/issues/185
|
||
|
if self.call_args["done"]:
|
||
|
self.call_args["done"](self)
|
||
|
|
||
|
return self
|
||
|
|
||
|
|
||
|
def handle_command_exit_code(self, code):
|
||
|
""" here we determine if we had an exception, or an error code that we
|
||
|
weren't expecting to see. if we did, we create and raise an exception
|
||
|
"""
|
||
|
if (code not in self.call_args["ok_code"] and (code > 0 or -code in
|
||
|
SIGNALS_THAT_SHOULD_THROW_EXCEPTION)):
|
||
|
exc = get_rc_exc(code)
|
||
|
raise exc(self.ran, self.process.stdout, self.process.stderr)
|
||
|
|
||
|
|
||
|
|
||
|
@property
|
||
|
def stdout(self):
|
||
|
self.wait()
|
||
|
return self.process.stdout
|
||
|
|
||
|
@property
|
||
|
def stderr(self):
|
||
|
self.wait()
|
||
|
return self.process.stderr
|
||
|
|
||
|
@property
|
||
|
def exit_code(self):
|
||
|
self.wait()
|
||
|
return self.process.exit_code
|
||
|
|
||
|
@property
|
||
|
def pid(self):
|
||
|
return self.process.pid
|
||
|
|
||
|
def __len__(self):
|
||
|
return len(str(self))
|
||
|
|
||
|
def __enter__(self):
|
||
|
""" we don't actually do anything here because anything that should have
|
||
|
been done would have been done in the Command.__call__ call.
|
||
|
essentially all that has to happen is the comand be pushed on the
|
||
|
prepend stack. """
|
||
|
with_context_warning()
|
||
|
|
||
|
def __iter__(self):
|
||
|
return self
|
||
|
|
||
|
def next(self):
|
||
|
""" allow us to iterate over the output of our command """
|
||
|
|
||
|
# we do this because if get blocks, we can't catch a KeyboardInterrupt
|
||
|
# so the slight timeout allows for that.
|
||
|
while True:
|
||
|
try:
|
||
|
chunk = self.process._pipe_queue.get(True, 0.001)
|
||
|
except Empty:
|
||
|
if self.call_args["iter_noblock"]:
|
||
|
return errno.EWOULDBLOCK
|
||
|
else:
|
||
|
if chunk is None:
|
||
|
self.wait()
|
||
|
raise StopIteration()
|
||
|
try:
|
||
|
return chunk.decode(self.call_args["encoding"],
|
||
|
self.call_args["decode_errors"])
|
||
|
except UnicodeDecodeError:
|
||
|
return chunk
|
||
|
|
||
|
# python 3
|
||
|
__next__ = next
|
||
|
|
||
|
def __exit__(self, typ, value, traceback):
|
||
|
if self.call_args["with"] and Command._prepend_stack:
|
||
|
Command._prepend_stack.pop()
|
||
|
|
||
|
def __str__(self):
|
||
|
""" in python3, should return unicode. in python2, should return a
|
||
|
string of bytes """
|
||
|
if IS_PY3:
|
||
|
return self.__unicode__()
|
||
|
else:
|
||
|
return unicode(self).encode(self.call_args["encoding"])
|
||
|
|
||
|
def __unicode__(self):
|
||
|
""" a magic method defined for python2. calling unicode() on a
|
||
|
RunningCommand object will call this """
|
||
|
if self.process and self.stdout:
|
||
|
return self.stdout.decode(self.call_args["encoding"],
|
||
|
self.call_args["decode_errors"])
|
||
|
elif IS_PY3:
|
||
|
return ""
|
||
|
else:
|
||
|
return unicode("")
|
||
|
|
||
|
def __eq__(self, other):
|
||
|
return unicode(self) == unicode(other)
|
||
|
__hash__ = None # Avoid DeprecationWarning in Python < 3
|
||
|
|
||
|
def __contains__(self, item):
|
||
|
return item in str(self)
|
||
|
|
||
|
def __getattr__(self, p):
|
||
|
# let these three attributes pass through to the OProc object
|
||
|
if p in ("signal", "terminate", "kill"):
|
||
|
if self.process:
|
||
|
return getattr(self.process, p)
|
||
|
else:
|
||
|
raise AttributeError
|
||
|
|
||
|
# see if strings have what we're looking for. we're looking at the
|
||
|
# method names explicitly because we don't want to evaluate self unless
|
||
|
# we absolutely have to, the reason being, in python2, hasattr swallows
|
||
|
# exceptions, and if we try to run hasattr on a command that failed and
|
||
|
# is being run with _iter=True, the command will be evaluated, throw an
|
||
|
# exception, but hasattr will discard it
|
||
|
if p in _unicode_methods:
|
||
|
return getattr(unicode(self), p)
|
||
|
|
||
|
raise AttributeError
|
||
|
|
||
|
def __repr__(self):
|
||
|
""" in python3, should return unicode. in python2, should return a
|
||
|
string of bytes """
|
||
|
try:
|
||
|
return str(self)
|
||
|
except UnicodeDecodeError:
|
||
|
if self.process:
|
||
|
if self.stdout:
|
||
|
return repr(self.stdout)
|
||
|
return repr("")
|
||
|
|
||
|
def __long__(self):
|
||
|
return long(str(self).strip())
|
||
|
|
||
|
def __float__(self):
|
||
|
return float(str(self).strip())
|
||
|
|
||
|
def __int__(self):
|
||
|
return int(str(self).strip())
|
||
|
|
||
|
|
||
|
|
||
|
def output_redirect_is_filename(out):
|
||
|
return out \
|
||
|
and not callable(out) \
|
||
|
and not hasattr(out, "write") \
|
||
|
and not isinstance(out, (cStringIO, StringIO))
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class Command(object):
|
||
|
""" represents an un-run system program, like "ls" or "cd". because it
|
||
|
represents the program itself (and not a running instance of it), it should
|
||
|
hold very little state. in fact, the only state it does hold is baked
|
||
|
arguments.
|
||
|
|
||
|
when a Command object is called, the result that is returned is a
|
||
|
RunningCommand object, which represents the Command put into an execution
|
||
|
state. """
|
||
|
_prepend_stack = []
|
||
|
|
||
|
_call_args = {
|
||
|
# currently unsupported
|
||
|
#"fg": False, # run command in foreground
|
||
|
|
||
|
# run a command in the background. commands run in the background
|
||
|
# ignore SIGHUP and do not automatically exit when the parent process
|
||
|
# ends
|
||
|
"bg": False,
|
||
|
|
||
|
"with": False, # prepend the command to every command after it
|
||
|
"in": None,
|
||
|
"out": None, # redirect STDOUT
|
||
|
"err": None, # redirect STDERR
|
||
|
"err_to_out": None, # redirect STDERR to STDOUT
|
||
|
|
||
|
# stdin buffer size
|
||
|
# 1 for line, 0 for unbuffered, any other number for that amount
|
||
|
"in_bufsize": 0,
|
||
|
# stdout buffer size, same values as above
|
||
|
"out_bufsize": 1,
|
||
|
"err_bufsize": 1,
|
||
|
|
||
|
# this is how big the output buffers will be for stdout and stderr.
|
||
|
# this is essentially how much output they will store from the process.
|
||
|
# we use a deque, so if it overflows past this amount, the first items
|
||
|
# get pushed off as each new item gets added.
|
||
|
#
|
||
|
# NOTICE
|
||
|
# this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if
|
||
|
# you're buffering out/err at 1024 bytes, the internal buffer size will
|
||
|
# be "internal_bufsize" CHUNKS of 1024 bytes
|
||
|
"internal_bufsize": 3 * 1024 ** 2,
|
||
|
|
||
|
"env": None,
|
||
|
"piped": None,
|
||
|
"iter": None,
|
||
|
"iter_noblock": None,
|
||
|
"ok_code": 0,
|
||
|
"cwd": None,
|
||
|
|
||
|
# the separator delimiting between a long-argument's name and its value
|
||
|
# for example, --arg=derp, '=' is the long_sep
|
||
|
"long_sep": "=",
|
||
|
|
||
|
# this is for programs that expect their input to be from a terminal.
|
||
|
# ssh is one of those programs
|
||
|
"tty_in": False,
|
||
|
"tty_out": True,
|
||
|
|
||
|
"encoding": DEFAULT_ENCODING,
|
||
|
"decode_errors": "strict",
|
||
|
|
||
|
# how long the process should run before it is auto-killed
|
||
|
"timeout": 0,
|
||
|
"timeout_signal": signal.SIGKILL,
|
||
|
|
||
|
# TODO write some docs on "long-running processes"
|
||
|
# these control whether or not stdout/err will get aggregated together
|
||
|
# as the process runs. this has memory usage implications, so sometimes
|
||
|
# with long-running processes with a lot of data, it makes sense to
|
||
|
# set these to true
|
||
|
"no_out": False,
|
||
|
"no_err": False,
|
||
|
"no_pipe": False,
|
||
|
|
||
|
# if any redirection is used for stdout or stderr, internal buffering
|
||
|
# of that data is not stored. this forces it to be stored, as if
|
||
|
# the output is being T'd to both the redirected destination and our
|
||
|
# internal buffers
|
||
|
"tee": None,
|
||
|
|
||
|
# will be called when a process terminates without exception. this
|
||
|
# option also puts the command in the background, since it doesn't make
|
||
|
# sense to have an un-backgrounded command with a done callback
|
||
|
"done": None,
|
||
|
|
||
|
# a tuple (rows, columns) of the desired size of both the stdout and
|
||
|
# stdin ttys, if ttys are being used
|
||
|
"tty_size": (20, 80),
|
||
|
}
|
||
|
|
||
|
# these are arguments that cannot be called together, because they wouldn't
|
||
|
# make any sense
|
||
|
_incompatible_call_args = (
|
||
|
#("fg", "bg", "Command can't be run in the foreground and background"),
|
||
|
("err", "err_to_out", "Stderr is already being redirected"),
|
||
|
("piped", "iter", "You cannot iterate when this command is being piped"),
|
||
|
("piped", "no_pipe", "Using a pipe doesn't make sense if you've \
|
||
|
disabled the pipe"),
|
||
|
("no_out", "iter", "You cannot iterate over output if there is no \
|
||
|
output"),
|
||
|
)
|
||
|
|
||
|
|
||
|
# this method exists because of the need to have some way of letting
|
||
|
# manual object instantiation not perform the underscore-to-dash command
|
||
|
# conversion that resolve_program uses.
|
||
|
#
|
||
|
# there are 2 ways to create a Command object. using sh.Command(<program>)
|
||
|
# or by using sh.<program>. the method fed into sh.Command must be taken
|
||
|
# literally, and so no underscore-dash conversion is performed. the one
|
||
|
# for sh.<program> must do the underscore-dash converesion, because we
|
||
|
# can't type dashes in method names
|
||
|
@classmethod
|
||
|
def _create(cls, program, **default_kwargs):
|
||
|
path = resolve_program(program)
|
||
|
if not path:
|
||
|
raise CommandNotFound(program)
|
||
|
|
||
|
cmd = cls(path)
|
||
|
if default_kwargs:
|
||
|
cmd = cmd.bake(**default_kwargs)
|
||
|
|
||
|
return cmd
|
||
|
|
||
|
|
||
|
def __init__(self, path):
|
||
|
found = which(path)
|
||
|
if not found:
|
||
|
raise CommandNotFound(path)
|
||
|
|
||
|
self._path = encode_to_py3bytes_or_py2str(found)
|
||
|
|
||
|
self._partial = False
|
||
|
self._partial_baked_args = []
|
||
|
self._partial_call_args = {}
|
||
|
|
||
|
# bugfix for functools.wraps. issue #121
|
||
|
self.__name__ = str(self)
|
||
|
|
||
|
|
||
|
def __getattribute__(self, name):
|
||
|
# convenience
|
||
|
getattr = partial(object.__getattribute__, self)
|
||
|
|
||
|
if name.startswith("_"):
|
||
|
return getattr(name)
|
||
|
if name == "bake":
|
||
|
return getattr("bake")
|
||
|
if name.endswith("_"):
|
||
|
name = name[:-1]
|
||
|
|
||
|
return getattr("bake")(name)
|
||
|
|
||
|
|
||
|
@staticmethod
|
||
|
def _extract_call_args(kwargs, to_override={}):
|
||
|
kwargs = kwargs.copy()
|
||
|
call_args = {}
|
||
|
for parg, default in Command._call_args.items():
|
||
|
key = "_" + parg
|
||
|
|
||
|
if key in kwargs:
|
||
|
call_args[parg] = kwargs[key]
|
||
|
del kwargs[key]
|
||
|
elif parg in to_override:
|
||
|
call_args[parg] = to_override[parg]
|
||
|
|
||
|
# test for incompatible call args
|
||
|
s1 = set(call_args.keys())
|
||
|
for args in Command._incompatible_call_args:
|
||
|
args = list(args)
|
||
|
error = args.pop()
|
||
|
|
||
|
if s1.issuperset(args):
|
||
|
raise TypeError("Invalid special arguments %r: %s" % (args, error))
|
||
|
|
||
|
return call_args, kwargs
|
||
|
|
||
|
|
||
|
def _aggregate_keywords(self, keywords, sep, raw=False):
|
||
|
processed = []
|
||
|
for k, v in keywords.items():
|
||
|
# we're passing a short arg as a kwarg, example:
|
||
|
# cut(d="\t")
|
||
|
if len(k) == 1:
|
||
|
if v is not False:
|
||
|
processed.append(encode_to_py3bytes_or_py2str("-" + k))
|
||
|
if v is not True:
|
||
|
processed.append(encode_to_py3bytes_or_py2str(v))
|
||
|
|
||
|
# we're doing a long arg
|
||
|
else:
|
||
|
if not raw:
|
||
|
k = k.replace("_", "-")
|
||
|
|
||
|
if v is True:
|
||
|
processed.append(encode_to_py3bytes_or_py2str("--" + k))
|
||
|
elif v is False:
|
||
|
pass
|
||
|
else:
|
||
|
arg = encode_to_py3bytes_or_py2str("--%s%s%s" % (k, sep, v))
|
||
|
processed.append(arg)
|
||
|
return processed
|
||
|
|
||
|
|
||
|
def _compile_args(self, args, kwargs, sep):
|
||
|
processed_args = []
|
||
|
|
||
|
# aggregate positional args
|
||
|
for arg in args:
|
||
|
if isinstance(arg, (list, tuple)):
|
||
|
if not arg:
|
||
|
warnings.warn("Empty list passed as an argument to %r. \
|
||
|
If you're using glob.glob(), please use sh.glob() instead." % self._path, stacklevel=3)
|
||
|
for sub_arg in arg:
|
||
|
processed_args.append(encode_to_py3bytes_or_py2str(sub_arg))
|
||
|
elif isinstance(arg, dict):
|
||
|
processed_args += self._aggregate_keywords(arg, sep, raw=True)
|
||
|
else:
|
||
|
processed_args.append(encode_to_py3bytes_or_py2str(arg))
|
||
|
|
||
|
# aggregate the keyword arguments
|
||
|
processed_args += self._aggregate_keywords(kwargs, sep)
|
||
|
|
||
|
return processed_args
|
||
|
|
||
|
|
||
|
# TODO needs documentation
|
||
|
def bake(self, *args, **kwargs):
|
||
|
fn = Command(self._path)
|
||
|
fn._partial = True
|
||
|
|
||
|
call_args, kwargs = self._extract_call_args(kwargs)
|
||
|
|
||
|
pruned_call_args = call_args
|
||
|
for k, v in Command._call_args.items():
|
||
|
try:
|
||
|
if pruned_call_args[k] == v:
|
||
|
del pruned_call_args[k]
|
||
|
except KeyError:
|
||
|
continue
|
||
|
|
||
|
fn._partial_call_args.update(self._partial_call_args)
|
||
|
fn._partial_call_args.update(pruned_call_args)
|
||
|
fn._partial_baked_args.extend(self._partial_baked_args)
|
||
|
sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
|
||
|
fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep))
|
||
|
return fn
|
||
|
|
||
|
def __str__(self):
|
||
|
""" in python3, should return unicode. in python2, should return a
|
||
|
string of bytes """
|
||
|
if IS_PY3:
|
||
|
return self.__unicode__()
|
||
|
else:
|
||
|
return self.__unicode__().encode(DEFAULT_ENCODING)
|
||
|
|
||
|
|
||
|
def __eq__(self, other):
|
||
|
try:
|
||
|
return str(self) == str(other)
|
||
|
except:
|
||
|
return False
|
||
|
__hash__ = None # Avoid DeprecationWarning in Python < 3
|
||
|
|
||
|
|
||
|
def __repr__(self):
|
||
|
""" in python3, should return unicode. in python2, should return a
|
||
|
string of bytes """
|
||
|
return "<Command %r>" % str(self)
|
||
|
|
||
|
|
||
|
def __unicode__(self):
|
||
|
""" a magic method defined for python2. calling unicode() on a
|
||
|
self will call this """
|
||
|
baked_args = " ".join(item.decode(DEFAULT_ENCODING) for item in self._partial_baked_args)
|
||
|
if baked_args:
|
||
|
baked_args = " " + baked_args
|
||
|
return self._path.decode(DEFAULT_ENCODING) + baked_args
|
||
|
|
||
|
def __enter__(self):
|
||
|
with_context_warning()
|
||
|
self(_with=True)
|
||
|
|
||
|
def __exit__(self, typ, value, traceback):
|
||
|
Command._prepend_stack.pop()
|
||
|
|
||
|
|
||
|
def __call__(self, *args, **kwargs):
|
||
|
kwargs = kwargs.copy()
|
||
|
args = list(args)
|
||
|
|
||
|
cmd = []
|
||
|
|
||
|
# aggregate any 'with' contexts
|
||
|
call_args = Command._call_args.copy()
|
||
|
for prepend in self._prepend_stack:
|
||
|
# don't pass the 'with' call arg
|
||
|
pcall_args = prepend.call_args.copy()
|
||
|
try:
|
||
|
del pcall_args["with"]
|
||
|
except:
|
||
|
pass
|
||
|
|
||
|
call_args.update(pcall_args)
|
||
|
cmd.extend(prepend.cmd)
|
||
|
|
||
|
cmd.append(self._path)
|
||
|
|
||
|
# here we extract the special kwargs and override any
|
||
|
# special kwargs from the possibly baked command
|
||
|
tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args)
|
||
|
call_args.update(tmp_call_args)
|
||
|
|
||
|
if not getattr(call_args["ok_code"], "__iter__", None):
|
||
|
call_args["ok_code"] = [call_args["ok_code"]]
|
||
|
|
||
|
|
||
|
if call_args["done"]:
|
||
|
call_args["bg"] = True
|
||
|
|
||
|
# check if we're piping via composition
|
||
|
stdin = call_args["in"]
|
||
|
if args:
|
||
|
first_arg = args.pop(0)
|
||
|
if isinstance(first_arg, RunningCommand):
|
||
|
# it makes sense that if the input pipe of a command is running
|
||
|
# in the background, then this command should run in the
|
||
|
# background as well
|
||
|
if first_arg.call_args["bg"]:
|
||
|
call_args["bg"] = True
|
||
|
|
||
|
if first_arg.call_args["piped"] == "direct":
|
||
|
stdin = first_arg.process
|
||
|
else:
|
||
|
stdin = first_arg.process._pipe_queue
|
||
|
|
||
|
else:
|
||
|
args.insert(0, first_arg)
|
||
|
|
||
|
processed_args = self._compile_args(args, kwargs, call_args["long_sep"])
|
||
|
|
||
|
# makes sure our arguments are broken up correctly
|
||
|
split_args = self._partial_baked_args + processed_args
|
||
|
|
||
|
final_args = split_args
|
||
|
|
||
|
cmd.extend(final_args)
|
||
|
|
||
|
|
||
|
# stdout redirection
|
||
|
stdout = call_args["out"]
|
||
|
if output_redirect_is_filename(stdout):
|
||
|
stdout = open(str(stdout), "wb")
|
||
|
|
||
|
# stderr redirection
|
||
|
stderr = call_args["err"]
|
||
|
if output_redirect_is_filename(stderr):
|
||
|
stderr = open(str(stderr), "wb")
|
||
|
|
||
|
|
||
|
return RunningCommand(cmd, call_args, stdin, stdout, stderr)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def _start_daemon_thread(fn, *args):
|
||
|
thrd = threading.Thread(target=fn, args=args)
|
||
|
thrd.daemon = True
|
||
|
thrd.start()
|
||
|
return thrd
|
||
|
|
||
|
|
||
|
def setwinsize(fd, rows_cols):
|
||
|
""" set the terminal size of a tty file descriptor. borrowed logic
|
||
|
from pexpect.py """
|
||
|
rows, cols = rows_cols
|
||
|
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
|
||
|
|
||
|
s = struct.pack('HHHH', rows, cols, 0, 0)
|
||
|
fcntl.ioctl(fd, TIOCSWINSZ, s)
|
||
|
|
||
|
def construct_streamreader_callback(process, handler):
|
||
|
""" here we're constructing a closure for our streamreader callback. this
|
||
|
is used in the case that we pass a callback into _out or _err, meaning we
|
||
|
want to our callback to handle each bit of output
|
||
|
|
||
|
we construct the closure based on how many arguments it takes. the reason
|
||
|
for this is to make it as easy as possible for people to use, without
|
||
|
limiting them. a new user will assume the callback takes 1 argument (the
|
||
|
data). as they get more advanced, they may want to terminate the process,
|
||
|
or pass some stdin back, and will realize that they can pass a callback of
|
||
|
more args """
|
||
|
|
||
|
|
||
|
# implied arg refers to the "self" that methods will pass in. we need to
|
||
|
# account for this implied arg when figuring out what function the user
|
||
|
# passed in based on number of args
|
||
|
implied_arg = 0
|
||
|
|
||
|
partial_args = 0
|
||
|
handler_to_inspect = handler
|
||
|
|
||
|
if isinstance(handler, partial):
|
||
|
partial_args = len(handler.args)
|
||
|
handler_to_inspect = handler.func
|
||
|
|
||
|
if inspect.ismethod(handler_to_inspect):
|
||
|
implied_arg = 1
|
||
|
num_args = len(inspect.getargspec(handler_to_inspect).args)
|
||
|
|
||
|
else:
|
||
|
if inspect.isfunction(handler_to_inspect):
|
||
|
num_args = len(inspect.getargspec(handler_to_inspect).args)
|
||
|
|
||
|
# is an object instance with __call__ method
|
||
|
else:
|
||
|
implied_arg = 1
|
||
|
num_args = len(inspect.getargspec(handler_to_inspect.__call__).args)
|
||
|
|
||
|
|
||
|
net_args = num_args - implied_arg - partial_args
|
||
|
|
||
|
handler_args = ()
|
||
|
|
||
|
# just the chunk
|
||
|
if net_args == 1:
|
||
|
handler_args = ()
|
||
|
|
||
|
# chunk, stdin
|
||
|
if net_args == 2:
|
||
|
handler_args = (process.stdin,)
|
||
|
|
||
|
# chunk, stdin, process
|
||
|
elif net_args == 3:
|
||
|
# notice we're only storing a weakref, to prevent cyclic references
|
||
|
# (where the process holds a streamreader, and a streamreader holds a
|
||
|
# handler-closure with a reference to the process
|
||
|
handler_args = (process.stdin, weakref.ref(process))
|
||
|
|
||
|
def fn(chunk):
|
||
|
# this is pretty ugly, but we're evaluating the process at call-time,
|
||
|
# because it's a weakref
|
||
|
args = handler_args
|
||
|
if len(args) == 2:
|
||
|
args = (handler_args[0], handler_args[1]())
|
||
|
return handler(chunk, *args)
|
||
|
|
||
|
return fn
|
||
|
|
||
|
|
||
|
|
||
|
def handle_process_exit_code(exit_code):
|
||
|
""" this should only ever be called once for each child process """
|
||
|
# if we exited from a signal, let our exit code reflect that
|
||
|
if os.WIFSIGNALED(exit_code):
|
||
|
return -os.WTERMSIG(exit_code)
|
||
|
# otherwise just give us a normal exit code
|
||
|
elif os.WIFEXITED(exit_code):
|
||
|
return os.WEXITSTATUS(exit_code)
|
||
|
else:
|
||
|
raise RuntimeError("Unknown child exit status!")
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class OProc(object):
|
||
|
""" this class is instantiated by RunningCommand for a command to be exec'd.
|
||
|
it handles all the nasty business involved with correctly setting up the
|
||
|
input/output to the child process. it gets its name for subprocess.Popen
|
||
|
(process open) but we're calling ours OProc (open process) """
|
||
|
|
||
|
_default_window_size = (24, 80)
|
||
|
|
||
|
# used in redirecting
|
||
|
STDOUT = -1
|
||
|
STDERR = -2
|
||
|
|
||
|
def __init__(self, parent_log, cmd, stdin, stdout, stderr, call_args, pipe):
|
||
|
"""
|
||
|
cmd is the full string that will be exec'd. it includes the program
|
||
|
name and all its arguments
|
||
|
|
||
|
stdin, stdout, stderr are what the child will use for standard
|
||
|
input/output/err
|
||
|
|
||
|
call_args is a mapping of all the special keyword arguments to apply
|
||
|
to the child process
|
||
|
"""
|
||
|
|
||
|
self.call_args = call_args
|
||
|
|
||
|
# I had issues with getting 'Input/Output error reading stdin' from dd,
|
||
|
# until I set _tty_out=False
|
||
|
if self.call_args["piped"] == "direct":
|
||
|
self.call_args["tty_out"] = False
|
||
|
|
||
|
self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]
|
||
|
|
||
|
# this logic is a little convoluted, but basically this top-level
|
||
|
# if/else is for consolidating input and output TTYs into a single
|
||
|
# TTY. this is the only way some secure programs like ssh will
|
||
|
# output correctly (is if stdout and stdin are both the same TTY)
|
||
|
if self._single_tty:
|
||
|
self._stdin_fd, self._slave_stdin_fd = pty.openpty()
|
||
|
|
||
|
self._stdout_fd = self._stdin_fd
|
||
|
self._slave_stdout_fd = self._slave_stdin_fd
|
||
|
|
||
|
self._stderr_fd = self._stdin_fd
|
||
|
self._slave_stderr_fd = self._slave_stdin_fd
|
||
|
|
||
|
# do not consolidate stdin and stdout. this is the most common use-
|
||
|
# case
|
||
|
else:
|
||
|
# this check here is because we may be doing "direct" piping
|
||
|
# (_piped="direct"), and so our stdin might be an instance of
|
||
|
# OProc
|
||
|
if isinstance(stdin, OProc):
|
||
|
self._slave_stdin_fd = stdin._stdout_fd
|
||
|
self._stdin_fd = None
|
||
|
elif self.call_args["tty_in"]:
|
||
|
self._slave_stdin_fd, self._stdin_fd = pty.openpty()
|
||
|
# tty_in=False is the default
|
||
|
else:
|
||
|
self._slave_stdin_fd, self._stdin_fd = os.pipe()
|
||
|
|
||
|
|
||
|
# tty_out=True is the default
|
||
|
if self.call_args["tty_out"]:
|
||
|
self._stdout_fd, self._slave_stdout_fd = pty.openpty()
|
||
|
else:
|
||
|
self._stdout_fd, self._slave_stdout_fd = os.pipe()
|
||
|
|
||
|
# unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe,
|
||
|
# and never a PTY. the reason for this is not totally clear to me,
|
||
|
# but it has to do with the fact that if STDERR isn't set as the
|
||
|
# CTTY (because STDOUT is), the STDERR buffer won't always flush
|
||
|
# by the time the process exits, and the data will be lost.
|
||
|
# i've only seen this on OSX.
|
||
|
if stderr is not OProc.STDOUT:
|
||
|
self._stderr_fd, self._slave_stderr_fd = os.pipe()
|
||
|
|
||
|
|
||
|
# this is a hack, but what we're doing here is intentionally throwing an
|
||
|
# OSError exception if our child processes's directory doesn't exist,
|
||
|
# but we're doing it BEFORE we fork. the reason for before the fork is
|
||
|
# error handling. i'm currently too lazy to implement what
|
||
|
# subprocess.py did and set up a error pipe to handle exceptions that
|
||
|
# happen in the child between fork and exec. it has only been seen in
|
||
|
# the wild for a missing cwd, so we'll handle it here.
|
||
|
cwd = self.call_args["cwd"]
|
||
|
if cwd is not None and not os.path.exists(cwd):
|
||
|
os.chdir(cwd)
|
||
|
|
||
|
|
||
|
gc_enabled = gc.isenabled()
|
||
|
if gc_enabled:
|
||
|
gc.disable()
|
||
|
self.pid = os.fork()
|
||
|
|
||
|
|
||
|
# child
|
||
|
if self.pid == 0: # pragma: no cover
|
||
|
try:
|
||
|
# ignoring SIGHUP lets us persist even after the parent process
|
||
|
# exits. only ignore if we're backgrounded
|
||
|
if self.call_args["bg"] is True:
|
||
|
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||
|
|
||
|
# this piece of ugliness is due to a bug where we can lose output
|
||
|
# if we do os.close(self._slave_stdout_fd) in the parent after
|
||
|
# the child starts writing.
|
||
|
# see http://bugs.python.org/issue15898
|
||
|
if IS_OSX:
|
||
|
time.sleep(0.01)
|
||
|
|
||
|
os.setsid()
|
||
|
|
||
|
if self.call_args["tty_out"]:
|
||
|
# set raw mode, so there isn't any weird translation of
|
||
|
# newlines to \r\n and other oddities. we're not outputting
|
||
|
# to a terminal anyways
|
||
|
#
|
||
|
# we HAVE to do this here, and not in the parent process,
|
||
|
# because we have to guarantee that this is set before the
|
||
|
# child process is run, and we can't do it twice.
|
||
|
tty.setraw(self._slave_stdout_fd)
|
||
|
|
||
|
|
||
|
# if the parent-side fd for stdin exists, close it. the case
|
||
|
# where it may not exist is if we're using piped="direct"
|
||
|
if self._stdin_fd:
|
||
|
os.close(self._stdin_fd)
|
||
|
|
||
|
if not self._single_tty:
|
||
|
os.close(self._stdout_fd)
|
||
|
if stderr is not OProc.STDOUT:
|
||
|
os.close(self._stderr_fd)
|
||
|
|
||
|
|
||
|
if cwd:
|
||
|
os.chdir(cwd)
|
||
|
|
||
|
os.dup2(self._slave_stdin_fd, 0)
|
||
|
os.dup2(self._slave_stdout_fd, 1)
|
||
|
|
||
|
# we're not directing stderr to stdout? then set self._slave_stderr_fd to
|
||
|
# fd 2, the common stderr fd
|
||
|
if stderr is OProc.STDOUT:
|
||
|
os.dup2(self._slave_stdout_fd, 2)
|
||
|
else:
|
||
|
os.dup2(self._slave_stderr_fd, 2)
|
||
|
|
||
|
# don't inherit file descriptors
|
||
|
max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
|
||
|
os.closerange(3, max_fd)
|
||
|
|
||
|
|
||
|
# set our controlling terminal. tty_out defaults to true
|
||
|
if self.call_args["tty_out"]:
|
||
|
tmp_fd = os.open(os.ttyname(1), os.O_RDWR)
|
||
|
os.close(tmp_fd)
|
||
|
|
||
|
|
||
|
if self.call_args["tty_out"]:
|
||
|
setwinsize(1, self.call_args["tty_size"])
|
||
|
|
||
|
# actually execute the process
|
||
|
if self.call_args["env"] is None:
|
||
|
os.execv(cmd[0], cmd)
|
||
|
else:
|
||
|
os.execve(cmd[0], cmd, self.call_args["env"])
|
||
|
|
||
|
# we must ensure that we ALWAYS exit the child process, otherwise
|
||
|
# the parent process code will be executed twice on exception
|
||
|
# https://github.com/amoffat/sh/issues/202
|
||
|
#
|
||
|
# if your parent process experiences an exit code 255, it is most
|
||
|
# likely that an exception occurred between the fork of the child
|
||
|
# and the exec. this should be reported.
|
||
|
finally:
|
||
|
os._exit(255)
|
||
|
|
||
|
# parent
|
||
|
else:
|
||
|
if gc_enabled:
|
||
|
gc.enable()
|
||
|
|
||
|
# used to determine what exception to raise. if our process was
|
||
|
# killed via a timeout counter, we'll raise something different than
|
||
|
# a SIGKILL exception
|
||
|
self.timed_out = False
|
||
|
|
||
|
self.started = time.time()
|
||
|
self.cmd = cmd
|
||
|
|
||
|
# exit code should only be manipulated from within self._wait_lock
|
||
|
# to prevent race conditions
|
||
|
self.exit_code = None
|
||
|
|
||
|
self.stdin = stdin or Queue()
|
||
|
|
||
|
# _pipe_queue is used internally to hand off stdout from one process
|
||
|
# to another. by default, all stdout from a process gets dumped
|
||
|
# into this pipe queue, to be consumed in real time (hence the
|
||
|
# thread-safe Queue), or at a potentially later time
|
||
|
self._pipe_queue = Queue()
|
||
|
|
||
|
# this is used to prevent a race condition when we're waiting for
|
||
|
# a process to end, and the OProc's internal threads are also checking
|
||
|
# for the processes's end
|
||
|
self._wait_lock = threading.Lock()
|
||
|
|
||
|
# these are for aggregating the stdout and stderr. we use a deque
|
||
|
# because we don't want to overflow
|
||
|
self._stdout = deque(maxlen=self.call_args["internal_bufsize"])
|
||
|
self._stderr = deque(maxlen=self.call_args["internal_bufsize"])
|
||
|
|
||
|
if self.call_args["tty_in"]:
|
||
|
setwinsize(self._stdin_fd, self.call_args["tty_size"])
|
||
|
|
||
|
|
||
|
self.log = parent_log.get_child("process", repr(self))
|
||
|
|
||
|
os.close(self._slave_stdin_fd)
|
||
|
if not self._single_tty:
|
||
|
os.close(self._slave_stdout_fd)
|
||
|
if stderr is not OProc.STDOUT:
|
||
|
os.close(self._slave_stderr_fd)
|
||
|
|
||
|
self.log.debug("started process")
|
||
|
|
||
|
|
||
|
if self.call_args["tty_in"]:
|
||
|
attr = termios.tcgetattr(self._stdin_fd)
|
||
|
attr[3] &= ~termios.ECHO
|
||
|
termios.tcsetattr(self._stdin_fd, termios.TCSANOW, attr)
|
||
|
|
||
|
# this represents the connection from a Queue object (or whatever
|
||
|
# we're using to feed STDIN) to the process's STDIN fd
|
||
|
self._stdin_stream = None
|
||
|
if not isinstance(self.stdin, OProc):
|
||
|
self._stdin_stream = \
|
||
|
StreamWriter(self.log.get_child("streamwriter",
|
||
|
"stdin"), self._stdin_fd, self.stdin,
|
||
|
self.call_args["in_bufsize"],
|
||
|
self.call_args["encoding"],
|
||
|
self.call_args["tty_in"])
|
||
|
|
||
|
stdout_pipe = None
|
||
|
if pipe is OProc.STDOUT and not self.call_args["no_pipe"]:
|
||
|
stdout_pipe = self._pipe_queue
|
||
|
|
||
|
|
||
|
# this represents the connection from a process's STDOUT fd to
|
||
|
# wherever it has to go, sometimes a pipe Queue (that we will use
|
||
|
# to pipe data to other processes), and also an internal deque
|
||
|
# that we use to aggregate all the output
|
||
|
save_stdout = not self.call_args["no_out"] and \
|
||
|
(self.call_args["tee"] in (True, "out") or stdout is None)
|
||
|
|
||
|
|
||
|
# if we're piping directly into another process's filedescriptor, we
|
||
|
# bypass reading from the stdout stream altogether, because we've
|
||
|
# already hooked up this processes's stdout fd to the other
|
||
|
# processes's stdin fd
|
||
|
self._stdout_stream = None
|
||
|
if self.call_args["piped"] != "direct":
|
||
|
if callable(stdout):
|
||
|
stdout = construct_streamreader_callback(self, stdout)
|
||
|
self._stdout_stream = \
|
||
|
StreamReader(self.log.get_child("streamreader",
|
||
|
"stdout"), self._stdout_fd, stdout, self._stdout,
|
||
|
self.call_args["out_bufsize"],
|
||
|
self.call_args["encoding"],
|
||
|
self.call_args["decode_errors"], stdout_pipe,
|
||
|
save_data=save_stdout)
|
||
|
|
||
|
if stderr is OProc.STDOUT or self._single_tty:
|
||
|
self._stderr_stream = None
|
||
|
else:
|
||
|
stderr_pipe = None
|
||
|
if pipe is OProc.STDERR and not self.call_args["no_pipe"]:
|
||
|
stderr_pipe = self._pipe_queue
|
||
|
|
||
|
save_stderr = not self.call_args["no_err"] and \
|
||
|
(self.call_args["tee"] in ("err",) or stderr is None)
|
||
|
|
||
|
if callable(stderr):
|
||
|
stderr = construct_streamreader_callback(self, stderr)
|
||
|
|
||
|
self._stderr_stream = StreamReader(Logger("streamreader"),
|
||
|
self._stderr_fd, stderr, self._stderr,
|
||
|
self.call_args["err_bufsize"], self.call_args["encoding"],
|
||
|
self.call_args["decode_errors"], stderr_pipe,
|
||
|
save_data=save_stderr)
|
||
|
|
||
|
|
||
|
# start the main io threads
|
||
|
# stdin thread is not needed if we are connecting from another process's stdout pipe
|
||
|
self._input_thread = None
|
||
|
if self._stdin_stream:
|
||
|
self._input_thread = _start_daemon_thread(self.input_thread,
|
||
|
self._stdin_stream)
|
||
|
|
||
|
self._output_thread = _start_daemon_thread(self.output_thread,
|
||
|
self._stdout_stream, self._stderr_stream,
|
||
|
self.call_args["timeout"], self.started,
|
||
|
self.call_args["timeout_signal"])
|
||
|
|
||
|
|
||
|
def __repr__(self):
|
||
|
return "<Process %d %r>" % (self.pid, self.cmd[:500])
|
||
|
|
||
|
|
||
|
def change_in_bufsize(self, buf):
|
||
|
self._stdin_stream.stream_bufferer.change_buffering(buf)
|
||
|
|
||
|
def change_out_bufsize(self, buf):
|
||
|
self._stdout_stream.stream_bufferer.change_buffering(buf)
|
||
|
|
||
|
def change_err_bufsize(self, buf):
|
||
|
self._stderr_stream.stream_bufferer.change_buffering(buf)
|
||
|
|
||
|
|
||
|
def input_thread(self, stdin):
|
||
|
""" this is run in a separate thread. it writes into our process's
|
||
|
stdin (a streamwriter) and waits the process to end AND everything that
|
||
|
can be written to be written """
|
||
|
done = False
|
||
|
while not done and self.is_alive():
|
||
|
self.log.debug("%r ready for more input", stdin)
|
||
|
done = stdin.write()
|
||
|
|
||
|
stdin.close()
|
||
|
|
||
|
|
||
|
def output_thread(self, stdout, stderr, timeout, started, timeout_exc):
|
||
|
""" this function is run in a separate thread. it reads from the
|
||
|
process's stdout stream (a streamreader), and waits for it to claim that
|
||
|
its done """
|
||
|
|
||
|
readers = []
|
||
|
errors = []
|
||
|
|
||
|
if stdout is not None:
|
||
|
readers.append(stdout)
|
||
|
errors.append(stdout)
|
||
|
if stderr is not None:
|
||
|
readers.append(stderr)
|
||
|
errors.append(stderr)
|
||
|
|
||
|
# this is our select loop for polling stdout or stderr that is ready to
|
||
|
# be read and processed. if one of those streamreaders indicate that it
|
||
|
# is done altogether being read from, we remove it from our list of
|
||
|
# things to poll. when no more things are left to poll, we leave this
|
||
|
# loop and clean up
|
||
|
while readers:
|
||
|
outputs, inputs, err = select.select(readers, [], errors, 0.1)
|
||
|
|
||
|
# stdout and stderr
|
||
|
for stream in outputs:
|
||
|
self.log.debug("%r ready to be read from", stream)
|
||
|
done = stream.read()
|
||
|
if done:
|
||
|
readers.remove(stream)
|
||
|
|
||
|
for stream in err:
|
||
|
pass
|
||
|
|
||
|
# test if the process has been running too long
|
||
|
if timeout:
|
||
|
now = time.time()
|
||
|
if now - started > timeout:
|
||
|
self.log.debug("we've been running too long")
|
||
|
self.timed_out = True
|
||
|
self.signal(timeout_exc)
|
||
|
|
||
|
|
||
|
# this is here because stdout may be the controlling TTY, and
|
||
|
# we can't close it until the process has ended, otherwise the
|
||
|
# child will get SIGHUP. typically, if we've broken out of
|
||
|
# the above loop, and we're here, the process is just about to
|
||
|
# end, so it's probably ok to aggressively poll self.is_alive()
|
||
|
#
|
||
|
# the other option to this would be to do the CTTY close from
|
||
|
# the method that does the actual os.waitpid() call, but the
|
||
|
# problem with that is that the above loop might still be
|
||
|
# running, and closing the fd will cause some operation to
|
||
|
# fail. this is less complex than wrapping all the ops
|
||
|
# in the above loop with out-of-band fd-close exceptions
|
||
|
while self.is_alive():
|
||
|
time.sleep(0.001)
|
||
|
|
||
|
if stdout:
|
||
|
stdout.close()
|
||
|
|
||
|
if stderr:
|
||
|
stderr.close()
|
||
|
|
||
|
|
||
|
@property
|
||
|
def stdout(self):
|
||
|
return "".encode(self.call_args["encoding"]).join(self._stdout)
|
||
|
|
||
|
@property
|
||
|
def stderr(self):
|
||
|
return "".encode(self.call_args["encoding"]).join(self._stderr)
|
||
|
|
||
|
|
||
|
def signal(self, sig):
|
||
|
self.log.debug("sending signal %d", sig)
|
||
|
try:
|
||
|
os.kill(self.pid, sig)
|
||
|
except OSError:
|
||
|
pass
|
||
|
|
||
|
def kill(self):
|
||
|
self.log.debug("killing")
|
||
|
self.signal(signal.SIGKILL)
|
||
|
|
||
|
def terminate(self):
|
||
|
self.log.debug("terminating")
|
||
|
self.signal(signal.SIGTERM)
|
||
|
|
||
|
|
||
|
def is_alive(self):
|
||
|
""" polls if our child process has completed, without blocking. this
|
||
|
method has side-effects, such as setting our exit_code, if we happen to
|
||
|
see our child exit while this is running """
|
||
|
|
||
|
if self.exit_code is not None:
|
||
|
return False
|
||
|
|
||
|
# what we're doing here essentially is making sure that the main thread
|
||
|
# (or another thread), isn't calling .wait() on the process. because
|
||
|
# .wait() calls os.waitpid(self.pid, 0), we can't do an os.waitpid
|
||
|
# here...because if we did, and the process exited while in this
|
||
|
# thread, the main thread's os.waitpid(self.pid, 0) would raise OSError
|
||
|
# (because the process ended in another thread).
|
||
|
#
|
||
|
# so essentially what we're doing is, using this lock, checking if
|
||
|
# we're calling .wait(), and if we are, let .wait() get the exit code
|
||
|
# and handle the status, otherwise let us do it.
|
||
|
acquired = self._wait_lock.acquire(False)
|
||
|
if not acquired:
|
||
|
if self.exit_code is not None:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
try:
|
||
|
# WNOHANG is just that...we're calling waitpid without hanging...
|
||
|
# essentially polling the process. the return result is (0, 0) if
|
||
|
# there's no process status, so we check that pid == self.pid below
|
||
|
# in order to determine how to proceed
|
||
|
pid, exit_code = os.waitpid(self.pid, os.WNOHANG)
|
||
|
if pid == self.pid:
|
||
|
self.exit_code = handle_process_exit_code(exit_code)
|
||
|
return False
|
||
|
|
||
|
# no child process
|
||
|
except OSError:
|
||
|
return False
|
||
|
else:
|
||
|
return True
|
||
|
finally:
|
||
|
self._wait_lock.release()
|
||
|
|
||
|
|
||
|
def wait(self):
|
||
|
""" waits for the process to complete, handles the exit code """
|
||
|
|
||
|
self.log.debug("acquiring wait lock to wait for completion")
|
||
|
# using the lock in a with-context blocks, which is what we want if
|
||
|
# we're running wait()
|
||
|
with self._wait_lock:
|
||
|
self.log.debug("got wait lock")
|
||
|
|
||
|
if self.exit_code is None:
|
||
|
self.log.debug("exit code not set, waiting on pid")
|
||
|
pid, exit_code = os.waitpid(self.pid, 0) # blocks
|
||
|
self.exit_code = handle_process_exit_code(exit_code)
|
||
|
else:
|
||
|
self.log.debug("exit code already set (%d), no need to wait", self.exit_code)
|
||
|
|
||
|
# we may not have a thread for stdin, if the pipe has been connected
|
||
|
# via _piped="direct"
|
||
|
if self._input_thread:
|
||
|
self._input_thread.join()
|
||
|
|
||
|
# wait for our stdout and stderr streamreaders to finish reading and
|
||
|
# aggregating the process output
|
||
|
self._output_thread.join()
|
||
|
|
||
|
return self.exit_code
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class DoneReadingForever(Exception): pass
|
||
|
class NotYetReadyToRead(Exception): pass
|
||
|
|
||
|
|
||
|
def determine_how_to_read_input(input_obj):
|
||
|
""" given some kind of input object, return a function that knows how to
|
||
|
read chunks of that input object.
|
||
|
|
||
|
each reader function should return a chunk and raise a DoneReadingForever
|
||
|
exception, or return None, when there's no more data to read
|
||
|
|
||
|
NOTE: the function returned does not need to care much about the requested
|
||
|
buffering type (eg, unbuffered vs newline-buffered). the StreamBufferer
|
||
|
will take care of that. these functions just need to return a
|
||
|
reasonably-sized chunk of data. """
|
||
|
|
||
|
get_chunk = None
|
||
|
|
||
|
if isinstance(input_obj, Queue):
|
||
|
log_msg = "queue"
|
||
|
get_chunk = get_queue_chunk_reader(input_obj)
|
||
|
|
||
|
elif callable(input_obj):
|
||
|
log_msg = "callable"
|
||
|
get_chunk = get_callable_chunk_reader(input_obj)
|
||
|
|
||
|
# also handles stringio
|
||
|
elif hasattr(input_obj, "read"):
|
||
|
log_msg = "file descriptor"
|
||
|
get_chunk = get_file_chunk_reader(input_obj)
|
||
|
|
||
|
elif isinstance(input_obj, basestring):
|
||
|
log_msg = "string"
|
||
|
get_chunk = get_iter_string_reader(input_obj)
|
||
|
|
||
|
else:
|
||
|
log_msg = "general iterable"
|
||
|
get_chunk = get_iter_chunk_reader(iter(input_obj))
|
||
|
|
||
|
return get_chunk, log_msg
|
||
|
|
||
|
|
||
|
|
||
|
def get_queue_chunk_reader(stdin):
|
||
|
def fn():
|
||
|
try:
|
||
|
chunk = stdin.get(True, 0.01)
|
||
|
except Empty:
|
||
|
raise NotYetReadyToRead
|
||
|
if chunk is None:
|
||
|
raise DoneReadingForever
|
||
|
return chunk
|
||
|
return fn
|
||
|
|
||
|
|
||
|
def get_callable_chunk_reader(stdin):
|
||
|
def fn():
|
||
|
try:
|
||
|
return stdin()
|
||
|
except:
|
||
|
raise DoneReadingForever
|
||
|
return fn
|
||
|
|
||
|
|
||
|
def get_iter_string_reader(stdin):
|
||
|
""" return an iterator that returns a chunk of a string every time it is
|
||
|
called. notice that even though bufsize_type might be line buffered, we're
|
||
|
not doing any line buffering here. that's because our StreamBufferer
|
||
|
handles all buffering. we just need to return a reasonable-sized chunk. """
|
||
|
bufsize = 1024
|
||
|
iter_str = (stdin[i:i + bufsize] for i in range(0, len(stdin), bufsize))
|
||
|
return get_iter_chunk_reader(iter_str)
|
||
|
|
||
|
|
||
|
def get_iter_chunk_reader(stdin):
|
||
|
def fn():
|
||
|
try:
|
||
|
if IS_PY3:
|
||
|
chunk = stdin.__next__()
|
||
|
else:
|
||
|
chunk = stdin.next()
|
||
|
return chunk
|
||
|
except StopIteration:
|
||
|
raise DoneReadingForever
|
||
|
return fn
|
||
|
|
||
|
def get_file_chunk_reader(stdin):
|
||
|
bufsize = 1024
|
||
|
|
||
|
def fn():
|
||
|
chunk = stdin.read(bufsize)
|
||
|
if not chunk:
|
||
|
raise DoneReadingForever
|
||
|
else:
|
||
|
return chunk
|
||
|
return fn
|
||
|
|
||
|
|
||
|
def bufsize_type_to_bufsize(bf_type):
|
||
|
""" for a given bufsize type, return the actual bufsize we will read.
|
||
|
notice that although 1 means "newline-buffered", we're reading a chunk size
|
||
|
of 1024. this is because we have to read something. we let a
|
||
|
StreamBufferer instance handle splitting our chunk on newlines """
|
||
|
|
||
|
# newlines
|
||
|
if bf_type == 1:
|
||
|
bufsize = 1024
|
||
|
# unbuffered
|
||
|
elif bf_type == 0:
|
||
|
bufsize = 1
|
||
|
# or buffered by specific amount
|
||
|
else:
|
||
|
bufsize = bf_type
|
||
|
|
||
|
return bufsize
|
||
|
|
||
|
|
||
|
|
||
|
class StreamWriter(object):
|
||
|
""" StreamWriter reads from some input (the stdin param) and writes to a fd
|
||
|
(the stream param). the stdin may be a Queue, a callable, something with
|
||
|
the "read" method, a string, or an iterable """
|
||
|
|
||
|
def __init__(self, log, stream, stdin, bufsize_type, encoding, tty_in):
|
||
|
self.stream = stream
|
||
|
self.stdin = stdin
|
||
|
|
||
|
self.log = log
|
||
|
self.encoding = encoding
|
||
|
self.tty_in = tty_in
|
||
|
|
||
|
|
||
|
self.stream_bufferer = StreamBufferer(bufsize_type, self.encoding)
|
||
|
self.get_chunk, log_msg = determine_how_to_read_input(stdin)
|
||
|
self.log.debug("parsed stdin as a %s", log_msg)
|
||
|
|
||
|
|
||
|
def fileno(self):
|
||
|
""" defining this allows us to do select.select on an instance of this
|
||
|
class """
|
||
|
return self.stream
|
||
|
|
||
|
|
||
|
|
||
|
def write(self):
|
||
|
""" attempt to get a chunk of data to write to our child process's
|
||
|
stdin, then write it. the return value answers the questions "are we
|
||
|
done writing forever?" """
|
||
|
|
||
|
# get_chunk may sometimes return bytes, and sometimes returns trings
|
||
|
# because of the nature of the different types of STDIN objects we
|
||
|
# support
|
||
|
try:
|
||
|
chunk = self.get_chunk()
|
||
|
if chunk is None:
|
||
|
raise DoneReadingForever
|
||
|
|
||
|
except DoneReadingForever:
|
||
|
self.log.debug("done reading")
|
||
|
|
||
|
if self.tty_in:
|
||
|
# EOF time
|
||
|
try:
|
||
|
char = termios.tcgetattr(self.stream)[6][termios.VEOF]
|
||
|
except:
|
||
|
char = chr(4).encode()
|
||
|
os.write(self.stream, char)
|
||
|
|
||
|
return True
|
||
|
|
||
|
except NotYetReadyToRead:
|
||
|
self.log.debug("received no data")
|
||
|
return False
|
||
|
|
||
|
# if we're not bytes, make us bytes
|
||
|
if IS_PY3 and hasattr(chunk, "encode"):
|
||
|
chunk = chunk.encode(self.encoding)
|
||
|
|
||
|
for proc_chunk in self.stream_bufferer.process(chunk):
|
||
|
self.log.debug("got chunk size %d: %r", len(proc_chunk),
|
||
|
proc_chunk[:30])
|
||
|
|
||
|
self.log.debug("writing chunk to process")
|
||
|
try:
|
||
|
os.write(self.stream, proc_chunk)
|
||
|
except OSError:
|
||
|
self.log.debug("OSError writing stdin chunk")
|
||
|
return True
|
||
|
|
||
|
|
||
|
def close(self):
|
||
|
self.log.debug("closing, but flushing first")
|
||
|
chunk = self.stream_bufferer.flush()
|
||
|
self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
|
||
|
try:
|
||
|
if chunk:
|
||
|
os.write(self.stream, chunk)
|
||
|
|
||
|
if not self.tty_in:
|
||
|
self.log.debug("we used a TTY, so closing the stream")
|
||
|
os.close(self.stream)
|
||
|
|
||
|
except OSError:
|
||
|
pass
|
||
|
|
||
|
|
||
|
|
||
|
def determine_how_to_feed_output(handler, encoding, decode_errors):
|
||
|
if callable(handler):
|
||
|
process, finish = get_callback_chunk_consumer(handler, encoding,
|
||
|
decode_errors)
|
||
|
elif isinstance(handler, cStringIO):
|
||
|
process, finish = get_cstringio_chunk_consumer(handler)
|
||
|
elif isinstance(handler, StringIO):
|
||
|
process, finish = get_stringio_chunk_consumer(handler, encoding,
|
||
|
decode_errors)
|
||
|
elif hasattr(handler, "write"):
|
||
|
process, finish = get_file_chunk_consumer(handler)
|
||
|
else:
|
||
|
process = lambda chunk: False
|
||
|
finish = lambda: None
|
||
|
|
||
|
return process, finish
|
||
|
|
||
|
|
||
|
def get_file_chunk_consumer(handler):
|
||
|
def process(chunk):
|
||
|
handler.write(chunk)
|
||
|
# we should flush on an fd. chunk is already the correctly-buffered
|
||
|
# size, so we don't need the fd buffering as well
|
||
|
handler.flush()
|
||
|
return False
|
||
|
|
||
|
def finish():
|
||
|
if hasattr(handler, "flush"):
|
||
|
handler.flush()
|
||
|
|
||
|
return process, finish
|
||
|
|
||
|
def get_callback_chunk_consumer(handler, encoding, decode_errors):
|
||
|
def process(chunk):
|
||
|
# try to use the encoding first, if that doesn't work, send
|
||
|
# the bytes, because it might be binary
|
||
|
try:
|
||
|
chunk = chunk.decode(encoding, decode_errors)
|
||
|
except UnicodeDecodeError:
|
||
|
pass
|
||
|
return handler(chunk)
|
||
|
|
||
|
def finish():
|
||
|
pass
|
||
|
|
||
|
return process, finish
|
||
|
|
||
|
def get_cstringio_chunk_consumer(handler):
|
||
|
def process(chunk):
|
||
|
handler.write(chunk)
|
||
|
return False
|
||
|
|
||
|
def finish():
|
||
|
pass
|
||
|
|
||
|
return process, finish
|
||
|
|
||
|
|
||
|
def get_stringio_chunk_consumer(handler, encoding, decode_errors):
|
||
|
def process(chunk):
|
||
|
handler.write(chunk.decode(encoding, decode_errors))
|
||
|
return False
|
||
|
|
||
|
def finish():
|
||
|
pass
|
||
|
|
||
|
return process, finish
|
||
|
|
||
|
|
||
|
class StreamReader(object):
|
||
|
""" reads from some output (the stream) and sends what it just read to the
|
||
|
handler. """
|
||
|
def __init__(self, log, stream, handler, buffer, bufsize_type, encoding,
|
||
|
decode_errors, pipe_queue=None, save_data=True):
|
||
|
self.stream = stream
|
||
|
self.buffer = buffer
|
||
|
self.save_data = save_data
|
||
|
self.encoding = encoding
|
||
|
self.decode_errors = decode_errors
|
||
|
|
||
|
self.pipe_queue = None
|
||
|
if pipe_queue:
|
||
|
self.pipe_queue = weakref.ref(pipe_queue)
|
||
|
|
||
|
self.log = log
|
||
|
|
||
|
self.stream_bufferer = StreamBufferer(bufsize_type, self.encoding,
|
||
|
self.decode_errors)
|
||
|
self.bufsize = bufsize_type_to_bufsize(bufsize_type)
|
||
|
|
||
|
self.process_chunk, self.finish_chunk_processor = \
|
||
|
determine_how_to_feed_output(handler, encoding, decode_errors)
|
||
|
|
||
|
self.should_quit = False
|
||
|
|
||
|
|
||
|
def fileno(self):
|
||
|
""" defining this allows us to do select.select on an instance of this
|
||
|
class """
|
||
|
return self.stream
|
||
|
|
||
|
def close(self):
|
||
|
chunk = self.stream_bufferer.flush()
|
||
|
self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
|
||
|
if chunk:
|
||
|
self.write_chunk(chunk)
|
||
|
|
||
|
self.finish_chunk_processor()
|
||
|
|
||
|
if self.pipe_queue and self.save_data:
|
||
|
self.pipe_queue().put(None)
|
||
|
|
||
|
try:
|
||
|
os.close(self.stream)
|
||
|
except OSError:
|
||
|
pass
|
||
|
|
||
|
|
||
|
def write_chunk(self, chunk):
|
||
|
# in PY3, the chunk coming in will be bytes, so keep that in mind
|
||
|
|
||
|
if not self.should_quit:
|
||
|
self.should_quit = self.process_chunk(chunk)
|
||
|
|
||
|
|
||
|
if self.save_data:
|
||
|
self.buffer.append(chunk)
|
||
|
|
||
|
if self.pipe_queue:
|
||
|
self.log.debug("putting chunk onto pipe: %r", chunk[:30])
|
||
|
self.pipe_queue().put(chunk)
|
||
|
|
||
|
|
||
|
def read(self):
|
||
|
# if we're PY3, we're reading bytes, otherwise we're reading
|
||
|
# str
|
||
|
try:
|
||
|
chunk = os.read(self.stream, self.bufsize)
|
||
|
except OSError as e:
|
||
|
self.log.debug("got errno %d, done reading", e.errno)
|
||
|
return True
|
||
|
if not chunk:
|
||
|
self.log.debug("got no chunk, done reading")
|
||
|
return True
|
||
|
|
||
|
self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
|
||
|
for chunk in self.stream_bufferer.process(chunk):
|
||
|
self.write_chunk(chunk)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class StreamBufferer(object):
|
||
|
""" this is used for feeding in chunks of stdout/stderr, and breaking it up
|
||
|
into chunks that will actually be put into the internal buffers. for
|
||
|
example, if you have two processes, one being piped to the other, and you
|
||
|
want that, first process to feed lines of data (instead of the chunks
|
||
|
however they come in), OProc will use an instance of this class to chop up
|
||
|
the data and feed it as lines to be sent down the pipe """
|
||
|
|
||
|
def __init__(self, buffer_type, encoding=DEFAULT_ENCODING,
|
||
|
decode_errors="strict"):
|
||
|
# 0 for unbuffered, 1 for line, everything else for that amount
|
||
|
self.type = buffer_type
|
||
|
self.buffer = []
|
||
|
self.n_buffer_count = 0
|
||
|
self.encoding = encoding
|
||
|
self.decode_errors = decode_errors
|
||
|
|
||
|
# this is for if we change buffering types. if we change from line
|
||
|
# buffered to unbuffered, its very possible that our self.buffer list
|
||
|
# has data that was being saved up (while we searched for a newline).
|
||
|
# we need to use that up, so we don't lose it
|
||
|
self._use_up_buffer_first = False
|
||
|
|
||
|
# the buffering lock is used because we might chance the buffering
|
||
|
# types from a different thread. for example, if we have a stdout
|
||
|
# callback, we might use it to change the way stdin buffers. so we
|
||
|
# lock
|
||
|
self._buffering_lock = threading.RLock()
|
||
|
self.log = Logger("stream_bufferer")
|
||
|
|
||
|
|
||
|
def change_buffering(self, new_type):
|
||
|
# TODO, when we stop supporting 2.6, make this a with context
|
||
|
self.log.debug("acquiring buffering lock for changing buffering")
|
||
|
self._buffering_lock.acquire()
|
||
|
self.log.debug("got buffering lock for changing buffering")
|
||
|
try:
|
||
|
if new_type == 0:
|
||
|
self._use_up_buffer_first = True
|
||
|
|
||
|
self.type = new_type
|
||
|
finally:
|
||
|
self._buffering_lock.release()
|
||
|
self.log.debug("released buffering lock for changing buffering")
|
||
|
|
||
|
|
||
|
def process(self, chunk):
|
||
|
# MAKE SURE THAT THE INPUT IS PY3 BYTES
|
||
|
# THE OUTPUT IS ALWAYS PY3 BYTES
|
||
|
|
||
|
# TODO, when we stop supporting 2.6, make this a with context
|
||
|
self.log.debug("acquiring buffering lock to process chunk (buffering: %d)", self.type)
|
||
|
self._buffering_lock.acquire()
|
||
|
self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
|
||
|
try:
|
||
|
# we've encountered binary, permanently switch to N size buffering
|
||
|
# since matching on newline doesn't make sense anymore
|
||
|
if self.type == 1:
|
||
|
try:
|
||
|
chunk.decode(self.encoding, self.decode_errors)
|
||
|
except:
|
||
|
self.log.debug("detected binary data, changing buffering")
|
||
|
self.change_buffering(1024)
|
||
|
|
||
|
# unbuffered
|
||
|
if self.type == 0:
|
||
|
if self._use_up_buffer_first:
|
||
|
self._use_up_buffer_first = False
|
||
|
to_write = self.buffer
|
||
|
self.buffer = []
|
||
|
to_write.append(chunk)
|
||
|
return to_write
|
||
|
|
||
|
return [chunk]
|
||
|
|
||
|
# line buffered
|
||
|
# we must decode the bytes before we try to match on newline
|
||
|
elif self.type == 1:
|
||
|
total_to_write = []
|
||
|
chunk = chunk.decode(self.encoding, self.decode_errors)
|
||
|
while True:
|
||
|
newline = chunk.find("\n")
|
||
|
if newline == -1:
|
||
|
break
|
||
|
|
||
|
chunk_to_write = chunk[:newline + 1]
|
||
|
if self.buffer:
|
||
|
# this is ugly, but it's designed to take the existing
|
||
|
# bytes buffer, join it together, tack on our latest
|
||
|
# chunk, then convert the whole thing to a string.
|
||
|
# it's necessary, i'm sure. read the whole block to
|
||
|
# see why.
|
||
|
chunk_to_write = "".encode(self.encoding).join(self.buffer) \
|
||
|
+ chunk_to_write.encode(self.encoding)
|
||
|
chunk_to_write = chunk_to_write.decode(self.encoding)
|
||
|
|
||
|
self.buffer = []
|
||
|
self.n_buffer_count = 0
|
||
|
|
||
|
chunk = chunk[newline + 1:]
|
||
|
total_to_write.append(chunk_to_write.encode(self.encoding))
|
||
|
|
||
|
if chunk:
|
||
|
self.buffer.append(chunk.encode(self.encoding))
|
||
|
self.n_buffer_count += len(chunk)
|
||
|
return total_to_write
|
||
|
|
||
|
# N size buffered
|
||
|
else:
|
||
|
total_to_write = []
|
||
|
while True:
|
||
|
overage = self.n_buffer_count + len(chunk) - self.type
|
||
|
if overage >= 0:
|
||
|
ret = "".encode(self.encoding).join(self.buffer) + chunk
|
||
|
chunk_to_write = ret[:self.type]
|
||
|
chunk = ret[self.type:]
|
||
|
total_to_write.append(chunk_to_write)
|
||
|
self.buffer = []
|
||
|
self.n_buffer_count = 0
|
||
|
else:
|
||
|
self.buffer.append(chunk)
|
||
|
self.n_buffer_count += len(chunk)
|
||
|
break
|
||
|
return total_to_write
|
||
|
finally:
|
||
|
self._buffering_lock.release()
|
||
|
self.log.debug("released buffering lock for processing chunk (buffering: %d)", self.type)
|
||
|
|
||
|
|
||
|
def flush(self):
|
||
|
self.log.debug("acquiring buffering lock for flushing buffer")
|
||
|
self._buffering_lock.acquire()
|
||
|
self.log.debug("got buffering lock for flushing buffer")
|
||
|
try:
|
||
|
ret = "".encode(self.encoding).join(self.buffer)
|
||
|
self.buffer = []
|
||
|
return ret
|
||
|
finally:
|
||
|
self._buffering_lock.release()
|
||
|
self.log.debug("released buffering lock for flushing buffer")
|
||
|
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def pushd(path):
|
||
|
""" pushd is just a specialized form of args, where we're passing in the
|
||
|
current working directory """
|
||
|
with args(_cwd=path):
|
||
|
yield
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def args(*args, **kwargs):
|
||
|
""" allows us to temporarily override all the special keyword parameters in
|
||
|
a with context """
|
||
|
call_args = Command._call_args
|
||
|
old_args = call_args.copy()
|
||
|
|
||
|
for key,value in kwargs.items():
|
||
|
key = key.lstrip("_")
|
||
|
call_args[key] = value
|
||
|
|
||
|
yield
|
||
|
call_args.update(old_args)
|
||
|
|
||
|
|
||
|
|
||
|
class Environment(dict):
|
||
|
""" this allows lookups to names that aren't found in the global scope to be
|
||
|
searched for as a program name. for example, if "ls" isn't found in this
|
||
|
module's scope, we consider it a system program and try to find it.
|
||
|
|
||
|
we use a dict instead of just a regular object as the base class because the
|
||
|
exec() statement used in this file requires the "globals" argument to be a
|
||
|
dictionary """
|
||
|
|
||
|
|
||
|
# this is a list of all of the names that the sh module exports that will
|
||
|
# not resolve to functions. we don't want to accidentally shadow real
|
||
|
# commands with functions/imports that we define in sh.py. for example,
|
||
|
# "import time" may override the time system program
|
||
|
whitelist = set([
|
||
|
"Command",
|
||
|
"CommandNotFound",
|
||
|
"DEFAULT_ENCODING",
|
||
|
"DoneReadingForever",
|
||
|
"ErrorReturnCode",
|
||
|
"NotYetReadyToRead",
|
||
|
"SignalException",
|
||
|
"TimeoutException",
|
||
|
"__project_url__",
|
||
|
"__version__",
|
||
|
"args",
|
||
|
"glob",
|
||
|
"pushd",
|
||
|
])
|
||
|
|
||
|
def __init__(self, globs, baked_args={}):
|
||
|
self.globs = globs
|
||
|
self.baked_args = baked_args
|
||
|
self.disable_whitelist = False
|
||
|
|
||
|
def __setitem__(self, k, v):
|
||
|
self.globs[k] = v
|
||
|
|
||
|
def __getitem__(self, k):
|
||
|
# if we first import "_disable_whitelist" from sh, we can import
|
||
|
# anything defined in the global scope of sh.py. this is useful for our
|
||
|
# tests
|
||
|
if k == "_disable_whitelist":
|
||
|
self.disable_whitelist = True
|
||
|
return None
|
||
|
|
||
|
# we're trying to import something real (maybe), see if it's in our
|
||
|
# global scope
|
||
|
if k in self.whitelist or self.disable_whitelist:
|
||
|
try:
|
||
|
return self.globs[k]
|
||
|
except KeyError:
|
||
|
pass
|
||
|
|
||
|
# somebody tried to be funny and do "from sh import *"
|
||
|
if k == "__all__":
|
||
|
raise AttributeError("Cannot import * from sh. \
|
||
|
Please import sh or import programs individually.")
|
||
|
|
||
|
|
||
|
# check if we're naming a dynamically generated ReturnCode exception
|
||
|
exc = get_exc_from_name(k)
|
||
|
if exc:
|
||
|
return exc
|
||
|
|
||
|
|
||
|
# https://github.com/ipython/ipython/issues/2577
|
||
|
# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
|
||
|
if k.startswith("__") and k.endswith("__"):
|
||
|
raise AttributeError
|
||
|
|
||
|
# how about an environment variable?
|
||
|
try:
|
||
|
return os.environ[k]
|
||
|
except KeyError:
|
||
|
pass
|
||
|
|
||
|
# is it a custom builtin?
|
||
|
builtin = getattr(self, "b_" + k, None)
|
||
|
if builtin:
|
||
|
return builtin
|
||
|
|
||
|
# it must be a command then
|
||
|
# we use _create instead of instantiating the class directly because
|
||
|
# _create uses resolve_program, which will automatically do underscore-
|
||
|
# to-dash conversions. instantiating directly does not use that
|
||
|
return Command._create(k, **self.baked_args)
|
||
|
|
||
|
|
||
|
# methods that begin with "b_" are custom builtins and will override any
|
||
|
# program that exists in our path. this is useful for things like
|
||
|
# common shell builtins that people are used to, but which aren't actually
|
||
|
# full-fledged system binaries
|
||
|
|
||
|
def b_cd(self, path):
|
||
|
os.chdir(path)
|
||
|
|
||
|
def b_which(self, program):
|
||
|
return which(program)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def run_repl(env): # pragma: no cover
|
||
|
banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
|
||
|
|
||
|
print(banner.format(version=__version__))
|
||
|
while True:
|
||
|
try:
|
||
|
line = raw_input("sh> ")
|
||
|
except (ValueError, EOFError):
|
||
|
break
|
||
|
|
||
|
try:
|
||
|
exec(compile(line, "<dummy>", "single"), env, env)
|
||
|
except SystemExit:
|
||
|
break
|
||
|
except:
|
||
|
print(traceback.format_exc())
|
||
|
|
||
|
# cleans up our last line
|
||
|
print("")
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
# this is a thin wrapper around THIS module (we patch sys.modules[__name__]).
|
||
|
# this is in the case that the user does a "from sh import whatever"
|
||
|
# in other words, they only want to import certain programs, not the whole
|
||
|
# system PATH worth of commands. in this case, we just proxy the
|
||
|
# import lookup to our Environment class
|
||
|
class SelfWrapper(ModuleType):
|
||
|
def __init__(self, self_module, baked_args={}):
|
||
|
# this is super ugly to have to copy attributes like this,
|
||
|
# but it seems to be the only way to make reload() behave
|
||
|
# nicely. if i make these attributes dynamic lookups in
|
||
|
# __getattr__, reload sometimes chokes in weird ways...
|
||
|
for attr in ["__builtins__", "__doc__", "__name__", "__package__"]:
|
||
|
setattr(self, attr, getattr(self_module, attr, None))
|
||
|
|
||
|
# python 3.2 (2.7 and 3.3 work fine) breaks on osx (not ubuntu)
|
||
|
# if we set this to None. and 3.3 needs a value for __path__
|
||
|
self.__path__ = []
|
||
|
self.__self_module = self_module
|
||
|
self.__env = Environment(globals(), baked_args)
|
||
|
|
||
|
def __setattr__(self, name, value):
|
||
|
if hasattr(self, "__env"):
|
||
|
self.__env[name] = value
|
||
|
else:
|
||
|
ModuleType.__setattr__(self, name, value)
|
||
|
|
||
|
def __getattr__(self, name):
|
||
|
if name == "__env":
|
||
|
raise AttributeError
|
||
|
return self.__env[name]
|
||
|
|
||
|
# accept special keywords argument to define defaults for all operations
|
||
|
# that will be processed with given by return SelfWrapper
|
||
|
def __call__(self, **kwargs):
|
||
|
return SelfWrapper(self.__self_module, kwargs)
|
||
|
|
||
|
|
||
|
|
||
|
# we're being run as a stand-alone script
|
||
|
if __name__ == "__main__": # pragma: no cover
|
||
|
try:
|
||
|
arg = sys.argv.pop(1)
|
||
|
except:
|
||
|
arg = None
|
||
|
|
||
|
if arg == "test":
|
||
|
import subprocess
|
||
|
|
||
|
def run_test(version, locale):
|
||
|
py_version = "python%s" % version
|
||
|
py_bin = which(py_version)
|
||
|
|
||
|
if py_bin:
|
||
|
print("Testing %s, locale %r" % (py_version.capitalize(),
|
||
|
locale))
|
||
|
|
||
|
env = os.environ.copy()
|
||
|
env["LANG"] = locale
|
||
|
p = subprocess.Popen([py_bin, os.path.join(THIS_DIR, "test.py")]
|
||
|
+ sys.argv[1:], env=env)
|
||
|
return_code = p.wait()
|
||
|
|
||
|
if return_code != 0:
|
||
|
exit(1)
|
||
|
else:
|
||
|
print("Couldn't find %s, skipping" % py_version.capitalize())
|
||
|
|
||
|
versions = ("2.6", "2.7", "3.1", "3.2", "3.3", "3.4")
|
||
|
locales = ("en_US.UTF-8", "C")
|
||
|
for locale in locales:
|
||
|
for version in versions:
|
||
|
run_test(version, locale)
|
||
|
|
||
|
else:
|
||
|
env = Environment(globals())
|
||
|
run_repl(env)
|
||
|
|
||
|
# we're being imported from somewhere
|
||
|
else:
|
||
|
self = sys.modules[__name__]
|
||
|
sys.modules[__name__] = SelfWrapper(self)
|