# -*- coding: utf-8 -*-
# Copyright (c) 2013-2014 Simon Jagoe
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the 3-clause BSD license. See the LICENSE.txt file for details.
from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
import locale
import sys
import traceback
import warnings
import six
from six.moves import StringIO
from .error_holder import ErrorHolder
[docs]class TestCompletionStatus(Enum):
"""Enumeration to represent the status of a single test.
"""
#: The test completed successfully.
success = 1
#: The test failed, but did not encounter an unexpected error.
failure = 2
#: The test encountered an unexpected error.
error = 3
#: A test marked as expected to fail unexpected passed.
unexpected_success = 4
#: A test failed as expected
expected_failure = 5
#: A test was skipped
skipped = 6
_successful_results = set([TestCompletionStatus.success,
TestCompletionStatus.expected_failure,
TestCompletionStatus.skipped])
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
def _is_relevant_tb_level(tb):
return '__unittest' in tb.tb_frame.f_globals
def _count_relevant_tb_levels(tb):
length = 0
while tb and not _is_relevant_tb_level(tb):
length += 1
tb = tb.tb_next
return length
def _decode(line, encoding):
if isinstance(line, six.text_type):
return line
try:
return line.decode(encoding)
except UnicodeDecodeError:
return line.decode(encoding, 'replace')
def _format_exception(err, is_failure, stdout=None, stderr=None):
"""Converts a sys.exc_info()-style tuple of values into a string."""
exctype, value, tb = err
# Skip test runner traceback levels
while tb and _is_relevant_tb_level(tb):
tb = tb.tb_next
if is_failure:
# Skip assert*() traceback levels
length = _count_relevant_tb_levels(tb)
msgLines = traceback.format_exception(exctype, value, tb, length)
else:
msgLines = traceback.format_exception(exctype, value, tb)
encoding = locale.getpreferredencoding()
msgLines = [_decode(line, encoding) for line in msgLines]
if stdout:
if not stdout.endswith('\n'):
stdout += '\n'
msgLines.append(STDOUT_LINE % stdout)
if stderr:
if not stderr.endswith('\n'):
stderr += '\n'
msgLines.append(STDERR_LINE % stderr)
return ''.join(msgLines)
[docs]class TestDuration(object):
"""An orderable representation of the duration of an individual test.
"""
def __init__(self, start_time, stop_time=None):
if stop_time is not None:
self._start_time = start_time
self._stop_time = stop_time
self._duration = stop_time - start_time
else:
# Once calculations are done, start & stop are meaningless
self._start_time = None
self._stop_time = None
duration = start_time
if not isinstance(duration, timedelta):
duration = timedelta(seconds=float(start_time))
self._duration = duration
self._total_seconds = None
@property
def start_time(self):
return self._start_time
@property
def stop_time(self):
return self._stop_time
@property
def duration(self):
return self._duration
@property
def total_seconds(self):
if self._total_seconds is None:
if sys.version_info < (2, 7):
delta = self._duration
total_seconds = delta.microseconds / 1000000.0 + delta.seconds
total_seconds += delta.days * 24 * 60 * 60
else:
total_seconds = self._duration.total_seconds()
self._total_seconds = total_seconds
return self._total_seconds
def __repr__(self):
return '<TestDuration {0}>'.format(str(self))
def __str__(self):
hours_template = '{hours: >3.0f}:'
template = '{hours_fmt}{minutes:0>2.0f}:{seconds:0>6.3f}'
minutes, seconds = divmod(self.total_seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
hours_fmt = hours_template.format(hours=hours)
else:
hours_fmt = ''
return template.format(
hours_fmt=hours_fmt,
minutes=minutes,
seconds=seconds,
)
def __eq__(self, other):
if not hasattr(other, 'duration'):
return NotImplemented
return self.duration == other.duration
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
if not hasattr(other, 'duration'):
return NotImplemented
return self.duration < other.duration
def __le__(self, other):
return not (self > other)
def __gt__(self, other):
if not hasattr(other, 'duration'):
return NotImplemented
return self.duration > other.duration
def __ge__(self, other):
return not (self < other)
def __hash__(self):
return hash((self._start_time, self._stop_time))
# To support statistics.mean() on TestDuration objects
[docs] def as_integer_ratio(self):
return self.total_seconds.as_integer_ratio()
def __add__(self, other):
if not isinstance(other, TestDuration):
return NotImplemented
return TestDuration(self.duration + other.duration)
def __truediv__(self, divisor):
if not isinstance(self, TestDuration) and isinstance(divisor, int):
return NotImplemented
return TestDuration(self.duration / divisor)
[docs]class TestResult(object):
"""Container object for all information related to the run of a single
test. This contains the test itself, the actual status including
the reason or error associated with status, along with timing
information.
"""
def __init__(self, test_class, test_method_name, status, duration,
exception=None, message=None):
self.test_class = test_class
self.test_method_name = test_method_name
self.status = status
self.exception = exception
self.message = message
self.duration = duration
def __repr__(self):
template = ('<{0} class={1}, method={2}, exc={3!r}, status={4!r}, '
'duration={5!r}>')
return template.format(
type(self).__name__, self.test_class.__name__,
self.test_method_name, self.exception, self.status,
self.duration)
def __eq__(self, other):
if not isinstance(other, TestResult):
return NotImplemented
return (
self.test_class == other.test_class and
self.test_method_name == other.test_method_name and
self.status == other.status and
self.exception == other.exception and
self.message == other.message and
self.duration == other.duration
)
def __ne__(self, other):
return not (self == other)
[docs] @classmethod
def from_test_case(cls, test_case, status, duration,
exception=None, message=None, stdout=None, stderr=None):
"""Construct a :class:`~.TestResult` object from the test and a status.
Parameters
----------
test_case : unittest.TestCase
The test that this result will represent.
status : haas.result.TestCompletionStatus
The status of the test.
exception : tuple
``exc_info`` tuple ``(type, value, traceback)``.
message : str
Optional message associated with the result (e.g. skip
reason).
stdout : str
The test stdout if stdout was buffered.
stderr : str
The test stderr if stderr was buffered.
"""
test_class = type(test_case)
test_method_name = test_case._testMethodName
if exception is not None:
exctype, value, tb = exception
is_failure = exctype is test_case.failureException
exception = _format_exception(
exception, is_failure, stdout, stderr)
return cls(test_class, test_method_name, status, duration,
exception, message)
@property
def test(self):
"""The test case instance this result represents.
"""
return self.test_class(self.test_method_name)
[docs] def to_dict(self):
"""Serialize the ``TestResult`` to a dictionary.
"""
return {
'test_class': self.test_class,
'test_method_name': self.test_method_name,
'status': self.status,
'exception': self.exception,
'message': self.message,
# 'started_time': self.started_time,
'completed_time': self.completed_time,
}
[docs] @classmethod
def from_dict(cls, data):
"""Create a ``TestResult`` from a dictionary created by
:meth:`~.TestResult.to_dict`
"""
return cls(**data)
# Temporary compatibility with unittest's runner
separator2 = '-' * 70
# Copied from unittest.result
[docs]def failfast(method):
@wraps(method)
def inner(self, *args, **kw):
if self.failfast:
self.stop()
return method(self, *args, **kw)
return inner
[docs]class ResultCollector(object):
"""Collecter for test results. This handles creating
:class:`~.TestResult` instances and handing them off the registered
result output handlers.
"""
# Temporary compatibility with unittest's runner
separator2 = separator2
def __init__(self, buffer=False, failfast=False):
self.buffer = buffer
self.failfast = failfast
self._result_handlers = []
self._sorted_handlers = None
self.testsRun = 0
self.expectedFailures = []
self.unexpectedSuccesses = []
self.skipped = []
self.failures = []
self.errors = []
self.shouldStop = False
self._successful = True
self._mirror_output = False
self._stderr_buffer = None
self._stdout_buffer = None
self._original_stderr = sys.stderr
self._original_stdout = sys.stdout
self._test_timing = {}
@property
def _handlers(self):
if self._sorted_handlers is None:
from .plugins.result_handler import sort_result_handlers
self._sorted_handlers = sort_result_handlers(self._result_handlers)
return self._sorted_handlers
@staticmethod
def _testcase_to_key(test):
return (type(test), test._testMethodName)
def _setup_stdout(self):
"""Hook stdout and stderr if buffering is enabled.
"""
if self.buffer:
if self._stderr_buffer is None:
self._stderr_buffer = StringIO()
self._stdout_buffer = StringIO()
sys.stdout = self._stdout_buffer
sys.stderr = self._stderr_buffer
def _restore_stdout(self):
"""Unhook stdout and stderr if buffering is enabled.
"""
if self.buffer:
if self._mirror_output:
output = sys.stdout.getvalue()
error = sys.stderr.getvalue()
if output:
if not output.endswith('\n'):
output += '\n'
self._original_stdout.write(STDOUT_LINE % output)
if error:
if not error.endswith('\n'):
error += '\n'
self._original_stderr.write(STDERR_LINE % error)
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate()
self._stderr_buffer.seek(0)
self._stderr_buffer.truncate()
[docs] def printErrors(self): # pragma: no cover
# FIXME: Remove
pass
[docs] def add_result_handler(self, handler):
"""Register a new result handler.
"""
self._result_handlers.append(handler)
# Reset sorted handlers
if self._sorted_handlers:
self._sorted_handlers = None
[docs] def startTest(self, test, start_time=None):
"""Indicate that an individual test is starting.
Parameters
----------
test : unittest.TestCase
The test that is starting.
start_time : datetime
An internal parameter to allow the parallel test runner to
set the actual start time of a test run in a subprocess.
"""
if start_time is None:
start_time = datetime.utcnow()
self._test_timing[self._testcase_to_key(test)] = start_time
self._mirror_output = False
self._setup_stdout()
self.testsRun += 1
for handler in self._handlers:
handler.start_test(test)
[docs] def stopTest(self, test):
"""Indicate that an individual test has completed.
Parameters
----------
test : unittest.TestCase
The test that has completed.
"""
for handler in self._handlers:
handler.stop_test(test)
self._restore_stdout()
self._mirror_output = False
[docs] def startTestRun(self):
"""Indicate that the test run is starting.
"""
for handler in self._handlers:
handler.start_test_run()
[docs] def stopTestRun(self):
"""Indicate that the test run has completed.
"""
for handler in self._handlers:
handler.stop_test_run()
[docs] def add_result(self, result):
"""Add an already-constructed :class:`~.TestResult` to this
:class:`~.ResultCollector`.
This may be used when collecting results created by other
ResultCollectors (e.g. in subprocesses).
"""
for handler in self._handlers:
handler(result)
if self._successful and result.status not in _successful_results:
self._successful = False
def _handle_result(self, test, status, exception=None, message=None):
"""Create a :class:`~.TestResult` and add it to this
:class:`~ResultCollector`.
Parameters
----------
test : unittest.TestCase
The test that this result will represent.
status : haas.result.TestCompletionStatus
The status of the test.
exception : tuple
``exc_info`` tuple ``(type, value, traceback)``.
message : str
Optional message associated with the result (e.g. skip
reason).
"""
if self.buffer:
stderr = self._stderr_buffer.getvalue()
stdout = self._stdout_buffer.getvalue()
else:
stderr = stdout = None
started_time = self._test_timing.get(self._testcase_to_key(test))
if started_time is None and isinstance(test, ErrorHolder):
started_time = datetime.utcnow()
elif started_time is None:
raise RuntimeError(
'Missing test start! Please report this error as a bug in '
'haas.')
completion_time = datetime.utcnow()
duration = TestDuration(started_time, completion_time)
result = TestResult.from_test_case(
test,
status,
duration=duration,
exception=exception,
message=message,
stdout=stdout,
stderr=stderr,
)
self.add_result(result)
return result
[docs] @failfast
def addError(self, test, exception):
"""Register that a test ended in an error.
Parameters
----------
test : unittest.TestCase
The test that has completed.
exception : tuple
``exc_info`` tuple ``(type, value, traceback)``.
"""
result = self._handle_result(
test, TestCompletionStatus.error, exception=exception)
self.errors.append(result)
self._mirror_output = True
[docs] @failfast
def addFailure(self, test, exception):
"""Register that a test ended with a failure.
Parameters
----------
test : unittest.TestCase
The test that has completed.
exception : tuple
``exc_info`` tuple ``(type, value, traceback)``.
"""
result = self._handle_result(
test, TestCompletionStatus.failure, exception=exception)
self.failures.append(result)
self._mirror_output = True
[docs] def addSuccess(self, test):
"""Register that a test ended in success.
Parameters
----------
test : unittest.TestCase
The test that has completed.
"""
self._handle_result(test, TestCompletionStatus.success)
[docs] def addSkip(self, test, reason):
"""Register that a test that was skipped.
Parameters
----------
test : unittest.TestCase
The test that has completed.
reason : str
The reason the test was skipped.
"""
result = self._handle_result(
test, TestCompletionStatus.skipped, message=reason)
self.skipped.append(result)
[docs] def addExpectedFailure(self, test, exception):
"""Register that a test that failed and was expected to fail.
Parameters
----------
test : unittest.TestCase
The test that has completed.
exception : tuple
``exc_info`` tuple ``(type, value, traceback)``.
"""
result = self._handle_result(
test, TestCompletionStatus.expected_failure, exception=exception)
self.expectedFailures.append(result)
[docs] @failfast
def addUnexpectedSuccess(self, test):
"""Register a test that passed unexpectedly.
Parameters
----------
test : unittest.TestCase
The test that has completed.
"""
result = self._handle_result(
test, TestCompletionStatus.unexpected_success)
self.unexpectedSuccesses.append(result)
[docs] def wasSuccessful(self):
"""Return ``True`` if the run was successful.
"""
return self._successful
[docs] def stop(self):
"""Set the ``shouldStop`` flag, used by the test cases to determine if
they should terminate early.
"""
self.shouldStop = True
[docs]class ResultCollecter(ResultCollector):
def __init__(self, *args, **kwargs):
super(ResultCollecter, self).__init__(*args, **kwargs)
warnings.warn(
'ResultCollecter is deprecated in favour of ResultCollector and '
'will be removed in the next release.',
DeprecationWarning,
)