import copy
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Union
from runhouse.globals import obj_store
from runhouse.logger import get_logger
from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import (
_get_cluster_from,
detect_cuda_version_or_cpu,
)
from runhouse.resources.resource import Resource
from runhouse.utils import (
conda_env_cmd,
find_locally_installed_version,
get_local_install_path,
install_conda,
is_python_package_string,
locate_working_dir,
run_setup_command,
)
# Install method names recognised by this module; quoted in the "Unknown install method" errors below.
INSTALL_METHODS = {"local", "reqs", "pip", "conda", "rh"}
# Module-level logger obtained from runhouse's ``get_logger`` for this module's name.
logger = get_logger(__name__)
class CodeSyncError(Exception):
    """Raised when a local folder must not be synced (see ``Package._validate_folder_path``)."""
@dataclass
class InstallTarget:
    """A local folder to install from, plus an optional destination path on the cluster."""

    # Path of the folder as given by the caller (may be relative or start with "~").
    local_path: str
    # Explicit destination on the cluster; when unset, a default is derived from the folder name.
    _path_to_sync_to_on_cluster: Optional[str] = None

    @property
    def path_to_sync_to_on_cluster(self) -> str:
        """Return the explicit destination if one was set, otherwise ``~/<local folder name>``."""
        explicit_destination = self._path_to_sync_to_on_cluster
        if explicit_destination:
            return explicit_destination
        folder_name = Path(self.full_local_path_str()).name
        return f"~/{folder_name}"

    def full_local_path_str(self) -> str:
        """Return ``local_path`` with ``~`` expanded and then resolved, as a string."""
        expanded = Path(self.local_path).expanduser()
        return str(expanded.resolve())

    def __str__(self):
        local = self.local_path
        remote = self._path_to_sync_to_on_cluster
        return f"InstallTarget(local_path={local}, path_to_sync_to_on_cluster={remote})"
[docs]class Package(Resource):
RESOURCE_TYPE = "package"
# https://pytorch.org/get-started/locally/
# Note: no binaries exist for 11.4 (https://github.com/pytorch/pytorch/issues/75992)
# Maps a CUDA version string (or "cpu") to the PyTorch wheel index URL for it.
# Looked up by ``_torch_index_url``; versions not listed here yield ``None`` there.
TORCH_INDEX_URLS = {
"11.3": "https://download.pytorch.org/whl/cu113",
"11.5": "https://download.pytorch.org/whl/cu115",
"11.6": "https://download.pytorch.org/whl/cu116",
"11.7": "https://download.pytorch.org/whl/cu117",
"11.8": "https://download.pytorch.org/whl/cu118",
"12.1": "https://download.pytorch.org/whl/cu121",
"cpu": "https://download.pytorch.org/whl/cpu",
}
def __init__(
    self,
    name: Optional[str] = None,
    install_method: Optional[str] = None,
    install_target: Optional[Union[str, "Folder"]] = None,
    install_args: Optional[str] = None,
    preferred_version: Optional[str] = None,
    dryrun: bool = False,
    **kwargs,  # Accepted and ignored so that from_config can pass extra config keys
):
    """
    Runhouse Package resource.

    .. note::
        To create a git package, please use the factory method :func:`package`.

    Args:
        name (Optional[str]): Name passed on to the base resource.
        install_method (Optional[str]): How the package is installed (e.g. ``pip``, ``conda``,
            ``reqs``, ``local``).
        install_target: What to install; stored as given.
        install_args (Optional[str]): Extra arguments appended to the install command.
        preferred_version (Optional[str]): Version to pin for a pip install when one is chosen.
        dryrun (bool): Passed on to the base resource.
    """
    super().__init__(name=name, dryrun=dryrun)

    # Plain attribute storage; no validation happens at construction time.
    self.install_method = install_method
    self.install_target = install_target
    self.install_args = install_args
    self.preferred_version = preferred_version
def config(self, condensed: bool = True):
    """Return the base resource config extended with this package's install settings.

    An ``InstallTarget`` is written as a ``(local_path, _path_to_sync_to_on_cluster)``
    tuple; any other install target is written unchanged.
    """
    package_config = super().config(condensed)

    target = self.install_target
    if isinstance(target, InstallTarget):
        target = (target.local_path, target._path_to_sync_to_on_cluster)

    package_config.update(
        {
            "install_method": self.install_method,
            "install_target": target,
            "install_args": self.install_args,
            "preferred_version": self.preferred_version,
        }
    )
    return package_config
def __str__(self):
if self.name:
return f"Package: {self.name}"
return f"Package: {self.install_target}"
@staticmethod
def _prepend_python_executable(
install_cmd: str,
conda_env_name: Optional[str] = None,
cluster: "Cluster" = None,
):
return (
f"python3 -m {install_cmd}"
if cluster or conda_env_name
else f"{sys.executable} -m {install_cmd}"
)
@staticmethod
def _prepend_env_command(install_cmd: str, conda_env_name: Optional[str] = None):
if conda_env_name:
install_cmd = conda_env_cmd(cmd=install_cmd, conda_env_name=conda_env_name)
return install_cmd
def _validate_folder_path(self):
    """Refuse to sync an ``InstallTarget`` whose resolved path is the home or root directory.

    Targets that are not an ``InstallTarget`` are not checked.

    Raises:
        CodeSyncError: If the target resolves to ``~`` (expanded) or ``/``.
    """
    target = self.install_target
    if not isinstance(target, InstallTarget):
        return

    # Compare the expanded/resolved form so relative and "~" spellings are caught as well.
    resolved_target = target.full_local_path_str()
    forbidden_paths = [str(Path("~").expanduser()), str(Path("/"))]
    if resolved_target in forbidden_paths:
        raise CodeSyncError(
            "Cannot sync the home directory. Please include a Python configuration file in a subdirectory."
        )
def _pip_install_cmd(
    self,
    conda_env_name: Optional[str] = None,
    cluster: "Cluster" = None,
):
    """Build the full ``pip install`` command line for this package.

    An ``InstallTarget`` is installed from its resolved local path; any other target is
    wrapped in double quotes. The result is passed through the torch index-url handling,
    then prefixed with the Python executable and, if given, the conda env wrapper.
    """
    extra_args = f" {self.install_args}" if self.install_args else ""

    target = self.install_target
    if isinstance(target, InstallTarget):
        pip_target = target.full_local_path_str()
    else:
        pip_target = f'"{target}"'

    pip_args = self._install_cmd_for_torch(pip_target + extra_args, cluster)
    with_python = self._prepend_python_executable(
        f"pip install {pip_args}", cluster=cluster, conda_env_name=conda_env_name
    )
    return self._prepend_env_command(with_python, conda_env_name=conda_env_name)
def _conda_install_cmd(
    self, conda_env_name: Optional[str] = None, cluster: "Cluster" = None
):
    """Build the ``conda install -y`` command line and make sure conda is available.

    ``install_conda`` is called for ``cluster`` before the command is returned.
    """
    extra_args = f" {self.install_args}" if self.install_args else ""

    target = self.install_target
    conda_target = (
        f"{target.local_path}" if isinstance(target, InstallTarget) else target
    )
    conda_cmd = self._prepend_env_command(
        f"conda install -y {conda_target + extra_args}",
        conda_env_name=conda_env_name,
    )

    install_conda(cluster=cluster)
    return conda_cmd
def _reqs_install_cmd(
    self, conda_env_name: Optional[str] = None, cluster: "Cluster" = None
):
    """Build the ``pip install -r ...`` command line for a requirements install.

    For an ``InstallTarget`` the ``requirements.txt`` inside the target folder is read,
    from the local filesystem when no cluster is given or from the cluster otherwise.

    Returns:
        The full install command, or ``None`` when the target is an ``InstallTarget``
        whose folder has no ``requirements.txt``.
    """
    extra_args = f" {self.install_args}" if self.install_args else ""
    target = self.install_target

    if isinstance(target, InstallTarget):
        folder = target.local_path
        reqs_path = f"{folder}/requirements.txt"

        if cluster:
            # Check for and read the requirements file on the cluster
            remote_listing = cluster._folder_ls(path=folder, full_paths=False)
            if "requirements.txt" not in remote_listing:
                return None
            remote_contents = cluster._folder_get(reqs_path, mode="r")
            reqs_list = remote_contents.strip("\n").split("\n")
        else:
            # No cluster given: the path is handled on the local filesystem
            local_reqs_file = Path(reqs_path).expanduser()
            if not local_reqs_file.exists():
                return None
            with open(str(local_reqs_file)) as f:
                reqs_list = f.readlines()

        pip_args = self._reqs_install_cmd_for_torch(
            reqs_path, reqs_list, extra_args, cluster=cluster
        )
    else:
        pip_args = target + extra_args

    with_python = self._prepend_python_executable(
        f"pip install {pip_args}", conda_env_name=conda_env_name, cluster=cluster
    )
    return self._prepend_env_command(with_python, conda_env_name=conda_env_name)
def _install(
    self,
    cluster: "Cluster" = None,
    node: Optional[str] = None,
    conda_env_name: Optional[str] = None,
    force_sync_local: bool = False,
):
    """Install package.

    Args:
        cluster (Optional[Cluster]): If provided, will install package on cluster using SSH. Otherwise, the
            assumption is that we are installing locally. (Default: ``None``)
        node (Optional[str]): Node on the cluster to install the package on, if using SSH. If ``cluster`` is
            provided without a ``node``, package will be installed on the head node. (Default: ``None``)
        conda_env_name (Optional[str]): Name of the conda environment to install the package on, if using SSH and
            installing in a specific conda env that is not activated by default.
        force_sync_local (bool, optional): If the package exists both locally and remotely, whether to override
            the remote version with the local version. By default, the local version will be installed only if
            the package does not already exist on the cluster. (Default: ``False``)

    Raises:
        RuntimeError: If the pip, conda or reqs install command returns a non-zero exit code.
        ValueError: If the install method is unknown, or if a ``local``/``reqs`` install target
            is neither an ``InstallTarget`` nor (when no cluster is given) an existing local path.
    """
    logger.info(f"Installing {str(self)} with method {self.install_method}.")

    if self.install_method == "pip":
        # If this is a generic pip package, with no version pinned, we want to check if there is a version
        # already installed. If there is, and ``force_sync_local`` is not set to ``True``, then we ignore
        # preferred version and leave the existing version. The user can also force a version install by
        # doing `numpy==2.0.0`. Else, we install the preferred version, that matches their local.
        if (
            is_python_package_string(self.install_target)
            and self.preferred_version is not None
        ):
            # Check if this is installed
            retcode = run_setup_command(
                f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"",
                cluster=cluster,
                node=node,
            )[0]
            if retcode != 0 or force_sync_local:
                self.install_target = (
                    f"{self.install_target}=={self.preferred_version}"
                )

        install_cmd = self._pip_install_cmd(
            conda_env_name=conda_env_name, cluster=cluster
        )
        logger.info(f"Running via install_method pip: {install_cmd}")
        retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
        if retcode != 0:
            raise RuntimeError(
                f"Pip install {install_cmd} failed, check that the package exists and is available for your platform."
            )

    elif self.install_method == "conda":
        install_cmd = self._conda_install_cmd(
            conda_env_name=conda_env_name, cluster=cluster
        )
        logger.info(f"Running via install_method conda: {install_cmd}")
        retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
        if retcode != 0:
            raise RuntimeError(
                f"Conda install {install_cmd} failed, check that the package exists and is "
                "available for your platform."
            )

    elif self.install_method == "reqs":
        install_cmd = self._reqs_install_cmd(
            conda_env_name=conda_env_name, cluster=cluster
        )
        if install_cmd:
            logger.info(f"Running via install_method reqs: {install_cmd}")
            retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
            if retcode != 0:
                raise RuntimeError(
                    f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform."
                )
        else:
            # _reqs_install_cmd only returns None for an InstallTarget without a requirements.txt
            logger.info(
                f"{self.install_target.full_local_path_str()}/requirements.txt not found, skipping reqs install"
            )

    elif self.install_method != "local":
        raise ValueError(
            f"Unknown install method {self.install_method}. Must be one of {INSTALL_METHODS}"
        )

    # Need to append to path
    if self.install_method in ["local", "reqs"]:
        if isinstance(self.install_target, InstallTarget):
            target_path = self.install_target.full_local_path_str()
            if not cluster:
                obj_store.add_sys_path_to_all_processes(target_path)
            else:
                # PATH entries are separated by ":". A ";" would end the export statement
                # and make the shell try to execute the folder path as a command.
                run_setup_command(
                    f"export PATH=$PATH:{target_path}",
                    cluster=cluster,
                    node=node,
                )
        elif not cluster:
            # Expand "~" before resolving: resolving first turns "~/x" into "<cwd>/~/x",
            # after which expanduser() no longer has a leading "~" to expand.
            local_dir = Path(self.install_target).expanduser().resolve()
            if local_dir.exists():
                obj_store.add_sys_path_to_all_processes(str(local_dir))
            else:
                raise ValueError(
                    f"install_target {self.install_target} must be a Folder or a path to a directory for install_method {self.install_method}"
                )
        else:
            raise ValueError(
                f"If cluster is provided, install_target must be a Folder for install_method {self.install_method}"
            )
# ----------------------------------
# Torch Install Helpers
# ----------------------------------
def _reqs_install_cmd_for_torch(
self, reqs_path, reqs_list, install_args="", cluster=None
):
"""Read requirements from file, append --index-url and --extra-index-url where relevant for torch packages,
and return list of formatted packages."""
# if torch extra index url is already defined by the user or torch isn't a req, directly pip install reqs file
if not any("torch" in req for req in reqs_list):
return f"-r {reqs_path}" + install_args
for req in reqs_list:
if (
"--index-url" in req or "--extra-index-url" in req
) and "pytorch.org" in req:
return f"-r {reqs_path}" + install_args
# add extra-index-url for torch if not found
cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)
return f"-r {reqs_path} --extra-index-url {self._torch_index_url(cuda_version_or_cpu)}"
def _install_cmd_for_torch(self, install_cmd, cluster=None):
"""Return the correct formatted pip install command for the torch package(s) provided."""
if install_cmd.startswith("#"):
return None
torch_source_packages = ["torch", "torchvision", "torchaudio"]
if not any([x in install_cmd for x in torch_source_packages]):
return install_cmd
if "+" in install_cmd or "--extra-index-url" in install_cmd:
return install_cmd
packages_to_install: list = self._packages_to_install_from_cmd(install_cmd)
final_install_cmd = ""
cuda_version_or_cpu = detect_cuda_version_or_cpu(cluster=cluster)
for package_install_cmd in packages_to_install:
formatted_cmd = self._install_url_for_torch_package(
package_install_cmd, cuda_version_or_cpu
)
if formatted_cmd:
final_install_cmd += formatted_cmd + " "
final_install_cmd = final_install_cmd.rstrip()
return final_install_cmd if final_install_cmd != "" else None
def _install_url_for_torch_package(self, install_cmd, cuda_version_or_cpu):
"""Build the full install command, adding a --index-url and --extra-index-url where applicable."""
# Grab the relevant index url for torch based on the CUDA version provided
if "," in install_cmd:
# If installing a range of versions format the string to make it compatible with `pip_install` method
install_cmd = install_cmd.replace(" ", "")
index_url = self._torch_index_url(cuda_version_or_cpu)
if index_url and not any(
specifier in install_cmd for specifier in ["--index-url ", "-i "]
):
install_cmd = f"{install_cmd} --index-url {index_url}"
if "--extra-index-url" not in install_cmd:
return f"{install_cmd} --extra-index-url https://pypi.python.org/simple/"
return install_cmd
def _torch_index_url(self, cuda_version_or_cpu: str):
return self.TORCH_INDEX_URLS.get(cuda_version_or_cpu)
@staticmethod
def _packages_to_install_from_cmd(install_cmd: str):
"""Split a string of command(s) into a list of separate commands"""
# Remove any --extra-index-url flags from the install command (to be added later by default)
install_cmd = re.sub(r"--extra-index-url\s+\S+", "", install_cmd)
install_cmd = install_cmd.strip()
if ", " in install_cmd:
# Ex: 'torch>=1.13.0,<2.0.0'
return [install_cmd]
matches = re.findall(r"(\S+(?:\s+(-i|--index-url)\s+\S+)?)", install_cmd)
packages_to_install = [match[0] for match in matches]
return packages_to_install
def to(
    self,
    system: Union[str, Dict, "Cluster"],
    path: Optional[str] = None,
):
    """Copy the package onto a cluster, and return the new Package object.

    The package itself is returned unchanged when its install target is not an
    ``InstallTarget``, when ``system`` is the cluster this code is running on, or when
    ``system`` does not resolve to a ``Cluster``.

    Args:
        system (str, Dict, or Cluster): Cluster to send the package to.
        path (str, optional): Not used by this method.
    """
    target = self.install_target
    if not isinstance(target, InstallTarget):
        return self

    system = _get_cluster_from(system)
    is_cluster = isinstance(system, Cluster)
    if is_cluster and system.on_this_cluster():
        return self

    self._validate_folder_path()
    if not is_cluster:
        return self

    system.rsync(
        source=str(target.full_local_path_str()),
        dest=str(target.path_to_sync_to_on_cluster),
        up=True,
        contents=True,
        node="all",
    )

    # The returned copy points at the synced location for both of its paths
    synced_path = target.path_to_sync_to_on_cluster
    synced_package = copy.copy(self)
    synced_package.install_target = InstallTarget(
        local_path=synced_path,
        _path_to_sync_to_on_cluster=synced_path,
    )
    return synced_package
[docs] @staticmethod
def split_req_install_method(req_str: str):
"""Split a requirements string into a install method and the rest of the string."""
splat = req_str.split(":", 1)
return (splat[0], splat[1]) if len(splat) > 1 else ("", splat[0])
@staticmethod
def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True):
    """Build a Package (or GitPackage) from a config dict.

    An ``install_target`` given as a tuple or list is converted to an ``InstallTarget``
    (first item is the local path, optional second item is the path on the cluster).
    A config whose ``resource_subtype`` is ``"GitPackage"`` is delegated to
    ``GitPackage.from_config``.

    Args:
        config (Dict): Package config. It is not modified by this call.
        dryrun (bool): Passed on to the constructed package. (Default: ``False``)
        _resolve_children (bool): Passed on to ``GitPackage.from_config``. (Default: ``True``)
    """
    # Work on a shallow copy so the caller's dict keeps its serialisable install_target
    config = dict(config)

    raw_target = config.get("install_target")
    if isinstance(raw_target, (tuple, list)):
        config["install_target"] = InstallTarget(
            local_path=raw_target[0],
            # Tolerate a one-element sequence instead of raising IndexError
            _path_to_sync_to_on_cluster=raw_target[1] if len(raw_target) > 1 else None,
        )

    if config.get("resource_subtype") == "GitPackage":
        from runhouse import GitPackage

        return GitPackage.from_config(
            config, dryrun=dryrun, _resolve_children=_resolve_children
        )
    return Package(**config, dryrun=dryrun)
@staticmethod
def from_string(specifier: str, dryrun: bool = False):
    """Build a Package from a specifier such as ``"pip:numpy"``, ``"reqs:./"`` or a GitHub URL.

    The specifier has the form ``[<method>:]<target>[ <args>]``. When no method is given,
    ``reqs`` is chosen if the specifier is an existing path and ``pip`` otherwise.

    Raises:
        ValueError: If the install method is not one of ``INSTALL_METHODS``.
    """
    if specifier == "requirements.txt":
        specifier = "reqs:./"

    # Matches '<method>:https://github.com/<path>' or 'https://github.com/<path>'
    github_match = re.search(
        r"^(?:(?P<method>[^:]+):)?(?P<path>https://github.com/.+)", specifier
    )
    if github_match:
        from runhouse.resources.packages.git_package import git_package

        return git_package(
            git_url=github_match.group("path"),
            install_method=github_match.group("method"),
            dryrun=dryrun,
        )

    install_method, target_and_args = Package.split_req_install_method(specifier)

    # Everything after the first space is install args,
    # e.g. "torch --index-url https://download.pytorch.org/whl/cu113"
    if " " in target_and_args:
        rel_target, args = target_and_args.split(" ", 1)
    else:
        rel_target, args = target_and_args, ""

    # Relative paths are anchored at the located working directory, not the process cwd
    expanded_target = Path(rel_target).expanduser()
    if expanded_target.is_absolute():
        abs_target = expanded_target
    else:
        abs_target = Path(locate_working_dir()) / rel_target

    # An existing path becomes an InstallTarget; anything else stays a plain string
    if abs_target.exists():
        target = InstallTarget(
            local_path=str(abs_target), _path_to_sync_to_on_cluster=None
        )
    else:
        target = rel_target

    # If install method is not provided, we need to infer it
    if not install_method:
        specifier_is_path = Path(specifier).expanduser().resolve().exists()
        install_method = "reqs" if specifier_is_path else "pip"

    # For a plain pip package name, prefer what is installed locally: a local source
    # install is used as the target, otherwise the locally installed version is recorded
    # as the preferred version.
    preferred_version = None
    if install_method == "pip" and is_python_package_string(target):
        locally_installed_version = find_locally_installed_version(target)
        if locally_installed_version:
            local_install_path = get_local_install_path(target)
            if local_install_path and Path(local_install_path).exists():
                target = InstallTarget(
                    local_path=local_install_path, _path_to_sync_to_on_cluster=None
                )
            else:
                preferred_version = locally_installed_version

    # "local" is a special case: no install args or preferred version are passed on
    if install_method == "local":
        return Package(
            install_target=target, install_method=install_method, dryrun=dryrun
        )
    if install_method in ["reqs", "pip", "conda"]:
        return Package(
            install_target=target,
            install_args=args,
            install_method=install_method,
            preferred_version=preferred_version,
            dryrun=dryrun,
        )
    if install_method == "rh":
        # Delegates to the module-level ``package`` factory with the "rh:" prefix removed
        return package(name=specifier[len("rh:") :], dryrun=dryrun)

    raise ValueError(
        f"Unknown install method {install_method}. Must be one of {INSTALL_METHODS}"
    )
def package(
    name: str = None,
    install_method: str = None,
    install_str: str = None,
    path: str = None,
    system: str = None,
    load_from_den: bool = True,
    dryrun: bool = False,
) -> Package:
    """
    Builds an instance of :class:`Package`.

    Args:
        name (str, optional): Name to assign the package resource.
        install_method (str, optional): Method for installing the package.
            Options: [``pip``, ``conda``, ``reqs``, ``local``]
        install_str (str, optional): When ``path`` is given, additional arguments to install.
            Otherwise the install target, optionally followed by a space and additional arguments.
        path (str, optional): Path of the package to install.
        system (str, optional): File system or cluster on which the package lives.
            Currently this must be a cluster or one of: [``file``, ``s3``, ``gs``].
            Here it only determines whether the package is loaded by name.
        load_from_den (bool, optional): Whether to try loading the Package from Den. (Default: ``True``)
        dryrun (bool, optional): Whether to create the Package if it doesn't exist, or load the Package
            object as a dryrun. (Default: ``False``)

    Returns:
        Package: The resulting package.

    Example:
        >>> reloaded_package = rh.package(name="my-package")
        >>> local_package = rh.package(path="local/folder/path", install_method="local")
    """
    if name and not any([install_method, install_str, path, system]):
        # Only the name was provided: load the existing Package by name
        return Package.from_name(name, load_from_den=load_from_den, dryrun=dryrun)

    install_target = None
    install_args = None
    if path is not None:
        # The rest of the Package code only treats InstallTarget instances as folders.
        # ``Package.config`` still writes this out as the ``(path, None)`` tuple.
        install_target = InstallTarget(local_path=path, _path_to_sync_to_on_cluster=None)
        install_args = install_str
    elif install_str is not None:
        # partition() also handles a bare target such as "numpy", where unpacking
        # ``split(" ", 1)`` would raise ValueError.
        install_target, separator, remainder = install_str.partition(" ")
        install_args = remainder if separator else None

    return Package(
        install_method=install_method,
        install_target=install_target,
        install_args=install_args,
        name=name,
        dryrun=dryrun,
    )