| #!/usr/bin/env python2 |
| # Copyright 2013 Google Inc. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import cPickle |
| import errno |
| import gzip |
| import multiprocessing |
| import optparse |
| import os |
| import signal |
| import subprocess |
| import sys |
| import tempfile |
| import thread |
| import threading |
| import time |
| import zlib |
| |
class SigintHandler(object):
    """Notice Ctrl+C and tear down child processes once it has been seen.

    Pressing Ctrl+C can deliver SIGINT to the main Python process or to
    one of the subprocesses, so both cases are watched: a SIGINT handler
    is installed for this process, and the return code of every process
    passed to wait() is compared against the codes that mean "killed by
    SIGINT".

    Before a SIGINT is seen, wait(p) simply returns p.wait().  Once a
    SIGINT has been seen (in the main process or in a subprocess,
    including the one the current call is waiting for), wait(p) calls
    p.terminate() and raises ProcessWasInterrupted.
    """

    class ProcessWasInterrupted(Exception):
        """Raised by wait() once a SIGINT has been observed."""

    # Return codes treated as "the process died by SIGINT".
    sigint_returncodes = {
        -signal.SIGINT,  # Unix
        -1073741510,  # Windows
    }

    def __init__(self):
        self.__lock = threading.Lock()
        self.__processes = set()
        self.__got_sigint = False
        signal.signal(signal.SIGINT, self.__sigint_handler)

    def __on_sigint(self):
        # Callers must hold self.__lock.
        self.__got_sigint = True
        while self.__processes:
            process = self.__processes.pop()
            try:
                process.terminate()
            except OSError:
                pass

    def __sigint_handler(self, signal_num, frame):
        with self.__lock:
            self.__on_sigint()

    def got_sigint(self):
        """Return True if a SIGINT has been seen so far."""
        with self.__lock:
            return self.__got_sigint

    def wait(self, p):
        """Wait for process `p` and return its exit code.

        Raises ProcessWasInterrupted instead of returning if a SIGINT
        has been seen by the time `p` finishes.
        """
        with self.__lock:
            if self.__got_sigint:
                p.terminate()
            self.__processes.add(p)
        code = p.wait()
        with self.__lock:
            self.__processes.discard(p)
            if code in self.sigint_returncodes:
                self.__on_sigint()
            interrupted = self.__got_sigint
        if interrupted:
            raise self.ProcessWasInterrupted
        return code


sigint_handler = SigintHandler()
| |
def term_width(out):
    """Return the width of the terminal, or None if it is unknown.

    None is returned when `out` is not a tty (e.g. because we're not
    being run interactively), and when `stty size` cannot be run, fails,
    writes to stderr, or prints something that cannot be parsed.
    """
    if not out.isatty():
        return None
    try:
        stty = subprocess.Popen(["stty", "size"],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        (stty_out, stty_err) = stty.communicate()
        if stty.returncode == 0 and not stty_err:
            # The second whitespace-separated field is used as the width.
            return int(stty_out.split()[1])
        return None
    except (IndexError, OSError, ValueError):
        return None
| |
class Outputter(object):
    """Write transient and permanent lines of text to a file.

    If several transient lines are written in sequence, the new one
    overwrites the old one.  This keeps lots of unimportant info (tests
    passing) from drowning out important info (tests failing).  When the
    output is not a terminal, transient lines are written like ordinary
    lines.
    """

    def __init__(self, out_file):
        self.__out_file = out_file
        self.__previous_line_was_transient = False
        # Line width, or None if not a tty.
        self.__width = term_width(out_file)

    def transient_line(self, msg):
        """Write `msg` as a line that the next line may overwrite."""
        width = self.__width
        if width is None:
            self.__out_file.write(msg + "\n")
            return
        # Return to column 0 and pad to the full width so that leftovers
        # of a longer previous line are blanked out.
        self.__out_file.write("\r" + msg[:width].ljust(width))
        self.__previous_line_was_transient = True

    def flush_transient_output(self):
        """End a pending transient line so that it stays visible."""
        if not self.__previous_line_was_transient:
            return
        self.__out_file.write("\n")
        self.__previous_line_was_transient = False

    def permanent_line(self, msg):
        """Write `msg` as a line that is never overwritten."""
        self.flush_transient_output()
        self.__out_file.write(msg + "\n")
| |
| stdout_lock = threading.Lock() |
| |
| class FilterFormat: |
| if sys.stdout.isatty(): |
| # stdout needs to be unbuffered since the output is interactive. |
| sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) |
| |
| out = Outputter(sys.stdout) |
| total_tests = 0 |
| finished_tests = 0 |
| |
| tests = {} |
| outputs = {} |
| failures = [] |
| |
| def print_test_status(self, last_finished_test, time_ms): |
| self.out.transient_line("[%d/%d] %s (%d ms)" |
| % (self.finished_tests, self.total_tests, |
| last_finished_test, time_ms)) |
| |
| def handle_meta(self, job_id, args): |
| (command, arg) = args.split(' ', 1) |
| if command == "TEST": |
| (binary, test) = arg.split(' ', 1) |
| self.tests[job_id] = (binary, test.strip()) |
| elif command == "EXIT": |
| (exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)] |
| self.finished_tests += 1 |
| (binary, test) = self.tests[job_id] |
| self.print_test_status(test, time_ms) |
| if exit_code != 0: |
| self.failures.append(self.tests[job_id]) |
| with open(self.outputs[job_id]) as f: |
| for line in f.readlines(): |
| self.out.permanent_line(line.rstrip()) |
| self.out.permanent_line( |
| "[%d/%d] %s returned/aborted with exit code %d (%d ms)" |
| % (self.finished_tests, self.total_tests, test, exit_code, time_ms)) |
| elif command == "TESTCNT": |
| self.total_tests = int(arg.split(' ', 1)[1]) |
| self.out.transient_line("[0/%d] Running tests..." % self.total_tests) |
| |
| def logfile(self, job_id, name): |
| self.outputs[job_id] = name |
| |
| def log(self, line): |
| stdout_lock.acquire() |
| (prefix, output) = line.split(' ', 1) |
| |
| assert prefix[-1] == ':' |
| self.handle_meta(int(prefix[:-1]), output) |
| stdout_lock.release() |
| |
| def end(self): |
| if self.failures: |
| self.out.permanent_line("FAILED TESTS (%d/%d):" |
| % (len(self.failures), self.total_tests)) |
| for (binary, test) in self.failures: |
| self.out.permanent_line(" " + binary + ": " + test) |
| self.out.flush_transient_output() |
| |
class RawFormat:
    """Logger that writes every protocol line and log line to stdout."""

    def log(self, line):
        """Write `line` to stdout, serialized with the other threads."""
        # `with` releases the lock even if the write fails (e.g. stdout
        # was closed); a lock left held would block all other threads.
        with stdout_lock:
            sys.stdout.write(line + "\n")
            sys.stdout.flush()

    def logfile(self, job_id, name):
        """Echo the log file `name`, prefixing each line with "<job_id>> "."""
        with open(name) as f:
            for line in f:
                self.log(str(job_id) + '> ' + line.rstrip())

    def end(self):
        """Nothing to summarize in raw mode."""
        pass
| |
class TestTimes(object):
    """Record of test runtimes.  Has built-in locking."""

    def __init__(self, save_file):
        "Create new object seeded with saved test times from the given file."
        self.__times = {}  # (test binary, test name) -> runtime in ms

        # Protects calls to record_test_time(); other calls are not
        # expected to be made concurrently.
        self.__lock = threading.Lock()

        # NOTE(security): unpickling can execute arbitrary code, so the
        # save file must only ever be one written by this tool.
        try:
            with gzip.GzipFile(save_file, "rb") as f:
                times = cPickle.load(f)
        except Exception:
            # File doesn't exist, isn't readable, is malformed---whatever.
            # Unpickling bad data can raise many exception types besides
            # UnpicklingError (AttributeError, ImportError, IndexError,
            # ...), so everything is caught here.  Just ignore the file.
            return

        # Discard saved times if the format isn't right.
        if self.__well_formed(times):
            self.__times = times

    @staticmethod
    def __well_formed(times):
        """Return True if `times` maps (str, str) tuples to int/long/None."""
        if type(times) is not dict:
            return False
        for (key, runtime) in times.items():
            # Check the key's shape first: unpacking an arbitrary key
            # could raise, or silently accept a two-character string.
            if type(key) is not tuple or len(key) != 2:
                return False
            (test_binary, test_name) = key
            if (type(test_binary) is not str or type(test_name) is not str
                    or type(runtime) not in (int, long, type(None))):
                return False
        return True

    def get_test_time(self, binary, testname):
        """Return the last duration for the given test as an integer number of
        milliseconds, or None if the test failed or if there's no record for
        it."""
        return self.__times.get((binary, testname), None)

    def record_test_time(self, binary, testname, runtime_ms):
        """Record that the given test ran in the specified number of
        milliseconds. If the test failed, runtime_ms should be None."""
        with self.__lock:
            self.__times[(binary, testname)] = runtime_ms

    def write_to_file(self, save_file):
        "Write all the times to file."
        try:
            with open(save_file, "wb") as f:
                with gzip.GzipFile("", "wb", 9, f) as gzf:
                    cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
        except IOError:
            pass  # ignore errors---saving the times isn't that important
| |
# Split off the additional arguments: everything after the first "--"
# is kept in additional_args and removed from sys.argv, together with
# the "--" itself, before the options are parsed.
additional_args = []

if '--' in sys.argv:
    separator = sys.argv.index('--')
    additional_args = sys.argv[separator + 1:]
    sys.argv = sys.argv[:separator]
| |
# Command-line interface.  The positional arguments are the test
# binaries to run.
parser = optparse.OptionParser(
    usage='usage: %prog [options] binary [binary ...] -- [additional args]')

parser.add_option(
    '-d', '--output_dir',
    type='string',
    default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
    help='output directory for test logs')
parser.add_option(
    '-r', '--repeat',
    type='int',
    default=1,
    help='repeat tests')
parser.add_option(
    '--failed',
    action='store_true',
    default=False,
    help='run only failed and new tests')
parser.add_option(
    '-w', '--workers',
    type='int',
    default=multiprocessing.cpu_count(),
    help='number of workers to spawn')
parser.add_option(
    '--gtest_color',
    type='string',
    default='yes',
    help='color output')
parser.add_option(
    '--gtest_filter',
    type='string',
    default='',
    help='test filter')
parser.add_option(
    '--gtest_also_run_disabled_tests',
    action='store_true',
    default=False,
    help='run disabled tests too')
parser.add_option(
    '--format',
    type='string',
    default='filter',
    help='output format (raw,filter)')
parser.add_option(
    '--print_test_times',
    action='store_true',
    default=False,
    help='When done, list the run time of each test')

(options, binaries) = parser.parse_args()
| |
# At least one test binary is required.
if not binaries:
    parser.print_usage()
    sys.exit(1)

# Pick the output format requested with --format.
if options.format == 'raw':
    logger = RawFormat()
elif options.format == 'filter':
    logger = FilterFormat()
else:
    sys.exit("Unknown output format: " + options.format)
| |
# Find tests.  Each entry appended to `tests` is the tuple
# (last runtime or None, test binary, test name, command to run).
save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
times = TestTimes(save_file)
tests = []
for test_binary in binaries:
    command = [test_binary]
    if options.gtest_also_run_disabled_tests:
        command.append('--gtest_also_run_disabled_tests')

    # The listing command gets the user's filter; the run command does
    # not, because each job later passes its own --gtest_filter.
    list_command = list(command)
    if options.gtest_filter != '':
        list_command.append('--gtest_filter=' + options.gtest_filter)
    list_command.append('--gtest_list_tests')

    try:
        lister = subprocess.Popen(list_command, stdout=subprocess.PIPE)
        test_list = lister.communicate()[0]
    except OSError as e:
        sys.exit("%s: %s" % (test_binary, str(e)))

    command.extend(additional_args)

    test_group = ''
    for line in test_list.split('\n'):
        if not line.strip():
            continue
        # Remove comments (added for typed and parameterized tests) and
        # strip whitespace.
        name = line.split('#')[0].strip()
        if line[0] != " ":
            # An unindented line starts a new group of tests.
            test_group = name
            continue
        if not name:
            continue

        test = test_group + name
        if options.gtest_also_run_disabled_tests or 'DISABLED_' not in test:
            tests.append((times.get_test_time(test_binary, test),
                          test_binary, test, command))
| |
if options.failed:
    # The first element of each entry is the runtime of the most recent
    # run if it was successful, or None if the test is new or the most
    # recent run failed.
    tests = [entry for entry in tests if entry[0] is None]

# Sort tests by falling runtime (with None, which is what we get for
# new and failing tests, being considered larger than any real
# runtime).
tests.sort(reverse=True, key=lambda entry: (int(entry[0] is None), entry))

# Repeat tests (-r flag).
tests = tests * options.repeat

# Shared state of the workers: job_id is the index of the next test to
# hand out and is guarded by test_lock.
test_lock = threading.Lock()
job_id = 0
logger.log("-1: TESTCNT  %d" % len(tests))

exit_code = 0
| |
# Create directory for test log output.
try:
    os.makedirs(options.output_dir)
except OSError as e:
    # Ignore errors if this directory already exists.
    if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
        # Bare raise keeps the original traceback.
        raise
# Remove files from old test runs.
for logfile in os.listdir(options.output_dir):
    logfile_path = os.path.join(options.output_dir, logfile)
    # os.remove() cannot delete a directory and would abort the whole
    # run, so real subdirectories are left alone.
    if os.path.isdir(logfile_path) and not os.path.islink(logfile_path):
        continue
    os.remove(logfile_path)
| |
def run_job(job):
    """Run the specified job and report its result to the logger.

    `job` is a (command, job_id, test) tuple: the command line to start,
    the job's id, and the test name passed as --gtest_filter.  stdout and
    stderr of the test go to a new temporary file in options.output_dir.

    Returns the elapsed time in milliseconds if the job succeeds, or None
    if the job fails.  (This ensures that failing tests will run first
    the next time.)  A failing job also stores its exit code in the
    global exit_code.  If the run was interrupted, the calling thread is
    exited instead of returning.
    """
    global exit_code
    # Explicit unpacking replaces the tuple-parameter syntax, which was
    # removed from the language by PEP 3113.
    (command, job_id, test) = job
    begin = time.time()

    with tempfile.NamedTemporaryFile(dir=options.output_dir,
                                     delete=False) as log:
        sub = subprocess.Popen(command + ['--gtest_filter=' + test] +
                               ['--gtest_color=' + options.gtest_color],
                               stdout=log.file,
                               stderr=log.file)
        try:
            code = sigint_handler.wait(sub)
        except sigint_handler.ProcessWasInterrupted:
            thread.exit()
        runtime_ms = int(1000 * (time.time() - begin))
        logger.logfile(job_id, log.name)

    logger.log("%s: EXIT %s %d" % (job_id, code, runtime_ms))
    if code == 0:
        return runtime_ms
    exit_code = code
    return None
| |
def worker():
    """Run tests from the global `tests` list until none are left.

    Each iteration claims the next unclaimed index (the global job_id)
    while holding test_lock, announces the test to the logger, runs it
    outside the lock and records its runtime.
    """
    global job_id
    while True:
        job = None
        # `with` releases the lock even if logging raises; otherwise the
        # remaining workers would block on it forever.
        with test_lock:
            if job_id < len(tests):
                (_, test_binary, test, command) = tests[job_id]
                logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test)
                job = (command, job_id, test)
                job_id += 1
        if job is None:
            return
        times.record_test_time(test_binary, test, run_job(job))
| |
def start_daemon(func):
    """Start `func` in a new daemon thread and return the thread object."""
    daemon_thread = threading.Thread(target=func)
    # The daemon flag has to be set before the thread is started.
    daemon_thread.daemon = True
    daemon_thread.start()
    return daemon_thread
| |
# Run all tests on options.workers threads and wait for them to finish.
workers = [start_daemon(worker) for _ in range(options.workers)]
for worker_thread in workers:
    worker_thread.join()

logger.end()
times.write_to_file(save_file)

if options.print_test_times:
    # Collect every test that has a recorded runtime, shortest first.
    timed = []
    for (_, test_binary, test, _) in tests:
        time_ms = times.get_test_time(test_binary, test)
        if time_ms is not None:
            timed.append((time_ms, test_binary, test))
    for (time_ms, test_binary, test) in sorted(timed):
        print("%8s %s" % ("%dms" % time_ms, test))

# An interrupted run exits with -SIGINT, otherwise with exit_code as
# set by the jobs (0 if no job failed).
sys.exit(-signal.SIGINT if sigint_handler.got_sigint() else exit_code)