You are viewing main version. Click here to see docs for the latest stable version.

Source code for runhouse.resources.resource

import pprint
import sys
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

from runhouse.globals import obj_store, rns_client
from runhouse.logger import get_logger

from runhouse.rns.top_level_rns_fns import (
    resolve_rns_path,
    save,
    split_rns_name_and_path,
)
from runhouse.rns.utils.api import (
    load_resp_content,
    read_resp_data,
    ResourceAccess,
    ResourceVisibility,
)
from runhouse.rns.utils.names import is_valid_resource_name

logger = get_logger(__name__)


[docs]class Resource: RESOURCE_TYPE = "resource"
[docs] def __init__( self, name: Optional[str] = None, dryrun: bool = False, access_level: ResourceAccess = ResourceAccess.WRITE, visibility: ResourceVisibility = ResourceVisibility.PRIVATE, **kwargs, ): """ Runhouse abstraction for objects that can be saved, shared, and reused. Args: name (Optional[str], optional): Name to assign the resource. (Default: None) dryrun (bool, optional): Whether to create the resource object, or load the object as a dryrun. (Default: ``False``) access_level (:obj:`ResourceAccess`, optional): Access level to provide for the resource. (Default: ``ResourceAccess.WRITE``) visibility (:obj:`ResourceVisibility`, optional): Type of visibility to provide for the resource. (Default: ``ResourceVisibility.PRIVATE``) """ self._name, self._rns_folder = None, None if name is not None: if name.startswith("/builtins/"): name = name[len("/builtins/") :] if name[0] == "^" and name != "^": name = name[1:] # Validate that name complies with a simple regex if not is_valid_resource_name(name): raise ValueError( f"Invalid name: {name} " "Resource names are limited to alphanumerics, dashes, and underscores. Max 200 characters. " "Slashes may be used to specify folders and must be included as the first character." ) self._name, self._rns_folder = rns_client.split_rns_name_and_path( rns_client.resolve_rns_path(name) ) self.dryrun = dryrun self.access_level = access_level self._visibility = visibility
# TODO add a utility to allow a parameter to be specified as "default" and then use the default value @property def config_for_rns(self): # Added for BC for version 0.0.20 return self.config(condensed=False) def config(self, condensed=True): config = { "name": self.rns_address or self.name, "resource_type": self.RESOURCE_TYPE, "resource_subtype": self.__class__.__name__, } self.save_attrs_to_config( config, [ "visibility", # Handles Enum to string conversion ], ) return config def _resource_string_for_subconfig( self, resource: Union[None, str, "Resource"], condensed=True ): """Returns a string representation of a sub-resource for use in a config.""" if resource is None or isinstance(resource, str): return resource if isinstance(resource, Resource): if condensed and resource.rns_address: # We operate on the assumption that rns_address is only populated once a resource has been saved. # That way, if rns_address is not None, we have reasonable likelihood that the resource was saved and # we can just pass the address. The only exception here is if the resource is a built-in. if resource.rns_address.startswith("^"): # Fork the resource if it's a built-in and consider it a new resource resource._rns_folder = None return resource.config(condensed) return resource.rns_address else: # If the resource doesn't have an rns_address, we consider it unsaved and put the whole config into # the parent config. return resource.config(condensed) raise ValueError( f"Resource {resource} is not a valid sub-resource for {self.__class__.__name__}" ) @property def rns_address(self): """Traverse up the filesystem until reaching one of the directories in rns_base_folders, then compute the relative path to that.""" if ( self.name is None or self._rns_folder is None ): # Anonymous folders have no rns address return None return str(Path(self._rns_folder) / self.name) @property def name(self): return self._name @name.setter def name(self, name): # Split the name and rns path if path is given (concat with current_folder if just stem is given) if name is None: self._name = None else: self._name, self._rns_folder = split_rns_name_and_path( resolve_rns_path(name) ) @property def visibility(self): return self._visibility @visibility.setter def visibility(self, visibility): self._visibility = visibility @rns_address.setter def rns_address(self, new_address): self.name = new_address # Note, this saves the resource to the new address! def _save_sub_resources(self, folder: str = None): """Overload by child resources to save any resources they hold internally.""" pass
[docs] def pin(self): """Write the resource to the object store.""" from runhouse.resources.hardware.utils import _current_cluster if _current_cluster(): if obj_store.has_local_storage: obj_store.put_local(self._name, self) else: obj_store.put(self._name, self) else: raise ValueError("Cannot pin a resource outside of a cluster.")
[docs] def refresh(self): """Update the resource in the object store.""" from runhouse.resources.hardware.utils import _current_cluster if _current_cluster(): return obj_store.get(self._name) else: return self
[docs] def save(self, name: str = None, overwrite: bool = True, folder: str = None): """Register the resource, saving it to the Den config store. Uses the resource's `self.config()` to generate the dict to save.""" # add this resource this run's downstream artifact registry if it's being saved as part of a run rns_client.add_downstream_resource(name or self.name) self._save_sub_resources(folder) if name: self.name = name # TODO handle self.access == 'read' instead of this weird overwrite argument save(self, overwrite=overwrite, folder=folder) return self
def __str__(self): return pprint.pformat(self.config()) @classmethod def _check_for_child_configs(cls, config: dict): """Overload by child resources to load any resources they hold internally.""" return config
[docs] @classmethod def from_name( cls, name: str, load_from_den: bool = True, dryrun: bool = False, _resolve_children: bool = True, ): """Load existing Resource via its name. Args: name (str): Name of the resource to load from name. load_from_den (bool, optional): Whether to try loading the module from Den. (Default: ``True``) dryrun (bool, optional): Whether to construct the object or load as dryrun. (Default: ``False``) """ # TODO is this the right priority order? from runhouse.resources.hardware.utils import _current_cluster if _current_cluster() and obj_store.contains(name): return obj_store.get(name) config = rns_client.load_config(name=name, load_from_den=load_from_den) if not config: raise ValueError(f"Resource {name} not found.") if _resolve_children: config = cls._check_for_child_configs(config) # Add this resource's name to the resource artifact registry if part of a run rns_client.add_upstream_resource(name) # Uses child class's from_config return cls.from_config( config=config, dryrun=dryrun, _resolve_children=_resolve_children )
[docs] @staticmethod def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True): """Load or construct resource from config. Args: config (Dict): Resource config. dryrun (bool, optional): Whether to construct resource or load as dryrun (Default: ``False``) """ resource_type = config.pop("resource_type", None) dryrun = config.pop("dryrun", False) or dryrun if resource_type == "resource": return Resource(**config, dryrun=dryrun) resource_class = getattr( sys.modules["runhouse"], resource_type.capitalize(), None ) if not resource_class: raise TypeError(f"Could not find module associated with {resource_type}") if _resolve_children: config = resource_class._check_for_child_configs(config) loaded = resource_class.from_config( config=config, dryrun=dryrun, _resolve_children=_resolve_children, ) if loaded.name: rns_client.add_upstream_resource(loaded.name) return loaded
[docs] def unname(self): """Remove the name of the resource. This changes the resource name to anonymous and deletes any Den configs for the resource.""" self.delete_configs() self._name = None
[docs] def history(self, limit: int = None) -> List[Dict]: """Return the history of the resource, including specific config fields (e.g. folder path) and which runs have overwritten it. Args: limit (int, optional): If specified, return the last ``limit`` number of entries in the history. Otherwise, return the entire history. (Default: ``None``) """ if not self.rns_address: raise ValueError("Resource must have a name in order to have a history") if self.rns_address[:2] == "~/": raise ValueError( "Resource must be saved to Den (not local) in order to have a history" ) resource_uri = rns_client.resource_uri(self.rns_address) base_uri = f"{rns_client.api_server_url}/resource/history/{resource_uri}" uri = f"{base_uri}?limit={limit}" if limit else base_uri resp = rns_client.session.get(uri, headers=rns_client.request_headers()) if resp.status_code != 200: logger.warning( f"Received [{resp.status_code}] from Den GET '{uri}': No resource history found: {load_resp_content(resp)}" ) return [] resource_history = read_resp_data(resp) return resource_history
# TODO delete sub-resources
[docs] def delete_configs(self): """Delete the resource's config from Den config store.""" rns_client.delete_configs(resource=self)
[docs] def save_attrs_to_config(self, config: Dict, attrs: List[str]): """Save the given attributes to the config""" for attr in attrs: val = self.__getattribute__(attr) if val or (val is False): # allow for saving `False` but not other falsey types if isinstance(val, Enum): val = val.value config[attr] = val
def is_local(self): return ( hasattr(self, "install_target") and isinstance(self.install_target, str) and self.install_target.startswith("~") or hasattr(self, "system") and self.system == "file" ) # TODO [DG] Implement proper sharing of subresources (with an overload of some kind)
[docs] def share( self, users: Union[str, List[str]] = None, access_level: Union[ResourceAccess, str] = ResourceAccess.READ, visibility: Optional[Union[ResourceVisibility, str]] = None, notify_users: bool = True, headers: Optional[Dict] = None, ) -> Tuple[Dict[str, ResourceAccess], Dict[str, ResourceAccess]]: """Grant access to the resource for a list of users (or a single user). By default, the user will receive an email notification of access (if they have a Runhouse account) or instructions on creating an account to access the resource. If ``visibility`` is set to ``public``, users will not be notified. .. note:: You can only grant access to other users if you have write access to the resource. Args: users (Union[str, list], optional): Single user or list of user emails and / or runhouse account usernames. If none are provided and ``visibility`` is set to ``public``, resource will be made publicly available to all users. (Default: ``None``) access_level (:obj:`ResourceAccess`, optional): Access level to provide for the resource. (Default: ``read``). visibility (:obj:`ResourceVisibility`, optional): Type of visibility to provide for the shared resource. By default, the visibility is private. (Default: ``None``) notify_users (bool, optional): Whether to send an email notification to users who have been given access. Note: This is relevant for resources which are not ``shareable``. (Default: ``True``) headers (Dict, optional): Request headers to provide for the request to Den. Contains the user's auth token. Example: ``{"Authorization": f"Bearer {token}"}`` Returns: Tuple(Dict, Dict, Set): `added_users`: Users who already have a Runhouse account and have been granted access to the resource. `new_users`: Users who do not have Runhouse accounts and received notifications via their emails. `valid_users`: Set of valid usernames and emails from ``users`` parameter. Example: >>> # Write access to the resource for these specific users. >>> # Visibility will be set to private (users can search for and view resource in Den dashboard) >>> my_resource.share(users=["username1", "user2@gmail.com"], access_level='write') >>> # Make resource public, with read access to the resource for all users >>> my_resource.share(visibility='public') """ if self.name is None: raise ValueError("Resource must have a name in order to share") if users is None and visibility is None: raise ValueError( "Must specify `visibility` for the resource if no users are provided." ) if hasattr(self, "system") and self.system in ["ssh", "sftp"]: logger.warning( "Sharing a resource located on a cluster is not recommended. For persistence, we suggest" "saving to a cloud storage system (ex: `s3` or `gs`). You can copy your cluster based " f"{self.RESOURCE_TYPE} to your desired storage provider using the `.to()` method. " f"For example: `{self.RESOURCE_TYPE}.to(system='rh-cpu')`" ) if self.is_local(): if self.RESOURCE_TYPE == "package": raise TypeError( f"Unable to share a local {self.RESOURCE_TYPE}. Please make sure the {self.RESOURCE_TYPE} is " f"located on a cluster. You can use the `.to()` method to do so. " f"For example: `{self.name}.to(system='rh-cpu')`" ) else: raise TypeError( f"Unable to share a local {self.RESOURCE_TYPE}. Please make sure the {self.RESOURCE_TYPE} is " f"located on a cluster or a remote system. You can use the `.to()` method to do so. " f"For example: `{self.name}.to(system='s3')`" ) if isinstance(access_level, str): access_level = ResourceAccess(access_level) if visibility is not None: # Update the resource in Den with this global visibility value self.visibility = visibility logger.debug(f"Updating resource with visibility: {self.visibility}") self.save() if isinstance(users, str): users = [users] added_users, new_users, valid_users = rns_client.grant_resource_access( rns_address=self.rns_address, user_emails=users, access_level=access_level, notify_users=notify_users, headers=headers, ) return added_users, new_users, valid_users
[docs] def revoke( self, users: Union[str, List[str]] = None, headers: Optional[Dict] = None ): """Revoke access to the resource. Args: users (Union[str, str], optional): List of user emails and / or runhouse account usernames (or a single user). If no users are specified will revoke access for all users. (Default: ``None``) headers (Optional[Dict]): Request headers to provide for the request to Den. Contains the user's auth token. Example: ``{"Authorization": f"Bearer {token}"}`` """ if isinstance(users, str): users = [users] request_uri = rns_client.resource_uri(self.rns_address) uri = f"{rns_client.api_server_url}/resource/{request_uri}/users/access" resp = rns_client.session.put( uri, json={"users": users, "access_level": ResourceAccess.DENIED}, headers=headers or rns_client.request_headers(), ) if resp.status_code != 200: raise Exception( f"Received [{resp.status_code}] from Den PUT '{uri}': Failed to revoke access for resource: {load_resp_content(resp)}" )