Source code for blumycelium.machine_elf

from . import utils as ut
from . import custom_types
from . import graph_parameters as gp
from .the_exceptions import *

from .import *

from icecream import ic

[docs]class ValuePlaceholder(gp.Value): """A placeholder for a value return by a task""" def _init(self, run_job_id, name, worker_elf, *args, **kwargs): super(ValuePlaceholder, self)._init(*args, **kwargs) self.run_job_id = run_job_id self.name = name self.result_id = self.get_result_id(self.run_job_id, self.name) self.worker_elf = worker_elf
[docs] def set_origin(self, result_id): """set the result id represented by the placeholder""" self.parameter.set_origin(self.result_id, self.worker_elf.mycelium.get_result)
@classmethod def get_result_id(cls, job_id, name): return ut.legalize_key(job_id + name)
[docs]class TaskReturnPlaceHolder: """A place for the return of a task. Works as a dict, where each value is a key""" def __init__(self, worker_elf, task_function, run_job_id): self.task_function = task_function self.worker_elf = worker_elf self.run_job_id = run_job_id self.parameters = {} self.is_none = False
[docs] def make_placeholder(self): """make and validate the placeholder""" from inspect import signature, _empty sig = signature(self.task_function) ret_annotation = sig.return_annotation self.is_none = False if (ret_annotation is _empty) : raise EmptyAnnotationError("Task return annotation cannot be empty, expecting None, dict, tuple or list of parameter keys. Got: '%s'" % ret_annotation) if (ret_annotation is None) : self.is_none = True elif (type(ret_annotation) not in [list, tuple, dict]) : raise EmptyAnnotationError("Task return annotation cannot be empty, expecting None, dict, tuple or list of parameter keys. Got: '%s'" % ret_annotation) if not self.is_none: if type(ret_annotation) is dict: for name, value in ret_annotation.items(): value_place = ValuePlaceholder(self.run_job_id, name, worker_elf=self.worker_elf, as_type=value) value_place.set_origin(value_place.result_id) self.parameters[name] = value_place else: for name in ret_annotation: value_place = ValuePlaceholder(self.run_job_id, name, worker_elf=self.worker_elf) value_place.set_origin(value_place.result_id) self.parameters[name] = value_place
[docs] def get_result_id(self, name): """return the result_id for value in the placeholder""" if name not in self.parameters: raise PlaceHolerKeyError("Placeholder has no parameter: '%s'" % name) return self.parameters[name].result_id
def __getitem__(self, key): if self.is_none: return None return self.parameters[key] def __str__(self): if self.is_none: return "*-%s: None-*"% self.__class__.__name__ return "*-%s: %s-*" %( self.__class__.__name__, str(self.parameters) ) def __repr__(self): return str(self)
[docs]class TaskParameters: """Parameters of a task""" def __init__(self, fct, run_job_id, worker_elf): from inspect import signature self.run_job_id = run_job_id self.worker_elf = worker_elf self.signature = signature(fct) self.final_args = {} def set_parameters(self, *args, **kwargs): from inspect import _empty ret_annotation = self.signature.return_annotation parameters = self.signature.parameters self.final_args = {} for pid, param in enumerate(parameters): if pid < len(args): self.final_args[param] = args[pid] elif param in kwargs: if param in self.final_args: raise RedundantParameterError("Got 2 values for parameter '{param}', ( {val1} & {val2})".format(param, self.final_args[param], kwargs[param])) self.final_args[param] = kwargs[param] else: if parameters[param].default is _empty: raise RedundantParameterError("Mandatory parameter '{param}' missing)".format(param=param)) self.final_args[param] = parameters[param].default return self.final_args
[docs] def find_placeholders_in_key_value_iterrator(self, iterator): """find placeholders in dicts lists etc...""" placeholders = {} for key, value in iterator: if isinstance(value, ValuePlaceholder): placeholders[str(key)] = value return placeholders
[docs] def validate(self): """returns placeholders""" from inspect import _empty accepted_types = [dict, list, tuple, int, float, bool] parameters = self.signature.parameters for param, value in self.final_args.items(): if not isinstance(value, ValuePlaceholder): param_clas = None if not parameters[param].annotation is _empty : param_clas = parameters[param].annotation elif not parameters[param].default is _empty : param_clas = type(parameters[param].default) if not (param_clas is type(None)) and param_clas and not isinstance(value, param_clas): raise ParameterTypeError("Param '{param}' has the wrong type {type} expected {exp})".format(param=param, type=type(value), exp=param_clas)) if param_clas and (not param_clas in accepted_types ) : raise ParameterTypeError("Param '{param}' has the wrong type {type} expected one of the following: {exp})".format(param=param, type=type(value), exp=accepted_types))
# def get_storable_type(self, obj): # """ # get a type storable by the mycellum for obj # [list, tuple] -> array # dict -> obj # [str, float, int] -> primitive # None -> null # """ # if type(obj) in [list, tuple]: # return "array" # elif type(obj) is dict: # return "object" # elif type(obj) in [str, float, int]: # return "primitive" # elif type(obj) in type(None): # return "null" # else: # raise Exception("Unknown type; %s must be in %s" % (type(obj), [list, tuple, dict, str, float, int]))
[docs] def get_job_dependencies(self): """return the dependencies of a job (jobs that must run before) based on the parameters received""" def _rec(deps, iterator): for arg in iterator: if isinstance(arg, ValuePlaceholder): deps.append(arg.run_job_id) elif type(arg) in [list, tuple, set]: _rec(deps, arg) elif type(arg) is dict: _rec(deps, arg.values()) dependencies = [] _rec(dependencies, self.final_args.values()) return dependencies
[docs] def get_parameter_dict(self): """return a dictionary of parameters""" params = {} for name, arg in self.final_args.items(): if type(arg) in [dict, list, tuple, set]: param = gp.unravel(arg) val = ValuePlaceholder(self.run_job_id, name, worker_elf=self.worker_elf) val.parameter = param elif not isinstance(arg, ValuePlaceholder): val = ValuePlaceholder(self.run_job_id, name, worker_elf=self.worker_elf) val.set_value(arg) else: val = arg params[name] = val.traverse(to_dict=True) return params
[docs] @classmethod def develop(cls, mycelium, dct_params:dict): """compute the value of parameters""" ret = {} for key, trav in dct_params.items(): ret[key] = gp.GraphParameter.build_from_traversal(trav, pull_origin_function=mycelium.get_result) ret[key] = ret[key].make() return ret
[docs]class Job: """a Job for an elf""" def __init__( self, task, run_job_id, worker_elf, parameters, start_date, completion_date, status, mycelium, return_placeholder, dependencies ): self.task=task self.run_job_id=run_job_id self.worker_elf=worker_elf self.parameters=parameters self.start_date=start_date self.completion_date=completion_date self.status=status self.mycelium = mycelium self.return_placeholder=return_placeholder self.dependencies = dependencies
[docs] def commit(self): """save the job to the mycelium""" self.mycelium.push_job(self)
[docs]class Task: """Represent a task for an elf""" def __init__(self, machine_elf, function, name): self.machine_elf = machine_elf self.function = function self.name = name # self.parameters = None self.return_placeholder = None self.source_code = None self.revision = None self.documentation = None self.signature = None self.inspect_function()
[docs] def inspect_function(self): """inspect the function of a task""" import hashlib self.source_code = ut.inpsect_none_if_exception_or_empty(self.function, "getsource") self.revision = ut.legalize_key(self.name + ut.get_hash_key(self.source_code)) self.documentation = ut.inpsect_none_if_exception_or_empty(self.function, "cleandoc") self.signature = ut.inpsect_none_if_exception_or_empty(self.function, "signature")
[docs] def wrap(self, *args, **kwargs): """wrap the function inside a new function that creates a Job""" args = args kwargs = kwargs run_job_id = ut.legalize_key( self.name + ut.getuid() ) # run_job_id = ut.getuid() def _wrapped(): parameters = TaskParameters(self.function, run_job_id=run_job_id, worker_elf=self.machine_elf) parameters.set_parameters(*args, **kwargs) placeholders = parameters.validate() return_placeholder = TaskReturnPlaceHolder(worker_elf=self.machine_elf, task_function=self.function, run_job_id=run_job_id) return_placeholder.make_placeholder() dependencies = parameters.get_job_dependencies() # if len(placeholders) > 0: # dependencies = [phold.run_job_id for phold in placeholders] job = Job( task = self, run_job_id = run_job_id, worker_elf = self.machine_elf, parameters = parameters, start_date = None, completion_date = None, status = custom_types.STATUS["PENDING"], mycelium = self.machine_elf.mycelium, return_placeholder=return_placeholder, dependencies = dependencies ) job_id = job.commit() return return_placeholder return _wrapped
[docs] def run(self, job_id, *args, **kwargs): """run the task""" fct_ret = self.function(*args, **kwargs) if fct_ret is None: return None ret = {} for name, value_ret in fct_ret.items(): ret[name] = { "result_id": ValuePlaceholder.get_result_id(job_id, name), "value": value_ret } return ret
def __call__(self, *args, **kwargs): return self.wrap( *args, **kwargs )() def __str__(self): return "*-Task '%s.%s': %s -*" % (self.machine_elf.uid, self.name, self.signature) def __repr__(self): return str(self)
[docs]class MachineElf: """An elf that runs tasks""" def __init__(self, uid, mycelium): self.uid = ut.legalize_key(uid) self.mycelium = mycelium self.tasks = {} self.source_code = None self.revision = None self.documentation= None self.inspect_self() self.find_tasks()
[docs] def inspect_self(self): """inspect the elf to get the source code etc...""" self.source_code = ut.inpsect_none_if_exception_or_empty(self.__class__, "getsource") self.revision = ut.get_hash_key(self.source_code+self.uid, prefix=self.__class__.__name__ ) self.documentation = ut.inpsect_none_if_exception_or_empty(self.__class__, "cleandoc")
[docs] def find_tasks(self): """find all function whose name starts by `task_` """ import inspect for name, member in inspect.getmembers(self): if name.startswith("task_") and inspect.ismethod(member): task = Task(self, member, name) self.tasks[name] = task setattr(self, name, task)
[docs] def register(self, store_source=False): """register the elf to the mycelium""" self.mycelium.register_machine_elf(self, store_source)
[docs] def get_jobs(self): """retuen the list of jobs for an elf""" return self.mycelium.get_jobs(self.uid)
def is_job_ready(self, job_id): return self.mycelium.is_job_ready(job_id)
[docs] def start_jobs(self, store_failures=True, raise_exceptions=True): """start a job for an elf""" jobs = self.mycelium.get_jobs(self.uid) for job in jobs: if self.mycelium.is_job_ready(job["id"]) : params = self.mycelium.get_job_parameters(job["id"]) try: params = TaskParameters.develop(self.mycelium, params) except ResultNotFound: print(">>Unable to retrieve result. Will try again later.") else: self.run_task(job["id"], job["task"]["name"], params, store_failures=store_failures, raise_exceptions=raise_exceptions)
[docs] def run_task(self, job_id:str, task_name:str, parameters:dict, store_failures:bool, raise_exceptions:bool): """run a task for an elf""" import sys try: task = getattr(self, task_name) self.mycelium.start_job(job_id) ret = task.run(job_id, **parameters) self.mycelium.store_results(job_id, ret) self.mycelium.complete_job(job_id) except Exception as exp: if store_failures: exc_type, exc_value, exc_traceback = sys.exc_info() self.mycelium.register_job_failure(exc_type, exc_value, exc_traceback, job_id) if raise_exceptions: raise exp return ret