Merge pull request #29 from JBKahn/master
Remove hacky stream watcher to improve compatibility
This commit is contained in:
		
							
								
								
									
										11
									
								
								readme.rst
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								readme.rst
									
									
									
									
									
								
							| @@ -35,4 +35,15 @@ to a file). To control colouring, use one of:: | ||||
|  | ||||
| (you can also control this by setting the environment variable NOSE_REDNOSE_COLOR to 'force' or 'no') | ||||
|  | ||||
| Rednose by default prints file paths relative to the working | ||||
| directory. If you want the full path in the traceback then | ||||
| use:: | ||||
|  | ||||
| 	nosetests --rednose --full-file-path | ||||
|  | ||||
| Rednose supports printing the test results mid run as well as at | ||||
| the end, to enable it use:: | ||||
|  | ||||
| 	nosetests --rednose --immediate | ||||
|  | ||||
| .. _nosetests: http://somethingaboutorange.com/mrl/projects/nose/ | ||||
|   | ||||
							
								
								
									
										669
									
								
								rednose.py
									
									
									
									
									
								
							
							
						
						
									
										669
									
								
								rednose.py
									
									
									
									
									
								
							| @@ -32,368 +32,365 @@ import os | ||||
| import sys | ||||
| import linecache | ||||
| import re | ||||
| import time | ||||
|  | ||||
| colorama = None | ||||
| if os.name == 'nt': | ||||
| 	import colorama | ||||
|  | ||||
| import nose | ||||
|  | ||||
| import termstyle | ||||
|  | ||||
| PY3 = sys.version_info[0] >= 3 | ||||
| if PY3: | ||||
|     to_unicode = str | ||||
| else: | ||||
|     def to_unicode(s): | ||||
|         try: | ||||
|             return unicode(s) | ||||
|         except UnicodeDecodeError: | ||||
|             s = str(s) | ||||
|             try: | ||||
|                 # try utf-8, the most likely case | ||||
|                 return unicode(s, 'UTF-8') | ||||
|             except UnicodeDecodeError: | ||||
|                 # Can't decode, just use `repr` | ||||
|                 return unicode(repr(s)) | ||||
|  | ||||
|  | ||||
| failure = 'FAILED' | ||||
| error = 'ERROR' | ||||
| success = 'passed' | ||||
| skip = 'skipped' | ||||
| expected_failure = 'expected failure' | ||||
| unexpected_success = 'unexpected success' | ||||
| line_length = 77 | ||||
|  | ||||
| PY3 = sys.version_info[0] >= 3 | ||||
| if PY3: | ||||
| 	to_unicode = str | ||||
| else: | ||||
| 	def to_unicode(s): | ||||
| 		try: | ||||
| 			return unicode(s) | ||||
| 		except UnicodeDecodeError: | ||||
| 			s = str(s) | ||||
| 			try: | ||||
| 				# try utf-8, the most likely case | ||||
| 				return unicode(s, 'UTF-8') | ||||
| 			except UnicodeDecodeError: | ||||
| 				# Can't decode, just use `repr` | ||||
| 				return unicode(repr(s)) | ||||
|  | ||||
| BLACKLISTED_WRITERS = [ | ||||
| 	'nose[\\/]result\\.pyc?$', | ||||
| 	'unittest[\\/]runner\\.pyc?$' | ||||
| ] | ||||
| REDNOSE_DEBUG = False | ||||
|  | ||||
|  | ||||
| class RedNose(nose.plugins.Plugin): | ||||
| 	env_opt = 'NOSE_REDNOSE' | ||||
| 	env_opt_color = 'NOSE_REDNOSE_COLOR' | ||||
| 	score = 199  # just under the `coverage` module | ||||
|     env_opt = 'NOSE_REDNOSE' | ||||
|     env_opt_color = 'NOSE_REDNOSE_COLOR' | ||||
|  | ||||
| 	def __init__(self, *args): | ||||
| 		super(RedNose, self).__init__(*args) | ||||
| 		self.reports = [] | ||||
| 		self.error = self.success = self.failure = self.skip = 0 | ||||
| 		self.total = 0 | ||||
| 		self.stream = None | ||||
| 		self.verbose = False | ||||
| 		self.enabled = False | ||||
| 		self.tree = False | ||||
|     def __init__(self, *args): | ||||
|         super(RedNose, self).__init__(*args) | ||||
|         self.enabled = False | ||||
|  | ||||
| 	def options(self, parser, env=os.environ): | ||||
| 		global REDNOSE_DEBUG | ||||
| 		rednose_on = bool(env.get(self.env_opt, False)) | ||||
| 		rednose_color = env.get(self.env_opt_color, 'auto') | ||||
| 		REDNOSE_DEBUG = bool(env.get('REDNOSE_DEBUG', False)) | ||||
|     def options(self, parser, env=os.environ): | ||||
|         rednose_on = bool(env.get(self.env_opt, False)) | ||||
|         rednose_color = env.get(self.env_opt_color, 'auto') | ||||
|  | ||||
| 		parser.add_option( | ||||
| 			"--rednose", | ||||
| 			action="store_true", | ||||
| 			default=rednose_on, | ||||
| 			dest="rednose", | ||||
| 			help="enable colour output (alternatively, set $%s=1)" % (self.env_opt,) | ||||
| 		) | ||||
| 		parser.add_option( | ||||
| 			"--no-color", | ||||
| 			action="store_false", | ||||
| 			dest="rednose", | ||||
| 			help="disable colour output" | ||||
| 		) | ||||
| 		parser.add_option( | ||||
| 			"--force-color", | ||||
| 			action="store_const", | ||||
| 			dest='rednose_color', | ||||
| 			default=rednose_color, | ||||
| 			const='force', | ||||
| 			help="force colour output when not using a TTY (alternatively, set $%s=force)" % (self.env_opt_color,) | ||||
| 		) | ||||
| 		parser.add_option( | ||||
| 			"--immediate", | ||||
| 			action="store_true", | ||||
| 			default=False, | ||||
| 			help="print errors and failures as they happen, as well as at the end" | ||||
| 		) | ||||
|         parser.add_option( | ||||
|             "--rednose", | ||||
|             action="store_true", | ||||
|             default=rednose_on, | ||||
|             dest="rednose", | ||||
|             help="enable colour output (alternatively, set $%s=1)" % (self.env_opt,) | ||||
|         ) | ||||
|         parser.add_option( | ||||
|             "--no-color", | ||||
|             action="store_false", | ||||
|             dest="rednose", | ||||
|             help="disable colour output" | ||||
|         ) | ||||
|         parser.add_option( | ||||
|             "--force-color", | ||||
|             action="store_const", | ||||
|             dest='rednose_color', | ||||
|             default=rednose_color, | ||||
|             const='force', | ||||
|             help="force colour output when not using a TTY (alternatively, set $%s=force)" % (self.env_opt_color,) | ||||
|         ) | ||||
|         parser.add_option( | ||||
|             "--immediate", | ||||
|             action="store_true", | ||||
|             default=False, | ||||
|             help="print errors and failures as they happen, as well as at the end" | ||||
|         ) | ||||
|         parser.add_option( | ||||
|             "--full-file-path", | ||||
|             action="store_true", | ||||
|             default=False, | ||||
|             help="print the full file path as opposed to the one relative to your directory (default)" | ||||
|         ) | ||||
|  | ||||
| 	def configure(self, options, conf): | ||||
| 		if options.rednose: | ||||
| 			self.enabled = True | ||||
| 			termstyle_init = { | ||||
| 				'force': termstyle.enable, | ||||
| 				'off': termstyle.disable | ||||
| 			}.get(options.rednose_color, termstyle.auto) | ||||
| 			termstyle_init() | ||||
|     def configure(self, options, conf): | ||||
|         if options.rednose: | ||||
|             self.enabled = True | ||||
|             termstyle_init = { | ||||
|                 'force': termstyle.enable, | ||||
|                 'off': termstyle.disable | ||||
|             }.get(options.rednose_color, termstyle.auto) | ||||
|             termstyle_init() | ||||
|  | ||||
| 			self.immediate = options.immediate | ||||
| 			self.verbose = options.verbosity >= 2 | ||||
|             self.immediate = options.immediate | ||||
|             self.verbose = options.verbosity >= 2 | ||||
|             self.full_file_path = options.full_file_path | ||||
|  | ||||
| 	def begin(self): | ||||
| 		self.start_time = time.time() | ||||
| 		self._in_test = False | ||||
|     def prepareTestResult(self, result):  # noqa | ||||
|         """Required to prevent others from monkey patching the add methods.""" | ||||
|         return result | ||||
|  | ||||
| 	def _format_test_name(self, test): | ||||
| 		return test.shortDescription() or to_unicode(test) | ||||
|     def prepareTestRunner(self, runner):  # noqa | ||||
|         return ColourTestRunner(stream=runner.stream, descriptions=runner.descriptions, verbosity=runner.verbosity, config=runner.config, immediate=self.immediate, use_relative_path=not self.full_file_path) | ||||
|  | ||||
| 	def prepareTestResult(self, result): | ||||
| 		result.stream = FilteringStream(self.stream, BLACKLISTED_WRITERS) | ||||
|  | ||||
| 	def beforeTest(self, test): | ||||
| 		self._in_test = True | ||||
| 		if self.verbose: | ||||
| 			self._out(self._format_test_name(test) + ' ... ') | ||||
|  | ||||
| 	def afterTest(self, test): | ||||
| 		if self._in_test: | ||||
| 			self.addSkip() | ||||
|  | ||||
| 	def _print_test(self, type_, color): | ||||
| 		self.total += 1 | ||||
| 		if self.verbose: | ||||
| 			self._outln(color(type_)) | ||||
| 		else: | ||||
| 			if type_ == failure: | ||||
| 				short_ = 'F' | ||||
| 			elif type_ == error: | ||||
| 				short_ = 'X' | ||||
| 			elif type_ == skip: | ||||
| 				short_ = '-' | ||||
| 			else: | ||||
| 				short_ = '.' | ||||
| 			self._out(color(short_)) | ||||
| 			if self.total % line_length == 0: | ||||
| 				self._outln() | ||||
| 		self._in_test = False | ||||
|  | ||||
| 	def _add_report(self, report): | ||||
| 		failure_type, test, err = report | ||||
| 		self.reports.append(report) | ||||
| 		if self.immediate: | ||||
| 			self._outln() | ||||
| 			self._report_test(len(self.reports), *report) | ||||
|  | ||||
| 	def addFailure(self, test, err): | ||||
| 		self.failure += 1 | ||||
| 		self._add_report((failure, test, err)) | ||||
| 		self._print_test(failure, termstyle.red) | ||||
|  | ||||
| 	def addError(self, test, err): | ||||
| 		if err[0].__name__ == 'SkipTest': | ||||
| 			self.addSkip(test, err) | ||||
| 			return | ||||
| 		self.error += 1 | ||||
| 		self._add_report((error, test, err)) | ||||
| 		self._print_test(error, termstyle.yellow) | ||||
|  | ||||
| 	def addSuccess(self, test): | ||||
| 		self.success += 1 | ||||
| 		self._print_test(success, termstyle.green) | ||||
|  | ||||
| 	def addSkip(self, test=None, err=None): | ||||
| 		self.skip += 1 | ||||
| 		self._print_test(skip, termstyle.blue) | ||||
|  | ||||
| 	def setOutputStream(self, stream): | ||||
| 		if colorama: | ||||
| 			stream = colorama.initialise.wrap_stream(stream, convert=True, strip=False, autoreset=False, wrap=True) | ||||
| 		self.stream = stream | ||||
|  | ||||
| 	def report(self, stream): | ||||
| 		"""report on all registered failures and errors""" | ||||
| 		self._outln() | ||||
| 		if self.immediate: | ||||
| 			for x in range(0, 5): | ||||
| 				self._outln() | ||||
| 		report_num = 0 | ||||
| 		if len(self.reports) > 0: | ||||
| 			for report_num, report in enumerate(self.reports): | ||||
| 				self._report_test(report_num + 1, *report) | ||||
| 			self._outln() | ||||
|  | ||||
| 		self._summarize() | ||||
|  | ||||
| 	def _summarize(self): | ||||
| 		"""summarize all tests - the number of failures, errors and successes""" | ||||
| 		self._line(termstyle.black) | ||||
| 		self._out("%s test%s run in %0.1f seconds" % ( | ||||
| 			self.total, | ||||
| 			self._plural(self.total), | ||||
| 			time.time() - self.start_time)) | ||||
| 		if self.total > self.success: | ||||
| 			self._outln(". ") | ||||
| 			additionals = [] | ||||
| 			if self.failure > 0: | ||||
| 				additionals.append(termstyle.red("%s FAILED" % ( | ||||
| 					self.failure,))) | ||||
| 			if self.error > 0: | ||||
| 				additionals.append(termstyle.yellow("%s error%s" % ( | ||||
| 					self.error, | ||||
| 					self._plural(self.error) ))) | ||||
| 			if self.skip > 0: | ||||
| 				additionals.append(termstyle.blue("%s skipped" % ( | ||||
| 					self.skip))) | ||||
| 			self._out(', '.join(additionals)) | ||||
|  | ||||
| 		self._out(termstyle.green(" (%s test%s passed)" % ( | ||||
| 			self.success, | ||||
| 			self._plural(self.success) ))) | ||||
| 		self._outln() | ||||
|  | ||||
| 	def _report_test(self, report_num, type_, test, err): | ||||
| 		"""report the results of a single (failing or errored) test""" | ||||
| 		self._line(termstyle.black) | ||||
| 		self._out("%s) " % (report_num)) | ||||
| 		if type_ == failure: | ||||
| 			color = termstyle.red | ||||
| 			self._outln(color('FAIL: %s' % (self._format_test_name(test),))) | ||||
| 		else: | ||||
| 			color = termstyle.yellow | ||||
| 			self._outln(color('ERROR: %s' % (self._format_test_name(test),))) | ||||
|  | ||||
| 		exc_type, exc_instance, exc_trace = err | ||||
|  | ||||
| 		self._outln() | ||||
| 		self._outln(self._fmt_traceback(exc_trace)) | ||||
| 		self._out(color('   ', termstyle.bold(color(exc_type.__name__)), ": ")) | ||||
| 		self._outln(self._fmt_message(exc_instance, color)) | ||||
| 		self._outln() | ||||
|  | ||||
| 	def _relative_path(self, path): | ||||
| 		""" | ||||
| 		If path is a child of the current working directory, the relative | ||||
| 		path is returned surrounded by bold xterm escape sequences. | ||||
| 		If path is not a child of the working directory, path is returned | ||||
| 		""" | ||||
| 		try: | ||||
| 			here = os.path.abspath(os.path.realpath(os.getcwd())) | ||||
| 			fullpath = os.path.abspath(os.path.realpath(path)) | ||||
| 		except OSError: | ||||
| 			return path | ||||
| 		if fullpath.startswith(here): | ||||
| 			return termstyle.bold(fullpath[len(here)+1:]) | ||||
| 		return path | ||||
|  | ||||
| 	def _file_line(self, tb): | ||||
| 		"""formats the file / lineno / function line of a traceback element""" | ||||
| 		prefix = "file://" | ||||
| 		prefix = "" | ||||
|  | ||||
| 		f = tb.tb_frame | ||||
| 		if '__unittest' in f.f_globals: | ||||
| 			# this is the magical flag that prevents unittest internal | ||||
| 			# code from junking up the stacktrace | ||||
| 			return None | ||||
|  | ||||
| 		filename = f.f_code.co_filename | ||||
| 		lineno = tb.tb_lineno | ||||
| 		linecache.checkcache(filename) | ||||
| 		function_name = f.f_code.co_name | ||||
|  | ||||
| 		line_contents = linecache.getline(filename, lineno, f.f_globals).strip() | ||||
|  | ||||
| 		return "    %s line %s in %s\n      %s" % ( | ||||
| 			termstyle.blue(prefix, self._relative_path(filename)), | ||||
| 			termstyle.bold(termstyle.cyan(lineno)), | ||||
| 			termstyle.cyan(function_name), | ||||
| 			line_contents) | ||||
|  | ||||
| 	def _fmt_traceback(self, trace): | ||||
| 		"""format a traceback""" | ||||
| 		ret = [] | ||||
| 		ret.append(termstyle.default("   Traceback (most recent call last):")) | ||||
| 		current_trace = trace | ||||
| 		while current_trace is not None: | ||||
| 			line = self._file_line(current_trace) | ||||
| 			if line is not None: | ||||
| 				ret.append(line) | ||||
| 			current_trace = current_trace.tb_next | ||||
| 		return '\n'.join(ret) | ||||
|  | ||||
| 	def _fmt_message(self, exception, color): | ||||
| 		orig_message_lines = to_unicode(exception).splitlines() | ||||
|  | ||||
| 		if len(orig_message_lines) == 0: | ||||
| 			return '' | ||||
| 		message_lines = [color(orig_message_lines[0])] | ||||
| 		for line in orig_message_lines[1:]: | ||||
| 			match = re.match('^---.* begin captured stdout.*----$', line) | ||||
| 			if match: | ||||
| 				color = None | ||||
| 				message_lines.append('') | ||||
| 			line = '   ' + line | ||||
| 			message_lines.append(color(line) if color is not None else line) | ||||
| 		return '\n'.join(message_lines) | ||||
|  | ||||
| 	def _out(self, msg='', newline=False): | ||||
| 		self.stream.write(msg) | ||||
| 		if newline: | ||||
| 			self.stream.write('\n') | ||||
|  | ||||
| 	def _outln(self, msg=''): | ||||
| 		self._out(msg, True) | ||||
|  | ||||
| 	def _plural(self, num): | ||||
| 		return '' if num == 1 else 's' | ||||
|  | ||||
| 	def _line(self, color=termstyle.reset, char='-'): | ||||
| 		""" | ||||
| 		print a line of separator characters (default '-') | ||||
| 		in the given colour (default black) | ||||
| 		""" | ||||
| 		self._outln(color(char * line_length)) | ||||
|     def setOutputStream(self, stream):  # noqa | ||||
|         self.stream = stream | ||||
|         if os.name == 'nt': | ||||
|             import colorama | ||||
|             self.stream = colorama.initialise.wrap_stream(stream, convert=True, strip=False, autoreset=False, wrap=True) | ||||
|  | ||||
|  | ||||
| import traceback | ||||
| import sys | ||||
| class ColourTestRunner(nose.core.TextTestRunner): | ||||
|  | ||||
|     def __init__(self, stream, descriptions, verbosity, config, immediate, use_relative_path): | ||||
|         super(ColourTestRunner, self).__init__(stream=stream, descriptions=descriptions, verbosity=verbosity, config=config) | ||||
|         self.immediate = immediate | ||||
|         self.use_relative_path = use_relative_path | ||||
|  | ||||
|     def _makeResult(self):  # noqa | ||||
|         return ColourTextTestResult(self.stream, self.descriptions, self.verbosity, self.config, immediate=self.immediate, use_relative_path=self.use_relative_path) | ||||
|  | ||||
|  | ||||
| class FilteringStream(object): | ||||
| 	""" | ||||
| 	A wrapper for a stream that will filter | ||||
| 	calls to `write` and `writeln` to ignore calls | ||||
| 	from blacklisted callers | ||||
| 	(implemented as a regex on their filename, according | ||||
| 	to traceback.extract_stack()) | ||||
| class ColourTextTestResult(nose.result.TextTestResult): | ||||
|     """ | ||||
|     A test result class that prints colour formatted text results to the stream. | ||||
|     """ | ||||
|  | ||||
| 	It's super hacky, but there seems to be no other way | ||||
| 	to suppress nose's default output | ||||
| 	""" | ||||
| 	def __init__(self, stream, excludes): | ||||
| 		self.__stream = stream | ||||
| 		self.__excludes = list(map(re.compile, excludes)) | ||||
|     def __init__(self, stream, descriptions, verbosity, config, errorClasses=None, immediate=False, use_relative_path=False):  # noqa | ||||
|         super(ColourTextTestResult, self).__init__(stream=stream, descriptions=descriptions, verbosity=verbosity, config=config, errorClasses=errorClasses) | ||||
|         self.has_test_ids = config.options.enable_plugin_id | ||||
|         if self.has_test_ids: | ||||
|             self.ids = self.get_test_ids(self.config.options.testIdFile) | ||||
|         self.total = 0 | ||||
|         self.immediate = immediate | ||||
|         self.use_relative_path = use_relative_path | ||||
|         self.test_failures_and_exceptions = [] | ||||
|         self.error = self.success = self.failure = self.skip = self.expected_failure = self.unexpected_success = 0 | ||||
|         self.verbose = config.verbosity >= 2 | ||||
|         self.short_status_map = { | ||||
|             failure: 'F', | ||||
|             error: 'E', | ||||
|             skip: '-', | ||||
|             expected_failure: "X", | ||||
|             unexpected_success: "U", | ||||
|             success: '.', | ||||
|         } | ||||
|  | ||||
| 	def __should_filter(self): | ||||
| 		try: | ||||
| 			stack = traceback.extract_stack(limit=3)[0] | ||||
| 			filename = stack[0] | ||||
| 			pattern_matches_filename = lambda pattern: pattern.search(filename) | ||||
| 			should_filter = any(map(pattern_matches_filename, self.__excludes)) | ||||
| 			if REDNOSE_DEBUG: | ||||
| 				print >> sys.stderr, "REDNOSE_DEBUG: got write call from %s, should_filter = %s" % ( | ||||
| 						filename, should_filter) | ||||
| 			return should_filter | ||||
| 		except StandardError as e: | ||||
| 			if REDNOSE_DEBUG: | ||||
| 				print("\nError in rednose filtering: %s" % (e,), file=sys.stderr) | ||||
| 				traceback.print_exc(sys.stderr) | ||||
| 			return False | ||||
|     def get_test_ids(self, test_id_file): | ||||
|         """Returns a mapping of test to id if one exists, else an empty dictionary.""" | ||||
|         try: | ||||
|             with open(test_id_file, 'rb') as fh: | ||||
|                 try: | ||||
|                     from cPickle import load | ||||
|                 except ImportError: | ||||
|                     from pickle import load | ||||
|                 data = load(fh) | ||||
|             return {address: _id for _id, address in data["ids"].items()} | ||||
|         except IOError: | ||||
|             return {} | ||||
|  | ||||
| 	def write(self, *a): | ||||
| 		if self.__should_filter(): | ||||
| 			return | ||||
| 		return self.__stream.write(*a) | ||||
|     def printSummary(self, start, stop):  # noqa | ||||
|         """Summarize all tests - the number of failures, errors and successes.""" | ||||
|         self._line(termstyle.black) | ||||
|         self._out("%s test%s run in %0.3f seconds" % (self.total, self._plural(self.total), stop - start)) | ||||
|         if self.total > self.success: | ||||
|             self._outln(". ") | ||||
|  | ||||
| 	def writeln(self, *a): | ||||
| 		if self.__should_filter(): | ||||
| 			return | ||||
| 		return self.__stream.writeln(*a) | ||||
|             additionals = [ | ||||
|                 {"color": termstyle.red, "count": self.failure, "message": "%s FAILED"}, | ||||
|                 {"color": termstyle.yellow, "count": self.error, "message": "%s error%s" % ("%s", self._plural(self.error))}, | ||||
|                 {"color": termstyle.blue, "count": self.skip, "message": "%s skipped"}, | ||||
|                 {"color": termstyle.green, "count": self.expected_failure, "message": "%s expected_failures"}, | ||||
|                 {"color": termstyle.cyan, "count": self.unexpected_success, "message": "%s unexpected_successes"}, | ||||
|             ] | ||||
|  | ||||
| 	# pass non-known methods through to self.__stream | ||||
| 	def __getattr__(self, name): | ||||
| 		if REDNOSE_DEBUG: | ||||
| 			print("REDNOSE_DEBUG: getting attr %s" % (name,), file=sys.stderr) | ||||
| 		return getattr(self.__stream, name) | ||||
|             additionals_to_print = [ | ||||
|                 additional["color"](additional["message"] % (additional["count"])) for additional in additionals if additional["count"] > 0 | ||||
|             ] | ||||
|  | ||||
|             self._out(', '.join(additionals_to_print)) | ||||
|  | ||||
|         self._out(termstyle.green(" (%s test%s passed)" % (self.success, self._plural(self.success)))) | ||||
|         self._outln() | ||||
|  | ||||
|     def _plural(self, num): | ||||
|         return '' if num == 1 else 's' | ||||
|  | ||||
|     def _line(self, color=termstyle.reset, char='-'): | ||||
|         """ | ||||
|         Print a line of separator characters (default '-') in the given colour (default black). | ||||
|         """ | ||||
|         self._outln(color(char * line_length)) | ||||
|  | ||||
|     def _print_test(self, type_, color): | ||||
|         self.total += 1 | ||||
|         if self.verbose: | ||||
|             self._outln(color(type_)) | ||||
|         else: | ||||
|             short_ = self.short_status_map.get(type_, ".") | ||||
|             self._out(color(short_)) | ||||
|             if self.total % line_length == 0: | ||||
|                 self._outln() | ||||
|  | ||||
|     def _out(self, msg='', newline=False): | ||||
|         self.stream.write(msg) | ||||
|         if newline: | ||||
|             self.stream.write('\n') | ||||
|  | ||||
|     def _outln(self, msg=''): | ||||
|         self._out(msg=msg, newline=True) | ||||
|  | ||||
|     def _generate_and_add_test_report(self, type_, test, err): | ||||
|         report = self._report_test(len(self.test_failures_and_exceptions), type_, test, err) | ||||
|         self.test_failures_and_exceptions.append(report) | ||||
|  | ||||
|     def addFailure(self, test, err):  # noqa | ||||
|         self.failure += 1 | ||||
|         self._print_test(failure, termstyle.red) | ||||
|         self._generate_and_add_test_report(failure, test, err) | ||||
|  | ||||
|     def addError(self, test, err):  # noqa | ||||
|         self.error += 1 | ||||
|         self._print_test(error, termstyle.yellow) | ||||
|         self._generate_and_add_test_report(error, test, err) | ||||
|  | ||||
|     def addSuccess(self, test):  # noqa | ||||
|         self.success += 1 | ||||
|         self._print_test(success, termstyle.green) | ||||
|  | ||||
|     def addSkip(self, test, err):  # noqa | ||||
|         self.skip += 1 | ||||
|         self._print_test(skip, termstyle.blue) | ||||
|  | ||||
|     def addExpectedFailure(self, test, err):  # noqa | ||||
|         self.expected_failure += 1 | ||||
|         self._print_test(expected_failure, termstyle.green) | ||||
|  | ||||
|     def addUnexpectedSuccess(self, test):  # noqa | ||||
|         self.unexpected_success += 1 | ||||
|         self._print_test(unexpected_success, termstyle.cyan) | ||||
|  | ||||
|     def _report_test(self, report_index_num, type_, test, err):  # noqa | ||||
|         """report the results of a single (failing or errored) test""" | ||||
|         if type_ == failure: | ||||
|             color = termstyle.red | ||||
|         else: | ||||
|             color = termstyle.yellow | ||||
|  | ||||
|         exc_type, exc_instance, exc_trace = err | ||||
|  | ||||
|         colored_error_text = [ | ||||
|             ''.join(self.format_traceback(exc_trace)), | ||||
|             self._format_exception_message(exc_type, exc_instance, color) | ||||
|         ] | ||||
|  | ||||
|         if type_ == failure: | ||||
|             self.failures.append((test, colored_error_text)) | ||||
|             flavour = "FAIL" | ||||
|         else: | ||||
|             self.errors.append((test, colored_error_text)) | ||||
|             flavour = "ERROR" | ||||
|  | ||||
|         if self.immediate: | ||||
|             self._outln() | ||||
|             self.printErrorList(flavour, [(test, colored_error_text)], self.immediate) | ||||
|  | ||||
|         if self.has_test_ids: | ||||
|             test_id = self.ids.get(test.address(), self.total) | ||||
|         else: | ||||
|             test_id = report_index_num + 1 | ||||
|         return (test_id, flavour, test, colored_error_text) | ||||
|  | ||||
|     def format_traceback(self, tb): | ||||
|         ret = [termstyle.default("   Traceback (most recent call last):")] | ||||
|  | ||||
|         current_trace = tb | ||||
|         while current_trace is not None: | ||||
|             line = self._format_traceback_line(current_trace) | ||||
|             if line is not None: | ||||
|                 ret.append(line) | ||||
|             current_trace = current_trace.tb_next | ||||
|         return '\n'.join(ret) | ||||
|  | ||||
|     def _format_traceback_line(self, tb): | ||||
|         """ | ||||
|         Formats the file / lineno / function line of a traceback element. | ||||
|  | ||||
|         Returns None is the line is not relevent to the user i.e. inside the test runner. | ||||
|         """ | ||||
|         if self._is_relevant_tb_level(tb): | ||||
|             return None | ||||
|  | ||||
|         f = tb.tb_frame | ||||
|         filename = f.f_code.co_filename | ||||
|         lineno = tb.tb_lineno | ||||
|         linecache.checkcache(filename) | ||||
|         function_name = f.f_code.co_name | ||||
|  | ||||
|         line_contents = linecache.getline(filename, lineno, f.f_globals).strip() | ||||
|  | ||||
|         return "    %s line %s in %s\n      %s" % ( | ||||
|             termstyle.blue(self._relative_path(filename) if self.use_relative_path else filename), | ||||
|             termstyle.bold(termstyle.cyan(lineno)), | ||||
|             termstyle.cyan(function_name), | ||||
|             line_contents | ||||
|         ) | ||||
|  | ||||
|     def _format_exception_message(self, exception_type, exception_instance, message_color): | ||||
|         """Returns a colorized formatted exception message.""" | ||||
|         orig_message_lines = to_unicode(exception_instance).splitlines() | ||||
|  | ||||
|         if len(orig_message_lines) == 0: | ||||
|             return '' | ||||
|         exception_message = orig_message_lines[0] | ||||
|  | ||||
|         message_lines = [message_color('   ', termstyle.bold(message_color(exception_type.__name__)), ": ") + message_color(exception_message)] | ||||
|         for line in orig_message_lines[1:]: | ||||
|             match = re.match('^---.* begin captured stdout.*----$', line) | ||||
|             if match: | ||||
|                 message_color = termstyle.magenta | ||||
|                 message_lines.append('') | ||||
|             line = '   ' + line | ||||
|             message_lines.append(message_color(line)) | ||||
|         return '\n'.join(message_lines) | ||||
|  | ||||
|     def _relative_path(self, path): | ||||
|         """ | ||||
|         Returns the relative path of a file to the current working directory. | ||||
|  | ||||
|         If path is a child of the current working directory, the relative | ||||
|         path is returned surrounded. | ||||
|         If path is not a child of the working directory, path is returned | ||||
|         """ | ||||
|         try: | ||||
|             here = os.path.abspath(os.path.realpath(os.getcwd())) | ||||
|             fullpath = os.path.abspath(os.path.realpath(path)) | ||||
|         except OSError: | ||||
|             return path | ||||
|         if fullpath.startswith(here): | ||||
|             return fullpath[len(here) + 1:] | ||||
|         return path | ||||
|  | ||||
|     def printErrors(self):  # noqa | ||||
|         if not self.verbose: | ||||
|             self._outln() | ||||
|         if self.immediate: | ||||
|             self._outln() | ||||
|             for x in range(0, 4): | ||||
|                 self._outln() | ||||
|  | ||||
|             self._outln(termstyle.green("TEST RESULT OUTPUT:")) | ||||
|  | ||||
|         for (test_id, flavour, test, coloured_output_lines) in (self.test_failures_and_exceptions): | ||||
|             self._printError(flavour=flavour, test=test, coloured_output_lines=coloured_output_lines, test_id=test_id) | ||||
|  | ||||
|     def _printError(self, flavour, test, coloured_output_lines, test_id, is_mid_test=False):  # noqa | ||||
|         if flavour == "FAIL": | ||||
|             color = termstyle.red | ||||
|         else: | ||||
|             color = termstyle.yellow | ||||
|  | ||||
|         self._outln(color(self.separator1)) | ||||
|         self._outln(color("%s) %s: %s" % (test_id, flavour, self.getDescription(test)))) | ||||
|         self._outln(color(self.separator2)) | ||||
|  | ||||
|         for err_line in coloured_output_lines: | ||||
|             self._outln("%s" % err_line) | ||||
|  | ||||
|         if is_mid_test: | ||||
|             self._outln(color(self.separator2)) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Tim Cuthbertson
					Tim Cuthbertson