# Source code for obob_condor.jobcluster

# -*- coding: UTF-8 -*-
# Copyright (c) 2018, Thomas Hartmann
# This file is part of the obob_condor Project.
#    obob_condor is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#    obob_condor is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU General Public License for more details.
#    You should have received a copy of the GNU General Public License
#    along with obob_condor. If not, see <https://www.gnu.org/licenses/>.

import getpass
import humanfriendly
import six
import sys
import os
import os.path
import obob_condor.job
import subprocess
import logging
import glob
import jinja2
import uuid
import numpy
import itertools
import copy
import collections
import inspect
import shutil

from obob_condor.job import JobItem

# pathlib is part of the standard library on modern interpreters; the
# pathlib2 backport is only used as a fallback on older ones. Both are bound
# to the same name so the rest of the module does not need to care.
if sys.version_info >= (3, 6):
    import pathlib
else:
    import pathlib2 as pathlib

class JobCluster(object):
    """
    This is the main class, the *controller* of obob_condor. It collects all
    the jobs and takes care of submitting them to the cluster. It also
    contains information about how much RAM the jobs need, how many CPUs are
    requested etc.

    Parameters
    ----------
    required_ram : :class:`str`, :class:`float`, :class:`int`, optional
        The amount of RAM required to run one Job. A string like "2G" or
        "200M" will be converted accordingly. A plain number is taken as is
        and divided by 1024 * 1024 before it is handed to the templates,
        i.e. it is treated as a number of bytes.
    adjust_mem : :class:`bool`, optional
        If True, the job will be restarted automatically if it gets killed by
        condor because it uses too much RAM.
    request_cpus : :class:`int`, optional
        The number of CPUs requested
    jobs_dir : :class:`str`, optional
        Folder to put all the jobs in. This one needs to be on the shared
        filesystem (so somewhere under /mnt/obob)
    inc_jobsdir : :class:`bool`, optional
        If this is set to True (default), jobs_dir is the parent folder for
        all the jobs folders. Each time a job is submitted, a new folder is
        created in the jobs_dir folder that contains all the necessary files
        and a folder called "log" containing the log files. If inc_jobsdir is
        set to False, the respective files are put directly under jobs_dir.
        In this case, jobs_dir must either be empty or not exist at all to
        avoid any side effects.
    owner : :class:`str`, optional
        Username the job should run under. If you submit your jobs from one
        of the bombers, you do not need to set this. If you have set up your
        local machine to submit jobs and your local username is different
        from your username on the cluster, set owner to that username.
    python_bin : :class:`str`, optional
        The path to the python interpreter that should run the jobs. If you
        do not set it, it gets chosen automatically. If the python
        interpreter you are using when submitting the jobs is on /mnt/obob/
        that one will be used. If the interpreter you are using is **not** on
        /mnt/obob/ the default one at /mnt/obob/obob_mne will be used.
    working_directory : :class:`str`, optional
        The working directory when the jobs run.
    singularity_image : :class:`str`, optional
        Set this to a singularity image to have the jobs execute in it. Can
        be a link to a local file or to some online repository.
    """

    _default_python_env = '/mnt/obob/obob_mne/bin/python'
    _obob_condor_runner = 'obob_condor.runner'
    _condor_submit = '/usr/bin/condor_submit'
    # NOTE(review): the scraped source showed an empty template name here,
    # which jinja2 cannot resolve. 'runner.py' is assumed to be the name of
    # the runner template shipped in the jinja2_templates folder - confirm.
    _runner_template = 'runner.py'
    _submit_template = 'submit.sub'

    def __init__(self, required_ram='2G', adjust_mem=True, request_cpus=1,
                 jobs_dir='jobs', inc_jobsdir=True, owner=None,
                 python_bin=None, working_directory=None,
                 singularity_image=None):
        # Most of these assignments go through the validating property
        # setters defined further down.
        self.required_ram = required_ram
        self.adjust_mem = adjust_mem
        self.request_cpus = request_cpus
        self.jobs_dir = jobs_dir
        self.inc_jobsdir = inc_jobsdir
        self.owner = owner
        self.python_bin = python_bin
        self.working_directory = working_directory

        # Only known after _prepare_jobs_directory() has run.
        self._condor_output_folder = None
        self._singularity_image = singularity_image
        self._jobs = list()
        self._jinja_env = jinja2.Environment(
            loader=jinja2.PackageLoader('obob_condor', 'jinja2_templates'),
            trim_blocks=True,
            lstrip_blocks=True)

    def add_job(self, job, *args, **kwargs):
        """
        Add one job to the JobCluster. All further arguments will be passed
        on to the Job.

        If any positional or keyword argument is a :class:`PermuteArgument`,
        one job is added for every combination of the values they contain.

        Parameters
        ----------
        job : child of :class:`obob_condor.Job`
            The job class to be added.
        *args
            Variable length argument list.
        **kwargs
            Arbitrary keyword arguments.

        Raises
        ------
        TypeError
            If ``job`` is not a subclass of :class:`obob_condor.Job`.
        """
        # inspect.getmro() only works on classes, so reject everything else
        # up front with the same TypeError.
        if (not inspect.isclass(job)
                or obob_condor.job.Job not in inspect.getmro(job)):
            raise TypeError('Job must be a subclass of obob_condor.Job')

        kwargs = collections.OrderedDict(kwargs)

        args_permuted = [idx for idx, cur_arg in enumerate(args)
                         if isinstance(cur_arg, PermuteArgument)]
        kwargs_permuted = [cur_key for cur_key, cur_val in kwargs.items()
                           if isinstance(cur_val, PermuteArgument)]

        if not args_permuted and not kwargs_permuted:
            self._jobs.append(JobItem(job, *args, **kwargs))
            return

        # Positional candidates first, keyword candidates second: the same
        # order is used below to split every combination again. Plain lists
        # are used on purpose so the argument values keep their Python types.
        candidates = [args[idx].args for idx in args_permuted]
        candidates.extend(kwargs[cur_key].args for cur_key in kwargs_permuted)
        n_positional = len(args_permuted)

        for combination in itertools.product(*candidates):
            new_args = list(args)
            new_kwargs = copy.deepcopy(kwargs)

            for idx, value in zip(args_permuted, combination[:n_positional]):
                new_args[idx] = value
            for cur_key, value in zip(kwargs_permuted,
                                      combination[n_positional:]):
                new_kwargs[cur_key] = value

            # Recurse so that the expanded arguments pass through the same
            # checks as directly supplied ones.
            self.add_job(job, *new_args, **new_kwargs)

    def run_local(self):
        """
        Runs the added jobs locally.

        The submit file is generated exactly as for a cluster submission and
        then parsed again: every file matching the pattern on its ``queue``
        line is executed one after the other with the executable and
        arguments found in the submit file. Output and error streams go to
        ``out.<idx>`` and ``err.<idx>`` in the ``log`` folder.
        """
        self._remove_notrunning_jobs()
        self._prepare_jobs_directory()
        self._generate_runner_file()
        submit_fname = self._generate_submit_file()

        submit_dict = dict()
        queue_glob = None
        with open(submit_fname, 'rt') as submit_file:
            for line in submit_file:
                if ' = ' in line:
                    # Split on the first separator only, the value itself
                    # may contain ' = '.
                    key, val = line.split(' = ', 1)
                    submit_dict[key.strip()] = val.strip()
                elif line.startswith('queue'):
                    # Drop the line ending, it is not part of the pattern.
                    queue_glob = line.rstrip().rsplit(' ', 1)[1]

        log_folder = os.path.join(self.condor_output_folder, 'log')
        for idx, cur_file in enumerate(sorted(glob.glob(queue_glob))):
            out_fname = os.path.join(log_folder, 'out.%d' % (idx,))
            err_fname = os.path.join(log_folder, 'err.%d' % (idx,))
            with open(out_fname, 'wt') as out, open(err_fname, 'wt') as err:
                # The [2:-1] slice removes the leading/trailing characters
                # the submit template wraps around the argument string.
                this_args = submit_dict['arguments'].replace(
                    '$(filename)', os.path.abspath(cur_file))[2:-1].split(' ')
                subprocess.run(
                    [submit_dict['executable'], this_args[0], this_args[1]],
                    cwd=self.working_directory, stderr=err, stdout=out)

    def submit(self, do_submit=True):
        """
        Runs the added jobs on the cluster.

        Parameters
        ----------
        do_submit : :class:`bool`, optional
            Set this to false to not actually submit but prepare all files.
        """
        self._remove_notrunning_jobs()
        self._prepare_jobs_directory()
        self._generate_runner_file()
        submit_fname = self._generate_submit_file()

        if do_submit:
            output = subprocess.check_output(
                [self._condor_submit, submit_fname],
                stderr=subprocess.STDOUT,
                cwd=self.working_directory)
            # check_output returns bytes; decode so the message is printed
            # as text instead of as a bytes literal.
            if isinstance(output, bytes):
                output = output.decode('utf-8', errors='replace')
            print('Submit output:\n%s\n' % (output, ))
        else:
            logging.info('Not actually submitting')

    def _remove_notrunning_jobs(self):
        """Keep only the jobs whose object reports that it shall run."""
        self._jobs = [cur_job for cur_job in self._jobs
                      if cur_job.make_object().shall_run_private()]

    def _generate_submit_file(self):
        """Render the submit template into the condor folder.

        Returns
        -------
        :class:`str`
            Path of the written submit file.
        """
        template = self._jinja_env.get_template(self._submit_template)

        submit_context = {
            'executable': self.executable,
            'arguments': self.arguments,
            'jobs_dir': self.condor_output_folder,
            'required_mem': int(self.required_ram / 1024 / 1024),
            'request_cpus': self.request_cpus,
            'owner': self.owner,
            'uuid': str(uuid.uuid4()),
            'python_home': self.python_home,
        }

        submit_rendered = template.render(submit_context)
        submit_fname = os.path.join(self.condor_output_folder, 'condor',
                                    'submit.sub')
        with open(submit_fname, 'wt') as submit_file:
            submit_file.write(submit_rendered)

        return submit_fname

    def _generate_runner_file(self):
        """Render the runner template to :attr:`runner_filename`."""
        template = self._jinja_env.get_template(self._runner_template)

        runner_context = {
            'requested_ram': int(self.required_ram / 1024 / 1024),
            'working_directory': self.working_directory
        }

        runner_rendered = template.render(runner_context)
        with open(self.runner_filename, 'wt') as runner_file:
            runner_file.write(runner_rendered)

    def _prepare_jobs_directory(self):
        """Create the folder structure for one submission and fill it.

        Creates the ``condor`` and ``log`` sub folders, writes one json file
        per job, copies the package that provides ``JobItem`` next to them
        and remembers the absolute folder in :attr:`condor_output_folder`.

        Returns
        -------
        :class:`str`
            The folder that was prepared.

        Raises
        ------
        ValueError
            If ``inc_jobsdir`` is False and ``jobs_dir`` already exists and
            is not an empty directory.
        """
        if self.inc_jobsdir:
            pathlib.Path(self.jobs_dir).mkdir(parents=True, exist_ok=True)
            # Use the first numbered sub folder that is not taken yet.
            jobs_idx = 1
            while os.path.exists(
                    os.path.join(self.jobs_dir, '%03d' % (jobs_idx, ))):
                jobs_idx = jobs_idx + 1
            final_jobs_dir = os.path.join(self.jobs_dir,
                                          '%03d' % (jobs_idx, ))
        else:
            final_jobs_dir = self.jobs_dir
            # An existing but empty directory is fine, anything else could
            # clash with files that are already there.
            if os.path.exists(final_jobs_dir) and (
                    not os.path.isdir(final_jobs_dir)
                    or os.listdir(final_jobs_dir)):
                raise ValueError('The jobs folder already exists. '
                                 'Please choose another one')

        pathlib.Path(final_jobs_dir).mkdir(parents=True, exist_ok=True)
        json_folder = os.path.join(final_jobs_dir, 'condor')
        pathlib.Path(json_folder).mkdir(exist_ok=True)
        log_folder = os.path.join(final_jobs_dir, 'log')
        pathlib.Path(log_folder).mkdir(exist_ok=True)

        for idx, cur_job in enumerate(self._jobs, start=1):
            cur_job.to_json(
                os.path.join(json_folder, 'job%03d.json.gzip' % (idx, )))

        self._condor_output_folder = os.path.abspath(final_jobs_dir)

        # Put a copy of the package defining JobItem next to the job files.
        package_folder = os.path.split(
            inspect.getsourcefile(inspect.getmodule(JobItem)))[0]
        shutil.copytree(package_folder,
                        os.path.join(json_folder, 'obob_condor'))

        return final_jobs_dir

    @property
    def owner(self):
        """:class:`str`: Username the jobs run under."""
        return self._owner

    @owner.setter
    def owner(self, owner):
        # Fall back to the user running this process.
        if not owner:
            owner = getpass.getuser()

        self._owner = owner

    @property
    def required_ram(self):
        """Numeric amount of RAM requested for one job."""
        return self._required_ram

    @required_ram.setter
    def required_ram(self, required_ram):
        if isinstance(required_ram, six.string_types):
            required_ram = humanfriendly.parse_size(required_ram)

        if not isinstance(required_ram, (int, float)):
            raise TypeError(
                'required_ram must be either a string or a number.')

        self._required_ram = required_ram

    @property
    def python_bin(self):
        """:class:`str`: Path of the python interpreter running the jobs."""
        return self._python_bin

    @python_bin.setter
    def python_bin(self, python_bin):
        if not python_bin:
            python_bin = sys.executable
            if not python_bin.startswith('/mnt/obob/'):
                logging.warning('Your current python distribution is not on /mnt/obob.\n'
                                'Using the default one in /mnt/obob/obob_mne now.\n'
                                'If you want to use a different one, please specify it explicitly in the constructor.')
                python_bin = self._default_python_env

        if not os.path.isfile(python_bin):
            raise ValueError('The python interpreter does not exist.')

        # Ask the binary for its version to check that it really is python.
        version_output = str(subprocess.check_output(
            [python_bin, '-V'], stderr=subprocess.STDOUT))
        if 'Python' not in version_output:
            raise ValueError('Cannot execute the python interpreter or it is not a python interpreter')

        self._python_bin = python_bin

    @property
    def singularity_command(self):
        """:class:`str`: Path of the singularity executable."""
        return '/usr/bin/singularity'

    @property
    def singularity_arguments(self):
        """:class:`str`: Arguments running the runner inside the image."""
        return f'exec -S /home/{self.owner} -B /var/lib/condor -B /mnt --no-home --writable-tmpfs {self._singularity_image} ' \
               f'{self.python_bin} {self.runner_filename}'

    @property
    def executable(self):
        """:class:`str`: Singularity if an image is set, else python."""
        if self._singularity_image is not None:
            return self.singularity_command
        else:
            return self.python_bin

    @property
    def arguments(self):
        """:class:`str`: Arguments matching :attr:`executable`."""
        if self._singularity_image is not None:
            return self.singularity_arguments
        else:
            return self.runner_filename

    @property
    def python_home(self):
        """:class:`str`: Folder two levels above the python interpreter."""
        return os.path.dirname(os.path.dirname(self.python_bin))

    @property
    def working_directory(self):
        """:class:`str`: The working directory when the jobs run."""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory):
        if not working_directory:
            working_directory = os.getcwd()

        if not os.path.isdir(working_directory):
            raise ValueError('working_directory is not a valid directory.')

        if not working_directory.startswith('/mnt/obob/'):
            logging.warning('working_directory is not on the storage. This will not work on the real cluster!')

        self._working_directory = working_directory

    @property
    def condor_output_folder(self):
        """Absolute folder of the prepared submission, None before that."""
        return self._condor_output_folder

    @property
    def n_jobs(self):
        """:class:`int`: Number of jobs currently added."""
        return len(self._jobs)

    @property
    def runner_filename(self):
        """:class:`str`: Path the rendered runner script is written to."""
        # NOTE(review): the scraped source had an empty file name here, which
        # yields the folder itself; 'runner.py' is assumed - confirm.
        return os.path.join(self.condor_output_folder, 'condor', 'runner.py')
class PermuteArgument(object):
    """
    This is a container for to-be-permuted arguments.

    See the example in the introductions for details.

    Parameters
    ----------
    args : iterable
        The values to permute over. They are stored unchanged in
        :attr:`args`.
    """

    def __init__(self, args):
        self.args = args

    def __repr__(self):
        return '%s(%r)' % (type(self).__name__, self.args)