
Source code for runhouse.resources.envs.env

import copy
import os
import shlex
from pathlib import Path
from typing import Dict, List, Optional, Union

from runhouse.globals import obj_store
from runhouse.logger import get_logger

from runhouse.resources.envs.utils import _process_env_vars, run_setup_command
from runhouse.resources.hardware import _get_cluster_from, Cluster
from runhouse.resources.packages import InstallTarget, Package
from runhouse.resources.resource import Resource
from runhouse.utils import run_with_logs

logger = get_logger(__name__)


class Env(Resource):
    RESOURCE_TYPE = "env"

    def __init__(
        self,
        name: Optional[str] = None,
        reqs: List[Union[str, Package]] = [],
        setup_cmds: List[str] = None,
        env_vars: Union[Dict, str] = {},
        working_dir: Optional[Union[str, Path]] = None,
        secrets: Optional[Union[str, "Secret"]] = [],
        compute: Optional[Dict] = {},
        dryrun: bool = True,
        **kwargs,  # We have this here to ignore extra arguments when calling from_config
    ):
        """
        Runhouse Env object.

        .. note::
            To create an Env, please use the factory method :func:`env`.
        """
        super().__init__(name=name, dryrun=dryrun)
        self._reqs = reqs
        self.setup_cmds = setup_cmds
        self.env_vars = env_vars
        self.working_dir = working_dir
        self.secrets = secrets
        self.compute = compute

    @property
    def env_name(self):
        return self.name or "base"

    @staticmethod
    def from_config(config: dict, dryrun: bool = False, _resolve_children: bool = True):
        config["reqs"] = [
            Package.from_config(req, dryrun=True, _resolve_children=_resolve_children)
            if isinstance(req, dict)
            else req
            for req in config.get("reqs", [])
        ]
        config["working_dir"] = (
            Package.from_config(
                config["working_dir"], dryrun=True, _resolve_children=_resolve_children
            )
            if isinstance(config.get("working_dir"), dict)
            else config.get("working_dir")
        )

        resource_subtype = config.get("resource_subtype")
        if resource_subtype == "CondaEnv":
            from runhouse import CondaEnv

            return CondaEnv(**config, dryrun=dryrun)

        return Env(**config, dryrun=dryrun)

    @staticmethod
    def _set_env_vars(env_vars):
        for k, v in env_vars.items():
            os.environ[k] = v

    def add_env_var(self, key: str, value: str):
        """Add an env var to the environment. Environment must be re-installed to propagate new
        environment variables if it already lives on a cluster."""
        self.env_vars.update({key: value})

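    # Hypothetical usage sketch (not part of the module source): per the docstring above,
    # env vars added after the env already lives on a cluster only take effect remotely
    # once the env is re-sent/re-installed. Assumes `env` and `cluster` objects already exist.
    #
    #   env.add_env_var("OMP_NUM_THREADS", "4")
    #   env.to(cluster, force_install=True)  # re-sync so the new var is set remotely
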
    def config(self, condensed=True):
        config = super().config(condensed)
        self.save_attrs_to_config(
            config, ["setup_cmds", "env_vars", "env_name", "compute"]
        )
        config.update(
            {
                "reqs": [
                    self._resource_string_for_subconfig(package, condensed)
                    for package in self._reqs
                ],
                "working_dir": self._resource_string_for_subconfig(
                    self.working_dir, condensed
                ),
            }
        )
        return config

    @property
    def reqs(self):
        return (self._reqs or []) + ([self.working_dir] if self.working_dir else [])

    @reqs.setter
    def reqs(self, reqs):
        self._reqs = reqs

    def _reqs_to(self, system: Union[str, Cluster], path=None):
        """Send self.reqs to the system (cluster or file system)"""
        new_reqs = []
        for req in self.reqs:
            if isinstance(req, str):
                new_req = Package.from_string(req)
                req = new_req

            if isinstance(req, Package) and isinstance(
                req.install_target, InstallTarget
            ):
                req = (
                    req.to(system, path=path)
                    if isinstance(system, Cluster)
                    else req.to(system, path=path)
                )
            new_reqs.append(req)
        if self.working_dir:
            return new_reqs[:-1], new_reqs[-1]
        return new_reqs, None

    def _secrets_to(self, system: Union[str, Cluster]):
        from runhouse.resources.secrets import Secret

        new_secrets = []
        for secret in self.secrets:
            if isinstance(secret, str):
                secret = Secret.from_name(secret)
            new_secrets.append(secret.to(system=system, env=self))
        return new_secrets

    def _install_reqs(
        self, cluster: Cluster = None, reqs: List = None, node: str = "all"
    ):
        reqs = reqs or self.reqs
        if reqs:
            for package in reqs:
                if isinstance(package, str):
                    pkg = Package.from_string(package)
                    if pkg.install_method in ["reqs", "local"] and cluster:
                        pkg = pkg.to(cluster)
                elif hasattr(package, "_install"):
                    pkg = package
                else:
                    raise ValueError(f"package {package} not recognized")

                logger.debug(f"Installing package: {str(pkg)}")
                pkg._install(env=self, cluster=cluster, node=node)

    def _run_setup_cmds(
        self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all"
    ):
        setup_cmds = setup_cmds or self.setup_cmds

        if not setup_cmds:
            return

        for cmd in setup_cmds:
            cmd = self._full_command(cmd)
            run_setup_command(
                cmd,
                cluster=cluster,
                env_vars=_process_env_vars(self.env_vars),
                node=node,
            )

    def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"):
        """Locally install packages and run setup commands.

        Args:
            force (bool, optional): Whether to run the installation again if the env already
                exists on the cluster. (Default: ``False``)
            cluster (Cluster, optional): Cluster to install the env on. If not provided, the env is
                installed on the current cluster. (Default: ``None``)
            node (str, optional): Node to install the env on. (Default: ``"all"``)
        """
        # If we're doing the install remotely via SSH (e.g. for default_env), there is no cache
        if not cluster:
            # Hash the config_for_rns to check if we need to install
            env_config = self.config()
            # Remove the name because auto-generated names will be different, but the installed components are the same
            env_config.pop("name")
            install_hash = hash(str(env_config))
            # Check the existing hash
            if install_hash in obj_store.installed_envs and not force:
                logger.debug("Env already installed, skipping")
                return
            obj_store.installed_envs[install_hash] = self.name

        self._install_reqs(cluster=cluster, node=node)
        self._run_setup_cmds(cluster=cluster, node=node)

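    # Illustrative sketch (not part of the module source): `install` caches a hash of the
    # env config (minus the name) in `obj_store.installed_envs`, so repeated local calls
    # are skipped unless `force=True`.
    #
    #   env.install()            # installs reqs and runs setup_cmds
    #   env.install()            # no-op: hash already cached
    #   env.install(force=True)  # re-runs the installation
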
    def _full_command(self, command: str):
        if self._run_cmd:
            return f"{self._run_cmd} ${{SHELL:-/bin/bash}} -c {shlex.quote(command)}"
        return command

    def _run_command(self, command: str, **kwargs):
        """Run command locally inside the environment"""
        command = self._full_command(command)
        logger.info(f"Running command in {self.name}: {command}")
        return run_with_logs(command, **kwargs)

    def to(
        self,
        system: Union[str, Cluster],
        node_idx: Optional[int] = None,
        path: str = None,
        force_install: bool = False,
    ):
        """
        Send environment to the system, and set it up if on a cluster.

        Args:
            system (str or Cluster): Cluster or file system to send the env to.
            node_idx (int, optional): Node index of the cluster to send the env to. If not specified,
                uses the head node. (Default: ``None``)
            path (str, optional): Path on the cluster to sync the env's working dir to. Uses a default
                path if not specified. (Default: ``None``)
            force_install (bool, optional): Whether to run the installation again if the env already
                exists on the cluster. (Default: ``False``)

        Example:
            >>> env = rh.env(reqs=["numpy", "pip"])
            >>> cluster_env = env.to(my_cluster)
            >>> s3_env = env.to("s3", path="s3_bucket/my_env")
        """
        system = _get_cluster_from(system)
        if (
            isinstance(system, Cluster)
            and node_idx is not None
            and node_idx >= len(system.ips)
        ):
            raise ValueError(
                f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds."
            )

        new_env = copy.deepcopy(self)
        new_env.reqs, new_env.working_dir = self._reqs_to(system, path)

        if isinstance(system, Cluster):
            if node_idx is not None:
                new_env.compute = new_env.compute or {}
                new_env.compute["node_idx"] = node_idx

            key = (
                system.put_resource(new_env)
                if new_env.name
                else system.default_env.name
            )

            env_vars = _process_env_vars(self.env_vars)
            if env_vars:
                system.call(key, "_set_env_vars", env_vars)

            if new_env.name:
                system.call(key, "install", force=force_install)
            else:
                system.call(key, "_install_reqs", reqs=new_env.reqs)
                system.call(key, "_run_setup_cmds", setup_cmds=new_env.setup_cmds)

            # Secrets are resources that go in the env, so put them in after the env is created
            new_env.secrets = self._secrets_to(system)

        return new_env

    @property
    def _activate_cmd(self):
        return ""

    @property
    def _run_cmd(self):
        return ""
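
# Usage sketch (not part of the module source), following the docstring example in `Env.to`.
# Assumes a Runhouse cluster object `my_cluster` is already up; the `rh.env` factory is the
# intended way to construct an Env, per the class docstring.
#
#   import runhouse as rh
#
#   env = rh.env(reqs=["numpy", "pip"], name="my_env")
#   cluster_env = env.to(my_cluster)                # sync reqs, set env vars, run setup cmds
#   s3_env = env.to("s3", path="s3_bucket/my_env")  # or sync the working dir to a file system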