# Source code for tox.session

"""
Automatically package and test a Python project against configurable
Python2 and Python3 based virtual environments. Environments are
setup by using virtualenv. Configuration is generally done through an
INI-style "tox.ini" file.
"""

import os
import pipes
import re
import shutil
import subprocess
import sys
import time
from collections import OrderedDict
from contextlib import contextmanager
from threading import Event, Semaphore, Thread

import pkg_resources
import py

import tox
from tox.config import parseconfig
from tox.config.parallel import ENV_VAR_KEY as PARALLEL_ENV_VAR_KEY
from tox.config.parallel import OFF_VALUE as PARALLEL_OFF
from tox.result import ResultLog
from tox.util import set_os_env_var
from tox.util.graph import stable_topological_sort
from tox.util.spinner import Spinner
from tox.venv import VirtualEnv


def prepare(args):
    """Parse ``args`` into a configuration object, serving help requests first.

    If ``config.option.help`` is set the general help is shown; otherwise, if
    ``config.option.helpini`` is set, the per-testenv ini help is shown. In
    both cases the process then exits with status 0.

    :param args: command line arguments handed to ``parseconfig``.
    :return: the parsed configuration when no help output was requested.
    :raises SystemExit: with code 0 after printing either help text.
    """
    config = parseconfig(args)
    options = config.option
    if options.help:
        printer = show_help
    elif options.helpini:
        printer = show_help_ini
    else:
        return config
    printer(config)
    raise SystemExit(0)


def cmdline(args=None):
    """Run :func:`main` with ``args``, defaulting to ``sys.argv[1:]`` when ``args`` is None."""
    main(sys.argv[1:] if args is None else args)


def main(args):
    """Build a session from ``args``, run it and exit with its return code.

    The session runs with the ``TOX_WORK_DIR`` environment variable set to
    ``config.toxworkdir``. This function never returns normally.

    :param args: command line arguments (without the program name).
    :raises SystemExit: with the session's return code (``None`` becomes 0),
        with 2 on ``KeyboardInterrupt``, or with 1 after reporting a
        ``MinVersionError`` / ``MissingRequirement``.
    """
    try:
        config = prepare(args)
        with set_os_env_var("TOX_WORK_DIR", config.toxworkdir):
            retcode = build_session(config).runcommand()
        raise SystemExit(0 if retcode is None else retcode)
    except KeyboardInterrupt:
        raise SystemExit(2)
    except (tox.exception.MinVersionError, tox.exception.MissingRequirement) as exception:
        # no session exists at this point, so report through a session-less reporter
        Reporter(None).error(str(exception))
        raise SystemExit(1)


def build_session(config):
    """Create the :class:`Session` for ``config`` and return it."""
    session = Session(config)
    return session


def show_help(config):
    """Write the parser's formatted help plus a list of environment variables to the terminal.

    :param config: parsed configuration; its ``_parser._format_help()`` supplies the help text.
    """
    writer = py.io.TerminalWriter()
    writer.write(config._parser._format_help())
    writer.line()
    writer.line("Environment variables", bold=True)
    env_var_help = (
        "TOXENV: comma separated list of environments (overridable by '-e')",
        "TOX_SKIP_ENV: regular expression to filter down from running tox environments",
        "TOX_TESTENV_PASSENV: space-separated list of extra environment variables to be "
        "passed into test command environments",
        "PY_COLORS: 0 disable colorized output, 1 enable (default)",
    )
    for text in env_var_help:
        writer.line(text)


def show_help_ini(config):
    """Write one entry (name, type, default, help text) per attribute in ``config._testenv_attr``.

    :param config: parsed configuration exposing ``_testenv_attr``.
    """
    writer = py.io.TerminalWriter()
    writer.sep("-", "per-testenv attributes")
    for attribute in config._testenv_attr:
        type_label = "<{}>".format(attribute.type)
        header = "{:<15} {:<8} default: {}".format(
            attribute.name, type_label, attribute.default
        )
        writer.line(header, bold=True)
        writer.line(attribute.help)
        writer.line()


class Action(object):
    """One named activity of a session, optionally bound to a virtual environment.

    An action is used as a context manager (start/finish are reported to the
    session's reporter) and drives subprocesses through :meth:`popen`, recording
    each finished command in the matching command log of the result log.
    """

    def __init__(self, session, venv, msg, args):
        """
        :param session: owning session; provides ``report``, ``resultlog``, ``config``
            and the ``popen`` factory.
        :param venv: virtual environment the action runs for, or a falsy value for a
            global action.
        :param str msg: action message; its first space-separated word becomes the
            initial ``activity``.
        :param args: extra arguments, only used when reporting the action start.
        """
        self.venv = venv
        self.msg = msg
        self.activity = msg.split(" ", 1)[0]
        self.session = session
        self.report = session.report
        self.args = args
        self.id = venv and venv.envconfig.envname or "tox"
        # subprocesses currently running on behalf of this action
        self._popenlist = []
        if self.venv:
            self.venvname = self.venv.name
        else:
            self.venvname = "GLOB"
        if msg == "runtests":
            cat = "test"
        else:
            cat = "setup"
        envlog = session.resultlog.get_envlog(self.venvname)
        self.commandlog = envlog.get_commandlog(cat)

    def __enter__(self):
        self.report.logaction_start(self)
        return self

    def __exit__(self, *args):
        self.report.logaction_finish(self)

    def setactivity(self, name, msg):
        """Set the current activity to ``name`` and report it.

        A non-empty ``msg`` is reported at default verbosity, an empty one only
        at verbosity level 1.
        """
        self.activity = name
        if msg:
            self.report.verbosity0("{} {}: {}".format(self.venvname, name, msg), bold=True)
        else:
            self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)

    def info(self, name, msg):
        """Report ``name``/``msg`` for this action's environment at verbosity level 1."""
        self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)

    def _initlogpath(self, actionid):
        """Open a new log file for ``actionid`` and return the writable file object.

        The file is named ``<actionid>-<n>.log`` where ``n`` is the number of
        existing ``<actionid>-*`` entries in the log directory (the venv's
        ``envlogdir`` or, without a venv, the session's ``logdir``). The
        directory is created when it does not exist yet. The caller owns the
        returned file and must close it.
        """
        if self.venv:
            logdir = self.venv.envconfig.envlogdir
        else:
            logdir = self.session.config.logdir
        try:
            log_count = len(logdir.listdir("{}-*".format(actionid)))
        except (py.error.ENOENT, py.error.ENOTDIR):
            logdir.ensure(dir=1)
            log_count = 0
        path = logdir.join("{}-{}.log".format(actionid, log_count))
        f = path.open("w")
        f.flush()
        return f

    def popen(
        self,
        args,
        cwd=None,
        env=None,
        redirect=True,
        returnout=False,
        ignore_ret=False,
        capture_err=True,
    ):
        """Run ``args`` as a subprocess and return its output.

        :param args: command and arguments; each item is converted with ``str``.
        :param cwd: working directory; defaults to ``py.path.local()``.
        :param env: environment mapping; ``None`` means a copy of ``os.environ``.
        :param bool redirect: send the child's stdout to a per-action log file. A
            log file is also used whenever the ``resultjson`` option is set.
        :param bool returnout: when no log file is used, capture stdout via a pipe.
        :param bool ignore_ret: do not raise on a non-zero exit code.
        :param bool capture_err: merge the child's stderr into its stdout.
        :return: the captured output, or the log file content when nothing was
            captured directly and a log file exists.
        :raises OSError: re-raised (after reporting) when the process cannot be started.
        :raises tox.exception.InvocationError: on a non-zero exit code unless
            ``ignore_ret`` is set.
        """
        stdout = outpath = None
        # log handles owned by this call; both are closed in the ``finally`` below
        fin = fout = None
        resultjson = self.session.config.option.resultjson

        stderr = subprocess.STDOUT if capture_err else None

        cmd_args = [str(x) for x in args]
        cmd_args_shell = " ".join(pipes.quote(i) for i in cmd_args)
        try:
            if resultjson or redirect:
                fout = self._initlogpath(self.id)
                fout.write(
                    "actionid: {}\nmsg: {}\ncmdargs: {!r}\n\n".format(
                        self.id, self.msg, cmd_args_shell
                    )
                )
                fout.flush()
                outpath = py.path.local(fout.name)
                fin = outpath.open("rb")
                fin.read()  # read the header, so it won't be written to stdout
                stdout = fout
            elif returnout:
                stdout = subprocess.PIPE
            if cwd is None:
                # FIXME XXX cwd = self.session.config.cwd
                cwd = py.path.local()
            try:
                popen = self._popen(args, cwd, env=env, stdout=stdout, stderr=stderr)
            except OSError as e:
                self.report.error(
                    "invocation failed (errno {:d}), args: {}, cwd: {}".format(
                        e.errno, cmd_args_shell, cwd
                    )
                )
                raise
            popen.outpath = outpath
            popen.args = cmd_args
            popen.cwd = cwd
            popen.action = self
            self._popenlist.append(popen)
            try:
                self.report.logpopen(popen, cmd_args_shell)
                try:
                    if resultjson and not redirect:
                        if popen.stderr is not None:
                            # prevent deadlock
                            raise ValueError("stderr must not be piped here")
                        # we read binary from the process and must write using a
                        # binary stream
                        buf = getattr(sys.stdout, "buffer", sys.stdout)
                        out = None
                        last_time = time.time()
                        while 1:
                            # we have to read one byte at a time, otherwise there
                            # might be no output for a long time with slow tests
                            data = fin.read(1)
                            if data:
                                buf.write(data)
                                if b"\n" in data or (time.time() - last_time) > 1:
                                    # we flush on newlines or after 1 second to
                                    # provide quick enough feedback to the user
                                    # when printing a dot per test
                                    buf.flush()
                                    last_time = time.time()
                            elif popen.poll() is not None:
                                if popen.stdout is not None:
                                    popen.stdout.close()
                                break
                            else:
                                time.sleep(0.1)
                                # the seek updates internal read buffers
                                fin.seek(0, 1)
                    else:
                        out, err = popen.communicate()
                except KeyboardInterrupt:
                    self.report.keyboard_interrupt()
                    popen.wait()
                    raise
                ret = popen.wait()
            finally:
                self._popenlist.remove(popen)
        finally:
            # The child has exited (or never started) once we get here, so the
            # log handles can be released on every path: success, OSError while
            # starting the process, and KeyboardInterrupt.
            for stream in (fin, fout):
                if stream is not None:
                    stream.close()
        if ret and not ignore_ret:
            invoked = " ".join(map(str, popen.args))
            if outpath:
                self.report.error(
                    "invocation failed (exit code {:d}), logfile: {}".format(ret, outpath)
                )
                out = outpath.read()
                self.report.error(out)
                if hasattr(self, "commandlog"):
                    self.commandlog.add_command(popen.args, out, ret)
                raise tox.exception.InvocationError("{} (see {})".format(invoked, outpath), ret)
            else:
                raise tox.exception.InvocationError("{!r}".format(invoked), ret)
        if not out and outpath:
            out = outpath.read()
        if hasattr(self, "commandlog"):
            self.commandlog.add_command(popen.args, out, ret)
        return out

    def _rewriteargs(self, cwd, args):
        """Return ``args`` as a list of strings suitable for the subprocess call.

        Outside Windows, ``py.path.local`` arguments are rewritten relative to
        ``cwd``. On Windows, a leading ``.py`` script is prefixed with the
        venv's python executable when the action has a venv.
        """
        newargs = []
        for arg in args:
            if not tox.INFO.IS_WIN and isinstance(arg, py.path.local):
                arg = cwd.bestrelpath(arg)
            newargs.append(str(arg))
        # subprocess does not always take kindly to .py scripts so adding the interpreter here
        if tox.INFO.IS_WIN:
            ext = os.path.splitext(str(newargs[0]))[1].lower()
            if ext == ".py" and self.venv:
                newargs = [str(self.venv.envconfig.envpython)] + newargs
        return newargs

    def _popen(self, args, cwd, stdout, stderr, env=None):
        """Start the process through the session's ``popen`` factory (no shell, text mode)."""
        if env is None:
            env = os.environ.copy()
        return self.session.popen(
            self._rewriteargs(cwd, args),
            shell=False,
            cwd=str(cwd),
            universal_newlines=True,
            stdout=stdout,
            stderr=stderr,
            env=env,
        )


class Verbosity(object):
    """Named verbosity levels, listed from the quietest to the most verbose."""

    EXTRA_QUIET = -2
    QUIET = -1
    DEFAULT = 0
    INFO = 1
    DEBUG = 2


class Reporter(object):
    """Terminal reporter that filters messages by verbosity and remembers what it printed."""

    def __init__(self, session):
        """
        :param session: session whose options determine the verbosity; may be
            ``None``, in which case the reporter behaves as at debug verbosity.
        """
        self.tw = py.io.TerminalWriter()
        self.session = session
        # every message passed to ``logline``, in order of emission
        self.reported_lines = []

    @property
    def verbosity(self):
        """Effective verbosity: ``verbose_level - quiet_level`` of the session options."""
        if not self.session:
            return Verbosity.DEBUG
        options = self.session.config.option
        return options.verbose_level - options.quiet_level

    def logpopen(self, popen, cmd_args_shell):
        """ log information about the action.popen() created process. """
        if popen.outpath:
            text = "  {}$ {} >{}".format(popen.cwd, cmd_args_shell, popen.outpath)
        else:
            text = "  {}$ {} ".format(popen.cwd, cmd_args_shell)
        self.verbosity1(text)

    def logaction_start(self, action):
        """Report the start of ``action`` and stamp it with its start time."""
        joined_args = " ".join(map(str, action.args))
        description = "{} {}".format(action.msg, joined_args)
        self.verbosity2("{} start: {}".format(action.venvname, description), bold=True)
        assert not hasattr(action, "_starttime")
        action._starttime = time.time()

    def logaction_finish(self, action):
        """Report how long ``action`` took and remove its start time stamp."""
        elapsed = time.time() - action._starttime
        text = "{} finish: {} after {:.2f} seconds".format(action.venvname, action.msg, elapsed)
        self.verbosity2(text, bold=True)
        del action._starttime

    def startsummary(self):
        """Print the summary separator unless running below ``Verbosity.QUIET``."""
        if self.verbosity < Verbosity.QUIET:
            return
        self.tw.sep("_", "summary")

    def logline_if(self, level, msg, key=None, **kwargs):
        """Log ``msg`` (prefixed with ``key`` when given) if the verbosity is at least ``level``."""
        if self.verbosity < level:
            return
        text = "{}{}".format(key, msg) if key is not None else str(msg)
        self.logline(text, **kwargs)

    def logline(self, msg, **opts):
        """Record ``msg`` and write it as one terminal line; ``opts`` are markup flags."""
        self.reported_lines.append(msg)
        self.tw.line("{}".format(msg), **opts)

    def keyboard_interrupt(self):
        self.error("KEYBOARDINTERRUPT")

    def keyvalue(self, name, value):
        """Write ``name`` in bold followed by ``value`` on one line.

        A space is appended to names ending in a colon.
        """
        label = name + " " if name.endswith(":") else name
        self.tw.write(label, bold=True)
        self.tw.write(value)
        self.tw.line()

    def line(self, msg, **opts):
        """Log ``msg`` unconditionally."""
        self.logline(msg, **opts)

    def info(self, msg):
        self.logline_if(Verbosity.DEBUG, msg)

    def using(self, msg):
        self.logline_if(Verbosity.INFO, msg, key="using ", bold=True)

    def good(self, msg):
        self.logline_if(Verbosity.QUIET, msg, green=True)

    def warning(self, msg):
        self.logline_if(Verbosity.QUIET, msg, key="WARNING: ", red=True)

    def error(self, msg):
        self.logline_if(Verbosity.QUIET, msg, key="ERROR: ", red=True)

    def skip(self, msg):
        self.logline_if(Verbosity.QUIET, msg, key="SKIPPED: ", yellow=True)

    def verbosity0(self, msg, **opts):
        self.logline_if(Verbosity.DEFAULT, msg, **opts)

    def verbosity1(self, msg, **opts):
        self.logline_if(Verbosity.INFO, msg, **opts)

    def verbosity2(self, msg, **opts):
        self.logline_if(Verbosity.DEBUG, msg, **opts)


class Session:
    """The session object that ties together configuration, reporting, venv creation, testing."""

    # NOTE(review): this class was reconstructed from whitespace-collapsed text.
    # Block nesting, and runs of spaces inside string literals (summary lines,
    # ``showconfig`` keys), could not be read off the collapsed text - confirm
    # both against the upstream file before relying on exact output alignment.

    def __init__(self, config, popen=subprocess.Popen, Report=Reporter):
        """
        :param config: parsed tox configuration.
        :param popen: factory used by actions to start subprocesses.
        :param Report: reporter class, instantiated with this session.
        :raises SystemExit: with code 1 for an unknown environment, a
            configuration error, or a circular ``depends`` declaration.
        """
        self.config = config
        self.popen = popen
        self.resultlog = ResultLog()
        self.report = Report(self)
        self.make_emptydir(config.logdir)
        config.logdir.ensure(dir=1)
        self.report.using("tox.ini: {}".format(self.config.toxinipath))
        self._spec2pkg = {}
        self._name2venv = {}
        try:
            self.venvlist = [self.getvenv(x) for x in self.evaluated_env_list()]
        except LookupError:
            # already reported by _makevenv
            raise SystemExit(1)
        except tox.exception.ConfigError as exception:
            self.report.error(str(exception))
            raise SystemExit(1)
        try:
            self.venv_order = stable_topological_sort(
                OrderedDict((v.name, v.envconfig.depends) for v in self.venvlist)
            )
        except ValueError as exception:
            self.report.error("circular dependency detected: {}".format(exception))
            raise SystemExit(1)
        self._actions = []

    def evaluated_env_list(self):
        """Yield the names in ``config.envlist`` that do not match ``TOX_SKIP_ENV``.

        ``TOX_SKIP_ENV`` is read from the process environment and compiled as a
        regular expression; names matching at their start are skipped (and the
        skip is reported at verbosity level 1).
        """
        tox_env_filter = os.environ.get("TOX_SKIP_ENV")
        tox_env_filter_re = re.compile(tox_env_filter) if tox_env_filter is not None else None
        for name in self.config.envlist:
            if tox_env_filter_re is not None and tox_env_filter_re.match(name):
                msg = "skip environment {}, matches filter {!r}".format(
                    name, tox_env_filter_re.pattern
                )
                self.report.verbosity1(msg)
                continue
            yield name

    @property
    def hook(self):
        """The plugin manager's hook caller."""
        return self.config.pluginmanager.hook

    def _makevenv(self, name):
        """Create, cache and return the ``VirtualEnv`` for environment ``name``.

        :raises LookupError: when ``name`` has no environment configuration.
        :raises tox.exception.ConfigError: when the environment directory equals
            the directory of the tox.ini file.
        """
        envconfig = self.config.envconfigs.get(name, None)
        if envconfig is None:
            self.report.error("unknown environment {!r}".format(name))
            raise LookupError(name)
        elif envconfig.envdir == self.config.toxinidir:
            self.report.error(
                "venv {!r} in {} would delete project".format(name, envconfig.envdir)
            )
            raise tox.exception.ConfigError("envdir must not equal toxinidir")
        venv = VirtualEnv(envconfig=envconfig, session=self)
        self._name2venv[name] = venv
        return venv

    def getvenv(self, name):
        """ return a VirtualEnv controller object for the 'name' env.  """
        try:
            return self._name2venv[name]
        except KeyError:
            return self._makevenv(name)

    def newaction(self, venv, msg, *args):
        """Create an :class:`Action` for ``venv``, remember it and return it."""
        action = Action(self, venv, msg, args)
        self._actions.append(action)
        return action

    def runcommand(self):
        """Run what the options ask for: show config, list envs, or run the tests.

        :return: the result of :meth:`subcommand_test` when tests are run,
            ``None`` for the informational commands.
        """
        self.report.using("tox-{} from {}".format(tox.__version__, tox.__file__))
        verbosity = self.report.verbosity > Verbosity.DEFAULT
        if self.config.option.showconfig:
            self.showconfig()
        elif self.config.option.listenvs:
            self.showenvs(all_envs=False, description=verbosity)
        elif self.config.option.listenvs_all:
            self.showenvs(all_envs=True, description=verbosity)
        else:
            with self.cleanup():
                return self.subcommand_test()

    @contextmanager
    def cleanup(self):
        """Context manager: ensure the temp dir exists, then remove built packages on exit.

        On exit, for every environment whose ``package`` attribute is an
        existing ``py.path.local``, the package file is removed and removal of
        its parent directory is attempted (errors ignored).
        """
        self.config.temp_dir.ensure(dir=True)
        try:
            yield
        finally:
            for name in self.venv_order:
                tox_env = self.getvenv(name)
                if (
                    hasattr(tox_env, "package")
                    and isinstance(tox_env.package, py.path.local)
                    and tox_env.package.exists()
                ):
                    self.report.verbosity2("cleanup {}".format(tox_env.package))
                    tox_env.package.remove()
                    py.path.local(tox_env.package.dirname).remove(ignore_errors=True)

    def make_emptydir(self, path):
        """If ``path`` exists, delete it recursively and recreate it as an empty directory."""
        if path.check():
            self.report.info(" removing {}".format(path))
            shutil.rmtree(str(path), ignore_errors=True)
            path.ensure(dir=1)

    def setupenv(self, venv):
        """Create or update the virtual environment of ``venv``.

        :return: ``True`` on success; ``False`` when the update failed (the
            failure is stored in ``venv.status`` and the setup command log);
            ``None`` when the environment is skipped because of unresolved
            substitutions or a platform mismatch.
        """
        if venv.envconfig.missing_subs:
            venv.status = (
                "unresolvable substitution(s): {}. "
                "Environment variables are missing or defined recursively.".format(
                    ",".join(["'{}'".format(m) for m in venv.envconfig.missing_subs])
                )
            )
            return
        if not venv.matching_platform():
            venv.status = "platform mismatch"
            return  # we simply omit non-matching platforms
        with self.newaction(venv, "getenv", venv.envconfig.envdir) as action:
            venv.status = 0
            default_ret_code = 1
            envlog = self.resultlog.get_envlog(venv.name)
            try:
                status = venv.update(action=action)
            except IOError as e:
                # only the error whose first argument is 2 is turned into a status
                # (presumably errno ENOENT - confirm); anything else propagates
                if e.args[0] != 2:
                    raise
                status = (
                    "Error creating virtualenv. Note that spaces in paths are "
                    "not supported by virtualenv. Error details: {!r}".format(e)
                )
            except tox.exception.InvocationError as e:
                status = (
                    "Error creating virtualenv. Note that some special characters (e.g. ':' and "
                    "unicode symbols) in paths are not supported by virtualenv. Error details: "
                    "{!r}".format(e)
                )
            except tox.exception.InterpreterNotFound as e:
                status = e
                if self.config.option.skip_missing_interpreters == "true":
                    default_ret_code = 0
            if status:
                str_status = str(status)
                commandlog = envlog.get_commandlog("setup")
                commandlog.add_command(["setup virtualenv"], str_status, default_ret_code)
                venv.status = status
                if default_ret_code == 0:
                    self.report.skip(str_status)
                else:
                    self.report.error(str_status)
                return False
            commandpath = venv.getcommandpath("python")
            envlog.set_python_info(commandpath)
            return True

    def finishvenv(self, venv):
        """Call ``venv.finish()`` inside a ``finishvenv`` action; always returns ``True``."""
        with self.newaction(venv, "finishvenv"):
            venv.finish()
            return True

    def developpkg(self, venv, setupdir):
        """Install the project from ``setupdir`` in develop mode into ``venv``.

        :return: ``True`` on success; ``False`` after storing the
            ``InvocationError`` in ``venv.status``.
        """
        with self.newaction(venv, "developpkg", setupdir) as action:
            try:
                venv.developpkg(setupdir, action)
                return True
            except tox.exception.InvocationError as exception:
                venv.status = exception
                return False

    def installpkg(self, venv, path):
        """Install package in the specified virtual environment.

        :param VenvConfig venv: Destination environment
        :param str path: Path to the distribution package.
        :return: True if package installed otherwise False.
        :rtype: bool
        """
        self.resultlog.set_header(installpkg=py.path.local(path))
        with self.newaction(venv, "installpkg", path) as action:
            try:
                venv.installpkg(path, action)
                return True
            except tox.exception.InvocationError as exception:
                venv.status = exception
                return False

    def subcommand_test(self):
        """Package the project (unless skipped), run all environments and summarize.

        :return: ``2`` when packaging produced nothing for an environment,
            ``None`` when only packaging was requested (``sdistonly``),
            otherwise the exit code computed by :meth:`_summary`.
        """
        if self.config.skipsdist:
            self.report.info("skipping sdist step")
        else:
            for name in self.venv_order:
                venv = self.getvenv(name)
                if not venv.envconfig.skip_install:
                    venv.package = self.hook.tox_package(session=self, venv=venv)
                    if not venv.package:
                        return 2
                    venv.envconfig.setenv[str("TOX_PACKAGE")] = str(venv.package)
        if self.config.option.sdistonly:
            return
        # a parallel child must not fan out again
        within_parallel = PARALLEL_ENV_VAR_KEY in os.environ
        if not within_parallel and self.config.option.parallel != PARALLEL_OFF:
            self.run_parallel()
        else:
            self.run_sequential()
        retcode = self._summary()
        return retcode

    def run_sequential(self):
        """Set up, install into, report on and test each environment in dependency order."""
        for name in self.venv_order:
            venv = self.getvenv(name)
            if self.setupenv(venv):
                if venv.envconfig.skip_install:
                    self.finishvenv(venv)
                else:
                    if venv.envconfig.usedevelop:
                        self.developpkg(venv, self.config.setupdir)
                    elif self.config.skipsdist:
                        self.finishvenv(venv)
                    else:
                        self.installpkg(venv, venv.package)

                self.runenvreport(venv)
            # runtestenv itself returns early for environments with a failure status
            self.runtestenv(venv)

    def run_parallel(self):
        """here we'll just start parallel sub-processes"""
        live_out = self.config.option.parallel_live
        args = [sys.executable, "-m", "tox"] + self.config.args
        # only options before a "--" separator belong to tox itself
        try:
            position = args.index("--")
        except ValueError:
            position = len(args)
        try:
            parallel_at = args[0:position].index("--parallel")
            del args[parallel_at]
            position -= 1
        except ValueError:
            pass

        max_parallel = self.config.option.parallel
        if max_parallel is None:
            max_parallel = len(self.venv_order)
        semaphore = Semaphore(max_parallel)
        finished = Event()
        sink = None if live_out else subprocess.PIPE

        show_progress = not live_out and self.report.verbosity > Verbosity.QUIET
        with Spinner(enabled=show_progress) as spinner:

            def run_in_thread(tox_env, os_env):
                res = None
                out = err = None
                env_name = tox_env.envconfig.envname
                try:
                    os_env[str(PARALLEL_ENV_VAR_KEY)] = str(env_name)
                    args_sub = list(args)
                    if hasattr(tox_env, "package"):
                        args_sub.insert(position, str(tox_env.package))
                        args_sub.insert(position, "--installpkg")
                    process = subprocess.Popen(
                        args_sub,
                        env=os_env,
                        stdout=sink,
                        stderr=sink,
                        stdin=None,
                        universal_newlines=True,
                    )
                    if live_out:
                        res = process.wait()
                    else:
                        # With piped output a bare wait() can block forever once
                        # the child fills the pipe buffer; communicate() drains
                        # both pipes while waiting for the exit.
                        out, err = process.communicate()
                        res = process.returncode
                finally:
                    tox_env.status = (
                        "skipped tests"
                        if self.config.option.notest
                        else ("parallel child exit code {}".format(res) if res else res)
                    )
                    # Record completion before waking the scheduler: it re-checks
                    # ``done`` as soon as ``finished`` is set, and would otherwise
                    # miss this environment and wait for an event that never comes.
                    done.add(env_name)
                    semaphore.release()
                    finished.set()
                    report = spinner.succeed
                    if self.config.option.notest:
                        report = spinner.skip
                    elif res:
                        report = spinner.fail
                    report(env_name)

                if not live_out:
                    if res or tox_env.envconfig.parallel_show_output:
                        outcome = (
                            "Failed {} under process {}, stdout:\n".format(env_name, process.pid)
                            if res
                            else ""
                        )
                        message = "{}{}{}".format(
                            outcome, out, "\nstderr:\n{}".format(err) if err else ""
                        ).rstrip()
                        self.report.logline_if(Verbosity.QUIET, message)

            threads = []
            todo_keys = set(self.venv_order)
            # per environment: the dependencies that are part of this run
            todo = OrderedDict(
                (i, todo_keys & set(self.getvenv(i).envconfig.depends)) for i in self.venv_order
            )
            done = set()
            while todo:
                for name, depends in list(todo.items()):
                    if depends - done:
                        # skip if has unfinished dependencies
                        continue
                    del todo[name]
                    venv = self.getvenv(name)
                    semaphore.acquire(blocking=True)
                    spinner.add(name)
                    thread = Thread(target=run_in_thread, args=(venv, os.environ.copy()))
                    thread.start()
                    threads.append(thread)
                if todo:
                    # wait until someone finishes and retry queuing jobs
                    finished.wait()
                    finished.clear()

            for thread in threads:
                thread.join()

    def runenvreport(self, venv):
        """
        Run an environment report to show which package
        versions are installed in the venv
        """
        with self.newaction(venv, "envreport") as action:
            packages = self.hook.tox_runenvreport(venv=venv, action=action)
        action.setactivity("installed", ",".join(packages))
        envlog = self.resultlog.get_envlog(venv.name)
        envlog.set_installed(packages)

    def runtestenv(self, venv, redirect=False):
        """Run the test hooks for ``venv`` unless its status forbids it.

        With the ``notest`` option and a clean status (0) the environment is
        marked ``"skipped tests"``. An environment with any other truthy status
        is left untouched. Otherwise the pre, test and post hooks run; the test
        hook is skipped when the pre hook changed the status.
        """
        if venv.status == 0 and self.config.option.notest:
            venv.status = "skipped tests"
        else:
            if venv.status:
                return
            self.hook.tox_runtest_pre(venv=venv)
            if venv.status == 0:
                self.hook.tox_runtest(venv=venv, redirect=redirect)
            self.hook.tox_runtest_post(venv=venv)

    def _summary(self):
        """Report one status line per environment and return the overall exit code.

        Nothing is printed (and no json report is written) when running as a
        parallel child, i.e. when ``PARALLEL_ENV_VAR_KEY`` is in the environment.

        :return: 1 if any environment failed, else 0.
        """
        is_parallel_child = PARALLEL_ENV_VAR_KEY in os.environ
        if not is_parallel_child:
            self.report.startsummary()
        exit_code = 0
        for name in self.venv_order:
            venv = self.getvenv(name)
            reporter = self.report.good
            status = venv.status
            if isinstance(status, tox.exception.InterpreterNotFound):
                msg = " {}: {}".format(venv.envconfig.envname, str(status))
                if self.config.option.skip_missing_interpreters == "true":
                    reporter = self.report.skip
                else:
                    exit_code = 1
                    reporter = self.report.error
            elif status == "platform mismatch":
                msg = " {}: {}".format(venv.envconfig.envname, str(status))
                reporter = self.report.skip
            elif status and status == "ignored failed command":
                msg = " {}: {}".format(venv.envconfig.envname, str(status))
            elif status and status != "skipped tests":
                msg = " {}: {}".format(venv.envconfig.envname, str(status))
                reporter = self.report.error
                exit_code = 1
            else:
                if not status:
                    status = "commands succeeded"
                msg = " {}: {}".format(venv.envconfig.envname, status)
            if not is_parallel_child:
                reporter(msg)
        if not exit_code and not is_parallel_child:
            self.report.good(" congratulations :)")
        if not is_parallel_child:
            path = self.config.option.resultjson
            if path:
                path = py.path.local(path)
                path.write(self.resultlog.dumps_json())
                self.report.line("wrote json report at: {}".format(path))
        return exit_code

    def showconfig(self):
        """Print tool versions, the global paths and every attribute of every test environment."""
        self.info_versions()
        self.report.keyvalue("config-file:", self.config.option.configfile)
        self.report.keyvalue("toxinipath: ", self.config.toxinipath)
        self.report.keyvalue("toxinidir: ", self.config.toxinidir)
        self.report.keyvalue("toxworkdir: ", self.config.toxworkdir)
        self.report.keyvalue("setupdir: ", self.config.setupdir)
        self.report.keyvalue("distshare: ", self.config.distshare)
        self.report.keyvalue("skipsdist: ", self.config.skipsdist)
        self.report.tw.line()
        for envconfig in self.config.envconfigs.values():
            self.report.line("[testenv:{}]".format(envconfig.envname), bold=True)
            for attr in self.config._parser._testenv_attr:
                self.report.line(" {:<15} = {}".format(attr.name, getattr(envconfig, attr.name)))

    def showenvs(self, all_envs=False, description=False):
        """List the default environments and, optionally, all additional ones.

        :param bool all_envs: also list configured environments that are neither
            in the default list nor the isolated build environment.
        :param bool description: add headings and each environment's description.
        """
        env_conf = self.config.envconfigs  # this contains all environments
        default = self.config.envlist  # this only the defaults
        ignore = {self.config.isolated_build_env}.union(default)
        extra = [e for e in env_conf if e not in ignore] if all_envs else []

        if description:
            self.report.line("default environments:")
            # ``or [0]`` keeps max() from raising when there is no environment at all
            max_length = max([len(env) for env in (default + extra)] or [0])

        def report_env(e):
            if description:
                text = env_conf[e].description or "[no description]"
                msg = "{} -> {}".format(e.ljust(max_length), text).strip()
            else:
                msg = e
            self.report.line(msg)

        for e in default:
            report_env(e)
        if all_envs and extra:
            if description:
                self.report.line("")
                self.report.line("additional environments:")
            for e in extra:
                report_env(e)

    def info_versions(self):
        """Report the tox version and the version printed by ``python -m virtualenv --version``."""
        versions = ["tox-{}".format(tox.__version__)]
        proc = subprocess.Popen(
            (sys.executable, "-m", "virtualenv", "--version"), stdout=subprocess.PIPE
        )
        out, _ = proc.communicate()
        versions.append("virtualenv-{}".format(out.decode("UTF-8").strip()))
        self.report.keyvalue("tool-versions:", " ".join(versions))

    def _resolve_package(self, package_spec):
        """Return the package for ``package_spec``, resolving and caching it on first use."""
        try:
            return self._spec2pkg[package_spec]
        except KeyError:
            self._spec2pkg[package_spec] = x = self._get_latest_version_of_package(package_spec)
            return x

    def _get_latest_version_of_package(self, package_spec):
        """Resolve ``package_spec`` to a concrete package path.

        A non-absolute spec is returned unchanged, as is an existing path.
        Otherwise the spec's basename is used as a pattern inside its directory;
        with several matches the one with the highest parsable version wins.

        :raises tox.exception.MissingDirectory: when the spec's directory does not exist.
        :raises tox.exception.MissingDependency: when nothing matches, or several
            files match but none has a parsable version.
        """
        if not os.path.isabs(str(package_spec)):
            return package_spec
        p = py.path.local(package_spec)
        if p.check():
            return p
        if not p.dirpath().check(dir=1):
            raise tox.exception.MissingDirectory(p.dirpath())
        self.report.info("determining {}".format(p))
        candidates = p.dirpath().listdir(p.basename)
        if len(candidates) == 0:
            raise tox.exception.MissingDependency(package_spec)
        if len(candidates) > 1:
            version_package = []
            for filename in candidates:
                version = get_version_from_filename(filename.basename)
                if version is not None:
                    version_package.append((version, filename))
                else:
                    self.report.warning("could not determine version of: {}".format(str(filename)))
            if not version_package:
                raise tox.exception.MissingDependency(package_spec)
            version_package.sort()
            _, package_with_largest_version = version_package[-1]
            return package_with_largest_version
        else:
            return candidates[0]
_REGEX_FILE_NAME_WITH_VERSION = re.compile(r"[\w_\-\+\.]+-(.*)\.(zip|tar\.gz)") def get_version_from_filename(basename): m = _REGEX_FILE_NAME_WITH_VERSION.match(basename) if m is None: return None version = m.group(1) try: return pkg_resources.packaging.version.Version(version) except pkg_resources.packaging.version.InvalidVersion: return None