Source code for runhouse.resources.folders.folder

import copy
import os
import pickle
import shutil
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from runhouse.globals import rns_client
from runhouse.logger import get_logger

from runhouse.resources.hardware import _current_cluster, _get_cluster_from, Cluster
from runhouse.resources.resource import Resource
from runhouse.rns.utils.api import generate_uuid, relative_file_path
from runhouse.utils import locate_working_dir

logger = get_logger(__name__)


[docs]class Folder(Resource): RESOURCE_TYPE = "folder" DEFAULT_FS = "file" CLUSTER_FS = "ssh" DEFAULT_FOLDER_PATH = "/runhouse-folder" DEFAULT_CACHE_FOLDER = "~/.cache/runhouse"
[docs] def __init__( self, name: Optional[str] = None, path: Optional[str] = None, system: Union[str, Cluster] = None, dryrun: bool = False, **kwargs, # We have this here to ignore extra arguments when calling from from_config ): """ Runhouse Folder object. .. note:: To build a folder, please use the factory method :func:`folder`. """ super().__init__(name=name, dryrun=dryrun) # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.gui.FileSelector.urlpath # Note: no longer needed as part of previous fsspec usage, but still used by some s3 / gsutil commands self._urlpath = None self._system = _get_cluster_from( system or _current_cluster(key="config"), dryrun=dryrun ) # TODO [DG] Should we ever be allowing this to be None? if path is None: self._path = Folder.default_path(self.rns_address, self._system) else: if self._system != self.DEFAULT_FS: self._path = path else: self._path = self._path_absolute_to_rh_workdir(path)
def __getstate__(self): """Override the pickle method to clear _urlpath before pickling.""" state = self.__dict__.copy() state["_urlpath"] = None return state @classmethod def default_path(cls, rns_address, system): name = ( rns_client.split_rns_name_and_path(rns_address)[0] if rns_address else generate_uuid() ) if system == Folder.DEFAULT_FS: return str(Path.cwd() / name) elif isinstance(system, Cluster): return f"{Folder.DEFAULT_CACHE_FOLDER}/{name}" else: return f"{Folder.DEFAULT_FOLDER_PATH}/{name}" # ----------------------------------
[docs] @staticmethod def from_config(config: Dict, dryrun: bool = False, _resolve_children: bool = True): if _resolve_children: config = Folder._check_for_child_configs(config) if config["system"] == "s3": from .s3_folder import S3Folder return S3Folder.from_config(config, dryrun=dryrun) elif config["system"] == "gs": from .gcs_folder import GCSFolder return GCSFolder.from_config(config, dryrun=dryrun) elif isinstance(config["system"], dict): config["system"] = Cluster.from_config( config["system"], dryrun=dryrun, _resolve_children=_resolve_children ) return Folder(**config, dryrun=dryrun)
@classmethod def _check_for_child_configs(cls, config: Dict): """Overload by child resources to load any resources they hold internally.""" system = config.get("system") if isinstance(system, str) or isinstance(system, dict): config["system"] = _get_cluster_from(system) return config @property def path(self): if self._path is not None: if self.system == Folder.DEFAULT_FS: return str(Path(self._path).expanduser()) return str(self._path) else: return None @path.setter def path(self, path): self._path = path @property def system(self): return self._system @system.setter def system(self, new_system: Union[str, Cluster]): self._system = _get_cluster_from(new_system) @property def _fs_str(self): if isinstance(self.system, Cluster): if self.system.on_this_cluster(): return self.DEFAULT_FS return self.CLUSTER_FS else: return self.system @property def local_path(self): if self.is_local(): return str(Path(self.path).expanduser()) else: return None @property def _use_http_endpoint(self): """Whether to use system APIs to perform folder operations on the cluster via HTTP.""" return isinstance(self.system, Cluster) and not self.system.on_this_cluster()
[docs] def mv( self, system: Union[str, Cluster], path: Optional[str], overwrite: bool = True, ) -> None: """Move the folder to a new filesystem or cluster. Args: system (str or Cluster): Filesystem or cluster to move the folder to. path (str): Path to move the folder to. overwrite (bool, optional): Whether to override if a file already exists at the destination path. (Default: ``True``) Example: >>> folder = rh.folder(path="local/path") >>> folder.mv(my_cluster) >>> folder.mv("s3", "s3_bucket/path") """ if path is None: raise ValueError("A destination path must be specified.") src_path = Path(self.path) dest_path = Path(path) if self._use_http_endpoint: self.system._folder_mv( path=src_path, dest_path=dest_path, overwrite=overwrite ) else: dest_path = dest_path.expanduser() src_path = src_path.expanduser() if not src_path.exists(): raise FileNotFoundError(f"The source path {src_path} does not exist.") if system == self.DEFAULT_FS: if dest_path.exists(): raise FileExistsError( f"The destination path {dest_path} already exists." ) # Create the destination directory if it doesn't exist dest_path.parent.mkdir(parents=True, exist_ok=overwrite) # Move the directory shutil.move(str(src_path), str(dest_path)) # Update the path attribute self.path = str(dest_path) self.system = self.DEFAULT_FS else: # TODO [JL] support moving to other systems raise NotImplementedError(f"System {system} not supported for local mv")
[docs] def to( self, system: Union[str, "Cluster"], path: Optional[Union[str, Path]] = None, ): """Copy the folder to a new filesystem or cluster. Currently supported: ``here``, ``file``, ``gs``, ``s3``, or a cluster. Args: system (str or Cluster): Filesystem or cluster to move the folder to. path (str, optional): Path to move the folder to. Example: >>> local_folder = rh.folder(path="/my/local/folder") >>> s3_folder = local_folder.to("s3") """ if system == "here": current_cluster_config = _current_cluster(key="config") if current_cluster_config: system = Cluster.from_config(current_cluster_config) else: system = self.DEFAULT_FS path = str(Path.cwd() / self.path.split("/")[-1]) if path is None else path if isinstance(system, Cluster): if not path: # Use a default path on the cluster (in ~/.cache/runhouse) dest_path = Folder.default_path( rns_address=self.rns_address, system=system ) else: # Destination path on the cluster should be a relative path dest_path = ( relative_file_path(file_path=path) if Path(path).is_absolute() else path ) # rsync the folder contents to the cluster's destination path logger.debug(f"Syncing folder contents to cluster in path: {dest_path}") return self._to_cluster(system, path=dest_path) path = str( path or Folder.default_path(self.rns_address, system) ) # Make sure it's a string and not a Path system_str = getattr( system, "name", system ) # Use system.name if available, i.e. system is a cluster logger.info( f"Copying folder from {self.fsspec_url} to: {system_str}, with path: {path}" ) # to_local, to_cluster and to_data_store are also overridden by subclasses to dispatch # to more performant cloud-specific APIs system = _get_cluster_from(system) if system == "file": return self._to_local(dest_path=path) elif system in ["s3", "gs"]: return self._to_data_store(system=system, data_store_path=path) else: raise ValueError( f"System '{system}' not currently supported as a destination system." )
def _destination_folder( self, dest_path: str, dest_system: Optional[str] = "file", ): """Returns a new Folder object pointing to the destination folder.""" folder_config = self.config() folder_config["system"] = dest_system folder_config["path"] = dest_path new_folder = Folder.from_config(folder_config) return new_folder def _to_local(self, dest_path: str): """Copies folder to local. Only relevant for the base Folder if its system is a cluster.""" if isinstance(self.system, Cluster): # Cluster --> local copying logger.debug( f"Copying folder from cluster {self.system.name} to local path: {dest_path}" ) # Return a new folder objecting pointing to the local destination path return self._cluster_to_local(self.system, dest_path) if self.system == self.DEFAULT_FS: # Local --> local copying logger.debug(f"Copying folder to local path: {dest_path}") self.mv(system=self.system, path=dest_path) return self raise TypeError(f"Cannot copy from {self.system} to local.") def _to_data_store( self, system: str, data_store_path: Optional[str] = None, ): """Local or cluster to blob storage.""" local_folder_path = self.path # The new folder should be a sub-folder for the relevant data store (e.g. `S3Folder`) new_folder = self._destination_folder( dest_path=data_store_path, dest_system=system ) new_folder._upload(src=local_folder_path) return new_folder
[docs] def mkdir(self): """Create the folder in specified file system if it doesn't already exist.""" if self._use_http_endpoint: self.system._folder_mkdir(self.path) else: path = Path(self.path).expanduser() if not path.exists(): path.mkdir(parents=True, exist_ok=True) logger.debug(f"Folder created in path: {self.path}")
def _to_cluster(self, dest_cluster, path=None): """Copy the folder from a file or cluster source onto a destination cluster.""" if not dest_cluster.ips: raise ValueError("Cluster must be started before copying data to it.") dest_path = path or f"~/{Path(self.path).name}" # Need to add slash for rsync to copy the contents of the folder dest_folder = copy.deepcopy(self) dest_folder.path = dest_path dest_folder.system = dest_cluster if self._fs_str == self.DEFAULT_FS and dest_cluster.name is not None: # Includes case where we're on the cluster itself # And the destination is a cluster, not rh.here dest_cluster.rsync(source=self.path, dest=dest_path, up=True, contents=True) elif isinstance(self.system, Resource): if self.system.endpoint(external=False) == dest_cluster.endpoint( external=False ): # We're on the same cluster, so we can just move the files if not path: # If user didn't specify a path, we can just return self return self else: dest_cluster.run( [f"mkdir -p {dest_path}", f"cp -r {self.path}/* {dest_path}"], ) else: self._cluster_to_cluster(dest_cluster, dest_path) else: # data store folders have their own specific _to_cluster functions raise TypeError( f"`Sending from filesystem type {type(self.system)} is not supported" ) return dest_folder def _cluster_to_cluster(self, dest_cluster, dest_path): src_path = self.path cluster_creds = self.system.creds_values if not cluster_creds.get("password") and not dest_cluster.creds_values.get( "password" ): creds_file = cluster_creds.get("ssh_private_key") creds_cmd = f"-i '{creds_file}' " if creds_file else "" dest_cluster.run([f"mkdir -p {dest_path}"]) command = ( f"rsync -Pavz --filter='dir-merge,- .gitignore' -e \"ssh {creds_cmd}" f"-o StrictHostKeyChecking=no -o IdentitiesOnly=yes -o ExitOnForwardFailure=yes " f"-o ServerAliveInterval=5 -o ServerAliveCountMax=3 -o ConnectTimeout=30s -o ForwardAgent=yes " f'-o ControlMaster=auto -o ControlPersist=300s" {src_path}/ {dest_cluster.head_ip}:{dest_path}' ) status_codes = self.system.run([command]) if status_codes[0][0] != 0: raise Exception( f"Error syncing folder to destination cluster ({dest_cluster.name}). " f"Make sure the source cluster ({self.system.name}) has the necessary provider keys " f"if applicable. " ) else: local_folder = self._cluster_to_local(self.system, self.path) local_folder._to_cluster(dest_cluster, dest_path) def _cluster_to_local(self, cluster, dest_path): """Create a local folder with dest_path from the cluster. This function rsyncs down the data and return a folder with system=='file'. """ if not cluster.ips: raise ValueError("Cluster must be started before copying data from it.") Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True) cluster.rsync( source=self.path, dest=str(Path(dest_path).expanduser()), up=False, contents=True, ) new_folder = copy.deepcopy(self) new_folder.path = dest_path new_folder.system = self.DEFAULT_FS return new_folder
[docs] def is_local(self): """Whether the folder is on the local filesystem. Example: >>> is_local = my_folder.is_local() """ return ( self._fs_str == self.DEFAULT_FS and self.path is not None and Path(self.path).expanduser().exists() )
def _upload(self, src: str, region: Optional[str] = None): """Upload a folder to a remote folder.""" raise NotImplementedError def _upload_command(self, src: str, dest: str): """CLI command for uploading folder to remote bucket. Needed when uploading a folder from a cluster.""" raise NotImplementedError def _upload_folder_to_bucket(self, command: str): """Uploads a folder to a remote bucket (e.g. s3). Based on the CLI command skypilot uses to upload the folder""" # Adapted from: https://github.com/skypilot-org/skypilot/blob/983f5fa3197fe7c4b5a28be240f7b027f7192b15/sky/data/data_utils.py#L165 # noqa with subprocess.Popen( command, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL, shell=True ) as process: stderr = [] while True: line = process.stderr.readline() if not line: break str_line = line.decode("utf-8") stderr.append(str_line) returncode = process.wait() if returncode != 0: raise RuntimeError(" ".join(stderr).strip()) def _download(self, dest): raise NotImplementedError def _download_command(self, src, dest): """CLI command for downloading folder from remote bucket. Needed when downloading a folder to a cluster.""" raise NotImplementedError def config(self, condensed: bool = True): config = super().config(condensed) if self.system == Folder.DEFAULT_FS: # If folder is local check whether path is relative, and if so take it relative to the working director # rather than to the home directory. If absolute, it's left alone. config["path"] = ( self._path_relative_to_rh_workdir(self.path) if self.path else None ) else: # if not a local filesystem save path as is (e.g. bucket/path) config["path"] = self.path if isinstance(self.system, Resource): # If system is a cluster config["system"] = self._resource_string_for_subconfig( self.system, condensed ) else: config["system"] = self.system return config def _save_sub_resources(self, folder: str = None): if isinstance(self.system, Resource): self.system.save(folder=folder) @staticmethod def _path_relative_to_rh_workdir(path: str): rh_workdir = Path(locate_working_dir()) try: return str(Path(path).relative_to(rh_workdir)) except ValueError: return path @staticmethod def _path_absolute_to_rh_workdir(path: str): return ( path if Path(path).expanduser().is_absolute() else str(Path(locate_working_dir()) / path) ) @staticmethod def _delete_contents(contents: List, folder_path: Path, recursive: bool): for content in contents: content_path = folder_path / content if content_path.exists(): if content_path.is_file(): content_path.unlink() elif content_path.is_dir() and recursive: shutil.rmtree(content_path) else: raise ValueError( f"Path {content_path} is a directory and recursive is set to False" ) @property def fsspec_url(self): """Generate the FSSpec style URL using the file system and path of the folder""" if self.path.startswith("/") and self._fs_str not in [ rns_client.DEFAULT_FS, self.CLUSTER_FS, ]: return f"{self._fs_str}:/{self.path}" else: # For local, ssh / sftp filesystems we need both slashes # e.g.: 'ssh:///home/ubuntu/.cache/runhouse/tables/dede71ef83ce45ffa8cb27d746f97ee8' return f"{self._fs_str}://{self.path}" @property def _bucket_name(self): return self.path.lstrip("/").split("/")[0] @property def _key(self): filtered_parts = self.path.split("/")[2:] return "/".join(filtered_parts) + "/"
[docs] def ls(self, full_paths: bool = True, sort: bool = False) -> List: """List the contents of the folder. Args: full_paths (Optional[bool]): Whether to list the full paths of the folder contents. Defaults to ``True``. sort (Optional[bool]): Whether to sort the folder contents by time modified. Defaults to ``False``. """ if self._use_http_endpoint: return self.system._folder_ls(self.path, full_paths=full_paths, sort=sort) else: path = Path(self.path).expanduser() paths = [p for p in path.iterdir()] # Sort the paths by modification time if sort is True if sort: paths.sort(key=lambda p: p.stat().st_mtime, reverse=True) # Convert paths to strings and format them based on full_paths if full_paths: return [str(p.resolve()) for p in paths] else: return [p.name for p in paths]
[docs] def resources(self, full_paths: bool = False): """List the resources in the folder. Args: full_paths (bool, optional): Whether to list the full path or relative path. (Default: ``False``) Example: >>> resources = my_folder.resources() """ try: if self._use_http_endpoint: paths = self.system._folder_ls(path=self.path, full_paths=full_paths) resources = [ path for path in paths if self.system._folder_exists(path=f"{path}/config.json") ] return resources else: resources = [ path for path in self.ls() if (Path(path) / "config.json").exists() ] if full_paths: return [ self.rns_address + "/" + Path(path).stem for path in resources ] else: return [Path(path).stem for path in resources] except Exception as e: logger.error(f"Error listing resources in folder: {e}") return []
@property def rns_address(self): # TODO Maybe later, account for folders along the path with a different RNS name. if self.name is None: # Anonymous folders have no rns address return None # Only should be necessary when a new base folder is being added (therefore isn't in rns_base_folders yet) if self._rns_folder: return str(Path(self._rns_folder) / self.name) if self.path in rns_client.rns_base_folders.values(): if self._rns_folder: return self._rns_folder + "/" + self.name else: return rns_client.default_folder + "/" + self.name segment = Path(self.path) while ( str(segment) not in rns_client.rns_base_folders.values() and not segment == Path.home() and not segment == segment.parent ): segment = segment.parent if ( segment == Path.home() or segment == segment.parent ): # TODO throw an error instead? return rns_client.default_folder + "/" + self.name else: base_folder = Folder(path=str(segment), dryrun=True) base_folder_path = base_folder.rns_address relative_path = str(Path(self.path).relative_to(base_folder.path)) return base_folder_path + "/" + relative_path
[docs] def contains(self, name_or_path: str) -> bool: """Whether path exists locally inside a folder. Args: name_or_path (str): Name or path of folder to check if it exists inside the folder. Example: >>> my_folder = rh.folder("local/folder/path") >>> in_folder = my_folder.contains("filename") """ path, _ = self.locate(name_or_path) return path is not None
[docs] def locate(self, name_or_path) -> Tuple[str, str]: """Locate the local path of a Folder given an rns path. Args: name_or_path (str): Name or path of folder to locate inside the folder. Example: >>> my_folder = rh.folder("local/folder/path") >>> local_path = my_folder.locate("file_name") """ # Note: Keep in mind we're using both _rns_ path and physical path logic below. Be careful! # If the path is already given relative to the current folder: if (Path(self.path) / name_or_path).exists(): return str(Path(self.path) / name_or_path), self.system # If name or path uses ~/ or ./, need to resolve with folder path abs_path = rns_client.resolve_rns_path(name_or_path) rns_path = self.rns_address # If this folder is anonymous, it has no rns contents if rns_path is None: return None, None if abs_path == rns_path: return self.path, self.system try: child_path = Path(self.path) / Path(abs_path).relative_to(rns_path) if child_path.exists(): return str(child_path), self.system except ValueError: pass # Last resort, recursively search inside sub-folders and children. segments = abs_path.lstrip("/").split("/") if len(segments) == 1: return ( None, None, ) # If only a single element, would have been found in ls above. # Look for lowest folder in the path that exists in filesystem, and recurse from that folder greatest_common_folder = Path(self.path) i = 0 for i, seg in enumerate(segments): if not (greatest_common_folder / seg).exists(): break greatest_common_folder = greatest_common_folder / seg if not str(greatest_common_folder) == self.path: return Folder( path=str(greatest_common_folder), system=self.system, dryrun=True ).locate("/".join(segments[i + 1 :])) return None, None
[docs] def open(self, name, mode: str = "rb", encoding: Optional[str] = None): """Returns the specified file as a stream (`botocore.response.StreamingBody`), which must be used as a content manager to be opened. Args: name (str): Name of file inside the folder to open. mode (str, optional): Mode for opening the file. (Default: ``"rb"``) encoding (str, optional): Encoding for opening the file. (Default: ``None``) Example: >>> with my_folder.open('obj_name') as my_file: >>> pickle.load(my_file) """ file_path = Path(self.path).expanduser() / name valid_modes = {"r", "w", "a", "rb", "wb", "ab", "r+", "w+", "a+"} if mode not in valid_modes: raise NotImplementedError( f"{mode} mode is not implemented yet for local files" ) return open(file_path, mode=mode, encoding=encoding)
[docs] def get(self, name, mode: str = "rb", encoding: Optional[str] = None): """Returns the contents of a file as a string or bytes. Args: name (str): Name of file to get contents of. mode (str, optional): Mode for opening the file. (Default: ``"rb"``) encoding (str, optional): Encoding for opening the file. (Default: ``None``) Example: >>> contents = my_folder.get(file_name) """ if self._use_http_endpoint: return self.system._folder_get( path=Path(self.path) / name, mode=mode, encoding=encoding ) with self.open(name, mode=mode, encoding=encoding) as f: return f.read()
# TODO [DG] fix this to follow the correct convention above def get_all(self): # TODO add docs for this # TODO we're not closing these, do we need to extract file-like objects so we can close them? raise NotImplementedError
[docs] def exists_in_system(self): """Whether the folder exists in the filesystem. Example: >>> exists_on_system = my_folder.exists_in_system() """ if self._use_http_endpoint: return self.system._folder_exists(path=self.path) else: full_path = Path(self.path).expanduser() return full_path.exists() and full_path.is_dir()
[docs] def rm(self, contents: List = None, recursive: bool = True): """Delete a folder from the file system. Optionally provide a list of folder contents to delete. Args: contents (Optional[List]): Specific contents to delete in the folder. If None, removes the entire folder. (Default: ``None``) recursive (bool, optional): Delete the folder itself (including all its contents). (Default: ``True``) Example: >>> my_folder.rm() """ if self._use_http_endpoint: self.system._folder_rm(self.path, contents, recursive) else: folder_path = Path(self.path).expanduser() if contents: Folder._delete_contents(contents, folder_path, recursive) else: if recursive: shutil.rmtree(folder_path) else: if folder_path.is_dir(): for item in folder_path.iterdir(): if item.is_file(): item.unlink() else: raise ValueError( f"Folder {item} found in {folder_path}, recursive is set to False" ) else: folder_path.unlink()
[docs] def put(self, contents: Dict[str, Any], overwrite: bool = False, mode: str = "wb"): """Put given contents in folder. Args: contents (Dict[str, Any] or Resource or List[Resource]): Contents to put in folder. Must be a dict with keys being the file names (without full paths) and values being the file-like objects to write, or a Resource object, or a list of Resources. overwrite (bool, optional): Whether to overwrite the existing file if it exists. (Default: ``False``) mode (str, optional): Write mode to use. (Default: ``wb``). Example: >>> my_folder.put(contents={"filename.txt": data}) """ # Handle lists of resources just for convenience if isinstance(contents, list): for resource in contents: self.put(resource, overwrite=overwrite) return if self._use_http_endpoint: self.system._folder_put( path=self.path, contents=contents, overwrite=overwrite, mode=mode ) else: self.mkdir() full_path = str(Path(self.path).expanduser()) if isinstance(contents, Folder): if ( contents.path is None ): # Should only be the case when Folder is created contents.path = os.path.join(full_path, contents.name) return if not isinstance(contents, dict): raise TypeError( "`contents` argument must be a dict mapping filenames to file-like objects" ) if overwrite is False: # Check if files exist and raise an error if they do existing_files = set(os.listdir(full_path)) intersection = existing_files.intersection(set(contents.keys())) if intersection: raise FileExistsError( f"File(s) {intersection} already exist(s) at path: {full_path}. " f"Cannot save them with overwrite={overwrite}." ) for filename, file_obj in contents.items(): file_obj = self._serialize_file_obj(file_obj) file_path = Path(full_path) / filename if not overwrite and file_path.exists(): raise FileExistsError(f"File {file_path} already exists.") try: with open(file_path, mode) as f: f.write(file_obj) except Exception as e: raise RuntimeError( f"Failed to write {filename} to {file_path}: {e}" )
@staticmethod def _serialize_file_obj(file_obj): if not isinstance(file_obj, bytes): try: file_obj = pickle.dumps(file_obj) except (pickle.PicklingError, TypeError) as e: raise ValueError(f"Cannot serialize file contents: {e}") return file_obj @staticmethod def _bucket_name_from_path(path: str) -> str: """Extract the bucket name from a path (e.g. '/my-bucket/my-folder/my-file.txt' -> 'my-bucket')""" return Path(path).parts[1]