"""Module rain.common.data_instance: the `DataInstance` proxy for finished data objects."""

import io
import os
import shutil
import tarfile

from .content_type import decode_value
from .data_type import DataType
from .errors import RainException
from .fs import fresh_copy_dir
from .ids import ID
from .utils import format_size
from .attributes import ObjectInfo


class DataInstance:
    """
    Instance of Data object with data or file reference.

    This serves as a proxy to a *finished* `DataObject`. The class is used
    in a python task in executors and as a result of `DataObject.fetch`.

    The user should not manually create this object, but always use
    `fetch()` or a method on python task context.
    """

    # The data object when at client
    _data_object = None

    # Cache for python deserialized version of the object
    _load_cache = None

    # Cache for received/read bytes
    _data = None

    # Contains object id if the object is known to governor
    _object_id = None

    # The file path in case the data is in a file
    _path = None

    # The same semantics as parent DO attributes
    # (whether parent is present or not)
    _info = None
    _spec = None

    def __init__(self, data_type, *, data=None, path=None, data_object=None,
                 content_type=None, info=None, spec=None, object_id=None):
        """
        Wrap either in-memory `data` (bytes) or a file/directory at `path`.

        Exactly one of `data` and `path` must be given, otherwise
        `RainException` is raised. `data_type` must be a `DataType`.

        When `data_object` is given (client side), `info`, `spec`,
        `object_id` and `content_type` must all be None; the object id is
        taken from `data_object.id`, and `info` and `spec` are read from the
        data object. Otherwise (executor side) the values are stored
        directly. `content_type` may only be passed when `info` is None, in
        which case a fresh `ObjectInfo` is created to carry it.
        """
        if (path is None) == (data is None):
            raise RainException("provide either `data` or `path`")
        if data is not None:
            assert isinstance(data, bytes)
            self._data = data
        else:
            self._path = path
        assert isinstance(data_type, DataType)
        self.data_type = data_type
        if data_object is not None:
            # At client
            assert info is None
            assert spec is None
            assert object_id is None
            assert content_type is None
            self._data_object = data_object
            self._object_id = data_object.id
        else:
            # At executor
            self._object_id = object_id
            if info is None:
                info = ObjectInfo()
                info.content_type = content_type
            else:
                assert content_type is None
            self._info = info
            self._spec = spec
        assert self._object_id is None or isinstance(self._object_id, ID)

    @property
    def info(self):
        """Object info: the data object's `_info` at client, else our own."""
        if self._data_object:
            return self._data_object._info
        return self._info

    @property
    def spec(self):
        """Object spec: the data object's `_spec` at client, else our own."""
        if self._data_object:
            return self._data_object._spec
        return self._spec

    @property
    def content_type(self):
        """Content type from `info`, falling back to `spec` when `info`'s is falsy."""
        content_type = self.info.content_type
        if content_type:
            return content_type
        return self.spec.content_type

    def load(self, cache=False):
        """
        Load the object according to its content type, optionally caching the result.

        If you set `cache=True`, you must not modify the returned object as it
        may be shared between loads or even tasks. With `cache=False`, you get
        a new copy every time.
        """
        if cache and self._load_cache is not None:
            return self._load_cache
        # `get_bytes` checks `_data is not None`, so empty in-memory data
        # (b"") is decoded instead of falling through to a missing path.
        obj = decode_value(self.get_bytes(), self.content_type)
        if cache:
            self._load_cache = obj
        return obj

    def get_str(self):
        """
        Shortcut for get_bytes().decode()
        """
        return self.get_bytes().decode()

    def get_bytes(self):
        """
        Return the data as `bytes`. May read them from the disk.
        """
        if self._data is not None:
            return self._data
        with open(self._path, "rb") as f:
            return f.read()

    def link(self, path):
        """
        Make `path` refer to this instance's data.

        File-backed instances are symlinked; in-memory data is written out
        as a fresh copy via `write`.
        """
        if self._data is None:
            os.symlink(self._path, path)
        else:
            self.write(path)

    def write(self, path):
        """
        Write fresh copy of data into target path.

        File-backed data is copied (a no-op when `path` is the backing path
        itself). In-memory data is written as-is for blobs; for other data
        types it is treated as a tarball and extracted into `path`.

        Raises `RainException` if the tarball contains a member (or link
        target) that is absolute or would escape `path` via `..`.
        """
        if self._data is None:
            if self._path == path:
                return
            if self.data_type == DataType.DIRECTORY:
                fresh_copy_dir(self._path, path)
            else:
                shutil.copyfile(self._path, path)
        elif self.data_type == DataType.BLOB:
            with open(path, "wb") as f:
                f.write(self._data)
        else:
            with tarfile.open(fileobj=io.BytesIO(self._data)) as tar:
                unsafe = self._unsafe_tar_members(tar)
                if unsafe:
                    raise RainException(
                        "refusing to extract unsafe paths from tarball: {}"
                        .format(", ".join(unsafe)))
                tar.extractall(path)

    @staticmethod
    def _unsafe_tar_members(tar):
        """
        Return names of members of `tar` that would extract outside the target.

        A member is unsafe when its name is absolute or climbs above the
        extraction root via `..`. It is also unsafe when it is a symbolic link
        whose target (relative to the link's directory) escapes, or a hard
        link whose target (relative to the archive root) escapes.
        """
        def escapes(name, base=""):
            if os.path.isabs(name):
                return True
            norm = os.path.normpath(os.path.join(base, name))
            return norm == os.pardir or norm.startswith(os.pardir + os.sep)

        unsafe = []
        for member in tar.getmembers():
            if escapes(member.name):
                unsafe.append(member.name)
            elif member.issym() and escapes(member.linkname,
                                            os.path.dirname(member.name)):
                unsafe.append(member.name)
            elif member.islnk() and escapes(member.linkname):
                unsafe.append(member.name)
        return unsafe

    def __repr__(self):
        if self._data is not None:
            return "<DataInstance size={}>".format(format_size(len(self._data)))
        else:
            return "<DataInstance path={!r}>".format(self._path)

    def _remove(self):
        """Delete the backing file (blob) or directory tree (other types)."""
        assert self._path
        # Compare against the enum member, as `write` does; `data_type` is
        # asserted to be a `DataType` in `__init__`, not a plain string.
        if self.data_type == DataType.BLOB:
            os.unlink(self._path)
        else:
            shutil.rmtree(self._path)