#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#

"""Subunit - a streaming test protocol
|
|
|
|
Overview
|
|
++++++++
|
|
|
|
The ``subunit`` Python package provides a number of ``unittest`` extensions
|
|
which can be used to cause tests to output Subunit, to parse Subunit streams
|
|
into test activity, perform seamless test isolation within a regular test
|
|
case and variously sort, filter and report on test runs.
|
|
|
|
|
|
Key Classes
|
|
-----------
|
|
|
|
The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
|
|
extension which will translate a test run into a Subunit stream.
|
|
|
|
The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
|
|
protocol and the ``unittest.TestCase`` object protocol. It is used to translate
|
|
a stream into a test run, which regular ``unittest.TestResult`` objects can
|
|
process and report/inspect.
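
For example, the two classes can be combined to serialise a run to disk and
replay it later (a minimal sketch; the file name and ``make_suite`` helper are
illustrative)::

  # Record a test run as a subunit byte stream.
  stream = open('tests.subunit', 'wb')
  make_suite().run(subunit.TestProtocolClient(stream))
  stream.close()

  # Replay that stream into any unittest.TestResult.
  stream = open('tests.subunit', 'rb')
  subunit.ProtocolTestCase(stream).run(unittest.TestResult())
  stream.close()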

Subunit has support for non-blocking usage too, for use with asyncore or
Twisted. See the ``TestProtocolServer`` parser class for more details.

Subunit includes extensions to the Python ``TestResult`` protocol. These are
all done in a compatible manner: ``TestResult`` objects that do not implement
the extension methods will not cause errors to be raised; instead the extension
will either lose fidelity (for instance, folding expected failures to success
in Python versions < 2.7 or 3.1), or discard the extended data (for extra
details, tags, timestamping and progress markers).

The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
which can be used instead of the usual python unittest parameter.
When used, the value of details should be a dict from ``string`` to
``testtools.content.Content`` objects. This is a draft API being worked on with
the Testing In Python mailing list, with the goal of permitting a common
way to provide additional data beyond a traceback, such as captured data from
disk, logging messages etc. The reference for this API is in testtools (0.9.0
and newer).
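
For example, a details dict can be built with the ``testtools.content``
helpers (an illustrative sketch only; ``result`` and ``test`` are whatever
result object and test are in scope)::

  from testtools.content import text_content

  details = {'reason': text_content('database fixture not available')}
  result.addSkip(test, details=details)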

The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
remove tags in the test run that is currently executing. If called when no
test is in progress (that is, if called outside of the ``startTest``,
``stopTest`` pair), the tags apply to all subsequent tests. If called
when a test is in progress, then the tags only apply to that test.

The ``time(a_datetime)`` method is called (if present) when a ``time:``
directive is encountered in a Subunit stream. This is used to tell a TestResult
about the time that events in the stream occurred at, to allow reconstructing
test timing from a stream.

The ``progress(offset, whence)`` method controls progress data for a stream.
The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
ignore the offset parameter.
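
A ``TestResult`` that wants this extra data only needs to implement the
optional methods it cares about, for instance (an illustrative sketch)::

  class TaggingResult(unittest.TestResult):

      def tags(self, new_tags, gone_tags):
          # Track the tags currently in force.
          current = getattr(self, 'current_tags', set())
          self.current_tags = (current | set(new_tags)) - set(gone_tags)

      def time(self, a_datetime):
          # Timestamp for subsequent events in the stream.
          self.last_time = a_datetime

      def progress(self, offset, whence):
          # whence is one of the PROGRESS_* constants in this module.
          pass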


Python test support
-------------------

``subunit.run`` is a convenience wrapper to run a Python test suite via
the command line, reporting via Subunit::

  $ python -m subunit.run mylib.tests.test_suite

The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
tests, allowing isolation between the test runner and some tests.

Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
tests that will fork() before that individual test is run.
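
For example, an existing suite can be wrapped in an ``IsolatedTestSuite``
(illustrative only; forking requires a POSIX platform)::

  suite = subunit.IsolatedTestSuite()
  suite.addTests(make_suite())
  suite.run(unittest.TestResult())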

``ExecTestCase`` is a convenience wrapper for running an external
program to get a Subunit stream and then report that back to an arbitrary
result object::

  class AggregateTests(subunit.ExecTestCase):

      def test_script_one(self):
          './bin/script_one'

      def test_script_two(self):
          './bin/script_two'

  # Normally your test loading would take care of this automatically;
  # it is only spelt out in detail here for clarity.
  suite = unittest.TestSuite([AggregateTests("test_script_one"),
      AggregateTests("test_script_two")])
  # Create any TestResult class you like.
  result = unittest._TextTestResult(sys.stdout)
  # And run your suite as normal, Subunit will exec each external script as
  # needed and report to your result object.
  suite.run(result)

Utility modules
---------------

* subunit.chunked contains HTTP chunked encoding/decoding logic.
* subunit.test_results contains TestResult helper classes.
"""

import os
import re
import subprocess
import sys
import unittest

from testtools import content, content_type, ExtendedToOriginalDecorator
from testtools.compat import _b, _u, BytesIO, StringIO
try:
    from testtools.testresult.real import _StringException
    RemoteException = _StringException
    # For testing: different pythons have different str() implementations.
    if sys.version_info > (3, 0):
        _remote_exception_str = "testtools.testresult.real._StringException"
        _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
    else:
        _remote_exception_str = "_StringException"
        _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
except ImportError:
    raise ImportError("testtools.testresult.real does not contain "
        "_StringException, check your version.")
from testtools import testresult

from subunit import chunked, details, iso8601, test_results


PROGRESS_SET = 0
PROGRESS_CUR = 1
PROGRESS_PUSH = 2
PROGRESS_POP = 3


def test_suite():
    import subunit.tests
    return subunit.tests.test_suite()

def join_dir(base_path, path):
    """
    Returns an absolute path to C{path}, calculated relative to the parent
    of C{base_path}.

    @param base_path: A path to a file or directory.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: An absolute path to C{path}.
    """
    return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)


def tags_to_new_gone(tags):
    """Split a list of tags into a new_set and a gone_set."""
    new_tags = set()
    gone_tags = set()
    for tag in tags:
        if tag[0] == '-':
            gone_tags.add(tag[1:])
        else:
            new_tags.add(tag)
    return new_tags, gone_tags
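
# For example (illustrative): tags_to_new_gone(['web', '-slow']) returns
# ({'web'}, {'slow'}) - a leading '-' marks a tag for removal.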


class DiscardStream(object):
    """A filelike object which discards what is written to it."""

    def write(self, bytes):
        pass


class _ParserState(object):
    """State for the subunit parser."""

    def __init__(self, parser):
        self.parser = parser
        self._test_sym = (_b('test'), _b('testing'))
        self._colon_sym = _b(':')
        self._error_sym = (_b('error'),)
        self._failure_sym = (_b('failure'),)
        self._progress_sym = (_b('progress'),)
        self._skip_sym = _b('skip')
        self._success_sym = (_b('success'), _b('successful'))
        self._tags_sym = (_b('tags'),)
        self._time_sym = (_b('time'),)
        self._xfail_sym = (_b('xfail'),)
        self._uxsuccess_sym = (_b('uxsuccess'),)
        self._start_simple = _u(" [")
        self._start_multipart = _u(" [ multipart")

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        self.parser.stdOutLineReceived(line)
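
    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read.

        Note: this handler is added here so that a 'uxsuccess:' line seen
        outside of a running test is passed through like the other outcome
        directives, rather than the dispatcher below raising AttributeError.
        """
        self.parser.stdOutLineReceived(line)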

    def lineReceived(self, line):
        """a line has been received."""
        parts = line.split(None, 1)
        if len(parts) == 2 and line.startswith(parts[0]):
            cmd, rest = parts
            offset = len(cmd) + 1
            cmd = cmd.rstrip(self._colon_sym)
            if cmd in self._test_sym:
                self.startTest(offset, line)
            elif cmd in self._error_sym:
                self.addError(offset, line)
            elif cmd in self._failure_sym:
                self.addFailure(offset, line)
            elif cmd in self._progress_sym:
                self.parser._handleProgress(offset, line)
            elif cmd in self._skip_sym:
                self.addSkip(offset, line)
            elif cmd in self._success_sym:
                self.addSuccess(offset, line)
            elif cmd in self._tags_sym:
                self.parser._handleTags(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._time_sym:
                self.parser._handleTime(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._xfail_sym:
                self.addExpectedFail(offset, line)
            elif cmd in self._uxsuccess_sym:
                self.addUnexpectedSuccess(offset, line)
            else:
                self.parser.stdOutLineReceived(line)
        else:
            self.parser.stdOutLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u('unknown state of '))

    def startTest(self, offset, line):
        """A test start command received."""
        self.parser.stdOutLineReceived(line)


class _InTest(_ParserState):
    """State for the subunit parser after reading a test: directive."""

    def _outcome(self, offset, line, no_details, details_state):
        """An outcome directive has been read.

        :param no_details: Callable to call when no details are presented.
        :param details_state: The state to switch to for details
            processing of this outcome.
        """
        test_name = line[offset:-1].decode('utf8')
        if self.parser.current_test_description == test_name:
            self.parser._state = self.parser._outside_test
            self.parser.current_test_description = None
            no_details()
            self.parser.client.stopTest(self.parser._current_test)
            self.parser._current_test = None
            self.parser.subunitLineReceived(line)
        elif self.parser.current_test_description + self._start_simple == \
            test_name:
            self.parser._state = details_state
            details_state.set_simple()
            self.parser.subunitLineReceived(line)
        elif self.parser.current_test_description + self._start_multipart == \
            test_name:
            self.parser._state = details_state
            details_state.set_multipart()
            self.parser.subunitLineReceived(line)
        else:
            self.parser.stdOutLineReceived(line)

    def _error(self):
        self.parser.client.addError(self.parser._current_test,
            details={})

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        self._outcome(offset, line, self._error,
            self.parser._reading_error_details)

    def _xfail(self):
        self.parser.client.addExpectedFailure(self.parser._current_test,
            details={})

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        self._outcome(offset, line, self._xfail,
            self.parser._reading_xfail_details)

    def _uxsuccess(self):
        self.parser.client.addUnexpectedSuccess(self.parser._current_test)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read."""
        self._outcome(offset, line, self._uxsuccess,
            self.parser._reading_uxsuccess_details)

    def _failure(self):
        self.parser.client.addFailure(self.parser._current_test, details={})

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        self._outcome(offset, line, self._failure,
            self.parser._reading_failure_details)

    def _skip(self):
        self.parser.client.addSkip(self.parser._current_test, details={})

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        self._outcome(offset, line, self._skip,
            self.parser._reading_skip_details)

    def _succeed(self):
        self.parser.client.addSuccess(self.parser._current_test, details={})

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        self._outcome(offset, line, self._succeed,
            self.parser._reading_success_details)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u(''))


class _OutSideTest(_ParserState):
    """State for the subunit parser outside of a test context."""

    def lostConnection(self):
        """Connection lost."""

    def startTest(self, offset, line):
        """A test start command received."""
        self.parser._state = self.parser._in_test
        test_name = line[offset:-1].decode('utf8')
        self.parser._current_test = RemotedTestCase(test_name)
        self.parser.current_test_description = test_name
        self.parser.client.startTest(self.parser._current_test)
        self.parser.subunitLineReceived(line)


class _ReadingDetails(_ParserState):
    """Common logic for reading state details."""

    def endDetails(self):
        """The end of a details section has been reached."""
        self.parser._state = self.parser._outside_test
        self.parser.current_test_description = None
        self._report_outcome()
        self.parser.client.stopTest(self.parser._current_test)

    def lineReceived(self, line):
        """a line has been received."""
        self.details_parser.lineReceived(line)
        self.parser.subunitLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u('%s report of ') %
            self._outcome_label())

    def _outcome_label(self):
        """The label to describe this outcome."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        self.details_parser = details.SimpleDetailsParser(self)

    def set_multipart(self):
        """Start a multipart details parser."""
        self.details_parser = details.MultipartDetailsParser(self)


class _ReadingFailureDetails(_ReadingDetails):
    """State for the subunit parser when reading failure details."""

    def _report_outcome(self):
        self.parser.client.addFailure(self.parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        return "failure"


class _ReadingErrorDetails(_ReadingDetails):
    """State for the subunit parser when reading error details."""

    def _report_outcome(self):
        self.parser.client.addError(self.parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        return "error"


class _ReadingExpectedFailureDetails(_ReadingDetails):
    """State for the subunit parser when reading xfail details."""

    def _report_outcome(self):
        self.parser.client.addExpectedFailure(self.parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        return "xfail"


class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
    """State for the subunit parser when reading uxsuccess details."""

    def _report_outcome(self):
        self.parser.client.addUnexpectedSuccess(self.parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        return "uxsuccess"


class _ReadingSkipDetails(_ReadingDetails):
    """State for the subunit parser when reading skip details."""

    def _report_outcome(self):
        self.parser.client.addSkip(self.parser._current_test,
            details=self.details_parser.get_details("skip"))

    def _outcome_label(self):
        return "skip"


class _ReadingSuccessDetails(_ReadingDetails):
    """State for the subunit parser when reading success details."""

    def _report_outcome(self):
        self.parser.client.addSuccess(self.parser._current_test,
            details=self.details_parser.get_details("success"))

    def _outcome_label(self):
        return "success"


class TestProtocolServer(object):
    """A parser for subunit.

    :ivar tags: The current tags associated with the protocol stream.
    """

    def __init__(self, client, stream=None, forward_stream=None):
        """Create a TestProtocolServer instance.

        :param client: An object meeting the unittest.TestResult protocol.
        :param stream: The stream to which received lines that are not part
            of the subunit protocol should be written. This allows custom
            handling of mixed protocols. By default, sys.stdout will be used
            for convenience. It should accept bytes to its write() method.
        :param forward_stream: A stream to forward subunit lines to. This
            allows a filter to forward the entire stream while still parsing
            and acting on it. By default forward_stream is set to
            DiscardStream() and no forwarding happens.
        """
        self.client = ExtendedToOriginalDecorator(client)
        if stream is None:
            stream = sys.stdout
            if sys.version_info > (3, 0):
                stream = stream.buffer
        self._stream = stream
        self._forward_stream = forward_stream or DiscardStream()
        # state objects we can switch to
        self._in_test = _InTest(self)
        self._outside_test = _OutSideTest(self)
        self._reading_error_details = _ReadingErrorDetails(self)
        self._reading_failure_details = _ReadingFailureDetails(self)
        self._reading_skip_details = _ReadingSkipDetails(self)
        self._reading_success_details = _ReadingSuccessDetails(self)
        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
        # start with outside test.
        self._state = self._outside_test
        # Avoid casts on every call
        self._plusminus = _b('+-')
        self._push_sym = _b('push')
        self._pop_sym = _b('pop')

    def _handleProgress(self, offset, line):
        """Process a progress directive."""
        line = line[offset:].strip()
        if line[0] in self._plusminus:
            whence = PROGRESS_CUR
            delta = int(line)
        elif line == self._push_sym:
            whence = PROGRESS_PUSH
            delta = None
        elif line == self._pop_sym:
            whence = PROGRESS_POP
            delta = None
        else:
            whence = PROGRESS_SET
            delta = int(line)
        self.client.progress(delta, whence)

    def _handleTags(self, offset, line):
        """Process a tags command."""
        tags = line[offset:].decode('utf8').split()
        new_tags, gone_tags = tags_to_new_gone(tags)
        self.client.tags(new_tags, gone_tags)

    def _handleTime(self, offset, line):
        # Accept it, but do not do anything with it yet.
        try:
            event_time = iso8601.parse_date(line[offset:-1])
        except TypeError:
            raise TypeError(_u("Failed to parse %r, got %r")
                % (line, sys.exc_info()[1]))
        self.client.time(event_time)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        self._state.lineReceived(line)

    def _lostConnectionInTest(self, state_string):
        error_string = _u("lost connection during %stest '%s'") % (
            state_string, self.current_test_description)
        self.client.addError(self._current_test, RemoteError(error_string))
        self.client.stopTest(self._current_test)

    def lostConnection(self):
        """The input connection has finished."""
        self._state.lostConnection()

    def readFrom(self, pipe):
        """Blocking convenience API to parse an entire stream.

        :param pipe: A file-like object supporting readlines().
        :return: None.
        """
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        self._state.startTest(offset, line)

    def subunitLineReceived(self, line):
        self._forward_stream.write(line)

    def stdOutLineReceived(self, line):
        self._stream.write(line)


class TestProtocolClient(testresult.TestResult):
    """A TestResult which generates a subunit stream for a test run.

    # Get a TestSuite or TestCase to run
    suite = make_suite()
    # Create a stream (any object with a 'write' method). This should accept
    # bytes not strings: subunit is a byte orientated protocol.
    stream = open('tests.log', 'wb')
    # Create a subunit result object which will output to the stream
    result = subunit.TestProtocolClient(stream)
    # Optionally, to get timing data for performance analysis, wrap the
    # serialiser with a timing decorator
    result = subunit.test_results.AutoTimingTestResultDecorator(result)
    # Run the test suite reporting to the subunit result object
    suite.run(result)
    # Close the stream.
    stream.close()
    """

    def __init__(self, stream):
        testresult.TestResult.__init__(self)
        self._stream = stream
        _make_stream_binary(stream)
        self._progress_fmt = _b("progress: ")
        self._bytes_eol = _b("\n")
        self._progress_plus = _b("+")
        self._progress_push = _b("push")
        self._progress_pop = _b("pop")
        self._empty_bytes = _b("")
        self._start_simple = _b(" [\n")
        self._end_simple = _b("]\n")

    def addError(self, test, error=None, details=None):
        """Report an error in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addError(self, test, error)
            addError(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("error", test, error=error, details=details)

    def addExpectedFailure(self, test, error=None, details=None):
        """Report an expected failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addExpectedFailure(self, test, error)
            addExpectedFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("xfail", test, error=error, details=details)

    def addFailure(self, test, error=None, details=None):
        """Report a failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addFailure(self, test, error)
            addFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("failure", test, error=error, details=details)

    def _addOutcome(self, outcome, test, error=None, details=None,
        error_permitted=True):
        """Report an outcome for test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addOutcome(self, test, error)
            addOutcome(self, test, details)

        :param outcome: A string describing the outcome - used as the
            event name in the subunit stream.
        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        :param error_permitted: If True then one and only one of error or
            details must be supplied. If False then error must not be supplied
            and details is still optional.
        """
        self._stream.write(_b("%s: %s" % (outcome, test.id())))
        if error_permitted:
            if error is None and details is None:
                raise ValueError
        else:
            if error is not None:
                raise ValueError
        if error is not None:
            self._stream.write(self._start_simple)
            # XXX: this needs to be made much stricter, along the lines of
            # Martin[gz]'s work in testtools. Perhaps subunit can use that?
            for line in self._exc_info_to_unicode(error, test).splitlines():
                self._stream.write(("%s\n" % line).encode('utf8'))
        elif details is not None:
            self._write_details(details)
        else:
            self._stream.write(_b("\n"))
        if details is not None or error is not None:
            self._stream.write(self._end_simple)

    def addSkip(self, test, reason=None, details=None):
        """Report a skipped test."""
        if reason is None:
            self._addOutcome("skip", test, error=None, details=details)
        else:
            self._stream.write(_b("skip: %s [\n" % test.id()))
            self._stream.write(_b("%s\n" % reason))
            self._stream.write(self._end_simple)

    def addSuccess(self, test, details=None):
        """Report a success in a test."""
        self._addOutcome("successful", test, details=details, error_permitted=False)

    def addUnexpectedSuccess(self, test, details=None):
        """Report an unexpected success in test test.

        Details can optionally be provided: conceptually there
        are two separate methods:
            addUnexpectedSuccess(self, test)
            addUnexpectedSuccess(self, test, details)

        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("uxsuccess", test, details=details,
            error_permitted=False)

    def startTest(self, test):
        """Mark a test as starting its test run."""
        super(TestProtocolClient, self).startTest(test)
        self._stream.write(_b("test: %s\n" % test.id()))
        self._stream.flush()

    def stopTest(self, test):
        super(TestProtocolClient, self).stopTest(test)
        self._stream.flush()

    def progress(self, offset, whence):
        """Provide indication about the progress/length of the test run.

        :param offset: Information about the number of tests remaining. If
            whence is PROGRESS_CUR, then offset increases/decreases the
            remaining test count. If whence is PROGRESS_SET, then offset
            specifies exactly the remaining test count.
        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
            PROGRESS_POP.
        """
        if whence == PROGRESS_CUR and offset > -1:
            prefix = self._progress_plus
            offset = _b(str(offset))
        elif whence == PROGRESS_PUSH:
            prefix = self._empty_bytes
            offset = self._progress_push
        elif whence == PROGRESS_POP:
            prefix = self._empty_bytes
            offset = self._progress_pop
        else:
            prefix = self._empty_bytes
            offset = _b(str(offset))
        self._stream.write(self._progress_fmt + prefix + offset +
            self._bytes_eol)

    def tags(self, new_tags, gone_tags):
        """Inform the client about tags added/removed from the stream."""
        if not new_tags and not gone_tags:
            return
        tags = set([tag.encode('utf8') for tag in new_tags])
        tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
        tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
        self._stream.write(tag_line)

    def time(self, a_datetime):
        """Inform the client of the time.

        :param a_datetime: A datetime.datetime object.
        """
        time = a_datetime.astimezone(iso8601.Utc())
        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
            time.year, time.month, time.day, time.hour, time.minute,
            time.second, time.microsecond)))

    def _write_details(self, details):
        """Output details to the stream.

        :param details: An extended details dict for a test outcome.
        """
        self._stream.write(_b(" [ multipart\n"))
        for name, content in sorted(details.items()):
            self._stream.write(_b("Content-Type: %s/%s" %
                (content.content_type.type, content.content_type.subtype)))
            parameters = content.content_type.parameters
            if parameters:
                self._stream.write(_b(";"))
                param_strs = []
                for param, value in parameters.items():
                    param_strs.append("%s=%s" % (param, value))
                self._stream.write(_b(",".join(param_strs)))
            self._stream.write(_b("\n%s\n" % name))
            encoder = chunked.Encoder(self._stream)
            list(map(encoder.write, content.iter_bytes()))
            encoder.close()

    def done(self):
        """Obey the testtools result.done() interface."""


def RemoteError(description=_u("")):
    return (_StringException, _StringException(description), None)


class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances of this class are used to provide the Python test API a TestCase
    that can be printed to the screen, introspected for metadata and so on.
    However, as they are simply a memoisation of a test that was actually
    run in the past by a separate process, they cannot perform any interactive
    actions.
    """

    def __eq__(self, other):
        try:
            return self.__description == other.__description
        except AttributeError:
            return False

    def __init__(self, description):
        """Create a pseudo test case with description description."""
        self.__description = description

    def error(self, label):
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        self.error("setUp")

    def tearDown(self):
        self.error("tearDown")

    def shortDescription(self):
        return self.__description

    def id(self):
        return "%s" % (self.__description,)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
            (self._strclass(), self.__description)

    def run(self, result=None):
        if result is None: result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
        result.stopTest(self)

    def _strclass(self):
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)


class ExecTestCase(unittest.TestCase):
    """A test case which runs external scripts for test fixtures."""

    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
        method when executed. Raises a ValueError if the instance does
        not have a method with the specified name.
        """
        unittest.TestCase.__init__(self, methodName)
        testMethod = getattr(self, methodName)
        self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
            testMethod.__doc__)

    def countTestCases(self):
        return 1

    def run(self, result=None):
        if result is None: result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        self._run(testresult.TestResult())

    def _run(self, result):
        protocol = TestProtocolServer(result)
        process = subprocess.Popen(self.script, shell=True,
            stdout=subprocess.PIPE)
        _make_stream_binary(process.stdout)
        output = process.communicate()[0]
        protocol.readFrom(BytesIO(output))


class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        if result is None: result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)


class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    This decorator will fork() before running the tests and report the
    results from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state, or are testing C extensions that
    could crash the VM.
    """

    def run(self, result=None):
        if result is None: result = testresult.TestResult()
        run_isolated(unittest.TestSuite, self, result)


def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.
    """
    c2pread, c2pwrite = os.pipe()
    # fixme - error -> result
    # now fork
    pid = os.fork()
    if pid == 0:
        # Child
        # Close parent's pipe ends
        os.close(c2pread)
        # Dup fds for child
        os.dup2(c2pwrite, 1)
        # Close pipe fds.
        os.close(c2pwrite)

        # at this point, sys.stdout is redirected (fd 1 is the pipe); now we
        # want to filter it to escape ]'s.
        ### XXX: test and write that bit.

        stream = os.fdopen(1, 'wb')
        result = TestProtocolClient(stream)
        klass.run(self, result)
        stream.flush()
        sys.stderr.flush()
        # exit HARD, exit NOW.
        os._exit(0)
    else:
        # Parent
        # Close child pipe ends
        os.close(c2pwrite)
        # hookup a protocol engine
        protocol = TestProtocolServer(result)
        fileobj = os.fdopen(c2pread, 'rb')
        protocol.readFrom(fileobj)
        os.waitpid(pid, 0)
        # TODO return code evaluation.
    return result


def TAP2SubUnit(tap, subunit):
    """Filter a TAP pipe into a subunit pipe.

    :param tap: A tap pipe/stream/file object.
    :param subunit: A pipe/stream/file object to write subunit results to.
    :return: The exit code to exit with.
    """
    BEFORE_PLAN = 0
    AFTER_PLAN = 1
    SKIP_STREAM = 2
    state = BEFORE_PLAN
    plan_start = 1
    plan_stop = 0
    def _skipped_test(subunit, plan_start):
        # Some tests were skipped.
        subunit.write('test test %d\n' % plan_start)
        subunit.write('error test %d [\n' % plan_start)
        subunit.write('test missing from TAP output\n')
        subunit.write(']\n')
        return plan_start + 1
    # Test data for the next test to emit
    test_name = None
    log = []
    result = None
    def _emit_test():
        "write out a test"
        if test_name is None:
            return
        subunit.write("test %s\n" % test_name)
        if not log:
            subunit.write("%s %s\n" % (result, test_name))
        else:
            subunit.write("%s %s [\n" % (result, test_name))
        if log:
            for line in log:
                subunit.write("%s\n" % line)
            subunit.write("]\n")
        del log[:]
    for line in tap:
        if state == BEFORE_PLAN:
            match = re.match(r"(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
            if match:
                state = AFTER_PLAN
                _, plan_stop, comment = match.groups()
                plan_stop = int(plan_stop)
                if plan_start > plan_stop and plan_stop == 0:
                    # skipped file
                    state = SKIP_STREAM
                    subunit.write("test file skip\n")
                    subunit.write("skip file skip [\n")
                    subunit.write("%s\n" % comment)
                    subunit.write("]\n")
                continue
        # not a plan line, or have seen one before
        match = re.match(r"(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
        if match:
            # new test, emit current one.
            _emit_test()
            status, number, description, directive, directive_comment = match.groups()
            if status == 'ok':
                result = 'success'
            else:
                result = "failure"
            if description is None:
                description = ''
            else:
                description = ' ' + description
            if directive is not None:
                if directive.upper() == 'TODO':
                    result = 'xfail'
                elif directive.upper() == 'SKIP':
                    result = 'skip'
                if directive_comment is not None:
                    log.append(directive_comment)
            if number is not None:
                number = int(number)
                while plan_start < number:
                    plan_start = _skipped_test(subunit, plan_start)
            test_name = "test %d%s" % (plan_start, description)
            plan_start += 1
            continue
        match = re.match(r"Bail out\!(?:\s*(.*))?\n", line)
        if match:
            reason, = match.groups()
            if reason is None:
                extra = ''
            else:
                extra = ' %s' % reason
            _emit_test()
            test_name = "Bail out!%s" % extra
            result = "error"
            state = SKIP_STREAM
            continue
        match = re.match(r"\#.*\n", line)
        if match:
            log.append(line[:-1])
            continue
        subunit.write(line)
    _emit_test()
    while plan_start <= plan_stop:
        # record missed tests
        plan_start = _skipped_test(subunit, plan_start)
    return 0
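
# Typical use of TAP2SubUnit (illustrative) is as a pipe filter, e.g.:
#   sys.exit(TAP2SubUnit(sys.stdin, sys.stdout))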


def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)
    def write_tags(new_tags, gone_tags):
        if new_tags or gone_tags:
            filtered.write("tags: " + ' '.join(new_tags))
            if gone_tags:
                for tag in gone_tags:
                    filtered.write("-" + tag)
            filtered.write("\n")
    write_tags(new_tags, gone_tags)
    # TODO: use the protocol parser and thus don't mangle test comments.
    for line in original:
        if line.startswith("tags:"):
            line_tags = line[5:].split()
            line_new, line_gone = tags_to_new_gone(line_tags)
            line_new = line_new - gone_tags
            line_gone = line_gone - new_tags
            write_tags(line_new, line_gone)
        else:
            filtered.write(line)
    return 0
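
# Typical use of tag_stream (illustrative) is as a pipe filter, e.g.:
#   sys.exit(tag_stream(sys.stdin, sys.stdout, ['web', '-slow']))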


class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    # Get a stream (any object with a readline() method), in this case the
    # stream output by the example from ``subunit.TestProtocolClient``.
    stream = open('tests.log', 'rb')
    # Create a parser which will read from the stream and emit
    # activity to a unittest.TestResult when run() is called.
    suite = subunit.ProtocolTestCase(stream)
    # Create a result object to accept the contents of that stream.
    result = unittest._TextTestResult(sys.stdout)
    # 'run' the tests - process the stream and feed its contents to result.
    suite.run(result)
    stream.close()

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None, forward=False):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream to pass non-subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        :param forward: A stream to pass subunit input on to. If not supplied
            subunit input is not forwarded.
        """
        self._stream = stream
        _make_stream_binary(stream)
        self._passthrough = passthrough
        self._forward = forward

    def __call__(self, result=None):
        return self.run(result)

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
        line = self._stream.readline()
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        protocol.lostConnection()


class TestResultStats(testresult.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        testresult.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    @property
    def total_tests(self):
        return self.testsRun

    def addError(self, test, err, details=None):
        self.failed_tests += 1

    def addFailure(self, test, err, details=None):
        self.failed_tests += 1

    def addSkip(self, test, reason, details=None):
        self.skipped_tests += 1

    def formatStats(self):
        self._stream.write("Total tests: %5d\n" % self.total_tests)
        self._stream.write("Passed tests: %5d\n" % self.passed_tests)
        self._stream.write("Failed tests: %5d\n" % self.failed_tests)
        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
        tags = sorted(self.seen_tags)
        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))

    @property
    def passed_tests(self):
        return self.total_tests - self.failed_tests - self.skipped_tests

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
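
# Typical use of TestResultStats (illustrative): replay a stored stream into
# it and print the summary, e.g.:
#   result = TestResultStats(sys.stdout)
#   ProtocolTestCase(open('tests.subunit', 'rb')).run(result)
#   result.formatStats()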


def get_default_formatter():
    """Obtain the default formatter to write to.

    :return: A file-like object.
    """
    formatter = os.getenv("SUBUNIT_FORMATTER")
    if formatter:
        return os.popen(formatter, "w")
    else:
        stream = sys.stdout
        if sys.version_info > (3, 0):
            stream = stream.buffer
        return stream


if sys.version_info > (3, 0):
    from io import UnsupportedOperation as _NoFilenoError
else:
    _NoFilenoError = AttributeError


def read_test_list(path):
    """Read a list of test ids from a file on disk.

    :param path: Path to the file
    :return: Sequence of test ids
    """
    f = open(path, 'rb')
    try:
        # Strip the trailing newline as bytes: the file is opened in binary
        # mode, so a text "\n" argument would fail under Python 3.
        return [l.rstrip(_b("\n")) for l in f.readlines()]
    finally:
        f.close()


def _make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
    try:
        fileno = stream.fileno()
    except _NoFilenoError:
        return
    _make_binary_on_windows(fileno)


def _make_binary_on_windows(fileno):
    r"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
    if sys.platform == "win32":
        import msvcrt
        msvcrt.setmode(fileno, os.O_BINARY)