import logging
import shlex
import subprocess
import time
import webbrowser
from pathlib import Path
from typing import Optional
import ray
import requests
import typer
import yaml
from rich.console import Console
import runhouse as rh
import runhouse.rns.login
from runhouse import __version__, cluster, Cluster, configs
from runhouse.cli_utils import (
add_clusters_to_output_table,
check_if_command_exists,
create_output_table,
get_cluster_or_local,
get_wrapped_server_start_cmd,
LogsSince,
print_bring_cluster_up_msg,
print_cluster_config,
print_status,
StatusType,
)
from runhouse.constants import (
BULLET_UNICODE,
ITALIC_BOLD,
MAX_CLUSTERS_DISPLAY,
RAY_KILL_CMD,
RAY_START_CMD,
RESET_FORMAT,
SERVER_LOGFILE,
SERVER_STOP_CMD,
)
from runhouse.globals import rns_client
from runhouse.logger import get_logger
from runhouse.resources.hardware import (
check_for_existing_ray_instance,
ClustersListStatus,
get_all_sky_clusters,
kill_actors,
)
SKY_LIVE_CLUSTERS_MSG = (
"Live on-demand clusters created via Sky may exist that are not saved in Den. "
"For more information, please run [bold italic]`sky status -r`[/bold italic]."
)
# Create an explicit Typer application
app = typer.Typer(add_completion=False)
###############################
# Resources Command Groups
###############################
# Create a cluster app to enable subcommands of cluster (e.g. runhouse cluster list)
# and register it with the main runhouse application
cluster_app = typer.Typer(help="Cluster related CLI commands.")
app.add_typer(cluster_app, name="cluster")
# Create a server app to enable subcommands of server (e.g. runhouse server status)
# and register it with the main runhouse application
server_app = typer.Typer(help="Runhouse server related CLI commands.")
app.add_typer(server_app, name="server")
# For printing with typer
console = Console()
logger = get_logger(__name__)
###############################
# General Runhouse CLI commands
###############################
@app.command()
def login(
token: Optional[str] = typer.Argument(None, help="Your Runhouse API token"),
sync_secrets: Optional[bool] = typer.Option(
False,
"--sync-secrets",
help="Whether to sync secrets. You will be prompted whether to upload local secrets or download saved secrets.",
),
yes: Optional[bool] = typer.Option(
False, "--yes", "-y", help="Sets any confirmations to 'yes' automatically."
),
):
"""Login to Runhouse. Validates token provided, with options to upload or download stored secrets or config between
local environment and Runhouse / Vault.
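    Example:
        ``$ runhouse login``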
"""
valid_token: str = (
runhouse.rns.login.login(
token=token,
download_config=True,
upload_config=True,
download_secrets=True,
upload_secrets=True,
from_cli=True,
)
if yes
else runhouse.rns.login.login(
token=token,
interactive=True,
ret_token=True,
from_cli=True,
sync_secrets=sync_secrets,
)
)
if valid_token:
webbrowser.open(f"{configs.get('dashboard_url')}/dashboard?token={valid_token}")
raise typer.Exit()
else:
raise typer.Exit(code=1)
@app.command()
def logout():
"""Logout of Runhouse. Provides options to delete locally configured secrets and local Runhouse configs"""
runhouse.rns.login.logout(interactive=True)
raise typer.Exit()
###############################
# Cluster CLI commands
###############################
[docs]@cluster_app.command("ssh")
def cluster_ssh(cluster_name: str):
"""SSH into a remote cluster.
Example:
``$ runhouse cluster ssh /sashab/rh-basic-cpu``
"""
try:
c = cluster(name=cluster_name)
if not c.is_up():
print_bring_cluster_up_msg(cluster_name=cluster_name)
return
c.ssh()
except ValueError:
import sky
from runhouse import OnDemandCluster
from runhouse.constants import DEFAULT_SSH_PORT
from runhouse.resources.hardware.utils import _run_ssh_command
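        # Cluster isn't saved in Den; fall back to SkyPilot's local state for a live on-demand cluster.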
state = sky.status(cluster_names=[cluster_name], refresh=False)
if len(state) == 0:
console.print(
f"Could not load cluster called {cluster_name}. Cluster must either be saved to Den, "
"or be an ondemand cluster that is currently up."
)
raise typer.Exit(1)
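        # The Sky status entry's "handle" carries the connection details (head IP, SSH user/ports, docker user).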
resource_handle = state[0].get("handle", {})
sky_ssh_config = Path(f"~/.sky/generated/{cluster_name}.yml").expanduser()
if sky_ssh_config.exists():
with open(sky_ssh_config, "r", encoding="utf-8") as f:
ssh_config = yaml.safe_load(f)
ssh_proxy_command = ssh_config["auth"].get("ssh_proxy_command", None)
if ssh_proxy_command:
ssh_proxy_command = ssh_proxy_command.replace(
"skypilot:ssh_user", resource_handle.ssh_user
)
else:
ssh_proxy_command = None
_run_ssh_command(
address=resource_handle.head_ip,
ssh_user=resource_handle.ssh_user or "ubuntu",
ssh_port=resource_handle.stable_ssh_ports[0] or DEFAULT_SSH_PORT,
ssh_private_key=OnDemandCluster.DEFAULT_KEYFILE,
docker_user=resource_handle.docker_user,
ssh_proxy_command=ssh_proxy_command,
)
[docs]@cluster_app.command("status")
def cluster_status(
cluster_name: str = typer.Argument(
None,
help="Name of cluster to check. If not specified will check the local cluster.",
),
send_to_den: bool = typer.Option(
default=False,
help="Whether to update Den with the status.",
),
):
"""Load the status of the cluster.
Example:
``$ runhouse cluster status /sashab/rh-basic-cpu``
"""
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
try:
cluster_status = current_cluster.status(send_to_den=send_to_den)
except ValueError:
console.print("Failed to load status for cluster.")
return
except ConnectionError:
console.print(
"\N{smiling face with horns} Runhouse Daemon is not running... \N{No Entry} \N{Runner}"
)
return
print_status(cluster_status, current_cluster)
[docs]@cluster_app.command("list")
def cluster_list(
show_all: bool = typer.Option(
False,
"-a",
"--all",
help=f"Get all clusters saved in Den. Up to {MAX_CLUSTERS_DISPLAY} most recently active clusters "
f"will be displayed.",
),
since: Optional[str] = typer.Option(
None,
"--since",
help="Time duration to filter on. Minimum allowable filter is 1 minute. You may filter by seconds (s), "
"minutes (m), hours (h) or days (s). Examples: 30s, 15m, 2h, 3d.",
),
cluster_status: Optional[ClustersListStatus] = typer.Option(
None,
"--status",
help="Cluster status to filter on. Supported filter values: 'running', 'terminated' (cluster is not live), "
"'down' (Runhouse server is down, but the cluster might be live).",
),
):
"""
    Load Runhouse clusters.
Examples:
``$ runhouse cluster list``
``$ runhouse cluster list --all``
``$ runhouse cluster list --status terminated``
``$ runhouse cluster list --since 15m``
"""
# logged out case
if not rh.configs.token:
        sky_cli_command_formatted = f"{ITALIC_BOLD}`sky status -r`{RESET_FORMAT}"  # will be printed bold and italic
console.print(
f"Listing clusters requires a Runhouse token. Please run `runhouse login` to get your token, "
f"or run {sky_cli_command_formatted} to list locally stored on-demand clusters."
)
return
clusters = Cluster.list(show_all=show_all, since=since, status=cluster_status)
den_clusters = clusters.get("den_clusters", None)
running_clusters = (
[
den_cluster
for den_cluster in den_clusters
if den_cluster.get("Status") == "running"
]
if den_clusters
else None
)
sky_clusters = clusters.get("sky_clusters", None)
if not den_clusters:
no_clusters_msg = f"No clusters found in Den for {rns_client.username}"
if show_all or since or cluster_status:
no_clusters_msg += " that match the provided filters"
console.print(no_clusters_msg)
if sky_clusters:
console.print(SKY_LIVE_CLUSTERS_MSG)
return
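    # With no filters, default to showing only running clusters; otherwise show all matching Den clusters.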
filters_requested: bool = show_all or since or cluster_status
clusters_to_print = den_clusters if filters_requested else running_clusters
if show_all:
        # If the user requests all Den clusters, display at most MAX_CLUSTERS_DISPLAY of them.
        clusters_to_print = clusters_to_print[:MAX_CLUSTERS_DISPLAY]
# creating the clusters table
table = create_output_table(
total_clusters=len(den_clusters),
running_clusters=len(running_clusters),
displayed_clusters=len(clusters_to_print),
filters_requested=filters_requested,
)
add_clusters_to_output_table(table=table, clusters=clusters_to_print)
console.print(table)
# print msg about un-saved live sky clusters.
if sky_clusters:
console.print(SKY_LIVE_CLUSTERS_MSG)
[docs]@cluster_app.command("keep-warm")
def cluster_keep_warm(
cluster_name: str = typer.Argument(
None,
help="Name of cluster to keep warm. If not specified will check the local cluster.",
),
mins: Optional[int] = typer.Option(
-1,
help="Amount of time (in min) to keep the cluster warm after inactivity. "
"If set to -1, keeps cluster warm indefinitely.",
),
):
"""Keep the cluster warm for given number of minutes after inactivity.
Example:
``$ runhouse cluster keep-warm /sashab/rh-basic-cpu``
"""
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
try:
if not current_cluster.is_up():
print_bring_cluster_up_msg(cluster_name=cluster_name)
return
current_cluster.keep_warm(mins=mins)
except ValueError:
console.print(f"{cluster_name} is not saved in Den.")
sky_live_clusters = get_all_sky_clusters()
cluster_name = (
cluster_name.split("/")[-1] if "/" in cluster_name else cluster_name
)
if cluster_name in sky_live_clusters:
console.print(
f"You can keep warm the cluster by running [italic bold] `sky autostop {cluster_name} -i {mins}`"
)
except Exception as e:
console.print(f"Failed to keep the cluster warm: {e}")
[docs]@cluster_app.command("up")
def cluster_up(
cluster_name: str = typer.Argument(
None,
help="The name of cluster to bring up.",
)
):
"""Bring up the cluster if it is not up. No-op if cluster is already up.
This only applies to on-demand clusters, and has no effect on self-managed clusters.
Example:
``$ runhouse cluster up /sashab/rh-basic-cpu``
"""
current_cluster = rh.cluster(name=cluster_name, dryrun=True)
try:
current_cluster.up_if_not()
except ValueError:
console.print("Cluster is not saved in Den.")
except Exception as e:
console.print(f"Failed to bring up the cluster: {e}")
[docs]@cluster_app.command("down")
def cluster_down(
cluster_name: str = typer.Argument(
None,
help="The name of cluster to terminate.",
),
remove_configs: bool = typer.Option(
False,
"-rm",
"--remove-configs",
help="Whether to delete cluster config from Den (default: ``False``).",
),
remove_all: bool = typer.Option(
False,
"-a",
"--all",
help="Whether to terminate all running clusters saved in Den (default: ``False``).",
),
force_deletion: bool = typer.Option(
False, "-y", "--yes", help="Skip confirmation prompt."
),
):
"""Terminate cluster if it is not down. No-op if cluster is already down.
This only applies to on-demand clusters, and has no effect on self-managed clusters.
Example:
``$ runhouse cluster down /sashab/rh-basic-cpu``
"""
if not force_deletion:
if cluster_name:
proceed = typer.prompt(f"Terminating {cluster_name}. Proceed? [Y/n]")
elif remove_all:
proceed = typer.prompt(
"Terminating all running clusters saved in Den. Proceed? [Y/n]"
)
else:
console.print("Cannot determine which cluster to terminate, aborting.")
raise typer.Exit(1)
if proceed.lower() in ["n", "no"]:
console.print("Aborted!")
raise typer.Exit(0)
if remove_all:
running_den_clusters = Cluster.list(status=ClustersListStatus.running).get(
"den_clusters"
)
if not running_den_clusters:
console.print("No running clusters saved in Den")
raise typer.Exit(0)
terminated_clusters: int = 0
for running_cluster in running_den_clusters:
try:
current_cluster = rh.cluster(
name=f'/{rns_client.username}/{running_cluster.get("Name")}',
dryrun=True,
)
if isinstance(current_cluster, rh.OnDemandCluster):
                    if remove_configs:
                        current_cluster.teardown_and_delete()
                    else:
                        current_cluster.teardown()
terminated_clusters += 1
else:
console.print(
f"[reset][bold italic]{current_cluster.rns_address} [reset]is not an on-demand cluster and must be terminated manually."
)
except ValueError:
continue
console.print(f"Successfully terminated [reset]{terminated_clusters} clusters.")
raise typer.Exit(0)
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
try:
if isinstance(current_cluster, rh.OnDemandCluster):
            if remove_configs:
                current_cluster.teardown_and_delete()
            else:
                current_cluster.teardown()
else:
console.print(
f"{current_cluster.rns_address} is not an on-demand cluster and must be terminated manually."
)
except ValueError:
console.print("Cluster is not saved in Den, could not bring it down.")
sky_live_clusters = get_all_sky_clusters()
cluster_name = (
cluster_name.split("/")[-1] if "/" in cluster_name else cluster_name
)
if cluster_name in sky_live_clusters:
console.print(
f"You can bring down the cluster by running [italic bold] `sky down {cluster_name}`"
)
except Exception as e:
console.print(f"Failed to terminate the cluster: {e}")
[docs]@cluster_app.command("logs")
def cluster_logs(
cluster_name: str = typer.Argument(
None,
help="Name of cluster to load the logs from. If not specified, loads the logs of the local cluster.",
),
since: Optional[LogsSince] = typer.Option(
LogsSince.five.value,
"--since",
help="Get the logs from the provided timeframe (in minutes).",
),
):
"""Load the logs of the Runhouse server running on a cluster.
Examples:
``$ runhouse cluster logs /sashab/rh-basic-cpu``
``$ runhouse cluster logs /sashab/rh-basic-cpu --since 60``
"""
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
cluster_uri = rns_client.resource_uri(current_cluster.rns_address)
resp = requests.get(
f"{rns_client.api_server_url}/resource/{cluster_uri}/logs/file?minutes={since}",
headers=rns_client.request_headers(),
)
if resp.status_code != 200:
console.print("Failed to get cluster logs.")
return
logs_file_url = resp.json().get("data").get("logs_presigned_url")
logs_file_content = requests.get(logs_file_url).content.decode("utf-8")
console.print(f"[reset][cyan]{logs_file_content}")
###############################
# Server CLI commands
###############################
def _start_server(
restart,
restart_ray,
screen,
nohup,
create_logfile=True,
host=None,
port=None,
use_https=False,
den_auth=False,
ssl_keyfile=None,
ssl_certfile=None,
restart_proxy=False,
use_caddy=False,
domain=None,
certs_address=None,
api_server_url=None,
default_env_name=None,
conda_env=None,
from_python=None,
):
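    """Build and run the shell commands that start the local Runhouse API server:
    optionally stop the running server, (re)start Ray, then launch the server
    (inside screen/nohup if requested), tailing its logfile until it reports up.
    """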
############################################
# Build CLI commands to start the server
############################################
cmds = []
if restart:
cmds.append(SERVER_STOP_CMD)
    # We have to run `ray start` outside of screen/nohup
existing_ray_instance = check_for_existing_ray_instance()
if not existing_ray_instance or restart_ray:
cmds.append(RAY_KILL_CMD)
cmds.append(RAY_START_CMD)
# Collect flags
flags = []
den_auth_flag = " --use-den-auth" if den_auth else ""
if den_auth_flag:
logger.info("Starting server with Den auth.")
flags.append(den_auth_flag)
restart_proxy_flag = " --restart-proxy" if restart_proxy else ""
if restart_proxy_flag:
logger.info("Reinstalling server configs.")
flags.append(restart_proxy_flag)
use_caddy_flag = " --use-caddy" if use_caddy else ""
if use_caddy_flag:
logger.info("Configuring Caddy on the cluster.")
flags.append(use_caddy_flag)
ssl_keyfile_flag = f" --ssl-keyfile {ssl_keyfile}" if ssl_keyfile else ""
if ssl_keyfile_flag:
logger.info(f"Using SSL keyfile in path: {ssl_keyfile}")
flags.append(ssl_keyfile_flag)
ssl_certfile_flag = f" --ssl-certfile {ssl_certfile}" if ssl_certfile else ""
if ssl_certfile_flag:
logger.info(f"Using SSL certfile in path: {ssl_certfile}")
flags.append(ssl_certfile_flag)
domain = f" --domain {domain}" if domain else ""
if domain:
logger.info(f"Using domain: {domain}")
flags.append(domain)
# Use HTTPS if explicitly specified or if SSL cert or keyfile path are provided
https_flag = " --use-https" if use_https or (ssl_keyfile or ssl_certfile) else ""
if https_flag:
logger.info("Starting server with HTTPS.")
flags.append(https_flag)
host_flag = f" --host {host}" if host else ""
if host_flag:
logger.info(f"Using host: {host}.")
flags.append(host_flag)
port_flag = f" --port {port}" if port else ""
if port_flag:
logger.info(f"Using port: {port}.")
flags.append(port_flag)
address_flag = f" --certs-address {certs_address}" if certs_address else ""
if address_flag:
logger.info(f"Server public IP address: {certs_address}.")
flags.append(address_flag)
api_server_url_flag = (
f" --api-server-url {api_server_url}" if api_server_url else ""
)
if api_server_url_flag:
logger.info(f"Setting api_server url to {api_server_url}")
flags.append(api_server_url_flag)
default_env_flag = (
f" --default-env-name {default_env_name}" if default_env_name else ""
)
if default_env_flag:
logger.info(f"Starting server in default env named: {default_env_name}")
flags.append(default_env_flag)
conda_env_flag = f" --conda-env {conda_env}" if conda_env else ""
if conda_env_flag:
logger.info(f"Creating runtime env for conda env: {conda_env}")
flags.append(conda_env_flag)
flags.append(" --from-python" if from_python else "")
    # Check if screen or nohup are available; screen takes precedence if both are requested
screen = screen and check_if_command_exists("screen")
nohup = not screen and nohup and check_if_command_exists("nohup")
# Create logfile if we are using backgrounding
if (screen or nohup) and create_logfile and not Path(SERVER_LOGFILE).exists():
Path(SERVER_LOGFILE).parent.mkdir(parents=True, exist_ok=True)
Path(SERVER_LOGFILE).touch()
# Add flags to the server start command
cmds.append(get_wrapped_server_start_cmd(flags, screen, nohup))
logger.info(f"Starting API server using the following command: {cmds[-1]}.")
try:
# Open and read the lines of the server logfile so we only print the most recent lines after starting
f = None
if (screen or nohup) and Path(SERVER_LOGFILE).exists():
f = open(SERVER_LOGFILE, "r")
f.readlines() # Discard these, they're from the previous times the server was started
# We do these one by one so it's more obvious where the error is if there is one
for i, cmd in enumerate(cmds):
console.print(f"Executing `{cmd}`")
if (
i == len(cmds) - 1
            ):  # last cmd is not parsed correctly when run with shlex.split
result = subprocess.run(cmd, shell=True, check=True)
else:
result = subprocess.run(shlex.split(cmd), text=True)
if result.returncode != 0:
# We don't want to raise an error if the server kill fails, as it may simply not be running
if "pkill" in cmd:
continue
# Retry ray start in case pkill process did not complete in time, up to 10s
if cmd == RAY_START_CMD:
console.print("Retrying:")
attempt = 0
while result.returncode != 0 and attempt < 10:
attempt += 1
time.sleep(1)
result = subprocess.run(
shlex.split(cmd), text=True, capture_output=True
)
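                        # A non-connection error won't resolve by waiting, so stop retrying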
if result.stderr and "ConnectionError" not in result.stderr:
break
if result.returncode == 0:
continue
console.print(f"Error while executing `{cmd}`")
raise typer.Exit(1)
server_started_str = "Uvicorn running on"
    # Read and print the server logs until the server startup message appears (or we time out)
if screen or nohup:
while not Path(SERVER_LOGFILE).exists():
time.sleep(1)
f = f or open(SERVER_LOGFILE, "r")
start_time = time.time()
        # Wait up to 60 seconds for the startup output (allows time for Caddy to download and set up)
while time.time() - start_time < 60:
for line in f:
if server_started_str in line:
console.print(line)
f.close()
return
else:
console.print(line, end="")
time.sleep(1)
f.close()
except FileNotFoundError:
console.print(
"python3 command was not found. Make sure you have python3 installed."
)
raise typer.Exit(1)
[docs]@server_app.command("start")
def server_start(
cluster_name: Optional[str] = typer.Option(
None,
help="Specify a *saved* remote cluster to start the Runhouse server on that cluster. "
"If not provided, the locally running server will be started.",
),
restart_ray: bool = typer.Option(True, help="Restart the Ray runtime"),
screen: bool = typer.Option(
True,
help="Start the server in a screen. Only relevant when restarting locally.",
),
nohup: bool = typer.Option(
True,
help="Start the server in a nohup if screen is not available. Only relevant when restarting locally.",
),
resync_rh: bool = typer.Option(
False,
help="Resync the Runhouse package. Only relevant when restarting remotely.",
),
host: Optional[str] = typer.Option(
None, help="Custom server host address. Default is `0.0.0.0`."
),
port: Optional[str] = typer.Option(
None, help="Port for server. If not specified will start on 32300."
),
use_https: bool = typer.Option(
False, help="Start an HTTPS server with TLS verification."
),
use_den_auth: bool = typer.Option(
False, help="Whether to authenticate requests with a Runhouse token."
),
ssl_keyfile: Optional[str] = typer.Option(
None, help="Path to custom SSL key file to use for enabling HTTPS."
),
ssl_certfile: Optional[str] = typer.Option(
None, help="Path to custom SSL cert file to use for enabling HTTPS."
),
restart_proxy: bool = typer.Option(
False, help="Whether to reinstall server configs on the cluster."
),
use_caddy: bool = typer.Option(
False,
help="Whether to configure Caddy on the cluster as a reverse proxy.",
),
domain: str = typer.Option(
None,
help="Server domain. Relevant if using Caddy to automate generating CA verified certs.",
),
certs_address: Optional[str] = typer.Option(
None,
help="Public IP address of the server. Required for generating self-signed certs and enabling HTTPS.",
),
api_server_url: str = typer.Option(
default="https://api.run.house",
help="URL of Runhouse Den",
),
default_env_name: str = typer.Option(
None, help="Default env to start the server on."
),
conda_env: str = typer.Option(
None, help="Name of conda env corresponding to default env if it is a CondaEnv."
),
from_python: bool = typer.Option(
False,
help="Whether HTTP server started from inside a Python call rather than CLI.",
),
):
"""Start the HTTP server on the cluster."""
# If server is already up, ask the user to restart the server instead.
if not cluster_name:
server_status_cmd = "runhouse server status"
result = subprocess.run(
server_status_cmd, shell=True, capture_output=True, text=True
)
if result.returncode == 0 and "Server is up" in result.stdout:
console.print(
"Local Runhouse server is already running. To restart it, please "
"run [italic bold]`runhouse server restart` [reset]with the relevant parameters."
)
raise typer.Exit(0)
if cluster_name:
c = get_cluster_or_local(cluster_name=cluster_name)
c.start_server(resync_rh=resync_rh, restart_ray=restart_ray)
return
_start_server(
restart=False,
restart_ray=restart_ray,
screen=screen,
nohup=nohup,
create_logfile=True,
host=host,
port=port,
use_https=use_https,
den_auth=use_den_auth,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
restart_proxy=restart_proxy,
use_caddy=use_caddy,
domain=domain,
certs_address=certs_address,
api_server_url=api_server_url,
default_env_name=default_env_name,
conda_env=conda_env,
from_python=from_python,
)
[docs]@server_app.command("restart")
def server_restart(
cluster_name: str = typer.Option(
None,
help="Specify a *saved* remote cluster to restart the Runhouse server on that cluster. "
"If not provided, the locally running server will be restarted.",
),
restart_ray: bool = typer.Option(True, help="Restart the Ray runtime."),
screen: bool = typer.Option(
True,
help="Start the server in a screen. Only relevant when restarting locally.",
),
nohup: bool = typer.Option(
True,
help="Start the server in a nohup if screen is not available. Only relevant when restarting locally.",
),
resync_rh: bool = typer.Option(
False,
help="Resync the Runhouse package. Only relevant when restarting remotely.",
),
host: Optional[str] = typer.Option(
None, help="Custom server host address. Default is `0.0.0.0`."
),
port: Optional[str] = typer.Option(
None, help="Port for server. If not specified will start on 32300."
),
use_https: bool = typer.Option(
False, help="Start an HTTPS server with TLS verification."
),
use_den_auth: bool = typer.Option(
False, help="Whether to authenticate requests with a Runhouse token."
),
ssl_keyfile: Optional[str] = typer.Option(
None, help="Path to custom SSL key file to use for enabling HTTPS."
),
ssl_certfile: Optional[str] = typer.Option(
None, help="Path to custom SSL cert file to use for enabling HTTPS."
),
restart_proxy: bool = typer.Option(
False, help="Whether to reinstall server configs on the cluster."
),
use_caddy: bool = typer.Option(
False,
help="Whether to configure Caddy on the cluster as a reverse proxy.",
),
domain: str = typer.Option(
None,
help="Server domain. Relevant if using Caddy to automate generating CA verified certs.",
),
certs_address: Optional[str] = typer.Option(
None,
help="Public IP address of the server. Required for generating self-signed certs and enabling HTTPS.",
),
api_server_url: str = typer.Option(
default="https://api.run.house",
help="URL of Runhouse Den",
),
default_env_name: str = typer.Option(
None, help="Default env to start the server on."
),
conda_env: str = typer.Option(
None, help="Name of conda env corresponding to default env if it is a CondaEnv."
),
from_python: bool = typer.Option(
False,
help="Whether HTTP server started from inside a Python call rather than CLI.",
),
):
"""Restart the HTTP server on the cluster."""
if cluster_name:
c = get_cluster_or_local(cluster_name=cluster_name)
c.restart_server(resync_rh=resync_rh, restart_ray=restart_ray)
return
_start_server(
restart=True,
restart_ray=restart_ray,
screen=screen,
nohup=nohup,
create_logfile=True,
host=host,
port=port,
use_https=use_https,
den_auth=use_den_auth,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
restart_proxy=restart_proxy,
use_caddy=use_caddy,
domain=domain,
certs_address=certs_address,
api_server_url=api_server_url,
default_env_name=default_env_name,
conda_env=conda_env,
from_python=from_python,
)
[docs]@server_app.command("stop")
def server_stop(
cluster_name: Optional[str] = typer.Option(
None,
help="Specify a *saved* remote cluster to stop the Runhouse server on that cluster. "
"If not provided, the locally running server will be stopped.",
),
stop_ray: bool = typer.Option(False, help="Stop the Ray runtime."),
cleanup_actors: bool = typer.Option(True, help="Kill all Ray actors."),
):
"""Stop the HTTP server on the cluster."""
logger.debug("Stopping the server")
if cluster_name:
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
current_cluster.stop_server(stop_ray=stop_ray, cleanup_actors=cleanup_actors)
return
subprocess.run(SERVER_STOP_CMD, shell=True)
if cleanup_actors:
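        # Connect to the running local Ray instance so the Runhouse actors can be killed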
ray.init(
address="auto",
ignore_reinit_error=True,
logging_level=logging.ERROR,
namespace="runhouse",
)
kill_actors(namespace="runhouse", gracefully=False)
if stop_ray:
logger.info("Stopping Ray.")
subprocess.run(RAY_KILL_CMD, shell=True)
[docs]@server_app.command("status")
def server_status(
cluster_name: str = typer.Option(
None,
help="Specify a *saved* remote cluster to check the status of the Runhouse server on that cluster. "
"If not provided, the status of the locally running server will be checked.",
),
):
"""Check the HTTP server status on the cluster."""
logger.debug("Checking the server status.")
current_cluster = get_cluster_or_local(cluster_name=cluster_name)
if current_cluster._is_server_up():
status = current_cluster.status()
console.print(f"[reset]{BULLET_UNICODE} server pid: {status.get('server_pid')}")
print_cluster_config(
cluster_config=status.get("cluster_config"), status_type=StatusType.server
)
else:
console.print(
"Server is down. To check the status of the cluster, run [italic bold]`runhouse cluster status`"
)
@app.callback(invoke_without_command=True, help="Runhouse CLI")
def main(
ctx: typer.Context,
version: bool = typer.Option(
None, "--version", "-v", help="Show the version and exit."
),
):
"""
Runhouse CLI
"""
if version:
print(f"{__version__}")
elif ctx.invoked_subcommand is None:
subprocess.run("runhouse --help", shell=True)