# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the reproman package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Miscellaneous utilities to assist with testing"""
import inspect
import os
import sys
import re
import tempfile
import platform
import multiprocessing
import logging
from unittest.mock import patch
import pytest
from http.server import SimpleHTTPRequestHandler
from http.server import HTTPServer
from functools import wraps
from os.path import exists, realpath, join as opj
from ..utils import chpwd
from ..utils import get_tempfile_kwargs
from ..utils import getpwd
from ..utils import make_tempfile
from ..utils import on_windows
from ..utils import optional_args
from ..utils import rmtemp
from ..dochelpers import borrowkwargs
# Temporary paths used by clones.
# NOTE(review): nothing in the visible part of this module adds to or reads
# from this set -- presumably other helpers register paths here; confirm.
_TEMP_PATHS_CLONES = set()
# A path to use in tests that retrace files: it is expected to exist on the
# system and to be provided by some system package.
COMMON_SYSTEM_PATH = '/bin/bash'
# Package it should belong to (to avoid guessing or assuming matching the name)
COMMON_SYSTEM_PACKAGE = 'bash'
# pytest variants for nose.tools commands. These exist to avoid unnecessary
# churn in tests that already use these names. New code should use plain
# asserts to take advantage of pytest's assertion introspection.
def assert_equal(a, b, msg=None):
    """Assert that `a` equals `b`.

    Parameters
    ----------
    a, b : object
      Values compared with ``==``.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both values is generated.
    """
    assert a == b, (msg if msg else "{!r} != {!r}".format(a, b))
def assert_not_equal(a, b, msg=None):
    """Assert that `a` differs from `b`.

    Parameters
    ----------
    a, b : object
      Values compared with ``!=``.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both values is generated.
    """
    assert a != b, (msg if msg else "{!r} == {!r}".format(a, b))
def assert_greater(a, b, msg=None):
    """Assert that `a` is strictly greater than `b`.

    Parameters
    ----------
    a, b : object
      Values compared with ``>``.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both values is generated.
    """
    # The default message must describe the failure, i.e. that the expected
    # relation does NOT hold (it used to claim "a > b").
    assert a > b, msg or "{!r} is not greater than {!r}".format(a, b)
def assert_greater_equal(a, b, msg=None):
    """Assert that `a` is greater than or equal to `b`.

    Parameters
    ----------
    a, b : object
      Values compared with ``>=``.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both values is generated.
    """
    # The default message must describe the failure, i.e. that the expected
    # relation does NOT hold (it used to claim "a >= b").
    assert a >= b, \
        msg or "{!r} is not greater than or equal to {!r}".format(a, b)
def assert_true(x, msg=None):
    """Assert that `x` is truthy.

    Parameters
    ----------
    x : object
      Value whose truthiness is checked.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of `x` is generated.
    """
    assert x, (msg if msg else "{!r} is not true".format(x))
def assert_false(x, msg=None):
    """Assert that `x` is falsy.

    Parameters
    ----------
    x : object
      Value whose truthiness is checked.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of `x` is generated.
    """
    assert not x, (msg if msg else "{!r} is not false".format(x))
def assert_in(x, collection, msg=None):
    """Assert that `x` is contained in `collection`.

    Parameters
    ----------
    x : object
      Item to look for.
    collection : container
      Anything supporting the ``in`` operator.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both arguments is generated.
    """
    assert x in collection, (
        msg if msg else "{!r} not found in {!r}".format(x, collection))
def assert_not_in(x, collection, msg=None):
    """Assert that `x` is not contained in `collection`.

    Parameters
    ----------
    x : object
      Item that must be absent.
    collection : container
      Anything supporting the ``in`` operator.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both arguments is generated.
    """
    assert x not in collection, (
        msg if msg
        else "{!r} unexpectedly found in {!r}".format(x, collection))
def assert_is(a, b, msg=None):
    """Assert that `a` and `b` are the very same object.

    Parameters
    ----------
    a, b : object
      Objects compared by identity (``is``).
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both objects is generated.
    """
    assert a is b, (msg if msg else "{!r} is not {!r}".format(a, b))
def assert_is_instance(a, b, msg=None):
    """Assert that `a` is an instance of `b`.

    Parameters
    ----------
    a : object
      Object to check.
    b : type or tuple of types
      Passed as the second argument to ``isinstance``.
    msg : str, optional
      Failure message. If not given (or empty), a message showing the
      repr of both arguments is generated.
    """
    assert isinstance(a, b), (
        msg if msg else "{!r} is not an instance of {!r}".format(a, b))
# additional shortcuts
# `assert_raises` maps directly onto pytest.raises; the remaining names are
# short aliases for the assert_* helpers of this module.
assert_raises = pytest.raises
eq_ = assert_equal
neq_ = assert_not_equal
ok_ = assert_true
nok_ = assert_false
in_ = assert_in
# def create_tree_archive(path, name, load, overwrite=False, archives_leading_dir=True):
# """Given an archive `name`, create under `path` with specified `load` tree
# """
# from ..support.archives import compress_files
# dirname = file_basename(name)
# full_dirname = opj(path, dirname)
# os.makedirs(full_dirname)
# create_tree(full_dirname, load, archives_leading_dir=archives_leading_dir)
# # create archive
# if archives_leading_dir:
# compress_files([dirname], name, path=path, overwrite=overwrite)
# else:
# compress_files(list(map(basename, glob.glob(opj(full_dirname, '*')))),
# opj(pardir, name),
# path=opj(path, dirname),
# overwrite=overwrite)
# # remove original tree
# shutil.rmtree(full_dirname)
def create_tree(path, tree, archives_leading_dir=True):
    """Create files and directories under `path` as described by `tree`.

    Parameters
    ----------
    path : str
      Directory to populate. It is created if it does not exist yet.
    tree : dict or iterable of (name, load) pairs
      Description of the content. If `load` is a tuple, list or dict,
      `name` becomes a subdirectory which is populated recursively from
      `load`. Otherwise `load` is written as text into the file `name`.
    archives_leading_dir : bool, optional
      Only handed down to the recursive calls. Archive creation is
      disabled at the moment, so it has no other effect here.
    """
    lgr.log(5, "Creating a tree under %s", path)
    if not exists(path):
        os.makedirs(path)

    entries = tree.items() if isinstance(tree, dict) else tree
    for name, load in entries:
        target = opj(path, name)
        if not isinstance(load, (tuple, list, dict)):
            # Plain load: dump it into a file
            with open(target, 'w') as fobj:
                fobj.write(load)
            continue
        # Nested load: always a subdirectory. Names ending with .tar.gz or
        # .tar are not turned into archives (that code path is disabled).
        create_tree(target, load, archives_leading_dir=archives_leading_dir)
#
# Additional "checkers"
#
#
# Helpers to test symlinks
#
def ok_symlink(path):
    """Assert that `path` is a symlink, no matter whether working or broken.

    Raises
    ------
    AssertionError
      If `path` is not a symbolic link.
    """
    if os.path.islink(path):
        return
    raise AssertionError("Path {} seems not to be a symlink".format(path))
def ok_good_symlink(path):
    """Assert that `path` is a symlink whose resolved target exists.

    Raises
    ------
    AssertionError
      If `path` is not a symlink, or if the path it resolves to is missing.
    """
    ok_symlink(path)
    target = realpath(path)
    assert exists(target), \
        "Path {} seems to be missing. Symlink {} is broken".format(
            target, path)
def ok_broken_symlink(path):
    """Assert that `path` is a symlink whose resolved target does not exist.

    Raises
    ------
    AssertionError
      If `path` is not a symlink, or if the path it resolves to exists.
    """
    ok_symlink(path)
    target = realpath(path)
    assert not exists(target), \
        "Path {} seems to be present. Symlink {} is not broken".format(
            target, path)
def ok_startswith(s, prefix):
    """Assert that string `s` starts with `prefix`."""
    assert s.startswith(prefix), \
        "String %r doesn't start with %r" % (s, prefix)
def ok_endswith(s, suffix):
    """Assert that string `s` ends with `suffix`."""
    assert s.endswith(suffix), \
        "String %r doesn't end with %r" % (s, suffix)
def nok_startswith(s, prefix):
    """Assert that string `s` does NOT start with `prefix`."""
    assert not s.startswith(prefix), \
        "String %r starts with %r" % (s, prefix)
def ok_generator(gen):
    """Assert that `gen` is a generator object.

    Parameters
    ----------
    gen : object
      Object to check with ``inspect.isgenerator``.

    Raises
    ------
    AssertionError
      If `gen` is not a generator.
    """
    # Wrap `gen` into a 1-tuple for %-formatting: a bare tuple argument
    # would be unpacked as multiple format arguments and raise TypeError
    # instead of the intended AssertionError.
    assert_true(inspect.isgenerator(gen),
                msg="%s is not a generator" % (gen,))
def ok_file_has_content(path, content):
    """Assert that the file at `path` exists and its text equals `content`.

    Parameters
    ----------
    path : str
      File to read (opened in text mode).
    content : str
      Expected full content of the file.
    """
    assert exists(path)
    with open(path, 'r') as fobj:
        actual = fobj.read()
    assert actual == content, "{!r} != {!r}".format(actual, content)
def assert_in_in(substr, lst):
    """Assert that `substr` is contained in at least one element of `lst`.

    Parameters
    ----------
    substr : str
      Substring to look for.
    lst : iterable of str
      Elements to search.
    """
    if any(substr in s for s in lst):
        return
    assert False, '"%s" is not in "%s"' % (substr, str(lst))
#
# Decorators
#
@optional_args
def with_tree(t, tree=None, archives_leading_dir=True, delete=True, **tkwargs):
    """Decorator to run a function against a freshly created temporary tree.

    The path of the temporary directory is appended to the positional
    arguments of the decorated function.

    Parameters
    ----------
    t : callable
      Function to decorate.
    tree : dict or iterable of (name, load) pairs, optional
      Description of the tree, handed to `create_tree`.
    archives_leading_dir : bool, optional
      Handed to `create_tree`.
    delete : bool, optional
      Whether to remove the temporary directory (via `rmtemp`) once the
      decorated function has finished, successfully or not.
    **tkwargs
      Passed through `get_tempfile_kwargs` and then on to
      `tempfile.mkdtemp`.
    """
    @wraps(t)
    def newfunc(*arg, **kw):
        tkwargs_ = get_tempfile_kwargs(tkwargs, prefix="tree", wrapped=t)
        d = tempfile.mkdtemp(**tkwargs_)
        try:
            # Populate within the `try` so that the directory also gets
            # cleaned up if creating the tree fails half-way.
            create_tree(d, tree, archives_leading_dir=archives_leading_dir)
            return t(*(arg + (d,)), **kw)
        finally:
            if delete:
                rmtemp(d)
    return newfunc
# Logger used by the helpers of this module. It is looked up at call time,
# so functions defined above this line can use it as well.
lgr = logging.getLogger('reproman.tests')
class SilentHTTPHandler(SimpleHTTPRequestHandler):
    """HTTP request handler which logs requests only at DEBUG level or below.

    Whether to stay silent is decided once per handler instance, based on
    the effective level of the module logger at construction time.
    """

    def __init__(self, *args, **kwargs):
        # Decide before delegating to the base class -- presumably because
        # the stdlib handler processes the request from within its
        # constructor, so `log_message` may get called before it returns.
        self._silent = lgr.getEffectiveLevel() > logging.DEBUG
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Send the message to the module logger (DEBUG) unless silenced."""
        if not self._silent:
            lgr.debug("HTTP: " + format % args)
def _multiproc_serve_path_via_http(hostname, path_to_serve_from, queue):  # pragma: no cover
    """Serve `path_to_serve_from` via HTTP; meant to run in a child process.

    Parameters
    ----------
    hostname : str
      Host name or address to bind the server to.
    path_to_serve_from : str
      Directory to change into before the server is started.
    queue : multiprocessing.Queue
      Receives the port number the server got bound to.
    """
    chpwd(path_to_serve_from)
    # Port 0: the actual port is assigned upon binding and read back below
    server = HTTPServer((hostname, 0), SilentHTTPHandler)
    queue.put(server.server_port)
    server.serve_forever()
@optional_args
def serve_path_via_http(tfunc, *targs):
    """Decorator which serves content of a directory via http url

    The directory is either given to the decorator itself (a single
    positional argument) or taken from the last positional argument of the
    decorated call. The decorated function is called with the path and the
    url appended to its positional arguments. The server runs in a separate
    process which is stopped when the decorated function is done.
    """
    @wraps(tfunc)
    def newfunc(*args, **kwargs):

        if targs:
            # if a path is passed into serve_path_via_http, then it's in targs
            assert len(targs) == 1
            path = targs[0]

        elif len(args) > 1:
            args, path = args[:-1], args[-1]
        else:
            args, path = (), args[0]

        # There is a problem with Haskell on wheezy trying to
        # fetch via IPv6 whenever there is a ::1 localhost entry in
        # /etc/hosts.  Apparently fixing that docker image reliably
        # is not that straightforward, although see
        # http://jasonincode.com/customizing-hosts-file-in-docker/
        # so we just always force the use of 127.0.0.1
        hostname = '127.0.0.1'

        queue = multiprocessing.Queue()
        multi_proc = multiprocessing.Process(
            target=_multiproc_serve_path_via_http,
            args=(hostname, path, queue))
        multi_proc.start()
        try:
            # Wait for the port inside the `try`: if the child never reports
            # one (timeout), it still must be stopped below.
            port = queue.get(timeout=300)
            url = 'http://{}:{}/'.format(hostname, port)
            lgr.debug("HTTP: serving {} under {}".format(path, url))

            # Such tests don't require real network so if http_proxy settings were
            # provided, we remove them from the env for the duration of this run
            env = os.environ.copy()
            env.pop('http_proxy', None)
            with patch.dict('os.environ', env, clear=True):
                return tfunc(*(args + (path, url)), **kwargs)
        finally:
            lgr.debug("HTTP: stopping server under %s" % path)
            multi_proc.terminate()
            # Reap the terminated child so that it does not linger as a
            # zombie; bounded wait so a stuck child cannot hang the tests.
            multi_proc.join(timeout=5)
    return newfunc
@optional_args
def without_http_proxy(tfunc):
    """Decorator hiding http_proxy and https_proxy from the environment

    While the decorated function runs, `os.environ` is replaced by a copy
    without those two variables; the original environment is restored
    afterwards.
    """
    @wraps(tfunc)
    def newfunc(*args, **kwargs):
        # Such tests do not need real network, so proxy settings (if any)
        # are dropped for the duration of the call
        env = {
            name: value
            for name, value in os.environ.copy().items()
            if name not in ('http_proxy', 'https_proxy')
        }
        with patch.dict('os.environ', env, clear=True):
            return tfunc(*args, **kwargs)
    return newfunc
# NOTE: the docstring below is handed to `borrowkwargs` -- presumably to be
# completed with the parameters of `make_tempfile` -- so its section layout
# is kept as it was.
@borrowkwargs(methodname=make_tempfile)
@optional_args
def with_tempfile(t, **tkwargs):
    """Decorator function to provide a temporary file name and remove it at the end

    Parameters
    ----------

    To change the used directory without providing keyword argument 'dir' set
    REPROMAN_TESTS_TEMPDIR.

    Examples
    --------

    ::

        @with_tempfile
        def test_write(tfile):
            open(tfile, 'w').write('silly test')
    """
    @wraps(t)
    def newfunc(*arg, **kw):
        # The temporary name is appended to the positional arguments
        with make_tempfile(wrapped=t, **tkwargs) as tmp_name:
            return t(*arg, tmp_name, **kw)
    return newfunc
@optional_args
def assert_cwd_unchanged(func, ok_to_chdir=False):
    """Decorator to test whether the current working directory remains unchanged

    In any case the process is moved back to the original directory if the
    decorated function left it somewhere else. The return value of the
    decorated function is not passed on.

    Parameters
    ----------
    ok_to_chdir: bool, optional
      If True, allow to chdir, so this decorator would not then raise exception
      if chdir'ed but only return to original directory
    """
    @wraps(func)
    def newfunc(*args, **kwargs):
        cwd_before = os.getcwd()
        pwd_before = getpwd()
        caught = None
        try:
            func(*args, **kwargs)
        except BaseException:
            # Remember whatever was raised; it is re-raised at the very end
            caught = sys.exc_info()
        finally:
            try:
                cwd_after = os.getcwd()
            except OSError as e:
                lgr.warning("Failed to getcwd: %s" % e)
                cwd_after = None

        moved = cwd_after != cwd_before
        if moved:
            chpwd(pwd_before)
        if moved and not ok_to_chdir:
            lgr.warning(
                "%s changed cwd to %s. Mitigating and changing back to %s"
                % (func, cwd_after, pwd_before))
            # An exception from the function itself is more important than
            # our complaint, so assert only if there was none to not mask it
            if caught is None:
                assert_equal(cwd_before, cwd_after,
                             "CWD changed from %s to %s" % (cwd_before, cwd_after))

        if caught is not None:
            raise caught[1].with_traceback(caught[2])

    return newfunc
@optional_args
def run_under_dir(func, newdir='.'):
    """Decorator to run tests under another directory

    The decorated function is called after changing into `newdir`, and the
    previous directory (as reported by `getpwd`) is restored afterwards.
    The return value of the decorated function is not passed on.

    It is somewhat ugly since we can't really chdir back to a directory
    which had a symlink in its path. So using this decorator has the
    potential to move the entire testing run under the dereferenced
    directory name -- a side effect. The only way around would be to make
    the testing framework run a test in a new process with a new cwd.
    """
    @wraps(func)
    def newfunc(*args, **kwargs):
        original_pwd = getpwd()
        try:
            chpwd(newdir)
            func(*args, **kwargs)
        finally:
            chpwd(original_pwd)

    return newfunc
def assert_re_in(regex, c, flags=0):
    """Assert that container (list, str, etc) contains entry matching the regex

    Parameters
    ----------
    regex : str or compiled pattern
      Pattern, applied with ``re.match`` (i.e. anchored at the beginning
      of each entry).
    c : list, tuple or a single entry
      Entries to test. Anything but a list or a tuple is treated as a
      single entry.
    flags : int, optional
      Flags passed to ``re.match``.

    Raises
    ------
    AssertionError
      If no entry matches.
    """
    entries = c if isinstance(c, (list, tuple)) else [c]
    if any(re.match(regex, entry, flags=flags) for entry in entries):
        return
    raise AssertionError(
        "Not a single entry matched %r in %r" % (regex, entries))
# List of the most obscure filenames which might or might not be supported
# by different filesystems across different OSs. Ordered from the most
# obscure one to the least obscure one.
# NOTE(review): "a" precedes the last entry, so a consumer trying the names
# in order would presumably never get to " abc d.dat " -- confirm whether
# that ordering is intended.
OBSCURE_FILENAMES = (
    " \"';a&b/&cd `| ", # shouldn't be supported anywhere I guess due to /
    " \"';a&b&cd `| ",
    " \"';abcd `| ",
    " \"';abcd | ",
    " \"';abcd ",
    " ;abcd ",
    " ;abcd",
    " ab cd ",
    " ab cd",
    "a",
    " abc d.dat ", # they all should at least support spaces and dots
)
@with_tempfile(mkdir=True)
def get_most_obscure_supported_name(tdir):
    """Return the most obscure filename that the filesystem would support under TEMPDIR

    The entries of `OBSCURE_FILENAMES` are tried in order by creating a
    file of that name under `tdir` (provided by the `with_tempfile`
    decorator); the first one which could be written is returned.

    TODO: we might want to use it as a function where we would provide tdir

    Raises
    ------
    RuntimeError
      If none of the candidate names could be used.
    """
    for filename in OBSCURE_FILENAMES:
        # Names with trailing whitespace are not even tried on Windows
        if on_windows and filename.rstrip() != filename:
            continue
        try:
            with open(opj(tdir, filename), 'w') as f:
                f.write("TEST LOAD")
            return filename  # it will get removed as a part of wiping up the directory
        except Exception:
            # Any regular failure just means "not supported, try the next
            # one"; KeyboardInterrupt/SystemExit must not be swallowed.
            lgr.debug("Filename %r is not supported on %s under %s",
                      filename, platform.system(), tdir)
    raise RuntimeError("Could not create any of the files under %s among %s"
                       % (tdir, OBSCURE_FILENAMES))
@optional_args
def with_testsui(t, responses=None):
    """Switch main UI to be 'tests' UI and possibly provide answers to be used

    Parameters
    ----------
    t : callable
      Function to decorate.
    responses : optional
      If given (and not empty), handed to ``ui.add_responses`` before the
      call. After the call it is asserted that none of them was left
      unused.
    """
    @wraps(t)
    def newfunc(*args, **kwargs):
        from reproman.ui import ui
        previous_backend = ui.backend
        try:
            ui.set_backend('tests')
            if responses:
                ui.add_responses(responses)
            result = t(*args, **kwargs)
            if responses:
                leftover = ui.get_responses()
                assert not len(leftover), "Some responses were left not used: %s" % str(leftover)
            return result
        finally:
            # Restore the original backend no matter how the call ended
            ui.set_backend(previous_backend)

    return newfunc


# Flag the decorator itself as "not a test" for test collectors
with_testsui.__test__ = False
def assert_is_subset_recur(a, b, subset_types=None):
    """Asserts that 'a' is a subset of 'b' (recursive on dicts and lists)

    Parameters
    ----------
    a : dict or list
      The desired subset collection (items that must be in b)
    b : dict or list
      The superset collection
    subset_types : list, optional
      List of classes (from list, dict) that allow subsets. Otherwise
      we use strict matching. By default no type allows subsets.

    Raises
    ------
    AssertionError
      If `a` is not a subset of `b` under the rules above.
    """
    # No mutable default argument: None stands for "no subset types"
    if subset_types is None:
        subset_types = ()
    # Currently we only allow lists and dicts
    assert {list, dict}.issuperset(subset_types)

    # For dictionaries recursively check children that are in a
    if isinstance(a, dict) and isinstance(b, dict) and dict in subset_types:
        for key in a:
            if key not in b:
                # 1-tuple: a tuple key must not be unpacked by %-formatting
                raise AssertionError("Key %s is missing" % (key,))
            assert_is_subset_recur(a[key], b[key], subset_types)

    # For lists, recurse for every value a to make sure it is in b
    # (note: two items in a may match the same item in b)
    elif isinstance(a, list) and isinstance(b, list) and list in subset_types:
        for a_val in a:
            for b_val in b:
                try:
                    assert_is_subset_recur(a_val, b_val, subset_types)
                    break
                except AssertionError:
                    pass
            else:
                # No b_val matched (or b is empty).
                # 1-tuple: see the note on tuple keys above.
                raise AssertionError(
                    "Array value %s is missing" % (a_val,))

    # For anything else check for straight equality
    else:
        if a != b:
            raise AssertionError("Value %s != %s" % (a, b))
def create_pymodule(directory):
    """Create a skeleton Python module in `directory`.

    A minimal ``setup.py`` (declaring the module ``nmtest``) and an empty
    file named ``nmtest`` are written.

    Parameters
    ----------
    directory : str
        Path to a non-existing directory.
    """
    os.makedirs(directory)

    setup_path = os.path.join(directory, "setup.py")
    with open(setup_path, "w") as setup_fh:
        setup_fh.write("""\
from setuptools import setup
setup(name='nmtest',
version='0.1.0',
py_modules=['nmtest'])""")

    # NOTE(review): the file is called "nmtest", not "nmtest.py" -- confirm
    # that this is what the users of this helper expect.
    module_path = os.path.join(directory, "nmtest")
    with open(module_path, "w") as module_fh:
        module_fh.write("")
#
# Context Managers
#