import warnings
from typing import Dict, List, Optional, Union

from runhouse.resources.hardware.utils import RESERVED_SYSTEM_NAMES

from .cluster import Cluster
from .on_demand_cluster import OnDemandCluster
from .sagemaker_cluster import SageMakerCluster

# Cluster factory method
[docs]def cluster( name: str, host: Union[str, List[str]] = None, ssh_creds: Optional[dict] = None, dryrun: bool = False, **kwargs, ) -> Union[Cluster, OnDemandCluster, SageMakerCluster]: """ Builds an instance of :class:`Cluster`. Args: name (str): Name for the cluster, to re-use later on. host (str or List[str], optional): Hostname, IP address, or list of IP addresses for the BYO cluster. ssh_creds (dict, optional): Dictionary mapping SSH credentials. Example: ``ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'}`` dryrun (bool): Whether to create the Cluster if it doesn't exist, or load a Cluster object as a dryrun. (Default: ``False``) Returns: Union[Cluster, OnDemandCluster, SageMakerCluster]: The resulting cluster. Example: >>> # using private key >>> gpu = rh.cluster(host='<hostname>', >>> ssh_creds={'ssh_user': '...', 'ssh_private_key':'<path_to_key>'}, >>> name='rh-a10x').save() >>> # using password >>> gpu = rh.cluster(host='<hostname>', >>> ssh_creds={'ssh_user': '...', 'password':'*****'}, >>> name='rh-a10x').save() >>> # Load cluster from above >>> reloaded_cluster = rh.cluster(name="rh-a10x") """ if "ips" in kwargs: host = kwargs["ips"] warnings.warn( "``ips`` argument has been deprecated. Please use ``host`` to refer to the cluster IPs or host instead." ) if name and host is None and ssh_creds is None and not kwargs: # If only the name is provided and dryrun is set to True return Cluster.from_name(name, dryrun) if name in RESERVED_SYSTEM_NAMES: raise ValueError( f"Cluster name {name} is a reserved name. Please use a different name which is not one of " f"{RESERVED_SYSTEM_NAMES}." ) if "instance_type" in kwargs.keys(): return ondemand_cluster(name=name, **kwargs) if any( k in kwargs.keys() for k in [ "role", "estimator", "instance_type", "autostop_mins", "connection_wait_time", "instance_count", ] ): warnings.warn( "The `cluster` factory is intended to be used for static clusters. " "If you would like to create a sagemaker cluster, please use `rh.sagemaker_cluster()` instead." ) return sagemaker_cluster(name=name, **kwargs) return Cluster(ips=host, ssh_creds=ssh_creds, name=name, dryrun=dryrun)
# OnDemandCluster factory method def ondemand_cluster( name: str, instance_type: Optional[str] = None, num_instances: Optional[int] = None, provider: Optional[str] = None, autostop_mins: Optional[int] = None, use_spot: bool = False, image_id: Optional[str] = None, region: Optional[str] = None, dryrun: bool = False, ) -> OnDemandCluster: """ Builds an instance of :class:`OnDemandCluster`. Args: name (str): Name for the cluster, to re-use later on. instance_type (int, optional): Type of cloud instance to use for the cluster. This could be a Runhouse built-in type, or your choice of instance type. num_instances (int, optional): Number of instances to use for the cluster. provider (str, optional): Cloud provider to use for the cluster. autostop_mins (int, optional): Number of minutes to keep the cluster up after inactivity, or ``-1`` to keep cluster up indefinitely. use_spot (bool, optional): Whether or not to use spot instance. image_id (str, optional): Custom image ID for the cluster. region (str, optional): The region to use for the cluster. dryrun (bool): Whether to create the Cluster if it doesn't exist, or load a Cluster object as a dryrun. (Default: ``False``) Returns: OnDemandCluster: The resulting cluster. Example: >>> import runhouse as rh >>> # On-Demand SkyPilot Cluster (OnDemandCluster) >>> gpu = rh.ondemand_cluster(name='rh-4-a100s', >>> instance_type='A100:4', >>> provider='gcp', >>> autostop_mins=-1, >>> use_spot=True, >>> image_id='my_ami_string', >>> region='us-east-1', >>> ).save() >>> # Load cluster from above >>> reloaded_cluster = rh.ondemand_cluster(name="rh-4-a100s") """ if name and not any([instance_type, num_instances, provider, image_id, region]): # If only the name is provided and dryrun is set to True return Cluster.from_name(name, dryrun) if name in RESERVED_SYSTEM_NAMES: raise ValueError( f"Cluster name {name} is a reserved name. Please use a different name which is not one of " f"{RESERVED_SYSTEM_NAMES}." ) return OnDemandCluster( instance_type=instance_type, provider=provider, num_instances=num_instances, autostop_mins=autostop_mins, use_spot=use_spot, image_id=image_id, region=region, name=name, dryrun=dryrun, )
[docs]def sagemaker_cluster( name: str, role: str = None, profile: str = None, ssh_key_path: str = None, instance_id: str = None, instance_type: str = None, instance_count: int = None, image_uri: str = None, autostop_mins: int = None, connection_wait_time: int = None, estimator: Union["sagemaker.estimator.EstimatorBase", Dict] = None, job_name: str = None, dryrun: bool = False, ) -> SageMakerCluster: """ Builds an instance of :class:`SageMakerCluster`. Args: name (str): Name for the cluster, to re-use later on. role (str, optional): An AWS IAM role (either name or full ARN). Can be passed in explicitly as an argument or provided via an estimator. If not specified will try using the ``profile`` attribute or environment variable ``AWS_PROFILE`` to extract the relevant role ARN. More info on configuring an IAM role for SageMaker `here <>`_. profile (str, optional): AWS profile to use for the cluster. If provided instead of a ``role``, will lookup the role ARN associated with the profile in the local AWS credentials. If not provided, will use the ``default`` profile. ssh_key_path (str, optional): Path (relative or absolute) to private SSH key to use for connecting to the cluster. If not provided, will look for the key in path ``~/.ssh/sagemaker-ssh-gw``. If not found will generate new keys and upload the public key to the default s3 bucket for the Role ARN. instance_id (str, optional): ID of the AWS instance to use for the cluster. SageMaker does not expose IP addresses of its instance, so we use an instance ID as a unique identifier for the cluster. instance_type (str, optional): Type of AWS instance to use for the cluster. More info on supported instance options `here <>`_. (Default: ``ml.m5.large``.) instance_count (int, optional): Number of instances to use for the cluster. (Default: ``1``.) image_uri (str, optional): Image to use for the cluster instead of using the default SageMaker image which will be based on the framework_version and py_version. Can be an ECR url or dockerhub image and tag. estimator (Union[str, sagemaker.estimator.EstimatorBase], optional): Estimator to use for a dedicated training job. Leave as ``None`` if launching the compute without running a dedicated job. More info on creating an estimator `here <>`_. autostop_mins (int, optional): Number of minutes to keep the cluster up after inactivity, or ``-1`` to keep cluster up indefinitely. *Note: this will keep the cluster up even if a dedicated training job has finished running or failed*. connection_wait_time (int, optional): Amount of time to wait inside the SageMaker cluster before continuing with normal execution. Useful if you want to connect before a dedicated job starts (e.g. training). If you don't want to wait, set it to ``0``. If no estimator is provided, will default to ``0``. job_name (str, optional): Name to provide for a training job. If not provided will generate a default name based on the image name and current timestamp (e.g. ``pytorch-training-2023-08-28-20-57-55-113``). dryrun (bool): Whether to create the SageMakerCluster if it doesn't exist, or load a SageMakerCluster object as a dryrun. (Default: ``False``) Returns: SageMakerCluster: The resulting cluster. Example: >>> import runhouse as rh >>> # Launch a new SageMaker instance and keep it up indefinitely. >>> # Note: This will use Role ARN associated with the "sagemaker" profile defined in the local aws credentials >>> c = rh.sagemaker_cluster(name='sm-cluster', profile="sagemaker").save() >>> # Running a training job with a provided Estimator >>> c = rh.sagemaker_cluster(name='sagemaker-cluster', >>> estimator=PyTorch(entry_point='', >>> role='arn:aws:iam::123456789012:role/MySageMakerRole', >>> source_dir='/Users/myuser/dev/sagemaker', >>> framework_version='1.8.1', >>> py_version='py36', >>> instance_type='ml.p3.2xlarge'), >>> ).save() >>> # Load cluster from above >>> reloaded_cluster = rh.sagemaker_cluster(name="sagemaker-cluster") """ ssh_key_path = ( SageMakerCluster._relative_ssh_path(ssh_key_path) if ssh_key_path else None ) if name: alt_options = dict( role=role, profile=profile, ssh_key_path=ssh_key_path, instance_id=instance_id, image_uri=image_uri, estimator=estimator, instance_type=instance_type, job_name=job_name, instance_count=instance_count, ) # Filter out None/default values alt_options = {k: v for k, v in alt_options.items() if v is not None} try: c = SageMakerCluster.from_name(name, dryrun, alt_options=alt_options) if c: return c except ValueError: pass if name in RESERVED_SYSTEM_NAMES: raise ValueError( f"Cluster name {name} is a reserved name. Please use a different name which is not one of " f"{RESERVED_SYSTEM_NAMES}." ) return SageMakerCluster( name=name, role=role, profile=profile, ssh_key_path=ssh_key_path, estimator=estimator, job_name=job_name, instance_id=instance_id, instance_type=instance_type, instance_count=instance_count, image_uri=image_uri, autostop_mins=autostop_mins, connection_wait_time=connection_wait_time, dryrun=dryrun, )