You are viewing the documentation for version v0.0.12. Click here to see the docs for the latest stable version.

Source code for runhouse.resources.envs.env

import copy
import logging
import os
import shlex
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Union

from runhouse.globals import obj_store
from runhouse.resources.folders import Folder
from runhouse.resources.hardware import _get_cluster_from, Cluster
from runhouse.resources.packages import Package
from runhouse.resources.resource import Resource

from .utils import _env_vars_from_file


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class Env(Resource):
    """A set of package requirements, setup commands and environment variables.

    An ``Env`` can be installed locally (:meth:`install`) or sent to a cluster
    or file system (:meth:`to`).
    """

    RESOURCE_TYPE = "env"
    DEFAULT_NAME = "base_env"

    def __init__(
        self,
        name: Optional[str] = None,
        reqs: Optional[List[Union[str, Package]]] = None,
        setup_cmds: Optional[List[str]] = None,
        env_vars: Optional[Union[Dict, str]] = None,
        working_dir: Optional[Union[str, Path]] = None,
        dryrun: bool = True,
        **kwargs,  # We have this here to ignore extra arguments when calling from_config
    ):
        """
        Runhouse Env object.

        .. note::
            To create an Env, please use the factory method :func:`env`.

        Args:
            name: Name of the env. ``env_name`` falls back to ``"base"`` when unset.
            reqs: Package requirements, as strings or ``Package`` objects.
            setup_cmds: Commands run by :meth:`install` after the packages
                are installed.
            env_vars: Environment variables, either a dict or a string that
                :meth:`to` passes to ``_env_vars_from_file``.
            working_dir: Working directory; appended to ``reqs`` as the last entry.
            dryrun: Forwarded to the ``Resource`` constructor.
            **kwargs: Ignored.
        """
        super().__init__(name=name, dryrun=dryrun)
        # Build fresh containers per instance: a literal ``[]`` / ``{}`` default
        # would be one object shared by every Env created without the argument.
        self._reqs = reqs if reqs is not None else []
        self.setup_cmds = setup_cmds
        self.env_vars = env_vars if env_vars is not None else {}
        self.working_dir = working_dir

    @property
    def env_name(self):
        """Name of the environment, defaulting to ``"base"`` when unnamed."""
        return self.name or "base"

    @staticmethod
    def from_config(config: dict, dryrun: bool = False):
        """Create an Env object from a config dict.

        Dict-valued entries under ``"reqs"`` and ``"working_dir"`` are turned
        into ``Package`` objects; other values are passed through as-is. A
        ``"resource_subtype"`` of ``"CondaEnv"`` yields a ``CondaEnv``.

        Args:
            config: Env config. It is not modified.
            dryrun: Passed to the constructed env.
        """
        # Shallow-copy so the caller's dict is not rewritten in place.
        config = dict(config)
        config["reqs"] = [
            Package.from_config(req, dryrun=True) if isinstance(req, dict) else req
            for req in config.get("reqs", [])
        ]
        # ``.get`` so a config without a working dir does not raise KeyError.
        working_dir = config.get("working_dir")
        config["working_dir"] = (
            Package.from_config(working_dir, dryrun=True)
            if isinstance(working_dir, dict)
            else working_dir
        )

        resource_subtype = config.get("resource_subtype")
        if resource_subtype == "CondaEnv":
            # Imported lazily, presumably to avoid a circular import -- TODO confirm.
            from runhouse import CondaEnv

            return CondaEnv(**config, dryrun=dryrun)

        return Env(**config, dryrun=dryrun)

    @staticmethod
    def _set_env_vars(env_vars):
        """Copy every key/value pair of ``env_vars`` into ``os.environ``."""
        for k, v in env_vars.items():
            os.environ[k] = v

    @property
    def config_for_rns(self):
        """Parent ``config_for_rns`` extended with the env-specific fields."""
        config = super().config_for_rns
        config.update(
            {
                "reqs": [
                    self._resource_string_for_subconfig(package)
                    # Tolerate ``_reqs`` being None, as the ``reqs`` property does.
                    for package in (self._reqs or [])
                ],
                "setup_cmds": self.setup_cmds,
                "env_vars": self.env_vars,
                "working_dir": self._resource_string_for_subconfig(self.working_dir),
                "env_name": self.env_name,
            }
        )
        return config

    @property
    def reqs(self):
        """Requirements list, with ``working_dir`` appended last when it is set."""
        return (self._reqs or []) + ([self.working_dir] if self.working_dir else [])

    @reqs.setter
    def reqs(self, reqs):
        self._reqs = reqs

    def _reqs_to(self, system: Union[str, Cluster], path=None, mount=False):
        """Send self.reqs to the system (cluster or file system).

        Returns:
            A ``(reqs, working_dir)`` tuple. ``working_dir`` is the last
            converted entry when ``self.working_dir`` is set, otherwise ``None``.
        """
        new_reqs = []
        for req in self.reqs:
            if isinstance(req, str):
                new_req = Package.from_string(req)
                # Only swap in the parsed Package when it targets a Folder;
                # other string requirements are kept as plain strings.
                if isinstance(new_req.install_target, Folder):
                    req = new_req

            if isinstance(req, Package) and isinstance(req.install_target, Folder):
                req = (
                    req.to(system, path=path, mount=mount)
                    if isinstance(system, Cluster)
                    else req.to(system, path=path)
                )
            new_reqs.append(req)

        if self.working_dir:
            # ``self.reqs`` puts working_dir last, so split it back off.
            return new_reqs[:-1], new_reqs[-1]
        return new_reqs, None

    def install(self, force=False):
        """Locally install packages and run setup commands.

        Args:
            force: Install even if an env with the same config hash is
                already recorded in ``obj_store.installed_envs``.

        Returns:
            The return codes of ``setup_cmds`` when there are any, else ``None``.

        Raises:
            ValueError: If a requirement is neither a string nor an object
                with an ``_install`` method.
        """
        # Hash the config_for_rns to check if we need to install
        env_config = self.config_for_rns
        # Remove the name because auto-generated names will be different, but
        # the installed components are the same. A default avoids a KeyError
        # should the config carry no "name" entry.
        env_config.pop("name", None)
        install_hash = hash(str(env_config))
        # Check the existing hash
        if install_hash in obj_store.installed_envs and not force:
            logger.info("Env already installed, skipping")
            return
        # NOTE(review): the hash is recorded before installation runs, so a
        # failed install is still treated as installed on the next call.
        obj_store.installed_envs[install_hash] = self.name

        for package in self.reqs:
            if isinstance(package, str):
                pkg = Package.from_string(package)
            elif hasattr(package, "_install"):
                pkg = package
            else:
                raise ValueError(f"package {package} not recognized")

            logger.info(f"Installing package: {str(pkg)}")
            pkg._install(self)

        return self.run(self.setup_cmds) if self.setup_cmds else None

    def run(self, cmds: List[str]):
        """Run commands locally inside the environment.

        Each command is prefixed with ``self._run_cmd`` when that is non-empty,
        split with ``shlex`` and executed without a shell.

        Args:
            cmds: Commands to run. ``None`` is treated as no commands.

        Returns:
            One return code per command, in order.
        """
        ret_codes = []
        for cmd in cmds or []:
            if self._run_cmd:
                cmd = f"{self._run_cmd} {cmd}"
            # Log through the module logger, like the rest of this module.
            logger.info(f"Running: {cmd}")
            # NOTE: commands run in the current directory; ``self.working_dir``
            # is not passed as ``cwd``.
            ret_code = subprocess.call(shlex.split(cmd), shell=False)
            ret_codes.append(ret_code)
        return ret_codes

    def to(
        self, system: Union[str, Cluster], path=None, mount=False, force_install=False
    ):
        """
        Send environment to the system (Cluster or file system).
        This includes installing packages and running setup commands if system is a cluster.

        Args:
            system: Target cluster or file system.
            path: Passed to each folder-backed requirement's ``to`` call.
            mount: Passed to each folder-backed requirement's ``to`` call when
                ``system`` is a cluster.
            force_install: Passed as ``force`` to the remote ``install`` call.

        Returns:
            A deep copy of this env whose reqs and working dir point at ``system``.

        Example:
            >>> env = rh.env(reqs=["numpy", "pip"])
            >>> cluster_env = env.to(my_cluster)
            >>> s3_env = env.to("s3", path="s3_bucket/my_env")
        """
        system = _get_cluster_from(system)
        new_env = copy.deepcopy(self)
        new_env.reqs, new_env.working_dir = self._reqs_to(system, path, mount)

        if isinstance(system, Cluster):
            key = system.put_resource(new_env)
            # A string ``env_vars`` is loaded through ``_env_vars_from_file``.
            env_vars = (
                _env_vars_from_file(self.env_vars)
                if isinstance(self.env_vars, str)
                else self.env_vars
            )
            if env_vars:
                system.call(key, "_set_env_vars", env_vars)
            system.call(key, "install", force=force_install)

        return new_env

    @property
    def _activate_cmd(self):
        """Activation command; empty for the base ``Env``."""
        return ""

    @property
    def _run_cmd(self):
        """Prefix that ``run`` prepends to each command; empty for the base ``Env``."""
        return ""