import logging
from typing import Any, Dict, Optional, Union
from runhouse.resources.envs import _get_env_from, Env
from runhouse.resources.hardware import _current_cluster, _get_cluster_from, Cluster
from runhouse.resources.module import Module
from runhouse.rns.utils.names import _generate_default_name, _generate_default_path
logger = logging.getLogger(__name__)
[docs]class Blob(Module):
RESOURCE_TYPE = "blob"
DEFAULT_FOLDER_PATH = "/runhouse-blob"
DEFAULT_CACHE_FOLDER = ".cache/runhouse/blobs"
[docs] def __init__(
self,
name: Optional[str] = None,
system: Union[Cluster] = None,
env: Optional[Env] = None,
dryrun: bool = False,
**kwargs,
):
"""
Runhouse Blob object
.. note::
To build a Blob, please use the factory method :func:`blob`.
"""
self.data = None
super().__init__(name=name, system=system, env=env, dryrun=dryrun, **kwargs)
[docs] def to(
self,
system: Union[str, Cluster],
env: Optional[Union[str, Env]] = None,
path: Optional[str] = None,
data_config: Optional[dict] = None,
):
"""Return a copy of the blob on the destination system, and optionally path.
Example:
>>> local_blob = rh.blob(data)
>>> s3_blob = blob.to("s3")
>>> cluster_blob = blob.to(my_cluster)
"""
if system == "here":
if not path:
current_cluster_config = _current_cluster(key="config")
if current_cluster_config:
system = Cluster.from_config(current_cluster_config)
else:
system = None
else:
system = "file"
system = _get_cluster_from(system)
if (not system or isinstance(system, Cluster)) and not path:
self.name = self.name or _generate_default_name(prefix="blob")
# TODO [DG] if system is the same, bounces off the laptop for no reason. Change to write through a
# call_module_method rpc (and same for similar file cases)
return super().to(system, env)
path = str(
path or self.default_path(self.rns_address, system)
) # Make sure it's a string and not a Path
from runhouse.resources.blobs.file import File
new_blob = File(path=path, system=system, data_config=data_config)
new_blob.write(self.fetch())
return new_blob
# TODO delete
[docs] def write(self, data):
"""Save the underlying blob to its cluster's store.
Example:
>>> rh.blob(data).write()
"""
self.data = data
[docs] def rm(self):
"""Delete the blob from wherever it's stored.
Example:
>>> blob = rh.blob(data)
>>> blob.rm()
"""
self.data = None
[docs] def exists_in_system(self):
"""Check whether the blob exists in the file system
Example:
>>> blob = rh.blob(data)
>>> blob.exists_in_system()
"""
if self.data is not None:
return True
[docs] def resolved_state(self, _state_dict=None):
"""Return the resolved state of the blob, which is the data.
Primarily used to define the behavior of the ``fetch`` method.
Example:
>>> blob = rh.blob(data)
>>> blob.resolved_state()
"""
return self.data
[docs]def blob(
data: [Any] = None,
name: Optional[str] = None,
path: Optional[str] = None,
system: Optional[str] = None,
env: Optional[Union[str, Env]] = None,
data_config: Optional[Dict] = None,
load: bool = True,
dryrun: bool = False,
):
"""Returns a Blob object, which can be used to interact with the resource at the given path
Args:
data: Blob data. The data to persist either on the cluster or in the filesystem.
name (Optional[str]): Name to give the blob object, to be reused later on.
path (Optional[str]): Path (or path) to the blob object. Specfying a path will force the blob to be
saved to the filesystem rather than persist in the cluster's object store.
system (Optional[str or Cluster]): File system or cluster name. If providing a file system this must be one of:
[``file``, ``github``, ``sftp``, ``ssh``, ``s3``, ``gs``, ``azure``].
We are working to add additional file system support. If providing a cluster, this must be a cluster object
or name, and whether the data is saved to the object store or filesystem depends on whether a path is
specified.
env (Optional[Env or str]): Environment for the blob. If left empty, defaults to base environment.
(Default: ``None``)
data_config (Optional[Dict]): The data config to pass to the underlying fsspec handler (in the case of
saving the the filesystem).
load (bool): Whether to try to load the Blob object from RNS. (Default: ``True``)
dryrun (bool): Whether to create the Blob if it doesn't exist, or load a Blob object as a dryrun.
(Default: ``False``)
Returns:
Blob: The resulting blob.
Example:
>>> import runhouse as rh
>>> import json
>>>
>>> data = list(range(50)
>>> serialized_data = json.dumps(data)
>>>
>>> # Local blob with name and no path (saved to Runhouse object store)
>>> rh.blob(name="@/my-blob", data=data)
>>>
>>> # Remote blob with name and no path (saved to cluster's Runhouse object store)
>>> rh.blob(name="@/my-blob", data=data, system=my_cluster)
>>>
>>> # Remote blob with name, filesystem, and no path (saved to filesystem with default path)
>>> rh.blob(name="@/my-blob", data=serialized_data, system="s3")
>>>
>>> # Remote blob with name and path (saved to remote filesystem)
>>> rh.blob(name='@/my-blob', data=serialized_data, path='/runhouse-tests/my_blob.pickle', system='s3')
>>>
>>> # Local blob with path and no system (saved to local filesystem)
>>> rh.blob(data=serialized_data, path=str(Path.cwd() / "my_blob.pickle"))
>>> # Loading a blob
>>> my_local_blob = rh.blob(name="~/my_blob")
>>> my_s3_blob = rh.blob(name="@/my_blob")
"""
if name and load and not any([data is not None, path, system, data_config]):
# Try reloading existing blob
try:
return Blob.from_name(name, dryrun)
except ValueError:
# This is a rare instance where passing no constructor params is actually valid
# (e.g. rh.blob(name=key).write(data)), so if we don't find the name, we still want to
# create a new blob.
pass
system = _get_cluster_from(system or _current_cluster(key="config"), dryrun=dryrun)
env = env or _get_env_from(env)
if (not system or isinstance(system, Cluster)) and not path and data_config is None:
# Blobs must be named, or we don't have a key for the kv store
name = name or _generate_default_name(prefix="blob")
new_blob = Blob(name=name, dryrun=dryrun).to(system, env)
if data is not None:
new_blob.data = data
return new_blob
path = str(path or _generate_default_path(Blob, name, system))
from runhouse.resources.blobs.file import File
name = name or _generate_default_name(prefix="file")
new_blob = File(
name=name,
path=path,
system=system,
env=env,
data_config=data_config,
dryrun=dryrun,
)
if isinstance(system, Cluster):
system.put_resource(new_blob)
if data is not None:
new_blob.write(data)
return new_blob