# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the reproman package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
"""
__docformat__ = 'restructuredtext'
import argparse
import os
import re
import sys
import gzip
from tempfile import NamedTemporaryFile
from ..cmd import Runner
from ..log import is_interactive
from ..utils import getpwd
from ..version import __version__
from ..dochelpers import exc_str
from logging import getLogger
lgr = getLogger('reproman.cmdline')
[docs]class HelpAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if is_interactive() and option_string == '--help':
# lets use the manpage on mature systems ...
try:
import subprocess
# get the reproman manpage to use
manfile = os.environ.get('MANPATH', '/usr/share/man') \
+ '/man1/{0}.1.gz'.format(parser.prog.replace(' ', '-'))
# extract version field from the manpage
if not os.path.exists(manfile):
raise IOError("manfile is not found")
with gzip.open(manfile) as f:
man_th = [line for line in f if line.startswith(".TH")][0]
man_version = man_th.split(' ')[5].strip(" '\"\t\n")
# don't show manpage if man_version not equal to current reproman_version
if __version__ != man_version:
raise ValueError
subprocess.check_call(
'man %s 2> /dev/null' % manfile,
shell=True)
sys.exit(0)
except (subprocess.CalledProcessError, IOError, OSError, IndexError, ValueError) as e:
lgr.debug("Did not use manpage since %s", exc_str(e))
if option_string == '-h':
helpstr = "%s\n%s" % (
parser.format_usage(),
"Use '--help' to get more comprehensive information.")
else:
helpstr = parser.format_help()
# better for help2man
# for main command -- should be different sections. And since we are in
# heavy output messaging mode...
if "commands for manipulating computation environments" in helpstr.lower():
opt_args_str = '*Global options*'
pos_args_str = '*Commands*'
# tune up usage -- default one is way too heavy
helpstr = re.sub('^[uU]sage: .*?\n\s*\n',
'Usage: reproman [global-opts] command [command-opts]\n\n',
helpstr,
flags=re.MULTILINE | re.DOTALL)
# and altogether remove sections with long list of commands
helpstr = re.sub(r'positional arguments:\s*\n\s*{.*}\n', '', helpstr)
else:
opt_args_str = "*Options*"
pos_args_str = "*Arguments*"
helpstr = re.sub(r'optional arguments:', opt_args_str, helpstr)
helpstr = re.sub(r'positional arguments:', pos_args_str, helpstr)
# convert all headings to have the first character uppercase
headpat = re.compile(r'^([a-z])(.*):$', re.MULTILINE)
helpstr = re.subn(
headpat,
lambda match: r'{0}{1}:'.format(match.group(1).upper(),
match.group(2)),
helpstr)[0]
# usage is on the same line
helpstr = re.sub(r'^usage:', 'Usage:', helpstr)
print(helpstr)
sys.exit(0)
[docs]class LogLevelAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
from ..log import LoggerHelper
LoggerHelper().set_level(level=values)
[docs]class PBSAction(argparse.Action):
"""Action to schedule actual command execution via PBS (e.g. Condor)"""
def __call__(self, parser, namespace, values, option_string=None):
pbs = values[0]
import pdb; pdb.set_trace()
i = 1
[docs]def parser_add_common_args(parser, pos=None, opt=None, **kwargs):
from . import common_args
for i, args in enumerate((pos, opt)):
if args is None:
continue
for arg in args:
arg_tmpl = getattr(common_args, arg)
arg_kwargs = arg_tmpl[2].copy()
arg_kwargs.update(kwargs)
if i:
parser.add_argument(*arg_tmpl[i], **arg_kwargs)
else:
parser.add_argument(arg_tmpl[i], **arg_kwargs)
[docs]def parser_add_common_opt(parser, opt, names=None, **kwargs):
from . import common_args
opt_tmpl = getattr(common_args, opt)
opt_kwargs = opt_tmpl[2].copy()
opt_kwargs.update(kwargs)
if names is None:
parser.add_argument(*opt_tmpl[1], **opt_kwargs)
else:
parser.add_argument(*names, **opt_kwargs)
[docs]def strip_arg_from_argv(args, value, opt_names):
"""Strip an originally listed option (with its value) from the list cmdline args
"""
# Yarik doesn't know better
args = args or sys.argv
# remove present pbs-runner option
args_clean = []
skip = 0
for i, arg in enumerate(args):
if skip:
# we skip only one as instructed
skip -= 1
continue
if not (arg in opt_names and i < len(args)-1 and args[i + 1] == value):
args_clean.append(arg)
else:
# we need to skip this one and next one
skip = 1
return args_clean
[docs]def run_via_pbs(args, pbs):
assert(pbs in ('condor',)) # for now
# TODO: RF to support multiple backends, parameters, etc, for now -- just condor, no options
f = NamedTemporaryFile('w', prefix='reproman-%s-' % pbs, suffix='.submit', delete=False)
try:
pwd = getpwd()
logs = f.name.replace('.submit', '.log')
exe = args[0]
# TODO: we might need better way to join them, escaping spaces etc. There must be a stock helper
#exe_args = ' '.join(map(repr, args[1:])) if len(args) > 1 else ''
exe_args = ' '.join(args[1:]) if len(args) > 1 else ''
f.write("""\
Executable = %(exe)s
Initialdir = %(pwd)s
Output = %(logs)s
Error = %(logs)s
getenv = True
arguments = %(exe_args)s
queue
""" % locals())
f.close()
Runner().run(['condor_submit', f.name])
lgr.info("Scheduled execution via %s. Logs will be stored under %s" % (pbs, logs))
finally:
os.unlink(f.name)
[docs]class RegexpType(object):
"""Factory for creating regular expression types for argparse
DEPRECATED AFAIK -- now things are in the config file,
but we might provide a mode where we operate solely from cmdline
"""
def __call__(self, string):
if string:
return re.compile(string)
else:
return None
# TODO: useful also outside of cmdline, move to support/
from os import curdir
[docs]def get_repo_instance(path=curdir, class_=None):
"""Returns an instance of appropriate reproman repository for path.
Check whether a certain path is inside a known type of repository and
returns an instance representing it. May also check for a certain type
instead of detecting the type of repository.
Parameters
----------
path: str
path to check; default: current working directory
class_: class
if given, check whether path is inside a repository, that can be
represented as an instance of the passed class.
Raises
------
RuntimeError, in case cwd is not inside a known repository.
"""
from os.path import join as opj, ismount, exists, abspath, expanduser, \
expandvars, normpath, isabs
from git.exc import InvalidGitRepositoryError
from ..utils import expandpath
from ..support.gitrepo import GitRepo
from ..support.annexrepo import AnnexRepo
dir_ = expandpath(path)
abspath_ = path if isabs(path) else dir_
if class_ is not None:
if class_ == AnnexRepo:
type_ = "annex"
elif class_ == GitRepo:
type_ = "git"
else:
raise RuntimeError("Unknown class %s." % str(class_))
while not ismount(dir_): # TODO: always correct termination?
if exists(opj(dir_, '.git')):
# found git dir
if class_ is None:
# detect repo type:
try:
return AnnexRepo(dir_, create=False)
except RuntimeError as e:
pass
try:
return GitRepo(dir_, create=False)
except InvalidGitRepositoryError as e:
raise RuntimeError("No reproman repository found in %s" %
abspath_)
else:
try:
return class_(dir_, create=False)
except (RuntimeError, InvalidGitRepositoryError) as e:
raise RuntimeError("No %s repository found in %s." %
(type_, abspath_))
else:
dir_ = normpath(opj(dir_, ".."))
if class_ is not None:
raise RuntimeError("No %s repository found in %s" % (type_, abspath_))
else:
raise RuntimeError("No reproman repository found in %s" % abspath_)
from appdirs import AppDirs
from os.path import join as opj
dirs = AppDirs("reproman", "reproman.org")