You are viewing the main version. Click here to see the docs for the latest stable version.

Source code for runhouse.resources.packages.package

import copy
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Union

from runhouse.constants import INSUFFICIENT_DISK_MSG
from runhouse.exceptions import InsufficientDiskError

from runhouse.globals import obj_store

from runhouse.logger import get_logger
from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import (
    _get_cluster_from,
    detect_cuda_version_or_cpu,
)
from runhouse.resources.resource import Resource

from runhouse.utils import (
    conda_env_cmd,
    find_locally_installed_version,
    get_local_install_path,
    install_conda,
    is_python_package_string,
    locate_working_dir,
    run_setup_command,
    split_pip_extras,
    venv_cmd,
)


# Install methods accepted by Package; any other value is rejected with a ValueError
# (see Package._install and Package.from_string).
INSTALL_METHODS = {"local", "pip", "uv", "conda"}

# Logger for this module, obtained through runhouse's get_logger helper.
logger = get_logger(__name__)


class CodeSyncError(Exception):
    """Raised when a local code folder must not be synced (e.g. the home or root directory)."""


@dataclass
class InstallTarget:
    # Local filesystem path of the code to install (may be relative or start with "~").
    local_path: str
    # Explicit destination path on the cluster; when unset, "~/<local folder name>" is used.
    _path_to_sync_to_on_cluster: Optional[str] = None

    @property
    def path_to_sync_to_on_cluster(self) -> str:
        """Destination on the cluster: the explicit override if set, else ``~/<folder name>``."""
        explicit_dest = self._path_to_sync_to_on_cluster
        if explicit_dest:
            return explicit_dest
        folder_name = Path(self.full_local_path_str()).name
        return f"~/{folder_name}"

    def full_local_path_str(self) -> str:
        """Return ``local_path`` with ``~`` expanded and resolved to an absolute path string."""
        expanded = Path(self.local_path).expanduser()
        return str(expanded.resolve())

    def __str__(self):
        return (
            f"InstallTarget(local_path={self.local_path}, "
            f"path_to_sync_to_on_cluster={self._path_to_sync_to_on_cluster})"
        )

class Package(Resource):
    """Resource describing something to install: a pip/uv/conda package or a local code folder.

    ``install_target`` is either a plain string (a package specifier) or an
    :class:`InstallTarget` wrapping a local folder that can be synced to a cluster.
    """

    RESOURCE_TYPE = "package"

    # https://pytorch.org/get-started/locally/
    # Note: no binaries exist for 11.4 (https://github.com/pytorch/pytorch/issues/75992)
    TORCH_INDEX_URLS = {
        "11.3": "https://download.pytorch.org/whl/cu113",
        "11.5": "https://download.pytorch.org/whl/cu115",
        "11.6": "https://download.pytorch.org/whl/cu116",
        "11.7": "https://download.pytorch.org/whl/cu117",
        "11.8": "https://download.pytorch.org/whl/cu118",
        "12.1": "https://download.pytorch.org/whl/cu121",
        "cpu": "https://download.pytorch.org/whl/cpu",
    }

    def __init__(
        self,
        name: Optional[str] = None,
        install_method: Optional[str] = None,
        install_target: Optional[Union[str, "InstallTarget"]] = None,
        install_args: Optional[str] = None,
        install_extras: Optional[str] = None,
        preferred_version: Optional[str] = None,
        dryrun: bool = False,
        **kwargs,  # We have this here to ignore extra arguments when calling from from_config
    ):
        """
        Runhouse Package resource.

        .. note::
            To create a git package, please use the factory method :func:`package`.
        """
        super().__init__(
            name=name,
            dryrun=dryrun,
        )
        self.install_method = install_method
        self.install_target = install_target
        self.install_args = install_args
        self.install_extras = install_extras
        self.preferred_version = preferred_version

    def config(self, condensed: bool = True):
        """Return the resource config extended with the package-specific fields.

        An :class:`InstallTarget` is stored as a ``(local_path, cluster_path)`` tuple,
        which :meth:`from_config` converts back.
        """
        config = super().config(condensed)
        config["install_method"] = self.install_method
        config["install_target"] = (
            (
                self.install_target.local_path,
                self.install_target._path_to_sync_to_on_cluster,
            )
            if isinstance(self.install_target, InstallTarget)
            else self.install_target
        )
        config["install_args"] = self.install_args
        config["install_extras"] = self.install_extras
        config["preferred_version"] = self.preferred_version
        return config

    def __str__(self):
        if self.name:
            return f"Package: {self.name}"
        return f"Package: {self.install_target}"

    @staticmethod
    def _prepend_python_executable(
        install_cmd: str,
        conda_env_name: Optional[str] = None,
        venv_path: Optional[str] = None,
        cluster: Optional["Cluster"] = None,
    ):
        """Prefix ``install_cmd`` with the interpreter used to run it as a module (``-m``).

        With a venv the command is returned untouched; on a cluster or in a conda env
        ``python3`` is used; otherwise the current interpreter (``sys.executable``).
        """
        if venv_path:
            return install_cmd
        if cluster or conda_env_name:
            return f"python3 -m {install_cmd}"
        return f"{sys.executable} -m {install_cmd}"

    @staticmethod
    def _prepend_env_command(
        install_cmd: str,
        conda_env_name: Optional[str] = None,
        venv_path: Optional[str] = None,
    ):
        """Wrap ``install_cmd`` with the conda and/or venv command wrappers, when given."""
        if conda_env_name:
            install_cmd = conda_env_cmd(cmd=install_cmd, conda_env_name=conda_env_name)
        if venv_path:
            install_cmd = venv_cmd(cmd=install_cmd, venv_path=venv_path)
        return install_cmd

    def _validate_folder_path(self):
        """Raise :class:`CodeSyncError` if the install target is the home or root directory."""
        # Compare fully expanded/resolved paths so both relative and absolute
        # spellings of the home directory are caught.
        if isinstance(
            self.install_target, InstallTarget
        ) and self.install_target.full_local_path_str() in [
            str(Path("~").expanduser()),
            str(Path("/")),
        ]:
            raise CodeSyncError(
                "Cannot sync the home directory. Please include a Python configuration file in a subdirectory."
            )

    def _install_cmd_with_extras(self, install_target: str, install_extras: str):
        """Return the quoted pip requirement with ``install_extras`` placed before any ``==version``."""
        if "==" in install_target:
            # split the target package name from its version
            split_target = install_target.split("==")
            return f'"{split_target[0]}{install_extras}=={split_target[1]}"'
        # Use the argument (not self.install_target) so the helper honours its input.
        return f'"{install_target}{install_extras}"'

    def _pip_install_cmd(
        self,
        conda_env_name: Optional[str] = None,
        venv_path: Optional[str] = None,
        cluster: Optional["Cluster"] = None,
        uv: Optional[bool] = None,
    ):
        """Build the full ``pip install`` (or ``uv pip install``) command for this package.

        Args:
            conda_env_name: Conda env to wrap the command in, if any.
            venv_path: Virtual env to wrap the command in, if any.
            cluster: When given, a folder target is referenced by its path on the cluster
                rather than its local path.
            uv: Force uv on/off; ``None`` derives it from ``install_method``.
        """
        if uv is None:
            uv = self.install_method == "uv"

        install_args = f" {self.install_args}" if self.install_args else ""
        install_extras = f"[{self.install_extras}]" if self.install_extras else ""

        if isinstance(self.install_target, InstallTarget):
            base_path = (
                self.install_target.path_to_sync_to_on_cluster
                if cluster
                else self.install_target.full_local_path_str()
            )
            install_cmd = base_path + install_extras + install_args
        else:
            install_target = (
                f'"{self.install_target}"'
                if not install_extras
                else self._install_cmd_with_extras(
                    install_target=self.install_target, install_extras=install_extras
                )
            )
            install_cmd = install_target + install_args

        install_cmd = f"pip install {self._install_cmd_for_torch(install_cmd, cluster)}"
        if uv:
            install_cmd = f"uv {install_cmd}"
        else:
            # uv doesn't need python executable
            install_cmd = self._prepend_python_executable(
                install_cmd,
                cluster=cluster,
                conda_env_name=conda_env_name,
                venv_path=venv_path,
            )
        install_cmd = self._prepend_env_command(
            install_cmd, conda_env_name=conda_env_name, venv_path=venv_path
        )
        return install_cmd

    def _conda_install_cmd(
        self, conda_env_name: Optional[str] = None, cluster: Optional["Cluster"] = None
    ):
        """Build the ``conda install -y`` command; also calls ``install_conda`` for the target."""
        install_args = f" {self.install_args}" if self.install_args else ""
        if isinstance(self.install_target, InstallTarget):
            install_cmd = f"{self.install_target.local_path}" + install_args
        else:
            install_cmd = self.install_target + install_args

        install_cmd = f"conda install -y {install_cmd}"
        install_cmd = self._prepend_env_command(
            install_cmd, conda_env_name=conda_env_name
        )
        install_conda(cluster=cluster)
        return install_cmd

    def _install_and_validate_output(
        self,
        install_cmd: str,
        cluster: Optional["Cluster"] = None,
        node: Optional[str] = None,
    ):
        """Run ``install_cmd`` and raise if it exits non-zero.

        Raises:
            InsufficientDiskError: If the output contains ``INSUFFICIENT_DISK_MSG``.
            RuntimeError: For any other non-zero exit code.
        """
        install_res = run_setup_command(install_cmd, cluster=cluster, node=node)
        retcode = install_res[0]
        if retcode != 0:
            stdout, stderr = install_res[1], install_res[2]
            if INSUFFICIENT_DISK_MSG in stdout or INSUFFICIENT_DISK_MSG in stderr:
                raise InsufficientDiskError(command=install_cmd)
            raise RuntimeError(
                f"{self.install_method} install '{install_cmd}' failed, check that the package exists and is available for your platform."
            )

    def _install(
        self,
        cluster: Optional["Cluster"] = None,
        node: Optional[str] = None,
        conda_env_name: Optional[str] = None,
        venv_path: Optional[str] = None,
        override_remote_version: bool = False,
    ):
        """Install package.

        Args:
            cluster (Optional[Cluster]): If provided, will install package on cluster using SSH.
                Otherwise, the assumption is that we are installing locally. (Default: ``None``)
            node (Optional[str]): Node on the cluster to install the package on, if using SSH.
                If ``cluster`` is provided without a ``node``, package will be installed on the
                head node. (Default: ``None``)
            conda_env_name (Optional[str]): Name of the conda environment to install the package on,
                if using SSH and installing in a specific conda env that is not activated by default.
            venv_path (Optional[str]): Virtual environment path forwarded to the setup/install
                commands. (Default: ``None``)
            override_remote_version (bool, optional): If the package exists both locally and remotely,
                whether to override the remote version with the local version. By default, the local
                version will be installed only if the package does not already exist on the cluster.
                (Default: ``False``)

        Raises:
            ValueError: If the install method is unknown, or a ``local`` install target is not usable.
        """
        logger.info(f"Installing {str(self)} with method {self.install_method}.")

        if self.install_method in ["pip", "uv"]:
            # If this is a generic pip package, with no version pinned, we want to check if there is a version
            # already installed. If there is, and ``override_remote_version`` is not set to ``True``, then we ignore
            # preferred version and leave the existing version. The user can also force a version install by
            # doing `numpy==2.0.0`. Else, we install the preferred version, that matches their local.
            if is_python_package_string(self.install_target):
                # A same-named folder in the home directory takes precedence over the index package.
                exists_remotely = (
                    run_setup_command(
                        f"[ -d ~/{self.install_target} ]",
                        cluster=cluster,
                        conda_env_name=conda_env_name,
                        venv_path=venv_path,
                        node=node,
                        stream_logs=False,
                    )[0]
                    == 0
                )
                if exists_remotely:
                    self.install_target = InstallTarget(
                        local_path=self.install_target,
                        _path_to_sync_to_on_cluster=f"~/{self.install_target}",
                    )
                elif self.preferred_version is not None:
                    if not override_remote_version:
                        # Check if this is installed
                        retcode = run_setup_command(
                            f"python3 -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"",
                            cluster=cluster,
                            conda_env_name=conda_env_name,
                            venv_path=venv_path,
                            node=node,
                        )[0]
                        if retcode == 0:
                            logger.info(
                                f"{self.install_target} already installed. Skipping."
                            )
                            return
                    # Either an override was requested or nothing is installed yet:
                    # pin the install to the preferred version.
                    self.install_target = (
                        f"{self.install_target}=={self.preferred_version}"
                    )

            install_cmd = self._pip_install_cmd(
                conda_env_name=conda_env_name,
                venv_path=venv_path,
                cluster=cluster,
                uv=(self.install_method == "uv"),
            )
            logger.info(
                f"Running via install_method {self.install_method}: {install_cmd}"
            )
            self._install_and_validate_output(
                install_cmd=install_cmd, cluster=cluster, node=node
            )
            return

        if self.install_method == "conda":
            install_cmd = self._conda_install_cmd(
                conda_env_name=conda_env_name, cluster=cluster
            )
            logger.info(f"Running via install_method conda: {install_cmd}")
            self._install_and_validate_output(
                install_cmd=install_cmd, cluster=cluster, node=node
            )
            return

        if self.install_method != "local":
            raise ValueError(
                f"Unknown install method {self.install_method}. Must be one of {INSTALL_METHODS}"
            )

        # "local" install: nothing is built, the folder only needs to be added to the path.
        if isinstance(self.install_target, InstallTarget):
            if not cluster:
                obj_store.add_sys_path_to_all_processes(
                    self.install_target.full_local_path_str()
                )
            else:
                # PATH entries are separated by ":" on POSIX shells (";" would end up
                # as a literal character inside the last entry).
                run_setup_command(
                    f'export PATH="$PATH:{self.install_target.path_to_sync_to_on_cluster}"',
                    cluster=cluster,
                    venv_path=venv_path,
                    node=node,
                )
        elif not cluster:
            # Expand "~" before resolving; the reverse order never expands it.
            local_dir = Path(self.install_target).expanduser().resolve()
            if not local_dir.exists():
                raise ValueError(
                    f"install_target {self.install_target} must be a Folder or a path to a directory for install_method {self.install_method}"
                )
            obj_store.add_sys_path_to_all_processes(str(local_dir))
        else:
            raise ValueError(
                f"If cluster is provided, install_target must be a Folder for install_method {self.install_method}"
            )

    # ----------------------------------
    # Torch Install Helpers
    # ----------------------------------
    def _reqs_install_cmd_for_torch(
        self, reqs_path, reqs_list, install_args="", cluster=None
    ):
        """Return the ``-r <reqs_path>`` pip arguments, adding a torch ``--extra-index-url``
        when torch is required and the requirements do not already point at pytorch.org."""
        plain_cmd = f"-r {reqs_path}" + install_args

        # if torch extra index url is already defined by the user or torch isn't a req, directly pip install reqs file
        if not any("torch" in req for req in reqs_list):
            return plain_cmd

        for req in reqs_list:
            if (
                "--index-url" in req or "--extra-index-url" in req
            ) and "pytorch.org" in req:
                return plain_cmd

        # add extra-index-url for torch if not found
        cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)
        index_url = self._torch_index_url(cuda_version_or_cpu)
        if not index_url:
            # No known index for this CUDA version; don't emit "--extra-index-url None".
            return plain_cmd
        return f"-r {reqs_path} --extra-index-url {index_url}" + install_args

    def _install_cmd_for_torch(self, install_cmd, cluster=None):
        """Return the correct formatted pip install command for the torch package(s) provided.

        Commands that do not mention torch, or that already carry a local version (``+``)
        or an ``--extra-index-url``, are returned unchanged. Returns ``None`` for a
        commented-out line (leading ``#``) or when nothing is left to install.
        """
        if install_cmd.startswith("#"):
            return None

        torch_source_packages = ["torch", "torchvision", "torchaudio"]
        if not any(pkg in install_cmd for pkg in torch_source_packages):
            return install_cmd

        if "+" in install_cmd or "--extra-index-url" in install_cmd:
            return install_cmd

        packages_to_install: list = self._packages_to_install_from_cmd(install_cmd)
        cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)
        formatted_cmds = (
            self._install_url_for_torch_package(package_install_cmd, cuda_version_or_cpu)
            for package_install_cmd in packages_to_install
        )
        final_install_cmd = " ".join(cmd for cmd in formatted_cmds if cmd).rstrip()
        return final_install_cmd or None

    def _install_url_for_torch_package(self, install_cmd, cuda_version_or_cpu):
        """Build the full install command, adding a --index-url and --extra-index-url where applicable."""
        if "," in install_cmd:
            # If installing a range of versions format the string to make it compatible with `pip_install` method
            install_cmd = install_cmd.replace(" ", "")

        # Grab the relevant index url for torch based on the CUDA version provided
        index_url = self._torch_index_url(cuda_version_or_cpu)
        if index_url and not any(
            specifier in install_cmd for specifier in ["--index-url ", "-i "]
        ):
            install_cmd = f"{install_cmd} --index-url {index_url}"

        if "--extra-index-url" not in install_cmd:
            return f"{install_cmd} --extra-index-url https://pypi.python.org/simple/"

        return install_cmd

    def _torch_index_url(self, cuda_version_or_cpu: str):
        """Return the torch wheel index URL for the CUDA version (or ``"cpu"``), else ``None``."""
        return self.TORCH_INDEX_URLS.get(cuda_version_or_cpu)

    @staticmethod
    def _packages_to_install_from_cmd(install_cmd: str):
        """Split a string of command(s) into a list of separate commands"""
        # Remove any --extra-index-url flags from the install command (to be added later by default)
        install_cmd = re.sub(r"--extra-index-url\s+\S+", "", install_cmd)
        install_cmd = install_cmd.strip()

        if ", " in install_cmd:
            # Ex: 'torch>=1.13.0, <2.0.0' -- a version range is a single package
            return [install_cmd]

        # Each package, optionally followed by its own "-i/--index-url <url>" pair.
        return re.findall(r"\S+(?:\s+(?:-i|--index-url)\s+\S+)?", install_cmd)

    def to(
        self,
        system: Union[str, Dict, "Cluster"],
    ):
        """Copy the package onto filesystem or cluster, and return the new Package object.

        Packages whose target is not a local folder, or that are already on the given
        cluster, are returned as-is.

        Args:
            system (str, Dict, or Cluster): Cluster to send the package to.

        Raises:
            CodeSyncError: If the folder to sync is the home or root directory.
        """
        if not isinstance(self.install_target, InstallTarget):
            return self

        system = _get_cluster_from(system)
        if isinstance(system, Cluster) and system.on_this_cluster():
            return self

        self._validate_folder_path()

        if isinstance(system, Cluster):
            remote_path = self.install_target.path_to_sync_to_on_cluster
            system.rsync(
                source=str(self.install_target.full_local_path_str()),
                dest=str(remote_path),
                up=True,
                contents=True,
                node="all",
                parallel=True,
            )

            # The copy describes the package as it now lives on the cluster.
            new_package = copy.copy(self)
            new_package.install_target = InstallTarget(
                local_path=remote_path,
                _path_to_sync_to_on_cluster=remote_path,
            )
            return new_package

        return self

    @staticmethod
    def split_req_install_method(req_str: str):
        """Split a requirements string into a install method and the rest of the string."""
        splat = req_str.split(":", 1)
        return (splat[0], splat[1]) if len(splat) > 1 else ("", splat[0])

    @staticmethod
    def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True):
        """Build a Package from a config dict, restoring a tuple/list target to an InstallTarget."""
        # Work on a shallow copy so the caller's dict is not modified.
        config = dict(config)
        if isinstance(config.get("install_target"), (tuple, list)):
            config["install_target"] = InstallTarget(
                local_path=config["install_target"][0],
                _path_to_sync_to_on_cluster=config["install_target"][1],
            )
        return Package(**config, dryrun=dryrun)

    @staticmethod
    def from_string(specifier: str, dryrun: bool = False):
        """Build a Package from a specifier of the form ``[<method>:]<target>[ <args>]``.

        Raises:
            ValueError: If the resolved install method is not one of ``INSTALL_METHODS``.
        """
        install_method, target_and_args = Package.split_req_install_method(specifier)

        # Handles a case like "torch --index-url https://download.pytorch.org/whl/cu113"
        target, args = (
            target_and_args.split(" ", 1)
            if " " in target_and_args
            else (target_and_args, "")
        )
        # Extras are parsed from the target alone so the install args do not leak into it.
        target, extras = split_pip_extras(target)

        # If the target is a path
        # If it doesn't have a /, we're assuming it's a pip installable thing first and foremost
        if "/" in target or "~" in target or install_method == "local":
            # We need to do this because relative paths are relative to the current working directory!
            abs_target = (
                Path(target).expanduser()
                if Path(target).expanduser().is_absolute()
                else Path(locate_working_dir()) / target
            )
            if abs_target.exists():
                target = InstallTarget(
                    local_path=str(abs_target), _path_to_sync_to_on_cluster=None
                )

        # If install method is not provided, default to pip
        if not install_method:
            if Path(specifier).expanduser().resolve().exists():
                install_method = "local"
            else:
                install_method = "pip"

        # Attempt to use the same version of the package installed locally, if pip_install (pip)
        # or a sync_package (local)
        preferred_version = None
        if is_python_package_string(target):
            locally_installed_version = find_locally_installed_version(target)
            if locally_installed_version:
                # Check if this is a package that was installed from local
                local_install_path = get_local_install_path(target)
                if local_install_path and Path(local_install_path).exists():
                    install_method = install_method or "local"
                    if install_method == "local":
                        target = InstallTarget(
                            local_path=local_install_path,
                            _path_to_sync_to_on_cluster=None,
                        )
                else:
                    # We want to preferably install this version of the package server-side
                    install_method = install_method or "pip"
                    if install_method in ["pip", "uv"]:
                        preferred_version = locally_installed_version

        # If install method is still not set, default to pip
        install_method = install_method or "pip"

        # "Local" install method is a special case where we just copy a local folder and add to path
        if install_method == "local":
            return Package(
                install_target=target, install_method=install_method, dryrun=dryrun
            )
        elif install_method in ["pip", "uv", "conda"]:
            return Package(
                install_target=target,
                install_args=args,
                install_method=install_method,
                install_extras=extras,
                preferred_version=preferred_version,
                dryrun=dryrun,
            )
        else:
            raise ValueError(
                f"Unknown install method {install_method}. Must be one of {INSTALL_METHODS}"
            )
def package(
    name: str = None,
    install_method: str = None,
    install_str: str = None,
    path: str = None,
    system: str = None,
    load_from_den: bool = True,
    dryrun: bool = False,
) -> Package:
    """
    Builds an instance of :class:`Package`.

    Args:
        name (str, optional): Name to assign the package resource.
        install_method (str, optional): Method for installing the package.
            Options: [``local``, ``pip``, ``uv``, ``conda``]
        install_str (str, optional): When ``path`` is given, additional arguments to install.
            Otherwise the install target, optionally followed by a space and additional arguments.
        path (str, optional): URL of the package to install.
        system (str, optional): File system or cluster on which the package lives. Currently this must
            be a cluster or one of: [``file``, ``s3``, ``gs``].
        load_from_den (bool, optional): Whether to try loading the Package from Den. (Default: ``True``)
        dryrun (bool, optional): Whether to create the Package if it doesn't exist, or load the Package
            object as a dryrun. (Default: ``False``)

    Returns:
        Package: The resulting package.

    Example:
        >>> reloaded_package = rh.package(name="my-package")
        >>> local_package = rh.package(path="local/folder/path", install_method="local")
    """
    if name and not any([install_method, install_str, path, system]):
        # Only a name was provided: load the existing Package by name
        return Package.from_name(name, load_from_den=load_from_den, dryrun=dryrun)

    install_target = None
    install_args = None
    if path is not None:
        # Wrap the path the same way Package.from_config does, so Package methods that
        # check for an InstallTarget (to, _install, _pip_install_cmd) recognize it.
        install_target = InstallTarget(local_path=path, _path_to_sync_to_on_cluster=None)
        install_args = install_str
    elif install_str is not None:
        # partition() tolerates an install string with no arguments (no space).
        install_target, separator, remainder = install_str.partition(" ")
        install_args = remainder if separator else None

    return Package(
        install_method=install_method,
        install_target=install_target,
        install_args=install_args,
        name=name,
        dryrun=dryrun,
    )