Python API

The Rain Python API covers two domains that see the workflow graph differently, although the concepts are similar and some classes are used in both contexts.

  • Code running on the client, which creates sessions and task graphs, submits them and queries their state. Here, tasks are only created and declared, never actually executed.
  • Python code that runs inside remote Python tasks on the governors. This code has access to the actual input data, but only sees the data objects adjacent to the task (its inputs and outputs).

Client API

class rain.client.RainException

Generic Rain error.

class rain.client.RainWarning

Generic Rain warning.

class rain.common.ID

A Rain task and object ID: a named tuple (session_id, id).
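The shape of such an ID can be illustrated with collections.namedtuple. IdSketch below is a hypothetical stand-in written for this example, not rain.common.ID itself; it assumes only the two fields documented above.

```python
from collections import namedtuple

# Hypothetical stand-in for rain.common.ID: a named tuple of two fields.
IdSketch = namedtuple("IdSketch", ["session_id", "id"])

task_id = IdSketch(session_id=1, id=42)

# Fields can be read by name or by position, as with any tuple.
session_part = task_id.session_id
local_part = task_id[1]

# Named tuples are hashable, so IDs can serve as dictionary keys.
states = {task_id: "finished"}
```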

Client

One instance per connection to a server.

class rain.client.Client(address, port)

A client connection object. Can hold multiple Sessions.

get_server_info()

Returns basic server information. The returned format is unstable.

Returns: dict – A JSON-like dictionary.

new_session(name='Unnamed Session', default=False)

Creates a new session.

Note that the session is destroyed on the server when the client disconnects.

Returns: Session – A new session.

Session

One instance per constructed graph (possibly submitted in several steps). Tied to one Client.

class rain.client.Session(client, session_id, default=False)

A container for one task graph.

Do not create it directly; use Client.new_session() instead. When the session is used as a context manager, all new objects and tasks are created within it. Note that the session is closed when the with block ends.

>>> from rain.client import Client, blob, tasks
>>> client = Client("localhost", 7210)  # address of a running server
>>> with client.new_session() as s:
...     bl = blob("Hello rain!")
...     tsk = tasks.Sleep(bl, 1.0)  # forwards bl after 1 second
...     tsk.output.keep()
...     s.submit()
...     print(tsk.output.fetch().get_str())  # waits for completion

Currently, the graph and objects are alive on the server only as long as the Session exists.

bind_only()

Binds the session without the autoclose behavior:

>>> with session.bind_only() as s:
...     do_something()

This binds the session but does not close it at the end of the block, so it may be bound again later, either with bind_only() or normally with a plain "with session: ..." block.
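The difference between the two forms of binding can be modelled with a small sketch. SessionSketch and its current attribute are hypothetical; the sketch assumes only what is described above (entering the session makes it the one new objects and tasks are created in, and a plain with block also closes it). Restoring the previously bound session on exit is an additional assumption of this sketch.

```python
from contextlib import contextmanager


class SessionSketch:
    """Hypothetical model of Session binding; not Rain's implementation."""

    current = None  # session that new tasks and objects would go into

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True

    @contextmanager
    def bind_only(self):
        # Bind this session for the duration of the block, then restore
        # the previous binding; the session itself stays open.
        previous = SessionSketch.current
        SessionSketch.current = self
        try:
            yield self
        finally:
            SessionSketch.current = previous

    def __enter__(self):
        self._previous = SessionSketch.current
        SessionSketch.current = self
        return self

    def __exit__(self, exc_type, exc, tb):
        # A plain `with session:` block additionally closes the session.
        SessionSketch.current = self._previous
        self.close()
        return False


s = SessionSketch("demo")
with s.bind_only():
    bound_inside = SessionSketch.current is s
open_after_bind_only = not s.closed   # bind_only() leaves it open
with s:
    pass
closed_after_with = s.closed          # the plain with block closed it
```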

close()

Closes the session; all tasks are stopped and all objects freed.

dataobj_count

The number of unsubmitted objects.

fetch(dataobject)

Wait for the object to finish, update its state and fetch the object data.

Returns: DataInstance – The object data proxy.

keep_all()

Set the keep flag for all unsubmitted objects.

make_graph(show_ids=True)

Create a graph of tasks and objects that have not been submitted yet.

submit()

Submit all unsubmitted objects.

task_count

The number of unsubmitted tasks.

unkeep(dataobjects)

Unset the keep flag for the given objects.

update(items)

Update the status and metadata of the given tasks and objects.

wait(items)

Wait until all specified tasks and data objects are finished.

wait_all()

Wait until all submitted tasks and objects are finished.

wait_some(items)

Wait until some of the specified tasks/data objects are finished.

Returns: (finished_tasks, finished_dataobjs)

Data objects

Tied to a Session.

rain.client.blob(value, label='const', content_type=None, encode=None)

Create a constant data object with accompanying data.

The given value may be either bytes or any object to be encoded according to encode or content_type. Strings are encoded as UTF-8 by default. Specify at most one of content_type and encode.
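The value-handling rules above can be sketched as a small helper. encode_value is hypothetical and not part of rain.client; the 'json' format name is an assumption made for illustration ('pickle' is the encoding pickled() relies on), and passing bytes through unchanged even when a format is given is also an assumption.

```python
import json
import pickle


def encode_value(value, content_type=None, encode=None):
    """Hypothetical model of how blob() turns a value into bytes.

    Not Rain's code: the 'json' format name is an assumption made
    for this illustration.
    """
    if content_type is not None and encode is not None:
        # At most one of the two may be given.
        raise ValueError("specify at most one of content_type and encode")
    fmt = encode if encode is not None else content_type
    if isinstance(value, bytes):
        return value                        # bytes are stored unchanged
    if fmt is None and isinstance(value, str):
        return value.encode("utf-8")        # strings default to UTF-8
    if fmt == "pickle":
        return pickle.dumps(value)
    if fmt == "json":
        return json.dumps(value).encode("utf-8")
    raise TypeError(f"cannot encode {type(value).__name__} as {fmt!r}")


raw = encode_value(b"\x00\x01")
text = encode_value("Hello rain!")
pickled_list = encode_value([1, 2, 3], encode="pickle")
```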

rain.client.pickled(val, label='pickle')

Create a data object with pickled val.

A shorthand for blob(val, encode='pickle'). The default label is “pickle”.

class rain.client.DataObject(label=None, session=None, data_type=<DataType.BLOB: 'blob'>, content_type=None)

content_type

Final content type (if known and present) or just the spec content_type.

expect_dir()

Raise TypeError if the DataObject is not a directory data-type.

fetch()

Fetch the object data and update its state.

Returns: DataInstance

id

Getter for object ID.

info

Getter for object information. Read only in client! In remote tasks only User’s Guide is writable.

is_kept()

Returns the value of self._keep, i.e. whether the object is marked to be kept on the server.

keep()

Set the flag that the object should be kept on the server.

spec

Getter for object specification. Read only!

state

Getter for object state.

unkeep()

Remove data object from the server

Tasks

Tied to a Session.

class rain.client.Task(inputs, outputs, *, config=None, session=None, task_type=None, cpus=1, name=None, user_spec=None)

A task instance in the task graph.

__init__ creates a single task instance, inserts it into the Session and assigns it an ID. It also creates output DataObject instances based on the given outputs.

Tasks are commonly created by the classes in rain.client.tasks, or by task builders created by Remote or Program. A task always belongs to a Session and has a valid ID. You may wish to instantiate Task explicitly (or subclass it) when creating your own task types.

Particular task types are not realized via subclasses but with string task_type attribute. (Subclassing may be introduced later.)

The task state is not automatically updated by the server. The state and attributes are updated on Task.update(), Task.fetch() and Task.wait().

Parameters:
  • task_type (str) – Task-type name known to Rain governors.
  • config – Any task-specific config.
  • inputs (LabeledList or sequence) – Sequence of Input or DataObject.
  • outputs (LabeledList or sequence) – Specification of Outputs for the task.
  • session (Session or None) – Session to create the task in. If not specified, the current Session is used.
  • cpus (int) – Number of CPUs.

id

Auto-assigned task ID.

Type: ID

inputs

Input objects.

Type: LabeledList[DataObject]

outputs

Output objects created by the task.

Type: LabeledList[DataObject]

output

Shortcut for outputs[0]. Raises an exception if the task has multiple outputs.

Type: DataObject

spec

Task specification data.

Type: TaskSpec

info

Task state information on last update.

Type: TaskInfo

state

Task state on last update.

Type: TaskState enum

stack

Text description of the stack when the task was created, used for debug messages.

Type: str

fetch_outputs()

Fetch all outputs of the task.

Returns: [DataInstance] – Fetched output data.

id

Getter for Task ID.

info

Getter for Task info on last update (None when never updated). Read only!

inputs

Getter for inputs LabeledList. Read only!

keep_outputs()

Keep all output objects of the task.

output

Getter for the only output of the task. Fails if len(self.outputs)!=1.

outputs

Getter for outputs LabeledList. Read only!

spec

Getter for Task specification. Read only!

state

Getter for Task state on last update.

task_type

Getter for task_type identifier.

unkeep_outputs()

Unkeep all output objects of the task.

update()

Update task state and attributes. See Session.update().

wait()

Wait for the task to complete. See Session.wait().

Attributes

Input and Output

These helper objects are used to specify task input and output attributes. In particular, specifying an Output is the preferred way to set the properties of the output DataObject.

class rain.client.InputBase(label=None, path=None, dataobj=None, load=None, content_type=None, write=False)
class rain.client.Input(label=None, path=None, dataobj=None, load=None, content_type=None, write=False)
class rain.client.InputDir(label=None, path=None, dataobj=None, load=None, content_type=None, write=False)
class rain.client.OutputBase(label=None, *, size_hint=None, content_type=None, mode=None, encode=None, path=None)

A multi-purpose object for specifying output data objects of tasks.

May be used in task factory construction (e.g. in @remote and Program), or in concrete task instantiation (as outputs=[...] or output=...).

A default label is the number of the output in the task.

expect_blob()

Raise TypeError if the Output is not a blob data-type.

expect_dir()

Raise TypeError if the Output is not a directory data-type.

merge_with_prototype(proto)

Return a copy of self updated with Output proto properties.

class rain.client.Output(label=None, *, size_hint=None, content_type=None, mode=None, encode=None, path=None)
class rain.client.OutputDir(label=None, *, size_hint=None, content_type=None, mode=None, encode=None, path=None)

Builtin tasks and external programs

Native Rain tasks to be run at the governors.

class rain.client.tasks.Concat(inputs, *, name=None, session=None)

Creates a task concatenating the given data objects.

TASK_TYPE = 'buildin/concat'

class rain.client.tasks.Execute(args, stdout=None, stdin=None, input_paths=(), output_paths=(), shell=False, *, name=None, session=None, cpus=1)

A task executing a single external program with rich argument support.

TASK_TYPE = 'buildin/run'

class rain.client.tasks.Load(path, output=None, *, name=None, session=None)

Load and output a file at the given path (on the governor).

TASK_TYPE = 'buildin/open'

class rain.client.tasks.LoadDir(path, output=None, *, name=None, session=None)

Load and output a directory at the given path (on the governor).

TODO: Implement

TASK_TYPE = 'buildin/open_dir'

class rain.client.tasks.MakeDirectory(paths_objects, *, name=None, session=None)

Create a directory from other objects (blobs or other directories).

Parameters: paths_objects – An iterable of pairs (path, obj) or a dictionary {path: obj}, where path is the new relative path. The paths of one or more directory objects may be '' or '.' to use them as the base directory.

TODO: Specify behavior on overlapping subdirs/contents.

TASK_TYPE = 'buildin/make_directory'
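Both accepted forms of paths_objects can be reduced to one list of pairs, with the base-directory entries separated out. normalize_paths_objects is a hypothetical helper for illustration only; it does not check that base entries are directory objects, and it leaves overlapping paths unresolved, in line with the TODO above.

```python
from collections.abc import Mapping


def normalize_paths_objects(paths_objects):
    """Hypothetical helper: accept {path: obj} or an iterable of
    (path, obj) pairs and return (base_objects, placed_pairs)."""
    if isinstance(paths_objects, Mapping):
        pairs = list(paths_objects.items())
    else:
        pairs = [(path, obj) for path, obj in paths_objects]
    # Entries with path '' or '.' are used as the base directory.
    base = [obj for path, obj in pairs if path in ("", ".")]
    placed = [(path, obj) for path, obj in pairs if path not in ("", ".")]
    return base, placed


# Strings stand in for DataObject instances in this illustration.
from_dict = normalize_paths_objects({"data/a.txt": "blob_a", ".": "dir_b"})
from_pairs = normalize_paths_objects([("data/a.txt", "blob_a"), ("", "dir_b")])
```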
class rain.client.tasks.Sleep(input, timeout, *, name=None, session=None, cpus=1)

Task that forwards its input after timeout seconds. The resulting data object has the same type as the input data object. This task is intended for testing.

Parameters:
  • input (DataObject) – The object to pass through.
  • timeout (float) – Number of seconds to wait, converted to whole milliseconds.
  • cpus (int) – Number of CPUs to reserve, for testing purposes.

TASK_TYPE = 'buildin/sleep'

class rain.client.tasks.SliceDirectory(input, path, output=None, *, name=None, session=None)

Extract a file or a subdirectory from a directory.

Parameters:
  • input (DataObject) – A directory object to slice.
  • path ([str]) – An iterable of paths. If the path ends with a slash '/', the output is a directory object; otherwise a file object is created.
  • output (Output or OutputDir) – An optional output specification.

TASK_TYPE = 'buildin/slice_directory'
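The trailing-slash rule for path can be stated as a one-line check. slice_output_kind is hypothetical and only restates the documented rule; the actual output type is decided by Rain.

```python
def slice_output_kind(path):
    """Hypothetical helper mirroring the documented SliceDirectory rule:
    a trailing '/' yields a directory output, anything else a file."""
    return "directory" if path.endswith("/") else "file"


kinds = {p: slice_output_kind(p) for p in ("results/", "results/summary.txt")}
```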
class rain.client.tasks.Store(input, path, *, name=None, session=None)

Store the given object (blob or directory) at the given path (on the governor).

TASK_TYPE = 'buildin/export'

class rain.client.Program(args, stdout=None, stdin=None, input_paths=(), output_paths=(), shell=False, name=None, cpus=1)

Data instance objects

Tied to a session and a DataObject. Also used in Remote Python tasks.

class rain.common.DataInstance(data_type, *, data=None, path=None, data_object=None, content_type=None, info=None, spec=None, object_id=None)

Instance of a data object with data or a file reference.

This serves as a proxy to a finished DataObject. The class is used inside Python tasks in executors and as the result of DataObject.fetch().

The user should not create this object manually, but always use fetch() or a method of the Python task context.

get_bytes()

Return the data as bytes. May read them from the disk.

get_str()

Shortcut for get_bytes().decode()

load(cache=False)

Load the object according to its content type, optionally caching the result.

If you set cache=True, you must not modify the returned object, as it may be shared between loads or even tasks. With cache=False, you get a new copy every time.
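The contract of the cache flag can be modelled as follows. InstanceSketch is a hypothetical class that assumes JSON content and keeps its cache per instance; as noted above, Rain may share cached objects more widely (even between tasks), which only makes the no-modification rule stricter.

```python
import json


class InstanceSketch:
    """Hypothetical model of DataInstance.load(cache=...) semantics,
    assuming JSON content; not Rain's implementation."""

    def __init__(self, raw):
        self._raw = raw
        self._cached = None

    def load(self, cache=False):
        if not cache:
            # A new, independent object on every call.
            return json.loads(self._raw)
        if self._cached is None:
            self._cached = json.loads(self._raw)
        # The same object is returned to every caller: do not modify it.
        return self._cached


inst = InstanceSketch(b'{"values": [1, 2, 3]}')
```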

write(path)

Write a fresh copy of the data into the target path.

Resources

Note

TODO: Describe and document task resources.

Labeled list

class rain.common.LabeledList(items=None, labels=None, pairs=None)

List data structure with additional optional unique labels for items. Supports all list operations except sort (in general, the collections.abc.MutableSequence interface).

Indexing l[x] accepts either an integer, a slice or a label. Modifying the sequence using l[x]=42 clears the label. Use l.set(x, 42, label='answer') or l.set_label(x, 'answer').

Labels may be any hashable objects except None (which represents no label) and int or slice (which are used for array indexing). The labels must be unique.

append(val, label=None)

Append val to the end of the sequence, optionally with a label.

get_label(idx)

Return the label for the given index.

insert(idx, val, label=None)

Insert val before index idx, optionally with a label.

items()

Return an iterator over (label, value) pairs. Do not modify the list while the iterator is active.

set(idx, val, label=None)

Assign to the given index, always setting its label to label.

set_label(idx, label)

Set the label for the given index.
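The documented indexing and labeling rules can be illustrated with a compact model built on collections.abc.MutableSequence. LabeledListSketch is hypothetical: it omits the pairs constructor argument and slice assignment, and its use of KeyError for an unknown label and ValueError for a duplicate one are assumptions, not documented behavior of rain.common.LabeledList.

```python
from collections.abc import MutableSequence


class LabeledListSketch(MutableSequence):
    """Minimal, hypothetical model of the documented LabeledList
    behavior; not the code of rain.common.LabeledList."""

    def __init__(self, items=(), labels=None):
        self._items = list(items)
        self._labels = [None] * len(self._items)
        for idx, label in enumerate(labels or ()):
            self.set_label(idx, label)

    def _check_label(self, label, ignore=None):
        # None means "no label"; int and slice are reserved for indexing.
        if label is None:
            return
        if isinstance(label, (int, slice)):
            raise TypeError("labels must not be int or slice")
        for idx, existing in enumerate(self._labels):
            if existing == label and idx != ignore:
                raise ValueError(f"duplicate label {label!r}")

    def _index(self, key):
        # Integers and slices index by position; anything else is a label.
        if isinstance(key, (int, slice)):
            return key
        if key is not None:
            for idx, label in enumerate(self._labels):
                if label == key:
                    return idx
        raise KeyError(key)

    def __getitem__(self, key):
        idx = self._index(key)
        if isinstance(idx, slice):
            return LabeledListSketch(self._items[idx], self._labels[idx])
        return self._items[idx]

    def __setitem__(self, key, value):
        idx = self._index(key)
        if isinstance(idx, slice):
            raise TypeError("slice assignment is not modelled in this sketch")
        self.set(idx, value)  # plain assignment clears the label

    def __delitem__(self, key):
        idx = self._index(key)
        del self._items[idx]
        del self._labels[idx]

    def __len__(self):
        return len(self._items)

    def insert(self, idx, val, label=None):
        self._check_label(label)
        self._items.insert(idx, val)
        self._labels.insert(idx, label)

    def append(self, val, label=None):
        self.insert(len(self), val, label)

    def get_label(self, idx):
        return self._labels[idx]

    def set(self, idx, val, label=None):
        idx = range(len(self))[idx]  # normalize negative indices
        self._check_label(label, ignore=idx)
        self._items[idx] = val
        self._labels[idx] = label

    def set_label(self, idx, label):
        self.set(idx, self._items[idx], label)

    def items(self):
        return zip(self._labels, self._items)


l = LabeledListSketch(["a.txt", "b.txt"], labels=["first", None])
l.append("c.txt", label="third")
l.set_label(1, "second")
l["first"] = "A.txt"  # assignment by label also clears that label
```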

Remote Python tasks

API for creating routines to be run at the governors. Remote tasks are created by decorating a function with remote (preferred) or by using Remote directly.

When specifying the remote task in the client code, the relevant classes are Remote, Input, Output, RainException, RainWarning, LabeledList and the decorator remote.

Inside the running remote task, only RainException, RainWarning, LabeledList, DataInstance and Context are relevant.

The inputs of a Remote task are arbitrary Python objects containing a DataInstance in place of every DataObject, or the loaded object data if auto_load=True (on the task) or load=True (on the Input) is set.
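The substitution can be pictured as a recursive walk over the argument structure. substitute_inputs and Placeholder are hypothetical: Placeholder stands in for a client-side DataObject, the resolved mapping stands in for the data delivered to the governor, and only lists, tuples and dicts are traversed in this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Placeholder:
    """Stand-in for a client-side DataObject (hypothetical)."""
    name: str


def substitute_inputs(obj, resolved):
    """Replace every Placeholder inside nested lists, tuples and dicts
    with its value from `resolved` (hypothetical sketch)."""
    if isinstance(obj, Placeholder):
        return resolved[obj]
    if isinstance(obj, list):
        return [substitute_inputs(x, resolved) for x in obj]
    if isinstance(obj, tuple):
        return tuple(substitute_inputs(x, resolved) for x in obj)
    if isinstance(obj, dict):
        return {k: substitute_inputs(v, resolved) for k, v in obj.items()}
    return obj  # any other value is passed through unchanged


a, b = Placeholder("a"), Placeholder("b")
args = {"files": [a, b], "scale": 2.5, "pair": (a, "x")}
resolved = {a: b"data of a", b: b"data of b"}
result = substitute_inputs(args, resolved)
```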

The remote function should return a list, tuple or LabeledList whose items are DataInstance objects (created by Context.blob()), bytes or strings.
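A check of these return rules might look like the following. normalize_outputs and FakeInstance are hypothetical, and converting strings with UTF-8 is an assumption borrowed from blob()'s default. Testing against collections.abc.Sequence (while rejecting str and bytes) lets a MutableSequence such as LabeledList pass alongside lists and tuples.

```python
from collections.abc import Sequence


class FakeInstance:
    """Hypothetical stand-in for a DataInstance made by Context.blob()."""

    def __init__(self, data):
        self._data = data

    def get_bytes(self):
        return self._data


def normalize_outputs(returned):
    """Hypothetical check of a remote function's return value: each item
    must be a DataInstance-like object, bytes or str."""
    if isinstance(returned, (str, bytes)) or not isinstance(returned, Sequence):
        raise TypeError("expected a list, tuple or LabeledList of outputs")
    result = []
    for item in returned:
        if isinstance(item, FakeInstance):
            result.append(item.get_bytes())
        elif isinstance(item, bytes):
            result.append(item)
        elif isinstance(item, str):
            result.append(item.encode("utf-8"))  # UTF-8 is assumed here
        else:
            raise TypeError(f"unsupported output item: {type(item).__name__}")
    return result


outs = normalize_outputs(("text", b"raw", FakeInstance(b"blob")))
```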

rain.client.remote(*, outputs=None, inputs=(), auto_load=None, auto_encode=None, name=None, cpus=1)

Decorator for Remote, see the documentation there.

class rain.client.Remote(fn, *, inputs=None, outputs=None, auto_load=False, auto_encode=None, name=None, cpus=1)