Source code for tox.session

"""
Automatically package and test a Python project against configurable
Python2 and Python3 based virtual environments. Environments are
setup by using virtualenv. Configuration is generally done through an
INI-style "tox.ini" file.
"""
from __future__ import print_function

import os
import re
import shutil
import subprocess
import sys
import time

import pkg_resources
import py

import tox
from tox.config import parseconfig
from tox.result import ResultLog
from tox.venv import VirtualEnv


def prepare(args):
    config = parseconfig(args)
    if config.option.help:
        show_help(config)
        raise SystemExit(0)
    elif config.option.helpini:
        show_help_ini(config)
        raise SystemExit(0)
    return config


def cmdline(args=None):
    if args is None:
        args = sys.argv[1:]
    main(args)


def main(args):
    try:
        config = prepare(args)
        retcode = Session(config).runcommand()
        if retcode is None:
            retcode = 0
        raise SystemExit(retcode)
    except KeyboardInterrupt:
        raise SystemExit(2)
    except tox.exception.MinVersionError as e:
        r = Reporter(None)
        r.error(str(e))
        raise SystemExit(1)


def show_help(config):
    tw = py.io.TerminalWriter()
    tw.write(config._parser._format_help())
    tw.line()
    tw.line("Environment variables", bold=True)
    tw.line("TOXENV: comma separated list of environments (overridable by '-e')")
    tw.line(
        "TOX_TESTENV_PASSENV: space-separated list of extra environment variables to be "
        "passed into test command environments"
    )


def show_help_ini(config):
    tw = py.io.TerminalWriter()
    tw.sep("-", "per-testenv attributes")
    for env_attr in config._testenv_attr:
        tw.line(
            "{:<15} {:<8} default: {}".format(
                env_attr.name, "<" + env_attr.type + ">", env_attr.default
            ),
            bold=True,
        )
        tw.line(env_attr.help)
        tw.line()


class Action(object):
    def __init__(self, session, venv, msg, args):
        self.venv = venv
        self.msg = msg
        self.activity = msg.split(" ", 1)[0]
        self.session = session
        self.report = session.report
        self.args = args
        self.id = venv and venv.envconfig.envname or "tox"
        self._popenlist = []
        if self.venv:
            self.venvname = self.venv.name
        else:
            self.venvname = "GLOB"
        if msg == "runtests":
            cat = "test"
        else:
            cat = "setup"
        envlog = session.resultlog.get_envlog(self.venvname)
        self.commandlog = envlog.get_commandlog(cat)

    def __enter__(self):
        self.report.logaction_start(self)
        return self

    def __exit__(self, *args):
        self.report.logaction_finish(self)

    def setactivity(self, name, msg):
        self.activity = name
        if msg:
            self.report.verbosity0("{} {}: {}".format(self.venvname, name, msg), bold=True)
        else:
            self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)

    def info(self, name, msg):
        self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)

    def _initlogpath(self, actionid):
        if self.venv:
            logdir = self.venv.envconfig.envlogdir
        else:
            logdir = self.session.config.logdir
        try:
            log_count = len(logdir.listdir("{}-*".format(actionid)))
        except (py.error.ENOENT, py.error.ENOTDIR):
            logdir.ensure(dir=1)
            log_count = 0
        path = logdir.join("{}-{}.log".format(actionid, log_count))
        f = path.open("w")
        f.flush()
        return f

    def popen(self, args, cwd=None, env=None, redirect=True, returnout=False, ignore_ret=False):
        stdout = outpath = None
        resultjson = self.session.config.option.resultjson
        if resultjson or redirect:
            fout = self._initlogpath(self.id)
            fout.write("actionid: {}\nmsg: {}\ncmdargs: {!r}\n\n".format(self.id, self.msg, args))
            fout.flush()
            outpath = py.path.local(fout.name)
            fin = outpath.open("rb")
            fin.read()  # read the header, so it won't be written to stdout
            stdout = fout
        elif returnout:
            stdout = subprocess.PIPE
        if cwd is None:
            # FIXME XXX cwd = self.session.config.cwd
            cwd = py.path.local()
        try:
            popen = self._popen(args, cwd, env=env, stdout=stdout, stderr=subprocess.STDOUT)
        except OSError as e:
            self.report.error(
                "invocation failed (errno {:d}), args: {}, cwd: {}".format(e.errno, args, cwd)
            )
            raise
        popen.outpath = outpath
        popen.args = [str(x) for x in args]
        popen.cwd = cwd
        popen.action = self
        self._popenlist.append(popen)
        try:
            self.report.logpopen(popen, env=env)
            try:
                if resultjson and not redirect:
                    if popen.stderr is not None:
                        # prevent deadlock
                        raise ValueError("stderr must not be piped here")
                    # we read binary from the process and must write using a
                    # binary stream
                    buf = getattr(sys.stdout, "buffer", sys.stdout)
                    out = None
                    last_time = time.time()
                    while 1:
                        # we have to read one byte at a time, otherwise there
                        # might be no output for a long time with slow tests
                        data = fin.read(1)
                        if data:
                            buf.write(data)
                            if b"\n" in data or (time.time() - last_time) > 1:
                                # we flush on newlines or after 1 second to
                                # provide quick enough feedback to the user
                                # when printing a dot per test
                                buf.flush()
                                last_time = time.time()
                        elif popen.poll() is not None:
                            if popen.stdout is not None:
                                popen.stdout.close()
                            break
                        else:
                            time.sleep(0.1)
                            # the seek updates internal read buffers
                            fin.seek(0, 1)
                    fin.close()
                else:
                    out, err = popen.communicate()
            except KeyboardInterrupt:
                self.report.keyboard_interrupt()
                popen.wait()
                raise
            ret = popen.wait()
        finally:
            self._popenlist.remove(popen)
        if ret and not ignore_ret:
            invoked = " ".join(map(str, popen.args))
            if outpath:
                self.report.error(
                    "invocation failed (exit code {:d}), logfile: {}".format(ret, outpath)
                )
                out = outpath.read()
                self.report.error(out)
                if hasattr(self, "commandlog"):
                    self.commandlog.add_command(popen.args, out, ret)
                raise tox.exception.InvocationError("{} (see {})".format(invoked, outpath), ret)
            else:
                raise tox.exception.InvocationError("{!r}".format(invoked), ret)
        if not out and outpath:
            out = outpath.read()
        if hasattr(self, "commandlog"):
            self.commandlog.add_command(popen.args, out, ret)
        return out

    def _rewriteargs(self, cwd, args):
        newargs = []
        for arg in args:
            if not tox.INFO.IS_WIN and isinstance(arg, py.path.local):
                arg = cwd.bestrelpath(arg)
            newargs.append(str(arg))
        # subprocess does not always take kindly to .py scripts so adding the interpreter here
        if tox.INFO.IS_WIN:
            ext = os.path.splitext(str(newargs[0]))[1].lower()
            if ext == ".py" and self.venv:
                newargs = [str(self.venv.envconfig.envpython)] + newargs
        return newargs

    def _popen(self, args, cwd, stdout, stderr, env=None):
        if env is None:
            env = os.environ.copy()
        return self.session.popen(
            self._rewriteargs(cwd, args),
            shell=False,
            cwd=str(cwd),
            universal_newlines=True,
            stdout=stdout,
            stderr=stderr,
            env=env,
        )


class Verbosity(object):
    DEBUG = 2
    INFO = 1
    DEFAULT = 0
    QUIET = -1
    EXTRA_QUIET = -2


class Reporter(object):
    actionchar = "-"

    def __init__(self, session):
        self.tw = py.io.TerminalWriter()
        self.session = session
        self._reportedlines = []

    @property
    def verbosity(self):
        if self.session:
            return (
                self.session.config.option.verbose_level - self.session.config.option.quiet_level
            )
        else:
            return Verbosity.DEBUG

    def logpopen(self, popen, env):
        """ log information about the action.popen() created process. """
        cmd = " ".join(map(str, popen.args))
        if popen.outpath:
            self.verbosity1("  {}$ {} >{}".format(popen.cwd, cmd, popen.outpath))
        else:
            self.verbosity1("  {}$ {} ".format(popen.cwd, cmd))

    def logaction_start(self, action):
        msg = "{} {}".format(action.msg, " ".join(map(str, action.args)))
        self.verbosity2("{} start: {}".format(action.venvname, msg), bold=True)
        assert not hasattr(action, "_starttime")
        action._starttime = time.time()

    def logaction_finish(self, action):
        duration = time.time() - action._starttime
        self.verbosity2(
            "{} finish: {} after {:.2f} seconds".format(action.venvname, action.msg, duration),
            bold=True,
        )
        delattr(action, "_starttime")

    def startsummary(self):
        if self.verbosity >= Verbosity.QUIET:
            self.tw.sep("_", "summary")

    def info(self, msg):
        if self.verbosity >= Verbosity.DEBUG:
            self.logline(msg)

    def using(self, msg):
        if self.verbosity >= Verbosity.INFO:
            self.logline("using {}".format(msg), bold=True)

    def keyboard_interrupt(self):
        self.error("KEYBOARDINTERRUPT")

    def keyvalue(self, name, value):
        if name.endswith(":"):
            name += " "
        self.tw.write(name, bold=True)
        self.tw.write(value)
        self.tw.line()

    def line(self, msg, **opts):
        self.logline(msg, **opts)

    def good(self, msg):
        if self.verbosity >= Verbosity.QUIET:
            self.logline(msg, green=True)

    def warning(self, msg):
        if self.verbosity >= Verbosity.QUIET:
            self.logline("WARNING: {}".format(msg), red=True)

    def error(self, msg):
        if self.verbosity >= Verbosity.QUIET:
            self.logline("ERROR: {}".format(msg), red=True)

    def skip(self, msg):
        if self.verbosity >= Verbosity.QUIET:
            self.logline("SKIPPED: {}".format(msg), yellow=True)

    def logline(self, msg, **opts):
        self._reportedlines.append(msg)
        self.tw.line("{}".format(msg), **opts)

    def verbosity0(self, msg, **opts):
        if self.verbosity >= Verbosity.DEFAULT:
            self.logline("{}".format(msg), **opts)

    def verbosity1(self, msg, **opts):
        if self.verbosity >= Verbosity.INFO:
            self.logline("{}".format(msg), **opts)

    def verbosity2(self, msg, **opts):
        if self.verbosity >= Verbosity.DEBUG:
            self.logline("{}".format(msg), **opts)

    # def log(self, msg):
    #    print(msg, file=sys.stderr)


[docs]class Session: """The session object that ties together configuration, reporting, venv creation, testing.""" def __init__(self, config, popen=subprocess.Popen, Report=Reporter): self.config = config self.popen = popen self.resultlog = ResultLog() self.report = Report(self) self.make_emptydir(config.logdir) config.logdir.ensure(dir=1) self.report.using("tox.ini: {}".format(self.config.toxinipath)) self._spec2pkg = {} self._name2venv = {} try: self.venvlist = [self.getvenv(x) for x in self.config.envlist] except LookupError: raise SystemExit(1) except tox.exception.ConfigError as e: self.report.error(str(e)) raise SystemExit(1) self._actions = [] @property def hook(self): return self.config.pluginmanager.hook def _makevenv(self, name): envconfig = self.config.envconfigs.get(name, None) if envconfig is None: self.report.error("unknown environment {!r}".format(name)) raise LookupError(name) elif envconfig.envdir == self.config.toxinidir: self.report.error( "venv {!r} in {} would delete project".format(name, envconfig.envdir) ) raise tox.exception.ConfigError("envdir must not equal toxinidir") venv = VirtualEnv(envconfig=envconfig, session=self) self._name2venv[name] = venv return venv
[docs] def getvenv(self, name): """ return a VirtualEnv controler object for the 'name' env. """ try: return self._name2venv[name] except KeyError: return self._makevenv(name)
def newaction(self, venv, msg, *args): action = Action(self, venv, msg, args) self._actions.append(action) return action def runcommand(self): self.report.using("tox-{} from {}".format(tox.__version__, tox.__file__)) verbosity = self.report.verbosity > Verbosity.DEFAULT if self.config.option.showconfig: self.showconfig() elif self.config.option.listenvs: self.showenvs(all_envs=False, description=verbosity) elif self.config.option.listenvs_all: self.showenvs(all_envs=True, description=verbosity) else: return self.subcommand_test() def _copyfiles(self, srcdir, pathlist, destdir): for relpath in pathlist: src = srcdir.join(relpath) if not src.check(): self.report.error("missing source file: {}".format(src)) raise SystemExit(1) target = destdir.join(relpath) target.dirpath().ensure(dir=1) src.copy(target) def _makesdist(self): setup = self.config.setupdir.join("setup.py") if not setup.check(): self.report.error( "No setup.py file found. The expected location is:\n" " {}\n" "You can\n" " 1. Create one:\n" " https://packaging.python.org/tutorials/distributing-packages/#setup-py\n" " 2. Configure tox to avoid running sdist:\n" " http://tox.readthedocs.io/en/latest/example/general.html" "#avoiding-expensive-sdist".format(setup) ) raise SystemExit(1) with self.newaction(None, "packaging") as action: action.setactivity("sdist-make", setup) self.make_emptydir(self.config.distdir) action.popen( [ sys.executable, setup, "sdist", "--formats=zip", "--dist-dir", self.config.distdir, ], cwd=self.config.setupdir, ) try: return self.config.distdir.listdir()[0] except py.error.ENOENT: # check if empty or comment only data = [] with open(str(setup)) as fp: for line in fp: if line and line[0] == "#": continue data.append(line) if not "".join(data).strip(): self.report.error("setup.py is empty") raise SystemExit(1) self.report.error( "No dist directory found. Please check setup.py, e.g with:\n" " python setup.py sdist" ) raise SystemExit(1) def make_emptydir(self, path): if path.check(): self.report.info(" removing {}".format(path)) shutil.rmtree(str(path), ignore_errors=True) path.ensure(dir=1) def setupenv(self, venv): if venv.envconfig.missing_subs: venv.status = ( "unresolvable substitution(s): {}. " "Environment variables are missing or defined recursively.".format( ",".join(["'{}'".format(m) for m in venv.envconfig.missing_subs]) ) ) return if not venv.matching_platform(): venv.status = "platform mismatch" return # we simply omit non-matching platforms with self.newaction(venv, "getenv", venv.envconfig.envdir) as action: venv.status = 0 default_ret_code = 1 envlog = self.resultlog.get_envlog(venv.name) try: status = venv.update(action=action) except IOError as e: if e.args[0] != 2: raise status = ( "Error creating virtualenv. Note that spaces in paths are " "not supported by virtualenv. Error details: {!r}".format(e) ) except tox.exception.InvocationError as e: status = ( "Error creating virtualenv. Note that some special characters (e.g. ':' and " "unicode symbols) in paths are not supported by virtualenv. Error details: " "{!r}".format(e) ) except tox.exception.InterpreterNotFound as e: status = e if self.config.option.skip_missing_interpreters: default_ret_code = 0 if status: str_status = str(status) commandlog = envlog.get_commandlog("setup") commandlog.add_command(["setup virtualenv"], str_status, default_ret_code) venv.status = status if default_ret_code == 0: self.report.skip(str_status) else: self.report.error(str_status) return False commandpath = venv.getcommandpath("python") envlog.set_python_info(commandpath) return True def finishvenv(self, venv): with self.newaction(venv, "finishvenv"): venv.finish() return True def developpkg(self, venv, setupdir): with self.newaction(venv, "developpkg", setupdir) as action: try: venv.developpkg(setupdir, action) return True except tox.exception.InvocationError: venv.status = sys.exc_info()[1] return False
[docs] def installpkg(self, venv, path): """Install package in the specified virtual environment. :param VenvConfig venv: Destination environment :param str path: Path to the distribution package. :return: True if package installed otherwise False. :rtype: bool """ self.resultlog.set_header(installpkg=py.path.local(path)) with self.newaction(venv, "installpkg", path) as action: try: venv.installpkg(path, action) return True except tox.exception.InvocationError: venv.status = sys.exc_info()[1] return False
[docs] def get_installpkg_path(self): """ :return: Path to the distribution :rtype: py.path.local """ if not self.config.option.sdistonly and ( self.config.sdistsrc or self.config.option.installpkg ): path = self.config.option.installpkg if not path: path = self.config.sdistsrc path = self._resolve_package(path) self.report.info("using package {!r}, skipping 'sdist' activity ".format(str(path))) else: try: path = self._makesdist() except tox.exception.InvocationError: v = sys.exc_info()[1] self.report.error("FAIL could not package project - v = {!r}".format(v)) return sdistfile = self.config.distshare.join(path.basename) if sdistfile != path: self.report.info("copying new sdistfile to {!r}".format(str(sdistfile))) try: sdistfile.dirpath().ensure(dir=1) except py.error.Error: self.report.warning( "could not copy distfile to {}".format(sdistfile.dirpath()) ) else: path.copy(sdistfile) return path
def subcommand_test(self): if self.config.skipsdist: self.report.info("skipping sdist step") path = None else: path = self.get_installpkg_path() if not path: return 2 if self.config.option.sdistonly: return for venv in self.venvlist: if self.setupenv(venv): if venv.envconfig.skip_install: self.finishvenv(venv) else: if venv.envconfig.usedevelop: self.developpkg(venv, self.config.setupdir) elif self.config.skipsdist: self.finishvenv(venv) else: self.installpkg(venv, path) self.runenvreport(venv) self.runtestenv(venv) retcode = self._summary() return retcode
[docs] def runenvreport(self, venv): """ Run an environment report to show which package versions are installed in the venv """ with self.newaction(venv, "envreport") as action: packages = self.hook.tox_runenvreport(venv=venv, action=action) action.setactivity("installed", ",".join(packages)) envlog = self.resultlog.get_envlog(venv.name) envlog.set_installed(packages)
def runtestenv(self, venv, redirect=False): if not self.config.option.notest: if venv.status: return self.hook.tox_runtest_pre(venv=venv) self.hook.tox_runtest(venv=venv, redirect=redirect) self.hook.tox_runtest_post(venv=venv) else: venv.status = "skipped tests" def _summary(self): self.report.startsummary() retcode = 0 for venv in self.venvlist: status = venv.status if isinstance(status, tox.exception.InterpreterNotFound): msg = " {}: {}".format(venv.envconfig.envname, str(status)) if self.config.option.skip_missing_interpreters: self.report.skip(msg) else: retcode = 1 self.report.error(msg) elif status == "platform mismatch": msg = " {}: {}".format(venv.envconfig.envname, str(status)) self.report.skip(msg) elif status and status == "ignored failed command": msg = " {}: {}".format(venv.envconfig.envname, str(status)) self.report.good(msg) elif status and status != "skipped tests": msg = " {}: {}".format(venv.envconfig.envname, str(status)) self.report.error(msg) retcode = 1 else: if not status: status = "commands succeeded" self.report.good(" {}: {}".format(venv.envconfig.envname, status)) if not retcode: self.report.good(" congratulations :)") path = self.config.option.resultjson if path: path = py.path.local(path) path.write(self.resultlog.dumps_json()) self.report.line("wrote json report at: {}".format(path)) return retcode def showconfig(self): self.info_versions() self.report.keyvalue("config-file:", self.config.option.configfile) self.report.keyvalue("toxinipath: ", self.config.toxinipath) self.report.keyvalue("toxinidir: ", self.config.toxinidir) self.report.keyvalue("toxworkdir: ", self.config.toxworkdir) self.report.keyvalue("setupdir: ", self.config.setupdir) self.report.keyvalue("distshare: ", self.config.distshare) self.report.keyvalue("skipsdist: ", self.config.skipsdist) self.report.tw.line() for envconfig in self.config.envconfigs.values(): self.report.line("[testenv:{}]".format(envconfig.envname), bold=True) for attr in self.config._parser._testenv_attr: self.report.line(" {:<15} = {}".format(attr.name, getattr(envconfig, attr.name))) def showenvs(self, all_envs=False, description=False): env_conf = self.config.envconfigs # this contains all environments default = self.config.envlist # this only the defaults extra = sorted(e for e in env_conf if e not in default) if all_envs else [] if description: self.report.line("default environments:") max_length = max(len(env) for env in (default + extra)) def report_env(e): if description: text = env_conf[e].description or "[no description]" msg = "{} -> {}".format(e.ljust(max_length), text).strip() else: msg = e self.report.line(msg) for e in default: report_env(e) if all_envs and extra: if description: self.report.line("") self.report.line("additional environments:") for e in extra: report_env(e) def info_versions(self): versions = ["tox-{}".format(tox.__version__)] proc = subprocess.Popen( (sys.executable, "-m", "virtualenv", "--version"), stdout=subprocess.PIPE ) out, _ = proc.communicate() versions.append("virtualenv-{}".format(out.decode("UTF-8").strip())) self.report.keyvalue("tool-versions:", " ".join(versions)) def _resolve_package(self, package_spec): try: return self._spec2pkg[package_spec] except KeyError: self._spec2pkg[package_spec] = x = self._get_latest_version_of_package(package_spec) return x def _get_latest_version_of_package(self, package_spec): if not os.path.isabs(str(package_spec)): return package_spec p = py.path.local(package_spec) if p.check(): return p if not p.dirpath().check(dir=1): raise tox.exception.MissingDirectory(p.dirpath()) self.report.info("determining {}".format(p)) candidates = p.dirpath().listdir(p.basename) if len(candidates) == 0: raise tox.exception.MissingDependency(package_spec) if len(candidates) > 1: version_package = [] for filename in candidates: version = get_version_from_filename(filename.basename) if version is not None: version_package.append((version, filename)) else: self.report.warning("could not determine version of: {}".format(str(filename))) if not version_package: raise tox.exception.MissingDependency(package_spec) version_package.sort() _, package_with_largest_version = version_package[-1] return package_with_largest_version else: return candidates[0]
_REGEX_FILE_NAME_WITH_VERSION = re.compile(r"[\w_\-\+\.]+-(.*)\.(zip|tar\.gz)") def get_version_from_filename(basename): m = _REGEX_FILE_NAME_WITH_VERSION.match(basename) if m is None: return None version = m.group(1) try: return pkg_resources.packaging.version.Version(version) except pkg_resources.packaging.version.InvalidVersion: return None