#!/usr/bin/python
#
# Original commit message:
#
#   This module is intended to be used for controlling all child processes in
#   KVM tests: both QEMU processes and SSH/SCP/Telnet processes. Processes
#   started with this module keep running and can be interacted with even
#   after the parent process exits.
#
#   The current run_bg() utility tracks a child process as long as the parent
#   process is running. When the parent process exits, the tracking thread
#   terminates and cannot resume when needed.
#
#   Currently SSH/SCP/Telnet communication is handled by kvm_utils.kvm_spawn,
#   which does not allow the child process to run after the parent process
#   exits. Thus, open SSH/SCP/Telnet sessions cannot be reused by tests
#   following the one in which they are opened.
#
#   The new module provides a solution to these two problems, and also saves
#   some code by reusing common code required both for QEMU processes and
#   SSH/SCP/Telnet processes.
#
#   Signed-off-by: Michael Goldish <mgoldish@xxxxxxxxxx>
#
# File: client/tests/kvm/kvm_subprocess.py
"""
A class and functions used for running and controlling child processes.

@copyright: 2008-2009 Red Hat Inc.
"""

import sys, subprocess, pty, select, os, time, signal, re, termios, fcntl
import threading, logging
import common, kvm_utils


# Directory holding the per-process communication files (pid, status, output,
# FIFOs and lock files).  Shared by the client (kvm_spawn) and the server
# (_server_main).
_BASE_DIR = "/tmp/kvm_spawn"


def _lock(filename):
    """
    Acquire an exclusive lock on filename, creating the file if needed.

    Blocks until the lock is available.

    @param filename: Path of the lock file.
    @return: File descriptor holding the lock; pass it to _unlock().
    """
    if not os.path.exists(filename):
        open(filename, "w").close()
    fd = os.open(filename, os.O_RDWR)
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX)
    except (IOError, OSError):
        # Do not leak the descriptor if locking fails
        os.close(fd)
        raise
    return fd


def _unlock(fd):
    """
    Release a lock obtained with _lock() and close its file descriptor.

    @param fd: File descriptor returned by _lock().
    """
    try:
        fcntl.lockf(fd, fcntl.LOCK_UN)
    finally:
        os.close(fd)


def _locked(filename):
    """
    Return True if filename is currently locked, without blocking.

    NOTE(review): fcntl.lockf() locks are owned per process, so this can only
    detect a lock held by a *different* process.

    @param filename: Path of the lock file.
    @return: True if a non-blocking exclusive lock could not be obtained,
            False if the file could not be opened or the lock was free.
    """
    try:
        fd = os.open(filename, os.O_RDWR)
    except OSError:
        return False
    try:
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            # Somebody else holds the lock
            return True
        fcntl.lockf(fd, fcntl.LOCK_UN)
        return False
    finally:
        os.close(fd)


def _wait(filename):
    """
    Block until the lock on filename can be acquired, then release it.

    @param filename: Path of the lock file.
    """
    fd = _lock(filename)
    _unlock(fd)


def _get_filenames(base_dir, id):
    """
    Return the list of filenames used for client/server communication.

    @param base_dir: Directory in which the files live.
    @param id: Unique id string of the spawned process.
    @return: List of 8 paths, in this order: pid, status, output, outpipe1,
            outpipe2, inpipe, lock-server-running, lock-client-starting.
    """
    return [os.path.join(base_dir, s + "-" + id)
            for s in ("pid", "status", "output", "outpipe1", "outpipe2",
                      "inpipe", "lock-server-running",
                      "lock-client-starting")]


def _read_file(filename):
    """
    Return the entire contents of filename, always closing the file.

    @param filename: Path of the file to read.
    @raise IOError: If the file cannot be opened or read.
    """
    f = open(filename, "r")
    try:
        return f.read()
    finally:
        f.close()


# NOTE: this must remain an old-style (classic) class under Python 2: the
# pickling support below relies on __getinitargs__(), which the pickle module
# only honors for classic classes.
class kvm_spawn:
    """
    This class is used for spawning and controlling a child process.

    A new instance of this class can either run a new server (a small Python
    program that reads output from the child process and reports it to the
    client and to a text file) or attach to an already running server.
    When a server is started it runs the child process.
    The server reports output from the child's STDOUT and STDERR via 3
    channels: two named pipes and a text file.
    The text file can be accessed at any time using get_output().
    The first named pipe is used by _tail(), a function that runs in the
    background and reports new output from the child as it is produced.
    The second named pipe is used by a set of functions that read and parse
    output as requested by the user in an interactive manner, similar to
    pexpect.
    The server also receives input from the client and sends it to the child
    process.
    An instance of this class can be pickled.  When unpickled it automatically
    resumes _tail() if needed.
    """

    def __init__(self, command=None, id=None, termination_func=None,
                 output_func=None, output_prefix="", linesep="\n",
                 prompt=r"[\#\$]\s*$", status_test_command="echo $?"):
        """
        Initialize the class and run command as a child process.

        @param command: Command that will be run, or None if accessing an
                already running process.
        @param id: id of an already running instance, if accessing a running
                instance, or None if starting a new one.
        @param termination_func: Function to call when the process exits.  The
                function must accept a single exit status parameter.
        @param output_func: Function to call whenever a line of output is
                available from the STDOUT or STDERR streams of the process.
                The function must accept a single string parameter.  The string
                does not include the final newline.
        @param output_prefix: String to prepend to lines sent to output_func.
        @param linesep: Line separator to be appended to strings sent to the
                child process using sendline().
        @param prompt: Regular expression to be used with read_up_to_prompt().
        @param status_test_command: Command to be used for getting the last
                exit status of commands run inside a shell (used by
                get_command_status_output() and friends).
        """
        self.id = id or kvm_utils.generate_random_string(8)

        # Define filenames for communication with server
        base_dir = _BASE_DIR
        try:
            os.makedirs(base_dir)
        except OSError:
            # Typically the directory already exists
            pass
        (self.pid_filename,
         self.status_filename,
         self.output_filename,
         self.outpipe1_filename,
         self.outpipe2_filename,
         self.inpipe_filename,
         self.lock_server_running_filename,
         self.lock_client_starting_filename) = _get_filenames(base_dir,
                                                              self.id)

        # Parameters for _tail()
        self.termination_func = termination_func
        self.output_func = output_func
        self.output_prefix = output_prefix

        # Set some optional attributes
        self.linesep = linesep
        self.prompt = prompt
        self.status_test_command = status_test_command

        self.fd_outpipe1 = None
        self.fd_outpipe2 = None

        # The server waits on this lock before deleting its FIFOs, so holding
        # it guarantees the pipes still exist while we open them.
        lock_client_starting = _lock(self.lock_client_starting_filename)
        try:
            # Start the server (which runs the command)
            if command:
                # Pass the command as a single argv element instead of
                # building a shell string, so quotes inside the command cannot
                # break (or inject into) the server's command line.
                sub = subprocess.Popen([sys.executable or "python",
                                        __file__, self.id, command],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT)
                # Wait for server to complete its initialization
                sub.stdout.readline()

            # Open the pipes for reading, but only if a server is running
            if _locked(self.lock_server_running_filename):
                try:
                    self.fd_outpipe1 = os.open(self.outpipe1_filename,
                                               os.O_RDONLY)
                    self.fd_outpipe2 = os.open(self.outpipe2_filename,
                                               os.O_RDONLY)
                except OSError:
                    # All or nothing: do not keep a half-open pair around
                    if self.fd_outpipe1 is not None:
                        os.close(self.fd_outpipe1)
                    self.fd_outpipe1 = None
                    self.fd_outpipe2 = None
        finally:
            _unlock(lock_client_starting)

        # If _tail() has work to do (report output or termination), start it
        # in the background
        if termination_func or output_func:
            self.thread = threading.Thread(None, self._tail)
            self.thread.start()
        else:
            self.thread = None


    # The following two functions are defined to make sure the state is set
    # exclusively by the constructor call as specified in __getinitargs__().

    def __getstate__(self):
        pass


    def __setstate__(self, state):
        pass


    def __getinitargs__(self):
        # Save some information when pickling -- will be passed to the
        # constructor upon unpickling
        return (None, self.id,
                self.termination_func, self.output_func, self.output_prefix,
                self.linesep, self.prompt, self.status_test_command)


    def get_id(self):
        """
        Return the instance's id attribute, which may be used to access the
        process in the future.
        """
        return self.id


    def get_pid(self):
        """
        Return the PID of the process, or None if not available.
        """
        try:
            return int(_read_file(self.pid_filename))
        except (IOError, OSError, ValueError):
            return None


    def get_status(self):
        """
        Wait for the process to exit and return its exit status, or None
        if not available.
        """
        # The server holds this lock for as long as it is running
        _wait(self.lock_server_running_filename)
        try:
            return int(_read_file(self.status_filename))
        except (IOError, OSError, ValueError):
            return None


    def get_output(self):
        """
        Return the STDOUT and STDERR output of the process so far.
        """
        try:
            return _read_file(self.output_filename)
        except (IOError, OSError):
            return ""


    def close(self, sig=signal.SIGTERM):
        """
        Kill the child process if it's alive and remove temporary files.

        @param sig: The signal to send the process when attempting to kill it.
        """
        # Give the process some time to finish writing stuff
        self.read_nonblocking(0.1)
        # Kill it if it's alive
        if self.is_alive():
            try:
                os.kill(self.get_pid(), sig)
            except (OSError, TypeError):
                # Process already gone, or PID no longer available
                pass
        # Wait for the server to exit
        _wait(self.lock_server_running_filename)
        # Wait for the _tail() thread to finish
        if self.thread:
            self.thread.join()
        # Close file handles
        for fd in (self.fd_outpipe1, self.fd_outpipe2):
            try:
                os.close(fd)
            except (OSError, TypeError):
                # fd is None (never opened) or already closed
                pass
        # Forget the descriptors so later reads cannot hit an unrelated file
        # that happens to reuse the same descriptor number
        self.fd_outpipe1 = None
        self.fd_outpipe2 = None
        # Clean up
        for filename in (self.pid_filename, self.status_filename,
                         self.output_filename, self.outpipe1_filename,
                         self.outpipe2_filename, self.inpipe_filename,
                         self.lock_server_running_filename,
                         self.lock_client_starting_filename):
            try:
                os.unlink(filename)
            except OSError:
                pass


    def is_alive(self):
        """
        Return True if the process is running.
        """
        pid = self.get_pid()
        # See if the PID exists
        try:
            os.kill(pid, 0)
        except (OSError, TypeError):
            # No such process (or no permission), or no PID available
            return False
        # Make sure the PID belongs to the original process: the server
        # embeds the id in the child's command line
        try:
            cmdline = _read_file("/proc/%d/cmdline" % pid)
        except (IOError, OSError):
            # If we couldn't find the file for some reason, skip the check
            return True
        return self.id in cmdline


    def set_termination_callback(self, termination_func):
        """
        Set the termination_func attribute.  See __init__() for details.

        @param termination_func: Function to call when the process terminates.
                Must take 1 parameter -- the exit status.
        """
        self.termination_func = termination_func


    def set_output_callback(self, output_func):
        """
        Set the output_func attribute.  See __init__() for details.

        @param output_func: Function to call for each line of STDOUT/STDERR
                output from the process.  Must take 1 string parameter.
        """
        self.output_func = output_func


    def set_output_prefix(self, output_prefix):
        """
        Set the output_prefix attribute.  See __init__() for details.

        @param output_prefix: String to pre-pend to each line sent to
                output_func (see set_output_callback()).
        """
        self.output_prefix = output_prefix


    def _tail(self):
        """
        Background thread body: forward the child's output, line by line, to
        output_func and report the exit status to termination_func.

        Reads from the first output pipe until it is closed or becomes
        unusable, then waits for the exit status.
        """
        def send_output(func, prefix, text):
            # Pre-pend prefix and remove trailing whitespace
            text = prefix + text.rstrip()
            # Sanitize text and send it to func
            func(text.decode("utf-8", "replace"))

        data = ""
        while True:
            try:
                # See if there's any output to read from the child's outpipe1.
                r, w, x = select.select([self.fd_outpipe1], [], [], 0.05)
            except (select.error, OSError, TypeError, ValueError):
                # Pipe was never opened (None) or has been closed
                break
            if self.fd_outpipe1 in r:
                # Read some output
                new_data = os.read(self.fd_outpipe1, 1024)
                if not new_data:
                    break
                data += new_data
                # Send the output to output_func line by line.  The extra
                # "\n" makes the last element "" when data ends on a line
                # boundary, and the trailing partial line otherwise.
                lines = (data + "\n").splitlines()
                if self.output_func:
                    for line in lines[:-1]:
                        send_output(self.output_func, self.output_prefix, line)
                # Leave only the output that wasn't sent
                data = lines[-1]
            else:
                # Nothing new arrived within the timeout: flush the partial
                # line rather than holding it back indefinitely
                if data and self.output_func:
                    send_output(self.output_func, self.output_prefix, data)
                data = ""
        # The process is no longer running;
        # Print any remaining output
        if data and self.output_func:
            send_output(self.output_func, self.output_prefix, data)
        # Get the exit status, print it and send it to termination_func
        status = self.get_status()
        if status is None:
            return
        if self.output_func:
            send_output(self.output_func, self.output_prefix,
                        "(Process terminated with status %s)" % status)
        if self.termination_func:
            self.termination_func(status)


    def set_linesep(self, linesep):
        """
        Sets the line separator string (usually "\\n").

        @param linesep: Line separator character.
        """
        self.linesep = linesep


    def sendline(self, str=""):
        """
        Send a string followed by a line separator to the child process.

        @param str: String to send to the child process.
        """
        self.send(str + self.linesep)


    def send(self, str=""):
        """
        Send a string to the child process.

        Best effort: failures to open or write the input pipe (e.g. because
        the server has already exited) are silently ignored.

        @param str: String to send to the child process.
        """
        try:
            fd = os.open(self.inpipe_filename, os.O_RDWR)
        except OSError:
            return
        try:
            try:
                os.write(fd, str)
            except (OSError, TypeError):
                pass
        finally:
            os.close(fd)


    def read_nonblocking(self, timeout=1.0):
        """
        Read from child until there is nothing to read for timeout seconds.

        @param timeout: Time (seconds) of wait before we give up reading from
                the child process.
        @return: The data read (possibly an empty string).
        """
        data = ""
        while True:
            try:
                r, w, x = select.select([self.fd_outpipe2], [], [], timeout)
            except (select.error, OSError, TypeError, ValueError):
                # Pipe was never opened (None) or has been closed
                return data
            if self.fd_outpipe2 not in r:
                return data
            newdata = os.read(self.fd_outpipe2, 1024)
            if not newdata:
                return data
            data += newdata


    def is_responsive(self, timeout=5.0):
        """
        Return True if the process responds to STDIN/terminal input.

        Send a newline to the child process (e.g. SSH or Telnet) and read some
        output using read_nonblocking().
        If all is OK, some output should be available (e.g. the shell prompt).
        In that case return True.  Otherwise return False.

        @param timeout: Time duration to wait before the process is considered
                unresponsive.
        """
        # Discard whatever is already pending so only fresh output counts
        self.read_nonblocking(timeout=0.1)
        self.sendline()
        output = self.read_nonblocking(timeout=timeout)
        if output.strip():
            return True
        return False


    def match_patterns(self, str, patterns):
        """
        Match str against a list of patterns.

        Return the index of the first pattern that matches a substring of str.
        None and empty strings in patterns are ignored.
        If no match is found, return None.

        @param str: String to search.
        @param patterns: List of strings (regular expression patterns).
        """
        for i, pattern in enumerate(patterns):
            if not pattern:
                continue
            if re.search(pattern, str):
                return i
        return None


    def read_until_output_matches(self, patterns, filter=lambda x: x,
                                  timeout=30.0, internal_timeout=1.0,
                                  print_func=None):
        """
        Read using read_nonblocking until a match is found using match_patterns,
        or until timeout expires.  Before attempting to search for a match, the
        data is filtered using the filter function provided.

        @brief: Read from child using read_nonblocking until a pattern
                matches.
        @param patterns: List of strings (regular expression patterns)
        @param filter: Function to apply to the data read from the child before
                attempting to match it against the patterns (should take and
                return a string)
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)
        @return: Tuple containing the match index (or None if no match was
                found) and the data read so far.
        """
        match = None
        data = ""

        end_time = time.time() + timeout
        while time.time() < end_time:
            # Read data from child
            newdata = self.read_nonblocking(internal_timeout)
            # Print it if necessary
            if print_func and newdata:
                for line in newdata.splitlines():
                    print_func(line.decode("utf-8", "replace"))
            data += newdata

            done = False
            # Look for patterns
            match = self.match_patterns(filter(data), patterns)
            if match is not None:
                done = True
            # Check if child has died
            if not self.is_alive():
                logging.debug("Process terminated with status %s" %
                              self.get_status())
                done = True
            # Are we done?
            if done:
                break

        # Print some debugging info
        if match is None and (self.is_alive() or self.get_status() != 0):
            logging.debug("Timeout elapsed or process terminated. Output:" +
                          kvm_utils.format_str_for_message(data.strip()))

        return (match, data)


    def read_until_last_word_matches(self, patterns, timeout=30.0,
                                     internal_timeout=1.0, print_func=None):
        """
        Read using read_nonblocking until the last word of the output matches
        one of the patterns (using match_patterns), or until timeout expires.

        @param patterns: A list of strings (regular expression patterns)
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)
        @return: A tuple containing the match index (or None if no match was
                found) and the data read so far.
        """
        def get_last_word(str):
            # split() yields an empty list for empty or whitespace-only
            # input, which must not be indexed
            words = str.split()
            if words:
                return words[-1]
            return ""

        return self.read_until_output_matches(patterns, get_last_word,
                                              timeout, internal_timeout,
                                              print_func)


    def read_until_last_line_matches(self, patterns, timeout=30.0,
                                     internal_timeout=1.0, print_func=None):
        """
        Read using read_nonblocking until the last non-empty line of the output
        matches one of the patterns (using match_patterns), or until timeout
        expires.  Return a tuple containing the match index (or None if no match
        was found) and the data read so far.

        @brief: Read using read_nonblocking until the last non-empty line
                matches a pattern.

        @param patterns: A list of strings (regular expression patterns)
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)
        """
        def get_last_line(str):
            last_line = ""
            for line in str.splitlines():
                if line:
                    last_line = line
            return last_line

        return self.read_until_output_matches(patterns, get_last_line,
                                              timeout, internal_timeout,
                                              print_func)


    def set_prompt(self, prompt):
        """
        Set the prompt attribute for later use by read_up_to_prompt.

        @param prompt: Regular expression that describes the prompt contents.
        """
        self.prompt = prompt


    def read_up_to_prompt(self, timeout=30.0, internal_timeout=1.0,
                          print_func=None):
        """
        Read using read_nonblocking until the last non-empty line of the output
        matches the prompt regular expression set by set_prompt, or until
        timeout expires.

        @brief: Read using read_nonblocking until the last non-empty line
                matches the prompt.

        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being
                read (should take a string parameter)

        @return: A tuple containing True/False indicating whether the prompt
                was found, and the data read so far.
        """
        (match, output) = self.read_until_last_line_matches([self.prompt],
                                                            timeout,
                                                            internal_timeout,
                                                            print_func)
        if match is None:
            return (False, output)
        else:
            return (True, output)


    def set_status_test_command(self, status_test_command):
        """
        Set the command to be sent in order to get the last exit status.

        @param status_test_command: Command that will be sent to get the last
                exit status.
        """
        self.status_test_command = status_test_command


    def get_command_status_output(self, command, timeout=30.0,
                                  internal_timeout=1.0, print_func=None):
        """
        Send a command and return its exit status and output.

        @param command: Command to send
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)

        @return: A tuple (status, output) where status is the exit status or
                None if no exit status is available (e.g. timeout elapsed, or
                the status reply could not be parsed), and output is the
                output of command.
        """
        def remove_first_line_if_equals(str, line):
            lines = str.splitlines()
            # Nothing may have been read at all (e.g. on timeout)
            if lines and lines[0] == line:
                del lines[0]
            return "\n".join(lines)

        def remove_last_line(str):
            return "\n".join(str.splitlines()[:-1])

        # Print some debugging info
        logging.debug("Sending command: %s" % command)

        # Read everything that's waiting to be read
        self.read_nonblocking(0.1)

        # Send the command and get its output
        self.sendline(command)
        (match, output) = self.read_up_to_prompt(timeout, internal_timeout,
                                                 print_func)
        # Remove the echoed command from the output
        output = remove_first_line_if_equals(output, command)
        # If the prompt was not found, return the output so far
        if not match:
            return (None, output)
        # Remove the final shell prompt from the output
        output = remove_last_line(output)

        # Send the 'echo ...' command to get the last exit status
        self.sendline(self.status_test_command)
        (match, status) = self.read_up_to_prompt(10.0, internal_timeout)
        if not match:
            return (None, output)
        status = remove_first_line_if_equals(status, self.status_test_command)
        status = remove_last_line(status)
        try:
            status = int(status.strip())
        except ValueError:
            # The reply was not a plain integer (e.g. stray output got mixed
            # in); report "no status available" instead of crashing
            logging.debug("Could not parse exit status from: %r", status)
            return (None, output)

        # Print some debugging info
        if status != 0:
            logging.debug("Command failed; status: %d, output:%s", status,
                          kvm_utils.format_str_for_message(output.strip()))

        return (status, output)


    def get_command_status(self, command, timeout=30.0, internal_timeout=1.0,
                           print_func=None):
        """
        Send a command and return its exit status.

        @param command: Command to send
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)

        @return: Exit status or None if no exit status is available (e.g.
                timeout elapsed).
        """
        (status, output) = self.get_command_status_output(command, timeout,
                                                          internal_timeout,
                                                          print_func)
        return status


    def get_command_output(self, command, timeout=30.0, internal_timeout=1.0,
                           print_func=None):
        """
        Send a command and return its output.

        @param command: Command to send
        @param timeout: The duration (in seconds) to wait until a match is
                found
        @param internal_timeout: The timeout to pass to read_nonblocking
        @param print_func: A function to be used to print the data being read
                (should take a string parameter)
        """
        (status, output) = self.get_command_status_output(command, timeout,
                                                          internal_timeout,
                                                          print_func)
        return output


def run_bg(command, termination_func=None, output_func=None, prefix="",
           timeout=1.0):
    """
    Run command as a subprocess.  Call output_func with each line of output from
    the subprocess (prefixed by prefix).  Call termination_func when the
    subprocess terminates.  If timeout expires and the subprocess is still
    running, return.

    @brief: Run a subprocess in the background and collect its output and
            exit status.

    @param command: The shell command to execute
    @param termination_func: A function to call when the process terminates
            (should take an integer exit status parameter)
    @param output_func: A function to call with each line of output from
            the subprocess (should take a string parameter)
    @param prefix: A string to pre-pend to each line of the output, before
            passing it to output_func
    @param timeout: Time duration (in seconds) to wait for the subprocess to
            terminate before returning

    @return: A kvm_spawn object.
    """
    process = kvm_spawn(command, None, termination_func, output_func, prefix)

    end_time = time.time() + timeout
    while time.time() < end_time and process.is_alive():
        time.sleep(0.1)

    return process


def run_fg(command, output_func=None, prefix="", timeout=1.0):
    """
    Run command as a subprocess.  Call output_func with each line of output from
    the subprocess (prefixed by prefix).  If timeout expires and the subprocess
    is still running, kill it and return.

    @brief: Run a subprocess in the foreground and collect its output and
            exit status.

    @param command: The shell command to execute
    @param output_func: A function to call with each line of output from
            the subprocess (should take a string parameter)
    @param prefix: A string to pre-pend to each line of the output, before
            passing it to output_func
    @param timeout: Time duration (in seconds) to wait for the subprocess to
            terminate before killing it and returning

    @return: A 2-tuple containing the exit status of the process and its
            STDOUT/STDERR output.  If timeout expires before the process
            terminates, the returned status is None.
    """
    process = run_bg(command, None, output_func, prefix, timeout)
    output = process.get_output()
    if process.is_alive():
        status = None
    else:
        status = process.get_status()
    # close() kills the process if it is still running
    process.close()
    return (status, output)


# The following is the server part of the module.

def _server_main():
    """
    Entry point of the server process (python <this file> id command...).

    Runs the command under a pseudo-terminal, copies its output to the output
    file and to both output FIFOs, forwards data from the input FIFO to the
    child, and finally records the exit status.
    """
    if len(sys.argv) < 3:
        print("Usage: python %s id command" % sys.argv[0])
        sys.exit(1)

    server_id = sys.argv[1]
    # Appending the id makes it part of the child shell's command line, which
    # is what kvm_spawn.is_alive() looks for in /proc/<pid>/cmdline
    command = " ".join(sys.argv[2:]) + " && echo %s > /dev/null" % server_id

    # Define filenames used for communication
    (pid_filename,
     status_filename,
     output_filename,
     outpipe1_filename,
     outpipe2_filename,
     inpipe_filename,
     lock_server_running_filename,
     lock_client_starting_filename) = _get_filenames(_BASE_DIR, server_id)

    # Create FIFOs
    for filename in (outpipe1_filename, outpipe2_filename, inpipe_filename):
        os.mkfifo(filename)

    (pid, fd) = pty.fork()
    if pid == 0:
        # Child: become the shell that runs the command
        os.execv("/bin/sh", ["/bin/sh", "-c", command])
    else:
        # Held for the server's whole lifetime; clients use it to detect a
        # running server and to wait for it to exit
        lock_server_running = _lock(lock_server_running_filename)

        # Disable terminal post processing
        attr = termios.tcgetattr(fd)
        attr[1] = attr[1] & ~termios.OPOST
        termios.tcsetattr(fd, termios.TCSANOW, attr)

        # Write PID to file
        pid_file = open(pid_filename, "w")
        pid_file.write(str(pid) + "\n")
        pid_file.close()

        # Open files/pipes for output/input.  The FIFOs are opened O_RDWR so
        # that opening them does not block waiting for a peer.
        output_file = open(output_filename, "w")
        outpipe1 = os.open(outpipe1_filename, os.O_RDWR)
        outpipe2 = os.open(outpipe2_filename, os.O_RDWR)
        inpipe = os.open(inpipe_filename, os.O_RDWR)

        # Print something to stdout so the client can start working
        print("hello")
        sys.stdout.flush()

        # Read from child and write to files/pipes
        while True:
            r, w, x = select.select([fd, inpipe], [], [])
            if fd in r:
                try:
                    data = os.read(fd, 1024)
                except OSError:
                    break
                if not data:
                    # Some platforms signal pty EOF with an empty read
                    # instead of an error; do not spin on it
                    break
                output_file.write(data)
                output_file.flush()
                os.write(outpipe1, data)
                os.write(outpipe2, data)
            if inpipe in r:
                data = os.read(inpipe, 1024)
                try:
                    os.write(fd, data)
                except OSError:
                    # The child side of the pty is gone
                    break

        # Wait for the process to exit and get its exit status
        pid, status = os.waitpid(pid, 0)
        # NOTE(review): WEXITSTATUS is applied without checking WIFEXITED, so
        # a child killed by a signal is presumably reported with a misleading
        # status -- confirm whether callers rely on this before changing it.
        status = os.WEXITSTATUS(status)
        status_file = open(status_filename, "w")
        status_file.write(str(status) + "\n")
        status_file.close()

        # Wait for the client to finish initializing
        _wait(lock_client_starting_filename)

        # Delete FIFOs
        for filename in (outpipe1_filename, outpipe2_filename,
                         inpipe_filename):
            try:
                os.unlink(filename)
            except OSError:
                pass

        # Close output/input files/pipes
        output_file.close()
        os.close(outpipe1)
        os.close(outpipe2)
        os.close(inpipe)

        _unlock(lock_server_running)


if __name__ == "__main__":
    _server_main()