# Source code for runhouse.resources.resource

import pprint
import sys
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

from runhouse.globals import obj_store, rns_client
from runhouse.logger import get_logger

from runhouse.rns.top_level_rns_fns import (
    resolve_rns_path,
    save,
    split_rns_name_and_path,
)
from runhouse.rns.utils.api import (
    load_resp_content,
    read_resp_data,
    ResourceAccess,
    ResourceVisibility,
)
from runhouse.rns.utils.names import is_valid_resource_name

logger = get_logger(__name__)


class Resource:
    """Runhouse abstraction for objects that can be saved, shared, and reused.

    A resource carries an optional name (split into ``_name`` and ``_rns_folder``), an access level,
    and a visibility. Its ``config()`` dict is what gets saved to, and loaded from, the Den config store.
    """

    RESOURCE_TYPE = "resource"

    def __init__(
        self,
        name: Optional[str] = None,
        dryrun: bool = False,
        access_level: ResourceAccess = ResourceAccess.WRITE,
        visibility: ResourceVisibility = ResourceVisibility.PRIVATE,
        **kwargs,
    ):
        """
        Runhouse abstraction for objects that can be saved, shared, and reused.

        Args:
            name (Optional[str], optional): Name to assign the resource. (Default: None)
            dryrun (bool, optional): Whether to create the resource object, or load the object as a dryrun.
                (Default: ``False``)
            access_level (:obj:`ResourceAccess`, optional): Access level to provide for the resource.
                (Default: ``ResourceAccess.WRITE``)
            visibility (:obj:`ResourceVisibility`, optional): Type of visibility to provide for the resource.
                (Default: ``ResourceVisibility.PRIVATE``)
            **kwargs: Accepted so configs with extra keys can be splatted in; unused by this base class.

        Raises:
            ValueError: If ``name`` is empty or does not pass ``is_valid_resource_name``.
        """
        self._name, self._rns_folder = None, None

        if name is not None:
            # Strip the built-in markers before validating the remaining name.
            if name.startswith("/builtins/"):
                name = name[len("/builtins/") :]
            # `startswith` instead of `name[0]`: an empty name must reach the validation
            # error below rather than blow up with an IndexError here.
            if name.startswith("^") and name != "^":
                name = name[1:]

            # Validate that name complies with a simple regex
            if not name or not is_valid_resource_name(name):
                raise ValueError(
                    f"Invalid name: {name} "
                    "Resource names are limited to alphanumerics, dashes, and underscores. Max 200 characters. "
                    "Slashes may be used to specify folders and must be included as the first character."
                )
            self._name, self._rns_folder = rns_client.split_rns_name_and_path(
                rns_client.resolve_rns_path(name)
            )

        self.dryrun = dryrun
        self.access_level = access_level
        self._visibility = visibility

    # TODO add a utility to allow a parameter to be specified as "default" and then use the default value

    def config(self, condensed=True):
        """Build the config dict describing this resource.

        Args:
            condensed (bool, optional): Not used by this base implementation; it is the flag that
                ``_resource_string_for_subconfig`` forwards when embedding a sub-resource's config.
                (Default: ``True``)

        Returns:
            dict: ``name`` (the rns address if there is one, else the bare name), ``resource_type``,
            ``resource_subtype`` (the concrete class name), plus ``visibility`` when it is set.
        """
        config = {
            "name": self.rns_address or self.name,
            "resource_type": self.RESOURCE_TYPE,
            "resource_subtype": self.__class__.__name__,
        }
        self.save_attrs_to_config(
            config,
            [
                "visibility",  # Handles Enum to string conversion
            ],
        )
        return config

    def _resource_string_for_subconfig(
        self, resource: Union[None, str, "Resource"], condensed=True
    ):
        """Returns a string representation of a sub-resource for use in a config.

        ``None`` and plain strings are returned untouched. A ``Resource`` is returned either as its
        rns address or as its full config dict (see the inline comments for which).

        Raises:
            ValueError: If ``resource`` is not ``None``, a ``str``, or a ``Resource``.
        """
        if resource is None or isinstance(resource, str):
            return resource

        if isinstance(resource, Resource):
            if condensed and resource.rns_address:
                # We operate on the assumption that rns_address is only populated once a resource has been saved.
                # That way, if rns_address is not None, we have reasonable likelihood that the resource was saved and
                # we can just pass the address. The only exception here is if the resource is a built-in.
                if resource.rns_address.startswith("^"):
                    # Fork the resource if it's a built-in and consider it a new resource.
                    # Note this mutates the passed-in resource (its folder is cleared).
                    resource._rns_folder = None
                    return resource.config(condensed)
                return resource.rns_address

            # If the resource doesn't have an rns_address, we consider it unsaved and put the whole config into
            # the parent config.
            return resource.config(condensed)

        raise ValueError(
            f"Resource {resource} is not a valid sub-resource for {self.__class__.__name__}"
        )

    @property
    def rns_address(self):
        """Full address of resource saved in Den.
        Has the format {username}/{resource_name}"""
        if (
            self.name is None or self._rns_folder is None
        ):  # Anonymous folders have no rns address
            return None

        return str(Path(self._rns_folder) / self.name)

    @rns_address.setter
    def rns_address(self, new_address):
        # Delegates to the `name` setter, which re-splits the address into name and folder.
        # Nothing is persisted here: call `save()` to write the resource under the new address.
        self.name = new_address

    @property
    def name(self):
        """Resource name."""
        return self._name

    @name.setter
    def name(self, name):
        # Split the name and rns path if path is given (concat with current_folder if just stem is given)
        if name is None:
            # Only the name is cleared; `_rns_folder` is left as it was.
            self._name = None
        else:
            self._name, self._rns_folder = split_rns_name_and_path(
                resolve_rns_path(name)
            )

    @property
    def visibility(self):
        """Resource visibility."""
        return self._visibility

    @visibility.setter
    def visibility(self, visibility):
        self._visibility = visibility

    def _save_sub_resources(self, folder: str = None):
        """Overload by child resources to save any resources they hold internally."""
        pass

    def pin(self):
        """Write the resource to the object store.

        Uses ``put_local`` when the object store reports local storage, otherwise ``put``.

        Raises:
            ValueError: If not running on a cluster.
        """
        from runhouse.resources.hardware.utils import _current_cluster

        if not _current_cluster():
            raise ValueError("Cannot pin a resource outside of a cluster.")

        if obj_store.has_local_storage:
            obj_store.put_local(self._name, self)
        else:
            obj_store.put(self._name, self)

    def refresh(self):
        """Return the object store's copy of this resource when on a cluster, otherwise return ``self``."""
        from runhouse.resources.hardware.utils import _current_cluster

        if _current_cluster():
            return obj_store.get(self._name)
        return self

    def save(self, name: str = None, overwrite: bool = True, folder: str = None):
        """Register the resource, saving it to the Den config store. Uses the resource's
        ``self.config()`` to generate the dict to save.

        Args:
            name (str, optional): If given, the resource is renamed to this before saving. (Default: ``None``)
            overwrite (bool, optional): Passed through to the underlying save call. (Default: ``True``)
            folder (str, optional): Passed to ``_save_sub_resources`` and to the underlying save call.
                (Default: ``None``)

        Returns:
            Resource: ``self``, to allow chaining.
        """
        # add this resource this run's downstream artifact registry if it's being saved as part of a run
        rns_client.add_downstream_resource(name or self.name)

        self._save_sub_resources(folder)
        if name:
            self.name = name

        # TODO handle self.access == 'read' instead of this weird overwrite argument
        save(self, overwrite=overwrite, folder=folder)

        return self

    def __str__(self):
        return pprint.pformat(self.config())

    @classmethod
    def _check_for_child_configs(cls, config: dict):
        """Overload by child resources to load any resources they hold internally."""
        return config

    @classmethod
    def from_name(
        cls,
        name: str,
        load_from_den: bool = True,
        dryrun: bool = False,
        _resolve_children: bool = True,
    ):
        """Load existing Resource via its name.

        Args:
            name (str): Name of the resource to load from name.
            load_from_den (bool, optional): Whether to try loading the module from Den. (Default: ``True``)
            dryrun (bool, optional): Whether to construct the object or load as dryrun. (Default: ``False``)

        Raises:
            ValueError: If no config is found for ``name``.
        """
        # TODO is this the right priority order?
        from runhouse.resources.hardware.utils import _current_cluster

        # On a cluster, an object already in the object store wins over a config lookup.
        if _current_cluster() and obj_store.contains(name):
            return obj_store.get(name)

        config = rns_client.load_config(name=name, load_from_den=load_from_den)

        if not config:
            raise ValueError(f"Resource {name} not found.")

        if _resolve_children:
            config = cls._check_for_child_configs(config)

        # Add this resource's name to the resource artifact registry if part of a run
        rns_client.add_upstream_resource(name)

        # Uses child class's from_config
        return cls.from_config(
            config=config, dryrun=dryrun, _resolve_children=_resolve_children
        )

    @staticmethod
    def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True):
        """Load or construct resource from config.

        Note that ``resource_type`` and ``dryrun`` are popped from the ``config`` dict that is passed in.

        Args:
            config (Dict): Resource config.
            dryrun (bool, optional): Whether to construct resource or load as dryrun (Default: ``False``)

        Raises:
            TypeError: If the config has no usable ``resource_type``, or no class matching it is
                found on the ``runhouse`` module.
        """
        resource_type = config.pop("resource_type", None)
        # A dryrun flag stored in the config takes effect even if the argument is False.
        dryrun = config.pop("dryrun", False) or dryrun

        if resource_type == "resource":
            return Resource(**config, dryrun=dryrun)

        # Without this guard a missing resource_type fails below with an opaque
        # "'NoneType' object has no attribute 'capitalize'".
        if not isinstance(resource_type, str) or not resource_type:
            raise TypeError(
                f"Config is missing a valid `resource_type` (got {resource_type!r}); "
                "cannot determine which resource class to load."
            )

        # The class is looked up by capitalized type name on the top-level runhouse module.
        resource_class = getattr(
            sys.modules["runhouse"], resource_type.capitalize(), None
        )
        if not resource_class:
            raise TypeError(f"Could not find module associated with {resource_type}")

        if _resolve_children:
            config = resource_class._check_for_child_configs(config)

        loaded = resource_class.from_config(
            config=config,
            dryrun=dryrun,
            _resolve_children=_resolve_children,
        )
        if loaded.name:
            rns_client.add_upstream_resource(loaded.name)
        return loaded

    def unname(self):
        """Remove the name of the resource. This changes the resource name to anonymous and deletes any Den configs
        for the resource."""
        self.delete_configs()
        self._name = None

    def history(self, limit: int = None) -> List[Dict]:
        """Return the history of the resource, including specific config fields (e.g. folder path) and which runs
        have overwritten it.

        Args:
            limit (int, optional): If specified, return the last ``limit`` number of entries in the history.
                Otherwise, return the entire history. (Default: ``None``)

        Returns:
            List[Dict]: The history entries, or an empty list if Den responds with a non-200 status.

        Raises:
            ValueError: If the resource has no rns address, or the address is local (starts with ``~/``).
        """
        if not self.rns_address:
            raise ValueError("Resource must have a name in order to have a history")

        if self.rns_address[:2] == "~/":
            raise ValueError(
                "Resource must be saved to Den (not local) in order to have a history"
            )

        resource_uri = rns_client.resource_uri(self.rns_address)
        base_uri = f"{rns_client.api_server_url}/resource/history/{resource_uri}"
        # A falsy limit (None or 0) means no limit query parameter is sent.
        uri = f"{base_uri}?limit={limit}" if limit else base_uri

        resp = rns_client.session.get(uri, headers=rns_client.request_headers())
        if resp.status_code != 200:
            logger.warning(
                f"Received [{resp.status_code}] from Den GET '{uri}': No resource history found: {load_resp_content(resp)}"
            )
            return []

        resource_history = read_resp_data(resp)
        return resource_history

    # TODO delete sub-resources
    def delete_configs(self):
        """Delete the resource's config from Den config store."""
        rns_client.delete_configs(resource=self)

    def save_attrs_to_config(self, config: Dict, attrs: List[str]):
        """Save the given attributes to the config

        Attributes whose value is falsy are skipped, except for a literal ``False``. Enum values
        are stored as their ``.value``.

        Args:
            config (Dict): Config dict to update in place.
            attrs (List[str]): Names of attributes on ``self`` to copy into ``config``.
        """
        for attr in attrs:
            # Kept as a direct __getattribute__ call: unlike getattr(), this does not fall back
            # to a subclass-defined __getattr__.
            val = self.__getattribute__(attr)
            if val or (
                val is False
            ):  # allow for saving `False` but not other falsey types
                if isinstance(val, Enum):
                    val = val.value
                config[attr] = val

    def is_local(self):
        """Return whether the resource is local: either it has a string ``install_target`` starting
        with ``~``, or it has a ``system`` attribute equal to ``"file"``."""
        return (
            hasattr(self, "install_target")
            and isinstance(self.install_target, str)
            and self.install_target.startswith("~")
        ) or (hasattr(self, "system") and self.system == "file")

    # TODO [DG] Implement proper sharing of subresources (with an overload of some kind)
    def share(
        self,
        users: Union[str, List[str]] = None,
        access_level: Union[ResourceAccess, str] = ResourceAccess.READ,
        visibility: Optional[Union[ResourceVisibility, str]] = None,
        notify_users: bool = True,
        headers: Dict = None,
    ) -> Tuple[Dict[str, ResourceAccess], Dict[str, ResourceAccess], set]:
        """Grant access to the resource for a list of users (or a single user). By default, the user will
        receive an email notification of access (if they have a Runhouse account) or instructions on creating
        an account to access the resource. If ``visibility`` is set to ``public``, users will not be notified.

        .. note::
            You can only grant access to other users if you have write access to the resource.

        Args:
            users (Union[str, list], optional): Single user or list of user emails and / or runhouse account usernames.
                If none are provided and ``visibility`` is set to ``public``, resource will be made publicly
                available to all users. (Default: ``None``)
            access_level (:obj:`ResourceAccess`, optional): Access level to provide for the resource.
                (Default: ``read``).
            visibility (:obj:`ResourceVisibility`, optional): Type of visibility to provide for the shared
                resource. By default, the visibility is private. (Default: ``None``)
            notify_users (bool, optional): Whether to send an email notification to users who have been given access.
                Note: This is relevant for resources which are not ``shareable``. (Default: ``True``)
            headers (Dict, optional): Request headers to provide for the request to Den. Contains the user's auth token.
                Example: ``{"Authorization": f"Bearer {token}"}``

        Returns:
            Tuple(Dict, Dict, Set):

            `added_users`:
                Users who already have a Runhouse account and have been granted access to the resource.
            `new_users`:
                Users who do not have Runhouse accounts and received notifications via their emails.
            `valid_users`:
                Set of valid usernames and emails from ``users`` parameter.

        Raises:
            ValueError: If the resource has no name, or neither ``users`` nor ``visibility`` is given.
            TypeError: If the resource is local (see ``is_local``).

        Example:
            >>> # Write access to the resource for these specific users.
            >>> # Visibility will be set to private (users can search for and view resource in Den dashboard)
            >>> my_resource.share(users=["username1", "user2@gmail.com"], access_level='write')

            >>> # Make resource public, with read access to the resource for all users
            >>> my_resource.share(visibility='public')
        """
        if self.name is None:
            raise ValueError("Resource must have a name in order to share")

        if users is None and visibility is None:
            raise ValueError(
                "Must specify `visibility` for the resource if no users are provided."
            )

        if hasattr(self, "system") and self.system in ["ssh", "sftp"]:
            logger.warning(
                "Sharing a resource located on a cluster is not recommended. For persistence, we suggest "
                "saving to a cloud storage system (ex: `s3` or `gs`). You can copy your cluster based "
                f"{self.RESOURCE_TYPE} to your desired storage provider using the `.to()` method. "
                f"For example: `{self.RESOURCE_TYPE}.to(system='rh-cpu')`"
            )

        if self.is_local():
            if self.RESOURCE_TYPE == "package":
                raise TypeError(
                    f"Unable to share a local {self.RESOURCE_TYPE}. Please make sure the {self.RESOURCE_TYPE} is "
                    f"located on a cluster. You can use the `.to()` method to do so. "
                    f"For example: `{self.name}.to(system='rh-cpu')`"
                )
            else:
                raise TypeError(
                    f"Unable to share a local {self.RESOURCE_TYPE}. Please make sure the {self.RESOURCE_TYPE} is "
                    f"located on a cluster or a remote system. You can use the `.to()` method to do so. \n"
                    f"For example: `{self.name}.to(system='s3')`"
                )

        if isinstance(access_level, str):
            access_level = ResourceAccess(access_level)

        if visibility is not None:
            # Update the resource in Den with this global visibility value
            self.visibility = visibility

            logger.debug(f"Updating resource with visibility: {self.visibility}")

        # Saved unconditionally before granting access, not only when visibility changed.
        self.save()

        if isinstance(users, str):
            users = [users]

        added_users, new_users, valid_users = rns_client.grant_resource_access(
            rns_address=self.rns_address,
            user_emails=users,
            access_level=access_level,
            notify_users=notify_users,
            headers=headers,
        )
        return added_users, new_users, valid_users

    def revoke(
        self, users: Union[str, List[str]] = None, headers: Optional[Dict] = None
    ):
        """Revoke access to the resource.

        Args:
            users (Union[str, List[str]], optional): List of user emails and / or runhouse account usernames
                (or a single user). If no users are specified will revoke access for all users. (Default: ``None``)
            headers (Optional[Dict]): Request headers to provide for the request to Den. Contains the user's auth token.
                Example: ``{"Authorization": f"Bearer {token}"}``
                Will default to authorization headers of the local config. (Default: ``None``)

        Raises:
            Exception: If Den responds with a non-200 status.
        """
        if isinstance(users, str):
            users = [users]

        request_uri = rns_client.resource_uri(self.rns_address)
        uri = f"{rns_client.api_server_url}/resource/{request_uri}/users/access"
        resp = rns_client.session.put(
            uri,
            json={"users": users, "access_level": ResourceAccess.DENIED},
            headers=headers or rns_client.request_headers(),
        )

        if resp.status_code != 200:
            raise Exception(
                f"Received [{resp.status_code}] from Den PUT '{uri}': Failed to revoke access for resource: {load_resp_content(resp)}"
            )