You are viewing the docs for version v0.0.12.

Cluster

A Cluster is a Runhouse primitive used for abstracting a particular hardware configuration. This can be an on-demand cluster (requires valid cloud credentials), a BYO (bring-your-own) cluster (requires an IP address and SSH credentials), or a SageMaker cluster (requires an AWS IAM role, given by name or full ARN).

A cluster is assigned a name, through which it can be accessed and reused later on.

Cluster Factory Method

runhouse.cluster(name: str, host: Optional[Union[str, List[str]]] = None, ssh_creds: Optional[dict] = None, dryrun: bool = False, **kwargs) Union[Cluster, OnDemandCluster, SageMakerCluster][source]

Builds an instance of Cluster.

Parameters:
  • name (str) – Name for the cluster, to re-use later on.

  • host (str or List[str], optional) – Hostname, IP address, or list of IP addresses for the BYO cluster.

  • ssh_creds (dict, optional) – Dictionary mapping SSH credentials. Example: ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'}

  • dryrun (bool) – Whether to create the Cluster if it doesn’t exist, or load a Cluster object as a dryrun. (Default: False)

Returns:

The resulting cluster.

Return type:

Union[Cluster, OnDemandCluster, SageMakerCluster]

Example

>>> # using private key
>>> gpu = rh.cluster(host='<hostname>',
>>>                  ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'},
>>>                  name='rh-a10x').save()

>>> # using password
>>> gpu = rh.cluster(host='<hostname>',
>>>                  ssh_creds={'ssh_user': '...', 'password':'*****'},
>>>                  name='rh-a10x').save()

>>> # Load cluster from above
>>> reloaded_cluster = rh.cluster(name="rh-a10x")
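
The ssh_creds dictionary in the examples above carries either a private key path or a password. The following is a minimal sketch of a helper that assembles it. The name build_ssh_creds is hypothetical and not part of Runhouse; only the dictionary keys are taken from the examples, and enforcing exactly one authentication method is a choice made for this sketch.

```python
from typing import Dict, Optional


def build_ssh_creds(
    ssh_user: str,
    ssh_private_key: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, str]:
    """Assemble an ssh_creds dict with exactly one authentication method.

    Hypothetical helper: only the keys ('ssh_user', 'ssh_private_key',
    'password') come from the documented examples.
    """
    if (ssh_private_key is None) == (password is None):
        raise ValueError("Provide exactly one of ssh_private_key or password")
    creds = {"ssh_user": ssh_user}
    if ssh_private_key is not None:
        creds["ssh_private_key"] = ssh_private_key
    else:
        creds["password"] = password
    return creds


key_creds = build_ssh_creds("ubuntu", ssh_private_key="~/.ssh/id_rsa")
# The result would then be passed to the factory, for example:
# gpu = rh.cluster(name="rh-a10x", host="<hostname>", ssh_creds=key_creds).save()
```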

Cluster Class

class runhouse.Cluster(name, ips: Optional[List[str]] = None, ssh_creds: Optional[Dict] = None, dryrun=False, **kwargs)[source]
__init__(name, ips: Optional[List[str]] = None, ssh_creds: Optional[Dict] = None, dryrun=False, **kwargs)[source]

The Runhouse cluster, or system. This is where you can run Functions, and where data can be accessed or transferred to and from. You can bring your own (BYO) cluster by providing the cluster IP and ssh_creds, or use an on-demand cluster that is spun up and down through SkyPilot using your cloud credentials.

Note

To build a cluster, please use the factory method cluster().

add_secrets(provider_secrets: dict)[source]

Copy secrets from current environment onto the cluster

call(module_name, method_name, *args, stream_logs=True, run_name=None, remote=False, run_async=False, save=False, **kwargs)[source]

Call a method on a module that is in the cluster’s object store.

Parameters:
  • module_name (str) – Name of the module saved on system.

  • method_name (str) – Name of the method.

  • stream_logs (bool) – Whether to stream logs from the method call.

  • run_name (str) – Name for the run.

  • remote (bool) – Return a remote object from the function, rather than the result proper.

  • run_async (bool) – Run the method asynchronously and return a run_key to retrieve results and logs later.

  • *args – Positional arguments to pass to the method.

  • **kwargs – Keyword arguments to pass to the method.

Example

>>> cluster.call("my_module", "my_method", arg1, arg2, kwarg1=kwarg1)
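
The run_async flag returns a run_key rather than the result. Below is a rough sketch of how that key might be used afterwards. It assumes the run_key can be passed to cluster.get() to fetch the result, which this page does not state explicitly. StubCluster is a local stand-in (not a Runhouse class) so the sketch runs without any hardware, and call_async_and_fetch is a hypothetical helper name.

```python
from typing import Any, Dict


def call_async_and_fetch(
    cluster: Any, module_name: str, method_name: str, *args: Any, **kwargs: Any
) -> Any:
    """Start a method asynchronously, then fetch its result by run_key."""
    run_key = cluster.call(module_name, method_name, *args, run_async=True, **kwargs)
    # Assumption: the run_key is a key in the object store readable via get().
    return cluster.get(run_key)


class StubCluster:
    """Local stand-in (not Runhouse) that imitates call() and get()."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def call(self, module_name, method_name, *args, run_async=False, **kwargs):
        result = sum(args)  # pretend the remote method adds its arguments
        if not run_async:
            return result
        run_key = f"{module_name}.{method_name}.run"
        self._store[run_key] = result
        return run_key

    def get(self, key, default=None):
        return self._store.get(key, default)


total = call_async_and_fetch(StubCluster(), "my_module", "my_method", 1, 2)
```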
cancel(key: str, force=False)[source]

Cancel a given run on cluster by its key.

cancel_all(force=False)[source]

Cancel all runs on cluster.

clear()[source]

Clear the cluster’s object store.

delete(keys: Union[None, str, List[str]])[source]

Delete the given items from the cluster’s object store. To delete all items, use cluster.clear()

disconnect()[source]

Disconnect the RPC tunnel.

Example

>>> cluster.disconnect()
get(key: str, default: Optional[Any] = None, remote=False, stream_logs: bool = False)[source]

Get the result for a given key from the cluster’s object store. To raise an error if the key is not found, use cluster.get(key, default=KeyError).
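
The default=KeyError behaviour lends itself to a compute-on-miss pattern together with put(). A minimal sketch follows; get_or_compute is a hypothetical helper, and StubStore is a local stand-in that mimics only the get/put behaviour described on this page, so the example runs without a cluster.

```python
from typing import Any, Callable, Dict


def get_or_compute(cluster: Any, key: str, compute: Callable[[], Any]) -> Any:
    """Return the value stored at `key`, computing and storing it on a miss."""
    try:
        # default=KeyError asks get() to raise instead of returning a default.
        return cluster.get(key, default=KeyError)
    except KeyError:
        value = compute()
        cluster.put(key, value)
        return value


class StubStore:
    """Local stand-in (not Runhouse) for the cluster's object store."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key, default=None):
        if key in self._data:
            return self._data[key]
        if default is KeyError:
            raise KeyError(key)
        return default

    def put(self, key, obj):
        self._data[key] = obj


store = StubStore()
calls = []


def expensive():
    calls.append("ran")
    return 42


first = get_or_compute(store, "answer", expensive)   # miss: computes and stores
second = get_or_compute(store, "answer", expensive)  # hit: reads the stored value
```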

install_packages(reqs: List[Union[Package, str]], env: Union[Env, str] = None)[source]

Install the given packages on the cluster.

Parameters:
  • reqs (List[Package or str]) – List of packages to install on cluster and env

  • env (Env or str) – Environment to install package on. If left empty, defaults to base environment. (Default: None)

Example

>>> cluster.install_packages(reqs=["accelerate", "diffusers"])
>>> cluster.install_packages(reqs=["accelerate", "diffusers"], env="my_conda_env")
is_connected()[source]

Whether the RPC tunnel is up.

Example

>>> connected = cluster.is_connected()
is_up() bool[source]

Check if the cluster is up.

Example

>>> rh.cluster("rh-cpu").is_up()
keys(env=None)[source]

List all keys in the cluster’s object store.

notebook(persist: bool = False, sync_package_on_close: Optional[str] = None, port_forward: int = 8888)[source]

Tunnel into and launch notebook from the cluster.

Example

>>> rh.cluster("test-cluster").notebook()
on_this_cluster()[source]

Whether this function is being called on the same cluster.

pause_autostop()[source]

Context manager to temporarily pause autostop. Mainly for on-demand clusters; BYO clusters have no autostop.

put(key: str, obj: Any, env=None)[source]

Put the given object on the cluster’s object store at the given key.

put_resource(resource: Resource, state=None, dryrun=False)[source]

Put the given resource on the cluster’s object store. Returns the key (important if name is not set).

remove_conda_env(env: Union[str, CondaEnv])[source]

Remove conda env from the cluster.

Example

>>> rh.ondemand_cluster("rh-cpu").remove_conda_env("my_conda_env")
rename(old_key: str, new_key: str)[source]

Rename a key in the cluster’s object store.

restart_server(_rh_install_url: Optional[str] = None, resync_rh: bool = True, restart_ray: bool = True, env_activate_cmd: Optional[str] = None)[source]

Restart the RPC server.

Parameters:
  • resync_rh (bool) – Whether to resync runhouse. (Default: True)

  • restart_ray (bool) – Whether to restart Ray. (Default: True)

  • env_activate_cmd (str, optional) – Command to activate the environment on the server. (Default: None)

Example

>>> rh.cluster("rh-cpu").restart_server()
run(commands: List[str], env: Union[Env, str] = None, stream_logs: bool = True, port_forward: Union[None, int, Tuple[int, int]] = None, require_outputs: bool = True, run_name: Optional[str] = None) list[source]

Run a list of shell commands on the cluster. If run_name is provided, the commands will be sent over to the cluster before being executed and a Run object will be created.

Example

>>> cpu.run(["pip install numpy"])
>>> cpu.run(["pip install numpy"], env="my_conda_env")
>>> cpu.run(["python script.py"], run_name="my_exp")
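
Since run() returns a list, it can be useful to pair each command with its result. The sketch below assumes each entry has the shape (return_code, stdout, stderr); this page does not document the entry format, so check the actual return value before relying on it. failed_commands is a hypothetical helper and sample_results is made-up data for illustration.

```python
from typing import List, Sequence, Tuple

# Assumed shape of one result entry: (return_code, stdout, stderr).
CommandResult = Tuple[int, str, str]


def failed_commands(
    commands: Sequence[str], results: Sequence[CommandResult]
) -> List[Tuple[str, str]]:
    """Pair each command with its result; return (command, stderr) for failures."""
    failures = []
    for command, (return_code, _stdout, stderr) in zip(commands, results):
        if return_code != 0:
            failures.append((command, stderr))
    return failures


commands = ["pip install numpy", "python script.py"]
# In real use: results = cpu.run(commands, env="my_conda_env")
sample_results = [
    (0, "Successfully installed numpy", ""),
    (1, "", "No such file: script.py"),
]
failures = failed_commands(commands, sample_results)
```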
run_python(commands: List[str], env: Union[Env, str] = None, stream_logs: bool = True, port_forward: Optional[int] = None, run_name: Optional[str] = None)[source]

Run a list of python commands on the cluster.

Example

>>> cpu.run_python(['import numpy', 'print(numpy.__version__)'])
>>> cpu.run_python(["print('hello')"])

Note

Running Python commands with nested quotes can be finicky. If using nested quotes, try to wrap the outer quote with double quotes (") and the inner quotes with single quotes (').
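
One way to catch quoting mistakes early is to compile each command locally before sending it. This is only a sketch: check_python_commands is a hypothetical helper, and it verifies local Python syntax only, not how the string is quoted on its way to the cluster.

```python
from typing import List


def check_python_commands(commands: List[str]) -> List[str]:
    """Compile each command locally so malformed strings fail before any remote run."""
    for command in commands:
        # compile() raises SyntaxError if the command is not valid Python.
        compile(command, "<run_python command>", "exec")
    return commands


# Outer double quotes, inner single quotes, as the note recommends.
commands = check_python_commands(
    ["import json", "print(json.dumps({'status': 'ok'}))"]
)
# In real use: cpu.run_python(commands)
```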

ssh()[source]

SSH into the cluster

Example

>>> rh.cluster("rh-cpu").ssh()
ssh_creds()[source]

Retrieve SSH credentials.

sync_secrets(providers: Optional[List[str]] = None)[source]

Send secrets for the given providers.

Parameters:

providers (List[str] or None) – List of providers to send secrets for. If None, all providers configured in the environment will be sent.

Example

>>> cpu.sync_secrets(providers=["aws", "lambda"])
up_if_not()[source]

Bring up the cluster if it is not up. No-op if cluster is already up. This only applies to on-demand clusters, and has no effect on self-managed clusters.

Example

>>> rh.cluster("rh-cpu").up_if_not()

Cluster Hardware Setup

No additional setup is required. You will just need to have the IP address for the cluster and the path to SSH credentials ready to be used for the cluster initialization.

OnDemandCluster Class

An OnDemandCluster is a cluster that uses SkyPilot functionality underneath to handle various cluster properties.

class runhouse.OnDemandCluster(name, instance_type: Optional[str] = None, num_instances: Optional[int] = None, provider: Optional[str] = None, dryrun=False, autostop_mins=None, use_spot=False, image_id=None, region=None, sky_state=None, **kwargs)[source]
__init__(name, instance_type: Optional[str] = None, num_instances: Optional[int] = None, provider: Optional[str] = None, dryrun=False, autostop_mins=None, use_spot=False, image_id=None, region=None, sky_state=None, **kwargs)[source]

On-demand SkyPilot Cluster.

Note

To build a cluster, please use the factory method cluster().

static cluster_ssh_key(path_to_file)[source]

Retrieve SSH key for the cluster.

Example

>>> ssh_priv_key = rh.ondemand_cluster("rh-cpu").cluster_ssh_key("~/.ssh/id_rsa")
is_up() bool[source]

Whether the cluster is up.

Example

>>> rh.ondemand_cluster("rh-cpu").is_up()
keep_warm(autostop_mins: int = - 1)[source]

Keep the cluster warm for given number of minutes after inactivity.

Parameters:

autostop_mins (int) – Amount of time (in min) to keep the cluster warm after inactivity. If set to -1, keep cluster warm indefinitely. (Default: -1)

pause_autostop()[source]

Context manager to temporarily pause autostop.

Example

>>> cluster = rh.ondemand_cluster("rh-cpu")
>>> with cluster.pause_autostop():
>>>     cluster.run(["python train.py"])
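
The same pattern can be wrapped in a small function that takes any cluster object. The sketch below assumes autostop resumes when the with block exits, which is what "temporarily" suggests but is not spelled out here. run_without_autostop is a hypothetical helper, and StubCluster is a local stand-in that only records the order of events.

```python
from contextlib import contextmanager
from typing import Any, List


def run_without_autostop(cluster: Any, commands: List[str]) -> Any:
    """Run commands inside the pause_autostop() context manager."""
    with cluster.pause_autostop():
        return cluster.run(commands)


class StubCluster:
    """Local stand-in (not Runhouse) that records the order of events."""

    def __init__(self) -> None:
        self.events: List[str] = []

    @contextmanager
    def pause_autostop(self):
        self.events.append("autostop paused")
        try:
            yield
        finally:
            # Runs even if the wrapped commands raise.
            self.events.append("autostop resumed")

    def run(self, commands):
        self.events.extend(f"ran {c}" for c in commands)
        return [0] * len(commands)


stub = StubCluster()
run_without_autostop(stub, ["python train.py"])
```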
ssh()[source]

SSH into the cluster.

Example

>>> rh.ondemand_cluster("rh-cpu").ssh()
ssh_creds()[source]

Retrieve SSH creds for the cluster.

Example

>>> credentials = rh.ondemand_cluster("rh-cpu").ssh_creds()
status(refresh: bool = True)[source]

Get status of Sky cluster.

Return dict looks like:

{'name': 'sky-cpunode-donny',
 'launched_at': 1662317201,
 'handle': ResourceHandle(
     cluster_name=sky-cpunode-donny,
     head_ip=54.211.97.164,
     cluster_yaml=/Users/donny/.sky/generated/sky-cpunode-donny.yml,
     launched_resources=1x AWS(m6i.2xlarge),
     tpu_create_script=None,
     tpu_delete_script=None),
 'last_use': 'sky cpunode',
 'status': <ClusterStatus.UP: 'UP'>,
 'autostop': -1,
 'metadata': {}}

Note

For more information see SkyPilot’s ResourceHandle class.

Example

>>> status = rh.ondemand_cluster("rh-cpu").status()
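
The returned dict mixes plain values with SkyPilot objects, so a small reducer can make it easier to log. This is a sketch under stated assumptions: launched_at is treated as Unix epoch seconds, and a negative autostop is read as "autostop disabled" (by analogy with keep_warm). summarize_status is a hypothetical helper, and the ClusterStatus enum here is a local stand-in for SkyPilot's.

```python
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict


def summarize_status(status: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a status dict to a few plain, printable fields."""
    state = status["status"]
    launched = datetime.fromtimestamp(status["launched_at"], tz=timezone.utc)
    return {
        "name": status["name"],
        # Enum members expose the plain string via .value; fall back to str().
        "state": state.value if isinstance(state, Enum) else str(state),
        # Assumption: launched_at is Unix epoch seconds.
        "launched_at": launched.isoformat(),
        # Assumption: a negative autostop means autostop is disabled.
        "autostop_enabled": status["autostop"] >= 0,
    }


class ClusterStatus(Enum):
    """Stand-in for SkyPilot's status enum, only for this example."""

    UP = "UP"


summary = summarize_status(
    {
        "name": "sky-cpunode-donny",
        "launched_at": 1662317201,
        "status": ClusterStatus.UP,
        "autostop": -1,
    }
)
```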
teardown()[source]

Teardown cluster.

Example

>>> rh.ondemand_cluster("rh-cpu").teardown()
teardown_and_delete()[source]

Teardown cluster and delete it from configs.

Example

>>> rh.ondemand_cluster("rh-cpu").teardown_and_delete()
up()[source]

Bring up the cluster.

Example

>>> rh.ondemand_cluster("rh-cpu").up()

OnDemandCluster Hardware Setup

On-Demand clusters use SkyPilot to automatically spin up and down clusters on the cloud. You will need to first set up cloud access on your local machine:

Run sky check to see which cloud providers are enabled, and how to set up cloud credentials for each of the providers.

sky check

For a more in-depth tutorial on setting up individual cloud credentials, refer to the SkyPilot setup docs.

SageMakerCluster Class

Note

SageMaker support is in alpha and under active development. Please report any bugs or let us know of any feature requests.

A SageMakerCluster is a cluster that uses a SageMaker instance under the hood.

Runhouse currently supports two core usage paths for SageMaker clusters:

  • Compute backend: You can use SageMaker as a compute backend, just as you would a BYO (bring-your-own) or an on-demand cluster. Runhouse will handle launching the SageMaker compute and creating the SSH connection to the cluster.

  • Dedicated training jobs: You can use a SageMakerCluster class to run a training job on SageMaker compute. To do so, you will need to provide an estimator.

Note

Runhouse requires an AWS IAM role (either name or full ARN) whose credentials have adequate permissions to create SageMaker endpoints and access AWS resources.

Please see SageMaker Hardware Setup for more specific instructions and requirements for providing the role and setting up the cluster.

class runhouse.SageMakerCluster(name: str, role: Optional[str] = None, profile: Optional[str] = None, region: Optional[str] = None, ssh_key_path: Optional[str] = None, instance_id: Optional[str] = None, instance_type: Optional[str] = None, instance_count: Optional[int] = None, image_uri: Optional[str] = None, autostop_mins: Optional[int] = None, connection_wait_time: Optional[int] = None, estimator: Optional[Union[EstimatorBase, Dict]] = None, job_name: Optional[str] = None, dryrun=False, **kwargs)[source]
__init__(name: str, role: Optional[str] = None, profile: Optional[str] = None, region: Optional[str] = None, ssh_key_path: Optional[str] = None, instance_id: Optional[str] = None, instance_type: Optional[str] = None, instance_count: Optional[int] = None, image_uri: Optional[str] = None, autostop_mins: Optional[int] = None, connection_wait_time: Optional[int] = None, estimator: Optional[Union[EstimatorBase, Dict]] = None, job_name: Optional[str] = None, dryrun=False, **kwargs)[source]

The Runhouse SageMaker cluster abstraction. This is where you can use SageMaker as a compute backend, just as you would an on-demand cluster (i.e. cloud VMs) or a BYO (i.e. on-prem) cluster. Additionally supports running dedicated training jobs using SageMaker Estimators.

Note

To build a cluster, please use the factory method sagemaker_cluster().

property connection_wait_time

Amount of time the SSH helper will wait inside SageMaker before it continues normal execution

property default_bucket

Default bucket to use for storing the cluster’s authorized public keys.

is_up() bool[source]

Check if the cluster is up.

Example

>>> rh.sagemaker_cluster("sagemaker-cluster").is_up()
keep_warm(autostop_mins: int = - 1)[source]

Keep the cluster warm for given number of minutes after inactivity.

Parameters:

autostop_mins (int) – Amount of time (in minutes) to keep the cluster warm after inactivity. If set to -1, keep cluster warm indefinitely. (Default: -1)

pause_autostop()[source]

Context manager to temporarily pause autostop.

ssh(interactive: bool = True)[source]

SSH into the cluster.

Parameters:

interactive (bool) – Whether to start an interactive shell or not (Default: True).

Example

>>> rh.sagemaker_cluster(name="sagemaker-cluster").ssh()
property ssh_key_path

Relative path to the private SSH key used to connect to the cluster.

status() dict[source]

Get status of SageMaker cluster.

Example

>>> status = rh.sagemaker_cluster("sagemaker-cluster").status()
teardown()[source]

Teardown the SageMaker instance.

Example

>>> rh.sagemaker_cluster(name="sagemaker-cluster").teardown()
teardown_and_delete()[source]

Teardown the SageMaker instance and delete from RNS configs.

Example

>>> rh.sagemaker_cluster(name="sagemaker-cluster").teardown_and_delete()
up()[source]

Bring up the cluster.

Example

>>> rh.sagemaker_cluster("sagemaker-cluster").up()
up_if_not()[source]

Bring up the cluster if it is not up. No-op if cluster is already up.

Example

>>> rh.sagemaker_cluster("sagemaker-cluster").up_if_not()

SageMaker Hardware Setup

IAM Role

SageMaker clusters require AWS CLI V2 and configuring the SageMaker IAM role with the AWS Systems Manager.

In order to launch a cluster, you must grant SageMaker the necessary permissions with an IAM role, which can be provided either by name or by full ARN. You can also specify a profile explicitly or with the AWS_PROFILE environment variable.

For example, let’s say your local ~/.aws/config file contains:

[profile sagemaker]
role_arn = arn:aws:iam::123456789:role/service-role/AmazonSageMaker-ExecutionRole-20230717T192142
region = us-east-1
source_profile = default
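
To illustrate how a profile name maps to a role ARN in such a file, here is a minimal sketch that reads the config contents with Python's standard configparser. It is not how Runhouse performs the lookup; role_arn_for_profile is a hypothetical helper shown only to make the mapping concrete.

```python
import configparser
from typing import Optional


def role_arn_for_profile(config_text: str, profile: str) -> Optional[str]:
    """Look up role_arn for a named profile in AWS config file contents."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # Named profiles live in "[profile <name>]" sections; the default is "[default]".
    section = "default" if profile == "default" else f"profile {profile}"
    if not parser.has_section(section):
        return None
    return parser.get(section, "role_arn", fallback=None)


config_text = """
[profile sagemaker]
role_arn = arn:aws:iam::123456789:role/service-role/AmazonSageMaker-ExecutionRole-20230717T192142
region = us-east-1
source_profile = default
"""
role_arn = role_arn_for_profile(config_text, "sagemaker")
```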

There are several ways to provide the necessary credentials when initializing the cluster:

  • Providing the AWS profile name: profile="sagemaker"

  • Providing the AWS Role ARN directly: role="arn:aws:iam::123456789:role/service-role/AmazonSageMaker-ExecutionRole-20230717T192142"

  • Environment Variable: setting AWS_PROFILE to "sagemaker"
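
The three options above can be sketched as a single resolution step. The precedence used here (explicit role, then explicit profile, then AWS_PROFILE, then nothing) is an assumption based on the parameter descriptions on this page, and resolve_sagemaker_credentials is a hypothetical helper, not a Runhouse function.

```python
import os
from typing import Dict, Mapping, Optional


def resolve_sagemaker_credentials(
    role: Optional[str] = None,
    profile: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Pick one credential source, in the assumed order described above."""
    env = os.environ if env is None else env
    if role:
        return {"role": role}
    if profile:
        return {"profile": profile}
    if env.get("AWS_PROFILE"):
        return {"profile": env["AWS_PROFILE"]}
    # Nothing given: return no kwargs and leave the default profile to Runhouse.
    return {}


creds = resolve_sagemaker_credentials(env={"AWS_PROFILE": "sagemaker"})
# The result would then be passed on, for example:
# c = rh.sagemaker_cluster(name="sm-cluster", **creds)
```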

Note

If no role or profile is provided, Runhouse will try using the default profile. Note that if this default AWS identity is not a role, you will need to provide the role or profile explicitly.

Tip

If you are providing an estimator, you must provide the role ARN explicitly as part of the estimator object. More info on estimators here.

Please see the AWS docs for further instructions on creating and configuring an IAM role.

AWS CLI V2

Runhouse requires the AWS CLI V2 to be installed on your local machine.

To confirm the installation succeeded, run aws --version in the command line. You should see something like:

aws-cli/2.13.8 Python/3.11.4 Darwin/21.3.0 source/arm64 prompt/off
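
If you want to check the version programmatically, the major version can be read from that string. A minimal sketch follows; is_aws_cli_v2 is a hypothetical helper, and in real use the input would be the captured output of aws --version.

```python
import re


def is_aws_cli_v2(version_output: str) -> bool:
    """Return True if `aws --version` output reports major version 2."""
    match = re.match(r"aws-cli/(\d+)\.", version_output.strip())
    return bool(match) and int(match.group(1)) == 2


sample = "aws-cli/2.13.8 Python/3.11.4 Darwin/21.3.0 source/arm64 prompt/off"
v2_installed = is_aws_cli_v2(sample)
```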

SSM Setup

The AWS Systems Manager service is used to create SSH tunnels with the SageMaker cluster.

To install the AWS Session Manager Plugin, please see the AWS docs or SageMaker SSH Helper. The SSH Helper package simplifies the process of creating SSH tunnels with SageMaker clusters. It is installed by default if you are installing Runhouse with the SageMaker dependency: pip install runhouse[sagemaker].

You can also install the Session Manager by running the CLI command:

sm-local-configure

To configure your SageMaker IAM role with the AWS Systems Manager, please refer to these instructions.

SageMaker Factory Method

runhouse.sagemaker_cluster(name: str, role: str = None, profile: str = None, ssh_key_path: str = None, instance_id: str = None, instance_type: str = None, instance_count: int = None, image_uri: str = None, autostop_mins: int = None, connection_wait_time: int = None, estimator: Union[sagemaker.estimator.EstimatorBase, Dict] = None, job_name: str = None, dryrun: bool = False) SageMakerCluster[source]

Builds an instance of SageMakerCluster.

Parameters:
  • name (str) – Name for the cluster, to re-use later on.

  • role (str, optional) –

    An AWS IAM role (either name or full ARN). Can be passed in explicitly as an argument or provided via an estimator. If not specified will try using the profile attribute or environment variable AWS_PROFILE to extract the relevant role ARN. More info on configuring an IAM role for SageMaker here.

  • profile (str, optional) – AWS profile to use for the cluster. If provided instead of a role, will lookup the role ARN associated with the profile in the local AWS credentials. If not provided, will use the default profile.

  • ssh_key_path (str, optional) – Path (relative or absolute) to private SSH key to use for connecting to the cluster. If not provided, will look for the key in path ~/.ssh/sagemaker-ssh-gw. If not found, new keys are generated and the public key is uploaded to the default S3 bucket for the role ARN.

  • instance_id (str, optional) – ID of the AWS instance to use for the cluster. SageMaker does not expose IP addresses of its instance, so we use an instance ID as a unique identifier for the cluster.

  • instance_type (str, optional) –

    Type of AWS instance to use for the cluster. More info on supported instance options here. (Default: ml.m5.large.)

  • instance_count (int, optional) – Number of instances to use for the cluster. (Default: 1.)

  • image_uri (str, optional) – Image to use for the cluster instead of using the default SageMaker image which will be based on the framework_version and py_version. Can be an ECR url or dockerhub image and tag.

  • estimator (Union[sagemaker.estimator.EstimatorBase, Dict], optional) –

    Estimator to use for a dedicated training job. Leave as None if launching the compute without running a dedicated job. More info on creating an estimator here.

  • autostop_mins (int, optional) – Number of minutes to keep the cluster up after inactivity, or -1 to keep cluster up indefinitely. Note: this will keep the cluster up even if a dedicated training job has finished running or failed.

  • connection_wait_time (int, optional) – Amount of time to wait inside the SageMaker cluster before continuing with normal execution. Useful if you want to connect before a dedicated job starts (e.g. training). If you don’t want to wait, set it to 0. If no estimator is provided, will default to 0.

  • job_name (str, optional) – Name to provide for a training job. If not provided will generate a default name based on the image name and current timestamp (e.g. pytorch-training-2023-08-28-20-57-55-113).

  • dryrun (bool) – Whether to create the SageMakerCluster if it doesn’t exist, or load a SageMakerCluster object as a dryrun. (Default: False)

Returns:

The resulting cluster.

Return type:

SageMakerCluster

Example

>>> import runhouse as rh
>>> # Launch a new SageMaker instance and keep it up indefinitely.
>>> # Note: This will use Role ARN associated with the "sagemaker" profile defined in the local aws credentials
>>> c = rh.sagemaker_cluster(name='sm-cluster', profile="sagemaker").save()

>>> # Running a training job with a provided Estimator
>>> from sagemaker.pytorch import PyTorch
>>> c = rh.sagemaker_cluster(name='sagemaker-cluster',
>>>                          estimator=PyTorch(entry_point='train.py',
>>>                                            role='arn:aws:iam::123456789012:role/MySageMakerRole',
>>>                                            source_dir='/Users/myuser/dev/sagemaker',
>>>                                            framework_version='1.8.1',
>>>                                            py_version='py36',
>>>                                            instance_type='ml.p3.2xlarge'),
>>>                          ).save()

>>> # Load cluster from above
>>> reloaded_cluster = rh.sagemaker_cluster(name="sagemaker-cluster")