You are viewing main version. Click here to see docs for the latest stable version.

Source code for runhouse.rns.login

import os
from typing import Dict, Optional

import requests
import typer

from runhouse.globals import configs, rns_client
from runhouse.logger import get_logger

# Module-level logger, obtained from runhouse's `get_logger` with this module's name.
logger = get_logger(__name__)


def is_interactive():
    """Report whether the ``__main__`` module lacks a ``__file__`` attribute.

    A missing ``__file__`` on ``__main__`` is used here as the signal for an
    interactive session (typically a REPL or notebook rather than a script).

    Returns:
        bool: True if ``__main__`` has no ``__file__`` attribute, else False.
    """
    import __main__ as entry_module

    launched_from_file = hasattr(entry_module, "__file__")
    return not launched_from_file


def login(
    token: Optional[str] = None,
    download_config: Optional[bool] = None,
    upload_config: Optional[bool] = None,
    download_secrets: Optional[bool] = None,
    upload_secrets: Optional[bool] = None,
    ret_token: bool = False,
    interactive: Optional[bool] = None,
    from_cli: bool = False,
    sync_secrets: bool = False,
):
    """Login to Runhouse. Validates token provided, with options to upload or download stored secrets
    or config between local environment and Runhouse / Vault.

    Args:
        token (str): Runhouse token, can be found at https://www.run.house/account#token. If not
            provided, function will interactively prompt for the token to be entered manually.
        download_config (bool): Whether to download configs from your Runhouse account to local
            environment.
        upload_config (bool): Whether to upload local configs into your Runhouse account.
        download_secrets (bool): Whether to download secrets from your Runhouse account to local
            environment.
        upload_secrets (bool): Whether to upload local secrets to your Runhouse account.
        ret_token (bool): Whether to return your Runhouse token. (Default: False)
        interactive (bool): Whether to interactively go through the login flow. ``token`` must be
            provided to set this to False. When left as None, the interactive flow runs unless
            ``token`` and all four config/secrets options were provided.
        from_cli (bool): Forwarded to the secrets download step. (Default: False)
        sync_secrets (bool): Whether the interactive flow should also ask about downloading and
            uploading secrets. Only consulted inside the interactive flow. (Default: False)

    Returns:
        Token if ``ret_token`` is set to True, otherwise nothing. ``None`` is also returned early
        if the token could not be validated.

    Raises:
        Exception: If ``interactive`` is False and no ``token`` was provided.
    """
    all_options_set = token and not any(
        arg is None
        for arg in (download_config, upload_config, download_secrets, upload_secrets)
    )

    if interactive is False and not token:
        raise Exception(
            "`interactive` can only be set to `False` if token is provided."
        )

    if interactive or (interactive is None and not all_options_set):
        from getpass import getpass

        from rich.console import Console

        console = Console()
        # Raw string: the banner is full of backslashes that are not valid escape
        # sequences; a raw literal yields the same text without a SyntaxWarning.
        console.print(
            r"""
            ____              __                             @ @ @
           / __ \__  ______  / /_  ____  __  __________     []___
          / /_/ / / / / __ \/ __ \/ __ \/ / / / ___/ _ \   /    /\____    @@
         / _, _/ /_/ / / / / / / / /_/ / /_/ (__  )  __/  /_/\_//____/\  @@@@
        /_/ |_|\__,_/_/ /_/_/ /_/\____/\__,_/____/\___/   | || |||__|||   ||
        """
        )
        account_url = f"{configs.get('dashboard_url')}/account#token"
        link = (
            f"[link={account_url}]{account_url}[/link]"
            if is_interactive()
            else account_url
        )
        if not token:
            console.print(
                f"Retrieve your token :key: here to use :person_running: :house: Runhouse for "
                f"secrets and artifact management: {link}",
                style="bold yellow",
            )
            token = getpass("Token: ")

        rh_config_exists = configs.CONFIG_PATH.exists()
        if not rh_config_exists:
            # Nothing local to upload yet, so skip those prompts.
            upload_config = False
            download_secrets = False

        # download the config automatically if no config.yaml exists
        # NOTE(review): when no config file exists and `download_config` is None, this
        # expression leaves it as None (falsy) rather than True -- confirm that is intended.
        download_config = (
            download_config
            if download_config is not None or not rh_config_exists
            else typer.confirm(
                "Download your Runhouse config to your local .rh folder?",
                default=True,
            )
        )
        upload_config = (
            upload_config
            if upload_config is not None
            else typer.confirm("Upload your local .rh config to Runhouse?")
        )

        # Default ssh secret
        if not rns_client.default_ssh_key:
            default_ssh_path = typer.prompt(
                "Input the private key path for your default SSH key to use for launching clusters "
                "(e.g. ~/.ssh/id_rsa), or press `Enter` to skip",
                default="",
            )
            if default_ssh_path:
                from runhouse import provider_secret

                secret = provider_secret(provider="ssh", path=default_ssh_path)
                if secret.values:
                    secret.save()
                    configs.set("default_ssh_key", secret.name)
                else:
                    console.print(
                        f"Could not detect SSH key at {default_ssh_path}. Skipping"
                    )

        if sync_secrets:
            from runhouse import Secret

            # Only offer a download when Vault actually returns secrets.
            if Secret.vault_secrets(rns_client.request_headers()):
                download_secrets = (
                    download_secrets
                    if download_secrets is not None
                    else typer.confirm(
                        "Download secrets from Vault to your local Runhouse environment?"
                    )
                )

            upload_secrets = (
                upload_secrets
                if upload_secrets is not None
                else typer.confirm(
                    "Upload your local enabled provider secrets to Vault?"
                )
            )

    if token:
        # Note, this is to explicitly add it to the config file, as opposed to setting in python
        # via configs.token = token
        configs.set("token", token)

    if download_config:
        configs.download_and_save_defaults()

        # NOTE(review): nesting of this autostop prompt under `download_config` was
        # reconstructed from whitespace-collapsed source -- confirm against the repository.
        autostop_mins = configs.defaults_cache.get("default_autostop")
        if autostop_mins is None:
            new_autostop_mins = typer.prompt(
                "Set the default number of minutes of inactivity after which to auto-terminate on-demand clusters. "
                "Press `Enter` to set to 60 minutes, or `-1` to disable autostop entirely.",
                default=60,
            )
            configs.set("default_autostop", int(new_autostop_mins))

    if upload_config:
        configs.load_defaults_from_file()
        configs.upload_defaults(defaults=configs.defaults_cache)

    if not (download_config or upload_config):
        # If we are not downloading or uploading config, we still want to make sure the token is valid
        # and also download the username and default folder
        try:
            defaults = configs.download_defaults()
        except Exception:
            # Deliberately best-effort, but no longer swallows KeyboardInterrupt/SystemExit.
            logger.error("Failed to validate token")
            return None
        configs.set("username", defaults["username"])
        configs.set("default_folder", defaults["default_folder"])

    if download_secrets:
        _login_download_secrets(from_cli=from_cli)

    if upload_secrets:
        _login_upload_secrets(interactive=interactive)

    logger.info("Successfully logged into Runhouse.")
    if ret_token:
        return token
    return None
def _login_download_secrets(headers: Optional[Dict] = None, from_cli=False):
    """Download the secrets listed in Vault into the local environment.

    Each secret is either written to a credentials path or handled as env vars.
    For env-var secrets, ``from_cli=False`` writes them via ``secret.write(env=True)``;
    ``from_cli=True`` instead collects them into ``~/.rh/secrets/login.env`` so the
    user can ``source`` that file.

    Args:
        headers (Dict, optional): Request headers for the Vault lookup. Defaults to
            ``rns_client.request_headers()``.
        from_cli (bool): Whether login was invoked from the CLI. (Default: False)
    """
    import shlex

    from runhouse import Secret

    secrets = Secret.vault_secrets(headers=headers or rns_client.request_headers())
    env_secrets = {}
    for name in secrets:
        try:
            secret = Secret.from_name(name)
            if not (hasattr(secret, "path") or hasattr(secret, "env_vars")):
                continue

            # The check above only guarantees ONE of the two attributes exists,
            # so read both defensively instead of risking an AttributeError.
            secret_path = getattr(secret, "path", None)
            secret_env_vars = getattr(secret, "env_vars", None)

            if secret_path:
                download_path = secret_path
            elif getattr(secret, "provider", None) == "ssh":
                download_path = f"{secret._DEFAULT_CREDENTIALS_PATH}/{name}"
            else:
                download_path = secret._DEFAULT_CREDENTIALS_PATH

            if download_path and not secret_env_vars:
                logger.info(f"Loading down secrets for {name} into {download_path}")
                secret.write(path=download_path)
            else:
                env_vars = secret_env_vars or secret._DEFAULT_ENV_VARS
                logger.info(
                    f"Writing down env secrets for {name} into {env_vars.values()}"
                )
                if not from_cli:
                    secret.write(env=True)
                else:
                    # Map each secret value onto the env var name configured for its key.
                    for key, val in secret.values.items():
                        if key in env_vars:
                            env_secrets[env_vars[key]] = val
        except ValueError as e:
            logger.warning(
                f"Encountered {e}. Was not able to load down secrets for {name}."
            )

    if from_cli and env_secrets:
        folder = os.path.expanduser("~/.rh/secrets")
        os.makedirs(folder, exist_ok=True)
        env_file = f"{folder}/login.env"
        # The file holds plaintext secrets: create it readable/writable by the owner only.
        fd = os.open(env_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The creation mode is ignored for a pre-existing file, so tighten it explicitly.
            os.chmod(env_file, 0o600)
            # Quote values so the file stays safe to `source` when a secret contains
            # spaces or shell metacharacters.
            f.writelines(
                f"{key}={shlex.quote(str(val))}\n" for key, val in env_secrets.items()
            )
        logger.info(
            "Env var secrets written down into ~/.rh/secrets/login.env. "
            "Please run `source ~/.rh/secrets/login.env` to set the environment variables."
        )


def _login_upload_secrets(interactive: Optional[bool], headers: Optional[Dict] = None):
    """Upload locally available secrets to Vault.

    Local secrets and extracted provider secrets are merged. A secret is skipped
    when the resource lookup for it returns HTTP 200, or when the user declines
    the per-secret confirmation.

    Args:
        interactive (bool): Unless this is exactly ``False``, the user is asked to
            confirm each upload.
        headers (Dict, optional): Request headers for the resource lookup. Defaults
            to ``rns_client.request_headers()``.
    """
    from runhouse import Secret

    local_secrets = Secret.local_secrets()
    provider_secrets = Secret.extract_provider_secrets()
    local_secrets.update(provider_secrets)

    # Iterate over a snapshot of the names because entries are removed while looping.
    for name in list(local_secrets):
        rns_address = name if "/" in name else f"{rns_client.current_folder}/{name}"
        resource_uri = rns_client.resource_uri(rns_address)
        resp = requests.get(
            f"{rns_client.api_server_url}/resource/{resource_uri}",
            headers=headers or rns_client.request_headers(),
        )
        if resp.status_code == 200:
            # A successful lookup is treated as "already saved": do not upload again.
            local_secrets.pop(name, None)
            continue
        if interactive is not False:
            upload_secrets = typer.confirm(f"Upload secrets for {name}?")
            if not upload_secrets:
                local_secrets.pop(name, None)

    if local_secrets:
        logger.info(f"Uploading secrets for {list(local_secrets)} to Vault.")
        for secret in local_secrets.values():
            secret.save(save_values=True)
def logout(
    delete_loaded_secrets: Optional[bool] = None,
    delete_rh_config_file: Optional[bool] = None,
    interactive: Optional[bool] = None,
):
    """
    Logout from Runhouse. Provides option to delete credentials from the Runhouse config and the underlying
    credentials file. Token is also deleted from the config.

    Args:
        delete_loaded_secrets (bool, optional): If True, deletes the provider credentials file. In an
            interactive session this value is replaced by a per-secret confirmation prompt.
            Defaults to None.
        delete_rh_config_file (bool, optional): If True, deletes the rh config file. Defaults to None.
        interactive (bool, optional): If True, runs the logout process in interactive mode. When None,
            the mode is taken from ``is_interactive()``. Defaults to None.

    Returns:
        None
    """
    from runhouse.resources.secrets import Secret
    from runhouse.resources.secrets.provider_secrets.ssh_secret import SSHSecret

    interactive_session: bool = (
        interactive if interactive is not None else is_interactive()
    )

    # Snapshot the entries because providers are removed from the config while looping.
    config_secrets = list(configs.get("secrets", {}).items())
    for name, value in config_secrets:
        try:
            secret = Secret.from_name(name)
            if isinstance(secret, SSHSecret):
                logger.info(
                    "Automatic deletion for local SSH credentials file is not supported. "
                    "Please manually delete it if you would like to remove it"
                )
                configs.delete_provider(name)
                continue
        except ValueError:
            # The secret could not be loaded by name; fall through to the generic cleanup below.
            pass

        if interactive_session:
            delete_loaded_secrets = typer.confirm(
                f"Delete credentials in {value} for {name}?"
            )

        if delete_loaded_secrets:
            if isinstance(value, str):
                path = os.path.expanduser(value)
                if os.path.exists(path):
                    os.remove(path)
            else:
                # list of env variables set
                for key in value:
                    # The variable may not be set in this process; a missing key
                    # must not abort the logout.
                    os.environ.pop(key, None)

        configs.delete_provider(name)

    local_secrets = Secret.local_secrets()
    for secret in local_secrets.values():
        secret.delete()

    # Delete token and username/default folder from rh config file
    configs.delete(key="token")
    configs.delete(key="username")
    configs.delete(key="default_folder")

    # Delete values in configs object
    configs.token = None
    configs.username = None
    configs.default_folder = None

    # Wipe env vars
    os.environ.pop("RH_TOKEN", None)
    os.environ.pop("RH_USERNAME", None)

    rh_config_path = configs.CONFIG_PATH
    if not delete_rh_config_file and interactive_session:
        delete_rh_config_file = typer.confirm("Delete your local Runhouse config file?")

    if delete_rh_config_file:
        # Delete the credentials file on the file system
        configs.delete_defaults(rh_config_path)
        logger.info(f"Deleted Runhouse config file from path: {rh_config_path}")

    logger.info("Successfully logged out of Runhouse.")