| [1999] | 1 | """ | 
|---|
 | 2 | Wrappers around subprocess functionality that simulate an actual shell. | 
|---|
 | 3 | """ | 
|---|
 | 4 |  | 
|---|
 | 5 | import subprocess | 
|---|
 | 6 | import logging | 
|---|
 | 7 | import sys | 
|---|
 | 8 | import os | 
|---|
 | 9 | import errno | 
|---|
 | 10 |  | 
|---|
 | 11 | class Shell(object): | 
|---|
 | 12 |     """ | 
|---|
 | 13 |     An advanced shell that performs logging.  If ``dry`` is ``True``, | 
|---|
 | 14 |     no commands are actually run. | 
|---|
 | 15 |     """ | 
|---|
 | 16 |     def __init__(self, dry = False): | 
|---|
 | 17 |         self.dry = dry | 
|---|
 | 18 |         self.cwd = None | 
|---|
 | 19 |     def call(self, *args, **kwargs): | 
|---|
 | 20 |         """ | 
|---|
 | 21 |         Performs a system call.  The actual executable and options should | 
|---|
 | 22 |         be passed as arguments to this function.  Several keyword arguments | 
|---|
 | 23 |         are also supported: | 
|---|
 | 24 |  | 
|---|
 | 25 |         :param input: input to feed the subprocess on standard input. | 
|---|
 | 26 |         :param interactive: whether or not directly hook up all pipes | 
|---|
 | 27 |             to the controlling terminal, to allow interaction with subprocess. | 
|---|
 | 28 |         :param strip: if ``True``, instead of returning a tuple, | 
|---|
 | 29 |             return the string stdout output of the command with trailing newlines | 
|---|
 | 30 |             removed.  This emulates the behavior of backticks and ``$()`` in Bash. | 
|---|
 | 31 |             Prefer to use :meth:`eval` instead (you should only need to explicitly | 
|---|
 | 32 |             specify this if you are using another wrapper around this function). | 
|---|
 | 33 |         :param log: if True, we log the call as INFO, if False, we log the call | 
|---|
 | 34 |             as DEBUG, otherwise, we detect based on ``strip``. | 
|---|
 | 35 |         :param stdout: | 
|---|
 | 36 |         :param stderr: | 
|---|
 | 37 |         :param stdin: a file-type object that will be written to or read from as a pipe. | 
|---|
 | 38 |         :returns: a tuple of strings ``(stdout, stderr)``, or a string ``stdout`` | 
|---|
 | 39 |             if ``strip`` is specified. | 
|---|
 | 40 |  | 
|---|
 | 41 |         >>> sh = Shell() | 
|---|
 | 42 |         >>> sh.call("echo", "Foobar") | 
|---|
 | 43 |         ('Foobar\\n', '') | 
|---|
 | 44 |         >>> sh.call("cat", input='Foobar') | 
|---|
 | 45 |         ('Foobar', '') | 
|---|
 | 46 |         """ | 
|---|
 | 47 |         self._wait() | 
|---|
 | 48 |         kwargs.setdefault("interactive", False) | 
|---|
 | 49 |         kwargs.setdefault("strip", False) | 
|---|
 | 50 |         kwargs.setdefault("python", None) | 
|---|
 | 51 |         kwargs.setdefault("log", None) | 
|---|
 | 52 |         kwargs.setdefault("stdout", subprocess.PIPE) | 
|---|
 | 53 |         kwargs.setdefault("stdin", subprocess.PIPE) | 
|---|
 | 54 |         kwargs.setdefault("stderr", subprocess.PIPE) | 
|---|
 | 55 |         msg = "Running `" + ' '.join(args) + "`" | 
|---|
 | 56 |         if kwargs["strip"] and not kwargs["log"] is True or kwargs["log"] is False: | 
|---|
 | 57 |             logging.debug(msg) | 
|---|
 | 58 |         else: | 
|---|
 | 59 |             logging.info(msg) | 
|---|
 | 60 |         if self.dry: | 
|---|
 | 61 |             if kwargs["strip"]: | 
|---|
 | 62 |                 return '' | 
|---|
 | 63 |             return None, None | 
|---|
 | 64 |         kwargs.setdefault("input", None) | 
|---|
 | 65 |         if kwargs["interactive"]: | 
|---|
 | 66 |             stdout=sys.stdout | 
|---|
 | 67 |             stdin=sys.stdin | 
|---|
 | 68 |             stderr=sys.stderr | 
|---|
 | 69 |         else: | 
|---|
 | 70 |             stdout=kwargs["stdout"] | 
|---|
 | 71 |             stdin=kwargs["stdin"] | 
|---|
 | 72 |             stderr=kwargs["stderr"] | 
|---|
 | 73 |         # XXX: There is a possible problem here where we can fill up | 
|---|
 | 74 |         # the kernel buffer if we have 64KB of data.  This shouldn't | 
|---|
 | 75 |         # be a problem, and the fix for such case would be to write to | 
|---|
 | 76 |         # temporary files instead of a pipe. | 
|---|
 | 77 |         # Another possible way of fixing this is converting from a | 
|---|
 | 78 |         # waitpid() pump to a select() pump, creating a pipe to | 
|---|
 | 79 |         # ourself, and then setting up a | 
|---|
 | 80 |         # SIGCHILD handler to write a single byte to the pipe to get | 
|---|
 | 81 |         # us out of select() when a subprocess exits. | 
|---|
 | 82 |         proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, stdin=stdin, cwd=self.cwd, ) | 
|---|
 | 83 |         if self._async(proc, args, **kwargs): | 
|---|
 | 84 |             return proc | 
|---|
 | 85 |         stdout, stderr = proc.communicate(kwargs["input"]) | 
|---|
 | 86 |         # can occur if we were doing interactive communication; i.e. | 
|---|
 | 87 |         # we didn't pass in PIPE. | 
|---|
 | 88 |         if stdout is None: | 
|---|
 | 89 |             stdout = "" | 
|---|
 | 90 |         if stderr is None: | 
|---|
 | 91 |             stderr = "" | 
|---|
 | 92 |         if not kwargs["interactive"]: | 
|---|
 | 93 |             if kwargs["strip"]: | 
|---|
 | 94 |                 self._log(None, stderr) | 
|---|
 | 95 |             else: | 
|---|
 | 96 |                 self._log(stdout, stderr) | 
|---|
 | 97 |         if proc.returncode: | 
|---|
 | 98 |             raise CallError(proc.returncode, args, stdout, stderr) | 
|---|
 | 99 |         if kwargs["strip"]: | 
|---|
 | 100 |             return str(stdout).rstrip("\n") | 
|---|
 | 101 |         return (stdout, stderr) | 
|---|
 | 102 |     def _log(self, stdout, stderr): | 
|---|
 | 103 |         """Logs the standard output and standard input from a command.""" | 
|---|
 | 104 |         if stdout: | 
|---|
 | 105 |             logging.debug("STDOUT:\n" + stdout) | 
|---|
 | 106 |         if stderr: | 
|---|
 | 107 |             logging.debug("STDERR:\n" + stderr) | 
|---|
 | 108 |     def _wait(self): | 
|---|
 | 109 |         pass | 
|---|
 | 110 |     def _async(self, *args, **kwargs): | 
|---|
 | 111 |         return False | 
|---|
 | 112 |     def callAsUser(self, *args, **kwargs): | 
|---|
 | 113 |         """ | 
|---|
 | 114 |         Performs a system call as a different user.  This is only possible | 
|---|
 | 115 |         if you are running as root.  Keyword arguments | 
|---|
 | 116 |         are the same as :meth:`call` with the following additions: | 
|---|
 | 117 |  | 
|---|
 | 118 |         :param user: name of the user to run command as. | 
|---|
 | 119 |         :param uid: uid of the user to run command as. | 
|---|
 | 120 |  | 
|---|
 | 121 |         .. note:: | 
|---|
 | 122 |  | 
|---|
 | 123 |             The resulting system call internally uses :command:`sudo`, | 
|---|
 | 124 |             and as such environment variables will get scrubbed.  We | 
|---|
 | 125 |             manually preserve :envvar:`SSH_GSSAPI_NAME`. | 
|---|
 | 126 |         """ | 
|---|
 | 127 |         user = kwargs.pop("user", None) | 
|---|
 | 128 |         uid = kwargs.pop("uid", None) | 
|---|
 | 129 |         if not user and not uid: return self.call(*args, **kwargs) | 
|---|
 | 130 |         if os.getenv("SSH_GSSAPI_NAME"): | 
|---|
 | 131 |             # This might be generalized as "preserve some environment" | 
|---|
 | 132 |             args = list(args) | 
|---|
 | 133 |             args.insert(0, "SSH_GSSAPI_NAME=" + os.getenv("SSH_GSSAPI_NAME")) | 
|---|
 | 134 |         if uid: return self.call("sudo", "-u", "#" + str(uid), *args, **kwargs) | 
|---|
 | 135 |         if user: return self.call("sudo", "-u", user, *args, **kwargs) | 
|---|
 | 136 |     def safeCall(self, *args, **kwargs): | 
|---|
 | 137 |         """ | 
|---|
 | 138 |         Checks if the owner of the current working directory is the same | 
|---|
 | 139 |         as the current user, and if it isn't, attempts to sudo to be | 
|---|
 | 140 |         that user.  The intended use case is for calling Git commands | 
|---|
 | 141 |         when running as root, but this method should be used when | 
|---|
 | 142 |         interfacing with any moderately complex program that depends | 
|---|
 | 143 |         on working directory context.  Keyword arguments are the | 
|---|
 | 144 |         same as :meth:`call`. | 
|---|
 | 145 |         """ | 
|---|
 | 146 |         if os.getuid(): | 
|---|
 | 147 |             return self.call(*args, **kwargs) | 
|---|
 | 148 |         uid = os.stat(os.getcwd()).st_uid | 
|---|
 | 149 |         # consider also checking ruid? | 
|---|
 | 150 |         if uid != os.geteuid(): | 
|---|
 | 151 |             kwargs['uid'] = uid | 
|---|
 | 152 |             return self.callAsUser(*args, **kwargs) | 
|---|
 | 153 |         else: | 
|---|
 | 154 |             return self.call(*args, **kwargs) | 
|---|
 | 155 |     def eval(self, *args, **kwargs): | 
|---|
 | 156 |         """ | 
|---|
 | 157 |         Evaluates a command and returns its output, with trailing newlines | 
|---|
 | 158 |         stripped (like backticks in Bash).  This is a convenience method for | 
|---|
 | 159 |         calling :meth:`call` with ``strip``. | 
|---|
 | 160 |  | 
|---|
 | 161 |             >>> sh = Shell() | 
|---|
 | 162 |             >>> sh.eval("echo", "Foobar")  | 
|---|
 | 163 |             'Foobar' | 
|---|
 | 164 |         """ | 
|---|
 | 165 |         kwargs["strip"] = True | 
|---|
 | 166 |         return self.call(*args, **kwargs) | 
|---|
 | 167 |     def setcwd(self, cwd): | 
|---|
 | 168 |         """ | 
|---|
 | 169 |         Sets the directory processes are executed in. This sets a value | 
|---|
 | 170 |         to be passed as the ``cwd`` argument to ``subprocess.Popen``. | 
|---|
 | 171 |         """ | 
|---|
 | 172 |         self.cwd = cwd | 
|---|
 | 173 |  | 
|---|
 | 174 | class ParallelShell(Shell): | 
|---|
 | 175 |     """ | 
|---|
 | 176 |     Modifies the semantics of :class:`Shell` so that | 
|---|
 | 177 |     commands are queued here, and executed in parallel using waitpid | 
|---|
 | 178 |     with ``max`` subprocesses, and result in callback execution | 
|---|
 | 179 |     when they finish. | 
|---|
 | 180 |  | 
|---|
 | 181 |     .. method:: call(*args, **kwargs) | 
|---|
 | 182 |  | 
|---|
 | 183 |         Enqueues a system call for parallel processing.  If there are | 
|---|
 | 184 |         no openings in the queue, this will block.  Keyword arguments | 
|---|
 | 185 |         are the same as :meth:`Shell.call` with the following additions: | 
|---|
 | 186 |  | 
|---|
 | 187 |         :param on_success: Callback function for success (zero exit status). | 
|---|
 | 188 |             The callback function should accept two arguments, | 
|---|
 | 189 |             ``stdout`` and ``stderr``. | 
|---|
 | 190 |         :param on_error: Callback function for failure (nonzero exit status). | 
|---|
 | 191 |             The callback function should accept one argument, the | 
|---|
 | 192 |             exception that would have been thrown by the synchronous | 
|---|
 | 193 |             version. | 
|---|
 | 194 |         :return: The :class:`subprocess.Proc` object that was opened. | 
|---|
 | 195 |  | 
|---|
 | 196 |     .. method:: callAsUser(*args, **kwargs) | 
|---|
 | 197 |  | 
|---|
 | 198 |         Enqueues a system call under a different user for parallel | 
|---|
 | 199 |         processing.  Keyword arguments are the same as | 
|---|
 | 200 |         :meth:`Shell.callAsUser` with the additions of keyword | 
|---|
 | 201 |         arguments from :meth:`call`. | 
|---|
 | 202 |  | 
|---|
 | 203 |     .. method:: safeCall(*args, **kwargs) | 
|---|
 | 204 |  | 
|---|
 | 205 |         Enqueues a "safe" call for parallel processing.  Keyword | 
|---|
 | 206 |         arguments are the same as :meth:`Shell.safeCall` with the | 
|---|
 | 207 |         additions of keyword arguments from :meth:`call`. | 
|---|
 | 208 |  | 
|---|
 | 209 |     .. method:: eval(*args, **kwargs) | 
|---|
 | 210 |  | 
|---|
 | 211 |         No difference from :meth:`call`.  Consider having a | 
|---|
 | 212 |         non-parallel shell if the program you are shelling out | 
|---|
 | 213 |         to is fast. | 
|---|
 | 214 |  | 
|---|
 | 215 |     """ | 
|---|
 | 216 |     def __init__(self, dry = False, max = 10): | 
|---|
 | 217 |         super(ParallelShell, self).__init__(dry=dry) | 
|---|
 | 218 |         self.running = {} | 
|---|
 | 219 |         self.max = max # maximum of commands to run in parallel | 
|---|
 | 220 |     @staticmethod | 
|---|
 | 221 |     def make(no_parallelize, max): | 
|---|
 | 222 |         """Convenience method oriented towards command modules.""" | 
|---|
 | 223 |         if no_parallelize: | 
|---|
 | 224 |             return DummyParallelShell() | 
|---|
 | 225 |         else: | 
|---|
 | 226 |             return ParallelShell(max=max) | 
|---|
 | 227 |     def _async(self, proc, args, python, on_success, on_error, **kwargs): | 
|---|
 | 228 |         """ | 
|---|
 | 229 |         Gets handed a :class:`subprocess.Proc` object from our deferred | 
|---|
 | 230 |         execution.  See :meth:`Shell.call` source code for details. | 
|---|
 | 231 |         """ | 
|---|
 | 232 |         self.running[proc.pid] = (proc, args, python, on_success, on_error) | 
|---|
 | 233 |         return True # so that the parent function returns | 
|---|
 | 234 |     def _wait(self): | 
|---|
 | 235 |         """ | 
|---|
 | 236 |         Blocking call that waits for an open subprocess slot.  This is | 
|---|
 | 237 |         automatically called by :meth:`Shell.call`. | 
|---|
 | 238 |         """ | 
|---|
 | 239 |         # XXX: This API sucks; the actual call/callAsUser call should | 
|---|
 | 240 |         # probably block automatically (unless I have a good reason not to) | 
|---|
 | 241 |         # bail out immediately on initial ramp up | 
|---|
 | 242 |         if len(self.running) < self.max: return | 
|---|
 | 243 |         # now, wait for open pids. | 
|---|
 | 244 |         try: | 
|---|
 | 245 |             self.reap(*os.waitpid(-1, 0)) | 
|---|
 | 246 |         except OSError as e: | 
|---|
 | 247 |             if e.errno == errno.ECHILD: return | 
|---|
 | 248 |             raise | 
|---|
 | 249 |     def join(self): | 
|---|
 | 250 |         """Waits for all of our subprocesses to terminate.""" | 
|---|
 | 251 |         try: | 
|---|
 | 252 |             while True: | 
|---|
 | 253 |                 self.reap(*os.waitpid(-1, 0)) | 
|---|
 | 254 |         except OSError as e: | 
|---|
 | 255 |             if e.errno == errno.ECHILD: return | 
|---|
 | 256 |             raise | 
|---|
 | 257 |     def reap(self, pid, status): | 
|---|
 | 258 |         """Reaps a process.""" | 
|---|
 | 259 |         # ooh, zombie process. reap it | 
|---|
 | 260 |         proc, args, python, on_success, on_error = self.running.pop(pid) | 
|---|
 | 261 |         # XXX: this is slightly dangerous; should actually use | 
|---|
 | 262 |         # temporary files | 
|---|
 | 263 |         stdout = proc.stdout.read() | 
|---|
 | 264 |         stderr = proc.stderr.read() | 
|---|
 | 265 |         self._log(stdout, stderr) | 
|---|
 | 266 |         if status: | 
|---|
 | 267 |             on_error(CallError(proc.returncode, args, stdout, stderr)) | 
|---|
 | 268 |             return | 
|---|
 | 269 |         on_success(stdout, stderr) | 
|---|
 | 270 |  | 
|---|
 | 271 | # Setup a convenience global instance | 
|---|
 | 272 | shell = Shell() | 
|---|
 | 273 | call = shell.call | 
|---|
 | 274 | callAsUser = shell.callAsUser | 
|---|
 | 275 | safeCall = shell.safeCall | 
|---|
 | 276 | eval = shell.eval | 
|---|
 | 277 |  | 
|---|
 | 278 | class DummyParallelShell(ParallelShell): | 
|---|
 | 279 |     """Same API as :class:`ParallelShell`, but doesn't actually | 
|---|
 | 280 |     parallelize (i.e. all calls to :meth:`wait` block.)""" | 
|---|
 | 281 |     def __init__(self, dry = False): | 
|---|
 | 282 |         super(DummyParallelShell, self).__init__(dry=dry, max=1) | 
|---|
 | 283 |  | 
|---|
 | 284 | class CallError: | 
|---|
 | 285 |     """Indicates that a subprocess call returned a nonzero exit status.""" | 
|---|
 | 286 |     #: The exit code of the failed subprocess. | 
|---|
 | 287 |     code = None | 
|---|
 | 288 |     #: List of the program and arguments that failed. | 
|---|
 | 289 |     args = None | 
|---|
 | 290 |     #: The stdout of the program. | 
|---|
 | 291 |     stdout = None | 
|---|
 | 292 |     #: The stderr of the program. | 
|---|
 | 293 |     stderr = None | 
|---|
 | 294 |     def __init__(self, code, args, stdout, stderr): | 
|---|
 | 295 |         self.code = code | 
|---|
 | 296 |         self.args = args | 
|---|
 | 297 |         self.stdout = stdout | 
|---|
 | 298 |         self.stderr = stderr | 
|---|
 | 299 |     def __str__(self): | 
|---|
 | 300 |         compact = self.stderr.rstrip().split("\n")[-1] | 
|---|
 | 301 |         return "%s (exited with %d)\n%s" % (compact, self.code, self.stderr) | 
|---|