Source code for obob_condor.job

# -*- coding: UTF-8 -*-
# Copyright (c) 2018, Thomas Hartmann
#
# This file is part of the obob_condor Project, see: https://gitlab.com/obob/obob_condor
#
#    obob_condor is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    obob_condor is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with obob_subjectdb. If not, see <http://www.gnu.org/licenses/>.
import gzip
import importlib
import inspect
import json
import os
import sys
import six
from copy import deepcopy
import hashlib
from pathlib import Path

class JobBase(object):
    def __init__(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs

    @property
    def args(self):
        return deepcopy(self._args)

    @property
    def kwargs(self):
        return deepcopy(self._kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def shall_run(self, *args, **kwargs):
        return True

    def run_private(self):
        return self.run(*self.args, **self.kwargs)

    def shall_run_private(self):
        return self.shall_run(*self.args, **self.kwargs)

    def make_hash_from_args(self):
        args_dict = {
            'args': self.args,
            'kwargs': self.kwargs
        }

        args_json = json.dumps(args_dict, sort_keys=True)

        hash = hashlib.blake2b(args_json.encode(), digest_size=5).hexdigest()
        return hash

[docs]class Job(JobBase): """ Abstract class for Jobs. This means, in order to define you own jobs, they need to be a subclass of this one. You **must** implement (i.e. define in your subclass) the :func:`run` method. The run method can take as many arguments as you like. Only the types of arguments are restricted because they need to be saved to disk. In general, strings, numbers, lists and dictionaries are fine. You **can** implement :func:`shall_run`. This can be used to see whether some output file already exists and restrict job submission to missing files. """
[docs] def run(self, *args, **kwargs): """ Implement this method to do the job. """ raise NotImplementedError
[docs] def shall_run(self, *args, **kwargs): """ This is an optional method. It gets called with the same arguments as the :func:`run` method, **before** the job is submitted. If it returns True, the job is submitted, if it returns False, it is not. """ return True
class JobItem(object): """ Internal class for items in the job queue """ def __init__(self, job_class_or_file, *args, **kwargs): if isinstance(job_class_or_file, (six.string_types, Path)): self._init_from_json(job_class_or_file) elif Job in inspect.getmro(job_class_or_file): self._init_from_class(job_class_or_file, *args, **kwargs) else: raise TypeError('JobItem needs either a filename or a job class') def _init_from_class(self, job_class, *args, **kwargs): self.job_module = inspect.getmodule(job_class).__name__ if self.job_module == '__main__': (path, f_name) = os.path.split(sys.argv[0]) self.job_module = os.path.splitext(f_name)[0] self.job_class = job_class.__name__ self.args = args self.kwargs = kwargs def _init_from_json(self, f_name): with gzip.open(f_name, 'rt') as gzip_file: raw_dict = json.load(gzip_file) self.job_class = raw_dict['job_class'] self.job_module = raw_dict['job_module'] self.args = raw_dict['args'] self.kwargs = raw_dict['kwargs'] def make_object(self): mod = importlib.import_module(self.job_module) this_class = getattr(mod, self.job_class) return this_class(*self.args, **self.kwargs) def to_json(self, f_name): with gzip.open(f_name, 'wt') as gzip_file: json.dump({ 'job_class': self.job_class, 'job_module': self.job_module, 'args': self.args, 'kwargs': self.kwargs }, gzip_file) def __str__(self): return '.'.join((self.job_module, self.job_class))
[docs]class AutomaticFilenameJob(Job): """ Abstract class for Jobs providing automatic filename generation. In order for this to work, you need to: 1. Set :attr:`base_data_folder` and :attr:`job_data_folder` as a class attribute. 2. If you use :meth:`shall_run`, you need to do the super call. This class then automatically creates the filename for each job using all the keyword arguments supplied. Please take a look at :doc:`autofilename` for detailed examples. Attributes ---------- base_data_folder : str or pathlib.Path The base folder for the data. Is normally set once for all jobs of a project. job_data_folder : str or pathlib.Path The folder where the data for this job should be saved. exclude_kwargs_from_filename : list Normally, all keyword arguments are used to build the filename. if you want to exclude some of them, put the key in the list here. include_hash_in_fname : bool Include a hash of all arguments in the filename. This is helpful if you excluded some keyword arguments from filename creation but still need to get distinct filename. run_only_when_not_existing : bool If true, this job will only run if the file does not already exist. create_folder : bool If true, calling folders are created automatically data_file_suffix : str, optional The extension of the file. Defaults to `.dat` """ base_data_folder = '' job_data_folder = '' exclude_kwargs_from_filename = [] include_hash_in_fname = False run_only_when_not_existing = True create_folders = True data_file_suffix = '.dat' def __init__(self, *args, **kwargs): if 'subject_id' not in kwargs: self.subject_id = 'dummy' else: self.subject_id = kwargs['subject_id'] super().__init__(*args, **kwargs)
[docs] @classmethod def get_full_data_folder(cls): """ Return the data folder for this job (i.e. :attr:`base_data_folder` plus :attr:`job_data_folder`). """ if not cls.base_data_folder: raise ValueError('base_data_folder must be set') if not cls.job_data_folder: raise ValueError('job_data_folder must be set') folder = Path(cls.base_data_folder, cls.job_data_folder) if cls.create_folders: folder.mkdir(exist_ok=True, parents=True) return folder
@property def output_folder(self): """ pathlib.Path: The output folder for this subject. """ folder = Path(self.get_full_data_folder(), self.subject_id) if self.create_folders: folder.mkdir(exist_ok=True, parents=True) return folder @property def output_filename(self): """ str: The filename for this subject. """ f_name_list = [self.subject_id] for key, val in self._kwargs.items(): if key == 'subject_id' or key in self.exclude_kwargs_from_filename: continue f_name_list.append('%s_%s' % (key, str(val))) if self.include_hash_in_fname: f_name_list.append(self.make_hash_from_args()) f_name = '__'.join(f_name_list) + self.data_file_suffix return f_name @property def full_output_path(self): """ pathlib.Path: The full path to the output file. """ return Path(self.output_folder, self.output_filename)
[docs] def shall_run(self, *args, **kwargs): this_shall_run = True if self.run_only_when_not_existing: this_shall_run = not self.full_output_path.exists() return this_shall_run and super().shall_run(*args, **kwargs)