Source code for rain.client.output


import collections
from copy import copy

from ..common.content_type import check_content_type, merge_content_types
from ..common.labeled_list import LabeledList
from .data import DataObject, DataType


[docs]class OutputBase: """ A multi-purpose object for specifying output data objects of tasks. May be used in task factory construction (e.g. in `@remote` and `Program`), or in concrete task instantiation (as `outputs=[...]` or `output=...`). A default label is the number of the output in the task. """ data_type = None def __init__(self, label=None, *, size_hint=None, content_type=None, mode=None, encode=None, path=None): assert self.data_type is not None self.label = label self.size_hint = size_hint self.content_type = content_type check_content_type(self.content_type) assert mode is None, "Data object modes not supported yet" self.encode = encode if (self.encode is not None and self.content_type is not None and self.content_type != self.encode and self.content_type != ""): raise ValueError( "When specifying both encode and content_type " + "for Output, they must match.") self.path = path def _to_json(self): return {k: v for (k, v) in self.__dict__.items() if v is not None} def _check_for_task(self, task, order): "Check the output for a task instance creation." if self.encode is not None or self.path is not None: raise ValueError("Task Outputs do not accept `encode`, `path`.") def _check_for_remote(self, pytask): "Check and finalize the output for a Remote (pytask) factory." if self.path is not None: raise ValueError("Python remote task Outputs do not accept `path`.")
[docs] def merge_with_prototype(self, proto): "Return a copy of self updated with `Output` `proto` properties." assert isinstance(proto, OutputBase) o = copy(self) if o.size_hint is None: o.size_hint == proto.size_hint if o.label is None: o.label = proto.label if o.path is None: o.path = proto.path o.content_type = merge_content_types(o.content_type, proto.content_type) o.encode = merge_content_types(o.encode, proto.encode) return o
def create_data_object(self, session=None): d = DataObject(label=self.label, session=session, data_type=self.data_type, content_type=self.content_type) if self.size_hint is not None: d.spec.size_hint = self.size_hint return d @classmethod def _for_program(cls, out, label=None, execute=False, label_as_path=False): """ Create `Output` from `Output` or `str` for `Program` or `execute`. """ if isinstance(out, str): out = Output(out) if not isinstance(out, OutputBase): raise TypeError("Object {!r} cannot be used as output".format(out)) if out.label is None: out.label = label if out.label is None: raise ValueError("Program/execute Outputs need `label`") if out.encode is not None: raise ValueError("Program/execute Outputs do not accept `encode`.") if execute and out.path is None: if label_as_path: out.path = out.label else: out.path = "out_{}".format(out.label) return out
[docs] def expect_dir(self): """Raise TypeError if the Output is not a directory data-type.""" if self.data_type != DataType.DIRECTORY: raise TypeError("Directory output object expected.")
[docs] def expect_blob(self): """Raise TypeError if the Output is not a directory data-type.""" if self.data_type != DataType.BLOB: raise TypeError("Directory output object expected.")
[docs]class Output(OutputBase): data_type = DataType.BLOB def __repr__(self): if self.path is not None: return "<Output {!r} path={!r}>".format(self.label, self.path) else: return "<Output {!r}>".format(self.label)
[docs]class OutputDir(OutputBase): data_type = DataType.DIRECTORY def __repr__(self): if self.path is not None: return "<OutputDir {!r} path={!r}>".format(self.label, self.path) else: return "<OutputDir {!r}>".format(self.label)
def to_output(obj): if isinstance(obj, OutputBase): return obj if isinstance(obj, str): return Output(obj) raise Exception("Object {!r} cannot be used as output".format(obj)) class OutputSpec: """ A base class for task outputs list. Provides input and output specification, checking and instantiation. """ # Required / default outputs; LabeledList of `Output`s outputs = () def __init__(self, outputs=None, output=None): if output is not None: if outputs is not None: raise ValueError("Both `output` and `outputs` not allowed.") outputs = (output,) if isinstance(outputs, int): self.outputs = LabeledList(Output() for i in range(outputs)) elif isinstance(outputs, LabeledList): self.outputs = outputs elif isinstance(outputs, collections.Sequence): self.outputs = LabeledList(outputs) else: raise TypeError("expected int, LabeledList or a sequence " "for `outputs`, got {:r}".format(type(outputs))) for i, (label, output) in enumerate(self.outputs.items()): if isinstance(output, str): self.outputs.set(i, Output(label=output), label=output) elif not isinstance(output, OutputBase): raise TypeError("Only string labels and `Output` accepted in output list.") # Config for auto-encoding self.encode = [None] * len(self.outputs) def instantiate(self, outputs=None, output=None, session=None): """ Create new output `DataObject`s for `Output`s given. Returns a tuple of `LabeledList`s `(outputs, data_objects)`. If both `output=None` and `outputs=None`, creates builder prototype outputs. """ if output is not None: if outputs is not None: raise ValueError("Both `output` and `outputs` not allowed.") outputs = (output,) if outputs is None: outputs = LabeledList(self.outputs) if not isinstance(outputs, LabeledList): if not isinstance(outputs, collections.Sequence): raise TypeError("`outputs` must be None or a sequence type.") outputs = LabeledList(outputs) if len(outputs) != len(self.outputs): raise ValueError("Got {} outputs, {} expected." .format(len(outputs), len(self.outputs))) objs = LabeledList() self.encode = [None] * len(outputs) for i, (label, out) in enumerate(outputs.items()): if i < len(self.outputs): proto = self.outputs[i] else: proto = self.more_outputs if isinstance(out, str): out = Output(label=out) if out is None: out = Output() if not isinstance(out, OutputBase): raise TypeError("Only `Output` and `str` instances accepted in output list.") out_merged = out.merge_with_prototype(proto) if out_merged.label is None: out_merged.label = "out{}".format(i) do = out_merged.create_data_object(session=session) if out_merged.encode is not None: self.encode[i] = out_merged.encode do._spec.content_type = out_merged.encode if out_merged.size_hint is not None: do._spec.size_hint = out_merged.size_hint objs.append(do, label=do._spec.label) return objs