from .task import Task
from .data import to_dataobj
from .input import Input, InputBase
from .output import Output, OutputBase, OutputDir
from .data import DataObject
from ..common.data_type import DataType
from ..common.utils import short_str
import shlex
class Concat(Task):
    """Task that concatenates the given data objects into a single output.

    Args:
        inputs: The data objects to concatenate; forwarded to `Task` as-is.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
    """

    TASK_TYPE = "buildin/concat"

    def __init__(self, inputs, *, name=None, session=None):
        # `1` is passed in the outputs position (presumably an output count).
        super().__init__(
            inputs,
            1,
            name=name,
            session=session,
        )
class Sleep(Task):
    """Task that forwards its `input` after `timeout` seconds.

    The resulting data object has the same kind (blob or directory) and the
    same content type as the input data object.

    This task serves for testing purposes.

    Args:
        input (`DataObject`): The object to pass through; converted with
            `to_dataobj`.
        timeout (`float`): Number of seconds to wait, converted to whole
            milliseconds. It is stored in the task config as a float.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
        cpus (`int`): Number of CPUs to reserve, for testing purposes.
    """

    TASK_TYPE = "buildin/sleep"

    def __init__(self, input, timeout, *, name=None, session=None, cpus=1):
        dataobj = to_dataobj(input)

        # Mirror the kind of the input: blobs yield a blob, anything else a directory.
        if dataobj.spec.data_type == DataType.BLOB:
            output_cls = Output
        else:
            output_cls = OutputDir
        # TODO: Add size_hint (size_hint=input.spec.size_hint)
        passthrough = output_cls(content_type=dataobj.content_type)

        super().__init__(
            (dataobj,),
            (passthrough,),
            config=float(timeout),
            name=name,
            cpus=cpus,
            session=session,
        )
class Load(Task):
    """Load a file at the given path (at the worker) and output it as a blob.

    Args:
        path: Path of the file to load; stored in the task config under `"path"`.
        output (`Output`): Optional output specification. A default `Output()`
            is created when omitted. The output must be a blob.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
    """

    TASK_TYPE = "buildin/open"

    def __init__(self, path, output=None, *, name=None, session=None):
        target = Output() if output is None else output
        # Validate both the default and a caller-supplied output.
        target.expect_blob()
        load_config = {"path": path}
        super().__init__(
            [],
            (target,),
            config=load_config,
            name=name,
            session=session,
        )
class LoadDir(Task):
    """Load a directory at the given path (at the worker) and output it.

    TODO: Implement

    Args:
        path: Path of the directory to load; stored in the task config under
            `"path"`.
        output (`OutputDir`): Optional output specification. A default
            `OutputDir()` is created when omitted. The output must be a
            directory.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
    """

    TASK_TYPE = "buildin/open_dir"

    def __init__(self, path, output=None, *, name=None, session=None):
        if output is None:
            output = OutputDir()
        output.expect_dir()
        # Pass the validated output on (as `Load` does); previously a fresh
        # `OutputDir()` was created here and a caller-supplied `output` was
        # silently discarded.
        super().__init__([], (output,), config={"path": path}, name=name, session=session)
class Store(Task):
    """Store the given object (blob or directory) at the given path (at the worker).

    The task is created with a single input and `0` in the outputs position.

    Args:
        input: The object to store; forwarded to `Task` as the only input.
        path: Destination path; stored in the task config under `"path"`.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
    """

    TASK_TYPE = "buildin/export"

    def __init__(self, input, path, *, name=None, session=None):
        export_config = {"path": path}
        super().__init__(
            (input,),
            0,
            config=export_config,
            name=name,
            session=session,
        )
class MakeDirectory(Task):
    """Create a directory from other objects (blobs or other directories).

    Args:
        paths_objects: An iterable of pairs `(path, obj)` or a dictionary
            `{path: obj}` where `path` is the new relative path. The path of
            one or more directory objects may be `''` or `'.'` to use them as
            the base directory.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.

    Raises:
        TypeError: If `paths_objects` is not iterable, is empty, or contains
            an entry that is not exactly a `(path, obj)` pair.

    TODO: Specify behavior on overlapping subdirs/contents.
    """

    TASK_TYPE = "buildin/make_directory"

    def __init__(self, paths_objects, *, name=None, session=None):
        if isinstance(paths_objects, dict):
            paths_objects = paths_objects.items()
        try:
            # Unpack every entry strictly as a pair. A bare `zip(*entries)`
            # truncates to the shortest entry, so ragged input such as
            # `[(p, o, extra), (p2, o2)]` would be accepted with data dropped.
            pairs = [(path, obj) for path, obj in paths_objects]
            # Empty input yields no tuples here; the unpacking then fails
            # and is reported as a TypeError below, as before.
            paths, inputs = zip(*pairs)
        except (TypeError, ValueError) as e:
            raise TypeError("MakeDirectory needs an iterable of pairs "
                            "`(path, obj)` or a dictionary `{path: obj}`") from e
        super().__init__(
            inputs, outputs=(OutputDir(),), config={"paths": paths}, name=name, session=session)
class SliceDirectory(Task):
    """Extract a file or a subdirectory from a directory.

    Args:
        input (`DataObject`): A directory object to slice; converted with
            `to_dataobj` and required to be a directory.
        path (`str`): Path inside the directory. If the path ends with a
            slash `'/'`, the output is a directory object, otherwise a file
            object is created.
        output (`Output` or `OutputDir`): An optional output specification.
            When omitted, one is created from `path`. Its kind must match
            the kind selected by `path`.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
    """

    TASK_TYPE = "buildin/slice_directory"

    def __init__(self, input, path, output=None, *, name=None, session=None):
        directory = to_dataobj(input)
        directory.expect_dir()

        # A trailing slash selects a directory output, anything else a blob.
        wants_dir = path.endswith('/')

        if output is None:
            output = OutputDir(path) if wants_dir else Output(path)

        # Validate both the default and a caller-supplied output.
        if wants_dir:
            output.expect_dir()
        else:
            output.expect_blob()

        super().__init__(
            (directory,),
            (output,),
            config={"path": path},
            name=name,
            session=session,
        )
class Execute(Task):
    """A task executing a single external program with rich argument support.

    Args:
        args: Either a command string or an iterable of arguments. A string
            is split with `shlex.split`, unless `shell` is true, in which
            case it is handed to the shell verbatim. In an iterable, each
            item is a `str` (used as is), an `InputBase`, `DataObject` or
            `Task` (registered as a task input and replaced by its path), or
            an `OutputBase` (registered as a task output and replaced by its
            path).
        stdout: `None` for no capture, `True` to capture under the label
            `"stdout"`, or any value accepted by `OutputBase._for_program`.
            The program's stdout is redirected to the file `+out`.
        stdin: `None`, or any value accepted by `InputBase._for_program`.
            The program's stdin is redirected from the file `+in`.
        input_paths: Iterable of additional inputs, each converted with
            `InputBase._for_program(..., label_as_path=True)`.
        output_paths: Iterable of additional outputs, each converted with
            `OutputBase._for_program(..., label_as_path=True)`.
        shell (`bool`): If true, the command is run as `/bin/sh -c <command>`.
        name: Optional task name, forwarded to `Task`.
        session: Optional session, forwarded to `Task`.
        cpus (`int`): Number of CPUs, forwarded to `Task`.

    Raises:
        TypeError: If an item of `args` is of an unsupported type.
    """

    TASK_TYPE = "buildin/run"

    def __init__(self, args,
                 stdout=None,
                 stdin=None,
                 input_paths=(),
                 output_paths=(),
                 shell=False,
                 *,
                 name=None,
                 session=None,
                 cpus=1):
        ins = []
        outs = []

        if stdout is not None:
            if stdout is True:
                stdout = "stdout"
            stdout = OutputBase._for_program(stdout, label="stdout", execute=True)
            # '+out' is the file name of where stdout is redirected
            stdout.path = "+out"
            outs.append(stdout)

        if stdin is not None:
            # '+in' is the file name of where stdin is redirected
            stdin = InputBase._for_program(stdin, label="stdin", execute=True)
            stdin.path = "+in"
            ins.append(stdin)

        ins += [InputBase._for_program(obj, execute=True, label_as_path=True)
                for obj in input_paths]
        outs += [OutputBase._for_program(obj, execute=True, label_as_path=True)
                 for obj in output_paths]

        shell_command = None
        if isinstance(args, str):
            if shell:
                # Hand the command to the shell untouched. Splitting it with
                # shlex and re-joining with spaces would strip the quoting
                # and escapes (e.g. "echo 'a  b'" would become "echo a  b").
                shell_command = args
                args = ()
            else:
                args = shlex.split(args)

        proc_args = []
        for i, a in enumerate(args):
            argname = "arg{}".format(i)
            if isinstance(a, str):
                proc_args.append(a)
            elif isinstance(a, (InputBase, DataObject, Task)):
                arg = Input._for_program(a, execute=True, label=argname)
                ins.append(arg)
                proc_args.append(arg.path)
            elif isinstance(a, OutputBase):
                arg = OutputBase._for_program(a, execute=True, label=argname)
                outs.append(arg)
                proc_args.append(arg.path)
            else:
                # TypeError is still caught by callers using `except Exception`.
                raise TypeError(
                    "{}. argument is invalid, it has to be string, Input, or Output, not {}"
                    .format(i, type(a)))

        if shell:
            if shell_command is None:
                # NOTE: list items are joined without quoting, so any shell
                # metacharacters they contain are interpreted by the shell.
                shell_command = " ".join(proc_args)
            proc_args = ("/bin/sh", "-c", shell_command)

        task_inputs = [obj.dataobj for obj in ins]
        task_outputs = [output.create_data_object() for output in outs]
        config = {
            "args": proc_args,
            "in_paths": [{"path": obj.path, "write": obj.write} for obj in ins],
            "out_paths": [obj.path for obj in outs]}
        super().__init__(
            task_inputs, task_outputs, cpus=cpus, config=config, name=name, session=session)

    def __repr__(self):
        """Return a short description with id, inputs, outputs and the command."""
        return "<{} {}, inputs {}, outputs {}, cmd {!r}>".format(
            self.__class__.__name__, self.id, self.inputs,
            self.outputs, short_str(self.spec.config["args"]))