You are viewing the stable version. Click here to see the docs for the latest stable version.

Source code for runhouse.resources.packages.package

import copy
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Union

from runhouse.logger import get_logger

from runhouse.resources.envs.utils import install_conda, run_setup_command
from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import (
    _get_cluster_from,
    detect_cuda_version_or_cpu,
)
from runhouse.resources.resource import Resource
from runhouse.utils import (
    find_locally_installed_version,
    get_local_install_path,
    is_python_package_string,
    locate_working_dir,
)


# Install method names quoted in this module's "Unknown install method" error messages.
INSTALL_METHODS = {"local", "reqs", "pip", "conda", "rh"}

# Module-level logger, named after this module.
logger = get_logger(__name__)


class CodeSyncError(Exception):
    """Error raised when a requested code sync is refused."""


@dataclass
class InstallTarget:
    """A local path paired with an optional destination path to sync it to on a cluster."""

    local_path: str
    _path_to_sync_to_on_cluster: Optional[str] = None

    @property
    def path_to_sync_to_on_cluster(self) -> str:
        """Destination path on the cluster.

        Returns the explicitly configured destination when it is set (and non-empty);
        otherwise ``~/`` followed by the final component of the resolved local path.
        """
        explicit_destination = self._path_to_sync_to_on_cluster
        if explicit_destination:
            return explicit_destination

        resolved_local = Path(self.full_local_path_str())
        return f"~/{resolved_local.name}"

    def full_local_path_str(self) -> str:
        """Return ``local_path`` as a string with ``~`` expanded and the path resolved."""
        expanded = Path(self.local_path).expanduser()
        return str(expanded.resolve())

    def __str__(self):
        return f"InstallTarget(local_path={self.local_path}, path_to_sync_to_on_cluster={self._path_to_sync_to_on_cluster})"


class Package(Resource):
    """Resource describing something to install and how to install it.

    ``install_method`` selects the strategy (``pip``, ``conda``, ``reqs`` or ``local``);
    ``install_target`` is either a plain string (e.g. a package name) or an
    :class:`InstallTarget` pointing at a local folder.
    """

    RESOURCE_TYPE = "package"

    # https://pytorch.org/get-started/locally/
    # Note: no binaries exist for 11.4 (https://github.com/pytorch/pytorch/issues/75992)
    TORCH_INDEX_URLS = {
        "11.3": "https://download.pytorch.org/whl/cu113",
        "11.5": "https://download.pytorch.org/whl/cu115",
        "11.6": "https://download.pytorch.org/whl/cu116",
        "11.7": "https://download.pytorch.org/whl/cu117",
        "11.8": "https://download.pytorch.org/whl/cu118",
        "12.1": "https://download.pytorch.org/whl/cu121",
        "cpu": "https://download.pytorch.org/whl/cpu",
    }

    def __init__(
        self,
        name: Optional[str] = None,
        install_method: Optional[str] = None,
        install_target: Optional[Union[str, "Folder"]] = None,
        install_args: Optional[str] = None,
        preferred_version: Optional[str] = None,
        dryrun: bool = False,
        **kwargs,  # We have this here to ignore extra arguments when calling from from_config
    ):
        """
        Runhouse Package resource.

        .. note::
            To create a git package, please use the factory method :func:`package`.
        """
        super().__init__(
            name=name,
            dryrun=dryrun,
        )
        self.install_method = install_method
        self.install_target = install_target
        self.install_args = install_args
        self.preferred_version = preferred_version

    def config(self, condensed: bool = True):
        """Return the parent resource config extended with this package's install settings.

        An :class:`InstallTarget` is stored as a
        ``(local_path, _path_to_sync_to_on_cluster)`` tuple, which
        :meth:`from_config` turns back into an :class:`InstallTarget`.
        """
        # If the package is just a simple Package.from_string string, no
        # need to store it in rns, just give back the string.
        # if self.install_method in ['pip', 'conda', 'git']:
        #     return f'{self.install_method}:{self.name}'
        config = super().config(condensed)
        config["install_method"] = self.install_method
        config["install_target"] = (
            (
                self.install_target.local_path,
                self.install_target._path_to_sync_to_on_cluster,
            )
            if isinstance(self.install_target, InstallTarget)
            else self.install_target
        )
        config["install_args"] = self.install_args
        config["preferred_version"] = self.preferred_version
        return config

    def __str__(self):
        if self.name:
            return f"Package: {self.name}"
        return f"Package: {self.install_target}"

    @staticmethod
    def _prepend_python_executable(
        install_cmd: str, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Prefix ``install_cmd`` with ``<python> -m``.

        ``python3`` is used when a cluster or env is given; otherwise the
        interpreter running this process (``sys.executable``).
        """
        return (
            f"python3 -m {install_cmd}"
            if cluster or env
            else f"{sys.executable} -m {install_cmd}"
        )

    @staticmethod
    def _prepend_env_command(install_cmd: str, env: Union[str, "Env"] = None):
        """Wrap ``install_cmd`` with the env's ``_full_command`` when an env is given."""
        if env:
            from runhouse.resources.envs.utils import _get_env_from

            env = _get_env_from(env)
            install_cmd = env._full_command(install_cmd)
        return install_cmd

    def _validate_folder_path(self):
        """Raise ``CodeSyncError`` if the install target resolves to the home directory or ``/``."""
        # Compare fully expanded/resolved paths so both relative and absolute
        # spellings of the home directory are caught.
        if isinstance(
            self.install_target, InstallTarget
        ) and self.install_target.full_local_path_str() in [
            str(Path("~").expanduser()),
            str(Path("/")),
        ]:
            raise CodeSyncError(
                "Cannot sync the home directory. Please include a Python configuration file in a subdirectory."
            )

    def _pip_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the full ``pip install`` command for this package."""
        install_args = f" {self.install_args}" if self.install_args else ""
        if isinstance(self.install_target, InstallTarget):
            install_cmd = self.install_target.full_local_path_str() + install_args
        else:
            install_target = f'"{self.install_target}"'
            install_cmd = install_target + install_args

        install_cmd = f"pip install {self._install_cmd_for_torch(install_cmd, cluster)}"
        install_cmd = self._prepend_python_executable(
            install_cmd, cluster=cluster, env=env
        )
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        return install_cmd

    def _conda_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the ``conda install -y`` command; also calls ``install_conda`` for the cluster."""
        install_args = f" {self.install_args}" if self.install_args else ""
        if isinstance(self.install_target, InstallTarget):
            install_cmd = f"{self.install_target.local_path}" + install_args
        else:
            install_cmd = self.install_target + install_args

        install_cmd = f"conda install -y {install_cmd}"
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        install_conda(cluster=cluster)
        return install_cmd

    def _reqs_install_cmd(
        self, env: Union[str, "Env"] = None, cluster: "Cluster" = None
    ):
        """Build the ``pip install -r .../requirements.txt`` command.

        Returns:
            The command string, or ``None`` when the install target is an
            :class:`InstallTarget` whose folder has no ``requirements.txt``.
        """
        install_args = f" {self.install_args}" if self.install_args else ""
        if not isinstance(self.install_target, InstallTarget):
            install_cmd = self.install_target + install_args
        else:
            on_cluster_path = self.install_target.local_path
            reqs_path = f"{on_cluster_path}/requirements.txt"

            if not cluster:
                # If not cluster, we're on the cluster, and we must deal with the path locally
                local_reqs = Path(reqs_path).expanduser()
                if not local_reqs.exists():
                    return None
                with open(str(local_reqs)) as f:
                    reqs_list = f.readlines()
            else:
                # Otherwise, make sure the target folder is on the cluster,
                # and read reqs from the cluster
                if "requirements.txt" not in cluster._folder_ls(
                    path=on_cluster_path, full_paths=False
                ):
                    return None
                reqs_list = (
                    cluster._folder_get(reqs_path, mode="r").strip("\n").split("\n")
                )

            install_cmd = self._reqs_install_cmd_for_torch(
                reqs_path, reqs_list, install_args, cluster=cluster
            )

        install_cmd = f"pip install {install_cmd}"
        install_cmd = self._prepend_python_executable(
            install_cmd, env=env, cluster=cluster
        )
        install_cmd = self._prepend_env_command(install_cmd, env=env)
        return install_cmd

    def _install(
        self,
        env: Union[str, "Env"] = None,
        cluster: "Cluster" = None,
        node: Optional[str] = None,
    ):
        """Install package.

        Args:
            env (Env or str): Environment to install package on. If left empty, defaults to base environment.
                (Default: ``None``)
            cluster (Optional[Cluster]): If provided, will install package on cluster using SSH.
            node (Optional[str]): Forwarded to ``cluster.rsync`` and ``run_setup_command``.

        Raises:
            RuntimeError: If the pip, conda or reqs install command exits non-zero.
            ValueError: If the install method is unknown, or the install target is
                unusable for the ``local`` / ``reqs`` methods.
        """
        logger.info(f"Installing {str(self)} with method {self.install_method}.")

        if isinstance(self.install_target, InstallTarget):
            if cluster and Path(self.install_target.local_path).expanduser().exists():
                cluster.rsync(
                    source=str(self.install_target.local_path),
                    dest=str(self.install_target.path_to_sync_to_on_cluster),
                    up=True,
                    contents=True,
                    node=node,
                )
                # From here on the target refers to the synced copy on the cluster.
                self.install_target.local_path = (
                    self.install_target.path_to_sync_to_on_cluster
                )

            path = self.install_target.local_path
            if not path:
                return

        if self.install_method == "pip":
            # If this is a generic pip package, with no version pinned, we want to check if there is a version
            # already installed. If there is, then we ignore preferred version and leave the existing version.
            # The user can always force a version install by doing `numpy==2.0.0` for example. Else, we install
            # the preferred version, that matches their local.
            if (
                is_python_package_string(self.install_target)
                and self.preferred_version is not None
            ):
                # Check if this is installed
                retcode = run_setup_command(
                    f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"",
                    cluster=cluster,
                    node=node,
                )[0]
                if retcode != 0:
                    self.install_target = (
                        f"{self.install_target}=={self.preferred_version}"
                    )

            install_cmd = self._pip_install_cmd(env=env, cluster=cluster)
            logger.info(f"Running via install_method pip: {install_cmd}")
            retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
            if retcode != 0:
                raise RuntimeError(
                    f"Pip install {install_cmd} failed, check that the package exists and is available for your platform."
                )

        elif self.install_method == "conda":
            install_cmd = self._conda_install_cmd(env=env, cluster=cluster)
            logger.info(f"Running via install_method conda: {install_cmd}")
            retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
            if retcode != 0:
                raise RuntimeError(
                    f"Conda install {install_cmd} failed, check that the package exists and is "
                    "available for your platform."
                )

        elif self.install_method == "reqs":
            install_cmd = self._reqs_install_cmd(env=env, cluster=cluster)
            if install_cmd:
                logger.info(f"Running via install_method reqs: {install_cmd}")
                retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
                if retcode != 0:
                    raise RuntimeError(
                        f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform."
                    )
            else:
                logger.info(f"{path}/requirements.txt not found, skipping reqs install")

        else:
            if self.install_method != "local":
                raise ValueError(
                    f"Unknown install method {self.install_method}. "
                    f"Must be one of {INSTALL_METHODS}"
                )

        # Need to append to path
        if self.install_method in ["local", "reqs"]:
            if isinstance(self.install_target, InstallTarget):
                target_dir = self.install_target.full_local_path_str()
                if not cluster:
                    sys.path.insert(0, target_dir)
                else:
                    # PATH entries are ':'-separated; a ';' here would end the
                    # export and make the shell try to execute the directory.
                    run_setup_command(
                        f"export PATH=$PATH:{target_dir}",
                        cluster=cluster,
                        node=node,
                    )
            elif not cluster:
                resolved_target = Path(self.install_target).resolve().expanduser()
                if resolved_target.exists():
                    sys.path.insert(0, str(resolved_target))
                else:
                    raise ValueError(
                        f"install_target {self.install_target} must be a Folder or a path to a directory for install_method {self.install_method}"
                    )
            else:
                raise ValueError(
                    f"If cluster is provided, install_target must be a Folder for install_method {self.install_method}"
                )

    # ----------------------------------
    # Torch Install Helpers
    # ----------------------------------
    def _reqs_install_cmd_for_torch(
        self, reqs_path, reqs_list, install_args="", cluster=None
    ):
        """Return the ``-r <reqs_path> ...`` argument string for a requirements file.

        When a requirement mentions ``torch`` and no pytorch.org index is already
        configured in the file, the matching torch ``--extra-index-url`` is appended.
        ``install_args`` (expected to carry its own leading space) is always kept.
        """
        plain_cmd = f"-r {reqs_path}" + install_args

        # torch isn't a req: directly pip install the reqs file
        if not any("torch" in req for req in reqs_list):
            return plain_cmd

        # a pytorch index url is already defined by the user: leave it alone
        for req in reqs_list:
            if (
                "--index-url" in req or "--extra-index-url" in req
            ) and "pytorch.org" in req:
                return plain_cmd

        # Only probe the CUDA version once we know the index URL is actually needed.
        index_url = self._torch_index_url(detect_cuda_version_or_cpu(cluster=cluster))
        if not index_url:
            # No known wheel index for this CUDA version; don't emit "--extra-index-url None".
            return plain_cmd

        # add extra-index-url for torch if not found
        return f"-r {reqs_path} --extra-index-url {index_url}" + install_args

    def _install_cmd_for_torch(self, install_cmd, cluster=None):
        """Return the correct formatted pip install command for the torch package(s) provided.

        Commands that mention no torch package are returned unchanged. Returns
        ``None`` for a command starting with ``#`` or when nothing is left to install.
        """
        if install_cmd.startswith("#"):
            return None

        torch_source_packages = ["torch", "torchvision", "torchaudio"]
        if not any(pkg in install_cmd for pkg in torch_source_packages):
            return install_cmd

        packages_to_install: list = self._packages_to_install_from_cmd(install_cmd)
        cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)

        formatted_cmds = []
        for package_install_cmd in packages_to_install:
            formatted_cmd = self._install_url_for_torch_package(
                package_install_cmd, cuda_version_or_cpu
            )
            if formatted_cmd:
                formatted_cmds.append(formatted_cmd)

        final_install_cmd = " ".join(formatted_cmds).rstrip()
        return final_install_cmd if final_install_cmd != "" else None

    def _install_url_for_torch_package(self, install_cmd, cuda_version_or_cpu):
        """Build the full install command, adding a --index-url and --extra-index-url where applicable."""
        # Grab the relevant index url for torch based on the CUDA version provided
        if "," in install_cmd:
            # If installing a range of versions format the string to make it compatible with `pip_install` method
            install_cmd = install_cmd.replace(" ", "")

        index_url = self._torch_index_url(cuda_version_or_cpu)
        if index_url and not any(
            specifier in install_cmd for specifier in ["--index-url ", "-i "]
        ):
            install_cmd = f"{install_cmd} --index-url {index_url}"

        if "--extra-index-url" not in install_cmd:
            return f"{install_cmd} --extra-index-url https://pypi.python.org/simple/"

        return install_cmd

    def _torch_index_url(self, cuda_version_or_cpu: str):
        """Look up the torch wheel index URL for a CUDA version (or ``"cpu"``); ``None`` if unknown."""
        return self.TORCH_INDEX_URLS.get(cuda_version_or_cpu)

    @staticmethod
    def _packages_to_install_from_cmd(install_cmd: str):
        """Split a string of command(s) into a list of separate commands"""
        # Remove any --extra-index-url flags from the install command (to be added later by default)
        install_cmd = re.sub(r"--extra-index-url\s+\S+", "", install_cmd)
        install_cmd = install_cmd.strip()

        if ", " in install_cmd:
            # Ex: 'torch>=1.13.0,<2.0.0'
            return [install_cmd]

        matches = re.findall(r"(\S+(?:\s+(-i|--index-url)\s+\S+)?)", install_cmd)
        packages_to_install = [match[0] for match in matches]
        return packages_to_install

    def to(
        self,
        system: Union[str, Dict, "Cluster"],
        path: Optional[str] = None,
    ):
        """Copy the package onto filesystem or cluster, and return the new Package object.

        Args:
            system (str, Dict, or Cluster): Cluster to send the package to.
            path (str, optional): Not used by this method.

        Returns:
            A shallow copy whose install target points at the synced path when
            ``system`` is a cluster other than the current one; otherwise ``self``.

        Raises:
            TypeError: If ``install_target`` is not an :class:`InstallTarget`.
            CodeSyncError: If the install target is the home directory or ``/``.
        """
        if not isinstance(self.install_target, InstallTarget):
            raise TypeError(
                "`install_target` must be an InstallTarget in order to copy the package to a system."
            )

        system = _get_cluster_from(system)
        if isinstance(system, Cluster) and system.on_this_cluster():
            return self

        self._validate_folder_path()

        if isinstance(system, Cluster):
            system.rsync(
                source=str(self.install_target.full_local_path_str()),
                dest=str(self.install_target.path_to_sync_to_on_cluster),
                up=True,
                contents=True,
            )

            new_package = copy.copy(self)
            new_package.install_target = InstallTarget(
                local_path=self.install_target.path_to_sync_to_on_cluster,
                _path_to_sync_to_on_cluster=self.install_target.path_to_sync_to_on_cluster,
            )
            return new_package

        return self

    @staticmethod
    def split_req_install_method(req_str: str):
        """Split a requirements string into a install method and the rest of the string.

        The text before the first ``:`` is the install method, unless it contains
        whitespace. In that case the colon belongs to an argument (e.g. the URL in
        ``"torch --index-url https://..."``) and no install method is reported.

        Returns:
            ``(install_method, rest)``; ``install_method`` is ``""`` when none is given.
        """
        method, sep, rest = req_str.partition(":")
        if not sep or any(ch.isspace() for ch in method):
            return "", req_str
        return method, rest

    @staticmethod
    def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True):
        """Build a Package (or GitPackage) from a config dict.

        Note that ``config["install_target"]`` is replaced in place when it holds
        a serialized :class:`InstallTarget`.
        """
        serialized_target = config.get("install_target")
        # `config()` emits a tuple for an InstallTarget; a config that has been
        # through JSON carries the same pair as a list, so accept both.
        if isinstance(serialized_target, (tuple, list)):
            config["install_target"] = InstallTarget(
                local_path=serialized_target[0],
                _path_to_sync_to_on_cluster=serialized_target[1],
            )
        if config.get("resource_subtype") == "GitPackage":
            from runhouse import GitPackage

            return GitPackage.from_config(
                config, dryrun=dryrun, _resolve_children=_resolve_children
            )
        return Package(**config, dryrun=dryrun)

    @staticmethod
    def from_string(specifier: str, dryrun: bool = False):
        """Build a Package from a specifier such as ``"numpy"``, ``"pip:numpy"`` or ``"reqs:./"``.

        Raises:
            ValueError: If the specifier names an unknown install method.
        """
        if specifier == "requirements.txt":
            specifier = "reqs:./"

        # Use regex to check if specifier matches '<method>:https://github.com/<path>' or 'https://github.com/<path>'
        match = re.search(
            r"^(?:(?P<method>[^:]+):)?(?P<path>https://github.com/.+)", specifier
        )
        if match:
            install_method = match.group("method")
            url = match.group("path")
            from runhouse.resources.packages.git_package import git_package

            return git_package(
                git_url=url, install_method=install_method, dryrun=dryrun
            )

        install_method, target_and_args = Package.split_req_install_method(specifier)

        # Handles a case like "torch --index-url https://download.pytorch.org/whl/cu113"
        rel_target, args = (
            target_and_args.split(" ", 1)
            if " " in target_and_args
            else (target_and_args, "")
        )

        # We need to do this because relative paths are relative to the current working directory!
        abs_target = (
            Path(rel_target).expanduser()
            if Path(rel_target).expanduser().is_absolute()
            else Path(locate_working_dir()) / rel_target
        )
        if abs_target.exists():
            target = InstallTarget(
                local_path=str(abs_target), _path_to_sync_to_on_cluster=None
            )
        else:
            target = rel_target

        # If install method is not provided, we need to infer it
        if not install_method:
            if Path(specifier).resolve().exists():
                install_method = "reqs"
            else:
                install_method = "pip"

        # If we are just defaulting to pip, attempt to install the same version of the package
        # that is already installed locally
        # Check if the target is only letters, nothing else. This means its a string like 'numpy'.
        preferred_version = None
        if install_method == "pip" and is_python_package_string(target):
            locally_installed_version = find_locally_installed_version(target)
            if locally_installed_version:
                # Check if this is a package that was installed from local
                local_install_path = get_local_install_path(target)
                if local_install_path and Path(local_install_path).exists():
                    target = InstallTarget(
                        local_path=local_install_path, _path_to_sync_to_on_cluster=None
                    )
                else:
                    # We want to preferrably install this version of the package server-side
                    preferred_version = locally_installed_version

        # "Local" install method is a special case where we just copy a local folder and add to path
        if install_method == "local":
            return Package(
                install_target=target, install_method=install_method, dryrun=dryrun
            )
        elif install_method in ["reqs", "pip", "conda"]:
            return Package(
                install_target=target,
                install_args=args,
                install_method=install_method,
                preferred_version=preferred_version,
                dryrun=dryrun,
            )
        elif install_method == "rh":
            # Calling the factory method below
            return package(name=specifier[len("rh:") :], dryrun=dryrun)
        else:
            raise ValueError(
                f"Unknown install method {install_method}. Must be one of {INSTALL_METHODS}"
            )
def package(
    name: str = None,
    install_method: str = None,
    install_str: str = None,
    path: str = None,
    system: str = None,
    load_from_den: bool = True,
    dryrun: bool = False,
) -> Package:
    """
    Builds an instance of :class:`Package`.

    Args:
        name (str, optional): Name to assign the package resource.
        install_method (str, optional): Method for installing the package.
            Options: [``pip``, ``conda``, ``reqs``, ``local``]
        install_str (str, optional): Additional arguments to install. When ``path`` is not given,
            the text before the first space is used as the install target and the remainder as
            the install arguments.
        path (str, optional): URL of the package to install.
        system (str, optional): File system or cluster on which the package lives. Currently this must a
            cluster or one of: [``file``, ``s3``, ``gs``].
        load_from_den (bool, optional): Whether to try loading the Package from Den. (Default: ``True``)
        dryrun (bool, optional): Whether to create the Package if it doesn't exist, or load the Package
            object as a dryrun. (Default: ``False``)

    Returns:
        Package: The resulting package.

    Example:
        >>> import runhouse as rh
        >>> reloaded_package = rh.package(name="my-package")
        >>> local_package = rh.package(path="local/folder/path", install_method="local")
    """
    if name and not any([install_method, install_str, path, system]):
        # Only the name was provided: load the existing Package by name
        return Package.from_name(name, load_from_den=load_from_den, dryrun=dryrun)

    install_target = None
    install_args = None
    if path is not None:
        install_target = (path, None)
        install_args = install_str
    elif install_str is not None:
        # partition() instead of split(" ", 1): a bare target such as "numpy" has no
        # space, and unpacking the one-element split result raised ValueError.
        install_target, separator, remainder = install_str.partition(" ")
        install_args = remainder if separator else None

    return Package(
        install_method=install_method,
        install_target=install_target,
        install_args=install_args,
        name=name,
        dryrun=dryrun,
    )