Source code for runhouse.rns.login

import os
from typing import Dict, Optional

import requests
import typer

from runhouse.globals import configs, rns_client
from runhouse.logger import get_logger

logger = get_logger(__name__)


def is_interactive():
    import __main__ as main

    return not hasattr(main, "__file__")


[docs]def login( token: str = None, download_config: bool = None, upload_config: bool = None, download_secrets: bool = None, upload_secrets: bool = None, ret_token: bool = False, interactive: bool = None, from_cli: bool = False, sync_secrets: bool = False, ): """Login to Runhouse. Validates token provided, with options to upload or download stored secrets or config between local environment and Runhouse / Vault. Args: token (str): Runhouse token, can be found at https://www.run.house/account#token. If not provided, function will interactively prompt for the token to be entered manually. download_config (bool): Whether to download configs from your Runhouse account to local environment. upload_config (bool): Whether to upload local configs into your Runhouse account. download_secrets (bool): Whether to download secrets from your Runhouse account to local environment. upload_secrets (bool): Whether to upload local secrets to your Runhouse account. ret_token (bool): Whether to return your Runhouse token. (Default: False) interactive (bool): Whether to interactively go through the login flow. ``token`` must be provided to set this to False. Returns: Token if ``ret_token`` is set to True, otherwise nothing. """ all_options_set = token and not any( arg is None for arg in (download_config, upload_config, download_secrets, upload_secrets) ) if interactive is False and not token: raise Exception( "`interactive` can only be set to `False` if token is provided." ) if interactive or (interactive is None and not all_options_set): from getpass import getpass from rich.console import Console console = Console() console.print( """ ____ __ @ @ @ / __ \__ ______ / /_ ____ __ __________ []___ / /_/ / / / / __ \/ __ \/ __ \/ / / / ___/ _ \ / /\____ @@ / _, _/ /_/ / / / / / / / /_/ / /_/ (__ ) __/ /_/\_//____/\ @@@@ /_/ |_|\__,_/_/ /_/_/ /_/\____/\__,_/____/\___/ | || |||__||| || """ ) account_url = f"{configs.get('dashboard_url')}/account#token" link = ( f"[link={account_url}]{account_url}[/link]" if is_interactive() else account_url ) if not token: console.print( f"Retrieve your token :key: here to use :person_running: :house: Runhouse for " f"secrets and artifact management: {link}", style="bold yellow", ) token = getpass("Token: ") rh_config_exists = configs.CONFIG_PATH.exists() if not rh_config_exists: upload_config = False download_secrets = False # download the config automatically if no config.yaml exists download_config = ( download_config if download_config is not None or not rh_config_exists else typer.confirm( "Download your Runhouse config to your local .rh folder?", default=True, ) ) upload_config = ( upload_config if upload_config is not None else typer.confirm("Upload your local .rh config to Runhouse?") ) # Default ssh secret if not rns_client.default_ssh_key: default_ssh_path = typer.prompt( "Input the private key path for your default SSH key to use for launching clusters (e.g. ~/.ssh/id_rsa), or press `Enter` to skip", default="", ) if default_ssh_path: from runhouse import provider_secret secret = provider_secret(provider="ssh", path=default_ssh_path) if secret.values: secret.save() configs.set("default_ssh_key", secret.name) else: console.print( f"Could not detect SSH key at {default_ssh_path}. Skipping" ) if sync_secrets: from runhouse import Secret if Secret.vault_secrets(rns_client.request_headers()): download_secrets = ( download_secrets if download_secrets is not None else typer.confirm( "Download secrets from Vault to your local Runhouse environment?" ) ) upload_secrets = ( upload_secrets if upload_secrets is not None else typer.confirm( "Upload your local enabled provider secrets to Vault?" ) ) if token: # Note, this is to explicitly add it to the config file, as opposed to setting in python # via configs.token = token configs.set("token", token) if download_config: configs.download_and_save_defaults() autostop_mins = configs.defaults_cache.get("default_autostop") if autostop_mins is None: new_autostop_mins = typer.prompt( "Set the default number of minutes of inactivity after which to auto-terminate on-demand clusters. " "Press `Enter` to set to 60 minutes, or `-1` to disable autostop entirely.", default=60, ) configs.set("default_autostop", int(new_autostop_mins)) if upload_config: configs.load_defaults_from_file() configs.upload_defaults(defaults=configs.defaults_cache) if not (download_config or upload_config): # If we are not downloading or uploading config, we still want to make sure the token is valid # and also download the username and default folder try: defaults = configs.download_defaults() except: logger.error("Failed to validate token") return None configs.set("username", defaults["username"]) configs.set("default_folder", defaults["default_folder"]) if download_secrets: _login_download_secrets(from_cli=from_cli) if upload_secrets: _login_upload_secrets(interactive=interactive) logger.info("Successfully logged into Runhouse.") if ret_token: return token
def _login_download_secrets(headers: Optional[str] = None, from_cli=False): from runhouse import Secret secrets = Secret.vault_secrets(headers=headers or rns_client.request_headers()) env_secrets = {} for name in secrets: try: secret = Secret.from_name(name) if not (hasattr(secret, "path") or hasattr(secret, "env_vars")): continue download_path = ( secret.path if secret.path else f"{secret._DEFAULT_CREDENTIALS_PATH}/{name}" if hasattr(secret, "provider") and secret.provider == "ssh" else secret._DEFAULT_CREDENTIALS_PATH ) if download_path and not secret.env_vars: logger.info(f"Loading down secrets for {name} into {download_path}") secret.write(path=download_path) else: env_vars = secret.env_vars or secret._DEFAULT_ENV_VARS logger.info( f"Writing down env secrets for {name} into {env_vars.values()}" ) if not from_cli: secret.write(env=True) else: for key, val in secret.values.items(): if key in env_vars: env_secrets.update({env_vars[key]: val}) except ValueError as e: logger.warning( f"Encountered {e}. Was not able to load down secrets for {name}." ) if from_cli and env_secrets: folder = os.path.expanduser("~/.rh/secrets") os.makedirs(folder, exist_ok=True) with open(f"{folder}/login.env", "w") as f: for key, val in env_secrets.items(): f.write(f"{key}={val}\n") logger.info( "Env var secrets written down into ~/.rh/secrets/login.env. " "Please run `source ~/.rh/secrets/login.env` to set the environment variables." ) def _login_upload_secrets(interactive: bool, headers: Optional[Dict] = None): from runhouse import Secret local_secrets = Secret.local_secrets() provider_secrets = Secret.extract_provider_secrets() local_secrets.update(provider_secrets) names = list(local_secrets.keys()) for name in names: rns_address = name if "/" in name else f"{rns_client.current_folder}/{name}" resource_uri = rns_client.resource_uri(rns_address) resp = requests.get( f"{rns_client.api_server_url}/resource/{resource_uri}", headers=headers or rns_client.request_headers(), ) if resp.status_code == 200: local_secrets.pop(name, None) continue if interactive is not False: upload_secrets = typer.confirm(f"Upload secrets for {name}?") if not upload_secrets: local_secrets.pop(name, None) if local_secrets: logger.info(f"Uploading secrets for {list(local_secrets)} to Vault.") for _, secret in local_secrets.items(): secret.save(save_values=True)
[docs]def logout( delete_loaded_secrets: bool = None, delete_rh_config_file: bool = None, interactive: bool = None, ): """ Logout from Runhouse. Provides option to delete credentials from the Runhouse config and the underlying credentials file. Token is also deleted from the config. Args: delete_loaded_secrets (bool, optional): If True, deletes the provider credentials file. Defaults to None. delete_rh_config_file (bool, optional): If True, deletes the rh config file. Defaults to None. interactive (bool, optional): If True, runs the logout process in interactive mode. Defaults to None. Returns: None """ from runhouse.resources.secrets import Secret from runhouse.resources.secrets.provider_secrets.ssh_secret import SSHSecret interactive_session: bool = ( interactive if interactive is not None else is_interactive() ) config_secrets = list(configs.get("secrets", {}).items()) for (name, value) in config_secrets: try: secret = Secret.from_name(name) if isinstance(secret, SSHSecret): logger.info( "Automatic deletion for local SSH credentials file is not supported. " "Please manually delete it if you would like to remove it" ) configs.delete_provider(name) continue except ValueError: pass if interactive_session: delete_loaded_secrets = typer.confirm( f"Delete credentials in {value} for {name}?" ) if delete_loaded_secrets: if isinstance(value, str): path = os.path.expanduser(value) if os.path.exists(path): os.remove(path) else: # list of env variables set for key in value: del os.environ[key] configs.delete_provider(name) local_secrets = Secret.local_secrets() for _, secret in local_secrets.items(): secret.delete() # Delete token and username/default folder from rh config file configs.delete(key="token") configs.delete(key="username") configs.delete(key="default_folder") # Delete values in configs object configs.token = None configs.username = None configs.default_folder = None # Wipe env vars os.environ.pop("RH_TOKEN", None) os.environ.pop("RH_USERNAME", None) rh_config_path = configs.CONFIG_PATH if not delete_rh_config_file and interactive_session: delete_rh_config_file = typer.confirm("Delete your local Runhouse config file?") if delete_rh_config_file: # Delete the credentials file on the file system configs.delete_defaults(rh_config_path) logger.info(f"Deleted Runhouse config file from path: {rh_config_path}") logger.info("Successfully logged out of Runhouse.")