Source code for reproman.interface.jobs

# -*- coding: utf-8 -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the reproman package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Operate on `reproman run` jobs.
"""

from functools import partial
import operator
import logging
import yaml

from reproman.dochelpers import exc_str
from reproman.interface.base import Interface
from reproman.support.jobs.local_registry import LocalRegistry
from reproman.support.jobs.orchestrators import ORCHESTRATORS
from reproman.resource import get_manager
from reproman.support.param import Parameter
from reproman.support.constraints import EnsureChoice
from reproman.support.exceptions import OrchestratorError
from reproman.support.exceptions import ResourceNotFoundError
from reproman.utils import chpwd

# Module-level logger used for the warnings/errors reported by this file.
lgr = logging.getLogger("reproman.interface.jobs")

__docformat__ = "restructuredtext"


# Registry instance created once at import time and shared by the code below:
# it is asked for the known job files (`find_job_files`) and to unregister a
# job by its ID (`unregister`).
LREG = LocalRegistry()


def _load(job_file):
    """Read the YAML document stored at `job_file` and return the parsed value.
    """
    with open(job_file) as stream:
        record = yaml.safe_load(stream)
    return record


def match(query_id, jobids):
    """Resolve `query_id` to one of the known `jobids`.

    Two kinds of matches are tried, in this order: an exact match, then a
    substring match. Substring matches are only considered when no job ID is
    exactly equal to `query_id`.

    Parameters
    ----------
    query_id : str
        A full job ID or a fragment of one.
    jobids : iterable of str
        Known job IDs. It is iterated once per kind of match.

    Returns
    -------
    The single matching job ID (str), or None if nothing matches.

    Raises
    ------
    ValueError if more than one job ID matches `query_id` for the kind of
    match being tried.
    """
    # Both predicates are called as predicate(jobid, query_id); for
    # operator.contains that means "query_id in jobid".
    for is_hit in (operator.eq, operator.contains):
        hits = []
        for candidate in jobids:
            if is_hit(candidate, query_id):
                hits.append(candidate)
        if not hits:
            # Nothing at this level of strictness; try the looser one.
            continue
        if len(hits) > 1:
            # TODO: Use custom exception.
            raise ValueError("ID {} matches multiple jobs: {}"
                             .format(query_id, ", ".join(hits)))
        return hits[0]
    return None


def _resurrect_orc(job):
    """Rebuild the orchestrator object described by the job record `job`.

    The resource is looked up by the record's "resource_id", and the
    orchestrator class named by the record's "orchestrator" key is
    instantiated with ``resurrection=True`` inside a `chpwd` context for the
    record's "local_directory".

    Raises
    ------
    OrchestratorError if creating the `chpwd` object for the job's local
    directory fails with FileNotFoundError.
    """
    manager = get_manager()
    resource = manager.get_resource(job["resource_id"], "id")
    local_dir = job["local_directory"]
    try:
        # Only the creation of the chpwd object is guarded here, so that a
        # FileNotFoundError raised by the `with` suite below is not
        # misreported as a missing local directory.
        workdir = chpwd(local_dir)
    except FileNotFoundError:
        message = ("local directory for job {} no longer exists: {}"
                   .format(job["_jobid"], local_dir))
        raise OrchestratorError(message)

    with workdir:
        make_orc = ORCHESTRATORS[job["orchestrator"]]
        orc = make_orc(resource, job["submitter"], job, resurrection=True)
        # The submission ID is optional in the record; None is assigned when
        # it is absent.
        orc.submitter.submission_id = job.get("_submission_id")
    return orc


# Action functions


def show_oneline(job, status=False):
    """Print a one-line summary of `job`.

    Parameters
    ----------
    job : dict
        Job record. The summary uses its "_jobid", "resource_name",
        "submitter" and "_resolved_command_str" entries; if one of them is
        missing, a warning is logged and nothing is printed.
    status : bool, optional
        Resurrect the orchestrator and prefix the line with its status (plus
        the submitter's queried status when that differs).
    """
    status_prefix = ""
    if status:
        orc = _resurrect_orc(job)
        own_status = orc.status
        _, submitter_status = orc.submitter.status
        extra = ""
        # Only mention the submitter's status when it adds information, i.e.
        # it is non-empty and not a repeat of our own (e.g., our and condor's
        # "running").
        if submitter_status and not own_status == submitter_status:
            extra = ", " + submitter_status
        status_prefix = "[status: {}{}] ".format(own_status, extra)

    try:
        command = job["_resolved_command_str"]
        # Keep the command to at most 50 characters, marking truncation with
        # an ellipsis.
        if len(command) > 50:
            shown = command[:47] + "..."
        else:
            shown = command
        line = (
            "{status}{j[_jobid]} on {j[resource_name]} via {j[submitter]}$ {cmd}"
            .format(status=status_prefix, j=job, cmd=shown))
        print(line)
    except KeyError as exc:
        lgr.warning(
            "Skipping following job record missing %s: %s",
            exc, job
        )


def show(job, status=False):
    """Print the full record of `job` as YAML.

    When `status` is true, the orchestrator is resurrected and a "status"
    entry (orchestrator status plus the raw and normalized status reported by
    the submitter) is stored in `job` before it is dumped. Note that this
    modifies the `job` dict passed in.
    """
    if status:
        orc = _resurrect_orc(job)
        normalized, raw = orc.submitter.status
        status_info = {}
        status_info["orchestrator"] = orc.status
        status_info["queried"] = raw
        status_info["queried_normalized"] = normalized
        job["status"] = status_info
    dumped = yaml.safe_dump(job)
    print(dumped)


def fetch(job):
    """Fetch the results of `job` if it has completed.

    A completed job is fetched through its resurrected orchestrator and then
    unregistered from the local registry. For an incomplete job only a warning
    with its current status is logged.
    """
    orc = _resurrect_orc(job)
    if not orc.has_completed:
        lgr.warning("Not fetching incomplete job %s [status: %s]",
                    job["_jobid"],
                    orc.status or "unknown")
        return
    orc.fetch()
    # The job is unregistered only after the fetch call has returned.
    LREG.unregister(orc.jobid)


class Jobs(Interface):
    """View and manage `reproman run` jobs.

    The possible actions are

      - list: Display a oneline list of all registered jobs

      - show: Display more information for each job over multiple lines

      - delete: Unregister a job locally

      - fetch: Fetch a completed job

      - auto: If jobs are specified (via JOB or --all), behave like 'fetch'.
        Otherwise, behave like 'list'.
    """

    # Parameter declarations; the keys mirror the keyword arguments of
    # __call__ below.
    _params_ = dict(
        queries=Parameter(
            metavar="JOB",
            nargs="*",
            doc="""A full job ID or a unique substring."""),
        action=Parameter(
            args=("-a", "--action"),
            constraints=EnsureChoice(
                "auto", "list", "show",
                "delete", "fetch"),
            doc="""Operation to perform on the job(s)."""),
        all_=Parameter(
            dest="all_",
            args=("--all",),
            action="store_true",
            doc="Operate on all jobs"),
        status=Parameter(
            dest="status",
            args=("-s", "--status"),
            action="store_true",
            doc="""Query the resource for status information when listing or
            showing jobs"""),
        # TODO: Add ability to restrict to resource.
    )

    @staticmethod
    def __call__(queries, action="auto", all_=False, status=False):
        # Resolve `queries` (or every registered job when `all_` is set) to
        # job IDs and apply `action` to each of them.
        #
        # Raises ValueError if "delete" or "fetch" is requested without any
        # matched job, or (from `match`) if a query is ambiguous.
        # OrchestratorError and ResourceNotFoundError raised while handling a
        # single job are logged and do not stop the remaining jobs.
        job_files = LREG.find_job_files()

        if not job_files:
            lgr.info("No jobs found")
            return

        if all_:
            matched_ids = list(job_files)
        else:
            matched_ids = []
            # `queries or ()` tolerates API callers passing None.
            for query in queries or ():
                m = match(query, job_files)
                if not m:
                    lgr.warning("No jobs matched query %s", query)
                elif m not in matched_ids:
                    # Different queries (a repeated ID, overlapping
                    # substrings) can resolve to the same job; keep each job
                    # once so that it is not listed, fetched, or unregistered
                    # more than once in a single invocation.
                    matched_ids.append(m)

        if not matched_ids and action in ["delete", "fetch"]:
            # These are actions where we don't want to just conveniently
            # default to "all" unless --all is explicitly specified.
            raise ValueError("Must specify jobs to {}".format(action))

        # We don't need to load the job to delete it, so check that first.
        if action == "delete":
            for i in matched_ids:
                LREG.unregister(i)
        else:
            # With no matched IDs, the read-only actions fall back to every
            # registered job.
            jobs = [_load(job_files[i]) for i in matched_ids or job_files]

            if action == "fetch" or (action == "auto" and matched_ids):
                fn = fetch
            elif action == "list" or action == "auto":
                fn = partial(show_oneline, status=status)
            elif action == "show":
                fn = partial(show, status=status)
            else:
                raise RuntimeError("Unknown action: {}".format(action))

            for job in jobs:
                try:
                    fn(job)
                except OrchestratorError as exc:
                    lgr.error("job %s failed: %s", job["_jobid"], exc_str(exc))
                except ResourceNotFoundError:
                    lgr.error("Resource %s (%s) no longer exists",
                              job["resource_id"], job["resource_name"])